Update requirementslib

Signed-off-by: Dan Ryan <dan@danryan.co>
This commit is contained in:
Dan Ryan
2019-03-04 01:07:18 -05:00
parent de4d659eac
commit 374086e318
4 changed files with 242 additions and 169 deletions
+1 -1
View File
@@ -1,6 +1,6 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function
__version__ = '1.4.1'
__version__ = '1.4.2'
import logging
import warnings
+90 -83
View File
@@ -41,6 +41,7 @@ from vistir.path import (
)
from .setup_info import SetupInfo, _prepare_wheel_building_kwargs
from .url import URI
from .utils import (
DIRECT_URL_RE,
HASH_STRING,
@@ -87,6 +88,7 @@ if MYPY_RUNNING:
Union,
Any,
Tuple,
Sequence,
Set,
AnyStr,
Text,
@@ -114,6 +116,9 @@ if MYPY_RUNNING:
BASE_TYPES = Union[bool, STRING_TYPE, Tuple[STRING_TYPE, ...]]
CUSTOM_TYPES = Union[VCSRepository, RequirementType, SetupInfo, "Line"]
CREATION_ARG_TYPES = Union[BASE_TYPES, Link, CUSTOM_TYPES]
PIPFILE_ENTRY_TYPE = Union[STRING_TYPE, bool, Tuple[STRING_TYPE], List[STRING_TYPE]]
PIPFILE_TYPE = Union[STRING_TYPE, Dict[STRING_TYPE, PIPFILE_ENTRY_TYPE]]
TPIPFILE = Dict[STRING_TYPE, PIPFILE_ENTRY_TYPE]
SPECIFIERS_BY_LENGTH = sorted(list(Specifier._operators.keys()), key=len, reverse=True)
@@ -249,10 +254,21 @@ class Line(object):
line = os.path.abspath(self.base_path)
else:
if DIRECT_URL_RE.match(self.line):
uri = URI.parse(self.line)
line = uri.full_url
self._requirement = init_requirement(self.line)
line = convert_direct_url_to_url(self.line)
else:
line = self.link.url
if self.link:
line = self.link.url
else:
try:
uri = URI.parse(line)
except ValueError:
line = line
else:
line = uri.base_url
self._link = uri.as_link
if self.editable:
if not line:
@@ -433,30 +449,35 @@ class Line(object):
def pyproject_requires(self):
# type: () -> Optional[Tuple[STRING_TYPE, ...]]
if self._pyproject_requires is None and self.pyproject_toml is not None:
pyproject_requires, pyproject_backend = get_pyproject(
self.path
) # type: ignore
if pyproject_requires:
self._pyproject_requires = tuple(pyproject_requires)
self._pyproject_backend = pyproject_backend
if self.path is not None:
pyproject_requires, pyproject_backend = None, None
pyproject_results = get_pyproject(self.path) # type: ignore
if pyproject_results:
pyproject_requires, pyproject_backend = pyproject_results
if pyproject_requires:
self._pyproject_requires = tuple(pyproject_requires)
self._pyproject_backend = pyproject_backend
return self._pyproject_requires
@property
def pyproject_backend(self):
# type: () -> Optional[STRING_TYPE]
if self._pyproject_requires is None and self.pyproject_toml is not None:
pyproject_requires, pyproject_backend = get_pyproject(
self.path
) # type: ignore
pyproject_requires = None # type: Optional[Sequence[STRING_TYPE]]
pyproject_backend = None # type: Optional[STRING_TYPE]
pyproject_results = get_pyproject(self.path) # type: ignore
if pyproject_results:
pyproject_requires, pyproject_backend = pyproject_results
if not pyproject_backend and self.setup_cfg is not None:
setup_dict = SetupInfo.get_setup_cfg(self.setup_cfg)
pyproject_backend = get_default_pyproject_backend()
pyproject_requires = setup_dict.get(
"build_requires", ["setuptools", "wheel"]
) # type: ignore
self._pyproject_requires = tuple(pyproject_requires)
self._pyproject_backend = pyproject_backend
if pyproject_requires:
self._pyproject_requires = tuple(pyproject_requires)
if pyproject_backend:
self._pyproject_backend = pyproject_backend
return self._pyproject_backend
def parse_hashes(self):
@@ -480,45 +501,16 @@ class Line(object):
"""
extras = None
url = "" # type: STRING_TYPE
if "@" in self.line or self.is_vcs or self.is_url:
line = "{0}".format(self.line)
match = DIRECT_URL_RE.match(line)
if match is None:
match = URL_RE.match(line)
else:
self.is_direct_url = True
if match is not None:
match_dict = match.groupdict()
name = match_dict.get("name")
extras = match_dict.get("extras")
scheme = match_dict.get("scheme")
host = match_dict.get("host")
path = match_dict.get("path")
ref = match_dict.get("ref")
subdir = match_dict.get("subdirectory")
pathsep = match_dict.get("pathsep", "/")
if scheme is not None:
url = scheme
if host:
url = "{0}{1}".format(url, host)
if path:
url = "{0}{1}{2}".format(url, pathsep, path)
if self.is_vcs and ref:
url = "{0}@{1}".format(url, ref)
if name:
url = "{0}#egg={1}".format(url, name)
if extras:
url = "{0}{1}".format(url, extras)
elif is_file_url(url) and extras and not name and self.editable:
url = "{0}{1}{2}".format(pathsep, path, extras)
if subdir:
url = "{0}&subdirectory={1}".format(url, subdir)
elif extras and not path:
url = "{0}{1}".format(url, extras)
self.line = add_ssh_scheme_to_git_uri(url)
if name:
self._name = name
uri = URI.parse(line)
name = uri.name
if name:
self._name = name
if uri.host and uri.path and uri.scheme:
self.line = uri.to_string(
escape_password=False, direct=False, strip_ssh=uri.is_implicit_ssh
)
else:
self.line, extras = pip_shims.shims._strip_extras(self.line)
else:
@@ -539,6 +531,14 @@ class Line(object):
"""Sets ``self.name`` if given a **PEP-508** style URL"""
line = self.line
try:
parsed = URI.parse(line)
line = parsed.to_string(escape_password=False, direct=False, strip_ref=True)
except ValueError:
pass
else:
self._parsed_url = parsed
return line
if self.vcs is not None and self.line.startswith("{0}+".format(self.vcs)):
_, _, _parseable = self.line.partition("+")
parsed = urllib_parse.urlparse(add_ssh_scheme_to_git_uri(_parseable))
@@ -985,7 +985,31 @@ class Line(object):
def parse_link(self):
# type: () -> None
if self.is_file or self.is_url or self.is_vcs:
parsed_url = None # type: Optional[URI]
if not is_valid_url(self.line) and (
self.line.startswith("./")
or (os.path.exists(self.line) or os.path.isabs(self.line))
):
url = pip_shims.shims.path_to_url(os.path.abspath(self.line))
parsed_url = URI.parse(url)
elif is_valid_url(self.line) or is_vcs(self.line) or is_file_url(self.line):
parsed_url = URI.parse(self.line)
if parsed_url is not None:
line = parsed_url.to_string(
escape_password=False, direct=False, strip_ref=True, strip_ssh=False
)
if parsed_url.is_vcs:
self.vcs, _ = parsed_url.scheme.split("+")
if parsed_url.is_file_url:
self.is_local = True
parsed_link = parsed_url.as_link
self._ref = parsed_url.ref
self.uri = parsed_url.bare_url
if parsed_url.name:
self._name = parsed_url.name
if parsed_url.extras:
self.extras = tuple(sorted(set(parsed_url.extras)))
self._link = parsed_link
vcs, prefer, relpath, path, uri, link = FileRequirement.get_link_from_line(
self.line
)
@@ -999,32 +1023,14 @@ class Line(object):
link_url = link.url_without_fragment
if "@" in link_url:
link_url, _ = split_ref_from_uri(link_url)
self._ref = ref
self.vcs = vcs
self.preferred_scheme = prefer
self.relpath = relpath
self.path = path
self.uri = uri
# self.uri = uri
if prefer in ("path", "relpath") or uri.startswith("file"):
self.is_local = True
if link.egg_fragment:
name, extras = pip_shims.shims._strip_extras(link.egg_fragment)
self.extras = tuple(sorted(set(parse_extras(extras))))
self._name = name
else:
# set this so we can call `self.name` without a recursion error
self._link = link
if (self.is_direct_url or vcs) and self.name is not None and vcs is not None:
self._link = create_link(
build_vcs_uri(
vcs=vcs,
uri=link_url,
ref=ref,
extras=self.extras,
name=self.name,
subdirectory=link.subdirectory_fragment,
)
)
if parsed_url.is_vcs or parsed_url.is_direct_url and parsed_link:
self._link = parsed_link
else:
self._link = link
@@ -1041,7 +1047,7 @@ class Line(object):
Generates a 3-tuple of the requisite *name*, *extras* and *url* to generate a
:class:`~packaging.requirements.Requirement` out of.
:return: A Tuple containing an optional name, a Tuple of extras names, and an optional URL.
:return: A Tuple of an optional name, a Tuple of extras, and an optional URL.
:rtype: Tuple[Optional[S], Tuple[Optional[S], ...], Optional[S]]
"""
@@ -1083,8 +1089,8 @@ class Line(object):
when there is a folder called *alembic* in the working directory.
In this case we first need to check that the given requirement is a valid
URL, VCS requirement, or installable filesystem path before deciding to treat it as
a file requirement over a named requirement.
URL, VCS requirement, or installable filesystem path before deciding to treat it
as a file requirement over a named requirement.
"""
line = self.line
if is_file_url(line):
@@ -1178,11 +1184,9 @@ class NamedRequirement(object):
return cls(**creation_kwargs)
@classmethod
def from_pipfile(cls, name, pipfile):
# type: (S, Dict[S, Union[S, bool, Union[List[S], Tuple[S, ...], Set[S]]]]) -> NamedRequirement
creation_args = (
{}
) # type: Dict[STRING_TYPE, Union[Optional[STRING_TYPE], Optional[List[STRING_TYPE]]]]
def from_pipfile(cls, name, pipfile): # type: S # type: TPIPFILE
# type: (...) -> NamedRequirement
creation_args = {} # type: TPIPFILE
if hasattr(pipfile, "keys"):
attr_fields = [field.name for field in attr.fields(cls)]
creation_args = {
@@ -1191,10 +1195,13 @@ class NamedRequirement(object):
creation_args["name"] = name
version = get_version(pipfile) # type: Optional[STRING_TYPE]
extras = creation_args.get("extras", None)
creation_args["version"] = version
creation_args["version"] = version # type: ignore
req = init_requirement("{0}{1}".format(name, version))
if extras:
req.extras += tuple(extras)
if req and extras and req.extras and isinstance(req.extras, tuple):
if isinstance(extras, six.string_types):
req.extras = (extras) + tuple(["{0}".format(xtra) for xtra in req.extras])
elif isinstance(extras, (tuple, list)):
req.extras += tuple(extras)
creation_args["req"] = req
return cls(**creation_args) # type: ignore
+80 -48
View File
@@ -6,14 +6,12 @@ import os
import re
import string
import sys
from collections import defaultdict
from itertools import chain, groupby
from operator import attrgetter
import six
import tomlkit
from attr import validators
from first import first
from packaging.markers import InvalidMarker, Marker, Op, Value, Variable
@@ -25,22 +23,37 @@ from vistir.compat import lru_cache
from vistir.misc import dedup
from vistir.path import is_valid_url
from ..utils import SCHEME_LIST, VCS_LIST, is_star, add_ssh_scheme_to_git_uri
from ..environment import MYPY_RUNNING
from ..utils import SCHEME_LIST, VCS_LIST, add_ssh_scheme_to_git_uri, is_star
if MYPY_RUNNING:
from typing import Union, Optional, List, Set, Any, TypeVar, Tuple, Sequence, Dict, Text, AnyStr, Match, Iterable
from typing import (
Union,
Optional,
List,
Set,
Any,
TypeVar,
Tuple,
Sequence,
Dict,
Text,
AnyStr,
Match,
Iterable,
)
from attr import _ValidatorType
from packaging.requirements import Requirement as PackagingRequirement
from pkg_resources import Requirement as PkgResourcesRequirement
from pkg_resources.extern.packaging.markers import (
Op as PkgResourcesOp, Variable as PkgResourcesVariable,
Value as PkgResourcesValue, Marker as PkgResourcesMarker
Op as PkgResourcesOp,
Variable as PkgResourcesVariable,
Value as PkgResourcesValue,
Marker as PkgResourcesMarker,
)
from pip_shims.shims import Link
from vistir.compat import Path
_T = TypeVar("_T")
TMarker = Union[Marker, PkgResourcesMarker]
TVariable = TypeVar("TVariable", PkgResourcesVariable, Variable)
@@ -65,7 +78,13 @@ NAME_RE = re.compile(NAME_WITH_EXTRAS)
SUBDIR_RE = r"(?:[&#]subdirectory=(?P<subdirectory>.*))"
URL_NAME = r"(?:#egg={0})".format(NAME_WITH_EXTRAS)
REF_RE = r"(?:@(?P<ref>{0}+)?)".format(REF)
URL = r"(?P<scheme>[^ ]+://)(?:(?P<host>[^ ]+?\.?{0}+(?P<port>:\d+)?))?(?P<pathsep>[:/])(?P<path>[^ @]+){1}?".format(ALPHA_NUMERIC, REF_RE)
PATH_RE = r"(?P<pathsep>[:/])(?P<path>[^ @]+){0}?".format(REF_RE)
PASS_RE = r"(?:(?<=:)(?P<password>[^ ]+))"
AUTH_RE = r"(?:(?P<username>[^ ]+)[:@]{0}?@)".format(PASS_RE)
HOST_RE = r"(?:{0}?(?P<host>[^ ]+?\.?{1}+(?P<port>:\d+)?))?".format(
AUTH_RE, ALPHA_NUMERIC
)
URL = r"(?P<scheme>[^ ]+://){0}{1}".format(HOST_RE, PATH_RE)
URL_RE = re.compile(r"{0}(?:{1}?{2}?)?".format(URL, URL_NAME, SUBDIR_RE))
DIRECT_URL_RE = re.compile(r"{0}\s?@\s?{1}".format(NAME_WITH_EXTRAS, URL))
@@ -88,6 +107,7 @@ def create_link(link):
if not isinstance(link, six.string_types):
raise TypeError("must provide a string to instantiate a new link")
from pip_shims.shims import Link
return Link(link)
@@ -111,6 +131,7 @@ def init_requirement(name):
if not isinstance(name, six.string_types):
raise TypeError("must supply a name to generate a requirement")
from pkg_resources import Requirement
req = Requirement.parse(name)
req.vcs = None
req.local_file = None
@@ -139,6 +160,7 @@ def parse_extras(extras_str):
"""
from pkg_resources import Requirement
extras = Requirement.parse("fakepkg{0}".format(extras_to_string(extras_str))).extras
return sorted(dedup([extra.lower() for extra in extras]))
@@ -166,7 +188,7 @@ def build_vcs_uri(
name=None, # type: Optional[S]
ref=None, # type: Optional[S]
subdirectory=None, # type: Optional[S]
extras=None # type: Optional[Iterable[S]]
extras=None, # type: Optional[Iterable[S]]
):
# type: (...) -> STRING_TYPE
if extras is None:
@@ -203,11 +225,17 @@ def convert_direct_url_to_url(direct_url):
url_match = URL_RE.match(direct_url)
if url_match or is_valid_url(direct_url):
return direct_url
match_dict = {} # type: Dict[STRING_TYPE, Union[Tuple[STRING_TYPE, ...], STRING_TYPE]]
match_dict = (
{}
) # type: Dict[STRING_TYPE, Union[Tuple[STRING_TYPE, ...], STRING_TYPE]]
if direct_match is not None:
match_dict = direct_match.groupdict() # type: ignore
if not match_dict:
raise ValueError("Failed converting value to normal URL, is it a direct URL? {0!r}".format(direct_url))
raise ValueError(
"Failed converting value to normal URL, is it a direct URL? {0!r}".format(
direct_url
)
)
url_segments = [match_dict.get(s) for s in ("scheme", "host", "path", "pathsep")]
url = "" # type: STRING_TYPE
url = "".join([s for s in url_segments if s is not None]) # type: ignore
@@ -217,7 +245,7 @@ def convert_direct_url_to_url(direct_url):
ref=match_dict.get("ref"),
name=match_dict.get("name"),
extras=match_dict.get("extras"),
subdirectory=match_dict.get("subdirectory")
subdirectory=match_dict.get("subdirectory"),
)
return new_url
@@ -334,6 +362,7 @@ def _strip_extras_markers(marker):
def get_setuptools_version():
# type: () -> Optional[STRING_TYPE]
import pkg_resources
setuptools_dist = pkg_resources.get_distribution(
pkg_resources.Requirement("setuptools")
)
@@ -364,6 +393,7 @@ def get_pyproject(path):
if not path:
return
from vistir.compat import Path
if not isinstance(path, Path):
path = Path(path)
if not path.is_dir():
@@ -387,10 +417,7 @@ def get_pyproject(path):
else:
requires = ["setuptools>=40.8", "wheel"]
backend = get_default_pyproject_backend()
build_system = {
"requires": requires,
"build-backend": backend
}
build_system = {"requires": requires, "build-backend": backend}
pyproject_data["build_system"] = build_system
else:
requires = build_system.get("requires", ["setuptools>=40.8", "wheel"])
@@ -480,14 +507,14 @@ def key_from_ireq(ireq):
def key_from_req(req):
"""Get an all-lowercase version of the requirement's name."""
if hasattr(req, 'key'):
if hasattr(req, "key"):
# from pkg_resources, such as installed dists for pip-sync
key = req.key
else:
# from packaging, such as install requirements from requirements.txt
key = req.name
key = key.replace('_', '-').lower()
key = key.replace("_", "-").lower()
return key
@@ -529,17 +556,17 @@ def format_requirement(ireq):
"""
if ireq.editable:
line = '-e {}'.format(ireq.link)
line = "-e {}".format(ireq.link)
else:
line = _requirement_to_str_lowercase_name(ireq.req)
if str(ireq.req.marker) != str(ireq.markers):
if not ireq.req.marker:
line = '{}; {}'.format(line, ireq.markers)
line = "{}; {}".format(line, ireq.markers)
else:
name, markers = line.split(";", 1)
markers = markers.strip()
line = '{}; ({}) and ({})'.format(name, markers, ireq.markers)
line = "{}; ({}) and ({})".format(name, markers, ireq.markers)
return line
@@ -552,7 +579,7 @@ def format_specifier(ireq):
# TODO: Ideally, this is carried over to the pip library itself
specs = ireq.specifier._specs if ireq.req is not None else []
specs = sorted(specs, key=lambda x: x._spec[1])
return ','.join(str(s) for s in specs) or '<any>'
return ",".join(str(s) for s in specs) or "<any>"
def get_pinned_version(ireq):
@@ -579,9 +606,7 @@ def get_pinned_version(ireq):
try:
specifier = ireq.specifier
except AttributeError:
raise TypeError("Expected InstallRequirement, not {}".format(
type(ireq).__name__,
))
raise TypeError("Expected InstallRequirement, not {}".format(type(ireq).__name__))
if ireq.editable:
raise ValueError("InstallRequirement is editable")
@@ -591,10 +616,8 @@ def get_pinned_version(ireq):
raise ValueError("InstallRequirement has multiple specifications")
op, version = next(iter(specifier._specs))._spec
if op not in ('==', '===') or version.endswith('.*'):
raise ValueError("InstallRequirement not pinned (is {0!r})".format(
op + version,
))
if op not in ("==", "===") or version.endswith(".*"):
raise ValueError("InstallRequirement not pinned (is {0!r})".format(op + version))
return version
@@ -630,7 +653,7 @@ def as_tuple(ireq):
"""
if not is_pinned_requirement(ireq):
raise TypeError('Expected a pinned InstallRequirement, got {}'.format(ireq))
raise TypeError("Expected a pinned InstallRequirement, got {}".format(ireq))
name = key_from_req(ireq.req)
version = first(ireq.specifier._specs)._spec[1]
@@ -692,9 +715,9 @@ def lookup_table(values, key=None, keyval=None, unique=False, use_lists=False):
if keyval is None:
if key is None:
keyval = (lambda v: v)
keyval = lambda v: v
else:
keyval = (lambda v: (key(v), v))
keyval = lambda v: (key(v), v)
if unique:
return dict(keyval(v) for v in values)
@@ -718,7 +741,7 @@ def lookup_table(values, key=None, keyval=None, unique=False, use_lists=False):
def name_from_req(req):
"""Get the name of the requirement"""
if hasattr(req, 'project_name'):
if hasattr(req, "project_name"):
# from pkg_resources, such as installed dists for pip-sync
return req.project_name
else:
@@ -748,6 +771,7 @@ def make_install_requirement(name, version, extras, markers, constraint=False):
# If no extras are specified, the extras string is blank
from pip_shims.shims import install_req_from_line
extras_string = ""
if extras:
# Sort extras for stability
@@ -755,12 +779,13 @@ def make_install_requirement(name, version, extras, markers, constraint=False):
if not markers:
return install_req_from_line(
str('{}{}=={}'.format(name, extras_string, version)),
constraint=constraint)
str("{}{}=={}".format(name, extras_string, version)), constraint=constraint
)
else:
return install_req_from_line(
str('{}{}=={}; {}'.format(name, extras_string, version, str(markers))),
constraint=constraint)
str("{}{}=={}; {}".format(name, extras_string, version, str(markers))),
constraint=constraint,
)
def version_from_ireq(ireq):
@@ -778,9 +803,10 @@ def version_from_ireq(ireq):
def clean_requires_python(candidates):
"""Get a cleaned list of all the candidates with valid specifiers in the `requires_python` attributes."""
all_candidates = []
sys_version = '.'.join(map(str, sys.version_info[:3]))
sys_version = ".".join(map(str, sys.version_info[:3]))
from packaging.version import parse as parse_version
py_version = parse_version(os.environ.get('PIP_PYTHON_VERSION', sys_version))
py_version = parse_version(os.environ.get("PIP_PYTHON_VERSION", sys_version))
for c in candidates:
from_location = attrgetter("location.requires_python")
requires_python = getattr(c, "requires_python", from_location(c))
@@ -788,7 +814,9 @@ def clean_requires_python(candidates):
# Old specifications had people setting this to single digits
# which is effectively the same as '>=digit,<digit+1'
if requires_python.isdigit():
requires_python = '>={0},<{1}'.format(requires_python, int(requires_python) + 1)
requires_python = ">={0},<{1}".format(
requires_python, int(requires_python) + 1
)
try:
specifierset = SpecifierSet(requires_python)
except InvalidSpecifier:
@@ -802,7 +830,8 @@ def clean_requires_python(candidates):
def fix_requires_python_marker(requires_python):
from packaging.requirements import Requirement as PackagingRequirement
marker_str = ''
marker_str = ""
if any(requires_python.startswith(op) for op in Specifier._operators.keys()):
spec_dict = defaultdict(set)
# We are checking first if we have leading specifier operator
@@ -810,16 +839,18 @@ def fix_requires_python_marker(requires_python):
specifierset = list(SpecifierSet(requires_python))
# for multiple specifiers, the correct way to represent that in
# a specifierset is `Requirement('fakepkg; python_version<"3.0,>=2.6"')`
marker_key = Variable('python_version')
marker_key = Variable("python_version")
for spec in specifierset:
operator, val = spec._spec
cleaned_val = Value(val).serialize().replace('"', "")
spec_dict[Op(operator).serialize()].add(cleaned_val)
marker_str = ' and '.join([
"{0}{1}'{2}'".format(marker_key.serialize(), op, ','.join(vals))
for op, vals in spec_dict.items()
])
marker_to_add = PackagingRequirement('fakepkg; {0}'.format(marker_str)).marker
marker_str = " and ".join(
[
"{0}{1}'{2}'".format(marker_key.serialize(), op, ",".join(vals))
for op, vals in spec_dict.items()
]
)
marker_to_add = PackagingRequirement("fakepkg; {0}".format(marker_str)).marker
return marker_to_add
@@ -851,6 +882,7 @@ def get_name_variants(pkg):
raise TypeError("must provide a string to derive package names")
from pkg_resources import safe_name
from packaging.utils import canonicalize_name
pkg = pkg.lower()
names = {safe_name(pkg), canonicalize_name(pkg), pkg.replace("-", "_")}
return names
+71 -37
View File
@@ -4,27 +4,53 @@ from __future__ import absolute_import, print_function
import contextlib
import logging
import os
import six
import sys
import tomlkit
import vistir
six.add_move(six.MovedAttribute("Mapping", "collections", "collections.abc")) # type: ignore # noqa
six.add_move(six.MovedAttribute("Sequence", "collections", "collections.abc")) # type: ignore # noqa
six.add_move(six.MovedAttribute("Set", "collections", "collections.abc")) # type: ignore # noqa
six.add_move(six.MovedAttribute("ItemsView", "collections", "collections.abc")) # type: ignore # noqa
from six.moves import Mapping, Sequence, Set, ItemsView # type: ignore # noqa
from six.moves.urllib.parse import urlparse, urlsplit, urlunparse
import pip_shims.shims
import six
import tomlkit
import vistir
from six.moves.urllib.parse import urlparse, urlsplit, urlunparse
from vistir.compat import Path
from vistir.path import is_valid_url, ensure_mkdir_p, create_tracked_tempdir
from vistir.path import create_tracked_tempdir, ensure_mkdir_p, is_valid_url
from .environment import MYPY_RUNNING
# fmt: off
six.add_move(
six.MovedAttribute("Mapping", "collections", "collections.abc")
) # type: ignore # noqa # isort:skip
six.add_move(
six.MovedAttribute("Sequence", "collections", "collections.abc")
) # type: ignore # noqa # isort:skip
six.add_move(
six.MovedAttribute("Set", "collections", "collections.abc")
) # type: ignore # noqa # isort:skip
six.add_move(
six.MovedAttribute("ItemsView", "collections", "collections.abc")
) # type: ignore # noqa
from six.moves import ItemsView, Mapping, Sequence, Set # type: ignore # noqa # isort:skip
# fmt: on
if MYPY_RUNNING:
from typing import Dict, Any, Optional, Union, Tuple, List, Iterable, Generator, Text
from typing import (
Dict,
Any,
Optional,
Union,
Tuple,
List,
Iterable,
Generator,
Text,
TypeVar,
)
STRING_TYPE = Union[bytes, str, Text]
S = TypeVar("S", bytes, str, Text)
PipfileEntryType = Union[STRING_TYPE, bool, Tuple[STRING_TYPE], List[STRING_TYPE]]
PipfileType = Union[STRING_TYPE, Dict[STRING_TYPE, PipfileEntryType]]
VCS_LIST = ("git", "svn", "hg", "bzr")
@@ -74,7 +100,7 @@ VCS_SCHEMES = [
def is_installable_dir(path):
# type: (Text) -> bool
# type: (STRING_TYPE) -> bool
if pip_shims.shims.is_installable_dir(path):
return True
pyproject_path = os.path.join(path, "pyproject.toml")
@@ -88,7 +114,7 @@ def is_installable_dir(path):
def strip_ssh_from_git_uri(uri):
# type: (Text) -> Text
# type: (S) -> S
"""Return git+ssh:// formatted URI to git+git@ format"""
if isinstance(uri, six.string_types):
if "git+ssh://" in uri:
@@ -105,7 +131,7 @@ def strip_ssh_from_git_uri(uri):
def add_ssh_scheme_to_git_uri(uri):
# type: (Text) -> Text
# type: (S) -> S
"""Cleans VCS uris from pip format"""
if isinstance(uri, six.string_types):
# Add scheme for parsing purposes, this is also what pip does
@@ -120,7 +146,7 @@ def add_ssh_scheme_to_git_uri(uri):
def is_vcs(pipfile_entry):
# type: (Union[Text, Dict[Text, Union[Text, bool, Tuple[Text], List[Text]]]]) -> bool
# type: (PipfileType) -> bool
"""Determine if dictionary entry from Pipfile is for a vcs dependency."""
if isinstance(pipfile_entry, Mapping):
return any(key for key in pipfile_entry.keys() if key in VCS_LIST)
@@ -135,7 +161,7 @@ def is_vcs(pipfile_entry):
def is_editable(pipfile_entry):
# type: (Union[Text, Dict[Text, Union[Text, bool, Tuple[Text], List[Text]]]]) -> bool
# type: (PipfileType) -> bool
if isinstance(pipfile_entry, Mapping):
return pipfile_entry.get("editable", False) is True
if isinstance(pipfile_entry, six.string_types):
@@ -144,7 +170,7 @@ def is_editable(pipfile_entry):
def multi_split(s, split):
# type: (Text, Iterable[Text]) -> List[Text]
# type: (S, Iterable[S]) -> List[S]
"""Splits on multiple given separators."""
for r in split:
s = s.replace(r, "|")
@@ -152,14 +178,14 @@ def multi_split(s, split):
def is_star(val):
# type: (Union[Text, Dict[Text, Union[Text, bool, Tuple[Text], List[Text]]]]) -> bool
# type: (PipfileType) -> bool
return (isinstance(val, six.string_types) and val == "*") or (
isinstance(val, Mapping) and val.get("version", "") == "*"
)
def convert_entry_to_path(path):
# type: (Dict[Text, Union[Text, bool, Tuple[Text], List[Text]]]) -> Text
# type: (Dict[S, Union[S, bool, Tuple[S], List[S]]]) -> S
"""Convert a pipfile entry to a string"""
if not isinstance(path, Mapping):
@@ -177,7 +203,7 @@ def convert_entry_to_path(path):
def is_installable_file(path):
# type: (Union[Text, Dict[Text, Union[Text, bool, Tuple[Text], List[Text]]]]) -> bool
# type: (PipfileType) -> bool
"""Determine if a path can potentially be installed"""
from packaging import specifiers
@@ -196,7 +222,11 @@ def is_installable_file(path):
return False
parsed = urlparse(path)
is_local = (not parsed.scheme or parsed.scheme == "file" or (len(parsed.scheme) == 1 and os.name == "nt"))
is_local = (
not parsed.scheme
or parsed.scheme == "file"
or (len(parsed.scheme) == 1 and os.name == "nt")
)
if parsed.scheme and parsed.scheme == "file":
path = vistir.compat.fs_decode(vistir.path.url_to_path(path))
normalized_path = vistir.path.normalize_path(path)
@@ -204,7 +234,9 @@ def is_installable_file(path):
return False
is_archive = pip_shims.shims.is_archive_file(normalized_path)
is_local_project = os.path.isdir(normalized_path) and is_installable_dir(normalized_path)
is_local_project = os.path.isdir(normalized_path) and is_installable_dir(
normalized_path
)
if is_local and is_local_project or is_archive:
return True
@@ -217,11 +249,13 @@ def is_installable_file(path):
def get_dist_metadata(dist):
import pkg_resources
from email.parser import FeedParser
if (isinstance(dist, pkg_resources.DistInfoDistribution) and
dist.has_metadata('METADATA')):
metadata = dist.get_metadata('METADATA')
elif dist.has_metadata('PKG-INFO'):
metadata = dist.get_metadata('PKG-INFO')
if isinstance(dist, pkg_resources.DistInfoDistribution) and dist.has_metadata(
"METADATA"
):
metadata = dist.get_metadata("METADATA")
elif dist.has_metadata("PKG-INFO"):
metadata = dist.get_metadata("PKG-INFO")
else:
metadata = ""
@@ -231,7 +265,7 @@ def get_dist_metadata(dist):
def get_setup_paths(base_path, subdirectory=None):
# type: (Text, Optional[Text]) -> Dict[Text, Optional[Text]]
# type: (S, Optional[S]) -> Dict[S, Optional[S]]
if base_path is None:
raise TypeError("must provide a path to derive setup paths from")
setup_py = os.path.join(base_path, "setup.py")
@@ -251,12 +285,12 @@ def get_setup_paths(base_path, subdirectory=None):
return {
"setup_py": setup_py if os.path.exists(setup_py) else None,
"setup_cfg": setup_cfg if os.path.exists(setup_cfg) else None,
"pyproject_toml": pyproject_toml if os.path.exists(pyproject_toml) else None
"pyproject_toml": pyproject_toml if os.path.exists(pyproject_toml) else None,
}
def prepare_pip_source_args(sources, pip_args=None):
# type: (List[Dict[Text, Union[Text, bool]]], Optional[List[Text]]) -> List[Text]
# type: (List[Dict[S, Union[S, bool]]], Optional[List[S]]) -> List[S]
if pip_args is None:
pip_args = []
if sources:
@@ -264,7 +298,9 @@ def prepare_pip_source_args(sources, pip_args=None):
pip_args.extend(["-i", sources[0]["url"]]) # type: ignore
# Trust the host if it's not verified.
if not sources[0].get("verify_ssl", True):
pip_args.extend(["--trusted-host", urlparse(sources[0]["url"]).hostname]) # type: ignore
pip_args.extend(
["--trusted-host", urlparse(sources[0]["url"]).hostname]
) # type: ignore
# Add additional sources as extra indexes.
if len(sources) > 1:
for source in sources[1:]:
@@ -284,7 +320,7 @@ def _ensure_dir(path):
@contextlib.contextmanager
def ensure_setup_py(base):
# type: (Text) -> Generator[None, None, None]
# type: (STRING_TYPE) -> Generator[None, None, None]
if not base:
base = create_tracked_tempdir(prefix="requirementslib-setup")
base_dir = Path(base)
@@ -413,9 +449,7 @@ def get_path(root, path, default=_UNSET):
cur = cur[seg]
except (ValueError, KeyError, IndexError, TypeError):
if not getattr(cur, "__iter__", None):
exc = TypeError(
"%r object is not indexable" % type(cur).__name__
)
exc = TypeError("%r object is not indexable" % type(cur).__name__)
raise PathAccessError(exc, seg, path)
except PathAccessError:
if default is _UNSET: