_generate_schema.py 111 KB


  1. """Convert python types to pydantic-core schema."""
  2. from __future__ import annotations as _annotations
  3. import collections.abc
  4. import dataclasses
  5. import datetime
  6. import inspect
  7. import os
  8. import pathlib
  9. import re
  10. import sys
  11. import typing
  12. import warnings
  13. from contextlib import contextmanager
  14. from copy import copy, deepcopy
  15. from decimal import Decimal
  16. from enum import Enum
  17. from fractions import Fraction
  18. from functools import partial
  19. from inspect import Parameter, _ParameterKind, signature
  20. from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
  21. from itertools import chain
  22. from operator import attrgetter
  23. from types import FunctionType, LambdaType, MethodType
  24. from typing import (
  25. TYPE_CHECKING,
  26. Any,
  27. Callable,
  28. Dict,
  29. Final,
  30. ForwardRef,
  31. Iterable,
  32. Iterator,
  33. Mapping,
  34. Type,
  35. TypeVar,
  36. Union,
  37. cast,
  38. overload,
  39. )
  40. from uuid import UUID
  41. from warnings import warn
  42. import typing_extensions
  43. from pydantic_core import (
  44. CoreSchema,
  45. MultiHostUrl,
  46. PydanticCustomError,
  47. PydanticSerializationUnexpectedValue,
  48. PydanticUndefined,
  49. Url,
  50. core_schema,
  51. to_jsonable_python,
  52. )
  53. from typing_extensions import Literal, TypeAliasType, TypedDict, get_args, get_origin, is_typeddict
  54. from ..aliases import AliasChoices, AliasGenerator, AliasPath
  55. from ..annotated_handlers import GetCoreSchemaHandler, GetJsonSchemaHandler
  56. from ..config import ConfigDict, JsonDict, JsonEncoder, JsonSchemaExtraCallable
  57. from ..errors import PydanticSchemaGenerationError, PydanticUndefinedAnnotation, PydanticUserError
  58. from ..functional_validators import AfterValidator, BeforeValidator, FieldValidatorModes, PlainValidator, WrapValidator
  59. from ..json_schema import JsonSchemaValue
  60. from ..version import version_short
  61. from ..warnings import PydanticDeprecatedSince20
  62. from . import _core_utils, _decorators, _discriminated_union, _known_annotated_metadata, _typing_extra
  63. from ._config import ConfigWrapper, ConfigWrapperStack
  64. from ._core_metadata import update_core_metadata
  65. from ._core_utils import (
  66. collect_invalid_schemas,
  67. define_expected_missing_refs,
  68. get_ref,
  69. get_type_ref,
  70. is_function_with_inner_schema,
  71. is_list_like_schema_with_items_schema,
  72. simplify_schema_references,
  73. validate_core_schema,
  74. )
  75. from ._decorators import (
  76. Decorator,
  77. DecoratorInfos,
  78. FieldSerializerDecoratorInfo,
  79. FieldValidatorDecoratorInfo,
  80. ModelSerializerDecoratorInfo,
  81. ModelValidatorDecoratorInfo,
  82. RootValidatorDecoratorInfo,
  83. ValidatorDecoratorInfo,
  84. get_attribute_from_bases,
  85. inspect_field_serializer,
  86. inspect_model_serializer,
  87. inspect_validator,
  88. )
  89. from ._docs_extraction import extract_docstrings_from_cls
  90. from ._fields import collect_dataclass_fields, takes_validated_data_argument
  91. from ._forward_ref import PydanticRecursiveRef
  92. from ._generics import get_standard_typevars_map, has_instance_in_type, recursively_defined_type_refs, replace_types
  93. from ._import_utils import import_cached_base_model, import_cached_field_info
  94. from ._mock_val_ser import MockCoreSchema
  95. from ._namespace_utils import NamespacesTuple, NsResolver
  96. from ._schema_generation_shared import CallbackGetCoreSchemaHandler
  97. from ._utils import lenient_issubclass, smart_deepcopy
  98. if TYPE_CHECKING:
  99. from ..fields import ComputedFieldInfo, FieldInfo
  100. from ..main import BaseModel
  101. from ..types import Discriminator
  102. from ._dataclasses import StandardDataclass
  103. from ._schema_generation_shared import GetJsonSchemaFunction
  104. _SUPPORTS_TYPEDDICT = sys.version_info >= (3, 12)
  105. FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo]
  106. FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo)
  107. AnyFieldDecorator = Union[
  108. Decorator[ValidatorDecoratorInfo],
  109. Decorator[FieldValidatorDecoratorInfo],
  110. Decorator[FieldSerializerDecoratorInfo],
  111. ]
  112. ModifyCoreSchemaWrapHandler = GetCoreSchemaHandler
  113. GetCoreSchemaFunction = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema]
  114. TUPLE_TYPES: list[type] = [tuple, typing.Tuple]
  115. LIST_TYPES: list[type] = [list, typing.List, collections.abc.MutableSequence]
  116. SET_TYPES: list[type] = [set, typing.Set, collections.abc.MutableSet]
  117. FROZEN_SET_TYPES: list[type] = [frozenset, typing.FrozenSet, collections.abc.Set]
  118. DICT_TYPES: list[type] = [dict, typing.Dict]
  119. IP_TYPES: list[type] = [IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network]
  120. SEQUENCE_TYPES: list[type] = [typing.Sequence, collections.abc.Sequence]
  121. PATH_TYPES: list[type] = [
  122. os.PathLike,
  123. pathlib.Path,
  124. pathlib.PurePath,
  125. pathlib.PosixPath,
  126. pathlib.PurePosixPath,
  127. pathlib.PureWindowsPath,
  128. ]
  129. MAPPING_TYPES = [
  130. typing.Mapping,
  131. typing.MutableMapping,
  132. collections.abc.Mapping,
  133. collections.abc.MutableMapping,
  134. collections.OrderedDict,
  135. typing_extensions.OrderedDict,
  136. typing.DefaultDict,
  137. collections.defaultdict,
  138. collections.Counter,
  139. typing.Counter,
  140. ]
  141. DEQUE_TYPES: list[type] = [collections.deque, typing.Deque]
  142. # Note: This does not play very well with type checkers. For example,
  143. # `a: LambdaType = lambda x: x` will raise a type error by Pyright.
  144. ValidateCallSupportedTypes = Union[
  145. LambdaType,
  146. FunctionType,
  147. MethodType,
  148. partial,
  149. ]
  150. VALIDATE_CALL_SUPPORTED_TYPES = get_args(ValidateCallSupportedTypes)
  151. _mode_to_validator: dict[
  152. FieldValidatorModes, type[BeforeValidator | AfterValidator | PlainValidator | WrapValidator]
  153. ] = {'before': BeforeValidator, 'after': AfterValidator, 'plain': PlainValidator, 'wrap': WrapValidator}
  154. def check_validator_fields_against_field_name(
  155. info: FieldDecoratorInfo,
  156. field: str,
  157. ) -> bool:
  158. """Check if field name is in validator fields.
  159. Args:
  160. info: The field info.
  161. field: The field name to check.
  162. Returns:
  163. `True` if field name is in validator fields, `False` otherwise.
  164. """
  165. if '*' in info.fields:
  166. return True
  167. for v_field_name in info.fields:
  168. if v_field_name == field:
  169. return True
  170. return False
  171. def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None:
  172. """Check if the defined fields in decorators exist in `fields` param.
  173. It ignores the check for a decorator if the decorator has `*` as field or `check_fields=False`.
  174. Args:
  175. decorators: An iterable of decorators.
  176. fields: An iterable of fields name.
  177. Raises:
  178. PydanticUserError: If one of the field names does not exist in `fields` param.
  179. """
  180. fields = set(fields)
  181. for dec in decorators:
  182. if '*' in dec.info.fields:
  183. continue
  184. if dec.info.check_fields is False:
  185. continue
  186. for field in dec.info.fields:
  187. if field not in fields:
  188. raise PydanticUserError(
  189. f'Decorators defined with incorrect fields: {dec.cls_ref}.{dec.cls_var_name}'
  190. " (use check_fields=False if you're inheriting from the model and intended this)",
  191. code='decorator-missing-field',
  192. )
  193. def filter_field_decorator_info_by_field(
  194. validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str
  195. ) -> list[Decorator[FieldDecoratorInfoType]]:
  196. return [dec for dec in validator_functions if check_validator_fields_against_field_name(dec.info, field)]
  197. def apply_each_item_validators(
  198. schema: core_schema.CoreSchema,
  199. each_item_validators: list[Decorator[ValidatorDecoratorInfo]],
  200. field_name: str | None,
  201. ) -> core_schema.CoreSchema:
  202. # This V1 compatibility shim should eventually be removed
  203. # fail early if each_item_validators is empty
  204. if not each_item_validators:
  205. return schema
  206. # push down any `each_item=True` validators
  207. # note that this won't work for any Annotated types that get wrapped by a function validator
  208. # but that's okay because that didn't exist in V1
  209. if schema['type'] == 'nullable':
  210. schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators, field_name)
  211. return schema
  212. elif schema['type'] == 'tuple':
  213. if (variadic_item_index := schema.get('variadic_item_index')) is not None:
  214. schema['items_schema'][variadic_item_index] = apply_validators(
  215. schema['items_schema'][variadic_item_index],
  216. each_item_validators,
  217. field_name,
  218. )
  219. elif is_list_like_schema_with_items_schema(schema):
  220. inner_schema = schema.get('items_schema', core_schema.any_schema())
  221. schema['items_schema'] = apply_validators(inner_schema, each_item_validators, field_name)
  222. elif schema['type'] == 'dict':
  223. inner_schema = schema.get('values_schema', core_schema.any_schema())
  224. schema['values_schema'] = apply_validators(inner_schema, each_item_validators, field_name)
  225. else:
  226. raise TypeError(
  227. f"`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema['type']}"
  228. )
  229. return schema
  230. def _extract_json_schema_info_from_field_info(
  231. info: FieldInfo | ComputedFieldInfo,
  232. ) -> tuple[JsonDict | None, JsonDict | JsonSchemaExtraCallable | None]:
  233. json_schema_updates = {
  234. 'title': info.title,
  235. 'description': info.description,
  236. 'deprecated': bool(info.deprecated) or info.deprecated == '' or None,
  237. 'examples': to_jsonable_python(info.examples),
  238. }
  239. json_schema_updates = {k: v for k, v in json_schema_updates.items() if v is not None}
  240. return (json_schema_updates or None, info.json_schema_extra)
  241. JsonEncoders = Dict[Type[Any], JsonEncoder]
  242. def _add_custom_serialization_from_json_encoders(
  243. json_encoders: JsonEncoders | None, tp: Any, schema: CoreSchema
  244. ) -> CoreSchema:
  245. """Iterate over the json_encoders and add the first matching encoder to the schema.
  246. Args:
  247. json_encoders: A dictionary of types and their encoder functions.
  248. tp: The type to check for a matching encoder.
  249. schema: The schema to add the encoder to.
  250. """
  251. if not json_encoders:
  252. return schema
  253. if 'serialization' in schema:
  254. return schema
  255. # Check the class type and its superclasses for a matching encoder
  256. # Decimal.__class__.__mro__ (and probably other cases) doesn't include Decimal itself
  257. # if the type is a GenericAlias (e.g. from list[int]) we need to use __class__ instead of .__mro__
  258. for base in (tp, *getattr(tp, '__mro__', tp.__class__.__mro__)[:-1]):
  259. encoder = json_encoders.get(base)
  260. if encoder is None:
  261. continue
  262. warnings.warn(
  263. f'`json_encoders` is deprecated. See https://docs.pydantic.dev/{version_short()}/concepts/serialization/#custom-serializers for alternatives',
  264. PydanticDeprecatedSince20,
  265. )
  266. # TODO: in theory we should check that the schema accepts a serialization key
  267. schema['serialization'] = core_schema.plain_serializer_function_ser_schema(encoder, when_used='json')
  268. return schema
  269. return schema
  270. def _get_first_non_null(a: Any, b: Any) -> Any:
  271. """Return the first argument if it is not None, otherwise return the second argument.
  272. Use case: serialization_alias (argument a) and alias (argument b) are both defined, and serialization_alias is ''.
  273. This function will return serialization_alias, which is the first argument, even though it is an empty string.
  274. """
  275. return a if a is not None else b
  276. class GenerateSchema:
  277. """Generate core schema for a Pydantic model, dataclass and types like `str`, `datetime`, ... ."""
  278. __slots__ = (
  279. '_config_wrapper_stack',
  280. '_ns_resolver',
  281. '_typevars_map',
  282. 'field_name_stack',
  283. 'model_type_stack',
  284. 'defs',
  285. )
  286. def __init__(
  287. self,
  288. config_wrapper: ConfigWrapper,
  289. ns_resolver: NsResolver | None = None,
  290. typevars_map: dict[Any, Any] | None = None,
  291. ) -> None:
  292. # we need a stack for recursing into nested models
  293. self._config_wrapper_stack = ConfigWrapperStack(config_wrapper)
  294. self._ns_resolver = ns_resolver or NsResolver()
  295. self._typevars_map = typevars_map
  296. self.field_name_stack = _FieldNameStack()
  297. self.model_type_stack = _ModelTypeStack()
  298. self.defs = _Definitions()
  299. def __init_subclass__(cls) -> None:
  300. super().__init_subclass__()
  301. warnings.warn(
  302. 'Subclassing `GenerateSchema` is not supported. The API is highly subject to change in minor versions.',
  303. UserWarning,
  304. stacklevel=2,
  305. )
  306. @property
  307. def _config_wrapper(self) -> ConfigWrapper:
  308. return self._config_wrapper_stack.tail
  309. @property
  310. def _types_namespace(self) -> NamespacesTuple:
  311. return self._ns_resolver.types_namespace
  312. @property
  313. def _arbitrary_types(self) -> bool:
  314. return self._config_wrapper.arbitrary_types_allowed
  315. # the following methods can be overridden but should be considered
  316. # unstable / private APIs
  317. def _list_schema(self, items_type: Any) -> CoreSchema:
  318. return core_schema.list_schema(self.generate_schema(items_type))
  319. def _dict_schema(self, keys_type: Any, values_type: Any) -> CoreSchema:
  320. return core_schema.dict_schema(self.generate_schema(keys_type), self.generate_schema(values_type))
  321. def _set_schema(self, items_type: Any) -> CoreSchema:
  322. return core_schema.set_schema(self.generate_schema(items_type))
  323. def _frozenset_schema(self, items_type: Any) -> CoreSchema:
  324. return core_schema.frozenset_schema(self.generate_schema(items_type))
  325. def _enum_schema(self, enum_type: type[Enum]) -> CoreSchema:
  326. cases: list[Any] = list(enum_type.__members__.values())
  327. enum_ref = get_type_ref(enum_type)
  328. description = None if not enum_type.__doc__ else inspect.cleandoc(enum_type.__doc__)
  329. if (
  330. description == 'An enumeration.'
  331. ): # This is the default value provided by enum.EnumMeta.__new__; don't use it
  332. description = None
  333. js_updates = {'title': enum_type.__name__, 'description': description}
  334. js_updates = {k: v for k, v in js_updates.items() if v is not None}
  335. sub_type: Literal['str', 'int', 'float'] | None = None
  336. if issubclass(enum_type, int):
  337. sub_type = 'int'
  338. value_ser_type: core_schema.SerSchema = core_schema.simple_ser_schema('int')
  339. elif issubclass(enum_type, str):
  340. # this handles `StrEnum` (3.11 only), and also `Foobar(str, Enum)`
  341. sub_type = 'str'
  342. value_ser_type = core_schema.simple_ser_schema('str')
  343. elif issubclass(enum_type, float):
  344. sub_type = 'float'
  345. value_ser_type = core_schema.simple_ser_schema('float')
  346. else:
  347. # TODO this is an ugly hack, how do we trigger an Any schema for serialization?
  348. value_ser_type = core_schema.plain_serializer_function_ser_schema(lambda x: x)
  349. if cases:
  350. def get_json_schema(schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
  351. json_schema = handler(schema)
  352. original_schema = handler.resolve_ref_schema(json_schema)
  353. original_schema.update(js_updates)
  354. return json_schema
  355. # we don't want to add the missing to the schema if it's the default one
  356. default_missing = getattr(enum_type._missing_, '__func__', None) is Enum._missing_.__func__ # pyright: ignore[reportFunctionMemberAccess]
  357. enum_schema = core_schema.enum_schema(
  358. enum_type,
  359. cases,
  360. sub_type=sub_type,
  361. missing=None if default_missing else enum_type._missing_,
  362. ref=enum_ref,
  363. metadata={'pydantic_js_functions': [get_json_schema]},
  364. )
  365. if self._config_wrapper.use_enum_values:
  366. enum_schema = core_schema.no_info_after_validator_function(
  367. attrgetter('value'), enum_schema, serialization=value_ser_type
  368. )
  369. return enum_schema
  370. else:
  371. def get_json_schema_no_cases(_, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
  372. json_schema = handler(core_schema.enum_schema(enum_type, cases, sub_type=sub_type, ref=enum_ref))
  373. original_schema = handler.resolve_ref_schema(json_schema)
  374. original_schema.update(js_updates)
  375. return json_schema
  376. # Use an isinstance check for enums with no cases.
  377. # The most important use case for this is creating TypeVar bounds for generics that should
  378. # be restricted to enums. This is more consistent than it might seem at first, since you can only
  379. # subclass enum.Enum (or subclasses of enum.Enum) if all parent classes have no cases.
  380. # We use the get_json_schema function when an Enum subclass has been declared with no cases
  381. # so that we can still generate a valid json schema.
  382. return core_schema.is_instance_schema(
  383. enum_type,
  384. metadata={'pydantic_js_functions': [get_json_schema_no_cases]},
  385. )
  386. def _ip_schema(self, tp: Any) -> CoreSchema:
  387. from ._validators import IP_VALIDATOR_LOOKUP, IpType
  388. ip_type_json_schema_format: dict[type[IpType], str] = {
  389. IPv4Address: 'ipv4',
  390. IPv4Network: 'ipv4network',
  391. IPv4Interface: 'ipv4interface',
  392. IPv6Address: 'ipv6',
  393. IPv6Network: 'ipv6network',
  394. IPv6Interface: 'ipv6interface',
  395. }
  396. def ser_ip(ip: Any, info: core_schema.SerializationInfo) -> str | IpType:
  397. if not isinstance(ip, (tp, str)):
  398. raise PydanticSerializationUnexpectedValue(
  399. f"Expected `{tp}` but got `{type(ip)}` with value `'{ip}'` - serialized value may not be as expected."
  400. )
  401. if info.mode == 'python':
  402. return ip
  403. return str(ip)
  404. return core_schema.lax_or_strict_schema(
  405. lax_schema=core_schema.no_info_plain_validator_function(IP_VALIDATOR_LOOKUP[tp]),
  406. strict_schema=core_schema.json_or_python_schema(
  407. json_schema=core_schema.no_info_after_validator_function(tp, core_schema.str_schema()),
  408. python_schema=core_schema.is_instance_schema(tp),
  409. ),
  410. serialization=core_schema.plain_serializer_function_ser_schema(ser_ip, info_arg=True, when_used='always'),
  411. metadata={
  412. 'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': ip_type_json_schema_format[tp]}]
  413. },
  414. )
  415. def _fraction_schema(self) -> CoreSchema:
  416. """Support for [`fractions.Fraction`][fractions.Fraction]."""
  417. from ._validators import fraction_validator
  418. # TODO: note, this is a fairly common pattern, re lax / strict for attempted type coercion,
  419. # can we use a helper function to reduce boilerplate?
  420. return core_schema.lax_or_strict_schema(
  421. lax_schema=core_schema.no_info_plain_validator_function(fraction_validator),
  422. strict_schema=core_schema.json_or_python_schema(
  423. json_schema=core_schema.no_info_plain_validator_function(fraction_validator),
  424. python_schema=core_schema.is_instance_schema(Fraction),
  425. ),
  426. # use str serialization to guarantee round trip behavior
  427. serialization=core_schema.to_string_ser_schema(when_used='always'),
  428. metadata={'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'fraction'}]},
  429. )
  430. def _arbitrary_type_schema(self, tp: Any) -> CoreSchema:
  431. if not isinstance(tp, type):
  432. warn(
  433. f'{tp!r} is not a Python type (it may be an instance of an object),'
  434. ' Pydantic will allow any object with no validation since we cannot even'
  435. ' enforce that the input is an instance of the given type.'
  436. ' To get rid of this error wrap the type with `pydantic.SkipValidation`.',
  437. UserWarning,
  438. )
  439. return core_schema.any_schema()
  440. return core_schema.is_instance_schema(tp)
  441. def _unknown_type_schema(self, obj: Any) -> CoreSchema:
  442. raise PydanticSchemaGenerationError(
  443. f'Unable to generate pydantic-core schema for {obj!r}. '
  444. 'Set `arbitrary_types_allowed=True` in the model_config to ignore this error'
  445. ' or implement `__get_pydantic_core_schema__` on your type to fully support it.'
  446. '\n\nIf you got this error by calling handler(<some type>) within'
  447. ' `__get_pydantic_core_schema__` then you likely need to call'
  448. ' `handler.generate_schema(<some type>)` since we do not call'
  449. ' `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.'
  450. )
  451. def _apply_discriminator_to_union(
  452. self, schema: CoreSchema, discriminator: str | Discriminator | None
  453. ) -> CoreSchema:
  454. if discriminator is None:
  455. return schema
  456. try:
  457. return _discriminated_union.apply_discriminator(
  458. schema,
  459. discriminator,
  460. )
  461. except _discriminated_union.MissingDefinitionForUnionRef:
  462. # defer until defs are resolved
  463. _discriminated_union.set_discriminator_in_metadata(
  464. schema,
  465. discriminator,
  466. )
  467. return schema
  468. class CollectedInvalid(Exception):
  469. pass
  470. def clean_schema(self, schema: CoreSchema) -> CoreSchema:
  471. schema = self.collect_definitions(schema)
  472. schema = simplify_schema_references(schema)
  473. if collect_invalid_schemas(schema):
  474. raise self.CollectedInvalid()
  475. schema = _discriminated_union.apply_discriminators(schema)
  476. schema = validate_core_schema(schema)
  477. return schema
  478. def collect_definitions(self, schema: CoreSchema) -> CoreSchema:
  479. ref = cast('str | None', schema.get('ref', None))
  480. if ref:
  481. self.defs.definitions[ref] = schema
  482. if 'ref' in schema:
  483. schema = core_schema.definition_reference_schema(schema['ref'])
  484. return core_schema.definitions_schema(
  485. schema,
  486. list(self.defs.definitions.values()),
  487. )
  488. def _add_js_function(self, metadata_schema: CoreSchema, js_function: Callable[..., Any]) -> None:
  489. metadata = metadata_schema.get('metadata', {})
  490. pydantic_js_functions = metadata.setdefault('pydantic_js_functions', [])
  491. # because of how we generate core schemas for nested generic models
  492. # we can end up adding `BaseModel.__get_pydantic_json_schema__` multiple times
  493. # this check may fail to catch duplicates if the function is a `functools.partial`
  494. # or something like that, but if it does it'll fail by inserting the duplicate
  495. if js_function not in pydantic_js_functions:
  496. pydantic_js_functions.append(js_function)
  497. metadata_schema['metadata'] = metadata
  498. def generate_schema(
  499. self,
  500. obj: Any,
  501. from_dunder_get_core_schema: bool = True,
  502. ) -> core_schema.CoreSchema:
  503. """Generate core schema.
  504. Args:
  505. obj: The object to generate core schema for.
  506. from_dunder_get_core_schema: Whether to generate schema from either the
  507. `__get_pydantic_core_schema__` function or `__pydantic_core_schema__` property.
  508. Returns:
  509. The generated core schema.
  510. Raises:
  511. PydanticUndefinedAnnotation:
  512. If it is not possible to evaluate forward reference.
  513. PydanticSchemaGenerationError:
  514. If it is not possible to generate pydantic-core schema.
  515. TypeError:
  516. - If `alias_generator` returns a disallowed type (must be str, AliasPath or AliasChoices).
  517. - If V1 style validator with `each_item=True` applied on a wrong field.
  518. PydanticUserError:
  519. - If `typing.TypedDict` is used instead of `typing_extensions.TypedDict` on Python < 3.12.
  520. - If `__modify_schema__` method is used instead of `__get_pydantic_json_schema__`.
  521. """
  522. schema: CoreSchema | None = None
  523. if from_dunder_get_core_schema:
  524. from_property = self._generate_schema_from_property(obj, obj)
  525. if from_property is not None:
  526. schema = from_property
  527. if schema is None:
  528. schema = self._generate_schema_inner(obj)
  529. metadata_js_function = _extract_get_pydantic_json_schema(obj, schema)
  530. if metadata_js_function is not None:
  531. metadata_schema = resolve_original_schema(schema, self.defs.definitions)
  532. if metadata_schema:
  533. self._add_js_function(metadata_schema, metadata_js_function)
  534. schema = _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, obj, schema)
  535. return schema
  536. def _model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema:
  537. """Generate schema for a Pydantic model."""
  538. with self.defs.get_schema_or_ref(cls) as (model_ref, maybe_schema):
  539. if maybe_schema is not None:
  540. return maybe_schema
  541. fields = getattr(cls, '__pydantic_fields__', {})
  542. decorators = cls.__pydantic_decorators__
  543. computed_fields = decorators.computed_fields
  544. check_decorator_fields_exist(
  545. chain(
  546. decorators.field_validators.values(),
  547. decorators.field_serializers.values(),
  548. decorators.validators.values(),
  549. ),
  550. {*fields.keys(), *computed_fields.keys()},
  551. )
  552. config_wrapper = ConfigWrapper(cls.model_config, check=False)
  553. core_config = config_wrapper.core_config(title=cls.__name__)
  554. model_validators = decorators.model_validators.values()
  555. with self._config_wrapper_stack.push(config_wrapper), self._ns_resolver.push(cls):
  556. extras_schema = None
  557. if core_config.get('extra_fields_behavior') == 'allow':
  558. assert cls.__mro__[0] is cls
  559. assert cls.__mro__[-1] is object
  560. for candidate_cls in cls.__mro__[:-1]:
  561. extras_annotation = getattr(candidate_cls, '__annotations__', {}).get(
  562. '__pydantic_extra__', None
  563. )
  564. if extras_annotation is not None:
  565. if isinstance(extras_annotation, str):
  566. extras_annotation = _typing_extra.eval_type_backport(
  567. _typing_extra._make_forward_ref(
  568. extras_annotation, is_argument=False, is_class=True
  569. ),
  570. *self._types_namespace,
  571. )
  572. tp = get_origin(extras_annotation)
  573. if tp not in (Dict, dict):
  574. raise PydanticSchemaGenerationError(
  575. 'The type annotation for `__pydantic_extra__` must be `Dict[str, ...]`'
  576. )
  577. extra_items_type = self._get_args_resolving_forward_refs(
  578. extras_annotation,
  579. required=True,
  580. )[1]
  581. if not _typing_extra.is_any(extra_items_type):
  582. extras_schema = self.generate_schema(extra_items_type)
  583. break
  584. generic_origin: type[BaseModel] | None = getattr(cls, '__pydantic_generic_metadata__', {}).get('origin')
  585. if cls.__pydantic_root_model__:
  586. root_field = self._common_field_schema('root', fields['root'], decorators)
  587. inner_schema = root_field['schema']
  588. inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
  589. model_schema = core_schema.model_schema(
  590. cls,
  591. inner_schema,
  592. generic_origin=generic_origin,
  593. custom_init=getattr(cls, '__pydantic_custom_init__', None),
  594. root_model=True,
  595. post_init=getattr(cls, '__pydantic_post_init__', None),
  596. config=core_config,
  597. ref=model_ref,
  598. )
  599. else:
  600. fields_schema: core_schema.CoreSchema = core_schema.model_fields_schema(
  601. {k: self._generate_md_field_schema(k, v, decorators) for k, v in fields.items()},
  602. computed_fields=[
  603. self._computed_field_schema(d, decorators.field_serializers)
  604. for d in computed_fields.values()
  605. ],
  606. extras_schema=extras_schema,
  607. model_name=cls.__name__,
  608. )
  609. inner_schema = apply_validators(fields_schema, decorators.root_validators.values(), None)
  610. new_inner_schema = define_expected_missing_refs(inner_schema, recursively_defined_type_refs())
  611. if new_inner_schema is not None:
  612. inner_schema = new_inner_schema
  613. inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
  614. model_schema = core_schema.model_schema(
  615. cls,
  616. inner_schema,
  617. generic_origin=generic_origin,
  618. custom_init=getattr(cls, '__pydantic_custom_init__', None),
  619. root_model=False,
  620. post_init=getattr(cls, '__pydantic_post_init__', None),
  621. config=core_config,
  622. ref=model_ref,
  623. )
  624. schema = self._apply_model_serializers(model_schema, decorators.model_serializers.values())
  625. schema = apply_model_validators(schema, model_validators, 'outer')
  626. self.defs.definitions[model_ref] = schema
  627. return core_schema.definition_reference_schema(model_ref)
  628. def _unpack_refs_defs(self, schema: CoreSchema) -> CoreSchema:
  629. """Unpack all 'definitions' schemas into `GenerateSchema.defs.definitions`
  630. and return the inner schema.
  631. """
  632. if schema['type'] == 'definitions':
  633. definitions = self.defs.definitions
  634. for s in schema['definitions']:
  635. definitions[s['ref']] = s # type: ignore
  636. return schema['schema']
  637. return schema
  638. def _resolve_self_type(self, obj: Any) -> Any:
  639. obj = self.model_type_stack.get()
  640. if obj is None:
  641. raise PydanticUserError('`typing.Self` is invalid in this context', code='invalid-self-type')
  642. return obj
  643. def _generate_schema_from_property(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
  644. """Try to generate schema from either the `__get_pydantic_core_schema__` function or
  645. `__pydantic_core_schema__` property.
  646. Note: `__get_pydantic_core_schema__` takes priority so it can
  647. decide whether to use a `__pydantic_core_schema__` attribute, or generate a fresh schema.
  648. """
  649. # avoid calling `__get_pydantic_core_schema__` if we've already visited this object
  650. if _typing_extra.is_self(obj):
  651. obj = self._resolve_self_type(obj)
  652. with self.defs.get_schema_or_ref(obj) as (_, maybe_schema):
  653. if maybe_schema is not None:
  654. return maybe_schema
  655. if obj is source:
  656. ref_mode = 'unpack'
  657. else:
  658. ref_mode = 'to-def'
  659. schema: CoreSchema
  660. if (get_schema := getattr(obj, '__get_pydantic_core_schema__', None)) is not None:
  661. schema = get_schema(
  662. source, CallbackGetCoreSchemaHandler(self._generate_schema_inner, self, ref_mode=ref_mode)
  663. )
  664. elif (
  665. hasattr(obj, '__dict__')
  666. # In some cases (e.g. a stdlib dataclass subclassing a Pydantic dataclass),
  667. # doing an attribute access to get the schema will result in the parent schema
  668. # being fetched. Thus, only look for the current obj's dict:
  669. and (existing_schema := obj.__dict__.get('__pydantic_core_schema__')) is not None
  670. and not isinstance(existing_schema, MockCoreSchema)
  671. ):
  672. schema = existing_schema
  673. elif (validators := getattr(obj, '__get_validators__', None)) is not None:
  674. from pydantic.v1 import BaseModel as BaseModelV1
  675. if issubclass(obj, BaseModelV1):
  676. warn(
  677. f'Mixing V1 models and V2 models (or constructs, like `TypeAdapter`) is not supported. Please upgrade `{obj.__name__}` to V2.',
  678. UserWarning,
  679. )
  680. else:
  681. warn(
  682. '`__get_validators__` is deprecated and will be removed, use `__get_pydantic_core_schema__` instead.',
  683. PydanticDeprecatedSince20,
  684. )
  685. schema = core_schema.chain_schema([core_schema.with_info_plain_validator_function(v) for v in validators()])
  686. else:
  687. # we have no existing schema information on the property, exit early so that we can go generate a schema
  688. return None
  689. schema = self._unpack_refs_defs(schema)
  690. if is_function_with_inner_schema(schema):
  691. ref = schema['schema'].pop('ref', None) # pyright: ignore[reportCallIssue, reportArgumentType]
  692. if ref:
  693. schema['ref'] = ref
  694. else:
  695. ref = get_ref(schema)
  696. if ref:
  697. self.defs.definitions[ref] = schema
  698. return core_schema.definition_reference_schema(ref)
  699. return schema
  700. def _resolve_forward_ref(self, obj: Any) -> Any:
  701. # we assume that types_namespace has the target of forward references in its scope,
  702. # but this could fail, for example, if calling Validator on an imported type which contains
  703. # forward references to other types only defined in the module from which it was imported
  704. # `Validator(SomeImportedTypeAliasWithAForwardReference)`
  705. # or the equivalent for BaseModel
  706. # class Model(BaseModel):
  707. # x: SomeImportedTypeAliasWithAForwardReference
  708. try:
  709. obj = _typing_extra.eval_type_backport(obj, *self._types_namespace)
  710. except NameError as e:
  711. raise PydanticUndefinedAnnotation.from_name_error(e) from e
  712. # if obj is still a ForwardRef, it means we can't evaluate it, raise PydanticUndefinedAnnotation
  713. if isinstance(obj, ForwardRef):
  714. raise PydanticUndefinedAnnotation(obj.__forward_arg__, f'Unable to evaluate forward reference {obj}')
  715. if self._typevars_map:
  716. obj = replace_types(obj, self._typevars_map)
  717. return obj
  718. @overload
  719. def _get_args_resolving_forward_refs(self, obj: Any, required: Literal[True]) -> tuple[Any, ...]: ...
  720. @overload
  721. def _get_args_resolving_forward_refs(self, obj: Any) -> tuple[Any, ...] | None: ...
  722. def _get_args_resolving_forward_refs(self, obj: Any, required: bool = False) -> tuple[Any, ...] | None:
  723. args = get_args(obj)
  724. if args:
  725. if sys.version_info >= (3, 9):
  726. from types import GenericAlias
  727. if isinstance(obj, GenericAlias):
  728. # PEP 585 generic aliases don't convert args to ForwardRefs, unlike `typing.List/Dict` etc.
  729. args = (_typing_extra._make_forward_ref(a) if isinstance(a, str) else a for a in args)
  730. args = tuple(self._resolve_forward_ref(a) if isinstance(a, ForwardRef) else a for a in args)
  731. elif required: # pragma: no cover
  732. raise TypeError(f'Expected {obj} to have generic parameters but it had none')
  733. return args
  734. def _get_first_arg_or_any(self, obj: Any) -> Any:
  735. args = self._get_args_resolving_forward_refs(obj)
  736. if not args:
  737. return Any
  738. return args[0]
  739. def _get_first_two_args_or_any(self, obj: Any) -> tuple[Any, Any]:
  740. args = self._get_args_resolving_forward_refs(obj)
  741. if not args:
  742. return (Any, Any)
  743. if len(args) < 2:
  744. origin = get_origin(obj)
  745. raise TypeError(f'Expected two type arguments for {origin}, got 1')
  746. return args[0], args[1]
  747. def _generate_schema_inner(self, obj: Any) -> core_schema.CoreSchema:
  748. if _typing_extra.is_annotated(obj):
  749. return self._annotated_schema(obj)
  750. if isinstance(obj, dict):
  751. # we assume this is already a valid schema
  752. return obj # type: ignore[return-value]
  753. if isinstance(obj, str):
  754. obj = ForwardRef(obj)
  755. if isinstance(obj, ForwardRef):
  756. return self.generate_schema(self._resolve_forward_ref(obj))
  757. BaseModel = import_cached_base_model()
  758. if lenient_issubclass(obj, BaseModel):
  759. with self.model_type_stack.push(obj):
  760. return self._model_schema(obj)
  761. if isinstance(obj, PydanticRecursiveRef):
  762. return core_schema.definition_reference_schema(schema_ref=obj.type_ref)
  763. return self.match_type(obj)
  764. def match_type(self, obj: Any) -> core_schema.CoreSchema: # noqa: C901
  765. """Main mapping of types to schemas.
  766. The general structure is a series of if statements starting with the simple cases
  767. (non-generic primitive types) and then handling generics and other more complex cases.
  768. Each case either generates a schema directly, calls into a public user-overridable method
  769. (like `GenerateSchema.tuple_variable_schema`) or calls into a private method that handles some
  770. boilerplate before calling into the user-facing method (e.g. `GenerateSchema._tuple_schema`).
  771. The idea is that we'll evolve this into adding more and more user facing methods over time
  772. as they get requested and we figure out what the right API for them is.
  773. """
  774. if obj is str:
  775. return core_schema.str_schema()
  776. elif obj is bytes:
  777. return core_schema.bytes_schema()
  778. elif obj is int:
  779. return core_schema.int_schema()
  780. elif obj is float:
  781. return core_schema.float_schema()
  782. elif obj is bool:
  783. return core_schema.bool_schema()
  784. elif obj is complex:
  785. return core_schema.complex_schema()
  786. elif _typing_extra.is_any(obj) or obj is object:
  787. return core_schema.any_schema()
  788. elif obj is datetime.date:
  789. return core_schema.date_schema()
  790. elif obj is datetime.datetime:
  791. return core_schema.datetime_schema()
  792. elif obj is datetime.time:
  793. return core_schema.time_schema()
  794. elif obj is datetime.timedelta:
  795. return core_schema.timedelta_schema()
  796. elif obj is Decimal:
  797. return core_schema.decimal_schema()
  798. elif obj is UUID:
  799. return core_schema.uuid_schema()
  800. elif obj is Url:
  801. return core_schema.url_schema()
  802. elif obj is Fraction:
  803. return self._fraction_schema()
  804. elif obj is MultiHostUrl:
  805. return core_schema.multi_host_url_schema()
  806. elif obj is None or obj is _typing_extra.NoneType:
  807. return core_schema.none_schema()
  808. elif obj in IP_TYPES:
  809. return self._ip_schema(obj)
  810. elif obj in TUPLE_TYPES:
  811. return self._tuple_schema(obj)
  812. elif obj in LIST_TYPES:
  813. return self._list_schema(Any)
  814. elif obj in SET_TYPES:
  815. return self._set_schema(Any)
  816. elif obj in FROZEN_SET_TYPES:
  817. return self._frozenset_schema(Any)
  818. elif obj in SEQUENCE_TYPES:
  819. return self._sequence_schema(Any)
  820. elif obj in DICT_TYPES:
  821. return self._dict_schema(Any, Any)
  822. elif _typing_extra.is_type_alias_type(obj):
  823. return self._type_alias_type_schema(obj)
  824. elif obj is type:
  825. return self._type_schema()
  826. elif _typing_extra.is_callable(obj):
  827. return core_schema.callable_schema()
  828. elif _typing_extra.is_literal(obj):
  829. return self._literal_schema(obj)
  830. elif is_typeddict(obj):
  831. return self._typed_dict_schema(obj, None)
  832. elif _typing_extra.is_namedtuple(obj):
  833. return self._namedtuple_schema(obj, None)
  834. elif _typing_extra.is_new_type(obj):
  835. # NewType, can't use isinstance because it fails <3.10
  836. return self.generate_schema(obj.__supertype__)
  837. elif obj is re.Pattern:
  838. return self._pattern_schema(obj)
  839. elif _typing_extra.is_hashable(obj):
  840. return self._hashable_schema()
  841. elif isinstance(obj, typing.TypeVar):
  842. return self._unsubstituted_typevar_schema(obj)
  843. elif _typing_extra.is_finalvar(obj):
  844. if obj is Final:
  845. return core_schema.any_schema()
  846. return self.generate_schema(
  847. self._get_first_arg_or_any(obj),
  848. )
  849. elif isinstance(obj, VALIDATE_CALL_SUPPORTED_TYPES):
  850. return self._call_schema(obj)
  851. elif inspect.isclass(obj) and issubclass(obj, Enum):
  852. return self._enum_schema(obj)
  853. elif _typing_extra.is_zoneinfo_type(obj):
  854. return self._zoneinfo_schema()
  855. if dataclasses.is_dataclass(obj):
  856. return self._dataclass_schema(obj, None)
  857. origin = get_origin(obj)
  858. if origin is not None:
  859. return self._match_generic_type(obj, origin)
  860. res = self._get_prepare_pydantic_annotations_for_known_type(obj, ())
  861. if res is not None:
  862. source_type, annotations = res
  863. return self._apply_annotations(source_type, annotations)
  864. if self._arbitrary_types:
  865. return self._arbitrary_type_schema(obj)
  866. return self._unknown_type_schema(obj)
  867. def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema: # noqa: C901
  868. # Need to handle generic dataclasses before looking for the schema properties because attribute accesses
  869. # on _GenericAlias delegate to the origin type, so lose the information about the concrete parametrization
  870. # As a result, currently, there is no way to cache the schema for generic dataclasses. This may be possible
  871. # to resolve by modifying the value returned by `Generic.__class_getitem__`, but that is a dangerous game.
  872. if dataclasses.is_dataclass(origin):
  873. return self._dataclass_schema(obj, origin) # pyright: ignore[reportArgumentType]
  874. if _typing_extra.is_namedtuple(origin):
  875. return self._namedtuple_schema(obj, origin)
  876. from_property = self._generate_schema_from_property(origin, obj)
  877. if from_property is not None:
  878. return from_property
  879. if _typing_extra.is_type_alias_type(origin):
  880. return self._type_alias_type_schema(obj)
  881. elif _typing_extra.origin_is_union(origin):
  882. return self._union_schema(obj)
  883. elif origin in TUPLE_TYPES:
  884. return self._tuple_schema(obj)
  885. elif origin in LIST_TYPES:
  886. return self._list_schema(self._get_first_arg_or_any(obj))
  887. elif origin in SET_TYPES:
  888. return self._set_schema(self._get_first_arg_or_any(obj))
  889. elif origin in FROZEN_SET_TYPES:
  890. return self._frozenset_schema(self._get_first_arg_or_any(obj))
  891. elif origin in DICT_TYPES:
  892. return self._dict_schema(*self._get_first_two_args_or_any(obj))
  893. elif is_typeddict(origin):
  894. return self._typed_dict_schema(obj, origin)
  895. elif origin in (typing.Type, type):
  896. return self._subclass_schema(obj)
  897. elif origin in SEQUENCE_TYPES:
  898. return self._sequence_schema(self._get_first_arg_or_any(obj))
  899. elif origin in {typing.Iterable, collections.abc.Iterable, typing.Generator, collections.abc.Generator}:
  900. return self._iterable_schema(obj)
  901. elif origin in (re.Pattern, typing.Pattern):
  902. return self._pattern_schema(obj)
  903. res = self._get_prepare_pydantic_annotations_for_known_type(obj, ())
  904. if res is not None:
  905. source_type, annotations = res
  906. return self._apply_annotations(source_type, annotations)
  907. if self._arbitrary_types:
  908. return self._arbitrary_type_schema(origin)
  909. return self._unknown_type_schema(obj)
  910. def _generate_td_field_schema(
  911. self,
  912. name: str,
  913. field_info: FieldInfo,
  914. decorators: DecoratorInfos,
  915. *,
  916. required: bool = True,
  917. ) -> core_schema.TypedDictField:
  918. """Prepare a TypedDictField to represent a model or typeddict field."""
  919. common_field = self._common_field_schema(name, field_info, decorators)
  920. return core_schema.typed_dict_field(
  921. common_field['schema'],
  922. required=False if not field_info.is_required() else required,
  923. serialization_exclude=common_field['serialization_exclude'],
  924. validation_alias=common_field['validation_alias'],
  925. serialization_alias=common_field['serialization_alias'],
  926. metadata=common_field['metadata'],
  927. )
  928. def _generate_md_field_schema(
  929. self,
  930. name: str,
  931. field_info: FieldInfo,
  932. decorators: DecoratorInfos,
  933. ) -> core_schema.ModelField:
  934. """Prepare a ModelField to represent a model field."""
  935. common_field = self._common_field_schema(name, field_info, decorators)
  936. return core_schema.model_field(
  937. common_field['schema'],
  938. serialization_exclude=common_field['serialization_exclude'],
  939. validation_alias=common_field['validation_alias'],
  940. serialization_alias=common_field['serialization_alias'],
  941. frozen=common_field['frozen'],
  942. metadata=common_field['metadata'],
  943. )
  944. def _generate_dc_field_schema(
  945. self,
  946. name: str,
  947. field_info: FieldInfo,
  948. decorators: DecoratorInfos,
  949. ) -> core_schema.DataclassField:
  950. """Prepare a DataclassField to represent the parameter/field, of a dataclass."""
  951. common_field = self._common_field_schema(name, field_info, decorators)
  952. return core_schema.dataclass_field(
  953. name,
  954. common_field['schema'],
  955. init=field_info.init,
  956. init_only=field_info.init_var or None,
  957. kw_only=None if field_info.kw_only else False,
  958. serialization_exclude=common_field['serialization_exclude'],
  959. validation_alias=common_field['validation_alias'],
  960. serialization_alias=common_field['serialization_alias'],
  961. frozen=common_field['frozen'],
  962. metadata=common_field['metadata'],
  963. )
  964. @staticmethod
  965. def _apply_alias_generator_to_field_info(
  966. alias_generator: Callable[[str], str] | AliasGenerator, field_info: FieldInfo, field_name: str
  967. ) -> None:
  968. """Apply an alias_generator to aliases on a FieldInfo instance if appropriate.
  969. Args:
  970. alias_generator: A callable that takes a string and returns a string, or an AliasGenerator instance.
  971. field_info: The FieldInfo instance to which the alias_generator is (maybe) applied.
  972. field_name: The name of the field from which to generate the alias.
  973. """
  974. # Apply an alias_generator if
  975. # 1. An alias is not specified
  976. # 2. An alias is specified, but the priority is <= 1
  977. if (
  978. field_info.alias_priority is None
  979. or field_info.alias_priority <= 1
  980. or field_info.alias is None
  981. or field_info.validation_alias is None
  982. or field_info.serialization_alias is None
  983. ):
  984. alias, validation_alias, serialization_alias = None, None, None
  985. if isinstance(alias_generator, AliasGenerator):
  986. alias, validation_alias, serialization_alias = alias_generator.generate_aliases(field_name)
  987. elif isinstance(alias_generator, Callable):
  988. alias = alias_generator(field_name)
  989. if not isinstance(alias, str):
  990. raise TypeError(f'alias_generator {alias_generator} must return str, not {alias.__class__}')
  991. # if priority is not set, we set to 1
  992. # which supports the case where the alias_generator from a child class is used
  993. # to generate an alias for a field in a parent class
  994. if field_info.alias_priority is None or field_info.alias_priority <= 1:
  995. field_info.alias_priority = 1
  996. # if the priority is 1, then we set the aliases to the generated alias
  997. if field_info.alias_priority == 1:
  998. field_info.serialization_alias = _get_first_non_null(serialization_alias, alias)
  999. field_info.validation_alias = _get_first_non_null(validation_alias, alias)
  1000. field_info.alias = alias
  1001. # if any of the aliases are not set, then we set them to the corresponding generated alias
  1002. if field_info.alias is None:
  1003. field_info.alias = alias
  1004. if field_info.serialization_alias is None:
  1005. field_info.serialization_alias = _get_first_non_null(serialization_alias, alias)
  1006. if field_info.validation_alias is None:
  1007. field_info.validation_alias = _get_first_non_null(validation_alias, alias)
  1008. @staticmethod
  1009. def _apply_alias_generator_to_computed_field_info(
  1010. alias_generator: Callable[[str], str] | AliasGenerator,
  1011. computed_field_info: ComputedFieldInfo,
  1012. computed_field_name: str,
  1013. ):
  1014. """Apply an alias_generator to alias on a ComputedFieldInfo instance if appropriate.
  1015. Args:
  1016. alias_generator: A callable that takes a string and returns a string, or an AliasGenerator instance.
  1017. computed_field_info: The ComputedFieldInfo instance to which the alias_generator is (maybe) applied.
  1018. computed_field_name: The name of the computed field from which to generate the alias.
  1019. """
  1020. # Apply an alias_generator if
  1021. # 1. An alias is not specified
  1022. # 2. An alias is specified, but the priority is <= 1
  1023. if (
  1024. computed_field_info.alias_priority is None
  1025. or computed_field_info.alias_priority <= 1
  1026. or computed_field_info.alias is None
  1027. ):
  1028. alias, validation_alias, serialization_alias = None, None, None
  1029. if isinstance(alias_generator, AliasGenerator):
  1030. alias, validation_alias, serialization_alias = alias_generator.generate_aliases(computed_field_name)
  1031. elif isinstance(alias_generator, Callable):
  1032. alias = alias_generator(computed_field_name)
  1033. if not isinstance(alias, str):
  1034. raise TypeError(f'alias_generator {alias_generator} must return str, not {alias.__class__}')
  1035. # if priority is not set, we set to 1
  1036. # which supports the case where the alias_generator from a child class is used
  1037. # to generate an alias for a field in a parent class
  1038. if computed_field_info.alias_priority is None or computed_field_info.alias_priority <= 1:
  1039. computed_field_info.alias_priority = 1
  1040. # if the priority is 1, then we set the aliases to the generated alias
  1041. # note that we use the serialization_alias with priority over alias, as computed_field
  1042. # aliases are used for serialization only (not validation)
  1043. if computed_field_info.alias_priority == 1:
  1044. computed_field_info.alias = _get_first_non_null(serialization_alias, alias)
  1045. @staticmethod
  1046. def _apply_field_title_generator_to_field_info(
  1047. config_wrapper: ConfigWrapper, field_info: FieldInfo | ComputedFieldInfo, field_name: str
  1048. ) -> None:
  1049. """Apply a field_title_generator on a FieldInfo or ComputedFieldInfo instance if appropriate
  1050. Args:
  1051. config_wrapper: The config of the model
  1052. field_info: The FieldInfo or ComputedField instance to which the title_generator is (maybe) applied.
  1053. field_name: The name of the field from which to generate the title.
  1054. """
  1055. field_title_generator = field_info.field_title_generator or config_wrapper.field_title_generator
  1056. if field_title_generator is None:
  1057. return
  1058. if field_info.title is None:
  1059. title = field_title_generator(field_name, field_info) # type: ignore
  1060. if not isinstance(title, str):
  1061. raise TypeError(f'field_title_generator {field_title_generator} must return str, not {title.__class__}')
  1062. field_info.title = title
  1063. def _common_field_schema( # C901
  1064. self, name: str, field_info: FieldInfo, decorators: DecoratorInfos
  1065. ) -> _CommonField:
  1066. # Update FieldInfo annotation if appropriate:
  1067. FieldInfo = import_cached_field_info()
  1068. if not field_info.evaluated:
  1069. # TODO Can we use field_info.apply_typevars_map here?
  1070. try:
  1071. evaluated_type = _typing_extra.eval_type(field_info.annotation, *self._types_namespace)
  1072. except NameError as e:
  1073. raise PydanticUndefinedAnnotation.from_name_error(e) from e
  1074. evaluated_type = replace_types(evaluated_type, self._typevars_map)
  1075. field_info.evaluated = True
  1076. if not has_instance_in_type(evaluated_type, PydanticRecursiveRef):
  1077. new_field_info = FieldInfo.from_annotation(evaluated_type)
  1078. field_info.annotation = new_field_info.annotation
  1079. # Handle any field info attributes that may have been obtained from now-resolved annotations
  1080. for k, v in new_field_info._attributes_set.items():
  1081. # If an attribute is already set, it means it was set by assigning to a call to Field (or just a
  1082. # default value), and that should take the highest priority. So don't overwrite existing attributes.
  1083. # We skip over "attributes" that are present in the metadata_lookup dict because these won't
  1084. # actually end up as attributes of the `FieldInfo` instance.
  1085. if k not in field_info._attributes_set and k not in field_info.metadata_lookup:
  1086. setattr(field_info, k, v)
  1087. # Finally, ensure the field info also reflects all the `_attributes_set` that are actually metadata.
  1088. field_info.metadata = [*new_field_info.metadata, *field_info.metadata]
  1089. source_type, annotations = field_info.annotation, field_info.metadata
  1090. def set_discriminator(schema: CoreSchema) -> CoreSchema:
  1091. schema = self._apply_discriminator_to_union(schema, field_info.discriminator)
  1092. return schema
  1093. # Convert `@field_validator` decorators to `Before/After/Plain/WrapValidator` instances:
  1094. validators_from_decorators = []
  1095. for decorator in filter_field_decorator_info_by_field(decorators.field_validators.values(), name):
  1096. validators_from_decorators.append(_mode_to_validator[decorator.info.mode]._from_decorator(decorator))
  1097. with self.field_name_stack.push(name):
  1098. if field_info.discriminator is not None:
  1099. schema = self._apply_annotations(
  1100. source_type, annotations + validators_from_decorators, transform_inner_schema=set_discriminator
  1101. )
  1102. else:
  1103. schema = self._apply_annotations(
  1104. source_type,
  1105. annotations + validators_from_decorators,
  1106. )
  1107. # This V1 compatibility shim should eventually be removed
  1108. # push down any `each_item=True` validators
  1109. # note that this won't work for any Annotated types that get wrapped by a function validator
  1110. # but that's okay because that didn't exist in V1
  1111. this_field_validators = filter_field_decorator_info_by_field(decorators.validators.values(), name)
  1112. if _validators_require_validate_default(this_field_validators):
  1113. field_info.validate_default = True
  1114. each_item_validators = [v for v in this_field_validators if v.info.each_item is True]
  1115. this_field_validators = [v for v in this_field_validators if v not in each_item_validators]
  1116. schema = apply_each_item_validators(schema, each_item_validators, name)
  1117. schema = apply_validators(schema, this_field_validators, name)
  1118. # the default validator needs to go outside of any other validators
  1119. # so that it is the topmost validator for the field validator
  1120. # which uses it to check if the field has a default value or not
  1121. if not field_info.is_required():
  1122. schema = wrap_default(field_info, schema)
  1123. schema = self._apply_field_serializers(
  1124. schema, filter_field_decorator_info_by_field(decorators.field_serializers.values(), name)
  1125. )
  1126. self._apply_field_title_generator_to_field_info(self._config_wrapper, field_info, name)
  1127. pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(field_info)
  1128. core_metadata: dict[str, Any] = {}
  1129. update_core_metadata(
  1130. core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
  1131. )
  1132. alias_generator = self._config_wrapper.alias_generator
  1133. if alias_generator is not None:
  1134. self._apply_alias_generator_to_field_info(alias_generator, field_info, name)
  1135. if isinstance(field_info.validation_alias, (AliasChoices, AliasPath)):
  1136. validation_alias = field_info.validation_alias.convert_to_aliases()
  1137. else:
  1138. validation_alias = field_info.validation_alias
  1139. return _common_field(
  1140. schema,
  1141. serialization_exclude=True if field_info.exclude else None,
  1142. validation_alias=validation_alias,
  1143. serialization_alias=field_info.serialization_alias,
  1144. frozen=field_info.frozen,
  1145. metadata=core_metadata,
  1146. )
  1147. def _union_schema(self, union_type: Any) -> core_schema.CoreSchema:
  1148. """Generate schema for a Union."""
  1149. args = self._get_args_resolving_forward_refs(union_type, required=True)
  1150. choices: list[CoreSchema] = []
  1151. nullable = False
  1152. for arg in args:
  1153. if arg is None or arg is _typing_extra.NoneType:
  1154. nullable = True
  1155. else:
  1156. choices.append(self.generate_schema(arg))
  1157. if len(choices) == 1:
  1158. s = choices[0]
  1159. else:
  1160. choices_with_tags: list[CoreSchema | tuple[CoreSchema, str]] = []
  1161. for choice in choices:
  1162. tag = choice.get('metadata', {}).get(_core_utils.TAGGED_UNION_TAG_KEY)
  1163. if tag is not None:
  1164. choices_with_tags.append((choice, tag))
  1165. else:
  1166. choices_with_tags.append(choice)
  1167. s = core_schema.union_schema(choices_with_tags)
  1168. if nullable:
  1169. s = core_schema.nullable_schema(s)
  1170. return s
  1171. def _type_alias_type_schema(self, obj: TypeAliasType) -> CoreSchema:
  1172. with self.defs.get_schema_or_ref(obj) as (ref, maybe_schema):
  1173. if maybe_schema is not None:
  1174. return maybe_schema
  1175. origin: TypeAliasType = get_origin(obj) or obj
  1176. typevars_map = get_standard_typevars_map(obj)
  1177. with self._ns_resolver.push(origin):
  1178. try:
  1179. annotation = _typing_extra.eval_type(origin.__value__, *self._types_namespace)
  1180. except NameError as e:
  1181. raise PydanticUndefinedAnnotation.from_name_error(e) from e
  1182. annotation = replace_types(annotation, typevars_map)
  1183. schema = self.generate_schema(annotation)
  1184. assert schema['type'] != 'definitions'
  1185. schema['ref'] = ref # type: ignore
  1186. self.defs.definitions[ref] = schema
  1187. return core_schema.definition_reference_schema(ref)
  1188. def _literal_schema(self, literal_type: Any) -> CoreSchema:
  1189. """Generate schema for a Literal."""
  1190. expected = _typing_extra.literal_values(literal_type)
  1191. assert expected, f'literal "expected" cannot be empty, obj={literal_type}'
  1192. schema = core_schema.literal_schema(expected)
  1193. if self._config_wrapper.use_enum_values and any(isinstance(v, Enum) for v in expected):
  1194. schema = core_schema.no_info_after_validator_function(
  1195. lambda v: v.value if isinstance(v, Enum) else v, schema
  1196. )
  1197. return schema
  1198. def _typed_dict_schema(self, typed_dict_cls: Any, origin: Any) -> core_schema.CoreSchema:
  1199. """Generate schema for a TypedDict.
  1200. It is not possible to track required/optional keys in TypedDict without __required_keys__
  1201. since TypedDict.__new__ erases the base classes (it replaces them with just `dict`)
  1202. and thus we can track usage of total=True/False
  1203. __required_keys__ was added in Python 3.9
  1204. (https://github.com/miss-islington/cpython/blob/1e9939657dd1f8eb9f596f77c1084d2d351172fc/Doc/library/typing.rst?plain=1#L1546-L1548)
  1205. however it is buggy
  1206. (https://github.com/python/typing_extensions/blob/ac52ac5f2cb0e00e7988bae1e2a1b8257ac88d6d/src/typing_extensions.py#L657-L666).
  1207. On 3.11 but < 3.12 TypedDict does not preserve inheritance information.
  1208. Hence to avoid creating validators that do not do what users expect we only
  1209. support typing.TypedDict on Python >= 3.12 or typing_extension.TypedDict on all versions
  1210. """
  1211. FieldInfo = import_cached_field_info()
  1212. with self.model_type_stack.push(typed_dict_cls), self.defs.get_schema_or_ref(typed_dict_cls) as (
  1213. typed_dict_ref,
  1214. maybe_schema,
  1215. ):
  1216. if maybe_schema is not None:
  1217. return maybe_schema
  1218. typevars_map = get_standard_typevars_map(typed_dict_cls)
  1219. if origin is not None:
  1220. typed_dict_cls = origin
  1221. if not _SUPPORTS_TYPEDDICT and type(typed_dict_cls).__module__ == 'typing':
  1222. raise PydanticUserError(
  1223. 'Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.12.',
  1224. code='typed-dict-version',
  1225. )
  1226. try:
  1227. # if a typed dictionary class doesn't have config, we use the parent's config, hence a default of `None`
  1228. # see https://github.com/pydantic/pydantic/issues/10917
  1229. config: ConfigDict | None = get_attribute_from_bases(typed_dict_cls, '__pydantic_config__')
  1230. except AttributeError:
  1231. config = None
  1232. with self._config_wrapper_stack.push(config):
  1233. core_config = self._config_wrapper.core_config(title=typed_dict_cls.__name__)
  1234. required_keys: frozenset[str] = typed_dict_cls.__required_keys__
  1235. fields: dict[str, core_schema.TypedDictField] = {}
  1236. decorators = DecoratorInfos.build(typed_dict_cls)
  1237. if self._config_wrapper.use_attribute_docstrings:
  1238. field_docstrings = extract_docstrings_from_cls(typed_dict_cls, use_inspect=True)
  1239. else:
  1240. field_docstrings = None
  1241. try:
  1242. annotations = _typing_extra.get_cls_type_hints(typed_dict_cls, ns_resolver=self._ns_resolver)
  1243. except NameError as e:
  1244. raise PydanticUndefinedAnnotation.from_name_error(e) from e
  1245. for field_name, annotation in annotations.items():
  1246. annotation = replace_types(annotation, typevars_map)
  1247. required = field_name in required_keys
  1248. if _typing_extra.is_required(annotation):
  1249. required = True
  1250. annotation = self._get_args_resolving_forward_refs(
  1251. annotation,
  1252. required=True,
  1253. )[0]
  1254. elif _typing_extra.is_not_required(annotation):
  1255. required = False
  1256. annotation = self._get_args_resolving_forward_refs(
  1257. annotation,
  1258. required=True,
  1259. )[0]
  1260. field_info = FieldInfo.from_annotation(annotation)
  1261. if (
  1262. field_docstrings is not None
  1263. and field_info.description is None
  1264. and field_name in field_docstrings
  1265. ):
  1266. field_info.description = field_docstrings[field_name]
  1267. self._apply_field_title_generator_to_field_info(self._config_wrapper, field_info, field_name)
  1268. fields[field_name] = self._generate_td_field_schema(
  1269. field_name, field_info, decorators, required=required
  1270. )
  1271. td_schema = core_schema.typed_dict_schema(
  1272. fields,
  1273. cls=typed_dict_cls,
  1274. computed_fields=[
  1275. self._computed_field_schema(d, decorators.field_serializers)
  1276. for d in decorators.computed_fields.values()
  1277. ],
  1278. ref=typed_dict_ref,
  1279. config=core_config,
  1280. )
  1281. schema = self._apply_model_serializers(td_schema, decorators.model_serializers.values())
  1282. schema = apply_model_validators(schema, decorators.model_validators.values(), 'all')
  1283. self.defs.definitions[typed_dict_ref] = schema
  1284. return core_schema.definition_reference_schema(typed_dict_ref)
  1285. def _namedtuple_schema(self, namedtuple_cls: Any, origin: Any) -> core_schema.CoreSchema:
  1286. """Generate schema for a NamedTuple."""
  1287. with self.model_type_stack.push(namedtuple_cls), self.defs.get_schema_or_ref(namedtuple_cls) as (
  1288. namedtuple_ref,
  1289. maybe_schema,
  1290. ):
  1291. if maybe_schema is not None:
  1292. return maybe_schema
  1293. typevars_map = get_standard_typevars_map(namedtuple_cls)
  1294. if origin is not None:
  1295. namedtuple_cls = origin
  1296. try:
  1297. annotations = _typing_extra.get_cls_type_hints(namedtuple_cls, ns_resolver=self._ns_resolver)
  1298. except NameError as e:
  1299. raise PydanticUndefinedAnnotation.from_name_error(e) from e
  1300. if not annotations:
  1301. # annotations is empty, happens if namedtuple_cls defined via collections.namedtuple(...)
  1302. annotations: dict[str, Any] = {k: Any for k in namedtuple_cls._fields}
  1303. if typevars_map:
  1304. annotations = {
  1305. field_name: replace_types(annotation, typevars_map)
  1306. for field_name, annotation in annotations.items()
  1307. }
  1308. arguments_schema = core_schema.arguments_schema(
  1309. [
  1310. self._generate_parameter_schema(
  1311. field_name,
  1312. annotation,
  1313. default=namedtuple_cls._field_defaults.get(field_name, Parameter.empty),
  1314. )
  1315. for field_name, annotation in annotations.items()
  1316. ],
  1317. metadata={'pydantic_js_prefer_positional_arguments': True},
  1318. )
  1319. return core_schema.call_schema(arguments_schema, namedtuple_cls, ref=namedtuple_ref)
  1320. def _generate_parameter_schema(
  1321. self,
  1322. name: str,
  1323. annotation: type[Any],
  1324. default: Any = Parameter.empty,
  1325. mode: Literal['positional_only', 'positional_or_keyword', 'keyword_only'] | None = None,
  1326. ) -> core_schema.ArgumentsParameter:
  1327. """Prepare a ArgumentsParameter to represent a field in a namedtuple or function signature."""
  1328. FieldInfo = import_cached_field_info()
  1329. if default is Parameter.empty:
  1330. field = FieldInfo.from_annotation(annotation)
  1331. else:
  1332. field = FieldInfo.from_annotated_attribute(annotation, default)
  1333. assert field.annotation is not None, 'field.annotation should not be None when generating a schema'
  1334. with self.field_name_stack.push(name):
  1335. schema = self._apply_annotations(field.annotation, [field])
  1336. if not field.is_required():
  1337. schema = wrap_default(field, schema)
  1338. parameter_schema = core_schema.arguments_parameter(name, schema)
  1339. if mode is not None:
  1340. parameter_schema['mode'] = mode
  1341. if field.alias is not None:
  1342. parameter_schema['alias'] = field.alias
  1343. else:
  1344. alias_generator = self._config_wrapper.alias_generator
  1345. if isinstance(alias_generator, AliasGenerator) and alias_generator.alias is not None:
  1346. parameter_schema['alias'] = alias_generator.alias(name)
  1347. elif isinstance(alias_generator, Callable):
  1348. parameter_schema['alias'] = alias_generator(name)
  1349. return parameter_schema
  1350. def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema:
  1351. """Generate schema for a Tuple, e.g. `tuple[int, str]` or `tuple[int, ...]`."""
  1352. # TODO: do we really need to resolve type vars here?
  1353. typevars_map = get_standard_typevars_map(tuple_type)
  1354. params = self._get_args_resolving_forward_refs(tuple_type)
  1355. if typevars_map and params:
  1356. params = tuple(replace_types(param, typevars_map) for param in params)
  1357. # NOTE: subtle difference: `tuple[()]` gives `params=()`, whereas `typing.Tuple[()]` gives `params=((),)`
  1358. # This is only true for <3.11, on Python 3.11+ `typing.Tuple[()]` gives `params=()`
  1359. if not params:
  1360. if tuple_type in TUPLE_TYPES:
  1361. return core_schema.tuple_schema([core_schema.any_schema()], variadic_item_index=0)
  1362. else:
  1363. # special case for `tuple[()]` which means `tuple[]` - an empty tuple
  1364. return core_schema.tuple_schema([])
  1365. elif params[-1] is Ellipsis:
  1366. if len(params) == 2:
  1367. return core_schema.tuple_schema([self.generate_schema(params[0])], variadic_item_index=0)
  1368. else:
  1369. # TODO: something like https://github.com/pydantic/pydantic/issues/5952
  1370. raise ValueError('Variable tuples can only have one type')
  1371. elif len(params) == 1 and params[0] == ():
  1372. # special case for `Tuple[()]` which means `Tuple[]` - an empty tuple
  1373. # NOTE: This conditional can be removed when we drop support for Python 3.10.
  1374. return core_schema.tuple_schema([])
  1375. else:
  1376. return core_schema.tuple_schema([self.generate_schema(param) for param in params])
  1377. def _type_schema(self) -> core_schema.CoreSchema:
  1378. return core_schema.custom_error_schema(
  1379. core_schema.is_instance_schema(type),
  1380. custom_error_type='is_type',
  1381. custom_error_message='Input should be a type',
  1382. )
  1383. def _zoneinfo_schema(self) -> core_schema.CoreSchema:
  1384. """Generate schema for a zone_info.ZoneInfo object"""
  1385. # we're def >=py3.9 if ZoneInfo was included in input
  1386. if sys.version_info < (3, 9):
  1387. assert False, 'Unreachable'
  1388. # import in this path is safe
  1389. from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
  1390. def validate_str_is_valid_iana_tz(value: Any, /) -> ZoneInfo:
  1391. if isinstance(value, ZoneInfo):
  1392. return value
  1393. try:
  1394. return ZoneInfo(value)
  1395. except (ZoneInfoNotFoundError, ValueError, TypeError):
  1396. raise PydanticCustomError('zoneinfo_str', 'invalid timezone: {value}', {'value': value})
  1397. metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'zoneinfo'}]}
  1398. return core_schema.no_info_plain_validator_function(
  1399. validate_str_is_valid_iana_tz,
  1400. serialization=core_schema.to_string_ser_schema(),
  1401. metadata=metadata,
  1402. )
  1403. def _union_is_subclass_schema(self, union_type: Any) -> core_schema.CoreSchema:
  1404. """Generate schema for `Type[Union[X, ...]]`."""
  1405. args = self._get_args_resolving_forward_refs(union_type, required=True)
  1406. return core_schema.union_schema([self.generate_schema(typing.Type[args]) for args in args])
  1407. def _subclass_schema(self, type_: Any) -> core_schema.CoreSchema:
  1408. """Generate schema for a Type, e.g. `Type[int]`."""
  1409. type_param = self._get_first_arg_or_any(type_)
  1410. # Assume `type[Annotated[<typ>, ...]]` is equivalent to `type[<typ>]`:
  1411. type_param = _typing_extra.annotated_type(type_param) or type_param
  1412. if _typing_extra.is_any(type_param):
  1413. return self._type_schema()
  1414. elif _typing_extra.is_type_alias_type(type_param):
  1415. return self.generate_schema(typing.Type[type_param.__value__])
  1416. elif isinstance(type_param, typing.TypeVar):
  1417. if type_param.__bound__:
  1418. if _typing_extra.origin_is_union(get_origin(type_param.__bound__)):
  1419. return self._union_is_subclass_schema(type_param.__bound__)
  1420. return core_schema.is_subclass_schema(type_param.__bound__)
  1421. elif type_param.__constraints__:
  1422. return core_schema.union_schema(
  1423. [self.generate_schema(typing.Type[c]) for c in type_param.__constraints__]
  1424. )
  1425. else:
  1426. return self._type_schema()
  1427. elif _typing_extra.origin_is_union(get_origin(type_param)):
  1428. return self._union_is_subclass_schema(type_param)
  1429. else:
  1430. if _typing_extra.is_self(type_param):
  1431. type_param = self._resolve_self_type(type_param)
  1432. if not inspect.isclass(type_param):
  1433. raise TypeError(f'Expected a class, got {type_param!r}')
  1434. return core_schema.is_subclass_schema(type_param)
  1435. def _sequence_schema(self, items_type: Any) -> core_schema.CoreSchema:
  1436. """Generate schema for a Sequence, e.g. `Sequence[int]`."""
  1437. from ._serializers import serialize_sequence_via_list
  1438. item_type_schema = self.generate_schema(items_type)
  1439. list_schema = core_schema.list_schema(item_type_schema)
  1440. json_schema = smart_deepcopy(list_schema)
  1441. python_schema = core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence')
  1442. if not _typing_extra.is_any(items_type):
  1443. from ._validators import sequence_validator
  1444. python_schema = core_schema.chain_schema(
  1445. [python_schema, core_schema.no_info_wrap_validator_function(sequence_validator, list_schema)],
  1446. )
  1447. serialization = core_schema.wrap_serializer_function_ser_schema(
  1448. serialize_sequence_via_list, schema=item_type_schema, info_arg=True
  1449. )
  1450. return core_schema.json_or_python_schema(
  1451. json_schema=json_schema, python_schema=python_schema, serialization=serialization
  1452. )
  1453. def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema:
  1454. """Generate a schema for an `Iterable`."""
  1455. item_type = self._get_first_arg_or_any(type_)
  1456. return core_schema.generator_schema(self.generate_schema(item_type))
  1457. def _pattern_schema(self, pattern_type: Any) -> core_schema.CoreSchema:
  1458. from . import _validators
  1459. metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'regex'}]}
  1460. ser = core_schema.plain_serializer_function_ser_schema(
  1461. attrgetter('pattern'), when_used='json', return_schema=core_schema.str_schema()
  1462. )
  1463. if pattern_type is typing.Pattern or pattern_type is re.Pattern:
  1464. # bare type
  1465. return core_schema.no_info_plain_validator_function(
  1466. _validators.pattern_either_validator, serialization=ser, metadata=metadata
  1467. )
  1468. param = self._get_args_resolving_forward_refs(
  1469. pattern_type,
  1470. required=True,
  1471. )[0]
  1472. if param is str:
  1473. return core_schema.no_info_plain_validator_function(
  1474. _validators.pattern_str_validator, serialization=ser, metadata=metadata
  1475. )
  1476. elif param is bytes:
  1477. return core_schema.no_info_plain_validator_function(
  1478. _validators.pattern_bytes_validator, serialization=ser, metadata=metadata
  1479. )
  1480. else:
  1481. raise PydanticSchemaGenerationError(f'Unable to generate pydantic-core schema for {pattern_type!r}.')
  1482. def _hashable_schema(self) -> core_schema.CoreSchema:
  1483. return core_schema.custom_error_schema(
  1484. schema=core_schema.json_or_python_schema(
  1485. json_schema=core_schema.chain_schema(
  1486. [core_schema.any_schema(), core_schema.is_instance_schema(collections.abc.Hashable)]
  1487. ),
  1488. python_schema=core_schema.is_instance_schema(collections.abc.Hashable),
  1489. ),
  1490. custom_error_type='is_hashable',
  1491. custom_error_message='Input should be hashable',
  1492. )
  1493. def _dataclass_schema(
  1494. self, dataclass: type[StandardDataclass], origin: type[StandardDataclass] | None
  1495. ) -> core_schema.CoreSchema:
  1496. """Generate schema for a dataclass."""
  1497. with self.model_type_stack.push(dataclass), self.defs.get_schema_or_ref(dataclass) as (
  1498. dataclass_ref,
  1499. maybe_schema,
  1500. ):
  1501. if maybe_schema is not None:
  1502. return maybe_schema
  1503. typevars_map = get_standard_typevars_map(dataclass)
  1504. if origin is not None:
  1505. dataclass = origin
  1506. # if (plain) dataclass doesn't have config, we use the parent's config, hence a default of `None`
  1507. # (Pydantic dataclasses have an empty dict config by default).
  1508. # see https://github.com/pydantic/pydantic/issues/10917
  1509. config = getattr(dataclass, '__pydantic_config__', None)
  1510. from ..dataclasses import is_pydantic_dataclass
  1511. with self._ns_resolver.push(dataclass), self._config_wrapper_stack.push(config):
  1512. if is_pydantic_dataclass(dataclass):
  1513. fields = deepcopy(dataclass.__pydantic_fields__)
  1514. if typevars_map:
  1515. for field in fields.values():
  1516. field.apply_typevars_map(typevars_map, *self._types_namespace)
  1517. else:
  1518. fields = collect_dataclass_fields(
  1519. dataclass,
  1520. typevars_map=typevars_map,
  1521. )
  1522. if self._config_wrapper.extra == 'allow':
  1523. # disallow combination of init=False on a dataclass field and extra='allow' on a dataclass
  1524. for field_name, field in fields.items():
  1525. if field.init is False:
  1526. raise PydanticUserError(
  1527. f'Field {field_name} has `init=False` and dataclass has config setting `extra="allow"`. '
  1528. f'This combination is not allowed.',
  1529. code='dataclass-init-false-extra-allow',
  1530. )
  1531. decorators = dataclass.__dict__.get('__pydantic_decorators__') or DecoratorInfos.build(dataclass)
  1532. # Move kw_only=False args to the start of the list, as this is how vanilla dataclasses work.
  1533. # Note that when kw_only is missing or None, it is treated as equivalent to kw_only=True
  1534. args = sorted(
  1535. (self._generate_dc_field_schema(k, v, decorators) for k, v in fields.items()),
  1536. key=lambda a: a.get('kw_only') is not False,
  1537. )
  1538. has_post_init = hasattr(dataclass, '__post_init__')
  1539. has_slots = hasattr(dataclass, '__slots__')
  1540. args_schema = core_schema.dataclass_args_schema(
  1541. dataclass.__name__,
  1542. args,
  1543. computed_fields=[
  1544. self._computed_field_schema(d, decorators.field_serializers)
  1545. for d in decorators.computed_fields.values()
  1546. ],
  1547. collect_init_only=has_post_init,
  1548. )
  1549. inner_schema = apply_validators(args_schema, decorators.root_validators.values(), None)
  1550. model_validators = decorators.model_validators.values()
  1551. inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
  1552. core_config = self._config_wrapper.core_config(title=dataclass.__name__)
  1553. dc_schema = core_schema.dataclass_schema(
  1554. dataclass,
  1555. inner_schema,
  1556. generic_origin=origin,
  1557. post_init=has_post_init,
  1558. ref=dataclass_ref,
  1559. fields=[field.name for field in dataclasses.fields(dataclass)],
  1560. slots=has_slots,
  1561. config=core_config,
  1562. # we don't use a custom __setattr__ for dataclasses, so we must
  1563. # pass along the frozen config setting to the pydantic-core schema
  1564. frozen=self._config_wrapper_stack.tail.frozen,
  1565. )
  1566. schema = self._apply_model_serializers(dc_schema, decorators.model_serializers.values())
  1567. schema = apply_model_validators(schema, model_validators, 'outer')
  1568. self.defs.definitions[dataclass_ref] = schema
  1569. return core_schema.definition_reference_schema(dataclass_ref)
  1570. def _call_schema(self, function: ValidateCallSupportedTypes) -> core_schema.CallSchema:
  1571. """Generate schema for a Callable.
  1572. TODO support functional validators once we support them in Config
  1573. """
  1574. sig = signature(function)
  1575. globalns, localns = self._types_namespace
  1576. type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)
  1577. mode_lookup: dict[_ParameterKind, Literal['positional_only', 'positional_or_keyword', 'keyword_only']] = {
  1578. Parameter.POSITIONAL_ONLY: 'positional_only',
  1579. Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
  1580. Parameter.KEYWORD_ONLY: 'keyword_only',
  1581. }
  1582. arguments_list: list[core_schema.ArgumentsParameter] = []
  1583. var_args_schema: core_schema.CoreSchema | None = None
  1584. var_kwargs_schema: core_schema.CoreSchema | None = None
  1585. var_kwargs_mode: core_schema.VarKwargsMode | None = None
  1586. for name, p in sig.parameters.items():
  1587. if p.annotation is sig.empty:
  1588. annotation = typing.cast(Any, Any)
  1589. else:
  1590. annotation = type_hints[name]
  1591. parameter_mode = mode_lookup.get(p.kind)
  1592. if parameter_mode is not None:
  1593. arg_schema = self._generate_parameter_schema(name, annotation, p.default, parameter_mode)
  1594. arguments_list.append(arg_schema)
  1595. elif p.kind == Parameter.VAR_POSITIONAL:
  1596. var_args_schema = self.generate_schema(annotation)
  1597. else:
  1598. assert p.kind == Parameter.VAR_KEYWORD, p.kind
  1599. unpack_type = _typing_extra.unpack_type(annotation)
  1600. if unpack_type is not None:
  1601. if not is_typeddict(unpack_type):
  1602. raise PydanticUserError(
  1603. f'Expected a `TypedDict` class, got {unpack_type.__name__!r}', code='unpack-typed-dict'
  1604. )
  1605. non_pos_only_param_names = {
  1606. name for name, p in sig.parameters.items() if p.kind != Parameter.POSITIONAL_ONLY
  1607. }
  1608. overlapping_params = non_pos_only_param_names.intersection(unpack_type.__annotations__)
  1609. if overlapping_params:
  1610. raise PydanticUserError(
  1611. f'Typed dictionary {unpack_type.__name__!r} overlaps with parameter'
  1612. f"{'s' if len(overlapping_params) >= 2 else ''} "
  1613. f"{', '.join(repr(p) for p in sorted(overlapping_params))}",
  1614. code='overlapping-unpack-typed-dict',
  1615. )
  1616. var_kwargs_mode = 'unpacked-typed-dict'
  1617. var_kwargs_schema = self._typed_dict_schema(unpack_type, None)
  1618. else:
  1619. var_kwargs_mode = 'uniform'
  1620. var_kwargs_schema = self.generate_schema(annotation)
  1621. return_schema: core_schema.CoreSchema | None = None
  1622. config_wrapper = self._config_wrapper
  1623. if config_wrapper.validate_return:
  1624. return_hint = sig.return_annotation
  1625. if return_hint is not sig.empty:
  1626. return_schema = self.generate_schema(return_hint)
  1627. return core_schema.call_schema(
  1628. core_schema.arguments_schema(
  1629. arguments_list,
  1630. var_args_schema=var_args_schema,
  1631. var_kwargs_mode=var_kwargs_mode,
  1632. var_kwargs_schema=var_kwargs_schema,
  1633. populate_by_name=config_wrapper.populate_by_name,
  1634. ),
  1635. function,
  1636. return_schema=return_schema,
  1637. )
  1638. def _unsubstituted_typevar_schema(self, typevar: typing.TypeVar) -> core_schema.CoreSchema:
  1639. assert isinstance(typevar, typing.TypeVar)
  1640. bound = typevar.__bound__
  1641. constraints = typevar.__constraints__
  1642. try:
  1643. typevar_has_default = typevar.has_default() # type: ignore
  1644. except AttributeError:
  1645. # could still have a default if it's an old version of typing_extensions.TypeVar
  1646. typevar_has_default = getattr(typevar, '__default__', None) is not None
  1647. if (bound is not None) + (len(constraints) != 0) + typevar_has_default > 1:
  1648. raise NotImplementedError(
  1649. 'Pydantic does not support mixing more than one of TypeVar bounds, constraints and defaults'
  1650. )
  1651. if typevar_has_default:
  1652. return self.generate_schema(typevar.__default__) # type: ignore
  1653. elif constraints:
  1654. return self._union_schema(typing.Union[constraints]) # type: ignore
  1655. elif bound:
  1656. schema = self.generate_schema(bound)
  1657. schema['serialization'] = core_schema.wrap_serializer_function_ser_schema(
  1658. lambda x, h: h(x), schema=core_schema.any_schema()
  1659. )
  1660. return schema
  1661. else:
  1662. return core_schema.any_schema()
  1663. def _computed_field_schema(
  1664. self,
  1665. d: Decorator[ComputedFieldInfo],
  1666. field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]],
  1667. ) -> core_schema.ComputedField:
  1668. try:
  1669. # Do not pass in globals as the function could be defined in a different module.
  1670. # Instead, let `get_function_return_type` infer the globals to use, but still pass
  1671. # in locals that may contain a parent/rebuild namespace:
  1672. return_type = _decorators.get_function_return_type(
  1673. d.func, d.info.return_type, localns=self._types_namespace.locals
  1674. )
  1675. except NameError as e:
  1676. raise PydanticUndefinedAnnotation.from_name_error(e) from e
  1677. if return_type is PydanticUndefined:
  1678. raise PydanticUserError(
  1679. 'Computed field is missing return type annotation or specifying `return_type`'
  1680. ' to the `@computed_field` decorator (e.g. `@computed_field(return_type=int|str)`)',
  1681. code='model-field-missing-annotation',
  1682. )
  1683. return_type = replace_types(return_type, self._typevars_map)
  1684. # Create a new ComputedFieldInfo so that different type parametrizations of the same
  1685. # generic model's computed field can have different return types.
  1686. d.info = dataclasses.replace(d.info, return_type=return_type)
  1687. return_type_schema = self.generate_schema(return_type)
  1688. # Apply serializers to computed field if there exist
  1689. return_type_schema = self._apply_field_serializers(
  1690. return_type_schema,
  1691. filter_field_decorator_info_by_field(field_serializers.values(), d.cls_var_name),
  1692. )
  1693. alias_generator = self._config_wrapper.alias_generator
  1694. if alias_generator is not None:
  1695. self._apply_alias_generator_to_computed_field_info(
  1696. alias_generator=alias_generator, computed_field_info=d.info, computed_field_name=d.cls_var_name
  1697. )
  1698. self._apply_field_title_generator_to_field_info(self._config_wrapper, d.info, d.cls_var_name)
  1699. pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(d.info)
  1700. core_metadata: dict[str, Any] = {}
  1701. update_core_metadata(
  1702. core_metadata,
  1703. pydantic_js_updates={'readOnly': True, **(pydantic_js_updates if pydantic_js_updates else {})},
  1704. pydantic_js_extra=pydantic_js_extra,
  1705. )
  1706. return core_schema.computed_field(
  1707. d.cls_var_name, return_schema=return_type_schema, alias=d.info.alias, metadata=core_metadata
  1708. )
  1709. def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema:
  1710. """Generate schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`."""
  1711. FieldInfo = import_cached_field_info()
  1712. source_type, *annotations = self._get_args_resolving_forward_refs(
  1713. annotated_type,
  1714. required=True,
  1715. )
  1716. schema = self._apply_annotations(source_type, annotations)
  1717. # put the default validator last so that TypeAdapter.get_default_value() works
  1718. # even if there are function validators involved
  1719. for annotation in annotations:
  1720. if isinstance(annotation, FieldInfo):
  1721. schema = wrap_default(annotation, schema)
  1722. return schema
  1723. def _get_prepare_pydantic_annotations_for_known_type(
  1724. self, obj: Any, annotations: tuple[Any, ...]
  1725. ) -> tuple[Any, list[Any]] | None:
  1726. from ._std_types_schema import (
  1727. deque_schema_prepare_pydantic_annotations,
  1728. mapping_like_prepare_pydantic_annotations,
  1729. path_schema_prepare_pydantic_annotations,
  1730. )
  1731. # Check for hashability
  1732. try:
  1733. hash(obj)
  1734. except TypeError:
  1735. # obj is definitely not a known type if this fails
  1736. return None
  1737. # TODO: I'd rather we didn't handle the generic nature in the annotations prep, but the same way we do other
  1738. # generic types like list[str] via _match_generic_type, but I'm not sure if we can do that because this is
  1739. # not always called from match_type, but sometimes from _apply_annotations
  1740. obj_origin = get_origin(obj) or obj
  1741. if obj_origin in PATH_TYPES:
  1742. return path_schema_prepare_pydantic_annotations(obj, annotations)
  1743. elif obj_origin in DEQUE_TYPES:
  1744. return deque_schema_prepare_pydantic_annotations(obj, annotations)
  1745. elif obj_origin in MAPPING_TYPES:
  1746. return mapping_like_prepare_pydantic_annotations(obj, annotations)
  1747. else:
  1748. return None
  1749. def _apply_annotations(
  1750. self,
  1751. source_type: Any,
  1752. annotations: list[Any],
  1753. transform_inner_schema: Callable[[CoreSchema], CoreSchema] = lambda x: x,
  1754. ) -> CoreSchema:
  1755. """Apply arguments from `Annotated` or from `FieldInfo` to a schema.
  1756. This gets called by `GenerateSchema._annotated_schema` but differs from it in that it does
  1757. not expect `source_type` to be an `Annotated` object, it expects it to be the first argument of that
  1758. (in other words, `GenerateSchema._annotated_schema` just unpacks `Annotated`, this process it).
  1759. """
  1760. annotations = list(_known_annotated_metadata.expand_grouped_metadata(annotations))
  1761. res = self._get_prepare_pydantic_annotations_for_known_type(source_type, tuple(annotations))
  1762. if res is not None:
  1763. source_type, annotations = res
  1764. pydantic_js_annotation_functions: list[GetJsonSchemaFunction] = []
  1765. def inner_handler(obj: Any) -> CoreSchema:
  1766. from_property = self._generate_schema_from_property(obj, source_type)
  1767. if from_property is None:
  1768. schema = self._generate_schema_inner(obj)
  1769. else:
  1770. schema = from_property
  1771. metadata_js_function = _extract_get_pydantic_json_schema(obj, schema)
  1772. if metadata_js_function is not None:
  1773. metadata_schema = resolve_original_schema(schema, self.defs.definitions)
  1774. if metadata_schema is not None:
  1775. self._add_js_function(metadata_schema, metadata_js_function)
  1776. return transform_inner_schema(schema)
  1777. get_inner_schema = CallbackGetCoreSchemaHandler(inner_handler, self)
  1778. for annotation in annotations:
  1779. if annotation is None:
  1780. continue
  1781. get_inner_schema = self._get_wrapped_inner_schema(
  1782. get_inner_schema, annotation, pydantic_js_annotation_functions
  1783. )
  1784. schema = get_inner_schema(source_type)
  1785. if pydantic_js_annotation_functions:
  1786. core_metadata = schema.setdefault('metadata', {})
  1787. update_core_metadata(core_metadata, pydantic_js_annotation_functions=pydantic_js_annotation_functions)
  1788. return _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, source_type, schema)
  1789. def _apply_single_annotation(self, schema: core_schema.CoreSchema, metadata: Any) -> core_schema.CoreSchema:
  1790. FieldInfo = import_cached_field_info()
  1791. if isinstance(metadata, FieldInfo):
  1792. for field_metadata in metadata.metadata:
  1793. schema = self._apply_single_annotation(schema, field_metadata)
  1794. if metadata.discriminator is not None:
  1795. schema = self._apply_discriminator_to_union(schema, metadata.discriminator)
  1796. return schema
  1797. if schema['type'] == 'nullable':
  1798. # for nullable schemas, metadata is automatically applied to the inner schema
  1799. inner = schema.get('schema', core_schema.any_schema())
  1800. inner = self._apply_single_annotation(inner, metadata)
  1801. if inner:
  1802. schema['schema'] = inner
  1803. return schema
  1804. original_schema = schema
  1805. ref = schema.get('ref', None)
  1806. if ref is not None:
  1807. schema = schema.copy()
  1808. new_ref = ref + f'_{repr(metadata)}'
  1809. if new_ref in self.defs.definitions:
  1810. return self.defs.definitions[new_ref]
  1811. schema['ref'] = new_ref # type: ignore
  1812. elif schema['type'] == 'definition-ref':
  1813. ref = schema['schema_ref']
  1814. if ref in self.defs.definitions:
  1815. schema = self.defs.definitions[ref].copy()
  1816. new_ref = ref + f'_{repr(metadata)}'
  1817. if new_ref in self.defs.definitions:
  1818. return self.defs.definitions[new_ref]
  1819. schema['ref'] = new_ref # type: ignore
  1820. maybe_updated_schema = _known_annotated_metadata.apply_known_metadata(metadata, schema.copy())
  1821. if maybe_updated_schema is not None:
  1822. return maybe_updated_schema
  1823. return original_schema
  1824. def _apply_single_annotation_json_schema(
  1825. self, schema: core_schema.CoreSchema, metadata: Any
  1826. ) -> core_schema.CoreSchema:
  1827. FieldInfo = import_cached_field_info()
  1828. if isinstance(metadata, FieldInfo):
  1829. for field_metadata in metadata.metadata:
  1830. schema = self._apply_single_annotation_json_schema(schema, field_metadata)
  1831. pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(metadata)
  1832. core_metadata = schema.setdefault('metadata', {})
  1833. update_core_metadata(
  1834. core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
  1835. )
  1836. return schema
  1837. def _get_wrapped_inner_schema(
  1838. self,
  1839. get_inner_schema: GetCoreSchemaHandler,
  1840. annotation: Any,
  1841. pydantic_js_annotation_functions: list[GetJsonSchemaFunction],
  1842. ) -> CallbackGetCoreSchemaHandler:
  1843. metadata_get_schema: GetCoreSchemaFunction = getattr(annotation, '__get_pydantic_core_schema__', None) or (
  1844. lambda source, handler: handler(source)
  1845. )
  1846. def new_handler(source: Any) -> core_schema.CoreSchema:
  1847. schema = metadata_get_schema(source, get_inner_schema)
  1848. schema = self._apply_single_annotation(schema, annotation)
  1849. schema = self._apply_single_annotation_json_schema(schema, annotation)
  1850. metadata_js_function = _extract_get_pydantic_json_schema(annotation, schema)
  1851. if metadata_js_function is not None:
  1852. pydantic_js_annotation_functions.append(metadata_js_function)
  1853. return schema
  1854. return CallbackGetCoreSchemaHandler(new_handler, self)
  1855. def _apply_field_serializers(
  1856. self,
  1857. schema: core_schema.CoreSchema,
  1858. serializers: list[Decorator[FieldSerializerDecoratorInfo]],
  1859. ) -> core_schema.CoreSchema:
  1860. """Apply field serializers to a schema."""
  1861. if serializers:
  1862. schema = copy(schema)
  1863. if schema['type'] == 'definitions':
  1864. inner_schema = schema['schema']
  1865. schema['schema'] = self._apply_field_serializers(inner_schema, serializers)
  1866. return schema
  1867. else:
  1868. ref = typing.cast('str|None', schema.get('ref', None))
  1869. if ref is not None:
  1870. self.defs.definitions[ref] = schema
  1871. schema = core_schema.definition_reference_schema(ref)
  1872. # use the last serializer to make it easy to override a serializer set on a parent model
  1873. serializer = serializers[-1]
  1874. is_field_serializer, info_arg = inspect_field_serializer(serializer.func, serializer.info.mode)
  1875. try:
  1876. # Do not pass in globals as the function could be defined in a different module.
  1877. # Instead, let `get_function_return_type` infer the globals to use, but still pass
  1878. # in locals that may contain a parent/rebuild namespace:
  1879. return_type = _decorators.get_function_return_type(
  1880. serializer.func, serializer.info.return_type, localns=self._types_namespace.locals
  1881. )
  1882. except NameError as e:
  1883. raise PydanticUndefinedAnnotation.from_name_error(e) from e
  1884. if return_type is PydanticUndefined:
  1885. return_schema = None
  1886. else:
  1887. return_schema = self.generate_schema(return_type)
  1888. if serializer.info.mode == 'wrap':
  1889. schema['serialization'] = core_schema.wrap_serializer_function_ser_schema(
  1890. serializer.func,
  1891. is_field_serializer=is_field_serializer,
  1892. info_arg=info_arg,
  1893. return_schema=return_schema,
  1894. when_used=serializer.info.when_used,
  1895. )
  1896. else:
  1897. assert serializer.info.mode == 'plain'
  1898. schema['serialization'] = core_schema.plain_serializer_function_ser_schema(
  1899. serializer.func,
  1900. is_field_serializer=is_field_serializer,
  1901. info_arg=info_arg,
  1902. return_schema=return_schema,
  1903. when_used=serializer.info.when_used,
  1904. )
  1905. return schema
  1906. def _apply_model_serializers(
  1907. self, schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]]
  1908. ) -> core_schema.CoreSchema:
  1909. """Apply model serializers to a schema."""
  1910. ref: str | None = schema.pop('ref', None) # type: ignore
  1911. if serializers:
  1912. serializer = list(serializers)[-1]
  1913. info_arg = inspect_model_serializer(serializer.func, serializer.info.mode)
  1914. try:
  1915. # Do not pass in globals as the function could be defined in a different module.
  1916. # Instead, let `get_function_return_type` infer the globals to use, but still pass
  1917. # in locals that may contain a parent/rebuild namespace:
  1918. return_type = _decorators.get_function_return_type(
  1919. serializer.func, serializer.info.return_type, localns=self._types_namespace.locals
  1920. )
  1921. except NameError as e:
  1922. raise PydanticUndefinedAnnotation.from_name_error(e) from e
  1923. if return_type is PydanticUndefined:
  1924. return_schema = None
  1925. else:
  1926. return_schema = self.generate_schema(return_type)
  1927. if serializer.info.mode == 'wrap':
  1928. ser_schema: core_schema.SerSchema = core_schema.wrap_serializer_function_ser_schema(
  1929. serializer.func,
  1930. info_arg=info_arg,
  1931. return_schema=return_schema,
  1932. when_used=serializer.info.when_used,
  1933. )
  1934. else:
  1935. # plain
  1936. ser_schema = core_schema.plain_serializer_function_ser_schema(
  1937. serializer.func,
  1938. info_arg=info_arg,
  1939. return_schema=return_schema,
  1940. when_used=serializer.info.when_used,
  1941. )
  1942. schema['serialization'] = ser_schema
  1943. if ref:
  1944. schema['ref'] = ref # type: ignore
  1945. return schema
  1946. _VALIDATOR_F_MATCH: Mapping[
  1947. tuple[FieldValidatorModes, Literal['no-info', 'with-info']],
  1948. Callable[[Callable[..., Any], core_schema.CoreSchema, str | None], core_schema.CoreSchema],
  1949. ] = {
  1950. ('before', 'no-info'): lambda f, schema, _: core_schema.no_info_before_validator_function(f, schema),
  1951. ('after', 'no-info'): lambda f, schema, _: core_schema.no_info_after_validator_function(f, schema),
  1952. ('plain', 'no-info'): lambda f, _1, _2: core_schema.no_info_plain_validator_function(f),
  1953. ('wrap', 'no-info'): lambda f, schema, _: core_schema.no_info_wrap_validator_function(f, schema),
  1954. ('before', 'with-info'): lambda f, schema, field_name: core_schema.with_info_before_validator_function(
  1955. f, schema, field_name=field_name
  1956. ),
  1957. ('after', 'with-info'): lambda f, schema, field_name: core_schema.with_info_after_validator_function(
  1958. f, schema, field_name=field_name
  1959. ),
  1960. ('plain', 'with-info'): lambda f, _, field_name: core_schema.with_info_plain_validator_function(
  1961. f, field_name=field_name
  1962. ),
  1963. ('wrap', 'with-info'): lambda f, schema, field_name: core_schema.with_info_wrap_validator_function(
  1964. f, schema, field_name=field_name
  1965. ),
  1966. }
  1967. # TODO V3: this function is only used for deprecated decorators. It should
  1968. # be removed once we drop support for those.
  1969. def apply_validators(
  1970. schema: core_schema.CoreSchema,
  1971. validators: Iterable[Decorator[RootValidatorDecoratorInfo]]
  1972. | Iterable[Decorator[ValidatorDecoratorInfo]]
  1973. | Iterable[Decorator[FieldValidatorDecoratorInfo]],
  1974. field_name: str | None,
  1975. ) -> core_schema.CoreSchema:
  1976. """Apply validators to a schema.
  1977. Args:
  1978. schema: The schema to apply validators on.
  1979. validators: An iterable of validators.
  1980. field_name: The name of the field if validators are being applied to a model field.
  1981. Returns:
  1982. The updated schema.
  1983. """
  1984. for validator in validators:
  1985. info_arg = inspect_validator(validator.func, validator.info.mode)
  1986. val_type = 'with-info' if info_arg else 'no-info'
  1987. schema = _VALIDATOR_F_MATCH[(validator.info.mode, val_type)](validator.func, schema, field_name)
  1988. return schema
  1989. def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool:
  1990. """In v1, if any of the validators for a field had `always=True`, the default value would be validated.
  1991. This serves as an auxiliary function for re-implementing that logic, by looping over a provided
  1992. collection of (v1-style) ValidatorDecoratorInfo's and checking if any of them have `always=True`.
  1993. We should be able to drop this function and the associated logic calling it once we drop support
  1994. for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent
  1995. to the v1-validator `always` kwarg to `field_validator`.)
  1996. """
  1997. for validator in validators:
  1998. if validator.info.always:
  1999. return True
  2000. return False
  2001. def apply_model_validators(
  2002. schema: core_schema.CoreSchema,
  2003. validators: Iterable[Decorator[ModelValidatorDecoratorInfo]],
  2004. mode: Literal['inner', 'outer', 'all'],
  2005. ) -> core_schema.CoreSchema:
  2006. """Apply model validators to a schema.
  2007. If mode == 'inner', only "before" validators are applied
  2008. If mode == 'outer', validators other than "before" are applied
  2009. If mode == 'all', all validators are applied
  2010. Args:
  2011. schema: The schema to apply validators on.
  2012. validators: An iterable of validators.
  2013. mode: The validator mode.
  2014. Returns:
  2015. The updated schema.
  2016. """
  2017. ref: str | None = schema.pop('ref', None) # type: ignore
  2018. for validator in validators:
  2019. if mode == 'inner' and validator.info.mode != 'before':
  2020. continue
  2021. if mode == 'outer' and validator.info.mode == 'before':
  2022. continue
  2023. info_arg = inspect_validator(validator.func, validator.info.mode)
  2024. if validator.info.mode == 'wrap':
  2025. if info_arg:
  2026. schema = core_schema.with_info_wrap_validator_function(function=validator.func, schema=schema)
  2027. else:
  2028. schema = core_schema.no_info_wrap_validator_function(function=validator.func, schema=schema)
  2029. elif validator.info.mode == 'before':
  2030. if info_arg:
  2031. schema = core_schema.with_info_before_validator_function(function=validator.func, schema=schema)
  2032. else:
  2033. schema = core_schema.no_info_before_validator_function(function=validator.func, schema=schema)
  2034. else:
  2035. assert validator.info.mode == 'after'
  2036. if info_arg:
  2037. schema = core_schema.with_info_after_validator_function(function=validator.func, schema=schema)
  2038. else:
  2039. schema = core_schema.no_info_after_validator_function(function=validator.func, schema=schema)
  2040. if ref:
  2041. schema['ref'] = ref # type: ignore
  2042. return schema
  2043. def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema:
  2044. """Wrap schema with default schema if default value or `default_factory` are available.
  2045. Args:
  2046. field_info: The field info object.
  2047. schema: The schema to apply default on.
  2048. Returns:
  2049. Updated schema by default value or `default_factory`.
  2050. """
  2051. if field_info.default_factory:
  2052. return core_schema.with_default_schema(
  2053. schema,
  2054. default_factory=field_info.default_factory,
  2055. default_factory_takes_data=takes_validated_data_argument(field_info.default_factory),
  2056. validate_default=field_info.validate_default,
  2057. )
  2058. elif field_info.default is not PydanticUndefined:
  2059. return core_schema.with_default_schema(
  2060. schema, default=field_info.default, validate_default=field_info.validate_default
  2061. )
  2062. else:
  2063. return schema
  2064. def _extract_get_pydantic_json_schema(tp: Any, schema: CoreSchema) -> GetJsonSchemaFunction | None:
  2065. """Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`."""
  2066. js_modify_function = getattr(tp, '__get_pydantic_json_schema__', None)
  2067. if hasattr(tp, '__modify_schema__'):
  2068. BaseModel = import_cached_base_model()
  2069. has_custom_v2_modify_js_func = (
  2070. js_modify_function is not None
  2071. and BaseModel.__get_pydantic_json_schema__.__func__ # type: ignore
  2072. not in (js_modify_function, getattr(js_modify_function, '__func__', None))
  2073. )
  2074. if not has_custom_v2_modify_js_func:
  2075. cls_name = getattr(tp, '__name__', None)
  2076. raise PydanticUserError(
  2077. f'The `__modify_schema__` method is not supported in Pydantic v2. '
  2078. f'Use `__get_pydantic_json_schema__` instead{f" in class `{cls_name}`" if cls_name else ""}.',
  2079. code='custom-json-schema',
  2080. )
  2081. # handle GenericAlias' but ignore Annotated which "lies" about its origin (in this case it would be `int`)
  2082. if hasattr(tp, '__origin__') and not _typing_extra.is_annotated(tp):
  2083. return _extract_get_pydantic_json_schema(tp.__origin__, schema)
  2084. if js_modify_function is None:
  2085. return None
  2086. return js_modify_function
  2087. class _CommonField(TypedDict):
  2088. schema: core_schema.CoreSchema
  2089. validation_alias: str | list[str | int] | list[list[str | int]] | None
  2090. serialization_alias: str | None
  2091. serialization_exclude: bool | None
  2092. frozen: bool | None
  2093. metadata: dict[str, Any]
  2094. def _common_field(
  2095. schema: core_schema.CoreSchema,
  2096. *,
  2097. validation_alias: str | list[str | int] | list[list[str | int]] | None = None,
  2098. serialization_alias: str | None = None,
  2099. serialization_exclude: bool | None = None,
  2100. frozen: bool | None = None,
  2101. metadata: Any = None,
  2102. ) -> _CommonField:
  2103. return {
  2104. 'schema': schema,
  2105. 'validation_alias': validation_alias,
  2106. 'serialization_alias': serialization_alias,
  2107. 'serialization_exclude': serialization_exclude,
  2108. 'frozen': frozen,
  2109. 'metadata': metadata,
  2110. }
  2111. class _Definitions:
  2112. """Keeps track of references and definitions."""
  2113. def __init__(self) -> None:
  2114. self.seen: set[str] = set()
  2115. self.definitions: dict[str, core_schema.CoreSchema] = {}
  2116. @contextmanager
  2117. def get_schema_or_ref(self, tp: Any) -> Iterator[tuple[str, None] | tuple[str, CoreSchema]]:
  2118. """Get a definition for `tp` if one exists.
  2119. If a definition exists, a tuple of `(ref_string, CoreSchema)` is returned.
  2120. If no definition exists yet, a tuple of `(ref_string, None)` is returned.
  2121. Note that the returned `CoreSchema` will always be a `DefinitionReferenceSchema`,
  2122. not the actual definition itself.
  2123. This should be called for any type that can be identified by reference.
  2124. This includes any recursive types.
  2125. At present the following types can be named/recursive:
  2126. - BaseModel
  2127. - Dataclasses
  2128. - TypedDict
  2129. - TypeAliasType
  2130. """
  2131. ref = get_type_ref(tp)
  2132. # return the reference if we're either (1) in a cycle or (2) it was already defined
  2133. if ref in self.seen or ref in self.definitions:
  2134. yield (ref, core_schema.definition_reference_schema(ref))
  2135. else:
  2136. self.seen.add(ref)
  2137. try:
  2138. yield (ref, None)
  2139. finally:
  2140. self.seen.discard(ref)
  2141. def resolve_original_schema(schema: CoreSchema, definitions: dict[str, CoreSchema]) -> CoreSchema | None:
  2142. if schema['type'] == 'definition-ref':
  2143. return definitions.get(schema['schema_ref'], None)
  2144. elif schema['type'] == 'definitions':
  2145. return schema['schema']
  2146. else:
  2147. return schema
  2148. class _FieldNameStack:
  2149. __slots__ = ('_stack',)
  2150. def __init__(self) -> None:
  2151. self._stack: list[str] = []
  2152. @contextmanager
  2153. def push(self, field_name: str) -> Iterator[None]:
  2154. self._stack.append(field_name)
  2155. yield
  2156. self._stack.pop()
  2157. def get(self) -> str | None:
  2158. if self._stack:
  2159. return self._stack[-1]
  2160. else:
  2161. return None
  2162. class _ModelTypeStack:
  2163. __slots__ = ('_stack',)
  2164. def __init__(self) -> None:
  2165. self._stack: list[type] = []
  2166. @contextmanager
  2167. def push(self, type_obj: type) -> Iterator[None]:
  2168. self._stack.append(type_obj)
  2169. yield
  2170. self._stack.pop()
  2171. def get(self) -> type | None:
  2172. if self._stack:
  2173. return self._stack[-1]
  2174. else:
  2175. return None