Merge pull request #5467 from pypa/drop-wheel-from-vendor

vendor: drop more unused code
This commit is contained in:
Oz N Tiram
2022-11-18 10:29:31 +01:00
committed by GitHub
33 changed files with 5 additions and 6624 deletions
+2
View File
@@ -0,0 +1,2 @@
* Drop unused code from cerberus
* Drop unused module wheel
-4
View File
@@ -1,4 +0,0 @@
from pathlib import Path
DOCUMENTS_PATH = Path(__file__).parent / "documents"
@@ -1,212 +0,0 @@
"""
some notes regarding this test suite:
- results are only comparable using the semantically equal schema against and
identical set of documents in the same execution environment
- the module can be executed to generate a new set of test documents
- it is intended to detect *significant* changes in validation time
- benchmarks should run with as few other processes running on the system as
possible (e.g. an Alpine Linux on bare metal w/o a Desktop environment)
"""
import json
from collections import Counter
from pathlib import Path
from random import choice, randrange
from typing import Callable, List
from pytest import mark
from pipenv.vendor.cerberus import rules_set_registry, schema_registry, TypeDefinition, Validator
from pipenv.vendor.cerberus.benchmarks import DOCUMENTS_PATH
rules_set_registry.add("path_rules", {"coerce": Path, "type": "path"})
schema_registry.add(
"field_3_schema",
{
# an outer rule requires all fields' values to be a list
"field_31": {"contains": 0, "empty": False},
"field_32": {
"default": [None, None, None],
"items": [
{"type": "integer"},
{"type": "string"},
{"type": ["integer", "string"]},
],
"schema": {"nullable": True},
},
},
)
def schema_1_field_3_allow_unknown_check_with(field, value, error):
if len(value) > 9:
error(field, "Requires a smaller list.")
schema_1 = {
"field_1": {
"type": "dict",
"required": True,
"allow_unknown": True,
"keysrules": {"regex": r"field_1[12345]"},
"minlength": 3,
"maxlength": 5,
"schema": {
"field_11": {
"type": "integer",
"allowed": list(range(100)),
"dependencies": {"field_12": 0, "^field_1.field_13": 0},
},
"field_12": {
"type": "integer",
"default_setter": lambda _: 1,
"forbidden": (1,),
},
"field_13": {"type": "integer"},
"field_14": {"rename": "field_13"},
},
},
"field_2": {
"type": "dict",
"allow_unknown": False,
"schema": {
"field_21": {
"type": "integer",
"coerce": [str.strip, int],
"min": 9,
"max": 89,
"anyof": [{"dependencies": "field_22"}, {"dependencies": "field_23"}],
},
"field_22": {"excludes": "field_23", "nullable": True},
"field_23": {"nullable": True},
},
},
"field_3": {
"allow_unknown": {"check_with": schema_1_field_3_allow_unknown_check_with},
"valuesrules": {"type": "list"},
"require_all": True,
"schema": "field_3_schema",
},
"field_4": "path_rules",
}
def init_validator():
class TestValidator(Validator):
types_mapping = {
**Validator.types_mapping,
"path": TypeDefinition("path", (Path,), ()),
}
return TestValidator(schema_1, purge_unknown=True)
def load_documents():
with (DOCUMENTS_PATH / "overall_documents_1.json").open() as f:
documents = json.load(f)
return documents
def validate_documents(init_validator: Callable, documents: List[dict]):
doc_count = failed_count = 0
error_paths = Counter()
validator = init_validator()
def count_errors(errors):
if errors is None:
return
for error in errors:
if error.is_group_error:
count_errors(error.child_errors)
else:
error_paths[error.schema_path] += 1
for document in documents:
if validator.validated(document) is None:
failed_count += 1
count_errors(validator._errors)
doc_count += 1
print(
f"{failed_count} out of {doc_count} documents failed with "
f"{len(error_paths)} different error leafs."
)
print("Top 3 errors, excluding container errors:")
for path, count in error_paths.most_common(3):
print(f"{count}: {path}")
@mark.benchmark(group="overall-1")
def test_overall_performance_1(benchmark):
benchmark.pedantic(validate_documents, (init_validator, load_documents()), rounds=5)
#
def generate_sample_document_1() -> dict:
result = {}
for i in (1, 2, 3, 4, 5):
if randrange(100):
result[f"field_{i}"] = globals()[f"generate_document_1_field_{i}"]()
return result
def generate_document_1_field_1() -> dict:
result = {"field_11": randrange(100), "field_13": 0}
if randrange(100):
result["field_12"] = 0
if not randrange(100):
result["field_14"] = None
if randrange(100):
result["field_15"] = None
return result
def generate_document_1_field_2() -> dict:
x = "*" if not randrange(50) else " "
result = {"field_21": x + str(randrange(100)) + x}
if randrange(100):
result["field_22"] = None
if "field_22" in result and not randrange(100):
result["field_23"] = None
return result
def generate_document_1_field_3() -> dict:
result = {}
if randrange(100):
result["field_31"] = [randrange(2) for _ in range(randrange(20))]
else:
result["field_31"] = None
if randrange(100):
result["field_32"] = [
choice((0, 0, 0, 0, 0, 0, 0, 0, "", None)),
choice(("", "", "", "", "", "", "", "", 0, None)),
choice((0, 0, 0, 0, "", "", "", "", None)),
]
if not randrange(10):
result["3_unknown"] = [0] * (randrange(10) + 1)
return result
def generate_document_1_field_4():
return "/foo/bar" if randrange(100) else 0
def generate_document_1_field_5():
return None
def write_sample_documents():
with (DOCUMENTS_PATH / "overall_documents_1.json").open("wt") as f:
json.dump([generate_sample_document_1() for _ in range(10_000)], f)
if __name__ == "__main__":
write_sample_documents()
@@ -1,54 +0,0 @@
import json
from collections import Counter
from typing import Callable, List
from typing import Counter as CounterType
from pytest import mark
from pipenv.vendor.cerberus import Validator
from pipenv.vendor.cerberus.benchmarks.schemas.overalll_schema_2 import product_schema
from pipenv.vendor.cerberus.benchmarks import DOCUMENTS_PATH
def init_validator():
return Validator(product_schema, purge_unknown=True)
def load_documents():
with (DOCUMENTS_PATH / "overall_documents_2.json").open() as f:
documents = json.load(f)
return documents
def validate_documents(init_validator: Callable, documents: List[dict]) -> None:
doc_count = failed_count = 0
error_paths: CounterType[tuple] = Counter()
validator = init_validator()
def count_errors(errors):
if errors is None:
return
for error in errors:
if error.is_group_error:
count_errors(error.child_errors)
else:
error_paths[error.schema_path] += 1
for document in documents:
if validator.validated(document) is None:
failed_count += 1
count_errors(validator._errors)
doc_count += 1
print(
f"{failed_count} out of {doc_count} documents failed with "
f"{len(error_paths)} different error leafs."
)
print("Top 3 errors, excluding container errors:")
for path, count in error_paths.most_common(3):
print(f"{count}: {path}")
@mark.benchmark(group="overall-2")
def test_overall_performance_2(benchmark):
benchmark.pedantic(validate_documents, (init_validator, load_documents()), rounds=5)
-159
View File
@@ -1,159 +0,0 @@
# -*- coding: utf-8 -*-
import re
import pytest
from pipenv.vendor.cerberus import errors, Validator, SchemaError, DocumentError
from pipenv.vendor.cerberus.tests.conftest import sample_schema
def assert_exception(exception, document={}, schema=None, validator=None, msg=None):
"""
Tests whether a specific exception is raised. Optionally also tests whether the
exception message is as expected.
"""
if validator is None:
validator = Validator()
if msg is None:
with pytest.raises(exception):
validator(document, schema)
else:
with pytest.raises(exception, match=re.escape(msg)):
validator(document, schema)
def assert_schema_error(*args):
"""Tests whether a validation raises an exception due to a malformed schema."""
assert_exception(SchemaError, *args)
def assert_document_error(*args):
"""Tests whether a validation raises an exception due to a malformed document."""
assert_exception(DocumentError, *args)
def assert_fail(
document,
schema=None,
validator=None,
update=False,
error=None,
errors=None,
child_errors=None,
):
"""Tests whether a validation fails."""
if validator is None:
validator = Validator(sample_schema)
result = validator(document, schema, update)
assert isinstance(result, bool)
assert not result
actual_errors = validator._errors
assert not (error is not None and errors is not None)
assert not (errors is not None and child_errors is not None), (
'child_errors can only be tested in ' 'conjunction with the error parameter'
)
assert not (child_errors is not None and error is None)
if error is not None:
assert len(actual_errors) == 1
assert_has_error(actual_errors, *error)
if child_errors is not None:
assert len(actual_errors[0].child_errors) == len(child_errors)
assert_has_errors(actual_errors[0].child_errors, child_errors)
elif errors is not None:
assert len(actual_errors) == len(errors)
assert_has_errors(actual_errors, errors)
return actual_errors
def assert_success(document, schema=None, validator=None, update=False):
"""Tests whether a validation succeeds."""
if validator is None:
validator = Validator(sample_schema)
result = validator(document, schema, update)
assert isinstance(result, bool)
if not result:
raise AssertionError(validator.errors)
def assert_has_error(_errors, d_path, s_path, error_def, constraint, info=()):
if not isinstance(d_path, tuple):
d_path = (d_path,)
if not isinstance(info, tuple):
info = (info,)
assert isinstance(_errors, errors.ErrorList)
for i, error in enumerate(_errors):
assert isinstance(error, errors.ValidationError)
try:
assert error.document_path == d_path
assert error.schema_path == s_path
assert error.code == error_def.code
assert error.rule == error_def.rule
assert error.constraint == constraint
if not error.is_group_error:
assert error.info == info
except AssertionError:
pass
except Exception:
raise
else:
break
else:
raise AssertionError(
"""
Error with properties:
document_path={doc_path}
schema_path={schema_path}
code={code}
constraint={constraint}
info={info}
not found in errors:
{errors}
""".format(
doc_path=d_path,
schema_path=s_path,
code=hex(error.code),
info=info,
constraint=constraint,
errors=_errors,
)
)
return i
def assert_has_errors(_errors, _exp_errors):
assert isinstance(_exp_errors, list)
for error in _exp_errors:
assert isinstance(error, tuple)
assert_has_error(_errors, *error)
def assert_not_has_error(_errors, *args, **kwargs):
try:
assert_has_error(_errors, *args, **kwargs)
except AssertionError:
pass
except Exception as e:
raise e
else:
raise AssertionError('An unexpected error occurred.')
def assert_bad_type(field, data_type, value):
assert_fail(
{field: value}, error=(field, (field, 'type'), errors.BAD_TYPE, data_type)
)
def assert_normalized(document, expected, schema=None, validator=None):
if validator is None:
validator = Validator(sample_schema)
assert_success(document, schema, validator)
assert validator.document == expected
-81
View File
@@ -1,81 +0,0 @@
# -*- coding: utf-8 -*-
from copy import deepcopy
import pytest
from pipenv.vendor.cerberus import Validator
@pytest.fixture
def document():
return deepcopy(sample_document)
@pytest.fixture
def schema():
return deepcopy(sample_schema)
@pytest.fixture
def validator():
return Validator(sample_schema)
sample_schema = {
'a_string': {'type': 'string', 'minlength': 2, 'maxlength': 10},
'a_binary': {'type': 'binary', 'minlength': 2, 'maxlength': 10},
'a_nullable_integer': {'type': 'integer', 'nullable': True},
'an_integer': {'type': 'integer', 'min': 1, 'max': 100},
'a_restricted_integer': {'type': 'integer', 'allowed': [-1, 0, 1]},
'a_boolean': {'type': 'boolean', 'meta': 'can haz two distinct states'},
'a_datetime': {'type': 'datetime', 'meta': {'format': '%a, %d. %b %Y'}},
'a_float': {'type': 'float', 'min': 1, 'max': 100},
'a_number': {'type': 'number', 'min': 1, 'max': 100},
'a_set': {'type': 'set'},
'one_or_more_strings': {'type': ['string', 'list'], 'schema': {'type': 'string'}},
'a_regex_email': {
'type': 'string',
'regex': r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$',
},
'a_readonly_string': {'type': 'string', 'readonly': True},
'a_restricted_string': {'type': 'string', 'allowed': ['agent', 'client', 'vendor']},
'an_array': {'type': 'list', 'allowed': ['agent', 'client', 'vendor']},
'an_array_from_set': {
'type': 'list',
'allowed': set(['agent', 'client', 'vendor']),
},
'a_list_of_dicts': {
'type': 'list',
'schema': {
'type': 'dict',
'schema': {
'sku': {'type': 'string'},
'price': {'type': 'integer', 'required': True},
},
},
},
'a_list_of_values': {
'type': 'list',
'items': [{'type': 'string'}, {'type': 'integer'}],
},
'a_list_of_integers': {'type': 'list', 'schema': {'type': 'integer'}},
'a_dict': {
'type': 'dict',
'schema': {
'address': {'type': 'string'},
'city': {'type': 'string', 'required': True},
},
},
'a_dict_with_valuesrules': {'type': 'dict', 'valuesrules': {'type': 'integer'}},
'a_list_length': {
'type': 'list',
'schema': {'type': 'integer'},
'minlength': 2,
'maxlength': 5,
},
'a_nullable_field_without_type': {'nullable': True},
'a_not_nullable_field_without_type': {},
}
sample_document = {'name': 'john doe'}
-111
View File
@@ -1,111 +0,0 @@
# -*- coding: utf-8 -*-
from decimal import Decimal
from pipenv.patched.pip._vendor.pkg_resources import Distribution, DistributionNotFound
from pytest import mark
from pipenv.vendor.cerberus import TypeDefinition, Validator
from pipenv.vendor.cerberus.tests import assert_fail, assert_success
from pipenv.vendor.cerberus.utils import validator_factory
from pipenv.vendor.cerberus.validator import BareValidator
from pipenv.vendor.cerberus.platform import PYTHON_VERSION
if PYTHON_VERSION > 3 and PYTHON_VERSION < 3.4:
from imp import reload
elif PYTHON_VERSION >= 3.4:
from importlib import reload
else:
pass # Python 2.x
def test_pkgresources_version(monkeypatch):
def create_fake_distribution(name):
return Distribution(project_name="cerberus", version="1.2.3")
with monkeypatch.context() as m:
cerberus = __import__("cerberus")
m.setattr("pkg_resources.get_distribution", create_fake_distribution)
reload(cerberus)
assert cerberus.__version__ == "1.2.3"
def test_version_not_found(monkeypatch):
def raise_distribution_not_found(name):
raise DistributionNotFound("pkg_resources cannot get distribution")
with monkeypatch.context() as m:
cerberus = __import__("cerberus")
m.setattr("pkg_resources.get_distribution", raise_distribution_not_found)
reload(cerberus)
assert cerberus.__version__ == "unknown"
def test_clear_cache(validator):
assert len(validator._valid_schemas) > 0
validator.clear_caches()
assert len(validator._valid_schemas) == 0
def test_docstring(validator):
assert validator.__doc__
# Test that testing with the sample schema works as expected
# as there might be rules with side-effects in it
@mark.parametrize(
"test,document",
((assert_fail, {"an_integer": 60}), (assert_success, {"an_integer": 110})),
)
def test_that_test_fails(test, document):
try:
test(document)
except AssertionError:
pass
else:
raise AssertionError("test didn't fail")
def test_dynamic_types():
decimal_type = TypeDefinition("decimal", (Decimal,), ())
document = {"measurement": Decimal(0)}
schema = {"measurement": {"type": "decimal"}}
validator = Validator()
validator.types_mapping["decimal"] = decimal_type
assert_success(document, schema, validator)
class MyValidator(Validator):
types_mapping = Validator.types_mapping.copy()
types_mapping["decimal"] = decimal_type
validator = MyValidator()
assert_success(document, schema, validator)
def test_mro():
assert Validator.__mro__ == (Validator, BareValidator, object), Validator.__mro__
def test_mixin_init():
class Mixin(object):
def __init__(self, *args, **kwargs):
kwargs["test"] = True
super(Mixin, self).__init__(*args, **kwargs)
MyValidator = validator_factory("MyValidator", Mixin)
validator = MyValidator()
assert validator._config["test"]
def test_sub_init():
class MyValidator(Validator):
def __init__(self, *args, **kwargs):
kwargs["test"] = True
super(MyValidator, self).__init__(*args, **kwargs)
validator = MyValidator()
assert validator._config["test"]
-101
View File
@@ -1,101 +0,0 @@
# -*- coding: utf-8 -*-
from pytest import mark
import pipenv.vendor.cerberus as cerberus
from pipenv.vendor.cerberus.tests import assert_fail, assert_success
from pipenv.vendor.cerberus.tests.conftest import sample_schema
def test_contextual_data_preservation():
class InheritedValidator(cerberus.Validator):
def __init__(self, *args, **kwargs):
if 'working_dir' in kwargs:
self.working_dir = kwargs['working_dir']
super(InheritedValidator, self).__init__(*args, **kwargs)
def _validate_type_test(self, value):
if self.working_dir:
return True
assert 'test' in InheritedValidator.types
v = InheritedValidator(
{'test': {'type': 'list', 'schema': {'type': 'test'}}}, working_dir='/tmp'
)
assert_success({'test': ['foo']}, validator=v)
def test_docstring_parsing():
class CustomValidator(cerberus.Validator):
def _validate_foo(self, argument, field, value):
"""{'type': 'zap'}"""
pass
def _validate_bar(self, value):
"""
Test the barreness of a value.
The rule's arguments are validated against this schema:
{'type': 'boolean'}
"""
pass
assert 'foo' in CustomValidator.validation_rules
assert 'bar' in CustomValidator.validation_rules
# TODO remove 'validator' as rule parameter with the next major release
@mark.parametrize('rule', ('check_with', 'validator'))
def test_check_with_method(rule):
# https://github.com/pyeve/cerberus/issues/265
class MyValidator(cerberus.Validator):
def _check_with_oddity(self, field, value):
if not value & 1:
self._error(field, "Must be an odd number")
v = MyValidator(schema={'amount': {rule: 'oddity'}})
assert_success(document={'amount': 1}, validator=v)
assert_fail(
document={'amount': 2},
validator=v,
error=('amount', (), cerberus.errors.CUSTOM, None, ('Must be an odd number',)),
)
# TODO remove test with the next major release
@mark.parametrize('rule', ('check_with', 'validator'))
def test_validator_method(rule):
class MyValidator(cerberus.Validator):
def _validator_oddity(self, field, value):
if not value & 1:
self._error(field, "Must be an odd number")
v = MyValidator(schema={'amount': {rule: 'oddity'}})
assert_success(document={'amount': 1}, validator=v)
assert_fail(
document={'amount': 2},
validator=v,
error=('amount', (), cerberus.errors.CUSTOM, None, ('Must be an odd number',)),
)
def test_schema_validation_can_be_disabled_in_schema_setter():
class NonvalidatingValidator(cerberus.Validator):
"""
Skips schema validation to speed up initialization
"""
@cerberus.Validator.schema.setter
def schema(self, schema):
if schema is None:
self._schema = None
elif self.is_child:
self._schema = schema
elif isinstance(schema, cerberus.schema.DefinitionSchema):
self._schema = schema
else:
self._schema = cerberus.schema.UnvalidatedSchema(schema)
v = NonvalidatingValidator(schema=sample_schema)
assert v.validate(document={'an_integer': 1})
assert not v.validate(document={'an_integer': 'a'})
-365
View File
@@ -1,365 +0,0 @@
# -*- coding: utf-8 -*-
from pipenv.vendor.cerberus import Validator, errors
from pipenv.vendor.cerberus.tests import assert_fail
ValidationError = errors.ValidationError
def test__error_1():
v = Validator(schema={'foo': {'type': 'string'}})
v.document = {'foo': 42}
v._error('foo', errors.BAD_TYPE, 'string')
error = v._errors[0]
assert error.document_path == ('foo',)
assert error.schema_path == ('foo', 'type')
assert error.code == 0x24
assert error.rule == 'type'
assert error.constraint == 'string'
assert error.value == 42
assert error.info == ('string',)
assert not error.is_group_error
assert not error.is_logic_error
def test__error_2():
v = Validator(schema={'foo': {'keysrules': {'type': 'integer'}}})
v.document = {'foo': {'0': 'bar'}}
v._error('foo', errors.KEYSRULES, ())
error = v._errors[0]
assert error.document_path == ('foo',)
assert error.schema_path == ('foo', 'keysrules')
assert error.code == 0x83
assert error.rule == 'keysrules'
assert error.constraint == {'type': 'integer'}
assert error.value == {'0': 'bar'}
assert error.info == ((),)
assert error.is_group_error
assert not error.is_logic_error
def test__error_3():
valids = [
{'type': 'string', 'regex': '0x[0-9a-f]{2}'},
{'type': 'integer', 'min': 0, 'max': 255},
]
v = Validator(schema={'foo': {'oneof': valids}})
v.document = {'foo': '0x100'}
v._error('foo', errors.ONEOF, (), 0, 2)
error = v._errors[0]
assert error.document_path == ('foo',)
assert error.schema_path == ('foo', 'oneof')
assert error.code == 0x92
assert error.rule == 'oneof'
assert error.constraint == valids
assert error.value == '0x100'
assert error.info == ((), 0, 2)
assert error.is_group_error
assert error.is_logic_error
def test_error_tree_from_subschema(validator):
schema = {'foo': {'schema': {'bar': {'type': 'string'}}}}
document = {'foo': {'bar': 0}}
assert_fail(document, schema, validator=validator)
d_error_tree = validator.document_error_tree
s_error_tree = validator.schema_error_tree
assert 'foo' in d_error_tree
assert len(d_error_tree['foo'].errors) == 1, d_error_tree['foo']
assert d_error_tree['foo'].errors[0].code == errors.MAPPING_SCHEMA.code
assert 'bar' in d_error_tree['foo']
assert d_error_tree['foo']['bar'].errors[0].value == 0
assert d_error_tree.fetch_errors_from(('foo', 'bar'))[0].value == 0
assert 'foo' in s_error_tree
assert 'schema' in s_error_tree['foo']
assert 'bar' in s_error_tree['foo']['schema']
assert 'type' in s_error_tree['foo']['schema']['bar']
assert s_error_tree['foo']['schema']['bar']['type'].errors[0].value == 0
assert (
s_error_tree.fetch_errors_from(('foo', 'schema', 'bar', 'type'))[0].value == 0
)
def test_error_tree_from_anyof(validator):
schema = {'foo': {'anyof': [{'type': 'string'}, {'type': 'integer'}]}}
document = {'foo': []}
assert_fail(document, schema, validator=validator)
d_error_tree = validator.document_error_tree
s_error_tree = validator.schema_error_tree
assert 'foo' in d_error_tree
assert d_error_tree['foo'].errors[0].value == []
assert 'foo' in s_error_tree
assert 'anyof' in s_error_tree['foo']
assert 0 in s_error_tree['foo']['anyof']
assert 1 in s_error_tree['foo']['anyof']
assert 'type' in s_error_tree['foo']['anyof'][0]
assert s_error_tree['foo']['anyof'][0]['type'].errors[0].value == []
def test_nested_error_paths(validator):
# interpreters of the same version on some platforms showed different sort results
# over various runs:
def assert_has_all_errors(errors, *ref_errs):
for ref_err in ref_errs:
for error in errors:
if error == ref_err:
break
else:
raise AssertionError
schema = {
'a_dict': {
'keysrules': {'type': 'integer'},
'valuesrules': {'regex': '[a-z]*'},
},
'a_list': {'schema': {'type': 'string', 'oneof_regex': ['[a-z]*$', '[A-Z]*']}},
}
document = {
'a_dict': {0: 'abc', 'one': 'abc', 2: 'aBc', 'three': 'abC'},
'a_list': [0, 'abc', 'abC'],
}
assert_fail(document, schema, validator=validator)
det = validator.document_error_tree
set = validator.schema_error_tree
assert len(det.errors) == 0
assert len(set.errors) == 0
assert len(det['a_dict'].errors) == 2
assert len(set['a_dict'].errors) == 0
assert det['a_dict'][0] is None
assert len(det['a_dict']['one'].errors) == 1
assert len(det['a_dict'][2].errors) == 1
assert len(det['a_dict']['three'].errors) == 2
assert len(set['a_dict']['keysrules'].errors) == 1
assert len(set['a_dict']['valuesrules'].errors) == 1
assert len(set['a_dict']['keysrules']['type'].errors) == 2
assert len(set['a_dict']['valuesrules']['regex'].errors) == 2
ref_err1 = ValidationError(
('a_dict', 'one'),
('a_dict', 'keysrules', 'type'),
errors.BAD_TYPE.code,
'type',
'integer',
'one',
(),
)
ref_err2 = ValidationError(
('a_dict', 2),
('a_dict', 'valuesrules', 'regex'),
errors.REGEX_MISMATCH.code,
'regex',
'[a-z]*$',
'aBc',
(),
)
ref_err3 = ValidationError(
('a_dict', 'three'),
('a_dict', 'keysrules', 'type'),
errors.BAD_TYPE.code,
'type',
'integer',
'three',
(),
)
ref_err4 = ValidationError(
('a_dict', 'three'),
('a_dict', 'valuesrules', 'regex'),
errors.REGEX_MISMATCH.code,
'regex',
'[a-z]*$',
'abC',
(),
)
assert det['a_dict'][2].errors[0] == ref_err2
assert det['a_dict']['one'].errors[0] == ref_err1
assert_has_all_errors(det['a_dict']['three'].errors, ref_err3, ref_err4)
assert_has_all_errors(set['a_dict']['keysrules']['type'].errors, ref_err1, ref_err3)
assert_has_all_errors(
set['a_dict']['valuesrules']['regex'].errors, ref_err2, ref_err4
)
assert len(det['a_list'].errors) == 1
assert len(det['a_list'][0].errors) == 1
assert det['a_list'][1] is None
assert len(det['a_list'][2].errors) == 3
assert len(set['a_list'].errors) == 0
assert len(set['a_list']['schema'].errors) == 1
assert len(set['a_list']['schema']['type'].errors) == 1
assert len(set['a_list']['schema']['oneof'][0]['regex'].errors) == 1
assert len(set['a_list']['schema']['oneof'][1]['regex'].errors) == 1
ref_err5 = ValidationError(
('a_list', 0),
('a_list', 'schema', 'type'),
errors.BAD_TYPE.code,
'type',
'string',
0,
(),
)
ref_err6 = ValidationError(
('a_list', 2),
('a_list', 'schema', 'oneof'),
errors.ONEOF.code,
'oneof',
'irrelevant_at_this_point',
'abC',
(),
)
ref_err7 = ValidationError(
('a_list', 2),
('a_list', 'schema', 'oneof', 0, 'regex'),
errors.REGEX_MISMATCH.code,
'regex',
'[a-z]*$',
'abC',
(),
)
ref_err8 = ValidationError(
('a_list', 2),
('a_list', 'schema', 'oneof', 1, 'regex'),
errors.REGEX_MISMATCH.code,
'regex',
'[a-z]*$',
'abC',
(),
)
assert det['a_list'][0].errors[0] == ref_err5
assert_has_all_errors(det['a_list'][2].errors, ref_err6, ref_err7, ref_err8)
assert set['a_list']['schema']['oneof'].errors[0] == ref_err6
assert set['a_list']['schema']['oneof'][0]['regex'].errors[0] == ref_err7
assert set['a_list']['schema']['oneof'][1]['regex'].errors[0] == ref_err8
assert set['a_list']['schema']['type'].errors[0] == ref_err5
def test_path_resolution_for_registry_references():
class CustomValidator(Validator):
def _normalize_coerce_custom(self, value):
raise Exception("Failed coerce")
validator = CustomValidator()
validator.schema_registry.add(
"schema1", {"child": {"type": "boolean", "coerce": "custom"}}
)
validator.schema = {"parent": {"schema": "schema1"}}
validator.validate({"parent": {"child": "["}})
expected = {
'parent': [
{
'child': [
"must be of boolean type",
"field 'child' cannot be coerced: Failed coerce",
]
}
]
}
assert validator.errors == expected
def test_queries():
schema = {'foo': {'type': 'dict', 'schema': {'bar': {'type': 'number'}}}}
document = {'foo': {'bar': 'zero'}}
validator = Validator(schema)
validator(document)
assert 'foo' in validator.document_error_tree
assert 'bar' in validator.document_error_tree['foo']
assert 'foo' in validator.schema_error_tree
assert 'schema' in validator.schema_error_tree['foo']
assert errors.MAPPING_SCHEMA in validator.document_error_tree['foo'].errors
assert errors.MAPPING_SCHEMA in validator.document_error_tree['foo']
assert errors.BAD_TYPE in validator.document_error_tree['foo']['bar']
assert errors.MAPPING_SCHEMA in validator.schema_error_tree['foo']['schema']
assert (
errors.BAD_TYPE in validator.schema_error_tree['foo']['schema']['bar']['type']
)
assert (
validator.document_error_tree['foo'][errors.MAPPING_SCHEMA].child_errors[0].code
== errors.BAD_TYPE.code
)
def test_basic_error_handler():
handler = errors.BasicErrorHandler()
_errors, ref = [], {}
_errors.append(ValidationError(['foo'], ['foo'], 0x63, 'readonly', True, None, ()))
ref.update({'foo': [handler.messages[0x63]]})
assert handler(_errors) == ref
_errors.append(ValidationError(['bar'], ['foo'], 0x42, 'min', 1, 2, ()))
ref.update({'bar': [handler.messages[0x42].format(constraint=1)]})
assert handler(_errors) == ref
_errors.append(
ValidationError(
['zap', 'foo'], ['zap', 'schema', 'foo'], 0x24, 'type', 'string', True, ()
)
)
ref.update({'zap': [{'foo': [handler.messages[0x24].format(constraint='string')]}]})
assert handler(_errors) == ref
_errors.append(
ValidationError(
['zap', 'foo'],
['zap', 'schema', 'foo'],
0x41,
'regex',
'^p[äe]ng$',
'boom',
(),
)
)
ref['zap'][0]['foo'].append(handler.messages[0x41].format(constraint='^p[äe]ng$'))
assert handler(_errors) == ref
def test_basic_error_of_errors(validator):
schema = {'foo': {'oneof': [{'type': 'integer'}, {'type': 'string'}]}}
document = {'foo': 23.42}
error = ('foo', ('foo', 'oneof'), errors.ONEOF, schema['foo']['oneof'], ())
child_errors = [
(error[0], error[1] + (0, 'type'), errors.BAD_TYPE, 'integer'),
(error[0], error[1] + (1, 'type'), errors.BAD_TYPE, 'string'),
]
assert_fail(
document, schema, validator=validator, error=error, child_errors=child_errors
)
assert validator.errors == {
'foo': [
errors.BasicErrorHandler.messages[0x92],
{
'oneof definition 0': ['must be of integer type'],
'oneof definition 1': ['must be of string type'],
},
]
}
def test_wrong_amount_of_items(validator):
# https://github.com/pyeve/cerberus/issues/505
validator.schema = {
'test_list': {
'type': 'list',
'required': True,
'items': [{'type': 'string'}, {'type': 'string'}],
}
}
validator({'test_list': ['test']})
assert validator.errors == {'test_list': ["length of list should be 2, it is 1"]}
-3
View File
@@ -1,3 +0,0 @@
# -*- coding: utf-8 -*-
pass
-543
View File
@@ -1,543 +0,0 @@
# -*- coding: utf-8 -*-
from copy import deepcopy
from tempfile import NamedTemporaryFile
from pytest import mark
from pipenv.vendor.cerberus import Validator, errors
from pipenv.vendor.cerberus.tests import (
assert_fail,
assert_has_error,
assert_normalized,
assert_success,
)
def must_not_be_called(*args, **kwargs):
raise RuntimeError('This shall not be called.')
def test_coerce():
schema = {'amount': {'coerce': int}}
document = {'amount': '1'}
expected = {'amount': 1}
assert_normalized(document, expected, schema)
def test_coerce_in_dictschema():
schema = {'thing': {'type': 'dict', 'schema': {'amount': {'coerce': int}}}}
document = {'thing': {'amount': '2'}}
expected = {'thing': {'amount': 2}}
assert_normalized(document, expected, schema)
def test_coerce_in_listschema():
schema = {'things': {'type': 'list', 'schema': {'coerce': int}}}
document = {'things': ['1', '2', '3']}
expected = {'things': [1, 2, 3]}
assert_normalized(document, expected, schema)
def test_coerce_in_listitems():
schema = {'things': {'type': 'list', 'items': [{'coerce': int}, {'coerce': str}]}}
document = {'things': ['1', 2]}
expected = {'things': [1, '2']}
assert_normalized(document, expected, schema)
validator = Validator(schema)
document['things'].append(3)
assert not validator(document)
assert validator.document['things'] == document['things']
def test_coerce_in_dictschema_in_listschema():
item_schema = {'type': 'dict', 'schema': {'amount': {'coerce': int}}}
schema = {'things': {'type': 'list', 'schema': item_schema}}
document = {'things': [{'amount': '2'}]}
expected = {'things': [{'amount': 2}]}
assert_normalized(document, expected, schema)
def test_coerce_not_destructive():
schema = {'amount': {'coerce': int}}
v = Validator(schema)
doc = {'amount': '1'}
v.validate(doc)
assert v.document is not doc
def test_coerce_catches_ValueError():
schema = {'amount': {'coerce': int}}
_errors = assert_fail({'amount': 'not_a_number'}, schema)
_errors[0].info = () # ignore exception message here
assert_has_error(
_errors, 'amount', ('amount', 'coerce'), errors.COERCION_FAILED, int
)
def test_coerce_in_listitems_catches_ValueError():
schema = {'things': {'type': 'list', 'items': [{'coerce': int}, {'coerce': str}]}}
document = {'things': ['not_a_number', 2]}
_errors = assert_fail(document, schema)
_errors[0].info = () # ignore exception message here
assert_has_error(
_errors,
('things', 0),
('things', 'items', 'coerce'),
errors.COERCION_FAILED,
int,
)
def test_coerce_catches_TypeError():
schema = {'name': {'coerce': str.lower}}
_errors = assert_fail({'name': 1234}, schema)
_errors[0].info = () # ignore exception message here
assert_has_error(
_errors, 'name', ('name', 'coerce'), errors.COERCION_FAILED, str.lower
)
def test_coerce_in_listitems_catches_TypeError():
schema = {
'things': {'type': 'list', 'items': [{'coerce': int}, {'coerce': str.lower}]}
}
document = {'things': ['1', 2]}
_errors = assert_fail(document, schema)
_errors[0].info = () # ignore exception message here
assert_has_error(
_errors,
('things', 1),
('things', 'items', 'coerce'),
errors.COERCION_FAILED,
str.lower,
)
def test_coerce_unknown():
schema = {'foo': {'schema': {}, 'allow_unknown': {'coerce': int}}}
document = {'foo': {'bar': '0'}}
expected = {'foo': {'bar': 0}}
assert_normalized(document, expected, schema)
def test_custom_coerce_and_rename():
class MyNormalizer(Validator):
def __init__(self, multiplier, *args, **kwargs):
super(MyNormalizer, self).__init__(*args, **kwargs)
self.multiplier = multiplier
def _normalize_coerce_multiply(self, value):
return value * self.multiplier
v = MyNormalizer(2, {'foo': {'coerce': 'multiply'}})
assert v.normalized({'foo': 2})['foo'] == 4
v = MyNormalizer(3, allow_unknown={'rename_handler': 'multiply'})
assert v.normalized({3: None}) == {9: None}
def test_coerce_chain():
drop_prefix = lambda x: x[2:] # noqa: E731
upper = lambda x: x.upper() # noqa: E731
schema = {'foo': {'coerce': [hex, drop_prefix, upper]}}
assert_normalized({'foo': 15}, {'foo': 'F'}, schema)
def test_coerce_chain_aborts(validator):
def dont_do_me(value):
raise AssertionError('The coercion chain did not abort after an ' 'error.')
schema = {'foo': {'coerce': [hex, dont_do_me]}}
validator({'foo': '0'}, schema)
assert errors.COERCION_FAILED in validator._errors
def test_coerce_non_digit_in_sequence(validator):
# https://github.com/pyeve/cerberus/issues/211
schema = {'data': {'type': 'list', 'schema': {'type': 'integer', 'coerce': int}}}
document = {'data': ['q']}
assert validator.validated(document, schema) is None
assert (
validator.validated(document, schema, always_return_document=True) == document
) # noqa: W503
def test_nullables_dont_fail_coerce():
schema = {'foo': {'coerce': int, 'nullable': True, 'type': 'integer'}}
document = {'foo': None}
assert_normalized(document, document, schema)
def test_nullables_fail_coerce_on_non_null_values(validator):
def failing_coercion(value):
raise Exception("expected to fail")
schema = {'foo': {'coerce': failing_coercion, 'nullable': True, 'type': 'integer'}}
document = {'foo': None}
assert_normalized(document, document, schema)
validator({'foo': 2}, schema)
assert errors.COERCION_FAILED in validator._errors
def test_normalized():
schema = {'amount': {'coerce': int}}
document = {'amount': '2'}
expected = {'amount': 2}
assert_normalized(document, expected, schema)
def test_rename(validator):
schema = {'foo': {'rename': 'bar'}}
document = {'foo': 0}
expected = {'bar': 0}
# We cannot use assertNormalized here since there is bug where
# Cerberus says that the renamed field is an unknown field:
# {'bar': 'unknown field'}
validator(document, schema, False)
assert validator.document == expected
def test_rename_handler():
validator = Validator(allow_unknown={'rename_handler': int})
schema = {}
document = {'0': 'foo'}
expected = {0: 'foo'}
assert_normalized(document, expected, schema, validator)
def test_purge_unknown():
validator = Validator(purge_unknown=True)
schema = {'foo': {'type': 'string'}}
document = {'bar': 'foo'}
expected = {}
assert_normalized(document, expected, schema, validator)
def test_purge_unknown_in_subschema():
schema = {
'foo': {
'type': 'dict',
'schema': {'foo': {'type': 'string'}},
'purge_unknown': True,
}
}
document = {'foo': {'bar': ''}}
expected = {'foo': {}}
assert_normalized(document, expected, schema)
def test_issue_147_complex():
schema = {'revision': {'coerce': int}}
document = {'revision': '5', 'file': NamedTemporaryFile(mode='w+')}
document['file'].write(r'foobar')
document['file'].seek(0)
normalized = Validator(schema, allow_unknown=True).normalized(document)
assert normalized['revision'] == 5
assert normalized['file'].read() == 'foobar'
document['file'].close()
normalized['file'].close()
def test_issue_147_nested_dict():
schema = {'thing': {'type': 'dict', 'schema': {'amount': {'coerce': int}}}}
ref_obj = '2'
document = {'thing': {'amount': ref_obj}}
normalized = Validator(schema).normalized(document)
assert document is not normalized
assert normalized['thing']['amount'] == 2
assert ref_obj == '2'
assert document['thing']['amount'] is ref_obj
def test_coerce_in_valuesrules():
# https://github.com/pyeve/cerberus/issues/155
schema = {
'thing': {'type': 'dict', 'valuesrules': {'coerce': int, 'type': 'integer'}}
}
document = {'thing': {'amount': '2'}}
expected = {'thing': {'amount': 2}}
assert_normalized(document, expected, schema)
def test_coerce_in_keysrules():
# https://github.com/pyeve/cerberus/issues/155
schema = {
'thing': {'type': 'dict', 'keysrules': {'coerce': int, 'type': 'integer'}}
}
document = {'thing': {'5': 'foo'}}
expected = {'thing': {5: 'foo'}}
assert_normalized(document, expected, schema)
def test_coercion_of_sequence_items(validator):
# https://github.com/pyeve/cerberus/issues/161
schema = {'a_list': {'type': 'list', 'schema': {'type': 'float', 'coerce': float}}}
document = {'a_list': [3, 4, 5]}
expected = {'a_list': [3.0, 4.0, 5.0]}
assert_normalized(document, expected, schema, validator)
for x in validator.document['a_list']:
assert isinstance(x, float)
@mark.parametrize(
'default', ({'default': 'bar_value'}, {'default_setter': lambda doc: 'bar_value'})
)
def test_default_missing(default):
bar_schema = {'type': 'string'}
bar_schema.update(default)
schema = {'foo': {'type': 'string'}, 'bar': bar_schema}
document = {'foo': 'foo_value'}
expected = {'foo': 'foo_value', 'bar': 'bar_value'}
assert_normalized(document, expected, schema)
@mark.parametrize(
'default', ({'default': 'bar_value'}, {'default_setter': must_not_be_called})
)
def test_default_existent(default):
bar_schema = {'type': 'string'}
bar_schema.update(default)
schema = {'foo': {'type': 'string'}, 'bar': bar_schema}
document = {'foo': 'foo_value', 'bar': 'non_default'}
assert_normalized(document, document.copy(), schema)
@mark.parametrize(
'default', ({'default': 'bar_value'}, {'default_setter': must_not_be_called})
)
def test_default_none_nullable(default):
bar_schema = {'type': 'string', 'nullable': True}
bar_schema.update(default)
schema = {'foo': {'type': 'string'}, 'bar': bar_schema}
document = {'foo': 'foo_value', 'bar': None}
assert_normalized(document, document.copy(), schema)
@mark.parametrize(
'default', ({'default': 'bar_value'}, {'default_setter': lambda doc: 'bar_value'})
)
def test_default_none_nonnullable(default):
bar_schema = {'type': 'string', 'nullable': False}
bar_schema.update(default)
schema = {'foo': {'type': 'string'}, 'bar': bar_schema}
document = {'foo': 'foo_value', 'bar': None}
expected = {'foo': 'foo_value', 'bar': 'bar_value'}
assert_normalized(document, expected, schema)
def test_default_none_default_value():
schema = {
'foo': {'type': 'string'},
'bar': {'type': 'string', 'nullable': True, 'default': None},
}
document = {'foo': 'foo_value'}
expected = {'foo': 'foo_value', 'bar': None}
assert_normalized(document, expected, schema)
@mark.parametrize(
'default', ({'default': 'bar_value'}, {'default_setter': lambda doc: 'bar_value'})
)
def test_default_missing_in_subschema(default):
bar_schema = {'type': 'string'}
bar_schema.update(default)
schema = {
'thing': {
'type': 'dict',
'schema': {'foo': {'type': 'string'}, 'bar': bar_schema},
}
}
document = {'thing': {'foo': 'foo_value'}}
expected = {'thing': {'foo': 'foo_value', 'bar': 'bar_value'}}
assert_normalized(document, expected, schema)
def test_depending_default_setters():
schema = {
'a': {'type': 'integer'},
'b': {'type': 'integer', 'default_setter': lambda d: d['a'] + 1},
'c': {'type': 'integer', 'default_setter': lambda d: d['b'] * 2},
'd': {'type': 'integer', 'default_setter': lambda d: d['b'] + d['c']},
}
document = {'a': 1}
expected = {'a': 1, 'b': 2, 'c': 4, 'd': 6}
assert_normalized(document, expected, schema)
def test_circular_depending_default_setters(validator):
schema = {
'a': {'type': 'integer', 'default_setter': lambda d: d['b'] + 1},
'b': {'type': 'integer', 'default_setter': lambda d: d['a'] + 1},
}
validator({}, schema)
assert errors.SETTING_DEFAULT_FAILED in validator._errors
def test_issue_250():
# https://github.com/pyeve/cerberus/issues/250
schema = {
'list': {
'type': 'list',
'schema': {
'type': 'dict',
'allow_unknown': True,
'schema': {'a': {'type': 'string'}},
},
}
}
document = {'list': {'is_a': 'mapping'}}
assert_fail(
document,
schema,
error=('list', ('list', 'type'), errors.BAD_TYPE, schema['list']['type']),
)
def test_issue_250_no_type_pass_on_list():
# https://github.com/pyeve/cerberus/issues/250
schema = {
'list': {
'schema': {
'allow_unknown': True,
'type': 'dict',
'schema': {'a': {'type': 'string'}},
}
}
}
document = {'list': [{'a': 'known', 'b': 'unknown'}]}
assert_normalized(document, document, schema)
def test_issue_250_no_type_fail_on_dict():
# https://github.com/pyeve/cerberus/issues/250
schema = {
'list': {'schema': {'allow_unknown': True, 'schema': {'a': {'type': 'string'}}}}
}
document = {'list': {'a': {'a': 'known'}}}
assert_fail(
document,
schema,
error=(
'list',
('list', 'schema'),
errors.BAD_TYPE_FOR_SCHEMA,
schema['list']['schema'],
),
)
def test_issue_250_no_type_fail_pass_on_other():
# https://github.com/pyeve/cerberus/issues/250
schema = {
'list': {'schema': {'allow_unknown': True, 'schema': {'a': {'type': 'string'}}}}
}
document = {'list': 1}
assert_normalized(document, document, schema)
def test_allow_unknown_with_of_rules():
# https://github.com/pyeve/cerberus/issues/251
schema = {
'test': {
'oneof': [
{
'type': 'dict',
'allow_unknown': True,
'schema': {'known': {'type': 'string'}},
},
{'type': 'dict', 'schema': {'known': {'type': 'string'}}},
]
}
}
# check regression and that allow unknown does not cause any different
# than expected behaviour for one-of.
document = {'test': {'known': 's'}}
assert_fail(
document,
schema,
error=('test', ('test', 'oneof'), errors.ONEOF, schema['test']['oneof']),
)
# check that allow_unknown is actually applied
document = {'test': {'known': 's', 'unknown': 'asd'}}
assert_success(document, schema)
def test_271_normalising_tuples():
# https://github.com/pyeve/cerberus/issues/271
schema = {
'my_field': {'type': 'list', 'schema': {'type': ('string', 'number', 'dict')}}
}
document = {'my_field': ('foo', 'bar', 42, 'albert', 'kandinsky', {'items': 23})}
assert_success(document, schema)
normalized = Validator(schema).normalized(document)
assert normalized['my_field'] == (
'foo',
'bar',
42,
'albert',
'kandinsky',
{'items': 23},
)
def test_allow_unknown_wo_schema():
# https://github.com/pyeve/cerberus/issues/302
v = Validator({'a': {'type': 'dict', 'allow_unknown': True}})
v({'a': {}})
def test_allow_unknown_with_purge_unknown():
validator = Validator(purge_unknown=True)
schema = {'foo': {'type': 'dict', 'allow_unknown': True}}
document = {'foo': {'bar': True}, 'bar': 'foo'}
expected = {'foo': {'bar': True}}
assert_normalized(document, expected, schema, validator)
def test_allow_unknown_with_purge_unknown_subdocument():
validator = Validator(purge_unknown=True)
schema = {
'foo': {
'type': 'dict',
'schema': {'bar': {'type': 'string'}},
'allow_unknown': True,
}
}
document = {'foo': {'bar': 'baz', 'corge': False}, 'thud': 'xyzzy'}
expected = {'foo': {'bar': 'baz', 'corge': False}}
assert_normalized(document, expected, schema, validator)
def test_purge_readonly():
schema = {
'description': {'type': 'string', 'maxlength': 500},
'last_updated': {'readonly': True},
}
validator = Validator(schema=schema, purge_readonly=True)
document = {'description': 'it is a thing'}
expected = deepcopy(document)
document['last_updated'] = 'future'
assert_normalized(document, expected, validator=validator)
def test_defaults_in_allow_unknown_schema():
schema = {'meta': {'type': 'dict'}, 'version': {'type': 'string'}}
allow_unknown = {
'type': 'dict',
'schema': {
'cfg_path': {'type': 'string', 'default': 'cfg.yaml'},
'package': {'type': 'string'},
},
}
validator = Validator(schema=schema, allow_unknown=allow_unknown)
document = {'version': '1.2.3', 'plugin_foo': {'package': 'foo'}}
expected = {
'version': '1.2.3',
'plugin_foo': {'package': 'foo', 'cfg_path': 'cfg.yaml'},
}
assert_normalized(document, expected, schema, validator)
-84
View File
@@ -1,84 +0,0 @@
# -*- coding: utf-8 -*-
from pipenv.vendor.cerberus import schema_registry, rules_set_registry, Validator
from pipenv.vendor.cerberus.tests import (
assert_fail,
assert_normalized,
assert_schema_error,
assert_success,
)
def test_schema_registry_simple():
schema_registry.add('foo', {'bar': {'type': 'string'}})
schema = {'a': {'schema': 'foo'}, 'b': {'schema': 'foo'}}
document = {'a': {'bar': 'a'}, 'b': {'bar': 'b'}}
assert_success(document, schema)
def test_top_level_reference():
schema_registry.add('peng', {'foo': {'type': 'integer'}})
document = {'foo': 42}
assert_success(document, 'peng')
def test_rules_set_simple():
rules_set_registry.add('foo', {'type': 'integer'})
assert_success({'bar': 1}, {'bar': 'foo'})
assert_fail({'bar': 'one'}, {'bar': 'foo'})
def test_allow_unknown_as_reference():
rules_set_registry.add('foo', {'type': 'number'})
v = Validator(allow_unknown='foo')
assert_success({0: 1}, {}, v)
assert_fail({0: 'one'}, {}, v)
def test_recursion():
rules_set_registry.add('self', {'type': 'dict', 'allow_unknown': 'self'})
v = Validator(allow_unknown='self')
assert_success({0: {1: {2: {}}}}, {}, v)
def test_references_remain_unresolved(validator):
rules_set_registry.extend(
(('boolean', {'type': 'boolean'}), ('booleans', {'valuesrules': 'boolean'}))
)
validator.schema = {'foo': 'booleans'}
assert 'booleans' == validator.schema['foo']
assert 'boolean' == rules_set_registry._storage['booleans']['valuesrules']
def test_rules_registry_with_anyof_type():
rules_set_registry.add('string_or_integer', {'anyof_type': ['string', 'integer']})
schema = {'soi': 'string_or_integer'}
assert_success({'soi': 'hello'}, schema)
def test_schema_registry_with_anyof_type():
schema_registry.add('soi_id', {'id': {'anyof_type': ['string', 'integer']}})
schema = {'soi': {'schema': 'soi_id'}}
assert_success({'soi': {'id': 'hello'}}, schema)
def test_normalization_with_rules_set():
# https://github.com/pyeve/cerberus/issues/283
rules_set_registry.add('foo', {'default': 42})
assert_normalized({}, {'bar': 42}, {'bar': 'foo'})
rules_set_registry.add('foo', {'default_setter': lambda _: 42})
assert_normalized({}, {'bar': 42}, {'bar': 'foo'})
rules_set_registry.add('foo', {'type': 'integer', 'nullable': True})
assert_success({'bar': None}, {'bar': 'foo'})
def test_rules_set_with_dict_field():
document = {'a_dict': {'foo': 1}}
schema = {'a_dict': {'type': 'dict', 'schema': {'foo': 'rule'}}}
# the schema's not yet added to the valid ones, so test the faulty first
rules_set_registry.add('rule', {'tüpe': 'integer'})
assert_schema_error(document, schema)
rules_set_registry.add('rule', {'type': 'integer'})
assert_success(document, schema)
-178
View File
@@ -1,178 +0,0 @@
# -*- coding: utf-8 -*-
import re
import pytest
from pipenv.vendor.cerberus import Validator, errors, SchemaError
from pipenv.vendor.cerberus.schema import UnvalidatedSchema
from pipenv.vendor.cerberus.tests import assert_schema_error
def test_empty_schema():
validator = Validator()
with pytest.raises(SchemaError, match=errors.SCHEMA_ERROR_MISSING):
validator({}, schema=None)
def test_bad_schema_type(validator):
schema = "this string should really be dict"
msg = errors.SCHEMA_ERROR_DEFINITION_TYPE.format(schema)
with pytest.raises(SchemaError, match=msg):
validator.schema = schema
def test_bad_schema_type_field(validator):
field = 'foo'
schema = {field: {'schema': {'bar': {'type': 'strong'}}}}
with pytest.raises(SchemaError):
validator.schema = schema
def test_unknown_rule(validator):
msg = "{'foo': [{'unknown': ['unknown rule']}]}"
with pytest.raises(SchemaError, match=re.escape(msg)):
validator.schema = {'foo': {'unknown': 'rule'}}
def test_unknown_type(validator):
msg = str({'foo': [{'type': ['Unsupported types: unknown']}]})
with pytest.raises(SchemaError, match=re.escape(msg)):
validator.schema = {'foo': {'type': 'unknown'}}
def test_bad_schema_definition(validator):
field = 'name'
msg = str({field: ['must be of dict type']})
with pytest.raises(SchemaError, match=re.escape(msg)):
validator.schema = {field: 'this should really be a dict'}
def test_bad_of_rules():
schema = {'foo': {'anyof': {'type': 'string'}}}
assert_schema_error({}, schema)
def test_normalization_rules_are_invalid_in_of_rules():
schema = {0: {'anyof': [{'coerce': lambda x: x}]}}
assert_schema_error({}, schema)
def test_anyof_allof_schema_validate():
# make sure schema with 'anyof' and 'allof' constraints are checked
# correctly
schema = {
'doc': {'type': 'dict', 'anyof': [{'schema': [{'param': {'type': 'number'}}]}]}
}
assert_schema_error({'doc': 'this is my document'}, schema)
schema = {
'doc': {'type': 'dict', 'allof': [{'schema': [{'param': {'type': 'number'}}]}]}
}
assert_schema_error({'doc': 'this is my document'}, schema)
def test_repr():
v = Validator({'foo': {'type': 'string'}})
assert repr(v.schema) == "{'foo': {'type': 'string'}}"
def test_validated_schema_cache():
v = Validator({'foozifix': {'coerce': int}})
cache_size = len(v._valid_schemas)
v = Validator({'foozifix': {'type': 'integer'}})
cache_size += 1
assert len(v._valid_schemas) == cache_size
v = Validator({'foozifix': {'coerce': int}})
assert len(v._valid_schemas) == cache_size
max_cache_size = 163
assert cache_size <= max_cache_size, (
"There's an unexpected high amount (%s) of cached valid "
"definition schemas. Unless you added further tests, "
"there are good chances that something is wrong. "
"If you added tests with new schemas, you can try to "
"adjust the variable `max_cache_size` according to "
"the added schemas." % cache_size
)
def test_expansion_in_nested_schema():
schema = {'detroit': {'schema': {'anyof_regex': ['^Aladdin', 'Sane$']}}}
v = Validator(schema)
assert v.schema['detroit']['schema'] == {
'anyof': [{'regex': '^Aladdin'}, {'regex': 'Sane$'}]
}
def test_unvalidated_schema_can_be_copied():
schema = UnvalidatedSchema()
schema_copy = schema.copy()
assert schema_copy == schema
# TODO remove with next major release
def test_deprecated_rule_names_in_valueschema():
def check_with(field, value, error):
pass
schema = {
"field_1": {
"type": "dict",
"valueschema": {
"type": "dict",
"keyschema": {"type": "string"},
"valueschema": {"type": "string"},
},
},
"field_2": {
"type": "list",
"items": [
{"keyschema": {}},
{"validator": check_with},
{"valueschema": {}},
],
},
}
validator = Validator(schema)
assert validator.schema == {
"field_1": {
"type": "dict",
"valuesrules": {
"type": "dict",
"keysrules": {"type": "string"},
"valuesrules": {"type": "string"},
},
},
"field_2": {
"type": "list",
"items": [
{"keysrules": {}},
{"check_with": check_with},
{"valuesrules": {}},
],
},
}
def test_anyof_check_with():
def foo(field, value, error):
pass
def bar(field, value, error):
pass
schema = {'field': {'anyof_check_with': [foo, bar]}}
validator = Validator(schema)
assert validator.schema == {
'field': {'anyof': [{'check_with': foo}, {'check_with': bar}]}
}
def test_rulename_space_is_normalized():
Validator(schema={"field": {"default setter": lambda x: x, "type": "string"}})
-11
View File
@@ -1,11 +0,0 @@
from pipenv.vendor.cerberus.utils import compare_paths_lt
def test_compare_paths():
lesser = ('a_dict', 'keysrules')
greater = ('a_dict', 'valuesrules')
assert compare_paths_lt(lesser, greater)
lesser += ('type',)
greater += ('regex',)
assert compare_paths_lt(lesser, greater)
File diff suppressed because it is too large Load Diff
-22
View File
@@ -1,22 +0,0 @@
"wheel" copyright (c) 2012-2014 Daniel Holth <dholth@fastmail.fm> and
contributors.
The MIT License
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
-1
View File
@@ -1 +0,0 @@
__version__ = '0.37.1'
-19
View File
@@ -1,19 +0,0 @@
"""
Wheel command line tool (enable python -m wheel syntax)
"""
import sys
def main(): # needed for console script
if __package__ == '':
# To be able to run 'python wheel-0.9.whl/wheel':
import os.path
path = os.path.dirname(os.path.dirname(__file__))
sys.path[0:0] = [path]
import wheel.cli
sys.exit(wheel.cli.main())
if __name__ == "__main__":
sys.exit(main())
-492
View File
@@ -1,492 +0,0 @@
"""
Create a wheel (.whl) distribution.
A wheel is a built archive format.
"""
import distutils
import os
import shutil
import stat
import sys
import re
import warnings
from collections import OrderedDict
from distutils.core import Command
from distutils import log as logger
from io import BytesIO
from glob import iglob
from shutil import rmtree
from sysconfig import get_config_var
from zipfile import ZIP_DEFLATED, ZIP_STORED
import pipenv.patched.pip._vendor.pkg_resources as pkg_resources
from .pkginfo import write_pkg_info
from .macosx_libfile import calculate_macosx_platform_tag
from .metadata import pkginfo_to_metadata
from .vendored.packaging import tags
from .wheelfile import WheelFile
from . import __version__ as wheel_version
if sys.version_info < (3,):
from email.generator import Generator as BytesGenerator
else:
from email.generator import BytesGenerator
safe_name = pkg_resources.safe_name
safe_version = pkg_resources.safe_version
PY_LIMITED_API_PATTERN = r'cp3\d'
def python_tag():
return 'py{}'.format(sys.version_info[0])
def get_platform(archive_root):
"""Return our platform name 'win32', 'linux_x86_64'"""
# XXX remove distutils dependency
result = distutils.util.get_platform()
if result.startswith("macosx") and archive_root is not None:
result = calculate_macosx_platform_tag(archive_root, result)
if result == "linux_x86_64" and sys.maxsize == 2147483647:
# pip pull request #3497
result = "linux_i686"
return result
def get_flag(var, fallback, expected=True, warn=True):
"""Use a fallback value for determining SOABI flags if the needed config
var is unset or unavailable."""
val = get_config_var(var)
if val is None:
if warn:
warnings.warn("Config variable '{0}' is unset, Python ABI tag may "
"be incorrect".format(var), RuntimeWarning, 2)
return fallback
return val == expected
def get_abi_tag():
"""Return the ABI tag based on SOABI (if available) or emulate SOABI
(CPython 2, PyPy)."""
soabi = get_config_var('SOABI')
impl = tags.interpreter_name()
if not soabi and impl in ('cp', 'pp') and hasattr(sys, 'maxunicode'):
d = ''
m = ''
u = ''
if get_flag('Py_DEBUG',
hasattr(sys, 'gettotalrefcount'),
warn=(impl == 'cp')):
d = 'd'
if get_flag('WITH_PYMALLOC',
impl == 'cp',
warn=(impl == 'cp' and
sys.version_info < (3, 8))) \
and sys.version_info < (3, 8):
m = 'm'
if get_flag('Py_UNICODE_SIZE',
sys.maxunicode == 0x10ffff,
expected=4,
warn=(impl == 'cp' and
sys.version_info < (3, 3))) \
and sys.version_info < (3, 3):
u = 'u'
abi = '%s%s%s%s%s' % (impl, tags.interpreter_version(), d, m, u)
elif soabi and soabi.startswith('cpython-'):
abi = 'cp' + soabi.split('-')[1]
elif soabi and soabi.startswith('pypy-'):
# we want something like pypy36-pp73
abi = '-'.join(soabi.split('-')[:2])
abi = abi.replace('.', '_').replace('-', '_')
elif soabi:
abi = soabi.replace('.', '_').replace('-', '_')
else:
abi = None
return abi
def safer_name(name):
return safe_name(name).replace('-', '_')
def safer_version(version):
return safe_version(version).replace('-', '_')
def remove_readonly(func, path, excinfo):
print(str(excinfo[1]))
os.chmod(path, stat.S_IWRITE)
func(path)
class bdist_wheel(Command):
description = 'create a wheel distribution'
supported_compressions = OrderedDict([
('stored', ZIP_STORED),
('deflated', ZIP_DEFLATED)
])
user_options = [('bdist-dir=', 'b',
"temporary directory for creating the distribution"),
('plat-name=', 'p',
"platform name to embed in generated filenames "
"(default: %s)" % get_platform(None)),
('keep-temp', 'k',
"keep the pseudo-installation tree around after " +
"creating the distribution archive"),
('dist-dir=', 'd',
"directory to put final built distributions in"),
('skip-build', None,
"skip rebuilding everything (for testing/debugging)"),
('relative', None,
"build the archive using relative paths "
"(default: false)"),
('owner=', 'u',
"Owner name used when creating a tar file"
" [default: current user]"),
('group=', 'g',
"Group name used when creating a tar file"
" [default: current group]"),
('universal', None,
"make a universal wheel"
" (default: false)"),
('compression=', None,
"zipfile compression (one of: {})"
" (default: 'deflated')"
.format(', '.join(supported_compressions))),
('python-tag=', None,
"Python implementation compatibility tag"
" (default: '%s')" % (python_tag())),
('build-number=', None,
"Build number for this particular version. "
"As specified in PEP-0427, this must start with a digit. "
"[default: None]"),
('py-limited-api=', None,
"Python tag (cp32|cp33|cpNN) for abi3 wheel tag"
" (default: false)"),
]
boolean_options = ['keep-temp', 'skip-build', 'relative', 'universal']
def initialize_options(self):
self.bdist_dir = None
self.data_dir = None
self.plat_name = None
self.plat_tag = None
self.format = 'zip'
self.keep_temp = False
self.dist_dir = None
self.egginfo_dir = None
self.root_is_pure = None
self.skip_build = None
self.relative = False
self.owner = None
self.group = None
self.universal = False
self.compression = 'deflated'
self.python_tag = python_tag()
self.build_number = None
self.py_limited_api = False
self.plat_name_supplied = False
def finalize_options(self):
if self.bdist_dir is None:
bdist_base = self.get_finalized_command('bdist').bdist_base
self.bdist_dir = os.path.join(bdist_base, 'wheel')
self.data_dir = self.wheel_dist_name + '.data'
self.plat_name_supplied = self.plat_name is not None
try:
self.compression = self.supported_compressions[self.compression]
except KeyError:
raise ValueError('Unsupported compression: {}'.format(self.compression))
need_options = ('dist_dir', 'plat_name', 'skip_build')
self.set_undefined_options('bdist',
*zip(need_options, need_options))
self.root_is_pure = not (self.distribution.has_ext_modules()
or self.distribution.has_c_libraries())
if self.py_limited_api and not re.match(PY_LIMITED_API_PATTERN, self.py_limited_api):
raise ValueError("py-limited-api must match '%s'" % PY_LIMITED_API_PATTERN)
# Support legacy [wheel] section for setting universal
wheel = self.distribution.get_option_dict('wheel')
if 'universal' in wheel:
# please don't define this in your global configs
logger.warn('The [wheel] section is deprecated. Use [bdist_wheel] instead.')
val = wheel['universal'][1].strip()
if val.lower() in ('1', 'true', 'yes'):
self.universal = True
if self.build_number is not None and not self.build_number[:1].isdigit():
raise ValueError("Build tag (build-number) must start with a digit.")
@property
def wheel_dist_name(self):
"""Return distribution full name with - replaced with _"""
components = (safer_name(self.distribution.get_name()),
safer_version(self.distribution.get_version()))
if self.build_number:
components += (self.build_number,)
return '-'.join(components)
def get_tag(self):
# bdist sets self.plat_name if unset, we should only use it for purepy
# wheels if the user supplied it.
if self.plat_name_supplied:
plat_name = self.plat_name
elif self.root_is_pure:
plat_name = 'any'
else:
# macosx contains system version in platform name so need special handle
if self.plat_name and not self.plat_name.startswith("macosx"):
plat_name = self.plat_name
else:
# on macosx always limit the platform name to comply with any
# c-extension modules in bdist_dir, since the user can specify
# a higher MACOSX_DEPLOYMENT_TARGET via tools like CMake
# on other platforms, and on macosx if there are no c-extension
# modules, use the default platform name.
plat_name = get_platform(self.bdist_dir)
if plat_name in ('linux-x86_64', 'linux_x86_64') and sys.maxsize == 2147483647:
plat_name = 'linux_i686'
plat_name = plat_name.lower().replace('-', '_').replace('.', '_')
if self.root_is_pure:
if self.universal:
impl = 'py2.py3'
else:
impl = self.python_tag
tag = (impl, 'none', plat_name)
else:
impl_name = tags.interpreter_name()
impl_ver = tags.interpreter_version()
impl = impl_name + impl_ver
# We don't work on CPython 3.1, 3.0.
if self.py_limited_api and (impl_name + impl_ver).startswith('cp3'):
impl = self.py_limited_api
abi_tag = 'abi3'
else:
abi_tag = str(get_abi_tag()).lower()
tag = (impl, abi_tag, plat_name)
# issue gh-374: allow overriding plat_name
supported_tags = [(t.interpreter, t.abi, plat_name)
for t in tags.sys_tags()]
assert tag in supported_tags, "would build wheel with unsupported tag {}".format(tag)
return tag
def run(self):
build_scripts = self.reinitialize_command('build_scripts')
build_scripts.executable = 'python'
build_scripts.force = True
build_ext = self.reinitialize_command('build_ext')
build_ext.inplace = False
if not self.skip_build:
self.run_command('build')
install = self.reinitialize_command('install',
reinit_subcommands=True)
install.root = self.bdist_dir
install.compile = False
install.skip_build = self.skip_build
install.warn_dir = False
# A wheel without setuptools scripts is more cross-platform.
# Use the (undocumented) `no_ep` option to setuptools'
# install_scripts command to avoid creating entry point scripts.
install_scripts = self.reinitialize_command('install_scripts')
install_scripts.no_ep = True
# Use a custom scheme for the archive, because we have to decide
# at installation time which scheme to use.
for key in ('headers', 'scripts', 'data', 'purelib', 'platlib'):
setattr(install,
'install_' + key,
os.path.join(self.data_dir, key))
basedir_observed = ''
if os.name == 'nt':
# win32 barfs if any of these are ''; could be '.'?
# (distutils.command.install:change_roots bug)
basedir_observed = os.path.normpath(os.path.join(self.data_dir, '..'))
self.install_libbase = self.install_lib = basedir_observed
setattr(install,
'install_purelib' if self.root_is_pure else 'install_platlib',
basedir_observed)
logger.info("installing to %s", self.bdist_dir)
self.run_command('install')
impl_tag, abi_tag, plat_tag = self.get_tag()
archive_basename = "{}-{}-{}-{}".format(self.wheel_dist_name, impl_tag, abi_tag, plat_tag)
if not self.relative:
archive_root = self.bdist_dir
else:
archive_root = os.path.join(
self.bdist_dir,
self._ensure_relative(install.install_base))
self.set_undefined_options('install_egg_info', ('target', 'egginfo_dir'))
distinfo_dirname = '{}-{}.dist-info'.format(
safer_name(self.distribution.get_name()),
safer_version(self.distribution.get_version()))
distinfo_dir = os.path.join(self.bdist_dir, distinfo_dirname)
self.egg2dist(self.egginfo_dir, distinfo_dir)
self.write_wheelfile(distinfo_dir)
# Make the archive
if not os.path.exists(self.dist_dir):
os.makedirs(self.dist_dir)
wheel_path = os.path.join(self.dist_dir, archive_basename + '.whl')
with WheelFile(wheel_path, 'w', self.compression) as wf:
wf.write_files(archive_root)
# Add to 'Distribution.dist_files' so that the "upload" command works
getattr(self.distribution, 'dist_files', []).append(
('bdist_wheel',
'{}.{}'.format(*sys.version_info[:2]), # like 3.7
wheel_path))
if not self.keep_temp:
logger.info('removing %s', self.bdist_dir)
if not self.dry_run:
rmtree(self.bdist_dir, onerror=remove_readonly)
def write_wheelfile(self, wheelfile_base, generator='bdist_wheel (' + wheel_version + ')'):
from email.message import Message
# Workaround for Python 2.7 for when "generator" is unicode
if sys.version_info < (3,) and not isinstance(generator, str):
generator = generator.encode('utf-8')
msg = Message()
msg['Wheel-Version'] = '1.0' # of the spec
msg['Generator'] = generator
msg['Root-Is-Purelib'] = str(self.root_is_pure).lower()
if self.build_number is not None:
msg['Build'] = self.build_number
# Doesn't work for bdist_wininst
impl_tag, abi_tag, plat_tag = self.get_tag()
for impl in impl_tag.split('.'):
for abi in abi_tag.split('.'):
for plat in plat_tag.split('.'):
msg['Tag'] = '-'.join((impl, abi, plat))
wheelfile_path = os.path.join(wheelfile_base, 'WHEEL')
logger.info('creating %s', wheelfile_path)
buffer = BytesIO()
BytesGenerator(buffer, maxheaderlen=0).flatten(msg)
with open(wheelfile_path, 'wb') as f:
f.write(buffer.getvalue().replace(b'\r\n', b'\r'))
def _ensure_relative(self, path):
# copied from dir_util, deleted
drive, path = os.path.splitdrive(path)
if path[0:1] == os.sep:
path = drive + path[1:]
return path
@property
def license_paths(self):
metadata = self.distribution.get_option_dict('metadata')
files = set()
patterns = sorted({
option for option in metadata.get('license_files', ('', ''))[1].split()
})
if 'license_file' in metadata:
warnings.warn('The "license_file" option is deprecated. Use '
'"license_files" instead.', DeprecationWarning)
files.add(metadata['license_file'][1])
if 'license_file' not in metadata and 'license_files' not in metadata:
patterns = ('LICEN[CS]E*', 'COPYING*', 'NOTICE*', 'AUTHORS*')
for pattern in patterns:
for path in iglob(pattern):
if path.endswith('~'):
logger.debug('ignoring license file "%s" as it looks like a backup', path)
continue
if path not in files and os.path.isfile(path):
logger.info('adding license file "%s" (matched pattern "%s")', path, pattern)
files.add(path)
return files
def egg2dist(self, egginfo_path, distinfo_path):
"""Convert an .egg-info directory into a .dist-info directory"""
def adios(p):
"""Appropriately delete directory, file or link."""
if os.path.exists(p) and not os.path.islink(p) and os.path.isdir(p):
shutil.rmtree(p)
elif os.path.exists(p):
os.unlink(p)
adios(distinfo_path)
if not os.path.exists(egginfo_path):
# There is no egg-info. This is probably because the egg-info
# file/directory is not named matching the distribution name used
# to name the archive file. Check for this case and report
# accordingly.
import glob
pat = os.path.join(os.path.dirname(egginfo_path), '*.egg-info')
possible = glob.glob(pat)
err = "Egg metadata expected at %s but not found" % (egginfo_path,)
if possible:
alt = os.path.basename(possible[0])
err += " (%s found - possible misnamed archive file?)" % (alt,)
raise ValueError(err)
if os.path.isfile(egginfo_path):
# .egg-info is a single file
pkginfo_path = egginfo_path
pkg_info = pkginfo_to_metadata(egginfo_path, egginfo_path)
os.mkdir(distinfo_path)
else:
# .egg-info is a directory
pkginfo_path = os.path.join(egginfo_path, 'PKG-INFO')
pkg_info = pkginfo_to_metadata(egginfo_path, pkginfo_path)
# ignore common egg metadata that is useless to wheel
shutil.copytree(egginfo_path, distinfo_path,
ignore=lambda x, y: {'PKG-INFO', 'requires.txt', 'SOURCES.txt',
'not-zip-safe'}
)
# delete dependency_links if it is only whitespace
dependency_links_path = os.path.join(distinfo_path, 'dependency_links.txt')
with open(dependency_links_path, 'r') as dependency_links_file:
dependency_links = dependency_links_file.read().strip()
if not dependency_links:
adios(dependency_links_path)
write_pkg_info(os.path.join(distinfo_path, 'METADATA'), pkg_info)
for license_path in self.license_paths:
filename = os.path.basename(license_path)
shutil.copy(license_path, os.path.join(distinfo_path, filename))
adios(egginfo_path)
-88
View File
@@ -1,88 +0,0 @@
"""
Wheel command-line utility.
"""
from __future__ import print_function
import argparse
import os
import sys
def require_pkgresources(name):
try:
import pipenv.patched.pip._vendor.pkg_resources as pkg_resources # noqa: F401
except ImportError:
raise RuntimeError("'{0}' needs pkg_resources (part of setuptools).".format(name))
class WheelError(Exception):
pass
def unpack_f(args):
from .unpack import unpack
unpack(args.wheelfile, args.dest)
def pack_f(args):
from .pack import pack
pack(args.directory, args.dest_dir, args.build_number)
def convert_f(args):
from .convert import convert
convert(args.files, args.dest_dir, args.verbose)
def version_f(args):
from .. import __version__
print("wheel %s" % __version__)
def parser():
p = argparse.ArgumentParser()
s = p.add_subparsers(help="commands")
unpack_parser = s.add_parser('unpack', help='Unpack wheel')
unpack_parser.add_argument('--dest', '-d', help='Destination directory',
default='.')
unpack_parser.add_argument('wheelfile', help='Wheel file')
unpack_parser.set_defaults(func=unpack_f)
repack_parser = s.add_parser('pack', help='Repack wheel')
repack_parser.add_argument('directory', help='Root directory of the unpacked wheel')
repack_parser.add_argument('--dest-dir', '-d', default=os.path.curdir,
help="Directory to store the wheel (default %(default)s)")
repack_parser.add_argument('--build-number', help="Build tag to use in the wheel name")
repack_parser.set_defaults(func=pack_f)
convert_parser = s.add_parser('convert', help='Convert egg or wininst to wheel')
convert_parser.add_argument('files', nargs='*', help='Files to convert')
convert_parser.add_argument('--dest-dir', '-d', default=os.path.curdir,
help="Directory to store wheels (default %(default)s)")
convert_parser.add_argument('--verbose', '-v', action='store_true')
convert_parser.set_defaults(func=convert_f)
version_parser = s.add_parser('version', help='Print version and exit')
version_parser.set_defaults(func=version_f)
help_parser = s.add_parser('help', help='Show this help')
help_parser.set_defaults(func=lambda args: p.print_help())
return p
def main():
p = parser()
args = p.parse_args()
if not hasattr(args, 'func'):
p.print_help()
else:
try:
args.func(args)
return 0
except WheelError as e:
print(e, file=sys.stderr)
return 1
-269
View File
@@ -1,269 +0,0 @@
import os.path
import re
import shutil
import sys
import tempfile
import zipfile
from distutils import dist
from glob import iglob
from ..bdist_wheel import bdist_wheel
from ..wheelfile import WheelFile
from . import WheelError, require_pkgresources
egg_info_re = re.compile(r'''
(?P<name>.+?)-(?P<ver>.+?)
(-(?P<pyver>py\d\.\d+)
(-(?P<arch>.+?))?
)?.egg$''', re.VERBOSE)
class _bdist_wheel_tag(bdist_wheel):
# allow the client to override the default generated wheel tag
# The default bdist_wheel implementation uses python and abi tags
# of the running python process. This is not suitable for
# generating/repackaging prebuild binaries.
full_tag_supplied = False
full_tag = None # None or a (pytag, soabitag, plattag) triple
def get_tag(self):
if self.full_tag_supplied and self.full_tag is not None:
return self.full_tag
else:
return bdist_wheel.get_tag(self)
def egg2wheel(egg_path, dest_dir):
filename = os.path.basename(egg_path)
match = egg_info_re.match(filename)
if not match:
raise WheelError('Invalid egg file name: {}'.format(filename))
egg_info = match.groupdict()
dir = tempfile.mkdtemp(suffix="_e2w")
if os.path.isfile(egg_path):
# assume we have a bdist_egg otherwise
with zipfile.ZipFile(egg_path) as egg:
egg.extractall(dir)
else:
# support buildout-style installed eggs directories
for pth in os.listdir(egg_path):
src = os.path.join(egg_path, pth)
if os.path.isfile(src):
shutil.copy2(src, dir)
else:
shutil.copytree(src, os.path.join(dir, pth))
pyver = egg_info['pyver']
if pyver:
pyver = egg_info['pyver'] = pyver.replace('.', '')
arch = (egg_info['arch'] or 'any').replace('.', '_').replace('-', '_')
# assume all binary eggs are for CPython
abi = 'cp' + pyver[2:] if arch != 'any' else 'none'
root_is_purelib = egg_info['arch'] is None
if root_is_purelib:
bw = bdist_wheel(dist.Distribution())
else:
bw = _bdist_wheel_tag(dist.Distribution())
bw.root_is_pure = root_is_purelib
bw.python_tag = pyver
bw.plat_name_supplied = True
bw.plat_name = egg_info['arch'] or 'any'
if not root_is_purelib:
bw.full_tag_supplied = True
bw.full_tag = (pyver, abi, arch)
dist_info_dir = os.path.join(dir, '{name}-{ver}.dist-info'.format(**egg_info))
bw.egg2dist(os.path.join(dir, 'EGG-INFO'), dist_info_dir)
bw.write_wheelfile(dist_info_dir, generator='egg2wheel')
wheel_name = '{name}-{ver}-{pyver}-{}-{}.whl'.format(abi, arch, **egg_info)
with WheelFile(os.path.join(dest_dir, wheel_name), 'w') as wf:
wf.write_files(dir)
shutil.rmtree(dir)
def parse_wininst_info(wininfo_name, egginfo_name):
"""Extract metadata from filenames.
Extracts the 4 metadataitems needed (name, version, pyversion, arch) from
the installer filename and the name of the egg-info directory embedded in
the zipfile (if any).
The egginfo filename has the format::
name-ver(-pyver)(-arch).egg-info
The installer filename has the format::
name-ver.arch(-pyver).exe
Some things to note:
1. The installer filename is not definitive. An installer can be renamed
and work perfectly well as an installer. So more reliable data should
be used whenever possible.
2. The egg-info data should be preferred for the name and version, because
these come straight from the distutils metadata, and are mandatory.
3. The pyver from the egg-info data should be ignored, as it is
constructed from the version of Python used to build the installer,
which is irrelevant - the installer filename is correct here (even to
the point that when it's not there, any version is implied).
4. The architecture must be taken from the installer filename, as it is
not included in the egg-info data.
5. Architecture-neutral installers still have an architecture because the
installer format itself (being executable) is architecture-specific. We
should therefore ignore the architecture if the content is pure-python.
"""
egginfo = None
if egginfo_name:
egginfo = egg_info_re.search(egginfo_name)
if not egginfo:
raise ValueError("Egg info filename %s is not valid" % (egginfo_name,))
# Parse the wininst filename
# 1. Distribution name (up to the first '-')
w_name, sep, rest = wininfo_name.partition('-')
if not sep:
raise ValueError("Installer filename %s is not valid" % (wininfo_name,))
# Strip '.exe'
rest = rest[:-4]
# 2. Python version (from the last '-', must start with 'py')
rest2, sep, w_pyver = rest.rpartition('-')
if sep and w_pyver.startswith('py'):
rest = rest2
w_pyver = w_pyver.replace('.', '')
else:
# Not version specific - use py2.py3. While it is possible that
# pure-Python code is not compatible with both Python 2 and 3, there
# is no way of knowing from the wininst format, so we assume the best
# here (the user can always manually rename the wheel to be more
# restrictive if needed).
w_pyver = 'py2.py3'
# 3. Version and architecture
w_ver, sep, w_arch = rest.rpartition('.')
if not sep:
raise ValueError("Installer filename %s is not valid" % (wininfo_name,))
if egginfo:
w_name = egginfo.group('name')
w_ver = egginfo.group('ver')
return {'name': w_name, 'ver': w_ver, 'arch': w_arch, 'pyver': w_pyver}
def wininst2wheel(path, dest_dir):
with zipfile.ZipFile(path) as bdw:
# Search for egg-info in the archive
egginfo_name = None
for filename in bdw.namelist():
if '.egg-info' in filename:
egginfo_name = filename
break
info = parse_wininst_info(os.path.basename(path), egginfo_name)
root_is_purelib = True
for zipinfo in bdw.infolist():
if zipinfo.filename.startswith('PLATLIB'):
root_is_purelib = False
break
if root_is_purelib:
paths = {'purelib': ''}
else:
paths = {'platlib': ''}
dist_info = "%(name)s-%(ver)s" % info
datadir = "%s.data/" % dist_info
# rewrite paths to trick ZipFile into extracting an egg
# XXX grab wininst .ini - between .exe, padding, and first zip file.
members = []
egginfo_name = ''
for zipinfo in bdw.infolist():
key, basename = zipinfo.filename.split('/', 1)
key = key.lower()
basepath = paths.get(key, None)
if basepath is None:
basepath = datadir + key.lower() + '/'
oldname = zipinfo.filename
newname = basepath + basename
zipinfo.filename = newname
del bdw.NameToInfo[oldname]
bdw.NameToInfo[newname] = zipinfo
# Collect member names, but omit '' (from an entry like "PLATLIB/"
if newname:
members.append(newname)
# Remember egg-info name for the egg2dist call below
if not egginfo_name:
if newname.endswith('.egg-info'):
egginfo_name = newname
elif '.egg-info/' in newname:
egginfo_name, sep, _ = newname.rpartition('/')
dir = tempfile.mkdtemp(suffix="_b2w")
bdw.extractall(dir, members)
# egg2wheel
abi = 'none'
pyver = info['pyver']
arch = (info['arch'] or 'any').replace('.', '_').replace('-', '_')
# Wininst installers always have arch even if they are not
# architecture-specific (because the format itself is).
# So, assume the content is architecture-neutral if root is purelib.
if root_is_purelib:
arch = 'any'
# If the installer is architecture-specific, it's almost certainly also
# CPython-specific.
if arch != 'any':
pyver = pyver.replace('py', 'cp')
wheel_name = '-'.join((dist_info, pyver, abi, arch))
if root_is_purelib:
bw = bdist_wheel(dist.Distribution())
else:
bw = _bdist_wheel_tag(dist.Distribution())
bw.root_is_pure = root_is_purelib
bw.python_tag = pyver
bw.plat_name_supplied = True
bw.plat_name = info['arch'] or 'any'
if not root_is_purelib:
bw.full_tag_supplied = True
bw.full_tag = (pyver, abi, arch)
dist_info_dir = os.path.join(dir, '%s.dist-info' % dist_info)
bw.egg2dist(os.path.join(dir, egginfo_name), dist_info_dir)
bw.write_wheelfile(dist_info_dir, generator='wininst2wheel')
wheel_path = os.path.join(dest_dir, wheel_name)
with WheelFile(wheel_path, 'w') as wf:
wf.write_files(dir)
shutil.rmtree(dir)
def convert(files, dest_dir, verbose):
# Only support wheel convert if pkg_resources is present
require_pkgresources('wheel convert')
for pat in files:
for installer in iglob(pat):
if os.path.splitext(installer)[1] == '.egg':
conv = egg2wheel
else:
conv = wininst2wheel
if verbose:
print("{}... ".format(installer))
sys.stdout.flush()
conv(installer, dest_dir)
if verbose:
print("OK")
-82
View File
@@ -1,82 +0,0 @@
from __future__ import print_function
import os.path
import re
import sys
from pipenv.vendor.wheel.cli import WheelError
from pipenv.vendor.wheel.wheelfile import WheelFile
DIST_INFO_RE = re.compile(r"^(?P<namever>(?P<name>.+?)-(?P<ver>\d.*?))\.dist-info$")
BUILD_NUM_RE = re.compile(br'Build: (\d\w*)$')
def pack(directory, dest_dir, build_number):
"""Repack a previously unpacked wheel directory into a new wheel file.
The .dist-info/WHEEL file must contain one or more tags so that the target
wheel file name can be determined.
:param directory: The unpacked wheel directory
:param dest_dir: Destination directory (defaults to the current directory)
"""
# Find the .dist-info directory
dist_info_dirs = [fn for fn in os.listdir(directory)
if os.path.isdir(os.path.join(directory, fn)) and DIST_INFO_RE.match(fn)]
if len(dist_info_dirs) > 1:
raise WheelError('Multiple .dist-info directories found in {}'.format(directory))
elif not dist_info_dirs:
raise WheelError('No .dist-info directories found in {}'.format(directory))
# Determine the target wheel filename
dist_info_dir = dist_info_dirs[0]
name_version = DIST_INFO_RE.match(dist_info_dir).group('namever')
# Read the tags and the existing build number from .dist-info/WHEEL
existing_build_number = None
wheel_file_path = os.path.join(directory, dist_info_dir, 'WHEEL')
with open(wheel_file_path) as f:
tags = []
for line in f:
if line.startswith('Tag: '):
tags.append(line.split(' ')[1].rstrip())
elif line.startswith('Build: '):
existing_build_number = line.split(' ')[1].rstrip()
if not tags:
raise WheelError('No tags present in {}/WHEEL; cannot determine target wheel filename'
.format(dist_info_dir))
# Set the wheel file name and add/replace/remove the Build tag in .dist-info/WHEEL
build_number = build_number if build_number is not None else existing_build_number
if build_number is not None:
if build_number:
name_version += '-' + build_number
if build_number != existing_build_number:
replacement = ('Build: %s\r\n' % build_number).encode('ascii') if build_number else b''
with open(wheel_file_path, 'rb+') as f:
wheel_file_content = f.read()
wheel_file_content, num_replaced = BUILD_NUM_RE.subn(replacement,
wheel_file_content)
if not num_replaced:
wheel_file_content += replacement
f.seek(0)
f.truncate()
f.write(wheel_file_content)
# Reassemble the tags for the wheel file
impls = sorted({tag.split('-')[0] for tag in tags})
abivers = sorted({tag.split('-')[1] for tag in tags})
platforms = sorted({tag.split('-')[2] for tag in tags})
tagline = '-'.join(['.'.join(impls), '.'.join(abivers), '.'.join(platforms)])
# Repack the wheel
wheel_path = os.path.join(dest_dir, '{}-{}.whl'.format(name_version, tagline))
with WheelFile(wheel_path, 'w') as wf:
print("Repacking wheel as {}...".format(wheel_path), end='')
sys.stdout.flush()
wf.write_files(directory)
print('OK')
-25
View File
@@ -1,25 +0,0 @@
from __future__ import print_function
import os.path
import sys
from ..wheelfile import WheelFile
def unpack(path, dest='.'):
"""Unpack a wheel.
Wheel content will be unpacked to {dest}/{name}-{ver}, where {name}
is the package name and {ver} its version.
:param path: The path to the wheel.
:param dest: Destination directory (default to current directory).
"""
with WheelFile(path) as wf:
namever = wf.parsed_filename.group('namever')
destination = os.path.join(dest, namever)
print("Unpacking to: {}...".format(destination), end='')
sys.stdout.flush()
wf.extractall(destination)
print('OK')
-428
View File
@@ -1,428 +0,0 @@
"""
This module contains function to analyse dynamic library
headers to extract system information
Currently only for MacOSX
Library file on macosx system starts with Mach-O or Fat field.
This can be distinguish by first 32 bites and it is called magic number.
Proper value of magic number is with suffix _MAGIC. Suffix _CIGAM means
reversed bytes order.
Both fields can occur in two types: 32 and 64 bytes.
FAT field inform that this library contains few version of library
(typically for different types version). It contains
information where Mach-O headers starts.
Each section started with Mach-O header contains one library
(So if file starts with this field it contains only one version).
After filed Mach-O there are section fields.
Each of them starts with two fields:
cmd - magic number for this command
cmdsize - total size occupied by this section information.
In this case only sections LC_VERSION_MIN_MACOSX (for macosx 10.13 and earlier)
and LC_BUILD_VERSION (for macosx 10.14 and newer) are interesting,
because them contains information about minimal system version.
Important remarks:
- For fat files this implementation looks for maximum number version.
It not check if it is 32 or 64 and do not compare it with currently built package.
So it is possible to false report higher version that needed.
- All structures signatures are taken form macosx header files.
- I think that binary format will be more stable than `otool` output.
and if apple introduce some changes both implementation will need to be updated.
- The system compile will set the deployment target no lower than
11.0 for arm64 builds. For "Universal 2" builds use the x86_64 deployment
target when the arm64 target is 11.0.
"""
import ctypes
import os
import sys
"""here the needed const and struct from mach-o header files"""
FAT_MAGIC = 0xcafebabe
FAT_CIGAM = 0xbebafeca
FAT_MAGIC_64 = 0xcafebabf
FAT_CIGAM_64 = 0xbfbafeca
MH_MAGIC = 0xfeedface
MH_CIGAM = 0xcefaedfe
MH_MAGIC_64 = 0xfeedfacf
MH_CIGAM_64 = 0xcffaedfe
LC_VERSION_MIN_MACOSX = 0x24
LC_BUILD_VERSION = 0x32
CPU_TYPE_ARM64 = 0x0100000c
mach_header_fields = [
("magic", ctypes.c_uint32), ("cputype", ctypes.c_int),
("cpusubtype", ctypes.c_int), ("filetype", ctypes.c_uint32),
("ncmds", ctypes.c_uint32), ("sizeofcmds", ctypes.c_uint32),
("flags", ctypes.c_uint32)
]
"""
struct mach_header {
uint32_t magic; /* mach magic number identifier */
cpu_type_t cputype; /* cpu specifier */
cpu_subtype_t cpusubtype; /* machine specifier */
uint32_t filetype; /* type of file */
uint32_t ncmds; /* number of load commands */
uint32_t sizeofcmds; /* the size of all the load commands */
uint32_t flags; /* flags */
};
typedef integer_t cpu_type_t;
typedef integer_t cpu_subtype_t;
"""
mach_header_fields_64 = mach_header_fields + [("reserved", ctypes.c_uint32)]
"""
struct mach_header_64 {
uint32_t magic; /* mach magic number identifier */
cpu_type_t cputype; /* cpu specifier */
cpu_subtype_t cpusubtype; /* machine specifier */
uint32_t filetype; /* type of file */
uint32_t ncmds; /* number of load commands */
uint32_t sizeofcmds; /* the size of all the load commands */
uint32_t flags; /* flags */
uint32_t reserved; /* reserved */
};
"""
fat_header_fields = [("magic", ctypes.c_uint32), ("nfat_arch", ctypes.c_uint32)]
"""
struct fat_header {
uint32_t magic; /* FAT_MAGIC or FAT_MAGIC_64 */
uint32_t nfat_arch; /* number of structs that follow */
};
"""
fat_arch_fields = [
("cputype", ctypes.c_int), ("cpusubtype", ctypes.c_int),
("offset", ctypes.c_uint32), ("size", ctypes.c_uint32),
("align", ctypes.c_uint32)
]
"""
struct fat_arch {
cpu_type_t cputype; /* cpu specifier (int) */
cpu_subtype_t cpusubtype; /* machine specifier (int) */
uint32_t offset; /* file offset to this object file */
uint32_t size; /* size of this object file */
uint32_t align; /* alignment as a power of 2 */
};
"""
fat_arch_64_fields = [
("cputype", ctypes.c_int), ("cpusubtype", ctypes.c_int),
("offset", ctypes.c_uint64), ("size", ctypes.c_uint64),
("align", ctypes.c_uint32), ("reserved", ctypes.c_uint32)
]
"""
struct fat_arch_64 {
cpu_type_t cputype; /* cpu specifier (int) */
cpu_subtype_t cpusubtype; /* machine specifier (int) */
uint64_t offset; /* file offset to this object file */
uint64_t size; /* size of this object file */
uint32_t align; /* alignment as a power of 2 */
uint32_t reserved; /* reserved */
};
"""
segment_base_fields = [("cmd", ctypes.c_uint32), ("cmdsize", ctypes.c_uint32)]
"""base for reading segment info"""
segment_command_fields = [
("cmd", ctypes.c_uint32), ("cmdsize", ctypes.c_uint32),
("segname", ctypes.c_char * 16), ("vmaddr", ctypes.c_uint32),
("vmsize", ctypes.c_uint32), ("fileoff", ctypes.c_uint32),
("filesize", ctypes.c_uint32), ("maxprot", ctypes.c_int),
("initprot", ctypes.c_int), ("nsects", ctypes.c_uint32),
("flags", ctypes.c_uint32),
]
"""
struct segment_command { /* for 32-bit architectures */
uint32_t cmd; /* LC_SEGMENT */
uint32_t cmdsize; /* includes sizeof section structs */
char segname[16]; /* segment name */
uint32_t vmaddr; /* memory address of this segment */
uint32_t vmsize; /* memory size of this segment */
uint32_t fileoff; /* file offset of this segment */
uint32_t filesize; /* amount to map from the file */
vm_prot_t maxprot; /* maximum VM protection */
vm_prot_t initprot; /* initial VM protection */
uint32_t nsects; /* number of sections in segment */
uint32_t flags; /* flags */
};
typedef int vm_prot_t;
"""
segment_command_fields_64 = [
("cmd", ctypes.c_uint32), ("cmdsize", ctypes.c_uint32),
("segname", ctypes.c_char * 16), ("vmaddr", ctypes.c_uint64),
("vmsize", ctypes.c_uint64), ("fileoff", ctypes.c_uint64),
("filesize", ctypes.c_uint64), ("maxprot", ctypes.c_int),
("initprot", ctypes.c_int), ("nsects", ctypes.c_uint32),
("flags", ctypes.c_uint32),
]
"""
struct segment_command_64 { /* for 64-bit architectures */
uint32_t cmd; /* LC_SEGMENT_64 */
uint32_t cmdsize; /* includes sizeof section_64 structs */
char segname[16]; /* segment name */
uint64_t vmaddr; /* memory address of this segment */
uint64_t vmsize; /* memory size of this segment */
uint64_t fileoff; /* file offset of this segment */
uint64_t filesize; /* amount to map from the file */
vm_prot_t maxprot; /* maximum VM protection */
vm_prot_t initprot; /* initial VM protection */
uint32_t nsects; /* number of sections in segment */
uint32_t flags; /* flags */
};
"""
version_min_command_fields = segment_base_fields + \
[("version", ctypes.c_uint32), ("sdk", ctypes.c_uint32)]
"""
struct version_min_command {
uint32_t cmd; /* LC_VERSION_MIN_MACOSX or
LC_VERSION_MIN_IPHONEOS or
LC_VERSION_MIN_WATCHOS or
LC_VERSION_MIN_TVOS */
uint32_t cmdsize; /* sizeof(struct min_version_command) */
uint32_t version; /* X.Y.Z is encoded in nibbles xxxx.yy.zz */
uint32_t sdk; /* X.Y.Z is encoded in nibbles xxxx.yy.zz */
};
"""
build_version_command_fields = segment_base_fields + \
[("platform", ctypes.c_uint32), ("minos", ctypes.c_uint32),
("sdk", ctypes.c_uint32), ("ntools", ctypes.c_uint32)]
"""
struct build_version_command {
uint32_t cmd; /* LC_BUILD_VERSION */
uint32_t cmdsize; /* sizeof(struct build_version_command) plus */
/* ntools * sizeof(struct build_tool_version) */
uint32_t platform; /* platform */
uint32_t minos; /* X.Y.Z is encoded in nibbles xxxx.yy.zz */
uint32_t sdk; /* X.Y.Z is encoded in nibbles xxxx.yy.zz */
uint32_t ntools; /* number of tool entries following this */
};
"""
def swap32(x):
return (((x << 24) & 0xFF000000) |
((x << 8) & 0x00FF0000) |
((x >> 8) & 0x0000FF00) |
((x >> 24) & 0x000000FF))
def get_base_class_and_magic_number(lib_file, seek=None):
if seek is None:
seek = lib_file.tell()
else:
lib_file.seek(seek)
magic_number = ctypes.c_uint32.from_buffer_copy(
lib_file.read(ctypes.sizeof(ctypes.c_uint32))).value
# Handle wrong byte order
if magic_number in [FAT_CIGAM, FAT_CIGAM_64, MH_CIGAM, MH_CIGAM_64]:
if sys.byteorder == "little":
BaseClass = ctypes.BigEndianStructure
else:
BaseClass = ctypes.LittleEndianStructure
magic_number = swap32(magic_number)
else:
BaseClass = ctypes.Structure
lib_file.seek(seek)
return BaseClass, magic_number
def read_data(struct_class, lib_file):
return struct_class.from_buffer_copy(lib_file.read(
ctypes.sizeof(struct_class)))
def extract_macosx_min_system_version(path_to_lib):
with open(path_to_lib, "rb") as lib_file:
BaseClass, magic_number = get_base_class_and_magic_number(lib_file, 0)
if magic_number not in [FAT_MAGIC, FAT_MAGIC_64, MH_MAGIC, MH_MAGIC_64]:
return
if magic_number in [FAT_MAGIC, FAT_CIGAM_64]:
class FatHeader(BaseClass):
_fields_ = fat_header_fields
fat_header = read_data(FatHeader, lib_file)
if magic_number == FAT_MAGIC:
class FatArch(BaseClass):
_fields_ = fat_arch_fields
else:
class FatArch(BaseClass):
_fields_ = fat_arch_64_fields
fat_arch_list = [read_data(FatArch, lib_file) for _ in range(fat_header.nfat_arch)]
versions_list = []
for el in fat_arch_list:
try:
version = read_mach_header(lib_file, el.offset)
if version is not None:
if el.cputype == CPU_TYPE_ARM64 and len(fat_arch_list) != 1:
# Xcode will not set the deployment target below 11.0.0
# for the arm64 architecture. Ignore the arm64 deployment
# in fat binaries when the target is 11.0.0, that way
# the other architectures can select a lower deployment
# target.
# This is safe because there is no arm64 variant for
# macOS 10.15 or earlier.
if version == (11, 0, 0):
continue
versions_list.append(version)
except ValueError:
pass
if len(versions_list) > 0:
return max(versions_list)
else:
return None
else:
try:
return read_mach_header(lib_file, 0)
except ValueError:
"""when some error during read library files"""
return None
def read_mach_header(lib_file, seek=None):
"""
This funcition parse mach-O header and extract
information about minimal system version
:param lib_file: reference to opened library file with pointer
"""
if seek is not None:
lib_file.seek(seek)
base_class, magic_number = get_base_class_and_magic_number(lib_file)
arch = "32" if magic_number == MH_MAGIC else "64"
class SegmentBase(base_class):
_fields_ = segment_base_fields
if arch == "32":
class MachHeader(base_class):
_fields_ = mach_header_fields
else:
class MachHeader(base_class):
_fields_ = mach_header_fields_64
mach_header = read_data(MachHeader, lib_file)
for _i in range(mach_header.ncmds):
pos = lib_file.tell()
segment_base = read_data(SegmentBase, lib_file)
lib_file.seek(pos)
if segment_base.cmd == LC_VERSION_MIN_MACOSX:
class VersionMinCommand(base_class):
_fields_ = version_min_command_fields
version_info = read_data(VersionMinCommand, lib_file)
return parse_version(version_info.version)
elif segment_base.cmd == LC_BUILD_VERSION:
class VersionBuild(base_class):
_fields_ = build_version_command_fields
version_info = read_data(VersionBuild, lib_file)
return parse_version(version_info.minos)
else:
lib_file.seek(pos + segment_base.cmdsize)
continue
def parse_version(version):
x = (version & 0xffff0000) >> 16
y = (version & 0x0000ff00) >> 8
z = (version & 0x000000ff)
return x, y, z
def calculate_macosx_platform_tag(archive_root, platform_tag):
"""
Calculate proper macosx platform tag basing on files which are included to wheel
Example platform tag `macosx-10.14-x86_64`
"""
prefix, base_version, suffix = platform_tag.split('-')
base_version = tuple([int(x) for x in base_version.split(".")])
base_version = base_version[:2]
if base_version[0] > 10:
base_version = (base_version[0], 0)
assert len(base_version) == 2
if "MACOSX_DEPLOYMENT_TARGET" in os.environ:
deploy_target = tuple([int(x) for x in os.environ[
"MACOSX_DEPLOYMENT_TARGET"].split(".")])
deploy_target = deploy_target[:2]
if deploy_target[0] > 10:
deploy_target = (deploy_target[0], 0)
if deploy_target < base_version:
sys.stderr.write(
"[WARNING] MACOSX_DEPLOYMENT_TARGET is set to a lower value ({}) than the "
"version on which the Python interpreter was compiled ({}), and will be "
"ignored.\n".format('.'.join(str(x) for x in deploy_target),
'.'.join(str(x) for x in base_version))
)
else:
base_version = deploy_target
assert len(base_version) == 2
start_version = base_version
versions_dict = {}
for (dirpath, dirnames, filenames) in os.walk(archive_root):
for filename in filenames:
if filename.endswith('.dylib') or filename.endswith('.so'):
lib_path = os.path.join(dirpath, filename)
min_ver = extract_macosx_min_system_version(lib_path)
if min_ver is not None:
min_ver = min_ver[0:2]
if min_ver[0] > 10:
min_ver = (min_ver[0], 0)
versions_dict[lib_path] = min_ver
if len(versions_dict) > 0:
base_version = max(base_version, max(versions_dict.values()))
# macosx platform tag do not support minor bugfix release
fin_base_version = "_".join([str(x) for x in base_version])
if start_version < base_version:
problematic_files = [k for k, v in versions_dict.items() if v > start_version]
problematic_files = "\n".join(problematic_files)
if len(problematic_files) == 1:
files_form = "this file"
else:
files_form = "these files"
error_message = \
"[WARNING] This wheel needs a higher macOS version than {} " \
"To silence this warning, set MACOSX_DEPLOYMENT_TARGET to at least " +\
fin_base_version + " or recreate " + files_form + " with lower " \
"MACOSX_DEPLOYMENT_TARGET: \n" + problematic_files
if "MACOSX_DEPLOYMENT_TARGET" in os.environ:
error_message = error_message.format("is set in MACOSX_DEPLOYMENT_TARGET variable.")
else:
error_message = error_message.format(
"the version your Python interpreter is compiled against.")
sys.stderr.write(error_message)
platform_tag = prefix + "_" + fin_base_version + "_" + suffix
return platform_tag
-133
View File
@@ -1,133 +0,0 @@
"""
Tools for converting old- to new-style metadata.
"""
import os.path
import textwrap
import pipenv.patched.pip._vendor.pkg_resources as pkg_resources
from .pkginfo import read_pkg_info
def requires_to_requires_dist(requirement):
"""Return the version specifier for a requirement in PEP 345/566 fashion."""
if getattr(requirement, 'url', None):
return " @ " + requirement.url
requires_dist = []
for op, ver in requirement.specs:
requires_dist.append(op + ver)
if not requires_dist:
return ''
return " (%s)" % ','.join(sorted(requires_dist))
def convert_requirements(requirements):
"""Yield Requires-Dist: strings for parsed requirements strings."""
for req in requirements:
parsed_requirement = pkg_resources.Requirement.parse(req)
spec = requires_to_requires_dist(parsed_requirement)
extras = ",".join(sorted(parsed_requirement.extras))
if extras:
extras = "[%s]" % extras
yield (parsed_requirement.project_name + extras + spec)
def generate_requirements(extras_require):
"""
Convert requirements from a setup()-style dictionary to ('Requires-Dist', 'requirement')
and ('Provides-Extra', 'extra') tuples.
extras_require is a dictionary of {extra: [requirements]} as passed to setup(),
using the empty extra {'': [requirements]} to hold install_requires.
"""
for extra, depends in extras_require.items():
condition = ''
extra = extra or ''
if ':' in extra: # setuptools extra:condition syntax
extra, condition = extra.split(':', 1)
extra = pkg_resources.safe_extra(extra)
if extra:
yield 'Provides-Extra', extra
if condition:
condition = "(" + condition + ") and "
condition += "extra == '%s'" % extra
if condition:
condition = ' ; ' + condition
for new_req in convert_requirements(depends):
yield 'Requires-Dist', new_req + condition
def pkginfo_to_metadata(egg_info_path, pkginfo_path):
"""
Convert .egg-info directory with PKG-INFO to the Metadata 2.1 format
"""
pkg_info = read_pkg_info(pkginfo_path)
pkg_info.replace_header('Metadata-Version', '2.1')
# Those will be regenerated from `requires.txt`.
del pkg_info['Provides-Extra']
del pkg_info['Requires-Dist']
requires_path = os.path.join(egg_info_path, 'requires.txt')
if os.path.exists(requires_path):
with open(requires_path) as requires_file:
requires = requires_file.read()
parsed_requirements = sorted(pkg_resources.split_sections(requires),
key=lambda x: x[0] or '')
for extra, reqs in parsed_requirements:
for key, value in generate_requirements({extra: reqs}):
if (key, value) not in pkg_info.items():
pkg_info[key] = value
description = pkg_info['Description']
if description:
pkg_info.set_payload(dedent_description(pkg_info))
del pkg_info['Description']
return pkg_info
def pkginfo_unicode(pkg_info, field):
"""Hack to coax Unicode out of an email Message() - Python 3.3+"""
text = pkg_info[field]
field = field.lower()
if not isinstance(text, str):
for item in pkg_info.raw_items():
if item[0].lower() == field:
text = item[1].encode('ascii', 'surrogateescape') \
.decode('utf-8')
break
return text
def dedent_description(pkg_info):
"""
Dedent and convert pkg_info['Description'] to Unicode.
"""
description = pkg_info['Description']
# Python 3 Unicode handling, sorta.
surrogates = False
if not isinstance(description, str):
surrogates = True
description = pkginfo_unicode(pkg_info, 'Description')
description_lines = description.splitlines()
description_dedent = '\n'.join(
# if the first line of long_description is blank,
# the first line here will be indented.
(description_lines[0].lstrip(),
textwrap.dedent('\n'.join(description_lines[1:])),
'\n'))
if surrogates:
description_dedent = description_dedent \
.encode("utf8") \
.decode("ascii", "surrogateescape")
return description_dedent
-43
View File
@@ -1,43 +0,0 @@
"""Tools for reading and writing PKG-INFO / METADATA without caring
about the encoding."""
from email.parser import Parser
try:
unicode
_PY3 = False
except NameError:
_PY3 = True
if not _PY3:
from email.generator import Generator
def read_pkg_info_bytes(bytestr):
return Parser().parsestr(bytestr)
def read_pkg_info(path):
with open(path, "r") as headers:
message = Parser().parse(headers)
return message
def write_pkg_info(path, message):
with open(path, 'w') as metadata:
Generator(metadata, mangle_from_=False, maxheaderlen=0).flatten(message)
else:
from email.generator import BytesGenerator
def read_pkg_info_bytes(bytestr):
headers = bytestr.decode(encoding="ascii", errors="surrogateescape")
message = Parser().parsestr(headers)
return message
def read_pkg_info(path):
with open(path, "r",
encoding="ascii",
errors="surrogateescape") as headers:
message = Parser().parse(headers)
return message
def write_pkg_info(path, message):
with open(path, "wb") as out:
BytesGenerator(out, mangle_from_=False, maxheaderlen=0).flatten(message)
-46
View File
@@ -1,46 +0,0 @@
import base64
import io
import sys
if sys.version_info[0] < 3:
text_type = unicode # noqa: F821
StringIO = io.BytesIO
def native(s, encoding='utf-8'):
if isinstance(s, unicode): # noqa: F821
return s.encode(encoding)
return s
else:
text_type = str
StringIO = io.StringIO
def native(s, encoding='utf-8'):
if isinstance(s, bytes):
return s.decode(encoding)
return s
def urlsafe_b64encode(data):
"""urlsafe_b64encode without padding"""
return base64.urlsafe_b64encode(data).rstrip(b'=')
def urlsafe_b64decode(data):
"""urlsafe_b64decode without padding"""
pad = b'=' * (4 - (len(data) & 3))
return base64.urlsafe_b64decode(data + pad)
def as_unicode(s):
if isinstance(s, bytes):
return s.decode('utf-8')
return s
def as_bytes(s):
if isinstance(s, text_type):
return s.encode('utf-8')
return s
View File
-48
View File
@@ -1,48 +0,0 @@
"""For neatly implementing static typing in packaging.
`mypy` - the static type analysis tool we use - uses the `typing` module, which
provides core functionality fundamental to mypy's functioning.
Generally, `typing` would be imported at runtime and used in that fashion -
it acts as a no-op at runtime and does not have any run-time overhead by
design.
As it turns out, `typing` is not vendorable - it uses separate sources for
Python 2/Python 3. Thus, this codebase can not expect it to be present.
To work around this, mypy allows the typing import to be behind a False-y
optional to prevent it from running at runtime and type-comments can be used
to remove the need for the types to be accessible directly during runtime.
This module provides the False-y guard in a nicely named fashion so that a
curious maintainer can reach here to read this.
In packaging, all static-typing related imports should be guarded as follows:
from pipenv.patched.pip._vendor.packaging._typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import ...
Ref: https://github.com/python/mypy/issues/3216
"""
__all__ = ["TYPE_CHECKING", "cast"]
# The TYPE_CHECKING constant defined by the typing module is False at runtime
# but True while type checking.
if False: # pragma: no cover
from typing import TYPE_CHECKING
else:
TYPE_CHECKING = False
# typing's cast syntax requires calling typing.cast at runtime, but we don't
# want to import typing at runtime. Here, we inform the type checkers that
# we're importing `typing.cast` as `cast` and re-implement typing.cast's
# runtime behavior in a block that is ignored by type checkers.
if TYPE_CHECKING: # pragma: no cover
# not executed at runtime
from typing import cast
else:
# executed at runtime
def cast(type_, value): # noqa
return value
-866
View File
@@ -1,866 +0,0 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import absolute_import
import distutils.util
try:
from importlib.machinery import EXTENSION_SUFFIXES
except ImportError: # pragma: no cover
import imp
EXTENSION_SUFFIXES = [x[0] for x in imp.get_suffixes()]
del imp
import collections
import logging
import os
import platform
import re
import struct
import sys
import sysconfig
import warnings
from ._typing import TYPE_CHECKING, cast
if TYPE_CHECKING: # pragma: no cover
from typing import (
Dict,
FrozenSet,
IO,
Iterable,
Iterator,
List,
Optional,
Sequence,
Tuple,
Union,
)
PythonVersion = Sequence[int]
MacVersion = Tuple[int, int]
GlibcVersion = Tuple[int, int]
logger = logging.getLogger(__name__)
INTERPRETER_SHORT_NAMES = {
"python": "py", # Generic.
"cpython": "cp",
"pypy": "pp",
"ironpython": "ip",
"jython": "jy",
} # type: Dict[str, str]
_32_BIT_INTERPRETER = sys.maxsize <= 2 ** 32
_LEGACY_MANYLINUX_MAP = {
# CentOS 7 w/ glibc 2.17 (PEP 599)
(2, 17): "manylinux2014",
# CentOS 6 w/ glibc 2.12 (PEP 571)
(2, 12): "manylinux2010",
# CentOS 5 w/ glibc 2.5 (PEP 513)
(2, 5): "manylinux1",
}
# If glibc ever changes its major version, we need to know what the last
# minor version was, so we can build the complete list of all versions.
# For now, guess what the highest minor version might be, assume it will
# be 50 for testing. Once this actually happens, update the dictionary
# with the actual value.
_LAST_GLIBC_MINOR = collections.defaultdict(lambda: 50) # type: Dict[int, int]
glibcVersion = collections.namedtuple("Version", ["major", "minor"])
class Tag(object):
"""
A representation of the tag triple for a wheel.
Instances are considered immutable and thus are hashable. Equality checking
is also supported.
"""
__slots__ = ["_interpreter", "_abi", "_platform", "_hash"]
def __init__(self, interpreter, abi, platform):
# type: (str, str, str) -> None
self._interpreter = interpreter.lower()
self._abi = abi.lower()
self._platform = platform.lower()
# The __hash__ of every single element in a Set[Tag] will be evaluated each time
# that a set calls its `.disjoint()` method, which may be called hundreds of
# times when scanning a page of links for packages with tags matching that
# Set[Tag]. Pre-computing the value here produces significant speedups for
# downstream consumers.
self._hash = hash((self._interpreter, self._abi, self._platform))
@property
def interpreter(self):
# type: () -> str
return self._interpreter
@property
def abi(self):
# type: () -> str
return self._abi
@property
def platform(self):
# type: () -> str
return self._platform
def __eq__(self, other):
# type: (object) -> bool
if not isinstance(other, Tag):
return NotImplemented
return (
(self.platform == other.platform)
and (self.abi == other.abi)
and (self.interpreter == other.interpreter)
)
def __hash__(self):
# type: () -> int
return self._hash
def __str__(self):
# type: () -> str
return "{}-{}-{}".format(self._interpreter, self._abi, self._platform)
def __repr__(self):
# type: () -> str
return "<{self} @ {self_id}>".format(self=self, self_id=id(self))
def parse_tag(tag):
# type: (str) -> FrozenSet[Tag]
"""
Parses the provided tag (e.g. `py3-none-any`) into a frozenset of Tag instances.
Returning a set is required due to the possibility that the tag is a
compressed tag set.
"""
tags = set()
interpreters, abis, platforms = tag.split("-")
for interpreter in interpreters.split("."):
for abi in abis.split("."):
for platform_ in platforms.split("."):
tags.add(Tag(interpreter, abi, platform_))
return frozenset(tags)
def _warn_keyword_parameter(func_name, kwargs):
# type: (str, Dict[str, bool]) -> bool
"""
Backwards-compatibility with Python 2.7 to allow treating 'warn' as keyword-only.
"""
if not kwargs:
return False
elif len(kwargs) > 1 or "warn" not in kwargs:
kwargs.pop("warn", None)
arg = next(iter(kwargs.keys()))
raise TypeError(
"{}() got an unexpected keyword argument {!r}".format(func_name, arg)
)
return kwargs["warn"]
def _get_config_var(name, warn=False):
# type: (str, bool) -> Union[int, str, None]
value = sysconfig.get_config_var(name)
if value is None and warn:
logger.debug(
"Config variable '%s' is unset, Python ABI tag may be incorrect", name
)
return value
def _normalize_string(string):
# type: (str) -> str
return string.replace(".", "_").replace("-", "_")
def _abi3_applies(python_version):
# type: (PythonVersion) -> bool
"""
Determine if the Python version supports abi3.
PEP 384 was first implemented in Python 3.2.
"""
return len(python_version) > 1 and tuple(python_version) >= (3, 2)
def _cpython_abis(py_version, warn=False):
# type: (PythonVersion, bool) -> List[str]
py_version = tuple(py_version) # To allow for version comparison.
abis = []
version = _version_nodot(py_version[:2])
debug = pymalloc = ucs4 = ""
with_debug = _get_config_var("Py_DEBUG", warn)
has_refcount = hasattr(sys, "gettotalrefcount")
# Windows doesn't set Py_DEBUG, so checking for support of debug-compiled
# extension modules is the best option.
# https://github.com/pypa/pip/issues/3383#issuecomment-173267692
has_ext = "_d.pyd" in EXTENSION_SUFFIXES
if with_debug or (with_debug is None and (has_refcount or has_ext)):
debug = "d"
if py_version < (3, 8):
with_pymalloc = _get_config_var("WITH_PYMALLOC", warn)
if with_pymalloc or with_pymalloc is None:
pymalloc = "m"
if py_version < (3, 3):
unicode_size = _get_config_var("Py_UNICODE_SIZE", warn)
if unicode_size == 4 or (
unicode_size is None and sys.maxunicode == 0x10FFFF
):
ucs4 = "u"
elif debug:
# Debug builds can also load "normal" extension modules.
# We can also assume no UCS-4 or pymalloc requirement.
abis.append("cp{version}".format(version=version))
abis.insert(
0,
"cp{version}{debug}{pymalloc}{ucs4}".format(
version=version, debug=debug, pymalloc=pymalloc, ucs4=ucs4
),
)
return abis
def cpython_tags(
python_version=None, # type: Optional[PythonVersion]
abis=None, # type: Optional[Iterable[str]]
platforms=None, # type: Optional[Iterable[str]]
**kwargs # type: bool
):
# type: (...) -> Iterator[Tag]
"""
Yields the tags for a CPython interpreter.
The tags consist of:
- cp<python_version>-<abi>-<platform>
- cp<python_version>-abi3-<platform>
- cp<python_version>-none-<platform>
- cp<less than python_version>-abi3-<platform> # Older Python versions down to 3.2.
If python_version only specifies a major version then user-provided ABIs and
the 'none' ABItag will be used.
If 'abi3' or 'none' are specified in 'abis' then they will be yielded at
their normal position and not at the beginning.
"""
warn = _warn_keyword_parameter("cpython_tags", kwargs)
if not python_version:
python_version = sys.version_info[:2]
interpreter = "cp{}".format(_version_nodot(python_version[:2]))
if abis is None:
if len(python_version) > 1:
abis = _cpython_abis(python_version, warn)
else:
abis = []
abis = list(abis)
# 'abi3' and 'none' are explicitly handled later.
for explicit_abi in ("abi3", "none"):
try:
abis.remove(explicit_abi)
except ValueError:
pass
platforms = list(platforms or _platform_tags())
for abi in abis:
for platform_ in platforms:
yield Tag(interpreter, abi, platform_)
if _abi3_applies(python_version):
for tag in (Tag(interpreter, "abi3", platform_) for platform_ in platforms):
yield tag
for tag in (Tag(interpreter, "none", platform_) for platform_ in platforms):
yield tag
if _abi3_applies(python_version):
for minor_version in range(python_version[1] - 1, 1, -1):
for platform_ in platforms:
interpreter = "cp{version}".format(
version=_version_nodot((python_version[0], minor_version))
)
yield Tag(interpreter, "abi3", platform_)
def _generic_abi():
# type: () -> Iterator[str]
abi = sysconfig.get_config_var("SOABI")
if abi:
yield _normalize_string(abi)
def generic_tags(
interpreter=None, # type: Optional[str]
abis=None, # type: Optional[Iterable[str]]
platforms=None, # type: Optional[Iterable[str]]
**kwargs # type: bool
):
# type: (...) -> Iterator[Tag]
"""
Yields the tags for a generic interpreter.
The tags consist of:
- <interpreter>-<abi>-<platform>
The "none" ABI will be added if it was not explicitly provided.
"""
warn = _warn_keyword_parameter("generic_tags", kwargs)
if not interpreter:
interp_name = interpreter_name()
interp_version = interpreter_version(warn=warn)
interpreter = "".join([interp_name, interp_version])
if abis is None:
abis = _generic_abi()
platforms = list(platforms or _platform_tags())
abis = list(abis)
if "none" not in abis:
abis.append("none")
for abi in abis:
for platform_ in platforms:
yield Tag(interpreter, abi, platform_)
def _py_interpreter_range(py_version):
# type: (PythonVersion) -> Iterator[str]
"""
Yields Python versions in descending order.
After the latest version, the major-only version will be yielded, and then
all previous versions of that major version.
"""
if len(py_version) > 1:
yield "py{version}".format(version=_version_nodot(py_version[:2]))
yield "py{major}".format(major=py_version[0])
if len(py_version) > 1:
for minor in range(py_version[1] - 1, -1, -1):
yield "py{version}".format(version=_version_nodot((py_version[0], minor)))
def compatible_tags(
python_version=None, # type: Optional[PythonVersion]
interpreter=None, # type: Optional[str]
platforms=None, # type: Optional[Iterable[str]]
):
# type: (...) -> Iterator[Tag]
"""
Yields the sequence of tags that are compatible with a specific version of Python.
The tags consist of:
- py*-none-<platform>
- <interpreter>-none-any # ... if `interpreter` is provided.
- py*-none-any
"""
if not python_version:
python_version = sys.version_info[:2]
platforms = list(platforms or _platform_tags())
for version in _py_interpreter_range(python_version):
for platform_ in platforms:
yield Tag(version, "none", platform_)
if interpreter:
yield Tag(interpreter, "none", "any")
for version in _py_interpreter_range(python_version):
yield Tag(version, "none", "any")
def _mac_arch(arch, is_32bit=_32_BIT_INTERPRETER):
# type: (str, bool) -> str
if not is_32bit:
return arch
if arch.startswith("ppc"):
return "ppc"
return "i386"
def _mac_binary_formats(version, cpu_arch):
# type: (MacVersion, str) -> List[str]
formats = [cpu_arch]
if cpu_arch == "x86_64":
if version < (10, 4):
return []
formats.extend(["intel", "fat64", "fat32"])
elif cpu_arch == "i386":
if version < (10, 4):
return []
formats.extend(["intel", "fat32", "fat"])
elif cpu_arch == "ppc64":
# TODO: Need to care about 32-bit PPC for ppc64 through 10.2?
if version > (10, 5) or version < (10, 4):
return []
formats.append("fat64")
elif cpu_arch == "ppc":
if version > (10, 6):
return []
formats.extend(["fat32", "fat"])
if cpu_arch in {"arm64", "x86_64"}:
formats.append("universal2")
if cpu_arch in {"x86_64", "i386", "ppc64", "ppc", "intel"}:
formats.append("universal")
return formats
def mac_platforms(version=None, arch=None):
# type: (Optional[MacVersion], Optional[str]) -> Iterator[str]
"""
Yields the platform tags for a macOS system.
The `version` parameter is a two-item tuple specifying the macOS version to
generate platform tags for. The `arch` parameter is the CPU architecture to
generate platform tags for. Both parameters default to the appropriate value
for the current system.
"""
version_str, _, cpu_arch = platform.mac_ver() # type: ignore
if version is None:
version = cast("MacVersion", tuple(map(int, version_str.split(".")[:2])))
else:
version = version
if arch is None:
arch = _mac_arch(cpu_arch)
else:
arch = arch
if (10, 0) <= version and version < (11, 0):
# Prior to Mac OS 11, each yearly release of Mac OS bumped the
# "minor" version number. The major version was always 10.
for minor_version in range(version[1], -1, -1):
compat_version = 10, minor_version
binary_formats = _mac_binary_formats(compat_version, arch)
for binary_format in binary_formats:
yield "macosx_{major}_{minor}_{binary_format}".format(
major=10, minor=minor_version, binary_format=binary_format
)
if version >= (11, 0):
# Starting with Mac OS 11, each yearly release bumps the major version
# number. The minor versions are now the midyear updates.
for major_version in range(version[0], 10, -1):
compat_version = major_version, 0
binary_formats = _mac_binary_formats(compat_version, arch)
for binary_format in binary_formats:
yield "macosx_{major}_{minor}_{binary_format}".format(
major=major_version, minor=0, binary_format=binary_format
)
if version >= (11, 0):
# Mac OS 11 on x86_64 is compatible with binaries from previous releases.
# Arm64 support was introduced in 11.0, so no Arm binaries from previous
# releases exist.
#
# However, the "universal2" binary format can have a
# macOS version earlier than 11.0 when the x86_64 part of the binary supports
# that version of macOS.
if arch == "x86_64":
for minor_version in range(16, 3, -1):
compat_version = 10, minor_version
binary_formats = _mac_binary_formats(compat_version, arch)
for binary_format in binary_formats:
yield "macosx_{major}_{minor}_{binary_format}".format(
major=compat_version[0],
minor=compat_version[1],
binary_format=binary_format,
)
else:
for minor_version in range(16, 3, -1):
compat_version = 10, minor_version
binary_format = "universal2"
yield "macosx_{major}_{minor}_{binary_format}".format(
major=compat_version[0],
minor=compat_version[1],
binary_format=binary_format,
)
# From PEP 513, PEP 600
def _is_manylinux_compatible(name, arch, glibc_version):
# type: (str, str, GlibcVersion) -> bool
sys_glibc = _get_glibc_version()
if sys_glibc < glibc_version:
return False
# Check for presence of _manylinux module.
try:
import _manylinux # noqa
except ImportError:
pass
else:
if hasattr(_manylinux, "manylinux_compatible"):
result = _manylinux.manylinux_compatible(
glibc_version[0], glibc_version[1], arch
)
if result is not None:
return bool(result)
else:
if glibc_version == (2, 5):
if hasattr(_manylinux, "manylinux1_compatible"):
return bool(_manylinux.manylinux1_compatible)
if glibc_version == (2, 12):
if hasattr(_manylinux, "manylinux2010_compatible"):
return bool(_manylinux.manylinux2010_compatible)
if glibc_version == (2, 17):
if hasattr(_manylinux, "manylinux2014_compatible"):
return bool(_manylinux.manylinux2014_compatible)
return True
def _glibc_version_string():
# type: () -> Optional[str]
# Returns glibc version string, or None if not using glibc.
return _glibc_version_string_confstr() or _glibc_version_string_ctypes()
def _glibc_version_string_confstr():
# type: () -> Optional[str]
"""
Primary implementation of glibc_version_string using os.confstr.
"""
# os.confstr is quite a bit faster than ctypes.DLL. It's also less likely
# to be broken or missing. This strategy is used in the standard library
# platform module.
# https://github.com/python/cpython/blob/fcf1d003bf4f0100c9d0921ff3d70e1127ca1b71/Lib/platform.py#L175-L183
try:
# os.confstr("CS_GNU_LIBC_VERSION") returns a string like "glibc 2.17".
version_string = os.confstr( # type: ignore[attr-defined] # noqa: F821
"CS_GNU_LIBC_VERSION"
)
assert version_string is not None
_, version = version_string.split() # type: Tuple[str, str]
except (AssertionError, AttributeError, OSError, ValueError):
# os.confstr() or CS_GNU_LIBC_VERSION not available (or a bad value)...
return None
return version
def _glibc_version_string_ctypes():
# type: () -> Optional[str]
"""
Fallback implementation of glibc_version_string using ctypes.
"""
try:
import ctypes
except ImportError:
return None
# ctypes.CDLL(None) internally calls dlopen(NULL), and as the dlopen
# manpage says, "If filename is NULL, then the returned handle is for the
# main program". This way we can let the linker do the work to figure out
# which libc our process is actually using.
#
# We must also handle the special case where the executable is not a
# dynamically linked executable. This can occur when using musl libc,
# for example. In this situation, dlopen() will error, leading to an
# OSError. Interestingly, at least in the case of musl, there is no
# errno set on the OSError. The single string argument used to construct
# OSError comes from libc itself and is therefore not portable to
# hard code here. In any case, failure to call dlopen() means we
# can proceed, so we bail on our attempt.
try:
# Note: typeshed is wrong here so we are ignoring this line.
process_namespace = ctypes.CDLL(None) # type: ignore
except OSError:
return None
try:
gnu_get_libc_version = process_namespace.gnu_get_libc_version
except AttributeError:
# Symbol doesn't exist -> therefore, we are not linked to
# glibc.
return None
# Call gnu_get_libc_version, which returns a string like "2.5"
gnu_get_libc_version.restype = ctypes.c_char_p
version_str = gnu_get_libc_version() # type: str
# py2 / py3 compatibility:
if not isinstance(version_str, str):
version_str = version_str.decode("ascii")
return version_str
def _parse_glibc_version(version_str):
# type: (str) -> Tuple[int, int]
# Parse glibc version.
#
# We use a regexp instead of str.split because we want to discard any
# random junk that might come after the minor version -- this might happen
# in patched/forked versions of glibc (e.g. Linaro's version of glibc
# uses version strings like "2.20-2014.11"). See gh-3588.
m = re.match(r"(?P<major>[0-9]+)\.(?P<minor>[0-9]+)", version_str)
if not m:
warnings.warn(
"Expected glibc version with 2 components major.minor,"
" got: %s" % version_str,
RuntimeWarning,
)
return -1, -1
return (int(m.group("major")), int(m.group("minor")))
_glibc_version = [] # type: List[Tuple[int, int]]
def _get_glibc_version():
# type: () -> Tuple[int, int]
if _glibc_version:
return _glibc_version[0]
version_str = _glibc_version_string()
if version_str is None:
_glibc_version.append((-1, -1))
else:
_glibc_version.append(_parse_glibc_version(version_str))
return _glibc_version[0]
# Python does not provide platform information at sufficient granularity to
# identify the architecture of the running executable in some cases, so we
# determine it dynamically by reading the information from the running
# process. This only applies on Linux, which uses the ELF format.
class _ELFFileHeader(object):
# https://en.wikipedia.org/wiki/Executable_and_Linkable_Format#File_header
class _InvalidELFFileHeader(ValueError):
"""
An invalid ELF file header was found.
"""
ELF_MAGIC_NUMBER = 0x7F454C46
ELFCLASS32 = 1
ELFCLASS64 = 2
ELFDATA2LSB = 1
ELFDATA2MSB = 2
EM_386 = 3
EM_S390 = 22
EM_ARM = 40
EM_X86_64 = 62
EF_ARM_ABIMASK = 0xFF000000
EF_ARM_ABI_VER5 = 0x05000000
EF_ARM_ABI_FLOAT_HARD = 0x00000400
def __init__(self, file):
# type: (IO[bytes]) -> None
def unpack(fmt):
# type: (str) -> int
try:
(result,) = struct.unpack(
fmt, file.read(struct.calcsize(fmt))
) # type: (int, )
except struct.error:
raise _ELFFileHeader._InvalidELFFileHeader()
return result
self.e_ident_magic = unpack(">I")
if self.e_ident_magic != self.ELF_MAGIC_NUMBER:
raise _ELFFileHeader._InvalidELFFileHeader()
self.e_ident_class = unpack("B")
if self.e_ident_class not in {self.ELFCLASS32, self.ELFCLASS64}:
raise _ELFFileHeader._InvalidELFFileHeader()
self.e_ident_data = unpack("B")
if self.e_ident_data not in {self.ELFDATA2LSB, self.ELFDATA2MSB}:
raise _ELFFileHeader._InvalidELFFileHeader()
self.e_ident_version = unpack("B")
self.e_ident_osabi = unpack("B")
self.e_ident_abiversion = unpack("B")
self.e_ident_pad = file.read(7)
format_h = "<H" if self.e_ident_data == self.ELFDATA2LSB else ">H"
format_i = "<I" if self.e_ident_data == self.ELFDATA2LSB else ">I"
format_q = "<Q" if self.e_ident_data == self.ELFDATA2LSB else ">Q"
format_p = format_i if self.e_ident_class == self.ELFCLASS32 else format_q
self.e_type = unpack(format_h)
self.e_machine = unpack(format_h)
self.e_version = unpack(format_i)
self.e_entry = unpack(format_p)
self.e_phoff = unpack(format_p)
self.e_shoff = unpack(format_p)
self.e_flags = unpack(format_i)
self.e_ehsize = unpack(format_h)
self.e_phentsize = unpack(format_h)
self.e_phnum = unpack(format_h)
self.e_shentsize = unpack(format_h)
self.e_shnum = unpack(format_h)
self.e_shstrndx = unpack(format_h)
def _get_elf_header():
# type: () -> Optional[_ELFFileHeader]
try:
with open(sys.executable, "rb") as f:
elf_header = _ELFFileHeader(f)
except (IOError, OSError, TypeError, _ELFFileHeader._InvalidELFFileHeader):
return None
return elf_header
def _is_linux_armhf():
# type: () -> bool
# hard-float ABI can be detected from the ELF header of the running
# process
# https://static.docs.arm.com/ihi0044/g/aaelf32.pdf
elf_header = _get_elf_header()
if elf_header is None:
return False
result = elf_header.e_ident_class == elf_header.ELFCLASS32
result &= elf_header.e_ident_data == elf_header.ELFDATA2LSB
result &= elf_header.e_machine == elf_header.EM_ARM
result &= (
elf_header.e_flags & elf_header.EF_ARM_ABIMASK
) == elf_header.EF_ARM_ABI_VER5
result &= (
elf_header.e_flags & elf_header.EF_ARM_ABI_FLOAT_HARD
) == elf_header.EF_ARM_ABI_FLOAT_HARD
return result
def _is_linux_i686():
# type: () -> bool
elf_header = _get_elf_header()
if elf_header is None:
return False
result = elf_header.e_ident_class == elf_header.ELFCLASS32
result &= elf_header.e_ident_data == elf_header.ELFDATA2LSB
result &= elf_header.e_machine == elf_header.EM_386
return result
def _have_compatible_manylinux_abi(arch):
# type: (str) -> bool
if arch == "armv7l":
return _is_linux_armhf()
if arch == "i686":
return _is_linux_i686()
return arch in {"x86_64", "aarch64", "ppc64", "ppc64le", "s390x"}
def _manylinux_tags(linux, arch):
# type: (str, str) -> Iterator[str]
# Oldest glibc to be supported regardless of architecture is (2, 17).
too_old_glibc2 = glibcVersion(2, 16)
if arch in {"x86_64", "i686"}:
# On x86/i686 also oldest glibc to be supported is (2, 5).
too_old_glibc2 = glibcVersion(2, 4)
current_glibc = glibcVersion(*_get_glibc_version())
glibc_max_list = [current_glibc]
# We can assume compatibility across glibc major versions.
# https://sourceware.org/bugzilla/show_bug.cgi?id=24636
#
# Build a list of maximum glibc versions so that we can
# output the canonical list of all glibc from current_glibc
# down to too_old_glibc2, including all intermediary versions.
for glibc_major in range(current_glibc.major - 1, 1, -1):
glibc_max_list.append(glibcVersion(glibc_major, _LAST_GLIBC_MINOR[glibc_major]))
for glibc_max in glibc_max_list:
if glibc_max.major == too_old_glibc2.major:
min_minor = too_old_glibc2.minor
else:
# For other glibc major versions oldest supported is (x, 0).
min_minor = -1
for glibc_minor in range(glibc_max.minor, min_minor, -1):
glibc_version = (glibc_max.major, glibc_minor)
tag = "manylinux_{}_{}".format(*glibc_version)
if _is_manylinux_compatible(tag, arch, glibc_version):
yield linux.replace("linux", tag)
# Handle the legacy manylinux1, manylinux2010, manylinux2014 tags.
if glibc_version in _LEGACY_MANYLINUX_MAP:
legacy_tag = _LEGACY_MANYLINUX_MAP[glibc_version]
if _is_manylinux_compatible(legacy_tag, arch, glibc_version):
yield linux.replace("linux", legacy_tag)
def _linux_platforms(is_32bit=_32_BIT_INTERPRETER):
# type: (bool) -> Iterator[str]
linux = _normalize_string(distutils.util.get_platform())
if is_32bit:
if linux == "linux_x86_64":
linux = "linux_i686"
elif linux == "linux_aarch64":
linux = "linux_armv7l"
_, arch = linux.split("_", 1)
if _have_compatible_manylinux_abi(arch):
for tag in _manylinux_tags(linux, arch):
yield tag
yield linux
def _generic_platforms():
# type: () -> Iterator[str]
yield _normalize_string(distutils.util.get_platform())
def _platform_tags():
# type: () -> Iterator[str]
"""
Provides the platform tags for this installation.
"""
if platform.system() == "Darwin":
return mac_platforms()
elif platform.system() == "Linux":
return _linux_platforms()
else:
return _generic_platforms()
def interpreter_name():
# type: () -> str
"""
Returns the name of the running interpreter.
"""
try:
name = sys.implementation.name # type: ignore
except AttributeError: # pragma: no cover
# Python 2.7 compatibility.
name = platform.python_implementation().lower()
return INTERPRETER_SHORT_NAMES.get(name) or name
def interpreter_version(**kwargs):
# type: (bool) -> str
"""
Returns the version of the running interpreter.
"""
warn = _warn_keyword_parameter("interpreter_version", kwargs)
version = _get_config_var("py_version_nodot", warn=warn)
if version:
version = str(version)
else:
version = _version_nodot(sys.version_info[:2])
return version
def _version_nodot(version):
# type: (PythonVersion) -> str
return "".join(map(str, version))
def sys_tags(**kwargs):
# type: (bool) -> Iterator[Tag]
"""
Returns the sequence of tag triples for the running interpreter.
The order of the sequence corresponds to priority order for the
interpreter, from most to least important.
"""
warn = _warn_keyword_parameter("sys_tags", kwargs)
interp_name = interpreter_name()
if interp_name == "cp":
for tag in cpython_tags(warn=warn):
yield tag
else:
for tag in generic_tags():
yield tag
for tag in compatible_tags():
yield tag
-181
View File
@@ -1,181 +0,0 @@
from __future__ import print_function
import csv
import hashlib
import os.path
import re
import stat
import sys
import time
from collections import OrderedDict
from distutils import log as logger
from zipfile import ZIP_DEFLATED, ZipInfo, ZipFile
from pipenv.vendor.wheel.cli import WheelError
from pipenv.vendor.wheel.util import urlsafe_b64decode, as_unicode, native, urlsafe_b64encode, as_bytes, StringIO
if sys.version_info >= (3,):
from io import TextIOWrapper
def read_csv(fp):
return csv.reader(TextIOWrapper(fp, newline='', encoding='utf-8'))
else:
def read_csv(fp):
for line in csv.reader(fp):
yield [column.decode('utf-8') for column in line]
# Non-greedy matching of an optional build number may be too clever (more
# invalid wheel filenames will match). Separate regex for .dist-info?
WHEEL_INFO_RE = re.compile(
r"""^(?P<namever>(?P<name>.+?)-(?P<ver>.+?))(-(?P<build>\d[^-]*))?
-(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)\.whl$""",
re.VERBOSE)
def get_zipinfo_datetime(timestamp=None):
# Some applications need reproducible .whl files, but they can't do this without forcing
# the timestamp of the individual ZipInfo objects. See issue #143.
timestamp = int(os.environ.get('SOURCE_DATE_EPOCH', timestamp or time.time()))
return time.gmtime(timestamp)[0:6]
class WheelFile(ZipFile):
"""A ZipFile derivative class that also reads SHA-256 hashes from
.dist-info/RECORD and checks any read files against those.
"""
_default_algorithm = hashlib.sha256
def __init__(self, file, mode='r', compression=ZIP_DEFLATED):
basename = os.path.basename(file)
self.parsed_filename = WHEEL_INFO_RE.match(basename)
if not basename.endswith('.whl') or self.parsed_filename is None:
raise WheelError("Bad wheel filename {!r}".format(basename))
ZipFile.__init__(self, file, mode, compression=compression, allowZip64=True)
self.dist_info_path = '{}.dist-info'.format(self.parsed_filename.group('namever'))
self.record_path = self.dist_info_path + '/RECORD'
self._file_hashes = OrderedDict()
self._file_sizes = {}
if mode == 'r':
# Ignore RECORD and any embedded wheel signatures
self._file_hashes[self.record_path] = None, None
self._file_hashes[self.record_path + '.jws'] = None, None
self._file_hashes[self.record_path + '.p7s'] = None, None
# Fill in the expected hashes by reading them from RECORD
try:
record = self.open(self.record_path)
except KeyError:
raise WheelError('Missing {} file'.format(self.record_path))
with record:
for line in read_csv(record):
path, hash_sum, size = line
if not hash_sum:
continue
algorithm, hash_sum = hash_sum.split(u'=')
try:
hashlib.new(algorithm)
except ValueError:
raise WheelError('Unsupported hash algorithm: {}'.format(algorithm))
if algorithm.lower() in {'md5', 'sha1'}:
raise WheelError(
'Weak hash algorithm ({}) is not permitted by PEP 427'
.format(algorithm))
self._file_hashes[path] = (
algorithm, urlsafe_b64decode(hash_sum.encode('ascii')))
def open(self, name_or_info, mode="r", pwd=None):
def _update_crc(newdata, eof=None):
if eof is None:
eof = ef._eof
update_crc_orig(newdata)
else: # Python 2
update_crc_orig(newdata, eof)
running_hash.update(newdata)
if eof and running_hash.digest() != expected_hash:
raise WheelError("Hash mismatch for file '{}'".format(native(ef_name)))
ef_name = as_unicode(name_or_info.filename if isinstance(name_or_info, ZipInfo)
else name_or_info)
if mode == 'r' and not ef_name.endswith('/') and ef_name not in self._file_hashes:
raise WheelError("No hash found for file '{}'".format(native(ef_name)))
ef = ZipFile.open(self, name_or_info, mode, pwd)
if mode == 'r' and not ef_name.endswith('/'):
algorithm, expected_hash = self._file_hashes[ef_name]
if expected_hash is not None:
# Monkey patch the _update_crc method to also check for the hash from RECORD
running_hash = hashlib.new(algorithm)
update_crc_orig, ef._update_crc = ef._update_crc, _update_crc
return ef
def write_files(self, base_dir):
logger.info("creating '%s' and adding '%s' to it", self.filename, base_dir)
deferred = []
for root, dirnames, filenames in os.walk(base_dir):
# Sort the directory names so that `os.walk` will walk them in a
# defined order on the next iteration.
dirnames.sort()
for name in sorted(filenames):
path = os.path.normpath(os.path.join(root, name))
if os.path.isfile(path):
arcname = os.path.relpath(path, base_dir).replace(os.path.sep, '/')
if arcname == self.record_path:
pass
elif root.endswith('.dist-info'):
deferred.append((path, arcname))
else:
self.write(path, arcname)
deferred.sort()
for path, arcname in deferred:
self.write(path, arcname)
def write(self, filename, arcname=None, compress_type=None):
with open(filename, 'rb') as f:
st = os.fstat(f.fileno())
data = f.read()
zinfo = ZipInfo(arcname or filename, date_time=get_zipinfo_datetime(st.st_mtime))
zinfo.external_attr = (stat.S_IMODE(st.st_mode) | stat.S_IFMT(st.st_mode)) << 16
zinfo.compress_type = compress_type or self.compression
self.writestr(zinfo, data, compress_type)
def writestr(self, zinfo_or_arcname, bytes, compress_type=None):
ZipFile.writestr(self, zinfo_or_arcname, bytes, compress_type)
fname = (zinfo_or_arcname.filename if isinstance(zinfo_or_arcname, ZipInfo)
else zinfo_or_arcname)
logger.info("adding '%s'", fname)
if fname != self.record_path:
hash_ = self._default_algorithm(bytes)
self._file_hashes[fname] = hash_.name, native(urlsafe_b64encode(hash_.digest()))
self._file_sizes[fname] = len(bytes)
def close(self):
# Write RECORD
if self.fp is not None and self.mode == 'w' and self._file_hashes:
data = StringIO()
writer = csv.writer(data, delimiter=',', quotechar='"', lineterminator='\n')
writer.writerows((
(
fname,
algorithm + "=" + hash_,
self._file_sizes[fname]
)
for fname, (algorithm, hash_) in self._file_hashes.items()
))
writer.writerow((format(self.record_path), "", ""))
zinfo = ZipInfo(native(self.record_path), date_time=get_zipinfo_datetime())
zinfo.compress_type = self.compression
zinfo.external_attr = 0o664 << 16
self.writestr(zinfo, as_bytes(data.getvalue()))
ZipFile.close(self)
+3
View File
@@ -315,6 +315,9 @@ def post_install_cleanup(ctx, vendor_dir):
drop_dir(vendor_dir / "bin")
drop_dir(vendor_dir / "tests")
drop_dir(vendor_dir / "shutil_backports")
drop_dir(vendor_dir / "cerberus" / "tests")
drop_dir(vendor_dir / "cerberus" / "benchmarks")
remove_all(vendor_dir.glob("toml.py"))