_utils.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. """Bucket of reusable internal utilities.
  2. This should be reduced as much as possible with functions only used in one place, moved to that place.
  3. """
  4. from __future__ import annotations as _annotations
  5. import dataclasses
  6. import keyword
  7. import typing
  8. import weakref
  9. from collections import OrderedDict, defaultdict, deque
  10. from copy import deepcopy
  11. from functools import cached_property
  12. from inspect import Parameter
  13. from itertools import zip_longest
  14. from types import BuiltinFunctionType, CodeType, FunctionType, GeneratorType, LambdaType, ModuleType
  15. from typing import Any, Callable, Mapping, TypeVar
  16. from typing_extensions import TypeAlias, TypeGuard
  17. from . import _repr, _typing_extra
  18. from ._import_utils import import_cached_base_model
if typing.TYPE_CHECKING:
    # Typing-only aliases (string-valued, never evaluated at runtime); used in the
    # annotations of `ValueItems` for specs keyed by integer index or by string name.
    MappingIntStrAny: TypeAlias = 'typing.Mapping[int, Any] | typing.Mapping[str, Any]'
    AbstractSetIntStr: TypeAlias = 'typing.AbstractSet[int] | typing.AbstractSet[str]'
    # Imported for annotations only.
    from ..main import BaseModel

# these are types that are returned unchanged by deepcopy
# (`smart_deepcopy` returns instances of these types as-is, without copying)
IMMUTABLE_NON_COLLECTIONS_TYPES: set[type[Any]] = {
    int,
    float,
    complex,
    str,
    bool,
    bytes,
    type,
    _typing_extra.NoneType,
    FunctionType,
    BuiltinFunctionType,
    LambdaType,
    weakref.ref,
    CodeType,
    # note: including ModuleType will differ from behaviour of deepcopy by not producing error.
    # It might be not a good idea in general, but considering that this function used only internally
    # against default values of fields, this will allow to actually have a field with module as default value
    ModuleType,
    NotImplemented.__class__,
    Ellipsis.__class__,
}

