_schema_validator.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. """Pluggable schema validator for pydantic."""
  2. from __future__ import annotations
  3. import functools
  4. from typing import TYPE_CHECKING, Any, Callable, Iterable, TypeVar
  5. from pydantic_core import CoreConfig, CoreSchema, SchemaValidator, ValidationError
  6. from typing_extensions import Literal, ParamSpec
  7. if TYPE_CHECKING:
  8. from . import BaseValidateHandlerProtocol, PydanticPluginProtocol, SchemaKind, SchemaTypePath
  9. P = ParamSpec('P')
  10. R = TypeVar('R')
  11. Event = Literal['on_validate_python', 'on_validate_json', 'on_validate_strings']
  12. events: list[Event] = list(Event.__args__) # type: ignore
  13. def create_schema_validator(
  14. schema: CoreSchema,
  15. schema_type: Any,
  16. schema_type_module: str,
  17. schema_type_name: str,
  18. schema_kind: SchemaKind,
  19. config: CoreConfig | None = None,
  20. plugin_settings: dict[str, Any] | None = None,
  21. ) -> SchemaValidator | PluggableSchemaValidator:
  22. """Create a `SchemaValidator` or `PluggableSchemaValidator` if plugins are installed.
  23. Returns:
  24. If plugins are installed then return `PluggableSchemaValidator`, otherwise return `SchemaValidator`.
  25. """
  26. from . import SchemaTypePath
  27. from ._loader import get_plugins
  28. plugins = get_plugins()
  29. if plugins:
  30. return PluggableSchemaValidator(
  31. schema,
  32. schema_type,
  33. SchemaTypePath(schema_type_module, schema_type_name),
  34. schema_kind,
  35. config,
  36. plugins,
  37. plugin_settings or {},
  38. )
  39. else:
  40. return SchemaValidator(schema, config)
  41. class PluggableSchemaValidator:
  42. """Pluggable schema validator."""
  43. __slots__ = '_schema_validator', 'validate_json', 'validate_python', 'validate_strings'
  44. def __init__(
  45. self,
  46. schema: CoreSchema,
  47. schema_type: Any,
  48. schema_type_path: SchemaTypePath,
  49. schema_kind: SchemaKind,
  50. config: CoreConfig | None,
  51. plugins: Iterable[PydanticPluginProtocol],
  52. plugin_settings: dict[str, Any],
  53. ) -> None:
  54. self._schema_validator = SchemaValidator(schema, config)
  55. python_event_handlers: list[BaseValidateHandlerProtocol] = []
  56. json_event_handlers: list[BaseValidateHandlerProtocol] = []
  57. strings_event_handlers: list[BaseValidateHandlerProtocol] = []
  58. for plugin in plugins:
  59. try:
  60. p, j, s = plugin.new_schema_validator(
  61. schema, schema_type, schema_type_path, schema_kind, config, plugin_settings
  62. )
  63. except TypeError as e: # pragma: no cover
  64. raise TypeError(f'Error using plugin `{plugin.__module__}:{plugin.__class__.__name__}`: {e}') from e
  65. if p is not None:
  66. python_event_handlers.append(p)
  67. if j is not None:
  68. json_event_handlers.append(j)
  69. if s is not None:
  70. strings_event_handlers.append(s)
  71. self.validate_python = build_wrapper(self._schema_validator.validate_python, python_event_handlers)
  72. self.validate_json = build_wrapper(self._schema_validator.validate_json, json_event_handlers)
  73. self.validate_strings = build_wrapper(self._schema_validator.validate_strings, strings_event_handlers)
  74. def __getattr__(self, name: str) -> Any:
  75. return getattr(self._schema_validator, name)
  76. def build_wrapper(func: Callable[P, R], event_handlers: list[BaseValidateHandlerProtocol]) -> Callable[P, R]:
  77. if not event_handlers:
  78. return func
  79. else:
  80. on_enters = tuple(h.on_enter for h in event_handlers if filter_handlers(h, 'on_enter'))
  81. on_successes = tuple(h.on_success for h in event_handlers if filter_handlers(h, 'on_success'))
  82. on_errors = tuple(h.on_error for h in event_handlers if filter_handlers(h, 'on_error'))
  83. on_exceptions = tuple(h.on_exception for h in event_handlers if filter_handlers(h, 'on_exception'))
  84. @functools.wraps(func)
  85. def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
  86. for on_enter_handler in on_enters:
  87. on_enter_handler(*args, **kwargs)
  88. try:
  89. result = func(*args, **kwargs)
  90. except ValidationError as error:
  91. for on_error_handler in on_errors:
  92. on_error_handler(error)
  93. raise
  94. except Exception as exception:
  95. for on_exception_handler in on_exceptions:
  96. on_exception_handler(exception)
  97. raise
  98. else:
  99. for on_success_handler in on_successes:
  100. on_success_handler(result)
  101. return result
  102. return wrapper
  103. def filter_handlers(handler_cls: BaseValidateHandlerProtocol, method_name: str) -> bool:
  104. """Filter out handler methods which are not implemented by the plugin directly - e.g. are missing
  105. or are inherited from the protocol.
  106. """
  107. handler = getattr(handler_cls, method_name, None)
  108. if handler is None:
  109. return False
  110. elif handler.__module__ == 'pydantic.plugin':
  111. # this is the original handler, from the protocol due to runtime inheritance
  112. # we don't want to call it
  113. return False
  114. else:
  115. return True