| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- # mypy: disallow-untyped-defs
- import functools
- import logging
- import os
- import re
- import subprocess
- import time
- from threading import Lock
- from timeit import default_timer as timer
- from typing import Any, List, Optional, Sequence
# Dedicated logger for the strobelight function profiler. It has its own stream
# handler and does not propagate, so its records are not passed on to the
# handlers of ancestor loggers.
logger = logging.getLogger("strobelight_function_profiler")
logger.setLevel(logging.INFO)
logger.propagate = False

# Every record carries the logger name, source line, timestamp and level.
formatter = logging.Formatter(
    "%(name)s, line %(lineno)d, %(asctime)s, %(levelname)s: %(message)s"
)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
class StrobelightCLIProfilerError(Exception):
    """Error raised when something goes wrong during strobelight profiling."""
def _pid_namespace_link(pid: Optional[int] = None) -> str:
    """Return the target of the process's PID-namespace link.

    The link is read from ``/proc/<pid>/ns/pid``; an example result is
    ``pid:[4026531836]``. A falsy ``pid`` (``None`` or ``0``) selects the
    current process.
    """
    ns_path_template = "/proc/{}/ns/pid"
    target_pid = pid if pid else os.getpid()
    return os.readlink(ns_path_template.format(target_pid))
def _pid_namespace(pid: Optional[int] = None) -> int:
    """Return the numeric id of the process's PID namespace.

    The id is the number between the brackets of the namespace link, e.g.
    ``4026531836`` for ``pid:[4026531836]``. A falsy ``pid`` selects the
    current process.
    """
    ns_link = _pid_namespace_link(pid if pid else os.getpid())
    open_bracket = ns_link.find("[")
    # Keep what follows "[" and drop the last character (the closing "]").
    return int(ns_link[open_bracket + 1 : -1])
def _command_to_string(command: Sequence[str]) -> str:
    """Render a command's arguments as one space-separated string (no quoting)."""
    separator = " "
    return separator.join(command)
