From b1e3128e153077a843bace460b18fecba30845ea Mon Sep 17 00:00:00 2001 From: Oz Tiram Date: Sun, 21 May 2023 12:52:27 +0200 Subject: [PATCH] Drop cerberus, it seems it's not used anywhere (#5699) * Drop cerberus, it seems it's not used anywhere. --- news/5699.vendor.rst | 1 + pipenv/vendor/cerberus/LICENSE | 15 - pipenv/vendor/cerberus/__init__.py | 32 - pipenv/vendor/cerberus/errors.py | 654 ------- pipenv/vendor/cerberus/platform.py | 43 - pipenv/vendor/cerberus/schema.py | 556 ------ pipenv/vendor/cerberus/utils.py | 132 -- pipenv/vendor/cerberus/validator.py | 1682 ------------------- pipenv/vendor/vendor.txt | 3 +- tests/unit/test_utils_windows_executable.py | 2 + 10 files changed, 4 insertions(+), 3116 deletions(-) create mode 100644 news/5699.vendor.rst delete mode 100644 pipenv/vendor/cerberus/LICENSE delete mode 100644 pipenv/vendor/cerberus/__init__.py delete mode 100644 pipenv/vendor/cerberus/errors.py delete mode 100644 pipenv/vendor/cerberus/platform.py delete mode 100644 pipenv/vendor/cerberus/schema.py delete mode 100644 pipenv/vendor/cerberus/utils.py delete mode 100644 pipenv/vendor/cerberus/validator.py diff --git a/news/5699.vendor.rst b/news/5699.vendor.rst new file mode 100644 index 00000000..599ed26f --- /dev/null +++ b/news/5699.vendor.rst @@ -0,0 +1 @@ +Drop vendored library cerberus. This isn't actually used by pipenv. diff --git a/pipenv/vendor/cerberus/LICENSE b/pipenv/vendor/cerberus/LICENSE deleted file mode 100644 index 2e0bcdd8..00000000 --- a/pipenv/vendor/cerberus/LICENSE +++ /dev/null @@ -1,15 +0,0 @@ -ISC License - -Copyright (c) 2012-2016 Nicola Iarocci. - -Permission to use, copy, modify, and/or distribute this software for any -purpose with or without fee is hereby granted, provided that the above -copyright notice and this permission notice appear in all copies. - -THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH -REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND -FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, -INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM -LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR -OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR -PERFORMANCE OF THIS SOFTWARE. diff --git a/pipenv/vendor/cerberus/__init__.py b/pipenv/vendor/cerberus/__init__.py deleted file mode 100644 index 7cb82be2..00000000 --- a/pipenv/vendor/cerberus/__init__.py +++ /dev/null @@ -1,32 +0,0 @@ -""" - Extensible validation for Python dictionaries. - - :copyright: 2012-2021 by Nicola Iarocci. - :license: ISC, see LICENSE for more details. - - Full documentation is available at https://python-cerberus.org/ - -""" - -from __future__ import absolute_import - -from pipenv.patched.pip._vendor.pkg_resources import get_distribution, DistributionNotFound - -from pipenv.vendor.cerberus.validator import DocumentError, Validator -from pipenv.vendor.cerberus.schema import rules_set_registry, schema_registry, SchemaError -from pipenv.vendor.cerberus.utils import TypeDefinition - - -try: - __version__ = get_distribution("Cerberus").version -except DistributionNotFound: - __version__ = "unknown" - -__all__ = [ - DocumentError.__name__, - SchemaError.__name__, - TypeDefinition.__name__, - Validator.__name__, - "schema_registry", - "rules_set_registry", -] diff --git a/pipenv/vendor/cerberus/errors.py b/pipenv/vendor/cerberus/errors.py deleted file mode 100644 index 1492f1d1..00000000 --- a/pipenv/vendor/cerberus/errors.py +++ /dev/null @@ -1,654 +0,0 @@ -# -*-: coding utf-8 -*- -""" This module contains the error-related constants and classes. """ - -from __future__ import absolute_import - -from collections import defaultdict, namedtuple -from copy import copy, deepcopy -from functools import wraps -from pprint import pformat - -from pipenv.vendor.cerberus.platform import PYTHON_VERSION, MutableMapping -from pipenv.vendor.cerberus.utils import compare_paths_lt, quote_string - - -ErrorDefinition = namedtuple('ErrorDefinition', 'code, rule') -""" -This class is used to define possible errors. Each distinguishable error is -defined by a *unique* error ``code`` as integer and the ``rule`` that can -cause it as string. -The instances' names do not contain a common prefix as they are supposed to be -referenced within the module namespace, e.g. ``errors.CUSTOM``. -""" - - -# custom -CUSTOM = ErrorDefinition(0x00, None) - -# existence -DOCUMENT_MISSING = ErrorDefinition(0x01, None) # issues/141 -DOCUMENT_MISSING = "document is missing" -REQUIRED_FIELD = ErrorDefinition(0x02, 'required') -UNKNOWN_FIELD = ErrorDefinition(0x03, None) -DEPENDENCIES_FIELD = ErrorDefinition(0x04, 'dependencies') -DEPENDENCIES_FIELD_VALUE = ErrorDefinition(0x05, 'dependencies') -EXCLUDES_FIELD = ErrorDefinition(0x06, 'excludes') - -# shape -DOCUMENT_FORMAT = ErrorDefinition(0x21, None) # issues/141 -DOCUMENT_FORMAT = "'{0}' is not a document, must be a dict" -EMPTY_NOT_ALLOWED = ErrorDefinition(0x22, 'empty') -NOT_NULLABLE = ErrorDefinition(0x23, 'nullable') -BAD_TYPE = ErrorDefinition(0x24, 'type') -BAD_TYPE_FOR_SCHEMA = ErrorDefinition(0x25, 'schema') -ITEMS_LENGTH = ErrorDefinition(0x26, 'items') -MIN_LENGTH = ErrorDefinition(0x27, 'minlength') -MAX_LENGTH = ErrorDefinition(0x28, 'maxlength') - - -# color -REGEX_MISMATCH = ErrorDefinition(0x41, 'regex') -MIN_VALUE = ErrorDefinition(0x42, 'min') -MAX_VALUE = ErrorDefinition(0x43, 'max') -UNALLOWED_VALUE = ErrorDefinition(0x44, 'allowed') -UNALLOWED_VALUES = ErrorDefinition(0x45, 'allowed') -FORBIDDEN_VALUE = ErrorDefinition(0x46, 'forbidden') -FORBIDDEN_VALUES = ErrorDefinition(0x47, 'forbidden') -MISSING_MEMBERS = ErrorDefinition(0x48, 'contains') - -# other -NORMALIZATION = ErrorDefinition(0x60, None) -COERCION_FAILED = ErrorDefinition(0x61, 'coerce') -RENAMING_FAILED = ErrorDefinition(0x62, 'rename_handler') -READONLY_FIELD = ErrorDefinition(0x63, 'readonly') -SETTING_DEFAULT_FAILED = ErrorDefinition(0x64, 'default_setter') - -# groups -ERROR_GROUP = ErrorDefinition(0x80, None) -MAPPING_SCHEMA = ErrorDefinition(0x81, 'schema') -SEQUENCE_SCHEMA = ErrorDefinition(0x82, 'schema') -# TODO remove KEYSCHEMA AND VALUESCHEMA with next major release -KEYSRULES = KEYSCHEMA = ErrorDefinition(0x83, 'keysrules') -VALUESRULES = VALUESCHEMA = ErrorDefinition(0x84, 'valuesrules') -BAD_ITEMS = ErrorDefinition(0x8F, 'items') - -LOGICAL = ErrorDefinition(0x90, None) -NONEOF = ErrorDefinition(0x91, 'noneof') -ONEOF = ErrorDefinition(0x92, 'oneof') -ANYOF = ErrorDefinition(0x93, 'anyof') -ALLOF = ErrorDefinition(0x94, 'allof') - - -""" SchemaError messages """ - -SCHEMA_ERROR_DEFINITION_TYPE = "schema definition for field '{0}' must be a dict" -SCHEMA_ERROR_MISSING = "validation schema missing" - - -""" Error representations """ - - -class ValidationError(object): - """A simple class to store and query basic error information.""" - - def __init__(self, document_path, schema_path, code, rule, constraint, value, info): - self.document_path = document_path - """ The path to the field within the document that caused the error. - Type: :class:`tuple` """ - self.schema_path = schema_path - """ The path to the rule within the schema that caused the error. - Type: :class:`tuple` """ - self.code = code - """ The error's identifier code. Type: :class:`int` """ - self.rule = rule - """ The rule that failed. Type: `string` """ - self.constraint = constraint - """ The constraint that failed. """ - self.value = value - """ The value that failed. """ - self.info = info - """ May hold additional information about the error. - Type: :class:`tuple` """ - - def __eq__(self, other): - """Assumes the errors relate to the same document and schema.""" - return hash(self) == hash(other) - - def __hash__(self): - """Expects that all other properties are transitively determined.""" - return hash(self.document_path) ^ hash(self.schema_path) ^ hash(self.code) - - def __lt__(self, other): - if self.document_path != other.document_path: - return compare_paths_lt(self.document_path, other.document_path) - else: - return compare_paths_lt(self.schema_path, other.schema_path) - - def __repr__(self): - return ( - "{class_name} @ {memptr} ( " - "document_path={document_path}," - "schema_path={schema_path}," - "code={code}," - "constraint={constraint}," - "value={value}," - "info={info} )".format( - class_name=self.__class__.__name__, - memptr=hex(id(self)), # noqa: E501 - document_path=self.document_path, - schema_path=self.schema_path, - code=hex(self.code), - constraint=quote_string(self.constraint), - value=quote_string(self.value), - info=self.info, - ) - ) - - @property - def child_errors(self): - """ - A list that contains the individual errors of a bulk validation error. - """ - return self.info[0] if self.is_group_error else None - - @property - def definitions_errors(self): - """ - Dictionary with errors of an \*of-rule mapped to the index of the definition it - occurred in. Returns :obj:`None` if not applicable. - """ - if not self.is_logic_error: - return None - - result = defaultdict(list) - for error in self.child_errors: - i = error.schema_path[len(self.schema_path)] - result[i].append(error) - return result - - @property - def field(self): - """Field of the contextual mapping, possibly :obj:`None`.""" - if self.document_path: - return self.document_path[-1] - else: - return None - - @property - def is_group_error(self): - """``True`` for errors of bulk validations.""" - return bool(self.code & ERROR_GROUP.code) - - @property - def is_logic_error(self): - """ - ``True`` for validation errors against different schemas with \*of-rules. - """ - return bool(self.code & LOGICAL.code - ERROR_GROUP.code) - - @property - def is_normalization_error(self): - """``True`` for normalization errors.""" - return bool(self.code & NORMALIZATION.code) - - -class ErrorList(list): - """ - A list for :class:`~cerberus.errors.ValidationError` instances that can be queried - with the ``in`` keyword for a particular :class:`~cerberus.errors.ErrorDefinition`. - """ - - def __contains__(self, error_definition): - if not isinstance(error_definition, ErrorDefinition): - raise TypeError - - wanted_code = error_definition.code - return any(x.code == wanted_code for x in self) - - -class ErrorTreeNode(MutableMapping): - __slots__ = ('descendants', 'errors', 'parent_node', 'path', 'tree_root') - - def __init__(self, path, parent_node): - self.parent_node = parent_node - self.tree_root = self.parent_node.tree_root - self.path = path[: self.parent_node.depth + 1] - self.errors = ErrorList() - self.descendants = {} - - def __contains__(self, item): - if isinstance(item, ErrorDefinition): - return item in self.errors - else: - return item in self.descendants - - def __delitem__(self, key): - del self.descendants[key] - - def __iter__(self): - return iter(self.errors) - - def __getitem__(self, item): - if isinstance(item, ErrorDefinition): - for error in self.errors: - if item.code == error.code: - return error - return None - else: - return self.descendants.get(item) - - def __len__(self): - return len(self.errors) - - def __repr__(self): - return self.__str__() - - def __setitem__(self, key, value): - self.descendants[key] = value - - def __str__(self): - return str(self.errors) + ',' + str(self.descendants) - - @property - def depth(self): - return len(self.path) - - @property - def tree_type(self): - return self.tree_root.tree_type - - def add(self, error): - error_path = self._path_of_(error) - - key = error_path[self.depth] - if key not in self.descendants: - self[key] = ErrorTreeNode(error_path, self) - - node = self[key] - - if len(error_path) == self.depth + 1: - node.errors.append(error) - node.errors.sort() - if error.is_group_error: - for child_error in error.child_errors: - self.tree_root.add(child_error) - else: - node.add(error) - - def _path_of_(self, error): - return getattr(error, self.tree_type + '_path') - - -class ErrorTree(ErrorTreeNode): - """ - Base class for :class:`~cerberus.errors.DocumentErrorTree` and - :class:`~cerberus.errors.SchemaErrorTree`. - """ - - def __init__(self, errors=()): - self.parent_node = None - self.tree_root = self - self.path = () - self.errors = ErrorList() - self.descendants = {} - for error in errors: - self.add(error) - - def add(self, error): - """ - Add an error to the tree. - - :param error: :class:`~cerberus.errors.ValidationError` - """ - if not self._path_of_(error): - self.errors.append(error) - self.errors.sort() - else: - super(ErrorTree, self).add(error) - - def fetch_errors_from(self, path): - """ - Returns all errors for a particular path. - - :param path: :class:`tuple` of :term:`hashable` s. - :rtype: :class:`~cerberus.errors.ErrorList` - """ - node = self.fetch_node_from(path) - if node is not None: - return node.errors - else: - return ErrorList() - - def fetch_node_from(self, path): - """ - Returns a node for a path. - - :param path: Tuple of :term:`hashable` s. - :rtype: :class:`~cerberus.errors.ErrorTreeNode` or :obj:`None` - """ - context = self - for key in path: - context = context[key] - if context is None: - break - return context - - -class DocumentErrorTree(ErrorTree): - """ - Implements a dict-like class to query errors by indexes following the structure of a - validated document. - """ - - tree_type = 'document' - - -class SchemaErrorTree(ErrorTree): - """ - Implements a dict-like class to query errors by indexes following the structure of - the used schema. - """ - - tree_type = 'schema' - - -class BaseErrorHandler(object): - """Base class for all error handlers. - Subclasses are identified as error-handlers with an instance-test.""" - - def __init__(self, *args, **kwargs): - """Optionally initialize a new instance.""" - pass - - def __call__(self, errors): - """ - Returns errors in a handler-specific format. - - :param errors: An object containing the errors. - :type errors: :term:`iterable` of - :class:`~cerberus.errors.ValidationError` instances or a - :class:`~cerberus.Validator` instance - """ - raise NotImplementedError - - def __iter__(self): - """Be a superhero and implement an iterator over errors.""" - raise NotImplementedError - - def add(self, error): - """ - Add an error to the errors' container object of a handler. - - :param error: The error to add. - :type error: :class:`~cerberus.errors.ValidationError` - """ - raise NotImplementedError - - def emit(self, error): - """ - Optionally emits an error in the handler's format to a stream. Or light a LED, - or even shut down a power plant. - - :param error: The error to emit. - :type error: :class:`~cerberus.errors.ValidationError` - """ - pass - - def end(self, validator): - """ - Gets called when a validation ends. - - :param validator: The calling validator. - :type validator: :class:`~cerberus.Validator` - """ - pass - - def extend(self, errors): - """ - Adds all errors to the handler's container object. - - :param errors: The errors to add. - :type errors: :term:`iterable` of - :class:`~cerberus.errors.ValidationError` instances - """ - for error in errors: - self.add(error) - - def start(self, validator): - """ - Gets called when a validation starts. - - :param validator: The calling validator. - :type validator: :class:`~cerberus.Validator` - """ - pass - - -class ToyErrorHandler(BaseErrorHandler): - def __call__(self, *args, **kwargs): - raise RuntimeError('This is not supposed to happen.') - - def clear(self): - pass - - -def encode_unicode(f): - """Cerberus error messages expect regular binary strings. - If unicode is used in a ValidationError message can't be printed. - - This decorator ensures that if legacy Python is used unicode - strings are encoded before passing to a function. - """ - - @wraps(f) - def wrapped(obj, error): - def _encode(value): - """Helper encoding unicode strings into binary utf-8""" - if isinstance(value, unicode): # noqa: F821 - return value.encode('utf-8') - return value - - error = copy(error) - error.document_path = _encode(error.document_path) - error.schema_path = _encode(error.schema_path) - error.constraint = _encode(error.constraint) - error.value = _encode(error.value) - error.info = _encode(error.info) - return f(obj, error) - - return wrapped if PYTHON_VERSION < 3 else f - - -class BasicErrorHandler(BaseErrorHandler): - """ - Models cerberus' legacy. Returns a :class:`dict`. When mangled through :class:`str` - a pretty-formatted representation of that tree is returned. - """ - - messages = { - 0x00: "{0}", - 0x01: "document is missing", - 0x02: "required field", - 0x03: "unknown field", - 0x04: "field '{0}' is required", - 0x05: "depends on these values: {constraint}", - 0x06: "{0} must not be present with '{field}'", - 0x21: "'{0}' is not a document, must be a dict", - 0x22: "empty values not allowed", - 0x23: "null value not allowed", - 0x24: "must be of {constraint} type", - 0x25: "must be of dict type", - 0x26: "length of list should be {0}, it is {1}", - 0x27: "min length is {constraint}", - 0x28: "max length is {constraint}", - 0x41: "value does not match regex '{constraint}'", - 0x42: "min value is {constraint}", - 0x43: "max value is {constraint}", - 0x44: "unallowed value {value}", - 0x45: "unallowed values {0}", - 0x46: "unallowed value {value}", - 0x47: "unallowed values {0}", - 0x48: "missing members {0}", - 0x61: "field '{field}' cannot be coerced: {0}", - 0x62: "field '{field}' cannot be renamed: {0}", - 0x63: "field is read-only", - 0x64: "default value for '{field}' cannot be set: {0}", - 0x81: "mapping doesn't validate subschema: {0}", - 0x82: "one or more sequence-items don't validate: {0}", - 0x83: "one or more keys of a mapping don't validate: {0}", - 0x84: "one or more values in a mapping don't validate: {0}", - 0x85: "one or more sequence-items don't validate: {0}", - 0x91: "one or more definitions validate", - 0x92: "none or more than one rule validate", - 0x93: "no definitions validate", - 0x94: "one or more definitions don't validate", - } - - def __init__(self, tree=None): - self.tree = {} if tree is None else tree - - def __call__(self, errors): - self.clear() - self.extend(errors) - return self.pretty_tree - - def __str__(self): - return pformat(self.pretty_tree) - - @property - def pretty_tree(self): - pretty = deepcopy(self.tree) - for field in pretty: - self._purge_empty_dicts(pretty[field]) - return pretty - - @encode_unicode - def add(self, error): - # Make sure the original error is not altered with - # error paths specific to the handler. - error = deepcopy(error) - - self._rewrite_error_path(error) - - if error.is_logic_error: - self._insert_logic_error(error) - elif error.is_group_error: - self._insert_group_error(error) - elif error.code in self.messages: - self._insert_error( - error.document_path, self._format_message(error.field, error) - ) - - def clear(self): - self.tree = {} - - def start(self, validator): - self.clear() - - def _format_message(self, field, error): - return self.messages[error.code].format( - *error.info, constraint=error.constraint, field=field, value=error.value - ) - - def _insert_error(self, path, node): - """ - Adds an error or sub-tree to :attr:tree. - - :param path: Path to the error. - :type path: Tuple of strings and integers. - :param node: An error message or a sub-tree. - :type node: String or dictionary. - """ - field = path[0] - if len(path) == 1: - if field in self.tree: - subtree = self.tree[field].pop() - self.tree[field] += [node, subtree] - else: - self.tree[field] = [node, {}] - elif len(path) >= 1: - if field not in self.tree: - self.tree[field] = [{}] - subtree = self.tree[field][-1] - - if subtree: - new = self.__class__(tree=copy(subtree)) - else: - new = self.__class__() - new._insert_error(path[1:], node) - subtree.update(new.tree) - - def _insert_group_error(self, error): - for child_error in error.child_errors: - if child_error.is_logic_error: - self._insert_logic_error(child_error) - elif child_error.is_group_error: - self._insert_group_error(child_error) - else: - self._insert_error( - child_error.document_path, - self._format_message(child_error.field, child_error), - ) - - def _insert_logic_error(self, error): - field = error.field - self._insert_error(error.document_path, self._format_message(field, error)) - - for definition_errors in error.definitions_errors.values(): - for child_error in definition_errors: - if child_error.is_logic_error: - self._insert_logic_error(child_error) - elif child_error.is_group_error: - self._insert_group_error(child_error) - else: - self._insert_error( - child_error.document_path, - self._format_message(field, child_error), - ) - - def _purge_empty_dicts(self, error_list): - subtree = error_list[-1] - if not error_list[-1]: - error_list.pop() - else: - for key in subtree: - self._purge_empty_dicts(subtree[key]) - - def _rewrite_error_path(self, error, offset=0): - """ - Recursively rewrites the error path to correctly represent logic errors - """ - if error.is_logic_error: - self._rewrite_logic_error_path(error, offset) - elif error.is_group_error: - self._rewrite_group_error_path(error, offset) - - def _rewrite_group_error_path(self, error, offset=0): - child_start = len(error.document_path) - offset - - for child_error in error.child_errors: - relative_path = child_error.document_path[child_start:] - child_error.document_path = error.document_path + relative_path - - self._rewrite_error_path(child_error, offset) - - def _rewrite_logic_error_path(self, error, offset=0): - child_start = len(error.document_path) - offset - - for i, definition_errors in error.definitions_errors.items(): - if not definition_errors: - continue - - nodename = '%s definition %s' % (error.rule, i) - path = error.document_path + (nodename,) - - for child_error in definition_errors: - rel_path = child_error.document_path[child_start:] - child_error.document_path = path + rel_path - - self._rewrite_error_path(child_error, offset + 1) - - -class SchemaErrorHandler(BasicErrorHandler): - messages = BasicErrorHandler.messages.copy() - messages[0x03] = "unknown rule" diff --git a/pipenv/vendor/cerberus/platform.py b/pipenv/vendor/cerberus/platform.py deleted file mode 100644 index ae229c7b..00000000 --- a/pipenv/vendor/cerberus/platform.py +++ /dev/null @@ -1,43 +0,0 @@ -""" Platform-dependent objects """ - -import sys - -if sys.flags.optimize == 2: - raise RuntimeError("Cerberus can't be run with Python's optimization level 2.") - - -PYTHON_VERSION = float(sys.version_info[0]) + float(sys.version_info[1]) / 10 - - -if PYTHON_VERSION < 3: - _str_type = basestring # noqa: F821 - _int_types = (int, long) # noqa: F821 -else: - _str_type = str - _int_types = (int,) - - -if PYTHON_VERSION < 3.3: - from collections import ( # noqa: F401 - Callable, - Container, - Hashable, - Iterable, - Mapping, - MutableMapping, - Sequence, - Set, - Sized, - ) -else: - from collections.abc import ( # noqa: F401 - Callable, - Container, - Hashable, - Iterable, - Mapping, - MutableMapping, - Sequence, - Set, - Sized, - ) diff --git a/pipenv/vendor/cerberus/schema.py b/pipenv/vendor/cerberus/schema.py deleted file mode 100644 index 3841d384..00000000 --- a/pipenv/vendor/cerberus/schema.py +++ /dev/null @@ -1,556 +0,0 @@ -from __future__ import absolute_import - -from warnings import warn - -from pipenv.vendor.cerberus import errors -from pipenv.vendor.cerberus.platform import ( - _str_type, - Callable, - Hashable, - Mapping, - MutableMapping, - Sequence, -) -from pipenv.vendor.cerberus.utils import ( - get_Validator_class, - validator_factory, - mapping_hash, - TypeDefinition, -) - - -class _Abort(Exception): - pass - - -class SchemaError(Exception): - """ - Raised when the validation schema is missing, has the wrong format or contains - errors.""" - - pass - - -class DefinitionSchema(MutableMapping): - """A dict-subclass for caching of validated schemas.""" - - def __new__(cls, *args, **kwargs): - if 'SchemaValidator' not in globals(): - global SchemaValidator - SchemaValidator = validator_factory('SchemaValidator', SchemaValidatorMixin) - types_mapping = SchemaValidator.types_mapping.copy() - types_mapping.update( - { - 'callable': TypeDefinition('callable', (Callable,), ()), - 'hashable': TypeDefinition('hashable', (Hashable,), ()), - } - ) - SchemaValidator.types_mapping = types_mapping - - return super(DefinitionSchema, cls).__new__(cls) - - def __init__(self, validator, schema): - """ - :param validator: An instance of Validator-(sub-)class that uses this - schema. - :param schema: A definition-schema as ``dict``. Defaults to an empty - one. - """ - if not isinstance(validator, get_Validator_class()): - raise RuntimeError('validator argument must be a Validator-' 'instance.') - self.validator = validator - - if isinstance(schema, _str_type): - schema = validator.schema_registry.get(schema, schema) - - if not isinstance(schema, Mapping): - try: - schema = dict(schema) - except Exception: - raise SchemaError(errors.SCHEMA_ERROR_DEFINITION_TYPE.format(schema)) - - self.validation_schema = SchemaValidationSchema(validator) - self.schema_validator = SchemaValidator( - None, - allow_unknown=self.validation_schema, - error_handler=errors.SchemaErrorHandler, - target_schema=schema, - target_validator=validator, - ) - - schema = self.expand(schema) - self.validate(schema) - self.schema = schema - - def __delitem__(self, key): - _new_schema = self.schema.copy() - try: - del _new_schema[key] - except ValueError: - raise SchemaError("Schema has no field '%s' defined" % key) - except Exception as e: - raise e - else: - del self.schema[key] - - def __getitem__(self, item): - return self.schema[item] - - def __iter__(self): - return iter(self.schema) - - def __len__(self): - return len(self.schema) - - def __repr__(self): - return str(self) - - def __setitem__(self, key, value): - value = self.expand({0: value})[0] - self.validate({key: value}) - self.schema[key] = value - - def __str__(self): - if hasattr(self, "schema"): - return str(self.schema) - else: - return "No schema data is set yet." - - def copy(self): - return self.__class__(self.validator, self.schema.copy()) - - @classmethod - def expand(cls, schema): - try: - schema = cls._expand_logical_shortcuts(schema) - schema = cls._expand_subschemas(schema) - except Exception: - pass - - # TODO remove this with the next major release - schema = cls._rename_deprecated_rulenames(schema) - - return schema - - @classmethod - def _expand_logical_shortcuts(cls, schema): - """ - Expand agglutinated rules in a definition-schema. - - :param schema: The schema-definition to expand. - :return: The expanded schema-definition. - """ - - def is_of_rule(x): - return isinstance(x, _str_type) and x.startswith( - ('allof_', 'anyof_', 'noneof_', 'oneof_') - ) - - for field, rules in schema.items(): - for of_rule in [x for x in rules if is_of_rule(x)]: - operator, rule = of_rule.split('_', 1) - rules.update({operator: []}) - for value in rules[of_rule]: - rules[operator].append({rule: value}) - del rules[of_rule] - return schema - - @classmethod - def _expand_subschemas(cls, schema): - def has_schema_rule(): - return isinstance(schema[field], Mapping) and 'schema' in schema[field] - - def has_mapping_schema(): - """ - Tries to determine heuristically if the schema-constraints are aimed to - mappings. - """ - try: - return all( - isinstance(x, Mapping) for x in schema[field]['schema'].values() - ) - except TypeError: - return False - - for field in schema: - if not has_schema_rule(): - pass - elif has_mapping_schema(): - schema[field]['schema'] = cls.expand(schema[field]['schema']) - else: # assumes schema-constraints for a sequence - schema[field]['schema'] = cls.expand({0: schema[field]['schema']})[0] - - # TODO remove the last two values in the tuple with the next major release - for rule in ('keysrules', 'valuesrules', 'keyschema', 'valueschema'): - if rule in schema[field]: - schema[field][rule] = cls.expand({0: schema[field][rule]})[0] - - for rule in ('allof', 'anyof', 'items', 'noneof', 'oneof'): - if rule in schema[field]: - if not isinstance(schema[field][rule], Sequence): - continue - new_rules_definition = [] - for item in schema[field][rule]: - new_rules_definition.append(cls.expand({0: item})[0]) - schema[field][rule] = new_rules_definition - return schema - - def get(self, item, default=None): - return self.schema.get(item, default) - - def items(self): - return self.schema.items() - - def update(self, schema): - try: - schema = self.expand(schema) - _new_schema = self.schema.copy() - _new_schema.update(schema) - self.validate(_new_schema) - except ValueError: - raise SchemaError(errors.SCHEMA_ERROR_DEFINITION_TYPE.format(schema)) - except Exception as e: - raise e - else: - self.schema = _new_schema - - # TODO remove with next major release - @staticmethod - def _rename_deprecated_rulenames(schema): - for field, rules in schema.items(): - - if isinstance(rules, str): # registry reference - continue - - for old, new in ( - ('keyschema', 'keysrules'), - ('validator', 'check_with'), - ('valueschema', 'valuesrules'), - ): - - if old not in rules: - continue - - if new in rules: - raise RuntimeError( - "The rule '{new}' is also present with its old " - "name '{old}' in the same set of rules." - ) - - warn( - "The rule '{old}' was renamed to '{new}'. The old name will " - "not be available in the next major release of " - "Cerberus.".format(old=old, new=new), - DeprecationWarning, - ) - schema[field][new] = schema[field][old] - schema[field].pop(old) - - return schema - - def regenerate_validation_schema(self): - self.validation_schema = SchemaValidationSchema(self.validator) - - def validate(self, schema=None): - """ - Validates a schema that defines rules against supported rules. - - :param schema: The schema to be validated as a legal cerberus schema - according to the rules of the assigned Validator object. - Raises a :class:`~cerberus.base.SchemaError` when an invalid - schema is encountered. - """ - if schema is None: - schema = self.schema - _hash = (mapping_hash(schema), mapping_hash(self.validator.types_mapping)) - if _hash not in self.validator._valid_schemas: - self._validate(schema) - self.validator._valid_schemas.add(_hash) - - def _validate(self, schema): - if isinstance(schema, _str_type): - schema = self.validator.schema_registry.get(schema, schema) - - test_schema = {} - for field, rules in schema.items(): - if isinstance(rules, _str_type): - test_schema[field] = rules_set_registry.get(rules, rules) - else: - test_rules = {} - for rule, constraint in rules.items(): - test_rules[rule.replace(" ", "_")] = constraint - test_schema[field] = test_rules - - if not self.schema_validator(test_schema, normalize=False): - raise SchemaError(self.schema_validator.errors) - - -class UnvalidatedSchema(DefinitionSchema): - def __init__(self, schema={}): - if not isinstance(schema, Mapping): - schema = dict(schema) - self.schema = schema - - def validate(self, schema): - pass - - def copy(self): - # Override ancestor's copy, because - # UnvalidatedSchema does not have .validator: - return self.__class__(self.schema.copy()) - - -class SchemaValidationSchema(UnvalidatedSchema): - def __init__(self, validator): - self.schema = { - 'allow_unknown': False, - 'schema': validator.rules, - 'type': 'dict', - } - - -class SchemaValidatorMixin(object): - """ - This validator mixin provides mechanics to validate schemas passed to a Cerberus - validator. - """ - - def __init__(self, *args, **kwargs): - kwargs.setdefault('known_rules_set_refs', set()) - kwargs.setdefault('known_schema_refs', set()) - super(SchemaValidatorMixin, self).__init__(*args, **kwargs) - - @property - def known_rules_set_refs(self): - """The encountered references to rules set registry items.""" - return self._config['known_rules_set_refs'] - - @property - def known_schema_refs(self): - """The encountered references to schema registry items.""" - return self._config['known_schema_refs'] - - @property - def target_schema(self): - """The schema that is being validated.""" - return self._config['target_schema'] - - @property - def target_validator(self): - """The validator whose schema is being validated.""" - return self._config['target_validator'] - - def _check_with_bulk_schema(self, field, value): - # resolve schema registry reference - if isinstance(value, _str_type): - if value in self.known_rules_set_refs: - return - else: - self.known_rules_set_refs.add(value) - definition = self.target_validator.rules_set_registry.get(value) - if definition is None: - self._error(field, 'Rules set definition %s not found.' % value) - return - else: - value = definition - - _hash = ( - mapping_hash({'turing': value}), - mapping_hash(self.target_validator.types_mapping), - ) - if _hash in self.target_validator._valid_schemas: - return - - validator = self._get_child_validator( - document_crumb=field, - allow_unknown=False, - schema=self.target_validator.rules, - ) - validator(value, normalize=False) - if validator._errors: - self._error(validator._errors) - else: - self.target_validator._valid_schemas.add(_hash) - - def _check_with_dependencies(self, field, value): - if isinstance(value, _str_type): - pass - elif isinstance(value, Mapping): - validator = self._get_child_validator( - document_crumb=field, - schema={'valuesrules': {'type': 'list'}}, - allow_unknown=True, - ) - if not validator(value, normalize=False): - self._error(validator._errors) - elif isinstance(value, Sequence): - if not all(isinstance(x, Hashable) for x in value): - path = self.document_path + (field,) - self._error(path, 'All dependencies must be a hashable type.') - - def _check_with_items(self, field, value): - for i, schema in enumerate(value): - self._check_with_bulk_schema((field, i), schema) - - def _check_with_schema(self, field, value): - try: - value = self._handle_schema_reference_for_validator(field, value) - except _Abort: - return - - _hash = (mapping_hash(value), mapping_hash(self.target_validator.types_mapping)) - if _hash in self.target_validator._valid_schemas: - return - - validator = self._get_child_validator( - document_crumb=field, schema=None, allow_unknown=self.root_allow_unknown - ) - validator(self._expand_rules_set_refs(value), normalize=False) - if validator._errors: - self._error(validator._errors) - else: - self.target_validator._valid_schemas.add(_hash) - - def _check_with_type(self, field, value): - value = set((value,)) if isinstance(value, _str_type) else set(value) - invalid_constraints = value - set(self.target_validator.types) - if invalid_constraints: - self._error( - field, 'Unsupported types: {}'.format(', '.join(invalid_constraints)) - ) - - def _expand_rules_set_refs(self, schema): - result = {} - for k, v in schema.items(): - if isinstance(v, _str_type): - result[k] = self.target_validator.rules_set_registry.get(v) - else: - result[k] = v - return result - - def _handle_schema_reference_for_validator(self, field, value): - if not isinstance(value, _str_type): - return value - if value in self.known_schema_refs: - raise _Abort - - self.known_schema_refs.add(value) - definition = self.target_validator.schema_registry.get(value) - if definition is None: - path = self.document_path + (field,) - self._error(path, 'Schema definition {} not found.'.format(value)) - raise _Abort - return definition - - def _validate_logical(self, rule, field, value): - """{'allowed': ('allof', 'anyof', 'noneof', 'oneof')}""" - if not isinstance(value, Sequence): - self._error(field, errors.BAD_TYPE) - return - - validator = self._get_child_validator( - document_crumb=rule, - allow_unknown=False, - schema=self.target_validator.validation_rules, - ) - - for constraints in value: - _hash = ( - mapping_hash({'turing': constraints}), - mapping_hash(self.target_validator.types_mapping), - ) - if _hash in self.target_validator._valid_schemas: - continue - - validator(constraints, normalize=False) - if validator._errors: - self._error(validator._errors) - else: - self.target_validator._valid_schemas.add(_hash) - - -#### - - -class Registry(object): - """ - A registry to store and retrieve schemas and parts of it by a name that can be used - in validation schemas. - - :param definitions: Optional, initial definitions. - :type definitions: any :term:`mapping` - """ - - def __init__(self, definitions={}): - self._storage = {} - self.extend(definitions) - - def add(self, name, definition): - """ - Register a definition to the registry. Existing definitions are replaced - silently. - - :param name: The name which can be used as reference in a validation - schema. - :type name: :class:`str` - :param definition: The definition. - :type definition: any :term:`mapping` - """ - self._storage[name] = self._expand_definition(definition) - - def all(self): - """ - Returns a :class:`dict` with all registered definitions mapped to their name. - """ - return self._storage - - def clear(self): - """Purge all definitions in the registry.""" - self._storage.clear() - - def extend(self, definitions): - """ - Add several definitions at once. Existing definitions are - replaced silently. - - :param definitions: The names and definitions. - :type definitions: a :term:`mapping` or an :term:`iterable` with - two-value :class:`tuple` s - """ - for name, definition in dict(definitions).items(): - self.add(name, definition) - - def get(self, name, default=None): - """ - Retrieve a definition from the registry. - - :param name: The reference that points to the definition. - :type name: :class:`str` - :param default: Return value if the reference isn't registered. - """ - return self._storage.get(name, default) - - def remove(self, *names): - """ - Unregister definitions from the registry. - - :param names: The names of the definitions that are to be - unregistered. - """ - for name in names: - self._storage.pop(name, None) - - -class SchemaRegistry(Registry): - @classmethod - def _expand_definition(cls, definition): - return DefinitionSchema.expand(definition) - - -class RulesSetRegistry(Registry): - @classmethod - def _expand_definition(cls, definition): - return DefinitionSchema.expand({0: definition})[0] - - -schema_registry, rules_set_registry = SchemaRegistry(), RulesSetRegistry() diff --git a/pipenv/vendor/cerberus/utils.py b/pipenv/vendor/cerberus/utils.py deleted file mode 100644 index 908c5fa1..00000000 --- a/pipenv/vendor/cerberus/utils.py +++ /dev/null @@ -1,132 +0,0 @@ -from __future__ import absolute_import - -from collections import namedtuple - -from pipenv.vendor.cerberus.platform import _int_types, _str_type, Mapping, Sequence, Set - - -TypeDefinition = namedtuple('TypeDefinition', 'name,included_types,excluded_types') -""" -This class is used to define types that can be used as value in the -:attr:`~cerberus.Validator.types_mapping` property. -The ``name`` should be descriptive and match the key it is going to be assigned -to. -A value that is validated against such definition must be an instance of any of -the types contained in ``included_types`` and must not match any of the types -contained in ``excluded_types``. -""" - - -def compare_paths_lt(x, y): - min_length = min(len(x), len(y)) - - if x[:min_length] == y[:min_length]: - return len(x) == min_length - - for i in range(min_length): - a, b = x[i], y[i] - - for _type in (_int_types, _str_type, tuple): - if isinstance(a, _type): - if isinstance(b, _type): - break - else: - return True - - if a == b: - continue - elif a < b: - return True - else: - return False - - raise RuntimeError - - -def drop_item_from_tuple(t, i): - return t[:i] + t[i + 1 :] - - -def get_Validator_class(): - global Validator - if 'Validator' not in globals(): - from pipenv.vendor.cerberus.validator import Validator - return Validator - - -def mapping_hash(schema): - return hash(mapping_to_frozenset(schema)) - - -def mapping_to_frozenset(mapping): - """ - Be aware that this treats any sequence type with the equal members as equal. As it - is used to identify equality of schemas, this can be considered okay as definitions - are semantically equal regardless the container type. - """ - - aggregation = {} - - for key, value in mapping.items(): - if isinstance(value, Mapping): - aggregation[key] = mapping_to_frozenset(value) - elif isinstance(value, Sequence): - value = list(value) - for i, item in enumerate(value): - if isinstance(item, Mapping): - value[i] = mapping_to_frozenset(item) - aggregation[key] = tuple(value) - elif isinstance(value, Set): - aggregation[key] = frozenset(value) - else: - aggregation[key] = value - - return frozenset(aggregation.items()) - - -def quote_string(value): - if isinstance(value, _str_type): - return '"%s"' % value - else: - return value - - -class readonly_classproperty(property): - def __get__(self, instance, owner): - return super(readonly_classproperty, self).__get__(owner) - - def __set__(self, instance, value): - raise RuntimeError('This is a readonly class property.') - - def __delete__(self, instance): - raise RuntimeError('This is a readonly class property.') - - -def validator_factory(name, bases=None, namespace={}): - """ - Dynamically create a :class:`~cerberus.Validator` subclass. - Docstrings of mixin-classes will be added to the resulting class' one if ``__doc__`` - is not in :obj:`namespace`. - - :param name: The name of the new class. - :type name: :class:`str` - :param bases: Class(es) with additional and overriding attributes. - :type bases: :class:`tuple` of or a single :term:`class` - :param namespace: Attributes for the new class. - :type namespace: :class:`dict` - :return: The created class. - """ - Validator = get_Validator_class() - - if bases is None: - bases = (Validator,) - elif isinstance(bases, tuple): - bases += (Validator,) - else: - bases = (bases, Validator) - - docstrings = [x.__doc__ for x in bases if x.__doc__] - if len(docstrings) > 1 and '__doc__' not in namespace: - namespace.update({'__doc__': '\n'.join(docstrings)}) - - return type(name, bases, namespace) diff --git a/pipenv/vendor/cerberus/validator.py b/pipenv/vendor/cerberus/validator.py deleted file mode 100644 index 3d6b2122..00000000 --- a/pipenv/vendor/cerberus/validator.py +++ /dev/null @@ -1,1682 +0,0 @@ -""" - Extensible validation for Python dictionaries. - This module implements Cerberus Validator class - - :copyright: 2012-2016 by Nicola Iarocci. - :license: ISC, see LICENSE for more details. - - Full documentation is available at http://python-cerberus.org -""" - -from __future__ import absolute_import - -from ast import literal_eval -from copy import copy -from datetime import date, datetime -import re -from warnings import warn - -from pipenv.vendor.cerberus import errors -from pipenv.vendor.cerberus.platform import ( - _int_types, - _str_type, - Container, - Hashable, - Iterable, - Mapping, - Sequence, - Sized, -) -from pipenv.vendor.cerberus.schema import ( - schema_registry, - rules_set_registry, - DefinitionSchema, - SchemaError, -) -from pipenv.vendor.cerberus.utils import drop_item_from_tuple, readonly_classproperty, TypeDefinition - -toy_error_handler = errors.ToyErrorHandler() - - -def dummy_for_rule_validation(rule_constraints): - def dummy(self, constraint, field, value): - raise RuntimeError( - 'Dummy method called. Its purpose is to hold just' - 'validation constraints for a rule in its ' - 'docstring.' - ) - - f = dummy - f.__doc__ = rule_constraints - return f - - -class DocumentError(Exception): - """Raised when the target document is missing or has the wrong format""" - - pass - - -class _SchemaRuleTypeError(Exception): - """ - Raised when a schema (list) validation encounters a mapping. - Not supposed to be used outside this module. - """ - - pass - - -class BareValidator(object): - """ - Validator class. Normalizes and/or validates any mapping against a - validation-schema which is provided as an argument at class instantiation - or upon calling the :meth:`~cerberus.Validator.validate`, - :meth:`~cerberus.Validator.validated` or - :meth:`~cerberus.Validator.normalized` method. An instance itself is - callable and executes a validation. - - All instantiation parameters are optional. - - There are the introspective properties :attr:`types`, :attr:`validators`, - :attr:`coercers`, :attr:`default_setters`, :attr:`rules`, - :attr:`normalization_rules` and :attr:`validation_rules`. - - The attributes reflecting the available rules are assembled considering - constraints that are defined in the docstrings of rules' methods and is - effectively used as validation schema for :attr:`schema`. - - :param schema: See :attr:`~cerberus.Validator.schema`. - Defaults to :obj:`None`. - :type schema: any :term:`mapping` - :param ignore_none_values: See :attr:`~cerberus.Validator.ignore_none_values`. - Defaults to ``False``. - :type ignore_none_values: :class:`bool` - :param allow_unknown: See :attr:`~cerberus.Validator.allow_unknown`. - Defaults to ``False``. - :type allow_unknown: :class:`bool` or any :term:`mapping` - :param require_all: See :attr:`~cerberus.Validator.require_all`. - Defaults to ``False``. - :type require_all: :class:`bool` - :param purge_unknown: See :attr:`~cerberus.Validator.purge_unknown`. - Defaults to to ``False``. - :type purge_unknown: :class:`bool` - :param purge_readonly: Removes all fields that are defined as ``readonly`` in the - normalization phase. - :type purge_readonly: :class:`bool` - :param error_handler: The error handler that formats the result of - :attr:`~cerberus.Validator.errors`. - When given as two-value tuple with an error-handler - class and a dictionary, the latter is passed to the - initialization of the error handler. - Default: :class:`~cerberus.errors.BasicErrorHandler`. - :type error_handler: class or instance based on - :class:`~cerberus.errors.BaseErrorHandler` or - :class:`tuple` - """ # noqa: E501 - - mandatory_validations = ('nullable',) - """ - Rules that are evaluated on any field, regardless whether defined in the schema or - not. - Type: :class:`tuple` - """ - priority_validations = ('nullable', 'readonly', 'type', 'empty') - """ - Rules that will be processed in that order before any other. - Type: :class:`tuple` - """ - types_mapping = { - 'binary': TypeDefinition('binary', (bytes, bytearray), ()), - 'boolean': TypeDefinition('boolean', (bool,), ()), - 'container': TypeDefinition('container', (Container,), (_str_type,)), - 'date': TypeDefinition('date', (date,), ()), - 'datetime': TypeDefinition('datetime', (datetime,), ()), - 'dict': TypeDefinition('dict', (Mapping,), ()), - 'float': TypeDefinition('float', (float, _int_types), ()), - 'integer': TypeDefinition('integer', (_int_types,), ()), - 'list': TypeDefinition('list', (Sequence,), (_str_type,)), - 'number': TypeDefinition('number', (_int_types, float), (bool,)), - 'set': TypeDefinition('set', (set,), ()), - 'string': TypeDefinition('string', (_str_type,), ()), - } - """ - This mapping holds all available constraints for the type rule and their assigned - :class:`~cerberus.TypeDefinition`. - """ - _valid_schemas = set() - """ - A :class:`set` of hashes derived from validation schemas that are legit for a - particular ``Validator`` class. - """ - - def __init__(self, *args, **kwargs): - """ - The arguments will be treated as with this signature: - - __init__(self, schema=None, ignore_none_values=False, - allow_unknown=False, require_all=False, - purge_unknown=False, purge_readonly=False, - error_handler=errors.BasicErrorHandler) - """ - - self.document = None - """ The document that is or was recently processed. - Type: any :term:`mapping` """ - self._errors = errors.ErrorList() - """ The list of errors that were encountered since the last document - processing was invoked. - Type: :class:`~cerberus.errors.ErrorList` """ - self.recent_error = None - """ The last individual error that was submitted. - Type: :class:`~cerberus.errors.ValidationError` """ - self.document_error_tree = errors.DocumentErrorTree() - """ A tree representiation of encountered errors following the - structure of the document. - Type: :class:`~cerberus.errors.DocumentErrorTree` """ - self.schema_error_tree = errors.SchemaErrorTree() - """ A tree representiation of encountered errors following the - structure of the schema. - Type: :class:`~cerberus.errors.SchemaErrorTree` """ - self.document_path = () - """ The path within the document to the current sub-document. - Type: :class:`tuple` """ - self.schema_path = () - """ The path within the schema to the current sub-schema. - Type: :class:`tuple` """ - self.update = False - self.error_handler = self.__init_error_handler(kwargs) - """ The error handler used to format :attr:`~cerberus.Validator.errors` - and process submitted errors with - :meth:`~cerberus.Validator._error`. - Type: :class:`~cerberus.errors.BaseErrorHandler` """ - self.__store_config(args, kwargs) - self.schema = kwargs.get('schema', None) - self.allow_unknown = kwargs.get('allow_unknown', False) - self.require_all = kwargs.get('require_all', False) - self._remaining_rules = [] - """ Keeps track of the rules that are next in line to be evaluated - during the validation of a field. - Type: :class:`list` """ - - super(BareValidator, self).__init__() - - @staticmethod - def __init_error_handler(kwargs): - error_handler = kwargs.pop('error_handler', errors.BasicErrorHandler) - if isinstance(error_handler, tuple): - error_handler, eh_config = error_handler - else: - eh_config = {} - if isinstance(error_handler, type) and issubclass( - error_handler, errors.BaseErrorHandler - ): - return error_handler(**eh_config) - elif isinstance(error_handler, errors.BaseErrorHandler): - return error_handler - else: - raise RuntimeError('Invalid error_handler.') - - def __store_config(self, args, kwargs): - """Assign args to kwargs and store configuration.""" - signature = ( - 'schema', - 'ignore_none_values', - 'allow_unknown', - 'require_all', - 'purge_unknown', - 'purge_readonly', - ) - for i, p in enumerate(signature[: len(args)]): - if p in kwargs: - raise TypeError("__init__ got multiple values for argument " "'%s'" % p) - else: - kwargs[p] = args[i] - self._config = kwargs - """ This dictionary holds the configuration arguments that were used to - initialize the :class:`Validator` instance except the - ``error_handler``. """ - - @classmethod - def clear_caches(cls): - """Purge the cache of known valid schemas.""" - cls._valid_schemas.clear() - - def _error(self, *args): - """ - Creates and adds one or multiple errors. - - :param args: Accepts different argument's signatures. - - *1. Bulk addition of errors:* - - - :term:`iterable` of - :class:`~cerberus.errors.ValidationError`-instances - - The errors will be added to - :attr:`~cerberus.Validator._errors`. - - *2. Custom error:* - - - the invalid field's name - - - the error message - - A custom error containing the message will be created and - added to :attr:`~cerberus.Validator._errors`. - There will however be fewer information contained in the - error (no reference to the violated rule and its - constraint). - - *3. Defined error:* - - - the invalid field's name - - - the error-reference, see :mod:`cerberus.errors` - - - arbitrary, supplemental information about the error - - A :class:`~cerberus.errors.ValidationError` instance will - be created and added to - :attr:`~cerberus.Validator._errors`. - """ - if len(args) == 1: - self._errors.extend(args[0]) - self._errors.sort() - for error in args[0]: - self.document_error_tree.add(error) - self.schema_error_tree.add(error) - self.error_handler.emit(error) - elif len(args) == 2 and isinstance(args[1], _str_type): - self._error(args[0], errors.CUSTOM, args[1]) - elif len(args) >= 2: - field = args[0] - code = args[1].code - rule = args[1].rule - info = args[2:] - - document_path = self.document_path + (field,) - - schema_path = self.schema_path - if code != errors.UNKNOWN_FIELD.code and rule is not None: - schema_path += (field, rule) - - if not rule: - constraint = None - else: - rules_set = self._resolve_rules_set( - self._resolve_schema(self.schema)[field] - ) - if rule == 'nullable': - constraint = rules_set.get(rule, False) - elif rule == 'required': - constraint = rules_set.get(rule, self.require_all) - if rule not in rules_set: - schema_path = "__require_all__" - else: - constraint = rules_set[rule] - - value = self.document.get(field) - - self.recent_error = errors.ValidationError( - document_path, schema_path, code, rule, constraint, value, info - ) - self._error([self.recent_error]) - - def _get_child_validator(self, document_crumb=None, schema_crumb=None, **kwargs): - """ - Creates a new instance of Validator-(sub-)class. All initial parameters of the - parent are passed to the initialization, unless a parameter is given as an - explicit *keyword*-parameter. - - :param document_crumb: Extends the - :attr:`~cerberus.Validator.document_path` - of the child-validator. - :type document_crumb: :class:`tuple` or :term:`hashable` - :param schema_crumb: Extends the - :attr:`~cerberus.Validator.schema_path` - of the child-validator. - :type schema_crumb: :class:`tuple` or hashable - :param kwargs: Overriding keyword-arguments for initialization. - :type kwargs: :class:`dict` - - :return: an instance of ``self.__class__`` - """ - child_config = self._config.copy() - child_config.update(kwargs) - if not self.is_child: - child_config['is_child'] = True - child_config['error_handler'] = toy_error_handler - child_config['root_allow_unknown'] = self.allow_unknown - child_config['root_require_all'] = self.require_all - child_config['root_document'] = self.document - child_config['root_schema'] = self.schema - - child_validator = self.__class__(**child_config) - - if document_crumb is None: - child_validator.document_path = self.document_path - else: - if not isinstance(document_crumb, tuple): - document_crumb = (document_crumb,) - child_validator.document_path = self.document_path + document_crumb - - if schema_crumb is None: - child_validator.schema_path = self.schema_path - else: - if not isinstance(schema_crumb, tuple): - schema_crumb = (schema_crumb,) - child_validator.schema_path = self.schema_path + schema_crumb - - return child_validator - - def __get_rule_handler(self, domain, rule): - methodname = '_{0}_{1}'.format(domain, rule.replace(' ', '_')) - result = getattr(self, methodname, None) - if result is None: - raise RuntimeError( - "There's no handler for '{}' in the '{}' " - "domain.".format(rule, domain) - ) - return result - - def _drop_nodes_from_errorpaths(self, _errors, dp_items, sp_items): - """ - Removes nodes by index from an errorpath, relatively to the basepaths of self. - - :param errors: A list of :class:`errors.ValidationError` instances. - :param dp_items: A list of integers, pointing at the nodes to drop from - the :attr:`document_path`. - :param sp_items: Alike ``dp_items``, but for :attr:`schema_path`. - """ - dp_basedepth = len(self.document_path) - sp_basedepth = len(self.schema_path) - for error in _errors: - for i in sorted(dp_items, reverse=True): - error.document_path = drop_item_from_tuple( - error.document_path, dp_basedepth + i - ) - for i in sorted(sp_items, reverse=True): - error.schema_path = drop_item_from_tuple( - error.schema_path, sp_basedepth + i - ) - if error.child_errors: - self._drop_nodes_from_errorpaths(error.child_errors, dp_items, sp_items) - - def _lookup_field(self, path): - """ - Searches for a field as defined by path. This method is used by the - ``dependency`` evaluation logic. - - :param path: Path elements are separated by a ``.``. A leading ``^`` - indicates that the path relates to the document root, - otherwise it relates to the currently evaluated document, - which is possibly a subdocument. - The sequence ``^^`` at the start will be interpreted as a - literal ``^``. - :type path: :class:`str` - :returns: Either the found field name and its value or :obj:`None` for - both. - :rtype: A two-value :class:`tuple`. - """ - if path.startswith('^'): - path = path[1:] - context = self.document if path.startswith('^') else self.root_document - else: - context = self.document - - parts = path.split('.') - for part in parts: - if part not in context: - return None, None - context = context.get(part, {}) - - return parts[-1], context - - def _resolve_rules_set(self, rules_set): - if isinstance(rules_set, Mapping): - return rules_set - elif isinstance(rules_set, _str_type): - return self.rules_set_registry.get(rules_set) - return None - - def _resolve_schema(self, schema): - if isinstance(schema, Mapping): - return schema - elif isinstance(schema, _str_type): - return self.schema_registry.get(schema) - return None - - # Properties - - @property - def allow_unknown(self): - """ - If ``True`` unknown fields that are not defined in the schema will be ignored. - If a mapping with a validation schema is given, any undefined field will be - validated against its rules. Also see :ref:`allowing-the-unknown`. - Type: :class:`bool` or any :term:`mapping` - """ - return self._config.get('allow_unknown', False) - - @allow_unknown.setter - def allow_unknown(self, value): - if not (self.is_child or isinstance(value, (bool, DefinitionSchema))): - DefinitionSchema(self, {'allow_unknown': value}) - self._config['allow_unknown'] = value - - @property - def require_all(self): - """ - If ``True`` known fields that are defined in the schema will be required. - Type: :class:`bool` - """ - return self._config.get('require_all', False) - - @require_all.setter - def require_all(self, value): - self._config['require_all'] = value - - @property - def errors(self): - """ - The errors of the last processing formatted by the handler that is bound to - :attr:`~cerberus.Validator.error_handler`. - """ - return self.error_handler(self._errors) - - @property - def ignore_none_values(self): - """ - Whether to not process :obj:`None`-values in a document or not. - Type: :class:`bool` - """ - return self._config.get('ignore_none_values', False) - - @ignore_none_values.setter - def ignore_none_values(self, value): - self._config['ignore_none_values'] = value - - @property - def is_child(self): - """ - ``True`` for child-validators obtained with - :meth:`~cerberus.Validator._get_child_validator`. - Type: :class:`bool` - """ - return self._config.get('is_child', False) - - @property - def _is_normalized(self): - """``True`` if the document is already normalized.""" - return self._config.get('_is_normalized', False) - - @_is_normalized.setter - def _is_normalized(self, value): - self._config['_is_normalized'] = value - - @property - def purge_unknown(self): - """ - If ``True``, unknown fields will be deleted from the document unless a - validation is called with disabled normalization. Also see - :ref:`purging-unknown-fields`. - Type: :class:`bool` - """ - return self._config.get('purge_unknown', False) - - @purge_unknown.setter - def purge_unknown(self, value): - self._config['purge_unknown'] = value - - @property - def purge_readonly(self): - """ - If ``True``, fields declared as readonly will be deleted from the document - unless a validation is called with disabled normalization. - Type: :class:`bool` - """ - return self._config.get('purge_readonly', False) - - @purge_readonly.setter - def purge_readonly(self, value): - self._config['purge_readonly'] = value - - @property - def root_allow_unknown(self): - """ - The :attr:`~cerberus.Validator.allow_unknown` attribute of the first level - ancestor of a child validator. - """ - return self._config.get('root_allow_unknown', self.allow_unknown) - - @property - def root_require_all(self): - """ - The :attr:`~cerberus.Validator.require_all` attribute of the first level - ancestor of a child validator. - """ - return self._config.get('root_require_all', self.require_all) - - @property - def root_document(self): - """ - The :attr:`~cerberus.Validator.document` attribute of the first level ancestor - of a child validator. - """ - return self._config.get('root_document', self.document) - - @property - def rules_set_registry(self): - """ - The registry that holds referenced rules sets. - Type: :class:`~cerberus.Registry` - """ - return self._config.get('rules_set_registry', rules_set_registry) - - @rules_set_registry.setter - def rules_set_registry(self, registry): - self._config['rules_set_registry'] = registry - - @property - def root_schema(self): - """ - The :attr:`~cerberus.Validator.schema` attribute of the first level ancestor of - a child validator. - """ - return self._config.get('root_schema', self.schema) - - @property - def schema(self): - """ - The validation schema of a validator. When a schema is passed to a method, it - replaces this attribute. - Type: any :term:`mapping` or :obj:`None` - """ - return self._schema - - @schema.setter - def schema(self, schema): - if schema is None: - self._schema = None - elif self.is_child or isinstance(schema, DefinitionSchema): - self._schema = schema - else: - self._schema = DefinitionSchema(self, schema) - - @property - def schema_registry(self): - """ - The registry that holds referenced schemas. - Type: :class:`~cerberus.Registry` - """ - return self._config.get('schema_registry', schema_registry) - - @schema_registry.setter - def schema_registry(self, registry): - self._config['schema_registry'] = registry - - # FIXME the returned method has the correct docstring, but doesn't appear - # in the API docs - @readonly_classproperty - def types(cls): - """ - The constraints that can be used for the 'type' rule. - Type: A tuple of strings. - """ - redundant_types = set(cls.types_mapping) & set(cls._types_from_methods) - if redundant_types: - warn( - "These types are defined both with a method and in the" - "'types_mapping' property of this validator: %s" % redundant_types - ) - - return tuple(cls.types_mapping) + cls._types_from_methods - - # Document processing - - def __init_processing(self, document, schema=None): - self._errors = errors.ErrorList() - self.recent_error = None - self.document_error_tree = errors.DocumentErrorTree() - self.schema_error_tree = errors.SchemaErrorTree() - self.document = copy(document) - if not self.is_child: - self._is_normalized = False - - if schema is not None: - self.schema = DefinitionSchema(self, schema) - elif self.schema is None: - if isinstance(self.allow_unknown, Mapping): - self._schema = {} - else: - raise SchemaError(errors.SCHEMA_ERROR_MISSING) - if document is None: - raise DocumentError(errors.DOCUMENT_MISSING) - if not isinstance(document, Mapping): - raise DocumentError(errors.DOCUMENT_FORMAT.format(document)) - self.error_handler.start(self) - - def _drop_remaining_rules(self, *rules): - """ - Drops rules from the queue of the rules that still need to be evaluated for the - currently processed field. If no arguments are given, the whole queue is - emptied. - """ - if rules: - for rule in rules: - try: - self._remaining_rules.remove(rule) - except ValueError: - pass - else: - self._remaining_rules = [] - - # # Normalizing - - def normalized(self, document, schema=None, always_return_document=False): - """ - Returns the document normalized according to the specified rules of a schema. - - :param document: The document to normalize. - :type document: any :term:`mapping` - :param schema: The validation schema. Defaults to :obj:`None`. If not - provided here, the schema must have been provided at - class instantiation. - :type schema: any :term:`mapping` - :param always_return_document: Return the document, even if an error - occurred. Defaults to: ``False``. - :type always_return_document: :class:`bool` - :return: A normalized copy of the provided mapping or :obj:`None` if an - error occurred during normalization. - """ - self.__init_processing(document, schema) - self.__normalize_mapping(self.document, self.schema) - self.error_handler.end(self) - if self._errors and not always_return_document: - return None - else: - return self.document - - def __normalize_mapping(self, mapping, schema): - if isinstance(schema, _str_type): - schema = self._resolve_schema(schema) - schema = schema.copy() - for field in schema: - schema[field] = self._resolve_rules_set(schema[field]) - - self.__normalize_rename_fields(mapping, schema) - if self.purge_unknown and not self.allow_unknown: - self._normalize_purge_unknown(mapping, schema) - if self.purge_readonly: - self.__normalize_purge_readonly(mapping, schema) - # Check `readonly` fields before applying default values because - # a field's schema definition might contain both `readonly` and - # `default`. - self.__validate_readonly_fields(mapping, schema) - self.__normalize_default_fields(mapping, schema) - self._normalize_coerce(mapping, schema) - self.__normalize_containers(mapping, schema) - self._is_normalized = True - return mapping - - def _normalize_coerce(self, mapping, schema): - """ - {'oneof': [ - {'type': 'callable'}, - {'type': 'list', - 'schema': {'oneof': [{'type': 'callable'}, - {'type': 'string'}]}}, - {'type': 'string'} - ]} - """ - - error = errors.COERCION_FAILED - for field in mapping: - if field in schema and 'coerce' in schema[field]: - mapping[field] = self.__normalize_coerce( - schema[field]['coerce'], - field, - mapping[field], - schema[field].get('nullable', False), - error, - ) - elif ( - isinstance(self.allow_unknown, Mapping) - and 'coerce' in self.allow_unknown - ): - mapping[field] = self.__normalize_coerce( - self.allow_unknown['coerce'], - field, - mapping[field], - self.allow_unknown.get('nullable', False), - error, - ) - - def __normalize_coerce(self, processor, field, value, nullable, error): - if isinstance(processor, _str_type): - processor = self.__get_rule_handler('normalize_coerce', processor) - - elif isinstance(processor, Iterable): - result = value - for p in processor: - result = self.__normalize_coerce(p, field, result, nullable, error) - if ( - errors.COERCION_FAILED - in self.document_error_tree.fetch_errors_from( - self.document_path + (field,) - ) - ): - break - return result - - try: - return processor(value) - except Exception as e: - if not (nullable and value is None): - self._error(field, error, str(e)) - return value - - def __normalize_containers(self, mapping, schema): - for field in mapping: - rules = set(schema.get(field, ())) - - # TODO: This check conflates validation and normalization - if isinstance(mapping[field], Mapping): - if 'keysrules' in rules: - self.__normalize_mapping_per_keysrules( - field, mapping, schema[field]['keysrules'] - ) - if 'valuesrules' in rules: - self.__normalize_mapping_per_valuesrules( - field, mapping, schema[field]['valuesrules'] - ) - if rules & set( - ('allow_unknown', 'purge_unknown', 'schema') - ) or isinstance(self.allow_unknown, Mapping): - try: - self.__normalize_mapping_per_schema(field, mapping, schema) - except _SchemaRuleTypeError: - pass - - elif isinstance(mapping[field], _str_type): - continue - - elif isinstance(mapping[field], Sequence): - if 'schema' in rules: - self.__normalize_sequence_per_schema(field, mapping, schema) - elif 'items' in rules: - self.__normalize_sequence_per_items(field, mapping, schema) - - def __normalize_mapping_per_keysrules(self, field, mapping, property_rules): - schema = dict(((k, property_rules) for k in mapping[field])) - document = dict(((k, k) for k in mapping[field])) - validator = self._get_child_validator( - document_crumb=field, schema_crumb=(field, 'keysrules'), schema=schema - ) - result = validator.normalized(document, always_return_document=True) - if validator._errors: - self._drop_nodes_from_errorpaths(validator._errors, [], [2, 4]) - self._error(validator._errors) - for k in result: - if k == result[k]: - continue - if result[k] in mapping[field]: - warn( - "Normalizing keys of {path}: {key} already exists, " - "its value is replaced.".format( - path='.'.join(str(x) for x in self.document_path + (field,)), - key=k, - ) - ) - mapping[field][result[k]] = mapping[field][k] - else: - mapping[field][result[k]] = mapping[field][k] - del mapping[field][k] - - def __normalize_mapping_per_valuesrules(self, field, mapping, value_rules): - schema = dict(((k, value_rules) for k in mapping[field])) - validator = self._get_child_validator( - document_crumb=field, schema_crumb=(field, 'valuesrules'), schema=schema - ) - mapping[field] = validator.normalized( - mapping[field], always_return_document=True - ) - if validator._errors: - self._drop_nodes_from_errorpaths(validator._errors, [], [2]) - self._error(validator._errors) - - def __normalize_mapping_per_schema(self, field, mapping, schema): - rules = schema.get(field, {}) - if not rules and isinstance(self.allow_unknown, Mapping): - rules = self.allow_unknown - validator = self._get_child_validator( - document_crumb=field, - schema_crumb=(field, 'schema'), - schema=rules.get('schema', {}), - allow_unknown=rules.get('allow_unknown', self.allow_unknown), # noqa: E501 - purge_unknown=rules.get('purge_unknown', self.purge_unknown), - require_all=rules.get('require_all', self.require_all), - ) # noqa: E501 - value_type = type(mapping[field]) - result_value = validator.normalized(mapping[field], always_return_document=True) - mapping[field] = value_type(result_value) - if validator._errors: - self._error(validator._errors) - - def __normalize_sequence_per_schema(self, field, mapping, schema): - schema = dict( - ((k, schema[field]['schema']) for k in range(len(mapping[field]))) - ) - document = dict((k, v) for k, v in enumerate(mapping[field])) - validator = self._get_child_validator( - document_crumb=field, schema_crumb=(field, 'schema'), schema=schema - ) - value_type = type(mapping[field]) - result = validator.normalized(document, always_return_document=True) - mapping[field] = value_type(result.values()) - if validator._errors: - self._drop_nodes_from_errorpaths(validator._errors, [], [2]) - self._error(validator._errors) - - def __normalize_sequence_per_items(self, field, mapping, schema): - rules, values = schema[field]['items'], mapping[field] - if len(rules) != len(values): - return - schema = dict(((k, v) for k, v in enumerate(rules))) - document = dict((k, v) for k, v in enumerate(values)) - validator = self._get_child_validator( - document_crumb=field, schema_crumb=(field, 'items'), schema=schema - ) - value_type = type(mapping[field]) - result = validator.normalized(document, always_return_document=True) - mapping[field] = value_type(result.values()) - if validator._errors: - self._drop_nodes_from_errorpaths(validator._errors, [], [2]) - self._error(validator._errors) - - @staticmethod - def __normalize_purge_readonly(mapping, schema): - for field in [x for x in mapping if schema.get(x, {}).get('readonly', False)]: - mapping.pop(field) - return mapping - - @staticmethod - def _normalize_purge_unknown(mapping, schema): - """{'type': 'boolean'}""" - for field in [x for x in mapping if x not in schema]: - mapping.pop(field) - return mapping - - def __normalize_rename_fields(self, mapping, schema): - for field in tuple(mapping): - if field in schema: - self._normalize_rename(mapping, schema, field) - self._normalize_rename_handler(mapping, schema, field) - elif ( - isinstance(self.allow_unknown, Mapping) - and 'rename_handler' in self.allow_unknown - ): - self._normalize_rename_handler( - mapping, {field: self.allow_unknown}, field - ) - return mapping - - def _normalize_rename(self, mapping, schema, field): - """{'type': 'hashable'}""" - if 'rename' in schema[field]: - mapping[schema[field]['rename']] = mapping[field] - del mapping[field] - - def _normalize_rename_handler(self, mapping, schema, field): - """ - {'oneof': [ - {'type': 'callable'}, - {'type': 'list', - 'schema': {'oneof': [{'type': 'callable'}, - {'type': 'string'}]}}, - {'type': 'string'} - ]} - """ - if 'rename_handler' not in schema[field]: - return - new_name = self.__normalize_coerce( - schema[field]['rename_handler'], field, field, False, errors.RENAMING_FAILED - ) - if new_name != field: - mapping[new_name] = mapping[field] - del mapping[field] - - def __validate_readonly_fields(self, mapping, schema): - for field in ( - x - for x in schema - if x in mapping and self._resolve_rules_set(schema[x]).get('readonly') - ): - self._validate_readonly(schema[field]['readonly'], field, mapping[field]) - - def __normalize_default_fields(self, mapping, schema): - empty_fields = [ - x - for x in schema - if x not in mapping - or ( - mapping[x] is None # noqa: W503 - and not schema[x].get('nullable', False) - ) # noqa: W503 - ] - - try: - fields_with_default = [x for x in empty_fields if 'default' in schema[x]] - except TypeError: - raise _SchemaRuleTypeError - for field in fields_with_default: - self._normalize_default(mapping, schema, field) - - known_fields_states = set() - fields_with_default_setter = [ - x for x in empty_fields if 'default_setter' in schema[x] - ] - while fields_with_default_setter: - field = fields_with_default_setter.pop(0) - try: - self._normalize_default_setter(mapping, schema, field) - except KeyError: - fields_with_default_setter.append(field) - except Exception as e: - self._error(field, errors.SETTING_DEFAULT_FAILED, str(e)) - - fields_processing_state = hash(tuple(fields_with_default_setter)) - if fields_processing_state in known_fields_states: - for field in fields_with_default_setter: - self._error( - field, - errors.SETTING_DEFAULT_FAILED, - 'Circular dependencies of default setters.', - ) - break - else: - known_fields_states.add(fields_processing_state) - - def _normalize_default(self, mapping, schema, field): - """{'nullable': True}""" - mapping[field] = schema[field]['default'] - - def _normalize_default_setter(self, mapping, schema, field): - """ - {'oneof': [ - {'type': 'callable'}, - {'type': 'string'} - ]} - """ - if 'default_setter' in schema[field]: - setter = schema[field]['default_setter'] - if isinstance(setter, _str_type): - setter = self.__get_rule_handler('normalize_default_setter', setter) - mapping[field] = setter(mapping) - - # # Validating - - def validate(self, document, schema=None, update=False, normalize=True): - """ - Normalizes and validates a mapping against a validation-schema of defined rules. - - :param document: The document to normalize. - :type document: any :term:`mapping` - :param schema: The validation schema. Defaults to :obj:`None`. If not - provided here, the schema must have been provided at - class instantiation. - :type schema: any :term:`mapping` - :param update: If ``True``, required fields won't be checked. - :type update: :class:`bool` - :param normalize: If ``True``, normalize the document before validation. - :type normalize: :class:`bool` - - :return: ``True`` if validation succeeds, otherwise ``False``. Check - the :func:`errors` property for a list of processing errors. - :rtype: :class:`bool` - """ - self.update = update - self._unrequired_by_excludes = set() - - self.__init_processing(document, schema) - if normalize: - self.__normalize_mapping(self.document, self.schema) - - for field in self.document: - if self.ignore_none_values and self.document[field] is None: - continue - definitions = self.schema.get(field) - if definitions is not None: - self.__validate_definitions(definitions, field) - else: - self.__validate_unknown_fields(field) - - if not self.update: - self.__validate_required_fields(self.document) - - self.error_handler.end(self) - - return not bool(self._errors) - - __call__ = validate - - def validated(self, *args, **kwargs): - """ - Wrapper around :meth:`~cerberus.Validator.validate` that returns the normalized - and validated document or :obj:`None` if validation failed. - """ - always_return_document = kwargs.pop('always_return_document', False) - self.validate(*args, **kwargs) - if self._errors and not always_return_document: - return None - else: - return self.document - - def __validate_unknown_fields(self, field): - if self.allow_unknown: - value = self.document[field] - if isinstance(self.allow_unknown, (Mapping, _str_type)): - # validate that unknown fields matches the schema - # for unknown_fields - schema_crumb = 'allow_unknown' if self.is_child else '__allow_unknown__' - validator = self._get_child_validator( - schema_crumb=schema_crumb, schema={field: self.allow_unknown} - ) - if not validator({field: value}, normalize=False): - self._error(validator._errors) - else: - self._error(field, errors.UNKNOWN_FIELD) - - def __validate_definitions(self, definitions, field): - """Validate a field's value against its defined rules.""" - - def validate_rule(rule): - validator = self.__get_rule_handler('validate', rule) - return validator(definitions.get(rule, None), field, value) - - definitions = self._resolve_rules_set(definitions) - value = self.document[field] - - rules_queue = [ - x - for x in self.priority_validations - if x in definitions or x in self.mandatory_validations - ] - rules_queue.extend( - x for x in self.mandatory_validations if x not in rules_queue - ) - rules_queue.extend( - x - for x in definitions - if x not in rules_queue - and x not in self.normalization_rules - and x not in ('allow_unknown', 'require_all', 'meta', 'required') - ) - self._remaining_rules = rules_queue - - while self._remaining_rules: - rule = self._remaining_rules.pop(0) - try: - result = validate_rule(rule) - # TODO remove on next breaking release - if result: - break - except _SchemaRuleTypeError: - break - - self._drop_remaining_rules() - - # Remember to keep the validation methods below this line - # sorted alphabetically - - _validate_allow_unknown = dummy_for_rule_validation( - """ {'oneof': [{'type': 'boolean'}, - {'type': ['dict', 'string'], - 'check_with': 'bulk_schema'}]} """ - ) - - def _validate_allowed(self, allowed_values, field, value): - """{'type': 'container'}""" - if isinstance(value, Iterable) and not isinstance(value, _str_type): - unallowed = tuple(x for x in value if x not in allowed_values) - if unallowed: - self._error(field, errors.UNALLOWED_VALUES, unallowed) - else: - if value not in allowed_values: - self._error(field, errors.UNALLOWED_VALUE, value) - - def _validate_check_with(self, checks, field, value): - """ - {'oneof': [ - {'type': 'callable'}, - {'type': 'list', - 'schema': {'oneof': [{'type': 'callable'}, - {'type': 'string'}]}}, - {'type': 'string'} - ]} - """ - if isinstance(checks, _str_type): - try: - value_checker = self.__get_rule_handler('check_with', checks) - # TODO remove on next major release - except RuntimeError: - value_checker = self.__get_rule_handler('validator', checks) - warn( - "The 'validator' rule was renamed to 'check_with'. Please update " - "your schema and method names accordingly.", - DeprecationWarning, - ) - value_checker(field, value) - elif isinstance(checks, Iterable): - for v in checks: - self._validate_check_with(v, field, value) - else: - checks(field, value, self._error) - - def _validate_contains(self, expected_values, field, value): - """{'empty': False }""" - if not isinstance(value, Iterable): - return - - if not isinstance(expected_values, Iterable) or isinstance( - expected_values, _str_type - ): - expected_values = set((expected_values,)) - else: - expected_values = set(expected_values) - - missing_values = expected_values - set(value) - if missing_values: - self._error(field, errors.MISSING_MEMBERS, missing_values) - - def _validate_dependencies(self, dependencies, field, value): - """{'type': ('dict', 'hashable', 'list'), 'check_with': 'dependencies'}""" - if isinstance(dependencies, _str_type) or not isinstance( - dependencies, (Iterable, Mapping) - ): - dependencies = (dependencies,) - - if isinstance(dependencies, Sequence): - self.__validate_dependencies_sequence(dependencies, field) - elif isinstance(dependencies, Mapping): - self.__validate_dependencies_mapping(dependencies, field) - - if ( - self.document_error_tree.fetch_node_from( - self.schema_path + (field, 'dependencies') - ) - is not None - ): - return True - - def __validate_dependencies_mapping(self, dependencies, field): - validated_dependencies_counter = 0 - error_info = {} - for dependency_name, dependency_values in dependencies.items(): - if not isinstance(dependency_values, Sequence) or isinstance( - dependency_values, _str_type - ): - dependency_values = [dependency_values] - - wanted_field, wanted_field_value = self._lookup_field(dependency_name) - if wanted_field_value in dependency_values: - validated_dependencies_counter += 1 - else: - error_info.update({dependency_name: wanted_field_value}) - - if validated_dependencies_counter != len(dependencies): - self._error(field, errors.DEPENDENCIES_FIELD_VALUE, error_info) - - def __validate_dependencies_sequence(self, dependencies, field): - for dependency in dependencies: - if self._lookup_field(dependency)[0] is None: - self._error(field, errors.DEPENDENCIES_FIELD, dependency) - - def _validate_empty(self, empty, field, value): - """{'type': 'boolean'}""" - if isinstance(value, Sized) and len(value) == 0: - self._drop_remaining_rules( - 'allowed', - 'forbidden', - 'items', - 'minlength', - 'maxlength', - 'regex', - 'check_with', - ) - if not empty: - self._error(field, errors.EMPTY_NOT_ALLOWED) - - def _validate_excludes(self, excluded_fields, field, value): - """{'type': ('hashable', 'list'), 'schema': {'type': 'hashable'}}""" - if isinstance(excluded_fields, Hashable): - excluded_fields = [excluded_fields] - - # Mark the currently evaluated field as not required for now if it actually is. - # One of the so marked will be needed to pass when required fields are checked. - if self.schema[field].get('required', self.require_all): - self._unrequired_by_excludes.add(field) - - for excluded_field in excluded_fields: - if excluded_field in self.schema and self.schema[field].get( - 'required', self.require_all - ): - - self._unrequired_by_excludes.add(excluded_field) - - if any(excluded_field in self.document for excluded_field in excluded_fields): - exclusion_str = ', '.join( - "'{0}'".format(field) for field in excluded_fields - ) - self._error(field, errors.EXCLUDES_FIELD, exclusion_str) - - def _validate_forbidden(self, forbidden_values, field, value): - """{'type': 'list'}""" - if isinstance(value, Sequence) and not isinstance(value, _str_type): - forbidden = set(value) & set(forbidden_values) - if forbidden: - self._error(field, errors.FORBIDDEN_VALUES, list(forbidden)) - else: - if value in forbidden_values: - self._error(field, errors.FORBIDDEN_VALUE, value) - - def _validate_items(self, items, field, values): - """{'type': 'list', 'check_with': 'items'}""" - if len(items) != len(values): - self._error(field, errors.ITEMS_LENGTH, len(items), len(values)) - else: - schema = dict( - (i, definition) for i, definition in enumerate(items) - ) # noqa: E501 - validator = self._get_child_validator( - document_crumb=field, - schema_crumb=(field, 'items'), # noqa: E501 - schema=schema, - ) - if not validator( - dict((i, value) for i, value in enumerate(values)), - update=self.update, - normalize=False, - ): - self._error(field, errors.BAD_ITEMS, validator._errors) - - def __validate_logical(self, operator, definitions, field, value): - """ - Validates value against all definitions and logs errors according to the - operator. - """ - valid_counter = 0 - _errors = errors.ErrorList() - - for i, definition in enumerate(definitions): - schema = {field: definition.copy()} - for rule in ('allow_unknown', 'type'): - if rule not in schema[field] and rule in self.schema[field]: - schema[field][rule] = self.schema[field][rule] - if 'allow_unknown' not in schema[field]: - schema[field]['allow_unknown'] = self.allow_unknown - - validator = self._get_child_validator( - schema_crumb=(field, operator, i), schema=schema, allow_unknown=True - ) - if validator(self.document, update=self.update, normalize=False): - valid_counter += 1 - else: - self._drop_nodes_from_errorpaths(validator._errors, [], [3]) - _errors.extend(validator._errors) - - return valid_counter, _errors - - def _validate_anyof(self, definitions, field, value): - """{'type': 'list', 'logical': 'anyof'}""" - valids, _errors = self.__validate_logical('anyof', definitions, field, value) - if valids < 1: - self._error(field, errors.ANYOF, _errors, valids, len(definitions)) - - def _validate_allof(self, definitions, field, value): - """{'type': 'list', 'logical': 'allof'}""" - valids, _errors = self.__validate_logical('allof', definitions, field, value) - if valids < len(definitions): - self._error(field, errors.ALLOF, _errors, valids, len(definitions)) - - def _validate_noneof(self, definitions, field, value): - """{'type': 'list', 'logical': 'noneof'}""" - valids, _errors = self.__validate_logical('noneof', definitions, field, value) - if valids > 0: - self._error(field, errors.NONEOF, _errors, valids, len(definitions)) - - def _validate_oneof(self, definitions, field, value): - """{'type': 'list', 'logical': 'oneof'}""" - valids, _errors = self.__validate_logical('oneof', definitions, field, value) - if valids != 1: - self._error(field, errors.ONEOF, _errors, valids, len(definitions)) - - def _validate_max(self, max_value, field, value): - """{'nullable': False }""" - try: - if value > max_value: - self._error(field, errors.MAX_VALUE) - except TypeError: - pass - - def _validate_min(self, min_value, field, value): - """{'nullable': False }""" - try: - if value < min_value: - self._error(field, errors.MIN_VALUE) - except TypeError: - pass - - def _validate_maxlength(self, max_length, field, value): - """{'type': 'integer'}""" - if isinstance(value, Iterable) and len(value) > max_length: - self._error(field, errors.MAX_LENGTH, len(value)) - - _validate_meta = dummy_for_rule_validation('') - - def _validate_minlength(self, min_length, field, value): - """{'type': 'integer'}""" - if isinstance(value, Iterable) and len(value) < min_length: - self._error(field, errors.MIN_LENGTH, len(value)) - - def _validate_nullable(self, nullable, field, value): - """{'type': 'boolean'}""" - if value is None: - if not nullable: - self._error(field, errors.NOT_NULLABLE) - self._drop_remaining_rules( - 'allowed', - 'empty', - 'forbidden', - 'items', - 'keysrules', - 'min', - 'max', - 'minlength', - 'maxlength', - 'regex', - 'schema', - 'type', - 'valuesrules', - ) - - def _validate_keysrules(self, schema, field, value): - """ - {'type': ['dict', 'string'], - 'check_with': 'bulk_schema', - 'forbidden': ['rename', 'rename_handler']} - """ - if isinstance(value, Mapping): - validator = self._get_child_validator( - document_crumb=field, - schema_crumb=(field, 'keysrules'), - schema=dict(((k, schema) for k in value.keys())), - ) - if not validator(dict(((k, k) for k in value.keys())), normalize=False): - self._drop_nodes_from_errorpaths(validator._errors, [], [2, 4]) - self._error(field, errors.KEYSRULES, validator._errors) - - def _validate_readonly(self, readonly, field, value): - """{'type': 'boolean'}""" - if readonly: - if not self._is_normalized: - self._error(field, errors.READONLY_FIELD) - # If the document was normalized (and therefore already been - # checked for readonly fields), we still have to return True - # if an error was filed. - has_error = ( - errors.READONLY_FIELD - in self.document_error_tree.fetch_errors_from( - self.document_path + (field,) - ) - ) - if self._is_normalized and has_error: - self._drop_remaining_rules() - - def _validate_regex(self, pattern, field, value): - """{'type': 'string'}""" - if not isinstance(value, _str_type): - return - if not pattern.endswith('$'): - pattern += '$' - re_obj = re.compile(pattern) - if not re_obj.match(value): - self._error(field, errors.REGEX_MISMATCH) - - _validate_required = dummy_for_rule_validation(""" {'type': 'boolean'} """) - - _validate_require_all = dummy_for_rule_validation(""" {'type': 'boolean'} """) - - def __validate_required_fields(self, document): - """ - Validates that required fields are not missing. - - :param document: The document being validated. - """ - try: - required = set( - field - for field, definition in self.schema.items() - if self._resolve_rules_set(definition).get('required', self.require_all) - is True - ) - except AttributeError: - if self.is_child and self.schema_path[-1] == 'schema': - raise _SchemaRuleTypeError - else: - raise - required -= self._unrequired_by_excludes - missing = required - set( - field - for field in document - if document.get(field) is not None or not self.ignore_none_values - ) - - for field in missing: - self._error(field, errors.REQUIRED_FIELD) - - # At least one field from self._unrequired_by_excludes should be present in - # document. - if self._unrequired_by_excludes: - fields = set(field for field in document if document.get(field) is not None) - if self._unrequired_by_excludes.isdisjoint(fields): - for field in self._unrequired_by_excludes - fields: - self._error(field, errors.REQUIRED_FIELD) - - def _validate_schema(self, schema, field, value): - """ - {'type': ['dict', 'string'], - 'anyof': [{'check_with': 'schema'}, - {'check_with': 'bulk_schema'}]} - """ - if schema is None: - return - - if isinstance(value, Sequence) and not isinstance(value, _str_type): - self.__validate_schema_sequence(field, schema, value) - elif isinstance(value, Mapping): - self.__validate_schema_mapping(field, schema, value) - - def __validate_schema_mapping(self, field, schema, value): - schema = self._resolve_schema(schema) - allow_unknown = self.schema[field].get('allow_unknown', self.allow_unknown) - require_all = self.schema[field].get('require_all', self.require_all) - validator = self._get_child_validator( - document_crumb=field, - schema_crumb=(field, 'schema'), - schema=schema, - allow_unknown=allow_unknown, - require_all=require_all, - ) - try: - if not validator(value, update=self.update, normalize=False): - self._error(field, errors.MAPPING_SCHEMA, validator._errors) - except _SchemaRuleTypeError: - self._error(field, errors.BAD_TYPE_FOR_SCHEMA) - raise - - def __validate_schema_sequence(self, field, schema, value): - schema = dict(((i, schema) for i in range(len(value)))) - validator = self._get_child_validator( - document_crumb=field, - schema_crumb=(field, 'schema'), - schema=schema, - allow_unknown=self.allow_unknown, - ) - validator( - dict(((i, v) for i, v in enumerate(value))), - update=self.update, - normalize=False, - ) - - if validator._errors: - self._drop_nodes_from_errorpaths(validator._errors, [], [2]) - self._error(field, errors.SEQUENCE_SCHEMA, validator._errors) - - def _validate_type(self, data_type, field, value): - """ - {'type': ['string', 'list'], - 'check_with': 'type'} - """ - if not data_type: - return - - types = (data_type,) if isinstance(data_type, _str_type) else data_type - - for _type in types: - # TODO remove this block on next major release - # this implementation still supports custom type validation methods - type_definition = self.types_mapping.get(_type) - if type_definition is not None: - matched = isinstance( - value, type_definition.included_types - ) and not isinstance(value, type_definition.excluded_types) - else: - type_handler = self.__get_rule_handler('validate_type', _type) - matched = type_handler(value) - if matched: - return - - # TODO uncomment this block on next major release - # when _validate_type_* methods were deprecated: - # type_definition = self.types_mapping[_type] - # if isinstance(value, type_definition.included_types) \ - # and not isinstance(value, type_definition.excluded_types): # noqa 501 - # return - - self._error(field, errors.BAD_TYPE) - self._drop_remaining_rules() - - def _validate_valuesrules(self, schema, field, value): - """ - {'type': ['dict', 'string'], - 'check_with': 'bulk_schema', - 'forbidden': ['rename', 'rename_handler']} - """ - schema_crumb = (field, 'valuesrules') - if isinstance(value, Mapping): - validator = self._get_child_validator( - document_crumb=field, - schema_crumb=schema_crumb, - schema=dict((k, schema) for k in value), - ) - validator(value, update=self.update, normalize=False) - if validator._errors: - self._drop_nodes_from_errorpaths(validator._errors, [], [2]) - self._error(field, errors.VALUESRULES, validator._errors) - - -RULE_SCHEMA_SEPARATOR = "The rule's arguments are validated against this schema:" - - -class InspectedValidator(type): - """Metaclass for all validators""" - - def __new__(cls, *args): - if '__doc__' not in args[2]: - args[2].update({'__doc__': args[1][0].__doc__}) - return super(InspectedValidator, cls).__new__(cls, *args) - - def __init__(cls, *args): - def attributes_with_prefix(prefix): - return tuple( - x[len(prefix) + 2 :] - for x in dir(cls) - if x.startswith('_' + prefix + '_') - ) - - super(InspectedValidator, cls).__init__(*args) - - cls._types_from_methods, cls.validation_rules = (), {} - for attribute in attributes_with_prefix('validate'): - # TODO remove inspection of type test methods in next major release - if attribute.startswith('type_'): - cls._types_from_methods += (attribute[len('type_') :],) - else: - cls.validation_rules[attribute] = cls.__get_rule_schema( - '_validate_' + attribute - ) - - # TODO remove on next major release - if cls._types_from_methods: - warn( - "Methods for type testing are deprecated, use TypeDefinition " - "and the 'types_mapping'-property of a Validator-instance " - "instead.", - DeprecationWarning, - ) - - # TODO remove second summand on next major release - cls.checkers = tuple(x for x in attributes_with_prefix('check_with')) + tuple( - x for x in attributes_with_prefix('validator') - ) - x = cls.validation_rules['check_with']['oneof'] - x[1]['schema']['oneof'][1]['allowed'] = x[2]['allowed'] = cls.checkers - - for rule in (x for x in cls.mandatory_validations if x != 'nullable'): - cls.validation_rules[rule]['required'] = True - - cls.coercers, cls.default_setters, cls.normalization_rules = (), (), {} - for attribute in attributes_with_prefix('normalize'): - if attribute.startswith('coerce_'): - cls.coercers += (attribute[len('coerce_') :],) - elif attribute.startswith('default_setter_'): - cls.default_setters += (attribute[len('default_setter_') :],) - else: - cls.normalization_rules[attribute] = cls.__get_rule_schema( - '_normalize_' + attribute - ) - - for rule in ('coerce', 'rename_handler'): - x = cls.normalization_rules[rule]['oneof'] - x[1]['schema']['oneof'][1]['allowed'] = x[2]['allowed'] = cls.coercers - cls.normalization_rules['default_setter']['oneof'][1][ - 'allowed' - ] = cls.default_setters - - cls.rules = {} - cls.rules.update(cls.validation_rules) - cls.rules.update(cls.normalization_rules) - - def __get_rule_schema(cls, method_name): - docstring = getattr(cls, method_name).__doc__ - if docstring is None: - result = {} - else: - if RULE_SCHEMA_SEPARATOR in docstring: - docstring = docstring.split(RULE_SCHEMA_SEPARATOR)[1] - try: - result = literal_eval(docstring.strip()) - except Exception: - result = {} - - if not result and method_name != '_validate_meta': - warn( - "No validation schema is defined for the arguments of rule " - "'%s'" % method_name.split('_', 2)[-1] - ) - - return result - - -Validator = InspectedValidator('Validator', (BareValidator,), {}) diff --git a/pipenv/vendor/vendor.txt b/pipenv/vendor/vendor.txt index 8eb02f2e..aa2c6697 100644 --- a/pipenv/vendor/vendor.txt +++ b/pipenv/vendor/vendor.txt @@ -1,5 +1,4 @@ attrs==23.1.0 -cerberus==1.3.4 click-didyoumean==0.3.0 click==8.1.3 colorama==0.4.6 @@ -8,7 +7,7 @@ markupsafe==2.1.2 pep517==0.13.0 pexpect==4.8.0 pipdeptree==2.7.0 -plette[validation]==0.4.4 +plette==0.4.4 ptyprocess==0.7.0 pydantic==1.10.7 python-dotenv==1.0.0 diff --git a/tests/unit/test_utils_windows_executable.py b/tests/unit/test_utils_windows_executable.py index b3b340f1..73f8b0a6 100644 --- a/tests/unit/test_utils_windows_executable.py +++ b/tests/unit/test_utils_windows_executable.py @@ -14,6 +14,7 @@ pytestmark = pytest.mark.skipif( @pytest.mark.utils +@pytest.mark.skipif(os.name != "nt", reason="Windows test only") @mock.patch('os.path.isfile') @mock.patch('shutil.which') def test_find_windows_executable_when_not_found(mocked_which, mocked_isfile): @@ -32,6 +33,7 @@ def test_find_windows_executable_when_not_found(mocked_which, mocked_isfile): @pytest.mark.utils +@pytest.mark.skipif(os.name != "nt", reason="Windows test only") @mock.patch('os.path.isfile') @mock.patch('shutil.which') def test_find_windows_executable_when_found(mocked_which, mocked_isfile):