Drop cerberus, it seems it's not used anywhere (#5699)

* Drop cerberus, it seems it's not used anywhere.
This commit is contained in:
Oz Tiram
2023-05-21 12:52:27 +02:00
committed by GitHub
parent 69e0e432f4
commit b1e3128e15
10 changed files with 4 additions and 3116 deletions
+1
View File
@@ -0,0 +1 @@
Drop vendored library cerberus. This isn't actually used by pipenv.
-15
View File
@@ -1,15 +0,0 @@
ISC License
Copyright (c) 2012-2016 Nicola Iarocci.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
-32
View File
@@ -1,32 +0,0 @@
"""
Extensible validation for Python dictionaries.
:copyright: 2012-2021 by Nicola Iarocci.
:license: ISC, see LICENSE for more details.
Full documentation is available at https://python-cerberus.org/
"""
from __future__ import absolute_import
from pipenv.patched.pip._vendor.pkg_resources import get_distribution, DistributionNotFound
from pipenv.vendor.cerberus.validator import DocumentError, Validator
from pipenv.vendor.cerberus.schema import rules_set_registry, schema_registry, SchemaError
from pipenv.vendor.cerberus.utils import TypeDefinition
try:
__version__ = get_distribution("Cerberus").version
except DistributionNotFound:
__version__ = "unknown"
__all__ = [
DocumentError.__name__,
SchemaError.__name__,
TypeDefinition.__name__,
Validator.__name__,
"schema_registry",
"rules_set_registry",
]
-654
View File
@@ -1,654 +0,0 @@
# -*-: coding utf-8 -*-
""" This module contains the error-related constants and classes. """
from __future__ import absolute_import
from collections import defaultdict, namedtuple
from copy import copy, deepcopy
from functools import wraps
from pprint import pformat
from pipenv.vendor.cerberus.platform import PYTHON_VERSION, MutableMapping
from pipenv.vendor.cerberus.utils import compare_paths_lt, quote_string
ErrorDefinition = namedtuple('ErrorDefinition', 'code, rule')
"""
This class is used to define possible errors. Each distinguishable error is
defined by a *unique* error ``code`` as integer and the ``rule`` that can
cause it as string.
The instances' names do not contain a common prefix as they are supposed to be
referenced within the module namespace, e.g. ``errors.CUSTOM``.
"""
# custom
CUSTOM = ErrorDefinition(0x00, None)
# existence
DOCUMENT_MISSING = ErrorDefinition(0x01, None) # issues/141
DOCUMENT_MISSING = "document is missing"
REQUIRED_FIELD = ErrorDefinition(0x02, 'required')
UNKNOWN_FIELD = ErrorDefinition(0x03, None)
DEPENDENCIES_FIELD = ErrorDefinition(0x04, 'dependencies')
DEPENDENCIES_FIELD_VALUE = ErrorDefinition(0x05, 'dependencies')
EXCLUDES_FIELD = ErrorDefinition(0x06, 'excludes')
# shape
DOCUMENT_FORMAT = ErrorDefinition(0x21, None) # issues/141
DOCUMENT_FORMAT = "'{0}' is not a document, must be a dict"
EMPTY_NOT_ALLOWED = ErrorDefinition(0x22, 'empty')
NOT_NULLABLE = ErrorDefinition(0x23, 'nullable')
BAD_TYPE = ErrorDefinition(0x24, 'type')
BAD_TYPE_FOR_SCHEMA = ErrorDefinition(0x25, 'schema')
ITEMS_LENGTH = ErrorDefinition(0x26, 'items')
MIN_LENGTH = ErrorDefinition(0x27, 'minlength')
MAX_LENGTH = ErrorDefinition(0x28, 'maxlength')
# color
REGEX_MISMATCH = ErrorDefinition(0x41, 'regex')
MIN_VALUE = ErrorDefinition(0x42, 'min')
MAX_VALUE = ErrorDefinition(0x43, 'max')
UNALLOWED_VALUE = ErrorDefinition(0x44, 'allowed')
UNALLOWED_VALUES = ErrorDefinition(0x45, 'allowed')
FORBIDDEN_VALUE = ErrorDefinition(0x46, 'forbidden')
FORBIDDEN_VALUES = ErrorDefinition(0x47, 'forbidden')
MISSING_MEMBERS = ErrorDefinition(0x48, 'contains')
# other
NORMALIZATION = ErrorDefinition(0x60, None)
COERCION_FAILED = ErrorDefinition(0x61, 'coerce')
RENAMING_FAILED = ErrorDefinition(0x62, 'rename_handler')
READONLY_FIELD = ErrorDefinition(0x63, 'readonly')
SETTING_DEFAULT_FAILED = ErrorDefinition(0x64, 'default_setter')
# groups
ERROR_GROUP = ErrorDefinition(0x80, None)
MAPPING_SCHEMA = ErrorDefinition(0x81, 'schema')
SEQUENCE_SCHEMA = ErrorDefinition(0x82, 'schema')
# TODO remove KEYSCHEMA AND VALUESCHEMA with next major release
KEYSRULES = KEYSCHEMA = ErrorDefinition(0x83, 'keysrules')
VALUESRULES = VALUESCHEMA = ErrorDefinition(0x84, 'valuesrules')
BAD_ITEMS = ErrorDefinition(0x8F, 'items')
LOGICAL = ErrorDefinition(0x90, None)
NONEOF = ErrorDefinition(0x91, 'noneof')
ONEOF = ErrorDefinition(0x92, 'oneof')
ANYOF = ErrorDefinition(0x93, 'anyof')
ALLOF = ErrorDefinition(0x94, 'allof')
""" SchemaError messages """
SCHEMA_ERROR_DEFINITION_TYPE = "schema definition for field '{0}' must be a dict"
SCHEMA_ERROR_MISSING = "validation schema missing"
""" Error representations """
class ValidationError(object):
"""A simple class to store and query basic error information."""
def __init__(self, document_path, schema_path, code, rule, constraint, value, info):
self.document_path = document_path
""" The path to the field within the document that caused the error.
Type: :class:`tuple` """
self.schema_path = schema_path
""" The path to the rule within the schema that caused the error.
Type: :class:`tuple` """
self.code = code
""" The error's identifier code. Type: :class:`int` """
self.rule = rule
""" The rule that failed. Type: `string` """
self.constraint = constraint
""" The constraint that failed. """
self.value = value
""" The value that failed. """
self.info = info
""" May hold additional information about the error.
Type: :class:`tuple` """
def __eq__(self, other):
"""Assumes the errors relate to the same document and schema."""
return hash(self) == hash(other)
def __hash__(self):
"""Expects that all other properties are transitively determined."""
return hash(self.document_path) ^ hash(self.schema_path) ^ hash(self.code)
def __lt__(self, other):
if self.document_path != other.document_path:
return compare_paths_lt(self.document_path, other.document_path)
else:
return compare_paths_lt(self.schema_path, other.schema_path)
def __repr__(self):
return (
"{class_name} @ {memptr} ( "
"document_path={document_path},"
"schema_path={schema_path},"
"code={code},"
"constraint={constraint},"
"value={value},"
"info={info} )".format(
class_name=self.__class__.__name__,
memptr=hex(id(self)), # noqa: E501
document_path=self.document_path,
schema_path=self.schema_path,
code=hex(self.code),
constraint=quote_string(self.constraint),
value=quote_string(self.value),
info=self.info,
)
)
@property
def child_errors(self):
"""
A list that contains the individual errors of a bulk validation error.
"""
return self.info[0] if self.is_group_error else None
@property
def definitions_errors(self):
"""
Dictionary with errors of an \*of-rule mapped to the index of the definition it
occurred in. Returns :obj:`None` if not applicable.
"""
if not self.is_logic_error:
return None
result = defaultdict(list)
for error in self.child_errors:
i = error.schema_path[len(self.schema_path)]
result[i].append(error)
return result
@property
def field(self):
"""Field of the contextual mapping, possibly :obj:`None`."""
if self.document_path:
return self.document_path[-1]
else:
return None
@property
def is_group_error(self):
"""``True`` for errors of bulk validations."""
return bool(self.code & ERROR_GROUP.code)
@property
def is_logic_error(self):
"""
``True`` for validation errors against different schemas with \*of-rules.
"""
return bool(self.code & LOGICAL.code - ERROR_GROUP.code)
@property
def is_normalization_error(self):
"""``True`` for normalization errors."""
return bool(self.code & NORMALIZATION.code)
class ErrorList(list):
"""
A list for :class:`~cerberus.errors.ValidationError` instances that can be queried
with the ``in`` keyword for a particular :class:`~cerberus.errors.ErrorDefinition`.
"""
def __contains__(self, error_definition):
if not isinstance(error_definition, ErrorDefinition):
raise TypeError
wanted_code = error_definition.code
return any(x.code == wanted_code for x in self)
class ErrorTreeNode(MutableMapping):
__slots__ = ('descendants', 'errors', 'parent_node', 'path', 'tree_root')
def __init__(self, path, parent_node):
self.parent_node = parent_node
self.tree_root = self.parent_node.tree_root
self.path = path[: self.parent_node.depth + 1]
self.errors = ErrorList()
self.descendants = {}
def __contains__(self, item):
if isinstance(item, ErrorDefinition):
return item in self.errors
else:
return item in self.descendants
def __delitem__(self, key):
del self.descendants[key]
def __iter__(self):
return iter(self.errors)
def __getitem__(self, item):
if isinstance(item, ErrorDefinition):
for error in self.errors:
if item.code == error.code:
return error
return None
else:
return self.descendants.get(item)
def __len__(self):
return len(self.errors)
def __repr__(self):
return self.__str__()
def __setitem__(self, key, value):
self.descendants[key] = value
def __str__(self):
return str(self.errors) + ',' + str(self.descendants)
@property
def depth(self):
return len(self.path)
@property
def tree_type(self):
return self.tree_root.tree_type
def add(self, error):
error_path = self._path_of_(error)
key = error_path[self.depth]
if key not in self.descendants:
self[key] = ErrorTreeNode(error_path, self)
node = self[key]
if len(error_path) == self.depth + 1:
node.errors.append(error)
node.errors.sort()
if error.is_group_error:
for child_error in error.child_errors:
self.tree_root.add(child_error)
else:
node.add(error)
def _path_of_(self, error):
return getattr(error, self.tree_type + '_path')
class ErrorTree(ErrorTreeNode):
"""
Base class for :class:`~cerberus.errors.DocumentErrorTree` and
:class:`~cerberus.errors.SchemaErrorTree`.
"""
def __init__(self, errors=()):
self.parent_node = None
self.tree_root = self
self.path = ()
self.errors = ErrorList()
self.descendants = {}
for error in errors:
self.add(error)
def add(self, error):
"""
Add an error to the tree.
:param error: :class:`~cerberus.errors.ValidationError`
"""
if not self._path_of_(error):
self.errors.append(error)
self.errors.sort()
else:
super(ErrorTree, self).add(error)
def fetch_errors_from(self, path):
"""
Returns all errors for a particular path.
:param path: :class:`tuple` of :term:`hashable` s.
:rtype: :class:`~cerberus.errors.ErrorList`
"""
node = self.fetch_node_from(path)
if node is not None:
return node.errors
else:
return ErrorList()
def fetch_node_from(self, path):
"""
Returns a node for a path.
:param path: Tuple of :term:`hashable` s.
:rtype: :class:`~cerberus.errors.ErrorTreeNode` or :obj:`None`
"""
context = self
for key in path:
context = context[key]
if context is None:
break
return context
class DocumentErrorTree(ErrorTree):
"""
Implements a dict-like class to query errors by indexes following the structure of a
validated document.
"""
tree_type = 'document'
class SchemaErrorTree(ErrorTree):
"""
Implements a dict-like class to query errors by indexes following the structure of
the used schema.
"""
tree_type = 'schema'
class BaseErrorHandler(object):
"""Base class for all error handlers.
Subclasses are identified as error-handlers with an instance-test."""
def __init__(self, *args, **kwargs):
"""Optionally initialize a new instance."""
pass
def __call__(self, errors):
"""
Returns errors in a handler-specific format.
:param errors: An object containing the errors.
:type errors: :term:`iterable` of
:class:`~cerberus.errors.ValidationError` instances or a
:class:`~cerberus.Validator` instance
"""
raise NotImplementedError
def __iter__(self):
"""Be a superhero and implement an iterator over errors."""
raise NotImplementedError
def add(self, error):
"""
Add an error to the errors' container object of a handler.
:param error: The error to add.
:type error: :class:`~cerberus.errors.ValidationError`
"""
raise NotImplementedError
def emit(self, error):
"""
Optionally emits an error in the handler's format to a stream. Or light a LED,
or even shut down a power plant.
:param error: The error to emit.
:type error: :class:`~cerberus.errors.ValidationError`
"""
pass
def end(self, validator):
"""
Gets called when a validation ends.
:param validator: The calling validator.
:type validator: :class:`~cerberus.Validator`
"""
pass
def extend(self, errors):
"""
Adds all errors to the handler's container object.
:param errors: The errors to add.
:type errors: :term:`iterable` of
:class:`~cerberus.errors.ValidationError` instances
"""
for error in errors:
self.add(error)
def start(self, validator):
"""
Gets called when a validation starts.
:param validator: The calling validator.
:type validator: :class:`~cerberus.Validator`
"""
pass
class ToyErrorHandler(BaseErrorHandler):
def __call__(self, *args, **kwargs):
raise RuntimeError('This is not supposed to happen.')
def clear(self):
pass
def encode_unicode(f):
"""Cerberus error messages expect regular binary strings.
If unicode is used in a ValidationError message can't be printed.
This decorator ensures that if legacy Python is used unicode
strings are encoded before passing to a function.
"""
@wraps(f)
def wrapped(obj, error):
def _encode(value):
"""Helper encoding unicode strings into binary utf-8"""
if isinstance(value, unicode): # noqa: F821
return value.encode('utf-8')
return value
error = copy(error)
error.document_path = _encode(error.document_path)
error.schema_path = _encode(error.schema_path)
error.constraint = _encode(error.constraint)
error.value = _encode(error.value)
error.info = _encode(error.info)
return f(obj, error)
return wrapped if PYTHON_VERSION < 3 else f
class BasicErrorHandler(BaseErrorHandler):
"""
Models cerberus' legacy. Returns a :class:`dict`. When mangled through :class:`str`
a pretty-formatted representation of that tree is returned.
"""
messages = {
0x00: "{0}",
0x01: "document is missing",
0x02: "required field",
0x03: "unknown field",
0x04: "field '{0}' is required",
0x05: "depends on these values: {constraint}",
0x06: "{0} must not be present with '{field}'",
0x21: "'{0}' is not a document, must be a dict",
0x22: "empty values not allowed",
0x23: "null value not allowed",
0x24: "must be of {constraint} type",
0x25: "must be of dict type",
0x26: "length of list should be {0}, it is {1}",
0x27: "min length is {constraint}",
0x28: "max length is {constraint}",
0x41: "value does not match regex '{constraint}'",
0x42: "min value is {constraint}",
0x43: "max value is {constraint}",
0x44: "unallowed value {value}",
0x45: "unallowed values {0}",
0x46: "unallowed value {value}",
0x47: "unallowed values {0}",
0x48: "missing members {0}",
0x61: "field '{field}' cannot be coerced: {0}",
0x62: "field '{field}' cannot be renamed: {0}",
0x63: "field is read-only",
0x64: "default value for '{field}' cannot be set: {0}",
0x81: "mapping doesn't validate subschema: {0}",
0x82: "one or more sequence-items don't validate: {0}",
0x83: "one or more keys of a mapping don't validate: {0}",
0x84: "one or more values in a mapping don't validate: {0}",
0x85: "one or more sequence-items don't validate: {0}",
0x91: "one or more definitions validate",
0x92: "none or more than one rule validate",
0x93: "no definitions validate",
0x94: "one or more definitions don't validate",
}
def __init__(self, tree=None):
self.tree = {} if tree is None else tree
def __call__(self, errors):
self.clear()
self.extend(errors)
return self.pretty_tree
def __str__(self):
return pformat(self.pretty_tree)
@property
def pretty_tree(self):
pretty = deepcopy(self.tree)
for field in pretty:
self._purge_empty_dicts(pretty[field])
return pretty
@encode_unicode
def add(self, error):
# Make sure the original error is not altered with
# error paths specific to the handler.
error = deepcopy(error)
self._rewrite_error_path(error)
if error.is_logic_error:
self._insert_logic_error(error)
elif error.is_group_error:
self._insert_group_error(error)
elif error.code in self.messages:
self._insert_error(
error.document_path, self._format_message(error.field, error)
)
def clear(self):
self.tree = {}
def start(self, validator):
self.clear()
def _format_message(self, field, error):
return self.messages[error.code].format(
*error.info, constraint=error.constraint, field=field, value=error.value
)
def _insert_error(self, path, node):
"""
Adds an error or sub-tree to :attr:tree.
:param path: Path to the error.
:type path: Tuple of strings and integers.
:param node: An error message or a sub-tree.
:type node: String or dictionary.
"""
field = path[0]
if len(path) == 1:
if field in self.tree:
subtree = self.tree[field].pop()
self.tree[field] += [node, subtree]
else:
self.tree[field] = [node, {}]
elif len(path) >= 1:
if field not in self.tree:
self.tree[field] = [{}]
subtree = self.tree[field][-1]
if subtree:
new = self.__class__(tree=copy(subtree))
else:
new = self.__class__()
new._insert_error(path[1:], node)
subtree.update(new.tree)
def _insert_group_error(self, error):
for child_error in error.child_errors:
if child_error.is_logic_error:
self._insert_logic_error(child_error)
elif child_error.is_group_error:
self._insert_group_error(child_error)
else:
self._insert_error(
child_error.document_path,
self._format_message(child_error.field, child_error),
)
def _insert_logic_error(self, error):
field = error.field
self._insert_error(error.document_path, self._format_message(field, error))
for definition_errors in error.definitions_errors.values():
for child_error in definition_errors:
if child_error.is_logic_error:
self._insert_logic_error(child_error)
elif child_error.is_group_error:
self._insert_group_error(child_error)
else:
self._insert_error(
child_error.document_path,
self._format_message(field, child_error),
)
def _purge_empty_dicts(self, error_list):
subtree = error_list[-1]
if not error_list[-1]:
error_list.pop()
else:
for key in subtree:
self._purge_empty_dicts(subtree[key])
def _rewrite_error_path(self, error, offset=0):
"""
Recursively rewrites the error path to correctly represent logic errors
"""
if error.is_logic_error:
self._rewrite_logic_error_path(error, offset)
elif error.is_group_error:
self._rewrite_group_error_path(error, offset)
def _rewrite_group_error_path(self, error, offset=0):
child_start = len(error.document_path) - offset
for child_error in error.child_errors:
relative_path = child_error.document_path[child_start:]
child_error.document_path = error.document_path + relative_path
self._rewrite_error_path(child_error, offset)
def _rewrite_logic_error_path(self, error, offset=0):
child_start = len(error.document_path) - offset
for i, definition_errors in error.definitions_errors.items():
if not definition_errors:
continue
nodename = '%s definition %s' % (error.rule, i)
path = error.document_path + (nodename,)
for child_error in definition_errors:
rel_path = child_error.document_path[child_start:]
child_error.document_path = path + rel_path
self._rewrite_error_path(child_error, offset + 1)
class SchemaErrorHandler(BasicErrorHandler):
messages = BasicErrorHandler.messages.copy()
messages[0x03] = "unknown rule"
-43
View File
@@ -1,43 +0,0 @@
""" Platform-dependent objects """
import sys
if sys.flags.optimize == 2:
raise RuntimeError("Cerberus can't be run with Python's optimization level 2.")
PYTHON_VERSION = float(sys.version_info[0]) + float(sys.version_info[1]) / 10
if PYTHON_VERSION < 3:
_str_type = basestring # noqa: F821
_int_types = (int, long) # noqa: F821
else:
_str_type = str
_int_types = (int,)
if PYTHON_VERSION < 3.3:
from collections import ( # noqa: F401
Callable,
Container,
Hashable,
Iterable,
Mapping,
MutableMapping,
Sequence,
Set,
Sized,
)
else:
from collections.abc import ( # noqa: F401
Callable,
Container,
Hashable,
Iterable,
Mapping,
MutableMapping,
Sequence,
Set,
Sized,
)
-556
View File
@@ -1,556 +0,0 @@
from __future__ import absolute_import
from warnings import warn
from pipenv.vendor.cerberus import errors
from pipenv.vendor.cerberus.platform import (
_str_type,
Callable,
Hashable,
Mapping,
MutableMapping,
Sequence,
)
from pipenv.vendor.cerberus.utils import (
get_Validator_class,
validator_factory,
mapping_hash,
TypeDefinition,
)
class _Abort(Exception):
pass
class SchemaError(Exception):
"""
Raised when the validation schema is missing, has the wrong format or contains
errors."""
pass
class DefinitionSchema(MutableMapping):
"""A dict-subclass for caching of validated schemas."""
def __new__(cls, *args, **kwargs):
if 'SchemaValidator' not in globals():
global SchemaValidator
SchemaValidator = validator_factory('SchemaValidator', SchemaValidatorMixin)
types_mapping = SchemaValidator.types_mapping.copy()
types_mapping.update(
{
'callable': TypeDefinition('callable', (Callable,), ()),
'hashable': TypeDefinition('hashable', (Hashable,), ()),
}
)
SchemaValidator.types_mapping = types_mapping
return super(DefinitionSchema, cls).__new__(cls)
def __init__(self, validator, schema):
"""
:param validator: An instance of Validator-(sub-)class that uses this
schema.
:param schema: A definition-schema as ``dict``. Defaults to an empty
one.
"""
if not isinstance(validator, get_Validator_class()):
raise RuntimeError('validator argument must be a Validator-' 'instance.')
self.validator = validator
if isinstance(schema, _str_type):
schema = validator.schema_registry.get(schema, schema)
if not isinstance(schema, Mapping):
try:
schema = dict(schema)
except Exception:
raise SchemaError(errors.SCHEMA_ERROR_DEFINITION_TYPE.format(schema))
self.validation_schema = SchemaValidationSchema(validator)
self.schema_validator = SchemaValidator(
None,
allow_unknown=self.validation_schema,
error_handler=errors.SchemaErrorHandler,
target_schema=schema,
target_validator=validator,
)
schema = self.expand(schema)
self.validate(schema)
self.schema = schema
def __delitem__(self, key):
_new_schema = self.schema.copy()
try:
del _new_schema[key]
except ValueError:
raise SchemaError("Schema has no field '%s' defined" % key)
except Exception as e:
raise e
else:
del self.schema[key]
def __getitem__(self, item):
return self.schema[item]
def __iter__(self):
return iter(self.schema)
def __len__(self):
return len(self.schema)
def __repr__(self):
return str(self)
def __setitem__(self, key, value):
value = self.expand({0: value})[0]
self.validate({key: value})
self.schema[key] = value
def __str__(self):
if hasattr(self, "schema"):
return str(self.schema)
else:
return "No schema data is set yet."
def copy(self):
return self.__class__(self.validator, self.schema.copy())
@classmethod
def expand(cls, schema):
try:
schema = cls._expand_logical_shortcuts(schema)
schema = cls._expand_subschemas(schema)
except Exception:
pass
# TODO remove this with the next major release
schema = cls._rename_deprecated_rulenames(schema)
return schema
@classmethod
def _expand_logical_shortcuts(cls, schema):
"""
Expand agglutinated rules in a definition-schema.
:param schema: The schema-definition to expand.
:return: The expanded schema-definition.
"""
def is_of_rule(x):
return isinstance(x, _str_type) and x.startswith(
('allof_', 'anyof_', 'noneof_', 'oneof_')
)
for field, rules in schema.items():
for of_rule in [x for x in rules if is_of_rule(x)]:
operator, rule = of_rule.split('_', 1)
rules.update({operator: []})
for value in rules[of_rule]:
rules[operator].append({rule: value})
del rules[of_rule]
return schema
@classmethod
def _expand_subschemas(cls, schema):
def has_schema_rule():
return isinstance(schema[field], Mapping) and 'schema' in schema[field]
def has_mapping_schema():
"""
Tries to determine heuristically if the schema-constraints are aimed to
mappings.
"""
try:
return all(
isinstance(x, Mapping) for x in schema[field]['schema'].values()
)
except TypeError:
return False
for field in schema:
if not has_schema_rule():
pass
elif has_mapping_schema():
schema[field]['schema'] = cls.expand(schema[field]['schema'])
else: # assumes schema-constraints for a sequence
schema[field]['schema'] = cls.expand({0: schema[field]['schema']})[0]
# TODO remove the last two values in the tuple with the next major release
for rule in ('keysrules', 'valuesrules', 'keyschema', 'valueschema'):
if rule in schema[field]:
schema[field][rule] = cls.expand({0: schema[field][rule]})[0]
for rule in ('allof', 'anyof', 'items', 'noneof', 'oneof'):
if rule in schema[field]:
if not isinstance(schema[field][rule], Sequence):
continue
new_rules_definition = []
for item in schema[field][rule]:
new_rules_definition.append(cls.expand({0: item})[0])
schema[field][rule] = new_rules_definition
return schema
def get(self, item, default=None):
return self.schema.get(item, default)
def items(self):
return self.schema.items()
def update(self, schema):
try:
schema = self.expand(schema)
_new_schema = self.schema.copy()
_new_schema.update(schema)
self.validate(_new_schema)
except ValueError:
raise SchemaError(errors.SCHEMA_ERROR_DEFINITION_TYPE.format(schema))
except Exception as e:
raise e
else:
self.schema = _new_schema
# TODO remove with next major release
@staticmethod
def _rename_deprecated_rulenames(schema):
for field, rules in schema.items():
if isinstance(rules, str): # registry reference
continue
for old, new in (
('keyschema', 'keysrules'),
('validator', 'check_with'),
('valueschema', 'valuesrules'),
):
if old not in rules:
continue
if new in rules:
raise RuntimeError(
"The rule '{new}' is also present with its old "
"name '{old}' in the same set of rules."
)
warn(
"The rule '{old}' was renamed to '{new}'. The old name will "
"not be available in the next major release of "
"Cerberus.".format(old=old, new=new),
DeprecationWarning,
)
schema[field][new] = schema[field][old]
schema[field].pop(old)
return schema
def regenerate_validation_schema(self):
self.validation_schema = SchemaValidationSchema(self.validator)
def validate(self, schema=None):
"""
Validates a schema that defines rules against supported rules.
:param schema: The schema to be validated as a legal cerberus schema
according to the rules of the assigned Validator object.
Raises a :class:`~cerberus.base.SchemaError` when an invalid
schema is encountered.
"""
if schema is None:
schema = self.schema
_hash = (mapping_hash(schema), mapping_hash(self.validator.types_mapping))
if _hash not in self.validator._valid_schemas:
self._validate(schema)
self.validator._valid_schemas.add(_hash)
def _validate(self, schema):
if isinstance(schema, _str_type):
schema = self.validator.schema_registry.get(schema, schema)
test_schema = {}
for field, rules in schema.items():
if isinstance(rules, _str_type):
test_schema[field] = rules_set_registry.get(rules, rules)
else:
test_rules = {}
for rule, constraint in rules.items():
test_rules[rule.replace(" ", "_")] = constraint
test_schema[field] = test_rules
if not self.schema_validator(test_schema, normalize=False):
raise SchemaError(self.schema_validator.errors)
class UnvalidatedSchema(DefinitionSchema):
def __init__(self, schema={}):
if not isinstance(schema, Mapping):
schema = dict(schema)
self.schema = schema
def validate(self, schema):
pass
def copy(self):
# Override ancestor's copy, because
# UnvalidatedSchema does not have .validator:
return self.__class__(self.schema.copy())
class SchemaValidationSchema(UnvalidatedSchema):
def __init__(self, validator):
self.schema = {
'allow_unknown': False,
'schema': validator.rules,
'type': 'dict',
}
class SchemaValidatorMixin(object):
"""
This validator mixin provides mechanics to validate schemas passed to a Cerberus
validator.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault('known_rules_set_refs', set())
kwargs.setdefault('known_schema_refs', set())
super(SchemaValidatorMixin, self).__init__(*args, **kwargs)
@property
def known_rules_set_refs(self):
"""The encountered references to rules set registry items."""
return self._config['known_rules_set_refs']
@property
def known_schema_refs(self):
"""The encountered references to schema registry items."""
return self._config['known_schema_refs']
@property
def target_schema(self):
"""The schema that is being validated."""
return self._config['target_schema']
@property
def target_validator(self):
"""The validator whose schema is being validated."""
return self._config['target_validator']
def _check_with_bulk_schema(self, field, value):
# resolve schema registry reference
if isinstance(value, _str_type):
if value in self.known_rules_set_refs:
return
else:
self.known_rules_set_refs.add(value)
definition = self.target_validator.rules_set_registry.get(value)
if definition is None:
self._error(field, 'Rules set definition %s not found.' % value)
return
else:
value = definition
_hash = (
mapping_hash({'turing': value}),
mapping_hash(self.target_validator.types_mapping),
)
if _hash in self.target_validator._valid_schemas:
return
validator = self._get_child_validator(
document_crumb=field,
allow_unknown=False,
schema=self.target_validator.rules,
)
validator(value, normalize=False)
if validator._errors:
self._error(validator._errors)
else:
self.target_validator._valid_schemas.add(_hash)
def _check_with_dependencies(self, field, value):
if isinstance(value, _str_type):
pass
elif isinstance(value, Mapping):
validator = self._get_child_validator(
document_crumb=field,
schema={'valuesrules': {'type': 'list'}},
allow_unknown=True,
)
if not validator(value, normalize=False):
self._error(validator._errors)
elif isinstance(value, Sequence):
if not all(isinstance(x, Hashable) for x in value):
path = self.document_path + (field,)
self._error(path, 'All dependencies must be a hashable type.')
def _check_with_items(self, field, value):
for i, schema in enumerate(value):
self._check_with_bulk_schema((field, i), schema)
def _check_with_schema(self, field, value):
try:
value = self._handle_schema_reference_for_validator(field, value)
except _Abort:
return
_hash = (mapping_hash(value), mapping_hash(self.target_validator.types_mapping))
if _hash in self.target_validator._valid_schemas:
return
validator = self._get_child_validator(
document_crumb=field, schema=None, allow_unknown=self.root_allow_unknown
)
validator(self._expand_rules_set_refs(value), normalize=False)
if validator._errors:
self._error(validator._errors)
else:
self.target_validator._valid_schemas.add(_hash)
def _check_with_type(self, field, value):
value = set((value,)) if isinstance(value, _str_type) else set(value)
invalid_constraints = value - set(self.target_validator.types)
if invalid_constraints:
self._error(
field, 'Unsupported types: {}'.format(', '.join(invalid_constraints))
)
def _expand_rules_set_refs(self, schema):
result = {}
for k, v in schema.items():
if isinstance(v, _str_type):
result[k] = self.target_validator.rules_set_registry.get(v)
else:
result[k] = v
return result
def _handle_schema_reference_for_validator(self, field, value):
if not isinstance(value, _str_type):
return value
if value in self.known_schema_refs:
raise _Abort
self.known_schema_refs.add(value)
definition = self.target_validator.schema_registry.get(value)
if definition is None:
path = self.document_path + (field,)
self._error(path, 'Schema definition {} not found.'.format(value))
raise _Abort
return definition
def _validate_logical(self, rule, field, value):
"""{'allowed': ('allof', 'anyof', 'noneof', 'oneof')}"""
if not isinstance(value, Sequence):
self._error(field, errors.BAD_TYPE)
return
validator = self._get_child_validator(
document_crumb=rule,
allow_unknown=False,
schema=self.target_validator.validation_rules,
)
for constraints in value:
_hash = (
mapping_hash({'turing': constraints}),
mapping_hash(self.target_validator.types_mapping),
)
if _hash in self.target_validator._valid_schemas:
continue
validator(constraints, normalize=False)
if validator._errors:
self._error(validator._errors)
else:
self.target_validator._valid_schemas.add(_hash)
####
class Registry(object):
"""
A registry to store and retrieve schemas and parts of it by a name that can be used
in validation schemas.
:param definitions: Optional, initial definitions.
:type definitions: any :term:`mapping`
"""
def __init__(self, definitions={}):
self._storage = {}
self.extend(definitions)
def add(self, name, definition):
"""
Register a definition to the registry. Existing definitions are replaced
silently.
:param name: The name which can be used as reference in a validation
schema.
:type name: :class:`str`
:param definition: The definition.
:type definition: any :term:`mapping`
"""
self._storage[name] = self._expand_definition(definition)
def all(self):
"""
Returns a :class:`dict` with all registered definitions mapped to their name.
"""
return self._storage
def clear(self):
"""Purge all definitions in the registry."""
self._storage.clear()
def extend(self, definitions):
"""
Add several definitions at once. Existing definitions are
replaced silently.
:param definitions: The names and definitions.
:type definitions: a :term:`mapping` or an :term:`iterable` with
two-value :class:`tuple` s
"""
for name, definition in dict(definitions).items():
self.add(name, definition)
def get(self, name, default=None):
"""
Retrieve a definition from the registry.
:param name: The reference that points to the definition.
:type name: :class:`str`
:param default: Return value if the reference isn't registered.
"""
return self._storage.get(name, default)
def remove(self, *names):
"""
Unregister definitions from the registry.
:param names: The names of the definitions that are to be
unregistered.
"""
for name in names:
self._storage.pop(name, None)
class SchemaRegistry(Registry):
@classmethod
def _expand_definition(cls, definition):
return DefinitionSchema.expand(definition)
class RulesSetRegistry(Registry):
@classmethod
def _expand_definition(cls, definition):
return DefinitionSchema.expand({0: definition})[0]
schema_registry, rules_set_registry = SchemaRegistry(), RulesSetRegistry()
-132
View File
@@ -1,132 +0,0 @@
from __future__ import absolute_import
from collections import namedtuple
from pipenv.vendor.cerberus.platform import _int_types, _str_type, Mapping, Sequence, Set
TypeDefinition = namedtuple('TypeDefinition', 'name,included_types,excluded_types')
"""
This class is used to define types that can be used as value in the
:attr:`~cerberus.Validator.types_mapping` property.
The ``name`` should be descriptive and match the key it is going to be assigned
to.
A value that is validated against such definition must be an instance of any of
the types contained in ``included_types`` and must not match any of the types
contained in ``excluded_types``.
"""
def compare_paths_lt(x, y):
min_length = min(len(x), len(y))
if x[:min_length] == y[:min_length]:
return len(x) == min_length
for i in range(min_length):
a, b = x[i], y[i]
for _type in (_int_types, _str_type, tuple):
if isinstance(a, _type):
if isinstance(b, _type):
break
else:
return True
if a == b:
continue
elif a < b:
return True
else:
return False
raise RuntimeError
def drop_item_from_tuple(t, i):
return t[:i] + t[i + 1 :]
def get_Validator_class():
global Validator
if 'Validator' not in globals():
from pipenv.vendor.cerberus.validator import Validator
return Validator
def mapping_hash(schema):
return hash(mapping_to_frozenset(schema))
def mapping_to_frozenset(mapping):
"""
Be aware that this treats any sequence type with the equal members as equal. As it
is used to identify equality of schemas, this can be considered okay as definitions
are semantically equal regardless the container type.
"""
aggregation = {}
for key, value in mapping.items():
if isinstance(value, Mapping):
aggregation[key] = mapping_to_frozenset(value)
elif isinstance(value, Sequence):
value = list(value)
for i, item in enumerate(value):
if isinstance(item, Mapping):
value[i] = mapping_to_frozenset(item)
aggregation[key] = tuple(value)
elif isinstance(value, Set):
aggregation[key] = frozenset(value)
else:
aggregation[key] = value
return frozenset(aggregation.items())
def quote_string(value):
if isinstance(value, _str_type):
return '"%s"' % value
else:
return value
class readonly_classproperty(property):
def __get__(self, instance, owner):
return super(readonly_classproperty, self).__get__(owner)
def __set__(self, instance, value):
raise RuntimeError('This is a readonly class property.')
def __delete__(self, instance):
raise RuntimeError('This is a readonly class property.')
def validator_factory(name, bases=None, namespace={}):
"""
Dynamically create a :class:`~cerberus.Validator` subclass.
Docstrings of mixin-classes will be added to the resulting class' one if ``__doc__``
is not in :obj:`namespace`.
:param name: The name of the new class.
:type name: :class:`str`
:param bases: Class(es) with additional and overriding attributes.
:type bases: :class:`tuple` of or a single :term:`class`
:param namespace: Attributes for the new class.
:type namespace: :class:`dict`
:return: The created class.
"""
Validator = get_Validator_class()
if bases is None:
bases = (Validator,)
elif isinstance(bases, tuple):
bases += (Validator,)
else:
bases = (bases, Validator)
docstrings = [x.__doc__ for x in bases if x.__doc__]
if len(docstrings) > 1 and '__doc__' not in namespace:
namespace.update({'__doc__': '\n'.join(docstrings)})
return type(name, bases, namespace)
File diff suppressed because it is too large Load Diff
+1 -2
View File
@@ -1,5 +1,4 @@
attrs==23.1.0
cerberus==1.3.4
click-didyoumean==0.3.0
click==8.1.3
colorama==0.4.6
@@ -8,7 +7,7 @@ markupsafe==2.1.2
pep517==0.13.0
pexpect==4.8.0
pipdeptree==2.7.0
plette[validation]==0.4.4
plette==0.4.4
ptyprocess==0.7.0
pydantic==1.10.7
python-dotenv==1.0.0
@@ -14,6 +14,7 @@ pytestmark = pytest.mark.skipif(
@pytest.mark.utils
@pytest.mark.skipif(os.name != "nt", reason="Windows test only")
@mock.patch('os.path.isfile')
@mock.patch('shutil.which')
def test_find_windows_executable_when_not_found(mocked_which, mocked_isfile):
@@ -32,6 +33,7 @@ def test_find_windows_executable_when_not_found(mocked_which, mocked_isfile):
@pytest.mark.utils
@pytest.mark.skipif(os.name != "nt", reason="Windows test only")
@mock.patch('os.path.isfile')
@mock.patch('shutil.which')
def test_find_windows_executable_when_found(mocked_which, mocked_isfile):