cli_function_profiler.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. # mypy: disallow-untyped-defs
  2. import functools
  3. import logging
  4. import os
  5. import re
  6. import subprocess
  7. import time
  8. from threading import Lock
  9. from timeit import default_timer as timer
  10. from typing import Any, List, Optional, Sequence
  11. logger = logging.getLogger("strobelight_function_profiler")
  12. console_handler = logging.StreamHandler()
  13. formatter = logging.Formatter(
  14. "%(name)s, line %(lineno)d, %(asctime)s, %(levelname)s: %(message)s"
  15. )
  16. console_handler.setFormatter(formatter)
  17. logger.addHandler(console_handler)
  18. logger.setLevel(logging.INFO)
  19. logger.propagate = False
  20. class StrobelightCLIProfilerError(Exception):
  21. """
  22. Raised when an error happens during strobelight profiling
  23. """
  24. def _pid_namespace_link(pid: Optional[int] = None) -> str:
  25. """Returns the link to the process's namespace, example: pid:[4026531836]"""
  26. PID_NAMESPACE_PATH = "/proc/{}/ns/pid"
  27. pid = pid or os.getpid()
  28. return os.readlink(PID_NAMESPACE_PATH.format(pid))
  29. def _pid_namespace(pid: Optional[int] = None) -> int:
  30. """Returns the process's namespace id"""
  31. pid = pid or os.getpid()
  32. link = _pid_namespace_link(pid)
  33. return int(link[link.find("[") + 1 : -1])
  34. def _command_to_string(command: Sequence[str]) -> str:
  35. return " ".join(command)
  36. class StrobelightCLIFunctionProfiler:
  37. """
  38. Note: this is a Meta only tool.
  39. StrobelightCLIFunctionProfiler can be used to profile a python function and
  40. generate a strobelight link with the results. It works on meta servers but
  41. does not requries an fbcode target.
  42. When stop_at_error is false(default), error during profiling does not prevent
  43. the work function from running.
  44. Check function_profiler_example.py for an example.
  45. """
  46. # This lock is used to make sure only one thread is running the profiler at any point.
  47. _lock = Lock()
  48. def __init__(
  49. self,
  50. *,
  51. stop_at_error: bool = False,
  52. max_profile_duration_sec: int = 60 * 10,
  53. sample_each: float = 1e7, # sample each sample_each cycles.
  54. run_user_name: str = "pytorch-strobelight-ondemand",
  55. timeout_wait_for_running_sec: int = 60,
  56. timeout_wait_for_finished_sec: int = 60,
  57. recorded_env_variables: Optional[List[str]] = None,
  58. sample_tags: Optional[List[str]] = None,
  59. stack_max_len: int = 127,
  60. async_stack_max_len: int = 127,
  61. ):
  62. self.stop_at_error = stop_at_error
  63. self.max_profile_duration_sec = max_profile_duration_sec
  64. self.sample_each = sample_each
  65. self.run_user_name = run_user_name
  66. self.timeout_wait_for_running_sec = timeout_wait_for_running_sec
  67. self.timeout_wait_for_finished_sec = timeout_wait_for_finished_sec
  68. # Results of the most recent run.
  69. # Tracks the strobelight run id of the most recent run
  70. self.current_run_id: Optional[int] = None
  71. self.profile_result: Optional[List[str]] = None
  72. self.sample_tags = sample_tags
  73. def _run_async(self) -> None:
  74. processId = os.getpid()
  75. namespace = _pid_namespace(processId)
  76. command = [
  77. "strobeclient",
  78. "run",
  79. "--profiler",
  80. "pyperf",
  81. "--event",
  82. "cycles",
  83. "--async",
  84. "--sample-interval",
  85. f"{int(self.sample_each)}",
  86. "--duration-ms",
  87. f"{int(self.max_profile_duration_sec * 1000)}",
  88. "--pid",
  89. f"{namespace}:{processId}",
  90. ]
  91. if self.sample_tags:
  92. command.append("--sample-tags")
  93. command.append(",".join(self.sample_tags))
  94. logger.debug("running command: %s", _command_to_string(command))
  95. result = subprocess.run(command, capture_output=True)
  96. output = result.stderr.decode("utf-8")
  97. logger.debug("output:\n{%s}", output)
  98. if result.returncode != 0:
  99. raise StrobelightCLIProfilerError(
  100. f"failed to start strobelight profiling, error in run_async:{output}"
  101. )
  102. if match := re.search(r"INFO Run Id: (-?\d+)", output):
  103. self.current_run_id = int(match.group(1))
  104. return
  105. raise StrobelightCLIProfilerError(
  106. f"failed to start strobelight profiling, unexpected result {output}"
  107. )
  108. def _wait_for_running(self, counter: int = 0) -> None:
  109. if counter > 20:
  110. raise StrobelightCLIProfilerError(
  111. "wait_for_running called more than 20 times"
  112. )
  113. command = ["strobeclient", "getRunStatus", "--run-id", f"{self.current_run_id}"]
  114. logger.debug("running command: %s", _command_to_string(command))
  115. result = subprocess.run(command, capture_output=True)
  116. output = result.stderr.decode("utf-8")
  117. logger.debug("output:\n{%s}", output)
  118. if result.returncode != 0:
  119. raise StrobelightCLIProfilerError(
  120. f"failed to start strobelight profiling, error in wait_for_running:{output}"
  121. )
  122. if match := re.search("Profile run status: (.*)", output):
  123. current_status = match.group(1)
  124. if current_status == "RUNNING":
  125. return
  126. elif current_status == "PREPARING":
  127. time.sleep(10)
  128. self._wait_for_running(counter + 1)
  129. return
  130. else:
  131. raise StrobelightCLIProfilerError(f"unexpected {current_status} phase")
  132. raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")
  133. def _stop_run(self) -> None:
  134. command = ["strobeclient", "stopRun", "--run-id", str(self.current_run_id)]
  135. logger.debug("running command: %s", _command_to_string(command))
  136. result = subprocess.run(command, capture_output=True)
  137. output = result.stderr.decode("utf-8")
  138. logger.debug("output:\n{%s}", output)
  139. if result.returncode != 0:
  140. raise StrobelightCLIProfilerError(
  141. f"failed to stop strobelight profiling, return code is not 0 :{output}"
  142. )
  143. if match := re.search("INFO ::1:(.*)", output):
  144. current_status = match.group(1)
  145. if current_status.__contains__("Success!"):
  146. return
  147. else:
  148. raise StrobelightCLIProfilerError(
  149. f"failed to stop strobelight profiling, got {current_status} result"
  150. )
  151. raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")
  152. def _get_results(self) -> None:
  153. command = ["strobeclient", "getRunStatus", "--run-id", str(self.current_run_id)]
  154. logger.debug("running command: %s", _command_to_string(command))
  155. result = subprocess.run(command, capture_output=True)
  156. output = result.stderr.decode("utf-8")
  157. logger.debug("output:\n{%s}", output)
  158. if result.returncode != 0:
  159. raise StrobelightCLIProfilerError(
  160. f"failed to extract profiling results, return code is not 0 : {output}"
  161. )
  162. if match := re.search("INFO ::1:(.*)", output):
  163. current_status = match.group(1)
  164. if current_status.__contains__("Profile run status: PROCESSING"):
  165. time.sleep(10)
  166. self._get_results()
  167. return
  168. elif not current_status.__contains__("Profile run finished with SUCCESS"):
  169. raise StrobelightCLIProfilerError(
  170. f"failed to extract profiling results, unexpected response {output}"
  171. )
  172. self.profile_result = []
  173. for item in re.findall(
  174. r"(Total samples(.*)|GraphProfiler(.*)|Icicle view \(python stack\)(.*))",
  175. output,
  176. ):
  177. self.profile_result += item[0]
  178. logger.info(item[0])
  179. def _stop_strobelight_no_throw(
  180. self,
  181. collect_results: bool,
  182. ) -> None:
  183. try:
  184. # call stop run
  185. self._stop_run()
  186. logger.info("strobelight profiling stopped")
  187. logger.debug("collection stopped")
  188. if not collect_results:
  189. return
  190. self._get_results()
  191. except Exception as error:
  192. logger.warning("error during stop_strobelight", exc_info=True)
  193. # Return true if strobelight started and is running. Never throw.
  194. def _start_strobelight(self) -> bool:
  195. strobelight_started = False
  196. try:
  197. self._run_async()
  198. strobelight_started = True
  199. logger.info("strobelight run id is: %s", self.current_run_id)
  200. self._wait_for_running()
  201. logger.info("strobelight profiling running")
  202. return True
  203. except Exception as error:
  204. logger.warning("error during start_strobelight:", exc_info=True)
  205. if strobelight_started:
  206. self._stop_strobelight_no_throw(collect_results=False)
  207. return False
  208. def profile(self, work_function: Any, *args: Any, **kwargs: Any) -> Any:
  209. self.current_run_id = None
  210. self.profile_result = None
  211. if locked := StrobelightCLIFunctionProfiler._lock.acquire(False):
  212. if not locked:
  213. if self.stop_at_error:
  214. raise StrobelightCLIProfilerError("concurrent runs not supported")
  215. logger.warning("concurrent runs not supported")
  216. return work_function(*args, **kwargs)
  217. started = self._start_strobelight()
  218. if not started:
  219. if self.stop_at_error:
  220. StrobelightCLIFunctionProfiler._lock.release()
  221. raise StrobelightCLIProfilerError(
  222. "failed to start strobelight profiling"
  223. )
  224. result = work_function(*args, **kwargs)
  225. StrobelightCLIFunctionProfiler._lock.release()
  226. return result
  227. try:
  228. logger.debug("collection started")
  229. start = timer()
  230. result = work_function(*args, **kwargs)
  231. end = timer()
  232. total_time = end - start # Time in seconds, e.g. 5.38091952400282
  233. logger.info("work function took %s seconds", total_time)
  234. self._stop_strobelight_no_throw(collect_results=True)
  235. StrobelightCLIFunctionProfiler._lock.release()
  236. return result
  237. except Exception as error:
  238. logger.warning("work function throw exception", exc_info=True)
  239. self._stop_strobelight_no_throw(collect_results=False)
  240. StrobelightCLIFunctionProfiler._lock.release()
  241. raise error
  242. # A function decorator that wraps profile, if no profiler is provided one with
  243. # default args is created. A function can be annotated as:
  244. # @strobelight()
  245. # @strobelight(profiler = StrobelightFunctionProfiler(stop_at_error=True,..))
  246. # @strobelight(stop_at_error=True,...)
  247. def strobelight(
  248. profiler: Optional[StrobelightCLIFunctionProfiler] = None, **kwargs: Any
  249. ) -> Any:
  250. if not profiler:
  251. profiler = StrobelightCLIFunctionProfiler(**kwargs)
  252. def strobelight_inner(work_function: Any) -> Any:
  253. @functools.wraps(work_function)
  254. def wrapper_function(*args: Any, **kwargs: Any) -> Any:
  255. return profiler.profile(work_function, *args, **kwargs)
  256. return wrapper_function
  257. return strobelight_inner