class StrobelightCLIFunctionProfiler:
    """
    Note: this is a Meta only tool.

    StrobelightCLIFunctionProfiler can be used to profile a python function and
    generate a strobelight link with the results. It works on meta servers but
    does not require an fbcode target.
    When stop_at_error is false (default), an error during profiling does not
    prevent the work function from running.

    Check function_profiler_example.py for an example.
    """

    # This lock is used to make sure only one thread is running the profiler at any point.
    _lock = Lock()

    # Seconds to sleep between two status polls of a run.
    _POLL_INTERVAL_SEC = 10

    def __init__(
        self,
        *,
        stop_at_error: bool = False,
        max_profile_duration_sec: int = 60 * 10,
        sample_each: float = 1e7,  # sample each sample_each cycles.
        run_user_name: str = "pytorch-strobelight-ondemand",
        timeout_wait_for_running_sec: int = 60,
        timeout_wait_for_finished_sec: int = 60,
        recorded_env_variables: Optional[List[str]] = None,
        sample_tags: Optional[List[str]] = None,
        stack_max_len: int = 127,
        async_stack_max_len: int = 127,
    ) -> None:
        """Configure the profiler; all arguments are keyword-only.

        Args:
            stop_at_error: when true, ``profile`` raises
                ``StrobelightCLIProfilerError`` if profiling cannot start
                instead of running the work function unprofiled.
            max_profile_duration_sec: passed to strobeclient as
                ``--duration-ms`` (converted to milliseconds).
            sample_each: passed to strobeclient as ``--sample-interval``.
            run_user_name: stored on the instance; not read by this class.
            timeout_wait_for_running_sec: stored on the instance; not read by
                this class.
            timeout_wait_for_finished_sec: stored on the instance; not read by
                this class.
            recorded_env_variables: accepted but neither stored nor used.
            sample_tags: when non-empty, joined with "," and passed as
                ``--sample-tags``.
            stack_max_len: accepted but neither stored nor used.
            async_stack_max_len: accepted but neither stored nor used.
        """
        self.stop_at_error = stop_at_error
        self.max_profile_duration_sec = max_profile_duration_sec
        self.sample_each = sample_each
        self.run_user_name = run_user_name
        self.timeout_wait_for_running_sec = timeout_wait_for_running_sec
        self.timeout_wait_for_finished_sec = timeout_wait_for_finished_sec
        # Results of the most recent run.
        # Tracks the strobelight run id of the most recent run
        self.current_run_id: Optional[int] = None
        self.profile_result: Optional[List[str]] = None
        self.sample_tags = sample_tags

    @staticmethod
    def _run_strobeclient(command: Sequence[str], failure_prefix: str) -> str:
        """Run a strobeclient command and return its decoded stderr.

        Raises:
            StrobelightCLIProfilerError: with ``failure_prefix`` followed by the
                command output when the return code is not 0.
        """
        logger.debug("running command: %s", _command_to_string(command))
        result = subprocess.run(command, capture_output=True)
        # The responses parsed by the callers are read from stderr.
        output = result.stderr.decode("utf-8")
        logger.debug("output:\n{%s}", output)

        if result.returncode != 0:
            raise StrobelightCLIProfilerError(f"{failure_prefix}{output}")
        return output

    def _run_async(self) -> None:
        """Start an asynchronous pyperf run for this process and record its id.

        On success ``self.current_run_id`` holds the id reported by strobeclient.

        Raises:
            StrobelightCLIProfilerError: if the command fails or its output
                contains no run id.
        """
        process_id = os.getpid()
        namespace = _pid_namespace(process_id)
        command = [
            "strobeclient",
            "run",
            "--profiler",
            "pyperf",
            "--event",
            "cycles",
            "--async",
            "--sample-interval",
            f"{int(self.sample_each)}",
            "--duration-ms",
            f"{int(self.max_profile_duration_sec * 1000)}",
            "--pid",
            f"{namespace}:{process_id}",
        ]

        if self.sample_tags:
            command.append("--sample-tags")
            command.append(",".join(self.sample_tags))

        output = self._run_strobeclient(
            command, "failed to start strobelight profiling, error in run_async:"
        )

        if match := re.search(r"INFO Run Id: (-?\d+)", output):
            self.current_run_id = int(match.group(1))
            return

        raise StrobelightCLIProfilerError(
            f"failed to start strobelight profiling, unexpected result {output}"
        )

    def _wait_for_running(self, counter: int = 0) -> None:
        """Poll the run status until it is RUNNING.

        Args:
            counter: number of polls already performed; polling gives up once
                this exceeds 20.

        Raises:
            StrobelightCLIProfilerError: on too many polls, a failing command,
                an unexpected status or unparsable output.
        """
        command = ["strobeclient", "getRunStatus", "--run-id", f"{self.current_run_id}"]
        attempt = counter

        while True:
            if attempt > 20:
                raise StrobelightCLIProfilerError(
                    "wait_for_running called more than 20 times"
                )

            output = self._run_strobeclient(
                command,
                "failed to start strobelight profiling, error in wait_for_running:",
            )

            match = re.search("Profile run status: (.*)", output)
            if match is None:
                raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")

            current_status = match.group(1)
            if current_status == "RUNNING":
                return
            if current_status != "PREPARING":
                raise StrobelightCLIProfilerError(f"unexpected {current_status} phase")

            # Still preparing: wait and poll again.
            time.sleep(self._POLL_INTERVAL_SEC)
            attempt += 1

    def _stop_run(self) -> None:
        """Ask strobeclient to stop the current run.

        Raises:
            StrobelightCLIProfilerError: if the command fails, does not report
                success, or its output cannot be parsed.
        """
        command = ["strobeclient", "stopRun", "--run-id", str(self.current_run_id)]
        output = self._run_strobeclient(
            command, "failed to stop strobelight profiling, return code is not 0 :"
        )

        match = re.search("INFO ::1:(.*)", output)
        if match is None:
            raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")

        current_status = match.group(1)
        if "Success!" not in current_status:
            raise StrobelightCLIProfilerError(
                f"failed to stop strobelight profiling, got {current_status} result"
            )

    def _get_results(self) -> None:
        """Wait for the run to finish processing and collect its result lines.

        Polls the run status while it reports PROCESSING, then stores every
        "Total samples", "GraphProfiler" and "Icicle view (python stack)" line
        of the output in ``self.profile_result`` and logs each of them.

        Raises:
            StrobelightCLIProfilerError: if the command fails or the run did
                not finish with SUCCESS.
        """
        command = ["strobeclient", "getRunStatus", "--run-id", str(self.current_run_id)]

        while True:
            output = self._run_strobeclient(
                command,
                "failed to extract profiling results, return code is not 0 : ",
            )

            match = re.search("INFO ::1:(.*)", output)
            if match is None:
                # No status line: fall through and parse whatever was printed.
                break

            current_status = match.group(1)
            if "Profile run status: PROCESSING" in current_status:
                time.sleep(self._POLL_INTERVAL_SEC)
                continue
            if "Profile run finished with SUCCESS" not in current_status:
                raise StrobelightCLIProfilerError(
                    f"failed to extract profiling results, unexpected response {output}"
                )
            break

        collected: List[str] = []
        for item in re.findall(
            r"(Total samples(.*)|GraphProfiler(.*)|Icicle view \(python stack\)(.*))",
            output,
        ):
            # findall yields one tuple per match; index 0 is the whole line.
            # Append it (``+=`` would add the string one character at a time).
            collected.append(item[0])
            logger.info(item[0])
        self.profile_result = collected

    def _stop_strobelight_no_throw(
        self,
        collect_results: bool,
    ) -> None:
        """Stop the current run and optionally collect results; errors are only logged."""
        try:
            # call stop run
            self._stop_run()
            logger.info("strobelight profiling stopped")

            logger.debug("collection stopped")

            if not collect_results:
                return

            self._get_results()
        except Exception:
            logger.warning("error during stop_strobelight", exc_info=True)

    # Return true if strobelight started and is running. Never throw.
    def _start_strobelight(self) -> bool:
        """Start a run and wait until it is running; log and return False on error."""
        strobelight_started = False
        try:
            self._run_async()
            strobelight_started = True
            logger.info("strobelight run id is: %s", self.current_run_id)
            self._wait_for_running()
            logger.info("strobelight profiling running")
            return True

        except Exception:
            logger.warning("error during start_strobelight:", exc_info=True)
            if strobelight_started:
                # The run exists but never reached RUNNING: stop it.
                self._stop_strobelight_no_throw(collect_results=False)
            return False

    def profile(self, work_function: Any, *args: Any, **kwargs: Any) -> Any:
        """Run ``work_function(*args, **kwargs)`` under strobelight profiling.

        Only one profiled run may be active at a time. If another run holds
        the lock, or profiling cannot be started, the work function still runs
        (unprofiled) unless ``stop_at_error`` is set.

        Returns:
            Whatever ``work_function`` returns.

        Raises:
            StrobelightCLIProfilerError: when ``stop_at_error`` is set and
                either another run is active or profiling failed to start.
            Exception: anything raised by ``work_function`` is re-raised.
        """
        lock = StrobelightCLIFunctionProfiler._lock
        if not lock.acquire(False):
            if self.stop_at_error:
                raise StrobelightCLIProfilerError("concurrent runs not supported")

            logger.warning("concurrent runs not supported")
            return work_function(*args, **kwargs)

        try:
            # Reset the per-run state only once the lock is owned, so a run
            # in progress on another thread is not clobbered.
            self.current_run_id = None
            self.profile_result = None

            if not self._start_strobelight():
                if self.stop_at_error:
                    raise StrobelightCLIProfilerError(
                        "failed to start strobelight profiling"
                    )
                return work_function(*args, **kwargs)

            logger.debug("collection started")
            start = timer()
            try:
                result = work_function(*args, **kwargs)
            except Exception:
                logger.warning("work function throw exception", exc_info=True)
                self._stop_strobelight_no_throw(collect_results=False)
                raise
            total_time = timer() - start  # Time in seconds, e.g. 5.38091952400282
            logger.info("work function took %s seconds", total_time)
            self._stop_strobelight_no_throw(collect_results=True)
            return result
        finally:
            # Always release, whichever way the run ended.
            lock.release()
# Decorator factory wrapping StrobelightCLIFunctionProfiler.profile. When no
# profiler is given, one is created from the keyword arguments (default
# arguments if there are none). A function can be annotated as:
# @strobelight()
# @strobelight(profiler = StrobelightCLIFunctionProfiler(stop_at_error=True,..))
# @strobelight(stop_at_error=True,...)
def strobelight(
    profiler: Optional[StrobelightCLIFunctionProfiler] = None, **kwargs: Any
) -> Any:
    active_profiler = profiler if profiler else StrobelightCLIFunctionProfiler(**kwargs)

    def strobelight_inner(work_function: Any) -> Any:
        @functools.wraps(work_function)
        def wrapper_function(*call_args: Any, **call_kwargs: Any) -> Any:
            # Every call of the decorated function goes through the profiler.
            return active_profiler.profile(work_function, *call_args, **call_kwargs)

        return wrapper_function

    return strobelight_inner
|