_exceptions.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. from __future__ import annotations
  2. from collections.abc import Callable, Sequence
  3. from functools import partial
  4. from inspect import getmro, isclass
  5. from typing import TYPE_CHECKING, Generic, Type, TypeVar, cast, overload
  6. _BaseExceptionT_co = TypeVar("_BaseExceptionT_co", bound=BaseException, covariant=True)
  7. _BaseExceptionT = TypeVar("_BaseExceptionT", bound=BaseException)
  8. _ExceptionT_co = TypeVar("_ExceptionT_co", bound=Exception, covariant=True)
  9. _ExceptionT = TypeVar("_ExceptionT", bound=Exception)
  10. # using typing.Self would require a typing_extensions dependency on py<3.11
  11. _ExceptionGroupSelf = TypeVar("_ExceptionGroupSelf", bound="ExceptionGroup")
  12. _BaseExceptionGroupSelf = TypeVar("_BaseExceptionGroupSelf", bound="BaseExceptionGroup")
  13. def check_direct_subclass(
  14. exc: BaseException, parents: tuple[type[BaseException]]
  15. ) -> bool:
  16. for cls in getmro(exc.__class__)[:-1]:
  17. if cls in parents:
  18. return True
  19. return False
  20. def get_condition_filter(
  21. condition: type[_BaseExceptionT]
  22. | tuple[type[_BaseExceptionT], ...]
  23. | Callable[[_BaseExceptionT_co], bool],
  24. ) -> Callable[[_BaseExceptionT_co], bool]:
  25. if isclass(condition) and issubclass(
  26. cast(Type[BaseException], condition), BaseException
  27. ):
  28. return partial(check_direct_subclass, parents=(condition,))
  29. elif isinstance(condition, tuple):
  30. if all(isclass(x) and issubclass(x, BaseException) for x in condition):
  31. return partial(check_direct_subclass, parents=condition)
  32. elif callable(condition):
  33. return cast("Callable[[BaseException], bool]", condition)
  34. raise TypeError("expected a function, exception type or tuple of exception types")
  35. def _derive_and_copy_attributes(self, excs):
  36. eg = self.derive(excs)
  37. eg.__cause__ = self.__cause__
  38. eg.__context__ = self.__context__
  39. eg.__traceback__ = self.__traceback__
  40. if hasattr(self, "__notes__"):
  41. # Create a new list so that add_note() only affects one exceptiongroup
  42. eg.__notes__ = list(self.__notes__)
  43. return eg
  44. class BaseExceptionGroup(BaseException, Generic[_BaseExceptionT_co]):
  45. """A combination of multiple unrelated exceptions."""
  46. def __new__(
  47. cls: type[_BaseExceptionGroupSelf],
  48. __message: str,
  49. __exceptions: Sequence[_BaseExceptionT_co],
  50. ) -> _BaseExceptionGroupSelf:
  51. if not isinstance(__message, str):
  52. raise TypeError(f"argument 1 must be str, not {type(__message)}")
  53. if not isinstance(__exceptions, Sequence):
  54. raise TypeError("second argument (exceptions) must be a sequence")
  55. if not __exceptions:
  56. raise ValueError(
  57. "second argument (exceptions) must be a non-empty sequence"
  58. )
  59. for i, exc in enumerate(__exceptions):
  60. if not isinstance(exc, BaseException):
  61. raise ValueError(
  62. f"Item {i} of second argument (exceptions) is not an exception"
  63. )
  64. if cls is BaseExceptionGroup:
  65. if all(isinstance(exc, Exception) for exc in __exceptions):
  66. cls = ExceptionGroup
  67. if issubclass(cls, Exception):
  68. for exc in __exceptions:
  69. if not isinstance(exc, Exception):
  70. if cls is ExceptionGroup:
  71. raise TypeError(
  72. "Cannot nest BaseExceptions in an ExceptionGroup"
  73. )
  74. else:
  75. raise TypeError(
  76. f"Cannot nest BaseExceptions in {cls.__name__!r}"
  77. )
  78. instance = super().__new__(cls, __message, __exceptions)
  79. instance._message = __message
  80. instance._exceptions = __exceptions
  81. return instance
  82. def add_note(self, note: str) -> None:
  83. if not isinstance(note, str):
  84. raise TypeError(
  85. f"Expected a string, got note={note!r} (type {type(note).__name__})"
  86. )
  87. if not hasattr(self, "__notes__"):
  88. self.__notes__: list[str] = []
  89. self.__notes__.append(note)
  90. @property
  91. def message(self) -> str:
  92. return self._message
  93. @property
  94. def exceptions(
  95. self,
  96. ) -> tuple[_BaseExceptionT_co | BaseExceptionGroup[_BaseExceptionT_co], ...]:
  97. return tuple(self._exceptions)
  98. @overload
  99. def subgroup(
  100. self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...]
  101. ) -> ExceptionGroup[_ExceptionT] | None: ...
  102. @overload
  103. def subgroup(
  104. self, __condition: type[_BaseExceptionT] | tuple[type[_BaseExceptionT], ...]
  105. ) -> BaseExceptionGroup[_BaseExceptionT] | None: ...
  106. @overload
  107. def subgroup(
  108. self,
  109. __condition: Callable[[_BaseExceptionT_co | _BaseExceptionGroupSelf], bool],
  110. ) -> BaseExceptionGroup[_BaseExceptionT_co] | None: ...
  111. def subgroup(
  112. self,
  113. __condition: type[_BaseExceptionT]
  114. | tuple[type[_BaseExceptionT], ...]
  115. | Callable[[_BaseExceptionT_co | _BaseExceptionGroupSelf], bool],
  116. ) -> BaseExceptionGroup[_BaseExceptionT] | None:
  117. condition = get_condition_filter(__condition)
  118. modified = False
  119. if condition(self):
  120. return self
  121. exceptions: list[BaseException] = []
  122. for exc in self.exceptions:
  123. if isinstance(exc, BaseExceptionGroup):
  124. subgroup = exc.subgroup(__condition)
  125. if subgroup is not None:
  126. exceptions.append(subgroup)
  127. if subgroup is not exc:
  128. modified = True
  129. elif condition(exc):
  130. exceptions.append(exc)
  131. else:
  132. modified = True
  133. if not modified:
  134. return self
  135. elif exceptions:
  136. group = _derive_and_copy_attributes(self, exceptions)
  137. return group
  138. else:
  139. return None
  140. @overload
  141. def split(
  142. self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...]
  143. ) -> tuple[
  144. ExceptionGroup[_ExceptionT] | None,
  145. BaseExceptionGroup[_BaseExceptionT_co] | None,
  146. ]: ...
  147. @overload
  148. def split(
  149. self, __condition: type[_BaseExceptionT] | tuple[type[_BaseExceptionT], ...]
  150. ) -> tuple[
  151. BaseExceptionGroup[_BaseExceptionT] | None,
  152. BaseExceptionGroup[_BaseExceptionT_co] | None,
  153. ]: ...
  154. @overload
  155. def split(
  156. self,
  157. __condition: Callable[[_BaseExceptionT_co | _BaseExceptionGroupSelf], bool],
  158. ) -> tuple[
  159. BaseExceptionGroup[_BaseExceptionT_co] | None,
  160. BaseExceptionGroup[_BaseExceptionT_co] | None,
  161. ]: ...
  162. def split(
  163. self,
  164. __condition: type[_BaseExceptionT]
  165. | tuple[type[_BaseExceptionT], ...]
  166. | Callable[[_BaseExceptionT_co], bool],
  167. ) -> (
  168. tuple[
  169. ExceptionGroup[_ExceptionT] | None,
  170. BaseExceptionGroup[_BaseExceptionT_co] | None,
  171. ]
  172. | tuple[
  173. BaseExceptionGroup[_BaseExceptionT] | None,
  174. BaseExceptionGroup[_BaseExceptionT_co] | None,
  175. ]
  176. | tuple[
  177. BaseExceptionGroup[_BaseExceptionT_co] | None,
  178. BaseExceptionGroup[_BaseExceptionT_co] | None,
  179. ]
  180. ):
  181. condition = get_condition_filter(__condition)
  182. if condition(self):
  183. return self, None
  184. matching_exceptions: list[BaseException] = []
  185. nonmatching_exceptions: list[BaseException] = []
  186. for exc in self.exceptions:
  187. if isinstance(exc, BaseExceptionGroup):
  188. matching, nonmatching = exc.split(condition)
  189. if matching is not None:
  190. matching_exceptions.append(matching)
  191. if nonmatching is not None:
  192. nonmatching_exceptions.append(nonmatching)
  193. elif condition(exc):
  194. matching_exceptions.append(exc)
  195. else:
  196. nonmatching_exceptions.append(exc)
  197. matching_group: _BaseExceptionGroupSelf | None = None
  198. if matching_exceptions:
  199. matching_group = _derive_and_copy_attributes(self, matching_exceptions)
  200. nonmatching_group: _BaseExceptionGroupSelf | None = None
  201. if nonmatching_exceptions:
  202. nonmatching_group = _derive_and_copy_attributes(
  203. self, nonmatching_exceptions
  204. )
  205. return matching_group, nonmatching_group
  206. @overload
  207. def derive(self, __excs: Sequence[_ExceptionT]) -> ExceptionGroup[_ExceptionT]: ...
  208. @overload
  209. def derive(
  210. self, __excs: Sequence[_BaseExceptionT]
  211. ) -> BaseExceptionGroup[_BaseExceptionT]: ...
  212. def derive(
  213. self, __excs: Sequence[_BaseExceptionT]
  214. ) -> BaseExceptionGroup[_BaseExceptionT]:
  215. return BaseExceptionGroup(self.message, __excs)
  216. def __str__(self) -> str:
  217. suffix = "" if len(self._exceptions) == 1 else "s"
  218. return f"{self.message} ({len(self._exceptions)} sub-exception{suffix})"
  219. def __repr__(self) -> str:
  220. return f"{self.__class__.__name__}({self.message!r}, {self._exceptions!r})"
  221. class ExceptionGroup(BaseExceptionGroup[_ExceptionT_co], Exception):
  222. def __new__(
  223. cls: type[_ExceptionGroupSelf],
  224. __message: str,
  225. __exceptions: Sequence[_ExceptionT_co],
  226. ) -> _ExceptionGroupSelf:
  227. return super().__new__(cls, __message, __exceptions)
  228. if TYPE_CHECKING:
  229. @property
  230. def exceptions(
  231. self,
  232. ) -> tuple[_ExceptionT_co | ExceptionGroup[_ExceptionT_co], ...]: ...
  233. @overload # type: ignore[override]
  234. def subgroup(
  235. self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...]
  236. ) -> ExceptionGroup[_ExceptionT] | None: ...
  237. @overload
  238. def subgroup(
  239. self, __condition: Callable[[_ExceptionT_co | _ExceptionGroupSelf], bool]
  240. ) -> ExceptionGroup[_ExceptionT_co] | None: ...
  241. def subgroup(
  242. self,
  243. __condition: type[_ExceptionT]
  244. | tuple[type[_ExceptionT], ...]
  245. | Callable[[_ExceptionT_co], bool],
  246. ) -> ExceptionGroup[_ExceptionT] | None:
  247. return super().subgroup(__condition)
  248. @overload
  249. def split(
  250. self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...]
  251. ) -> tuple[
  252. ExceptionGroup[_ExceptionT] | None, ExceptionGroup[_ExceptionT_co] | None
  253. ]: ...
  254. @overload
  255. def split(
  256. self, __condition: Callable[[_ExceptionT_co | _ExceptionGroupSelf], bool]
  257. ) -> tuple[
  258. ExceptionGroup[_ExceptionT_co] | None, ExceptionGroup[_ExceptionT_co] | None
  259. ]: ...
  260. def split(
  261. self: _ExceptionGroupSelf,
  262. __condition: type[_ExceptionT]
  263. | tuple[type[_ExceptionT], ...]
  264. | Callable[[_ExceptionT_co], bool],
  265. ) -> tuple[
  266. ExceptionGroup[_ExceptionT_co] | None, ExceptionGroup[_ExceptionT_co] | None
  267. ]:
  268. return super().split(__condition)