# these are types that if empty, might be copied with simple copy() instead of deepcopy()
# (membership is tested against the exact class of the object, so subclasses are not matched)
BUILTIN_COLLECTIONS: set[type[Any]] = {
    list,
    set,
    tuple,
    frozenset,
    dict,
    OrderedDict,
    defaultdict,
    deque,
}
  56. def can_be_positional(param: Parameter) -> bool:
  57. """Return whether the parameter accepts a positional argument.
  58. ```python {test="skip" lint="skip"}
  59. def func(a, /, b, *, c):
  60. pass
  61. params = inspect.signature(func).parameters
  62. can_be_positional(params['a'])
  63. #> True
  64. can_be_positional(params['b'])
  65. #> True
  66. can_be_positional(params['c'])
  67. #> False
  68. ```
  69. """
  70. return param.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD)
  71. def sequence_like(v: Any) -> bool:
  72. return isinstance(v, (list, tuple, set, frozenset, GeneratorType, deque))
  73. def lenient_isinstance(o: Any, class_or_tuple: type[Any] | tuple[type[Any], ...] | None) -> bool: # pragma: no cover
  74. try:
  75. return isinstance(o, class_or_tuple) # type: ignore[arg-type]
  76. except TypeError:
  77. return False
  78. def lenient_issubclass(cls: Any, class_or_tuple: Any) -> bool: # pragma: no cover
  79. try:
  80. return isinstance(cls, type) and issubclass(cls, class_or_tuple)
  81. except TypeError:
  82. if isinstance(cls, _typing_extra.WithArgsTypes):
  83. return False
  84. raise # pragma: no cover
  85. def is_model_class(cls: Any) -> TypeGuard[type[BaseModel]]:
  86. """Returns true if cls is a _proper_ subclass of BaseModel, and provides proper type-checking,
  87. unlike raw calls to lenient_issubclass.
  88. """
  89. BaseModel = import_cached_base_model()
  90. return lenient_issubclass(cls, BaseModel) and cls is not BaseModel
  91. def is_valid_identifier(identifier: str) -> bool:
  92. """Checks that a string is a valid identifier and not a Python keyword.
  93. :param identifier: The identifier to test.
  94. :return: True if the identifier is valid.
  95. """
  96. return identifier.isidentifier() and not keyword.iskeyword(identifier)
  97. KeyType = TypeVar('KeyType')
  98. def deep_update(mapping: dict[KeyType, Any], *updating_mappings: dict[KeyType, Any]) -> dict[KeyType, Any]:
  99. updated_mapping = mapping.copy()
  100. for updating_mapping in updating_mappings:
  101. for k, v in updating_mapping.items():
  102. if k in updated_mapping and isinstance(updated_mapping[k], dict) and isinstance(v, dict):
  103. updated_mapping[k] = deep_update(updated_mapping[k], v)
  104. else:
  105. updated_mapping[k] = v
  106. return updated_mapping
  107. def update_not_none(mapping: dict[Any, Any], **update: Any) -> None:
  108. mapping.update({k: v for k, v in update.items() if v is not None})
  109. T = TypeVar('T')
  110. def unique_list(
  111. input_list: list[T] | tuple[T, ...],
  112. *,
  113. name_factory: typing.Callable[[T], str] = str,
  114. ) -> list[T]:
  115. """Make a list unique while maintaining order.
  116. We update the list if another one with the same name is set
  117. (e.g. model validator overridden in subclass).
  118. """
  119. result: list[T] = []
  120. result_names: list[str] = []
  121. for v in input_list:
  122. v_name = name_factory(v)
  123. if v_name not in result_names:
  124. result_names.append(v_name)
  125. result.append(v)
  126. else:
  127. result[result_names.index(v_name)] = v
  128. return result
  129. class ValueItems(_repr.Representation):
  130. """Class for more convenient calculation of excluded or included fields on values."""
  131. __slots__ = ('_items', '_type')
  132. def __init__(self, value: Any, items: AbstractSetIntStr | MappingIntStrAny) -> None:
  133. items = self._coerce_items(items)
  134. if isinstance(value, (list, tuple)):
  135. items = self._normalize_indexes(items, len(value)) # type: ignore
  136. self._items: MappingIntStrAny = items # type: ignore
  137. def is_excluded(self, item: Any) -> bool:
  138. """Check if item is fully excluded.
  139. :param item: key or index of a value
  140. """
  141. return self.is_true(self._items.get(item))
  142. def is_included(self, item: Any) -> bool:
  143. """Check if value is contained in self._items.
  144. :param item: key or index of value
  145. """
  146. return item in self._items
  147. def for_element(self, e: int | str) -> AbstractSetIntStr | MappingIntStrAny | None:
  148. """:param e: key or index of element on value
  149. :return: raw values for element if self._items is dict and contain needed element
  150. """
  151. item = self._items.get(e) # type: ignore
  152. return item if not self.is_true(item) else None
  153. def _normalize_indexes(self, items: MappingIntStrAny, v_length: int) -> dict[int | str, Any]:
  154. """:param items: dict or set of indexes which will be normalized
  155. :param v_length: length of sequence indexes of which will be
  156. >>> self._normalize_indexes({0: True, -2: True, -1: True}, 4)
  157. {0: True, 2: True, 3: True}
  158. >>> self._normalize_indexes({'__all__': True}, 4)
  159. {0: True, 1: True, 2: True, 3: True}
  160. """
  161. normalized_items: dict[int | str, Any] = {}
  162. all_items = None
  163. for i, v in items.items():
  164. if not (isinstance(v, typing.Mapping) or isinstance(v, typing.AbstractSet) or self.is_true(v)):
  165. raise TypeError(f'Unexpected type of exclude value for index "{i}" {v.__class__}')
  166. if i == '__all__':
  167. all_items = self._coerce_value(v)
  168. continue
  169. if not isinstance(i, int):
  170. raise TypeError(
  171. 'Excluding fields from a sequence of sub-models or dicts must be performed index-wise: '
  172. 'expected integer keys or keyword "__all__"'
  173. )
  174. normalized_i = v_length + i if i < 0 else i
  175. normalized_items[normalized_i] = self.merge(v, normalized_items.get(normalized_i))
  176. if not all_items:
  177. return normalized_items
  178. if self.is_true(all_items):
  179. for i in range(v_length):
  180. normalized_items.setdefault(i, ...)
  181. return normalized_items
  182. for i in range(v_length):
  183. normalized_item = normalized_items.setdefault(i, {})
  184. if not self.is_true(normalized_item):
  185. normalized_items[i] = self.merge(all_items, normalized_item)
  186. return normalized_items
  187. @classmethod
  188. def merge(cls, base: Any, override: Any, intersect: bool = False) -> Any:
  189. """Merge a `base` item with an `override` item.
  190. Both `base` and `override` are converted to dictionaries if possible.
  191. Sets are converted to dictionaries with the sets entries as keys and
  192. Ellipsis as values.
  193. Each key-value pair existing in `base` is merged with `override`,
  194. while the rest of the key-value pairs are updated recursively with this function.
  195. Merging takes place based on the "union" of keys if `intersect` is
  196. set to `False` (default) and on the intersection of keys if
  197. `intersect` is set to `True`.
  198. """
  199. override = cls._coerce_value(override)
  200. base = cls._coerce_value(base)
  201. if override is None:
  202. return base
  203. if cls.is_true(base) or base is None:
  204. return override
  205. if cls.is_true(override):
  206. return base if intersect else override
  207. # intersection or union of keys while preserving ordering:
  208. if intersect:
  209. merge_keys = [k for k in base if k in override] + [k for k in override if k in base]
  210. else:
  211. merge_keys = list(base) + [k for k in override if k not in base]
  212. merged: dict[int | str, Any] = {}
  213. for k in merge_keys:
  214. merged_item = cls.merge(base.get(k), override.get(k), intersect=intersect)
  215. if merged_item is not None:
  216. merged[k] = merged_item
  217. return merged
  218. @staticmethod
  219. def _coerce_items(items: AbstractSetIntStr | MappingIntStrAny) -> MappingIntStrAny:
  220. if isinstance(items, typing.Mapping):
  221. pass
  222. elif isinstance(items, typing.AbstractSet):
  223. items = dict.fromkeys(items, ...) # type: ignore
  224. else:
  225. class_name = getattr(items, '__class__', '???')
  226. raise TypeError(f'Unexpected type of exclude value {class_name}')
  227. return items # type: ignore
  228. @classmethod
  229. def _coerce_value(cls, value: Any) -> Any:
  230. if value is None or cls.is_true(value):
  231. return value
  232. return cls._coerce_items(value)
  233. @staticmethod
  234. def is_true(v: Any) -> bool:
  235. return v is True or v is ...
  236. def __repr_args__(self) -> _repr.ReprArgs:
  237. return [(None, self._items)]
  238. if typing.TYPE_CHECKING:
  239. def LazyClassAttribute(name: str, get_value: Callable[[], T]) -> T: ...
  240. else:
  241. class LazyClassAttribute:
  242. """A descriptor exposing an attribute only accessible on a class (hidden from instances).
  243. The attribute is lazily computed and cached during the first access.
  244. """
  245. def __init__(self, name: str, get_value: Callable[[], Any]) -> None:
  246. self.name = name
  247. self.get_value = get_value
  248. @cached_property
  249. def value(self) -> Any:
  250. return self.get_value()
  251. def __get__(self, instance: Any, owner: type[Any]) -> None:
  252. if instance is None:
  253. return self.value
  254. raise AttributeError(f'{self.name!r} attribute of {owner.__name__!r} is class-only')
  255. Obj = TypeVar('Obj')
  256. def smart_deepcopy(obj: Obj) -> Obj:
  257. """Return type as is for immutable built-in types
  258. Use obj.copy() for built-in empty collections
  259. Use copy.deepcopy() for non-empty collections and unknown objects.
  260. """
  261. obj_type = obj.__class__
  262. if obj_type in IMMUTABLE_NON_COLLECTIONS_TYPES:
  263. return obj # fastest case: obj is immutable and not collection therefore will not be copied anyway
  264. try:
  265. if not obj and obj_type in BUILTIN_COLLECTIONS:
  266. # faster way for empty collections, no need to copy its members
  267. return obj if obj_type is tuple else obj.copy() # tuple doesn't have copy method # type: ignore
  268. except (TypeError, ValueError, RuntimeError):
  269. # do we really dare to catch ALL errors? Seems a bit risky
  270. pass
  271. return deepcopy(obj) # slowest way when we actually might need a deepcopy
  272. _SENTINEL = object()
  273. def all_identical(left: typing.Iterable[Any], right: typing.Iterable[Any]) -> bool:
  274. """Check that the items of `left` are the same objects as those in `right`.
  275. >>> a, b = object(), object()
  276. >>> all_identical([a, b, a], [a, b, a])
  277. True
  278. >>> all_identical([a, b, [a]], [a, b, [a]]) # new list object, while "equal" is not "identical"
  279. False
  280. """
  281. for left_item, right_item in zip_longest(left, right, fillvalue=_SENTINEL):
  282. if left_item is not right_item:
  283. return False
  284. return True
@dataclasses.dataclass(frozen=True)
class SafeGetItemProxy:
    """Wrapper redirecting `__getitem__` to `get` with a sentinel value as default

    This makes it safe to use in `operator.itemgetter` when some keys may be missing:
    a missing key yields the module-level `_SENTINEL` object instead of raising `KeyError`.
    """

    # Define __slots__ manually for performance
    # @dataclasses.dataclass() only supports slots=True in python>=3.10
    __slots__ = ('wrapped',)

    # the mapping that lookups are delegated to
    wrapped: Mapping[str, Any]

    def __getitem__(self, key: str, /) -> Any:
        # never raises for a missing key: falls back to the sentinel
        return self.wrapped.get(key, _SENTINEL)

    # required to pass the object to operator.itemgetter() instances due to a quirk of typeshed
    # https://github.com/python/mypy/issues/13713
    # https://github.com/python/typeshed/pull/8785
    # Since this is typing-only, hide it in a typing.TYPE_CHECKING block
    if typing.TYPE_CHECKING:

        def __contains__(self, key: str, /) -> bool:
            return self.wrapped.__contains__(key)