| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- from __future__ import annotations
- import inspect
- import logging
- import types
- import typing
- from ._models import Request
class Trace:
    """Context manager that reports the lifecycle of one named operation.

    Entering emits ``"<name>.started"`` with the supplied ``kwargs``. Leaving
    emits ``"<name>.complete"`` (carrying ``return_value``) when no exception
    occurred, or ``"<name>.failed"`` (carrying the exception) otherwise.
    Exceptions are never suppressed.

    Each event is delivered to up to two sinks:

    * the callable stored under ``request.extensions["trace"]``, if any,
      invoked as ``callback(f"{prefix}.{event}", info)`` where ``prefix`` is
      the last dotted component of the logger's name;
    * ``logger`` at DEBUG level, if DEBUG was enabled when the instance was
      created.

    Use ``with`` together with a plain callback, and ``async with`` together
    with an ``async def`` callback.
    """

    def __init__(
        self,
        name: str,
        logger: logging.Logger,
        request: Request | None = None,
        kwargs: dict[str, typing.Any] | None = None,
    ) -> None:
        """Capture the sinks and decide once whether tracing is needed.

        Args:
            name: Base event name; suffixed with ``.started``, ``.complete``
                or ``.failed``.
            logger: Logger that receives DEBUG messages; its name also
                supplies the callback event prefix.
            request: Optional request whose ``extensions`` mapping may hold a
                ``"trace"`` callback.
            kwargs: Payload for the ``.started`` event (defaults to empty).
        """
        self.name = name
        self.logger = logger
        self.trace_extension = (
            None if request is None else request.extensions.get("trace")
        )
        # The log level is sampled once here, not re-checked per event.
        self.debug = self.logger.isEnabledFor(logging.DEBUG)
        self.kwargs = kwargs or {}
        # Callers assign this inside the ``with`` body so that the
        # ``.complete`` event can report it.
        self.return_value: typing.Any = None
        self.should_trace = self.debug or self.trace_extension is not None
        self.prefix = self.logger.name.split(".")[-1]

    def _log_debug(self, name: str, info: dict[str, typing.Any]) -> None:
        """Write one event to the logger if DEBUG was enabled at creation.

        The message is just ``name`` when ``info`` is empty or when it carries
        a ``return_value`` of ``None``; otherwise ``name`` is followed by
        space-separated ``key=repr(value)`` pairs.
        """
        if not self.debug:
            return
        nothing_to_show = not info or (
            "return_value" in info and info["return_value"] is None
        )
        if nothing_to_show:
            message = name
        else:
            args = " ".join(f"{key}={value!r}" for key, value in info.items())
            message = f"{name} {args}"
        self.logger.debug(message)

    def trace(self, name: str, info: dict[str, typing.Any]) -> None:
        """Deliver one event synchronously to the callback and the logger.

        Raises:
            TypeError: If the ``trace`` callback returned a coroutine, i.e. an
                ``async def`` callback was used with the synchronous API.
        """
        if self.trace_extension is not None:
            prefix_and_name = f"{self.prefix}.{name}"
            ret = self.trace_extension(prefix_and_name, info)
            if inspect.iscoroutine(ret):  # pragma: no cover
                # Close the coroutine we are about to abandon; otherwise
                # Python reports "coroutine ... was never awaited" on top of
                # the TypeError below.
                ret.close()
                raise TypeError(
                    "If you are using a synchronous interface, "
                    "the callback of the `trace` extension should "
                    "be a normal function instead of an asynchronous function."
                )
        self._log_debug(name, info)

    def __enter__(self) -> Trace:
        """Emit ``<name>.started`` (when tracing is active) and return self."""
        if self.should_trace:
            info = self.kwargs
            self.trace(f"{self.name}.started", info)
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: types.TracebackType | None = None,
    ) -> None:
        """Emit ``<name>.complete`` or ``<name>.failed``; never suppresses."""
        if self.should_trace:
            if exc_value is None:
                info = {"return_value": self.return_value}
                self.trace(f"{self.name}.complete", info)
            else:
                info = {"exception": exc_value}
                self.trace(f"{self.name}.failed", info)

    async def atrace(self, name: str, info: dict[str, typing.Any]) -> None:
        """Deliver one event to an async callback (awaited) and the logger.

        Raises:
            TypeError: If the ``trace`` callback did not return a coroutine,
                i.e. a plain function was used with the asynchronous API.
        """
        if self.trace_extension is not None:
            prefix_and_name = f"{self.prefix}.{name}"
            coro = self.trace_extension(prefix_and_name, info)
            if not inspect.iscoroutine(coro):  # pragma: no cover
                raise TypeError(
                    "If you're using an asynchronous interface, "
                    "the callback of the `trace` extension should "
                    "be an asynchronous function rather than a normal function."
                )
            await coro
        self._log_debug(name, info)

    async def __aenter__(self) -> Trace:
        """Async counterpart of ``__enter__``."""
        if self.should_trace:
            info = self.kwargs
            await self.atrace(f"{self.name}.started", info)
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: types.TracebackType | None = None,
    ) -> None:
        """Async counterpart of ``__exit__``; never suppresses exceptions."""
        if self.should_trace:
            if exc_value is None:
                info = {"return_value": self.return_value}
                await self.atrace(f"{self.name}.complete", info)
            else:
                info = {"exception": exc_value}
                await self.atrace(f"{self.name}.failed", info)
|