# mypy: allow-untyped-defs
import gzip
import json
import os
import shutil
import tempfile
from abc import ABC, abstractmethod
from enum import Enum
from functools import partial
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
from typing_extensions import Self
from warnings import warn

import torch
import torch.autograd.profiler as prof
from torch._C import _get_privateuse1_backend_name
from torch._C._profiler import (
    _add_execution_trace_observer,
    _disable_execution_trace_observer,
    _enable_execution_trace_observer,
    _ExperimentalConfig,
    _remove_execution_trace_observer,
)
from torch.autograd import kineto_available, ProfilerActivity
from torch.profiler._memory_profiler import MemoryProfile, MemoryProfileTimeline


__all__ = [
    "supported_activities",
    "ProfilerAction",
    "schedule",
    "tensorboard_trace_handler",
    "profile",
    "ExecutionTraceObserver",
]

PROFILER_STEP_NAME = "ProfilerStep"


def supported_activities():
    """
    Return a set of supported profiler tracing activities.

    Note: the profiler uses the CUPTI library to trace on-device CUDA kernels.
    If CUDA is enabled but CUPTI is not available, passing
    ``ProfilerActivity.CUDA`` to the profiler results in using the legacy CUDA
    profiling code (the same as in the legacy ``torch.autograd.profiler``).
    This, in turn, results in CUDA time being included in the profiler table
    output, but not in the JSON trace.
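
    Example (illustrative; the exact set depends on the build and hardware)::

        import torch.profiler

        acts = torch.profiler.supported_activities()
        # CPU is expected on all builds; CUDA appears only when CUDA/CUPTI
        # support is available.
        if torch.profiler.ProfilerActivity.CUDA in acts:
            print("on-device CUDA tracing available")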
- """
- return torch.autograd._supported_activities()


class _ITraceObserver(ABC):
    """Abstract interface for a trace observer.

    It defines three methods: start, stop, and cleanup.
    """

    @abstractmethod
    def start(self):
        pass

    @abstractmethod
    def stop(self):
        pass

    @abstractmethod
    def cleanup(self):
        pass


class _KinetoProfile:
    """Low-level profiler that wraps the autograd profiler.

    Args:
        activities (iterable): list of activity groups (CPU, CUDA) to use in profiling, supported values:
            ``torch.profiler.ProfilerActivity.CPU``, ``torch.profiler.ProfilerActivity.CUDA``,
            ``torch.profiler.ProfilerActivity.XPU``.
            Default value: ProfilerActivity.CPU and (when available) ProfilerActivity.CUDA
            or (when available) ProfilerActivity.XPU.
        record_shapes (bool): save information about operators' input shapes.
        profile_memory (bool): track tensor memory allocation/deallocation (see ``export_memory_timeline``
            for more details).
        with_stack (bool): record source information (file and line number) for the ops.
        with_flops (bool): use a formula to estimate the FLOPs of specific operators
            (matrix multiplication and 2D convolution).
        with_modules (bool): record the module hierarchy (including function names)
            corresponding to the callstack of the op. E.g., if module A's forward calls
            module B's forward, which contains an aten::add op,
            then aten::add's module hierarchy is A.B.
            Note that this support exists, at the moment, only for TorchScript models
            and not eager mode models.
        experimental_config (_ExperimentalConfig): A set of experimental options
            used by profiler libraries like Kineto. Note: backward compatibility is not guaranteed.
        execution_trace_observer (ExecutionTraceObserver): A PyTorch Execution Trace Observer object.
            `PyTorch Execution Traces <https://arxiv.org/pdf/2305.14516.pdf>`__ offer a graph-based
            representation of AI/ML workloads and enable replay benchmarks, simulators, and emulators.
            When this argument is included, the observer's start() and stop() will be called for the
            same time window as the PyTorch profiler.

    .. note::
        This API is experimental and subject to change in the future.

        Enabling shape and stack tracing results in additional overhead.
        When record_shapes=True is specified, the profiler will temporarily hold references to the tensors;
        that may further prevent certain optimizations that depend on the reference count and introduce
        extra tensor copies.
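
    Example (illustrative sketch; ``_KinetoProfile`` is a private API whose
    interface may change, and ``run_workload()`` is an assumed placeholder)::

        from torch.profiler import ProfilerActivity
        from torch.profiler.profiler import _KinetoProfile

        p = _KinetoProfile(activities={ProfilerActivity.CPU})
        p.start()          # prepare_trace() + start_trace()
        run_workload()     # placeholder for the code being profiled
        p.stop()
        print(p.key_averages().table(sort_by="self_cpu_time_total"))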
- """
- def __init__(
- self,
- *,
- activities: Optional[Iterable[ProfilerActivity]] = None,
- record_shapes: bool = False,
- profile_memory: bool = False,
- with_stack: bool = False,
- with_flops: bool = False,
- with_modules: bool = False,
- experimental_config: Optional[_ExperimentalConfig] = None,
- execution_trace_observer: Optional[_ITraceObserver] = None,
- ):
- self.activities = set(activities) if activities else supported_activities()
- self.record_shapes = record_shapes
- self.with_flops = with_flops
- self.profile_memory = profile_memory
- self.with_stack = with_stack
- self.with_modules = with_modules
- self.experimental_config = experimental_config
- self.execution_trace_observer = execution_trace_observer
- self.profiler: Optional[prof.profile] = None
- self.mem_tl: Optional[MemoryProfileTimeline] = None
- self.use_device = None
- if ProfilerActivity.CUDA in self.activities:
- self.use_device = "cuda"
- elif ProfilerActivity.XPU in self.activities:
- self.use_device = "xpu"
- elif ProfilerActivity.PrivateUse1 in self.activities:
- self.use_device = _get_privateuse1_backend_name()
- # user-defined metadata to be amended to the trace
- self.preset_metadata: Dict[str, str] = dict()

    def start(self):
        self.prepare_trace()
        self.start_trace()

    def stop(self):
        self.stop_trace()

    def prepare_trace(self):
        if self.profiler is None:
            self.profiler = prof.profile(
                use_cpu=(ProfilerActivity.CPU in self.activities),
                use_mtia=(ProfilerActivity.MTIA in self.activities),
                use_device=self.use_device,
                record_shapes=self.record_shapes,
                with_flops=self.with_flops,
                profile_memory=self.profile_memory,
                with_stack=self.with_stack,
                with_modules=self.with_modules,
                use_kineto=True,
                experimental_config=self.experimental_config,
            )
        self.profiler._prepare_trace()

    def start_trace(self):
        if self.execution_trace_observer:
            self.execution_trace_observer.start()
        assert self.profiler is not None
        self.profiler._start_trace()

        if self.profile_memory:
            self.add_metadata_json("profile_memory", "1")
        if self.with_stack:
            self.add_metadata_json("with_stack", "1")
        if self.record_shapes:
            self.add_metadata_json("record_shapes", "1")
        if self.with_modules:
            self.add_metadata_json("with_modules", "1")
        if self.with_flops:
            self.add_metadata_json("with_flops", "1")

        if kineto_available():
            dist_info = self._get_distributed_info()
            if dist_info:
                self.add_metadata_json("distributedInfo", json.dumps(dist_info))

            if hasattr(torch, "_inductor"):
                import torch._inductor.config as inductor_config

                if inductor_config.triton.cudagraphs:
                    os.environ["DISABLE_CUPTI_LAZY_REINIT"] = "1"
                    self.add_metadata_json("DISABLE_CUPTI_LAZY_REINIT", "1")
                    # FIXME: CUDA Graph does not work well with CUPTI teardown.
                    #   1) crashes on 1st lazy CUPTI re-init after teardown (CUDA 11)
                    #   2) crashes on 2nd non-lazy CUPTI re-init after teardown (CUDA 12)
                    # Workaround: turn off CUPTI teardown when using CUDA Graphs.
                    os.environ["TEARDOWN_CUPTI"] = "0"

        # Insert the preset user metadata to the trace
        for k, v in self.preset_metadata.items():
            self.add_metadata_json(k, v)

    def stop_trace(self):
        if self.execution_trace_observer:
            self.execution_trace_observer.stop()
        assert self.profiler is not None
        self.profiler.__exit__(None, None, None)

    def export_chrome_trace(self, path: str):
        """
        Export the collected trace in Chrome JSON format. If kineto is enabled,
        only the last cycle in the schedule is exported.
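
        Example (illustrative; ``code_to_profile()`` is a placeholder)::

            with torch.profiler.profile() as p:
                code_to_profile()
            # A ".gz" suffix writes a gzip-compressed trace.
            p.export_chrome_trace("trace.json.gz")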
- """
- assert self.profiler
- if path.endswith(".gz"):
- fp = tempfile.NamedTemporaryFile("w+t", suffix=".json", delete=False)
- fp.close()
- retvalue = self.profiler.export_chrome_trace(fp.name)
- with open(fp.name) as fin:
- with gzip.open(path, "wt") as fout:
- fout.writelines(fin)
- os.remove(fp.name)
- return retvalue
- else:
- return self.profiler.export_chrome_trace(path)

    def export_stacks(self, path: str, metric: str = "self_cpu_time_total"):
        """Save stack traces to a file.

        Args:
            path (str): save stacks file to this location;
            metric (str): metric to use: "self_cpu_time_total" or "self_cuda_time_total"
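
        Example (illustrative sketch; ``code_to_profile()`` is a placeholder, and
        the resulting file can be visualized with external tools such as
        Brendan Gregg's flamegraph.pl, which is not part of PyTorch)::

            with torch.profiler.profile(with_stack=True) as p:
                code_to_profile()
            p.export_stacks("stacks.txt", "self_cpu_time_total")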
- """
- assert self.profiler
- return self.profiler.export_stacks(path, metric)

    def key_averages(
        self, group_by_input_shape: bool = False, group_by_stack_n: int = 0
    ):
        """Average events, grouping them by operator name and (optionally) input shapes and
        stack.

        .. note::
            To use the shape/stack functionality, make sure to set record_shapes/with_stack
            when creating the profiler context manager.
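
        Example (illustrative; ``code_to_profile()`` is a placeholder)::

            with torch.profiler.profile(record_shapes=True) as p:
                code_to_profile()
            print(
                p.key_averages(group_by_input_shape=True).table(
                    sort_by="self_cpu_time_total", row_limit=10
                )
            )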
- """
- assert self.profiler
- return self.profiler.key_averages(group_by_input_shape, group_by_stack_n)

    def events(self):
        """
        Return the list of unaggregated profiler events,
        to be used in the trace callback or after the profiling is finished.
        """
        assert self.profiler
        return self.profiler.function_events

    def add_metadata(self, key: str, value: str):
        """
        Add user-defined metadata with a string key and a string value
        into the trace file.
        """
        wrapped_value = '"' + value.replace('"', '\\"') + '"'
        torch.autograd._add_metadata_json(key, wrapped_value)

    def add_metadata_json(self, key: str, value: str):
        """
        Add user-defined metadata with a string key and a valid JSON value
        into the trace file.
        """
        torch.autograd._add_metadata_json(key, value)

    def preset_metadata_json(self, key: str, value: str):
        """
        Preset user-defined metadata while the profiler is not yet started,
        to be added into the trace file later.
        Metadata is in the format of a string key and a valid JSON value.
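
        Example (illustrative)::

            p = torch.profiler.profile()
            # Stored now, attached to the trace once profiling starts.
            p.preset_metadata_json("run_config", '{"batch_size": 32}')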
- """
- self.preset_metadata[key] = value

    def _get_distributed_info(self):
        import torch.distributed as dist

        if not dist.is_available() or not dist.is_initialized():
            return None

        backend = dist.get_backend()
        dist_info = {
            "backend": backend,
            "rank": dist.get_rank(),
            "world_size": dist.get_world_size(),
            "pg_count": dist.get_pg_count(),
            "pg_config": dist.distributed_c10d._get_all_pg_configs(),
        }
        if backend == "nccl":
            nccl_version = torch.cuda.nccl.version()
            dist_info["nccl_version"] = ".".join(str(v) for v in nccl_version)
        return dist_info

    def _memory_profile(self) -> MemoryProfile:
        required = ("record_shapes", "profile_memory", "with_stack")
        missing = [f"{i}=True" for i in required if not getattr(self, i)]
        if missing:
            raise ValueError(f"{', '.join(missing)} required for memory profiling.")

        assert self.profiler is not None and self.profiler.kineto_results is not None
        return MemoryProfile(self.profiler.kineto_results)

    def export_memory_timeline(self, path: str, device: Optional[str] = None) -> None:
        """Export memory event information from the profiler-collected
        tree for a given device, and export a timeline plot. There are 3
        exportable files using ``export_memory_timeline``, each controlled by the
        suffix of ``path``.

        - For an HTML-compatible plot, use the suffix ``.html``, and a memory timeline
          plot will be embedded as a PNG file in the HTML file.

        - For plot points consisting of ``[times, [sizes by category]]``, where
          ``times`` are timestamps and ``sizes`` are memory usage for each category,
          the memory timeline plot will be saved as JSON (``.json``) or gzipped JSON
          (``.json.gz``) depending on the suffix.

        - For raw memory points, use the suffix ``.raw.json.gz``. Each raw memory
          event will consist of ``(timestamp, action, numbytes, category)``, where
          ``action`` is one of ``[PREEXISTING, CREATE, INCREMENT_VERSION, DESTROY]``,
          and ``category`` is one of the enums from
          ``torch.profiler._memory_profiler.Category``.

        Output: Memory timeline written as gzipped JSON, JSON, or HTML.
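
        Example (illustrative; requires ``record_shapes``, ``profile_memory``, and
        ``with_stack`` to all be enabled; ``train_step()`` is a placeholder)::

            with torch.profiler.profile(
                record_shapes=True, profile_memory=True, with_stack=True
            ) as p:
                train_step()
            p.export_memory_timeline("memory.html", device="cuda:0")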
- """
- # Default to device 0, if unset. Fallback on cpu.
- if device is None and self.use_device and self.use_device != "cuda":
- device = self.use_device + ":0"
- if device is None:
- device = "cuda:0" if torch.cuda.is_available() else "cpu"
- # Construct the memory timeline plot data
- self.mem_tl = MemoryProfileTimeline(self._memory_profile())
- # Depending on the file suffix, save the data as json.gz or json.
- # For html, we can embed the image into an HTML file.
- if path.endswith(".html"):
- self.mem_tl.export_memory_timeline_html(path, device)
- elif path.endswith(".gz"):
- fp = tempfile.NamedTemporaryFile("w+t", suffix=".json", delete=False)
- fp.close()
- if path.endswith("raw.json.gz"):
- self.mem_tl.export_memory_timeline_raw(fp.name, device)
- else:
- self.mem_tl.export_memory_timeline(fp.name, device)
- with open(fp.name) as fin:
- with gzip.open(path, "wt") as fout:
- fout.writelines(fin)
- os.remove(fp.name)
- else:
- self.mem_tl.export_memory_timeline(path, device)


class ProfilerAction(Enum):
    """
    Profiler actions that can be taken at the specified intervals
    """

    NONE = 0
    WARMUP = 1
    RECORD = 2
    RECORD_AND_SAVE = 3


def schedule(
    *, wait: int, warmup: int, active: int, repeat: int = 0, skip_first: int = 0
) -> Callable:
    """
    Return a callable that can be used as the profiler's ``schedule`` argument. The profiler will skip
    the first ``skip_first`` steps, then wait for ``wait`` steps, then do the warmup for the next ``warmup`` steps,
    then do the active recording for the next ``active`` steps, and then repeat the cycle starting with ``wait`` steps.
    The optional number of cycles is specified with the ``repeat`` parameter; a value of zero means that
    the cycles will continue until the profiling is finished.
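
    Example (illustrative)::

        # With skip_first=3, wait=1, warmup=1, active=2, repeat=1, the actions are:
        #   steps 0-2: NONE (skipped), step 3: NONE (wait), step 4: WARMUP,
        #   step 5: RECORD, step 6: RECORD_AND_SAVE, and NONE from step 7 on.
        sched = torch.profiler.schedule(
            skip_first=3, wait=1, warmup=1, active=2, repeat=1
        )
        assert sched(6) == torch.profiler.ProfilerAction.RECORD_AND_SAVE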
- """
- def schedule_fn(step: int) -> ProfilerAction:
- assert step >= 0
- if step < skip_first:
- return ProfilerAction.NONE
- else:
- step -= skip_first
- num_steps = wait + warmup + active
- if repeat > 0 and step / num_steps >= repeat:
- return ProfilerAction.NONE
- mod_step = step % num_steps
- if mod_step < wait:
- return ProfilerAction.NONE
- elif mod_step < wait + warmup:
- return ProfilerAction.WARMUP
- else:
- return (
- ProfilerAction.RECORD
- if mod_step < num_steps - 1
- else ProfilerAction.RECORD_AND_SAVE
- )
- assert (
- wait >= 0 and warmup >= 0 and active > 0 and repeat >= 0 and skip_first >= 0
- ), "Invalid profiler schedule arguments"
- if warmup == 0:
- warn("Profiler won't be using warmup, this can skew profiler results")
- return schedule_fn


def _default_schedule_fn(_: int) -> ProfilerAction:
    """
    Default profiler behavior - immediately starts recording the events,
    keeps doing it on every profiler step.
    """
    return ProfilerAction.RECORD


def tensorboard_trace_handler(
    dir_name: str, worker_name: Optional[str] = None, use_gzip: bool = False
):
    """
    Output tracing files to the directory ``dir_name``; that directory can then be
    passed directly to TensorBoard as a logdir.
    ``worker_name`` should be unique for each worker in a distributed scenario;
    by default it is set to ``[hostname]_[pid]``.
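
    Example (illustrative; ``train_step()`` is a placeholder for one training
    iteration)::

        with torch.profiler.profile(
            schedule=torch.profiler.schedule(wait=1, warmup=1, active=2),
            on_trace_ready=torch.profiler.tensorboard_trace_handler("./log"),
        ) as p:
            for _ in range(10):
                train_step()
                p.step()
        # Then view the results with: tensorboard --logdir ./log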
- """
- import os
- import socket
- import time
- def handler_fn(prof) -> None:
- nonlocal worker_name
- if not os.path.isdir(dir_name):
- try:
- os.makedirs(dir_name, exist_ok=True)
- except Exception as e:
- raise RuntimeError("Can't create directory: " + dir_name) from e
- if not worker_name:
- worker_name = f"{socket.gethostname()}_{os.getpid()}"
- # Use nanosecond here to avoid naming clash when exporting the trace
- file_name = f"{worker_name}.{time.time_ns()}.pt.trace.json"
- if use_gzip:
- file_name = file_name + ".gz"
- prof.export_chrome_trace(os.path.join(dir_name, file_name))
- return handler_fn


class profile(_KinetoProfile):
    """Profiler context manager.

    Args:
        activities (iterable): list of activity groups (CPU, CUDA) to use in profiling, supported values:
            ``torch.profiler.ProfilerActivity.CPU``, ``torch.profiler.ProfilerActivity.CUDA``,
            ``torch.profiler.ProfilerActivity.XPU``.
            Default value: ProfilerActivity.CPU and (when available) ProfilerActivity.CUDA
            or (when available) ProfilerActivity.XPU.
        schedule (Callable): callable that takes step (int) as a single parameter and returns the
            ``ProfilerAction`` value that specifies the profiler action to perform at each step.
        on_trace_ready (Callable): callable that is called at each step when ``schedule``
            returns ``ProfilerAction.RECORD_AND_SAVE`` during the profiling.
        record_shapes (bool): save information about operators' input shapes.
        profile_memory (bool): track tensor memory allocation/deallocation.
        with_stack (bool): record source information (file and line number) for the ops.
        with_flops (bool): use a formula to estimate the FLOPs (floating point operations) of specific operators
            (matrix multiplication and 2D convolution).
        with_modules (bool): record the module hierarchy (including function names)
            corresponding to the callstack of the op. E.g., if module A's forward calls
            module B's forward, which contains an aten::add op,
            then aten::add's module hierarchy is A.B.
            Note that this support exists, at the moment, only for TorchScript models
            and not eager mode models.
        experimental_config (_ExperimentalConfig): A set of experimental options
            used for Kineto library features. Note: backward compatibility is not guaranteed.
        execution_trace_observer (ExecutionTraceObserver): A PyTorch Execution Trace Observer object.
            `PyTorch Execution Traces <https://arxiv.org/pdf/2305.14516.pdf>`__ offer a graph-based
            representation of AI/ML workloads and enable replay benchmarks, simulators, and emulators.
            When this argument is included, the observer's start() and stop() will be called for the
            same time window as the PyTorch profiler. See the examples section below for a code sample.
        use_cuda (bool):
            .. deprecated:: 1.8.1
                use ``activities`` instead.

    .. note::
        Use :func:`~torch.profiler.schedule` to generate the callable schedule.
        Non-default schedules are useful when profiling long training jobs
        and allow the user to obtain multiple traces at different iterations
        of the training process.
        The default schedule simply records all the events continuously for the
        duration of the context manager.

    .. note::
        Use :func:`~torch.profiler.tensorboard_trace_handler` to generate result files for TensorBoard:

        ``on_trace_ready=torch.profiler.tensorboard_trace_handler(dir_name)``

        After profiling, result files can be found in the specified directory. Use the command:

        ``tensorboard --logdir dir_name``

        to see the results in TensorBoard.
        For more information, see
        `PyTorch Profiler TensorBoard Plugin <https://github.com/pytorch/kineto/tree/master/tb_plugin>`__

    .. note::
        Enabling shape and stack tracing results in additional overhead.
        When record_shapes=True is specified, the profiler will temporarily hold references to the tensors;
        that may further prevent certain optimizations that depend on the reference count and introduce
        extra tensor copies.

    Examples:

    .. code-block:: python

        with torch.profiler.profile(
            activities=[
                torch.profiler.ProfilerActivity.CPU,
                torch.profiler.ProfilerActivity.CUDA,
            ]
        ) as p:
            code_to_profile()
        print(p.key_averages().table(
            sort_by="self_cuda_time_total", row_limit=-1))

    Using the profiler's ``schedule``, ``on_trace_ready`` and ``step`` functions:

    .. code-block:: python

        # Non-default profiler schedule allows user to turn profiler on and off
        # on different iterations of the training loop;
        # trace_handler is called every time a new trace becomes available
        def trace_handler(prof):
            print(prof.key_averages().table(
                sort_by="self_cuda_time_total", row_limit=-1))
            # prof.export_chrome_trace("/tmp/test_trace_" + str(prof.step_num) + ".json")

        with torch.profiler.profile(
            activities=[
                torch.profiler.ProfilerActivity.CPU,
                torch.profiler.ProfilerActivity.CUDA,
            ],
            # In this example with wait=1, warmup=1, active=2, repeat=1,
            # profiler will skip the first step/iteration,
            # start warming up on the second, record
            # the third and the fourth iterations,
            # after which the trace will become available
            # and on_trace_ready (when set) is called;
            # the cycle repeats starting with the next step
            schedule=torch.profiler.schedule(
                wait=1,
                warmup=1,
                active=2,
                repeat=1),
            on_trace_ready=trace_handler
            # on_trace_ready=torch.profiler.tensorboard_trace_handler('./log')
            # used when outputting for tensorboard
        ) as p:
            for iter in range(N):
                code_iteration_to_profile(iter)
                # send a signal to the profiler that the next iteration has started
                p.step()

    The following sample shows how to set up an Execution Trace Observer (``execution_trace_observer``):

    .. code-block:: python

        with torch.profiler.profile(
            ...
            execution_trace_observer=(
                ExecutionTraceObserver().register_callback("./execution_trace.json")
            ),
        ) as p:
            for iter in range(N):
                code_iteration_to_profile(iter)
                p.step()

    You can also refer to test_execution_trace_with_kineto() in tests/profiler/test_profiler.py.
    Note: one can also pass any object satisfying the _ITraceObserver interface.
    """

    def __init__(
        self,
        *,
        activities: Optional[Iterable[ProfilerActivity]] = None,
        schedule: Optional[Callable[[int], ProfilerAction]] = None,
        on_trace_ready: Optional[Callable[..., Any]] = None,
        record_shapes: bool = False,
        profile_memory: bool = False,
        with_stack: bool = False,
        with_flops: bool = False,
        with_modules: bool = False,
        experimental_config: Optional[_ExperimentalConfig] = None,
        execution_trace_observer: Optional[_ITraceObserver] = None,
        # deprecated:
        use_cuda: Optional[bool] = None,
    ):
        activities_set = set(activities) if activities else supported_activities()
        if use_cuda is not None:
            warn(
                "`use_cuda` is deprecated, use `activities` argument instead",
                FutureWarning,
                stacklevel=2,
            )
            if use_cuda:
                activities_set.add(ProfilerActivity.CUDA)
            elif ProfilerActivity.CUDA in activities_set:
                activities_set.remove(ProfilerActivity.CUDA)
        assert len(activities_set) > 0, "No valid profiler activities found"

        super().__init__(
            activities=activities,
            record_shapes=record_shapes,
            profile_memory=profile_memory,
            with_stack=with_stack,
            with_flops=with_flops,
            with_modules=with_modules,
            experimental_config=experimental_config,
            execution_trace_observer=execution_trace_observer,
        )

        if schedule:
            self.schedule = schedule
            # add step markers into the trace and table view
            self.record_steps = True
        else:
            self.schedule = _default_schedule_fn
            self.record_steps = False
        self.on_trace_ready = on_trace_ready
        self.step_num = 0
        self.current_action = self.schedule(self.step_num)
        self.step_rec_fn: Optional[prof.record_function] = None
        self.action_map: Dict[
            Tuple[ProfilerAction, Optional[ProfilerAction]], List[Any]
        ] = {
            # key is (prev_action, current_action), value is action list corresponding to the state pair.
            (ProfilerAction.NONE, ProfilerAction.NONE): [],
            (ProfilerAction.NONE, ProfilerAction.WARMUP): [self.prepare_trace],
            (ProfilerAction.NONE, ProfilerAction.RECORD): [
                self.prepare_trace,
                self.start_trace,
            ],
            (ProfilerAction.NONE, ProfilerAction.RECORD_AND_SAVE): [
                self.prepare_trace,
                self.start_trace,
            ],
            (ProfilerAction.WARMUP, ProfilerAction.NONE): [
                partial(warn, "Incorrect schedule: WARMUP followed by NONE"),
                self.start_trace,
                self.stop_trace,
            ],
            (ProfilerAction.WARMUP, ProfilerAction.WARMUP): [],
            (ProfilerAction.WARMUP, ProfilerAction.RECORD): [self.start_trace],
            (ProfilerAction.WARMUP, ProfilerAction.RECORD_AND_SAVE): [self.start_trace],
            (ProfilerAction.RECORD, ProfilerAction.NONE): [
                partial(warn, "Incorrect schedule: RECORD followed by NONE"),
                self.stop_trace,
            ],
            (ProfilerAction.RECORD, ProfilerAction.WARMUP): [
                partial(warn, "Incorrect schedule: RECORD followed by WARMUP"),
                self.stop_trace,
            ],
            (ProfilerAction.RECORD, ProfilerAction.RECORD): [],
            (ProfilerAction.RECORD, ProfilerAction.RECORD_AND_SAVE): [],
            (ProfilerAction.RECORD_AND_SAVE, ProfilerAction.NONE): [
                self.stop_trace,
                self._trace_ready,
            ],
            (ProfilerAction.RECORD_AND_SAVE, ProfilerAction.WARMUP): [
                self.stop_trace,
                self._trace_ready,
                self.prepare_trace,
            ],
            (ProfilerAction.RECORD_AND_SAVE, ProfilerAction.RECORD): [
                self.stop_trace,
                self._trace_ready,
                self.prepare_trace,
                self.start_trace,
            ],
            (ProfilerAction.RECORD_AND_SAVE, ProfilerAction.RECORD_AND_SAVE): [
                self.stop_trace,
                self._trace_ready,
                self.prepare_trace,
                self.start_trace,
            ],
            # used for exit action
            (ProfilerAction.WARMUP, None): [self.start_trace, self.stop_trace],
            (ProfilerAction.RECORD, None): [self.stop_trace, self._trace_ready],
            (ProfilerAction.RECORD_AND_SAVE, None): [
                self.stop_trace,
                self._trace_ready,
            ],
        }
        # Start tracking increments to profiler step, this will be used
        # by Kineto
        prof.KinetoStepTracker.init_step_count(PROFILER_STEP_NAME)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
        prof.KinetoStepTracker.erase_step_count(PROFILER_STEP_NAME)
        if self.execution_trace_observer:
            self.execution_trace_observer.cleanup()

    def start(self):
        self._transit_action(ProfilerAction.NONE, self.current_action)
        if self.record_steps:
            self.step_rec_fn = prof.record_function(
                "ProfilerStep#" + str(self.step_num)
            )
            self.step_rec_fn.__enter__()

    def stop(self):
        if self.record_steps and self.step_rec_fn:
            self.step_rec_fn.__exit__(None, None, None)
        self._transit_action(self.current_action, None)

    def step(self):
        """
        Signals the profiler that the next profiling step has started.
        """
        if self.record_steps and self.step_rec_fn:
            self.step_rec_fn.__exit__(None, None, None)
        prev_action = self.current_action
        self.step_num += 1
        self.current_action = self.schedule(self.step_num)

        self._transit_action(prev_action, self.current_action)
        prof.KinetoStepTracker.increment_step(PROFILER_STEP_NAME)

        if self.record_steps:
            self.step_rec_fn = prof.record_function(
                "ProfilerStep#" + str(self.step_num)
            )
            self.step_rec_fn.__enter__()

    def _trace_ready(self):
        if self.on_trace_ready:
            self.on_trace_ready(self)

    def _transit_action(self, prev_action, current_action):
        action_list = self.action_map.get((prev_action, current_action))
        if action_list:
            for action in action_list:
                action()

    def _stats(self) -> Optional[prof._ProfilerStats]:
        if self.profiler is None:
            return None
        return self.profiler._stats


class ExecutionTraceObserver(_ITraceObserver):
    """Execution Trace Observer

    Each process can have a single ExecutionTraceObserver instance. The observer
    can be added to record function callbacks by calling register_callback()
    explicitly. Without calling unregister_callback(), repeated calls to
    register_callback() will not add additional observers to record function
    callbacks. Once an ExecutionTraceObserver is created, the start() and stop()
    methods control when the event data is recorded.

    Deleting or calling unregister_callback() will remove the observer from the
    record function callbacks, finalize the output file, and stop
    incurring any overheads.
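
    Example (illustrative; mirrors the sample in the ``profile`` docstring, with
    ``N`` and ``code_iteration_to_profile()`` as placeholders)::

        et = ExecutionTraceObserver().register_callback("./execution_trace.json")
        with torch.profiler.profile(execution_trace_observer=et) as p:
            for i in range(N):
                code_iteration_to_profile(i)
                p.step()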
- """
- def __init__(self):
- """
- Initializes the default states.
- """
- self._registered = False
- self._execution_trace_running = False
- def __del__(self):
- """
- Calls unregister_callback() to make sure to finalize outputs.
- """
- self.unregister_callback()

    def register_callback(self, output_file_path: str) -> Self:
        """
        Adds ET observer to record function callbacks. The data will be
        written to output_file_path.
        """
        if not self._registered:
            self._output_file_path = output_file_path
            self._registered = _add_execution_trace_observer(output_file_path)
        return self

    def unregister_callback(self):
        """
        Removes ET observer from record function callbacks.
        """

        def _save_triton_kernels():
            # Save the kernel paths for the generated kernels
            from torch._inductor.codecache import PyCodeCache as PyCodeCache

            kernel_files = [
                v.__file__
                for v in PyCodeCache.cache.values()
                if getattr(v, "__file__", None) is not None
            ]
            work_dir, file_name = os.path.split(self._output_file_path)
            resource_dir = os.path.join(
                work_dir, os.path.splitext(file_name)[0] + "_resources"
            )
            if not os.path.exists(resource_dir):
                os.mkdir(resource_dir)

            for kernel_file in kernel_files:
                if kernel_file is None:
                    continue
                path, name = os.path.split(kernel_file)
                dst = os.path.join(resource_dir, name)
                shutil.copyfile(kernel_file, dst)

        if self._registered:
            self.stop()
            try:
                _save_triton_kernels()
            except Exception as e:
                warn(f"Execution trace failed to save kernels: {e}")
            _remove_execution_trace_observer()
            self._registered = False

    @property
    def is_registered(self):
        """
        Returns True if the execution trace observer is registered, otherwise False.
        """
        return self._registered

    def is_running(self):
        """
        Returns True if the observer is running, otherwise False.
        """
        return self._execution_trace_running

    def start(self):
        """
        Starts capturing.
        """
        if self._registered and not self._execution_trace_running:
            _enable_execution_trace_observer()
            self._execution_trace_running = True
            self._record_pg_config()

    def stop(self):
        """
        Stops capturing.
        """
        if self._execution_trace_running:
            _disable_execution_trace_observer()
            self._execution_trace_running = False

    def cleanup(self):
        """
        Calls unregister_callback() to make sure to finalize outputs.
        """
        self.unregister_callback()

    def get_output_file_path(self) -> str:
        """
        Returns the output file name.
        """
        if self.is_registered:
            return self._output_file_path
        else:
            raise RuntimeError(
                "A callback to the ET profiler needs to be registered "
                "first before getting the output file path"
            )

    def _record_pg_config(self) -> None:
        # Records the PG config info to the trace as node:
        #   ## process_group:init ##
        if (
            self.is_registered
            and torch.distributed.is_available()
            and torch.distributed.is_initialized()
        ):
            pg_config_info = torch.distributed.distributed_c10d._world.pg_config_info
            torch.autograd._record_function_with_args_enter(
                "## process_group:init ##", json.dumps(pg_config_info)
            )
|