Vendor in requirementslib==2.0.0 & drop pip-shims & revert sysconfig patch w/ safer setuptools usage (#5228)

* Vendor in requirementslib 2.0.0

* Drop sysconfig default patch.

* Remove setuptools import usages in pipenv.

* Add news fragment.
This commit is contained in:
Matt Davis
2022-08-24 14:09:17 -04:00
committed by GitHub
parent b6d9ad6a63
commit e4cc8f2550
31 changed files with 459 additions and 4188 deletions
+2
View File
@@ -0,0 +1,2 @@
Remove eager and unnecessary importing of ``setuptools`` and ``pkg_resources`` to avoid conflict upgrading ``setuptools``.
Roll back ``sysconfig`` patch of ``pip`` because it was problematic for some ``--system`` commands.
+1
View File
@@ -0,0 +1 @@
Vendor in ``requirementslib==2.0.0`` and drop ``pip-shims`` entirely.
-1
View File
@@ -27,7 +27,6 @@ sys.path.insert(0, PIPENV_PATCHED)
# Load patched pip instead of system pip
os.environ["PIP_SHIMS_BASE_MODULE"] = "pipenv.patched.pip"
os.environ["PIP_DISABLE_PIP_VERSION_CHECK"] = "1"
# Hack to make things work better.
-7
View File
@@ -2095,7 +2095,6 @@ def do_install(
pypi_mirror=pypi_mirror,
skip_lock=skip_lock,
)
pip_shims_module = os.environ.pop("PIP_SHIMS_BASE_MODULE", None)
for pkg_line in pkg_list:
click.secho(
fix_utf8(f"Installing {pkg_line}..."),
@@ -2232,8 +2231,6 @@ def do_install(
# Update project settings with pre preference.
if pre:
project.update_settings({"allow_prereleases": pre})
if pip_shims_module:
os.environ["PIP_SHIMS_BASE_MODULE"] = pip_shims_module
do_init(
project,
dev=dev,
@@ -2439,8 +2436,6 @@ def do_shell(
# otherwise its value will be changed
os.environ["PIPENV_ACTIVE"] = "1"
os.environ.pop("PIP_SHIMS_BASE_MODULE", None)
if fancy:
shell.fork(*fork_args)
return
@@ -2591,7 +2586,6 @@ def do_run(
load_dot_env(project, quiet=quiet)
env = os.environ.copy()
env.pop("PIP_SHIMS_BASE_MODULE", None)
path = env.get("PATH", "")
if project.virtualenv_location:
@@ -2609,7 +2603,6 @@ def do_run(
# such as in inline_activate_virtual_environment
# otherwise its value will be changed
env["PIPENV_ACTIVE"] = "1"
env.pop("PIP_SHIMS_BASE_MODULE", None)
try:
script = project.build_script(command, args)
+10 -44
View File
@@ -16,6 +16,7 @@ import pkg_resources
import pipenv
from pipenv.patched.pip._internal.commands.install import InstallCommand
from pipenv.patched.pip._internal.index.package_finder import PackageFinder
from pipenv.patched.pip._internal.req.req_uninstall import UninstallPathSet
from pipenv.patched.pip._vendor.packaging.utils import canonicalize_name
from pipenv.utils.constants import is_type_checking
from pipenv.utils.indexes import prepare_pip_source_args
@@ -954,24 +955,6 @@ class Environment:
result = str(result.path)
return result
def get_install_args(self, editable=False, setup_path=None):
install_arg = "install" if not editable else "develop"
install_keys = ["headers", "purelib", "platlib", "scripts", "data"]
install_args = [
self.environment.python,
"-u",
"-c",
SETUPTOOLS_SHIM % setup_path,
install_arg,
"--single-version-externally-managed",
"--no-deps",
"--prefix={}".format(self.base_paths["prefix"]),
"--no-warn-script-location",
]
for key in install_keys:
install_args.append(f"--install-{key}={self.base_paths[key]}")
return install_args
def install(self, requirements):
if not isinstance(requirements, (tuple, list)):
requirements = [requirements]
@@ -1031,36 +1014,19 @@ class Environment:
)
if monkey_patch:
monkey_patch.activate()
pip_shims = self.safe_import("pip_shims")
pathset_base = pip_shims.UninstallPathSet
pathset_base._permitted = PatchedUninstaller._permitted
dist = next(
iter(d for d in self.get_working_set() if d.project_name == pkgname), None
)
pathset = pathset_base.from_dist(dist)
if pathset is not None:
pathset.remove(auto_confirm=auto_confirm, verbose=verbose)
path_set = UninstallPathSet.from_dist(dist)
if path_set is not None:
path_set.remove(auto_confirm=auto_confirm, verbose=verbose)
try:
yield pathset
yield path_set
except Exception:
if pathset is not None:
pathset.rollback()
if path_set is not None:
path_set.rollback()
else:
if pathset is not None:
pathset.commit()
if pathset is None:
if path_set is not None:
path_set.commit()
if path_set is None:
return
class PatchedUninstaller:
def _permitted(self, path):
return True
SETUPTOOLS_SHIM = (
"import setuptools, tokenize;__file__=%r;"
"f=getattr(tokenize, 'open', open)(__file__);"
"code=f.read().replace('\\r\\n', '\\n');"
"f.close();"
"exec(compile(code, __file__, 'exec'))"
)
-4
View File
@@ -8,16 +8,12 @@ from pipenv.vendor import pythonfinder
def get_pipenv_diagnostics(project):
import setuptools
print("<details><summary>$ pipenv --support</summary>")
print("")
print(f"Pipenv version: `{pipenv.__version__!r}`")
print("")
print(f"Pipenv location: `{os.path.dirname(pipenv.__file__)!r}`")
print("")
print(f"setuptools version: `{setuptools.__version__!r}`")
print("")
print(f"Python location: `{sys.executable!r}`")
print("")
print(f"OS Name: `{os.name!r}`")
@@ -41,7 +41,7 @@ logger = logging.getLogger(__name__)
_PLATLIBDIR: str = getattr(sys, "platlibdir", "lib")
_USE_SYSCONFIG_DEFAULT = sys.version_info >= (3, 7)
_USE_SYSCONFIG_DEFAULT = sys.version_info >= (3, 10)
def _should_use_sysconfig() -> bool:
+4 -12
View File
@@ -4,6 +4,7 @@ from tempfile import NamedTemporaryFile
from typing import Mapping, Sequence
from pipenv.patched.pip._vendor.packaging.markers import Marker
from pipenv.patched.pip._vendor.packaging.version import parse
from .constants import SCHEME_LIST, VCS_LIST
from .shell import temp_path
@@ -61,10 +62,7 @@ def get_canonical_names(packages):
def pep440_version(version):
"""Normalize version to PEP 440 standards"""
# Use pip built-in version parser.
from pipenv.vendor.pip_shims import shims
return str(shims.parse_version(version))
return str(parse(version))
def pep423_name(name):
@@ -360,12 +358,6 @@ def locked_repository(requirement):
if not requirement.is_vcs:
return
original_base = os.environ.pop("PIP_SHIMS_BASE_MODULE", None)
os.environ["PIP_SHIMS_BASE_MODULE"] = "pipenv.patched.pip"
src_dir = create_tracked_tempdir(prefix="pipenv-", suffix="-src")
try:
with requirement.req.locked_vcs_repo(src_dir=src_dir) as repo:
yield repo
finally:
if original_base:
os.environ["PIP_SHIMS_BASE_MODULE"] = original_base
with requirement.req.locked_vcs_repo(src_dir=src_dir) as repo:
yield repo
-13
View File
@@ -1,13 +0,0 @@
Copyright (c) 2018-2021, Dan Ryan <dan@danryan.co> and Frost Ming
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-54
View File
@@ -1,54 +0,0 @@
# -*- coding=utf-8 -*-
"""
This library is a set of compatibility access shims to the ``pip`` internal API.
It provides compatibility with pip versions 8.0 through the current release. The
shims are provided using a lazy import strategy by hacking a module by overloading
a class instance's ``getattr`` method. This library exists due to my constant
writing of the same set of import shims.
Submodules
==========
.. autosummary::
:toctree: _autosummary
pip_shims.models
pip_shims.compat
pip_shims.utils
pip_shims.shims
pip_shims.environment
"""
from __future__ import absolute_import
import sys
from . import shims
__version__ = "0.7.3"
if "pip_shims" in sys.modules:
# mainly to keep a reference to the old module on hand so it doesn't get
# weakref'd away
if __name__ != "pip_shims":
del sys.modules["pip_shims"]
if __name__ in sys.modules:
old_module = sys.modules[__name__]
module = sys.modules["pip_shims"] = sys.modules[__name__] = shims._new()
module.shims = shims
module.__dict__.update(
{
"__file__": __file__,
"__package__": "pip_shims",
"__path__": __path__,
"__doc__": __doc__,
"__all__": module.__all__ + ["shims"],
"__version__": __version__,
"__name__": __name__,
}
)
-1632
View File
File diff suppressed because it is too large Load Diff
-45
View File
@@ -1,45 +0,0 @@
# -*- coding=utf-8 -*-
"""
Module with functionality to learn about the environment.
"""
from __future__ import absolute_import
import importlib
import os
def get_base_import_path():
base_import_path = os.environ.get("PIP_SHIMS_BASE_MODULE", "pip")
return base_import_path
BASE_IMPORT_PATH = get_base_import_path()
def get_pip_version(import_path=BASE_IMPORT_PATH):
try:
pip = importlib.import_module(import_path)
except ImportError:
if import_path != "pip":
return get_pip_version(import_path="pip")
else:
import subprocess
version = subprocess.check_output(["pip", "--version"])
if version:
version = version.decode("utf-8").split()[1]
return version
return "0.0.0"
version = getattr(pip, "__version__", None)
return version
def is_type_checking():
try:
from typing import TYPE_CHECKING
except ImportError:
return False
return TYPE_CHECKING
MYPY_RUNNING = os.environ.get("MYPY_RUNNING", is_type_checking())
-1171
View File
File diff suppressed because it is too large Load Diff
-75
View File
@@ -1,75 +0,0 @@
# -*- coding=utf-8 -*-
"""
Main module with magic self-replacement mechanisms to handle import speedups.
"""
from __future__ import absolute_import
import sys
import types
from pipenv.patched.pip._vendor.packaging.version import parse as parse_version
from .models import (
ShimmedPathCollection,
get_package_finder,
import_pip,
lookup_current_pip_version,
)
class _shims(types.ModuleType):
CURRENT_PIP_VERSION = str(lookup_current_pip_version())
@classmethod
def parse_version(cls, version):
return parse_version(version)
def __dir__(self):
result = list(self._locations.keys()) + list(self.__dict__.keys())
result.extend(
(
"__file__",
"__doc__",
"__all__",
"__docformat__",
"__name__",
"__path__",
"__package__",
"__version__",
)
)
return result
@classmethod
def _new(cls):
return cls()
@property
def __all__(self):
return list(self._locations.keys())
def __init__(self):
self.pip = import_pip()
self._locations = ShimmedPathCollection.get_registry()
self._locations["get_package_finder"] = get_package_finder
self.pip_version = str(lookup_current_pip_version())
self.parsed_pip_version = lookup_current_pip_version()
def __getattr__(self, *args, **kwargs):
locations = super(_shims, self).__getattribute__("_locations")
if args[0] in locations:
return locations[args[0]].shim()
return super(_shims, self).__getattribute__(*args, **kwargs)
old_module = sys.modules[__name__] if __name__ in sys.modules else None
module = sys.modules[__name__] = _shims()
module.__dict__.update(
{
"__file__": __file__,
"__package__": __package__,
"__doc__": __doc__,
"__all__": module.__all__,
"__name__": __name__,
}
)
-453
View File
@@ -1,453 +0,0 @@
# -*- coding=utf-8 -*-
"""
Shared utility functions which are not specific to any particular module.
"""
from __future__ import absolute_import
import contextlib
import copy
import inspect
import sys
from collections.abc import Callable
from functools import wraps
from pipenv.patched.pip._vendor.packaging.version import _BaseVersion, parse
from .environment import MYPY_RUNNING
if MYPY_RUNNING:
from types import ModuleType
from typing import (
Any,
Dict,
Iterator,
List,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Union,
)
TShimmedPath = TypeVar("TShimmedPath")
TShimmedPathCollection = TypeVar("TShimmedPathCollection")
TShim = Union[TShimmedPath, TShimmedPathCollection]
TShimmedFunc = Union[TShimmedPath, TShimmedPathCollection, Callable, Type]
STRING_TYPES = (str,)
if sys.version_info < (3, 0):
STRING_TYPES = STRING_TYPES + (unicode,) # noqa:F821
class BaseMethod(Callable):
def __init__(self, func_base, name, *args, **kwargs):
# type: (Callable, str, Any, Any) -> None
self.func = func_base
self.__name__ = self.__qualname__ = name
def __call__(self, *args, **kwargs):
# type: (Any, Any) -> Any
return self.func(*args, **kwargs)
class BaseClassMethod(Callable):
def __init__(self, func_base, name, *args, **kwargs):
# type: (Callable, str, Any, Any) -> None
self.func = func_base
self.__name__ = self.__qualname__ = name
def __call__(self, cls, *args, **kwargs):
# type: (Type, Any, Any) -> Any
return self.func(*args, **kwargs)
def make_method(fn):
# type: (Callable) -> Callable
@wraps(fn)
def method_creator(*args, **kwargs):
# type: (Any, Any) -> Callable
return BaseMethod(fn, *args, **kwargs)
return method_creator
def make_classmethod(fn):
# type: (Callable) -> Callable
@wraps(fn)
def classmethod_creator(*args, **kwargs):
# type: (Any, Any) -> Callable
return classmethod(BaseClassMethod(fn, *args, **kwargs))
return classmethod_creator
def memoize(obj):
# type: (Any) -> Callable
cache = obj.cache = {}
@wraps(obj)
def memoizer(*args, **kwargs):
key = str(args) + str(kwargs)
if key not in cache:
cache[key] = obj(*args, **kwargs)
return cache[key]
return memoizer
@memoize
def _parse(version):
# type: (str) -> Tuple[int, ...]
if isinstance(version, STRING_TYPES):
return tuple(int(i) for i in version.split("."))
return version
@memoize
def parse_version(version):
# type: (str) -> _BaseVersion
if not isinstance(version, STRING_TYPES):
raise TypeError("Can only derive versions from string, got {!r}".format(version))
return parse(version)
@memoize
def split_package(module, subimport=None):
# type: (str, Optional[str]) -> Tuple[str, str]
"""
Used to determine what target to import.
Either splits off the final segment or uses the provided sub-import to return a
2-tuple of the import path and the target module or sub-path.
:param str module: A package to import from
:param Optional[str] subimport: A class, function, or subpackage to import
:return: A 2-tuple of the corresponding import package and sub-import path
:rtype: Tuple[str, str]
:Example:
>>> from pip_shims.utils import split_package
>>> split_package("pipenv.patched.pip._internal.req.req_install", subimport="InstallRequirement")
("pipenv.patched.pip._internal.req.req_install", "InstallRequirement")
>>> split_package("pipenv.patched.pip._internal.cli.base_command")
("pipenv.patched.pip._internal.cli", "base_command")
"""
package = None
if subimport:
package = subimport
else:
module, _, package = module.rpartition(".")
return module, package
def get_method_args(target_method):
# type: (Callable) -> Tuple[Callable, Optional[inspect.Arguments]]
"""
Returns the arguments for a callable.
:param Callable target_method: A callable to retrieve arguments for
:return: A 2-tuple of the original callable and its resulting arguments
:rtype: Tuple[Callable, Optional[inspect.Arguments]]
"""
inspected_args = None
try:
inspected_args = inspect.getargs(target_method.__code__)
except AttributeError:
target_func = getattr(target_method, "__func__", None)
if target_func is not None:
inspected_args = inspect.getargs(target_func.__code__)
else:
target_func = target_method
return target_func, inspected_args
def set_default_kwargs(basecls, method, *args, **default_kwargs):
# type: (Union[Type, ModuleType], Callable, Any, Any) -> Union[Type, ModuleType] # noqa
target_method = getattr(basecls, method, None)
if target_method is None:
return basecls
target_func, inspected_args = get_method_args(target_method)
if inspected_args is not None:
pos_args = inspected_args.args
else:
pos_args = []
# Spit back the base class if we can't find matching arguments
# to put defaults in place of
if not any(arg in pos_args for arg in list(default_kwargs.keys())):
return basecls
prepended_defaults = tuple() # type: Tuple[Any, ...]
# iterate from the function's argument order to make sure we fill this
# out in the correct order
for arg in args:
prepended_defaults += (arg,)
for arg in pos_args:
if arg in default_kwargs:
prepended_defaults = prepended_defaults + (default_kwargs[arg],)
if not prepended_defaults:
return basecls
new_defaults = prepended_defaults + target_method.__defaults__
target_method.__defaults__ = new_defaults
setattr(basecls, method, target_method)
return basecls
def ensure_function(parent, funcname, func):
# type: (Union[ModuleType, Type, Callable, Any], str, Callable) -> Callable
"""Given a module, a function name, and a function object, attaches the given
function to the module and ensures it is named properly according to the provided
argument
:param Any parent: The parent to attack the function to
:param str funcname: The name to give the function
:param Callable func: The function to rename and attach to **parent**
:returns: The function with its name, qualname, etc set to mirror **parent**
:rtype: Callable
"""
qualname = funcname
if parent is None:
parent = __module__ # type: ignore # noqa:F821
parent_is_module = inspect.ismodule(parent)
parent_is_class = inspect.isclass(parent)
module = None
if parent_is_module:
module = parent.__name__
elif parent_is_class:
qualname = "{}.{}".format(parent.__name__, qualname)
module = getattr(parent, "__module__", None)
else:
module = getattr(parent, "__module__", None)
try:
func.__name__ = funcname
except AttributeError:
if getattr(func, "__func__", None) is not None:
func = func.__func__
func.__name__ = funcname
func.__qualname__ = qualname
func.__module__ = module
return func
def add_mixin_to_class(basecls, mixins):
# type: (Type, List[Type]) -> Type
"""
Given a class, adds the provided mixin classes as base classes and gives a new class
:param Type basecls: An initial class to generate a new class from
:param List[Type] mixins: A list of mixins to add as base classes
:return: A new class with the provided mixins as base classes
:rtype: Type[basecls, *mixins]
"""
if not any(mixins):
return basecls
base_dict = basecls.__dict__.copy()
class_tuple = (basecls,) # type: Tuple[Type, ...]
for mixin in mixins:
if not mixin:
continue
mixin_dict = mixin.__dict__.copy()
base_dict.update(mixin_dict)
class_tuple = class_tuple + (mixin,)
base_dict.update(basecls.__dict__)
return type(basecls.__name__, class_tuple, base_dict)
def fallback_is_file_url(link):
# type: (Any) -> bool
return link.url.lower().startswith("file:")
def fallback_is_artifact(self):
# type: (Any) -> bool
return not getattr(self, "is_vcs", False)
def fallback_is_vcs(self):
# type: (Any) -> bool
return not getattr(self, "is_artifact", True)
def resolve_possible_shim(target):
# type: (TShimmedFunc) -> Optional[Union[Type, Callable]]
if target is None:
return target
if getattr(target, "shim", None):
return target.shim()
return target
@contextlib.contextmanager
def nullcontext(*args, **kwargs):
# type: (Any, Any) -> Iterator
try:
yield
finally:
pass
def has_property(target, name):
# type: (Any, str) -> bool
if getattr(target, name, None) is not None:
return True
return False
def apply_alias(imported, target, *aliases):
# type: (Union[ModuleType, Type, None], Any, Any) -> Any
"""
Given a target with attributes, point non-existant aliases at the first existing one
:param Union[ModuleType, Type] imported: A Module or Class base
:param Any target: The target which is a member of **imported** and will have aliases
:param str aliases: A list of aliases, the first found attribute will be the basis
for all non-existant names which will be created as pointers
:return: The original target
:rtype: Any
"""
base_value = None # type: Optional[Any]
applied_aliases = set()
unapplied_aliases = set()
for alias in aliases:
if has_property(target, alias):
base_value = getattr(target, alias)
applied_aliases.add(alias)
else:
unapplied_aliases.add(alias)
is_callable = inspect.ismethod(base_value) or inspect.isfunction(base_value)
for alias in unapplied_aliases:
if is_callable:
func_copy = copy.deepcopy(base_value)
alias_value = ensure_function(imported, alias, func_copy)
else:
alias_value = base_value
setattr(target, alias, alias_value)
return target
def suppress_setattr(obj, attr, value, filter_none=False):
"""
Set an attribute, suppressing any exceptions and skipping the attempt on failure.
:param Any obj: Object to set the attribute on
:param str attr: The attribute name to set
:param Any value: The value to set the attribute to
:param bool filter_none: [description], defaults to False
:return: Nothing
:rtype: None
:Example:
>>> class MyClass(object):
... def __init__(self, name):
... self.name = name
... self.parent = None
... def __repr__(self):
... return "<{0!r} instance (name={1!r}, parent={2!r})>".format(
... self.__class__.__name__, self.name, self.parent
... )
... def __str__(self):
... return self.name
>>> me = MyClass("Dan")
>>> dad = MyClass("John")
>>> grandfather = MyClass("Joe")
>>> suppress_setattr(dad, "parent", grandfather)
>>> dad
<'MyClass' instance (name='John', parent=<'MyClass' instance (name='Joe', parent=None
)>)>
>>> suppress_setattr(me, "parent", dad)
>>> me
<'MyClass' instance (name='Dan', parent=<'MyClass' instance (name='John', parent=<'My
Class' instance (name='Joe', parent=None)>)>)>
>>> suppress_setattr(me, "grandparent", grandfather)
>>> me
<'MyClass' instance (name='Dan', parent=<'MyClass' instance (name='John', parent=<'My
Class' instance (name='Joe', parent=None)>)>)>
"""
if filter_none and value is None:
pass
try:
setattr(obj, attr, value)
except Exception: # noqa
pass
def get_allowed_args(fn_or_class):
# type: (Union[Callable, Type]) -> Tuple[List[str], Dict[str, Any]]
"""
Given a callable or a class, returns the arguments and default kwargs passed in.
:param Union[Callable, Type] fn_or_class: A function, method or class to inspect.
:return: A 2-tuple with a list of arguments and a dictionary of keywords mapped to
default values.
:rtype: Tuple[List[str], Dict[str, Any]]
"""
try:
signature = inspect.signature(fn_or_class)
except AttributeError:
import funcsigs
signature = funcsigs.signature(fn_or_class)
args = []
kwargs = {}
for arg, param in signature.parameters.items():
if (
param.kind in (param.POSITIONAL_OR_KEYWORD, param.POSITIONAL_ONLY)
) and param.default is param.empty:
args.append(arg)
else:
kwargs[arg] = param.default if param.default is not param.empty else None
return args, kwargs
def call_function_with_correct_args(fn, **provided_kwargs):
# type: (Callable, Dict[str, Any]) -> Any
"""
Determines which arguments from **provided_kwargs** to call **fn** and calls it.
Consumes a list of allowed arguments (e.g. from :func:`~inspect.getargs()`) and
uses it to determine which of the arguments in the provided kwargs should be passed
through to the given callable.
:param Callable fn: A callable which has some dynamic arguments
:param List[str] allowed_args: A list of allowed arguments which can be passed to
the supplied function
:return: The result of calling the function
:rtype: Any
"""
# signature = inspect.signature(fn)
args = []
kwargs = {}
func_args, func_kwargs = get_allowed_args(fn)
for arg in func_args:
args.append(provided_kwargs[arg])
for arg in func_kwargs:
if not provided_kwargs.get(arg):
continue
kwargs[arg] = provided_kwargs[arg]
return fn(*args, **kwargs)
def filter_allowed_args(fn, **provided_kwargs):
# type: (Callable, Dict[str, Any]) -> Tuple[List[Any], Dict[str, Any]]
"""
Given a function and a kwarg mapping, return only those kwargs used in the function.
:param Callable fn: A function to inspect
:param Dict[str, Any] kwargs: A mapping of kwargs to filter
:return: A new, filtered kwarg mapping
:rtype: Tuple[List[Any], Dict[str, Any]]
"""
args = []
kwargs = {}
func_args, func_kwargs = get_allowed_args(fn)
for arg in func_args:
if arg in provided_kwargs:
args.append(provided_kwargs[arg])
for arg in func_kwargs:
if arg not in provided_kwargs:
continue
kwargs[arg] = provided_kwargs[arg]
return args, kwargs
+1 -6
View File
@@ -1,16 +1,11 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function
import logging
import warnings
import setuptools
from .models.lockfile import Lockfile
from .models.pipfile import Pipfile
from .models.requirements import Requirement
__version__ = "1.6.9"
__version__ = "2.0.0"
logger = logging.getLogger(__name__)
+9 -186
View File
@@ -1,6 +1,3 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import atexit
import copy
import hashlib
@@ -11,7 +8,9 @@ import sys
import pipenv.vendor.vistir as vistir
from pipenv.patched.pip._vendor.packaging.requirements import Requirement
from pipenv.vendor.pip_shims.shims import FAVORITE_HASH, SafeFileCache
from pipenv.patched.pip._internal.utils.hashes import FAVORITE_HASH
from pipenv.patched.pip._internal.vcs.versioncontrol import VcsSupport
from pipenv.patched.pip._vendor.cachecontrol.cache import DictCache
from pipenv.vendor.platformdirs import user_cache_dir
from .utils import as_tuple, get_pinned_version, key_from_req, lookup_table
@@ -19,66 +18,12 @@ from .utils import as_tuple, get_pinned_version, key_from_req, lookup_table
CACHE_DIR = os.environ.get("PIPENV_CACHE_DIR", user_cache_dir("pipenv"))
# Pip-tools cache implementation
class CorruptCacheError(Exception):
def __init__(self, path):
self.path = path
def __str__(self):
lines = [
"The dependency cache seems to have been corrupted.",
"Inspect, or delete, the following file:",
" {}".format(self.path),
]
return os.linesep.join(lines)
def read_cache_file(cache_file_path):
with open(cache_file_path, "r") as cache_file:
try:
doc = json.load(cache_file)
except ValueError:
raise CorruptCacheError(cache_file_path)
# Check version and load the contents
assert doc["__format__"] == 1, "Unknown cache file format"
return doc["dependencies"]
class DependencyCache(object):
"""Creates a new persistent dependency cache for the current Python
version. The cache file is written to the appropriate user cache dir for
the current platform, i.e.
"""Creates a new in memory dependency cache for the current Python
version."""
~/.cache/pip-tools/depcache-pyX.Y.json
Where X.Y indicates the Python version.
"""
def __init__(self, cache_dir=None):
if cache_dir is None:
cache_dir = CACHE_DIR
if not pathlib.Path(CACHE_DIR).absolute().is_dir():
try:
vistir.path.mkdir_p(os.path.abspath(cache_dir))
except OSError:
pass
py_version = ".".join(str(digit) for digit in sys.version_info[:2])
cache_filename = "depcache-py{}.json".format(py_version)
self._cache_file = os.path.join(cache_dir, cache_filename)
self._cache = None
@property
def cache(self):
"""The dictionary that is the actual in-memory cache.
This property lazily loads the cache from disk.
"""
if self._cache is None:
self.read_cache()
return self._cache
def __init__(self):
self.cache = {}
def as_cache_key(self, ireq):
"""Given a requirement, return its cache key. This behavior is a little
@@ -99,13 +44,6 @@ class DependencyCache(object):
extras_string = "[{}]".format(",".join(extras))
return name, "{}{}".format(version, extras_string)
def read_cache(self):
"""Reads the cached contents into memory."""
if os.path.exists(self._cache_file):
self._cache = read_cache_file(self._cache_file)
else:
self._cache = {}
def write_cache(self):
"""Writes the cache to disk as JSON."""
doc = {
@@ -117,7 +55,6 @@ class DependencyCache(object):
def clear(self):
self._cache = {}
self.write_cache()
def __contains__(self, ireq):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
@@ -131,7 +68,6 @@ class DependencyCache(object):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
self.cache.setdefault(pkgname, {})
self.cache[pkgname][pkgversion_and_extras] = values
self.write_cache()
def __delitem__(self, ireq):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
@@ -139,7 +75,6 @@ class DependencyCache(object):
del self.cache[pkgname][pkgversion_and_extras]
except KeyError:
return
self.write_cache()
def get(self, ireq, default=None):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
@@ -184,7 +119,7 @@ class DependencyCache(object):
)
class HashCache(SafeFileCache):
class HashCache(DictCache):
"""Caches hashes of PyPI artifacts so we do not need to re-download them.
Hashes are only cached when the URL appears to contain a hash in it
@@ -199,15 +134,11 @@ class HashCache(SafeFileCache):
session = requests.session()
atexit.register(session.close)
cache_dir = kwargs.pop("cache_dir", CACHE_DIR)
self.session = session
kwargs.setdefault("directory", os.path.join(cache_dir, "hash-cache"))
super(HashCache, self).__init__(*args, **kwargs)
def get_hash(self, location):
from pipenv.vendor.pip_shims import VcsSupport
# if there is no location hash (i.e., md5 / sha256 / etc) we on't want to store it
# if there is no location hash (i.e., md5 / sha256 / etc) we don't want to store it
hash_value = None
vcs = VcsSupport()
orig_scheme = location.scheme
@@ -235,111 +166,3 @@ class HashCache(SafeFileCache):
for chunk in iter(lambda: fp.read(8096), b""):
h.update(chunk)
return ":".join([FAVORITE_HASH, h.hexdigest()])
class _JSONCache(object):
"""A persistent cache backed by a JSON file.
The cache file is written to the appropriate user cache dir for the
current platform, i.e.
~/.cache/pip-tools/depcache-pyX.Y.json
Where X.Y indicates the Python version.
"""
filename_format = None
def __init__(self, cache_dir=CACHE_DIR):
vistir.mkdir_p(cache_dir)
python_version = ".".join(str(digit) for digit in sys.version_info[:2])
cache_filename = self.filename_format.format(
python_version=python_version,
)
self._cache_file = os.path.join(cache_dir, cache_filename)
self._cache = None
@property
def cache(self):
"""The dictionary that is the actual in-memory cache.
This property lazily loads the cache from disk.
"""
if self._cache is None:
self.read_cache()
return self._cache
def as_cache_key(self, ireq):
"""Given a requirement, return its cache key.
This behavior is a little weird in order to allow backwards
compatibility with cache files. For a requirement without extras, this
will return, for example::
("ipython", "2.1.0")
For a requirement with extras, the extras will be comma-separated and
appended to the version, inside brackets, like so::
("ipython", "2.1.0[nbconvert,notebook]")
"""
extras = tuple(sorted(ireq.extras))
if not extras:
extras_string = ""
else:
extras_string = "[{}]".format(",".join(extras))
name = key_from_req(ireq.req)
version = get_pinned_version(ireq)
return name, "{}{}".format(version, extras_string)
def read_cache(self):
"""Reads the cached contents into memory."""
if os.path.exists(self._cache_file):
self._cache = read_cache_file(self._cache_file)
else:
self._cache = {}
def write_cache(self):
"""Writes the cache to disk as JSON."""
doc = {
"__format__": 1,
"dependencies": self._cache,
}
with open(self._cache_file, "w") as f:
json.dump(doc, f, sort_keys=True)
def clear(self):
self._cache = {}
self.write_cache()
def __contains__(self, ireq):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
return pkgversion_and_extras in self.cache.get(pkgname, {})
def __getitem__(self, ireq):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
return self.cache[pkgname][pkgversion_and_extras]
def __setitem__(self, ireq, values):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
self.cache.setdefault(pkgname, {})
self.cache[pkgname][pkgversion_and_extras] = values
self.write_cache()
def __delitem__(self, ireq):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
try:
del self.cache[pkgname][pkgversion_and_extras]
except KeyError:
return
self.write_cache()
def get(self, ireq, default=None):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
return self.cache.get(pkgname, {}).get(pkgversion_and_extras, default)
class RequiresPythonCache(_JSONCache):
"""Cache a candidate's Requires-Python information."""
filename_format = "pyreqcache-py{python_version}.json"
+125 -71
View File
@@ -13,13 +13,24 @@ import pipenv.patched.pip._vendor.requests as requests
from pipenv.patched.pip._vendor.packaging.markers import Marker
from pipenv.patched.pip._vendor.packaging.utils import canonicalize_name
from pipenv.patched.pip._vendor.packaging.version import parse
from pipenv.vendor.pip_shims import shims
from pipenv.patched.pip._internal.cache import WheelCache
from pipenv.patched.pip._internal.models.format_control import FormatControl
from pipenv.patched.pip._internal.operations.build.build_tracker import get_build_tracker
from pipenv.patched.pip._internal.req.constructors import install_req_from_line
from pipenv.patched.pip._internal.req.req_install import InstallRequirement
from pipenv.patched.pip._internal.req.req_set import RequirementSet
from pipenv.patched.pip._internal.utils.temp_dir import TempDirectory, global_tempdir_manager
from pipenv.vendor.vistir.compat import fs_str
from pipenv.vendor.vistir.contextmanagers import cd, temp_environ
from pipenv.vendor.vistir.contextmanagers import temp_environ
from pipenv.vendor.vistir.path import create_tracked_tempdir
from ..environment import MYPY_RUNNING
from ..utils import _ensure_dir, prepare_pip_source_args
from ..utils import (
_ensure_dir,
get_package_finder,
get_pip_command,
prepare_pip_source_args,
)
from .cache import CACHE_DIR, DependencyCache
from .setup_info import SetupInfo
from .utils import (
@@ -49,7 +60,9 @@ if MYPY_RUNNING:
)
from pipenv.patched.pip._vendor.packaging.requirements import Requirement as PackagingRequirement
from shims import Command, InstallationCandidate, InstallRequirement, PackageFinder
from pipenv.patched.pip._internal.commands.base_command import Command
from pipenv.patched.pip._internal.index.package_finder import PackageFinder
from pipenv.patched.pip._internal.models.candidate import InstallationCandidate
TRequirement = TypeVar("TRequirement")
RequirementType = TypeVar(
@@ -68,8 +81,8 @@ DEPENDENCY_CACHE = DependencyCache()
@contextlib.contextmanager
def _get_wheel_cache():
with shims.global_tempdir_manager():
yield shims.WheelCache(CACHE_DIR, shims.FormatControl(set(), set()))
with global_tempdir_manager():
yield WheelCache(CACHE_DIR, FormatControl(set(), set()))
def _get_filtered_versions(ireq, versions, prereleases):
@@ -98,15 +111,6 @@ def find_all_matches(finder, ireq, pre=False):
return candidates
def get_pip_command():
# type: () -> Command
# Use pip's parser for pip.conf management and defaults.
# General options (find_links, index_url, extra_index_url, trusted_host,
# and pre) are deferred to pip.
pip_command = shims.InstallCommand()
return pip_command
@attr.s
class AbstractDependency(object):
name = attr.ib() # type: STRING_TYPE
@@ -214,7 +218,7 @@ class AbstractDependency(object):
req = Requirement.from_line(key)
req = req.merge_markers(self.markers)
self.dep_dict[key] = req.get_abstract_dependencies()
self.dep_dict[key] = req.abstract_dependencies()
return self.dep_dict[key]
@classmethod
@@ -274,26 +278,23 @@ class AbstractDependency(object):
return abstract_dep
def get_abstract_dependencies(reqs, sources=None, parent=None):
def get_abstract_dependencies(reqs, parent=None):
"""Get all abstract dependencies for a given list of requirements.
Given a set of requirements, convert each requirement to an Abstract Dependency.
:param reqs: A list of Requirements
:type reqs: list[:class:`~requirementslib.models.requirements.Requirement`]
:param sources: Pipfile-formatted sources, defaults to None
:param sources: list[dict], optional
:param parent: The parent of this list of dependencies, defaults to None
:param parent: :class:`~requirementslib.models.requirements.Requirement`, optional
:return: A list of Abstract Dependencies
:rtype: list[:class:`~requirementslib.models.dependency.AbstractDependency`]
"""
deps = []
from .requirements import Requirement
for req in reqs:
if isinstance(req, shims.InstallRequirement):
if isinstance(req, InstallRequirement):
requirement = Requirement.from_line("{0}{1}".format(req.name, req.specifier))
if req.link:
requirement.req.link = req.link
@@ -323,19 +324,18 @@ def get_dependencies(ireq, sources=None, parent=None):
:return: A set of dependency lines for generating new InstallRequirements.
:rtype: set(str)
"""
if not isinstance(ireq, shims.InstallRequirement):
if not isinstance(ireq, InstallRequirement):
name = getattr(ireq, "project_name", getattr(ireq, "project", ireq.name))
version = getattr(ireq, "version", None)
if not version:
ireq = shims.InstallRequirement.from_line("{0}".format(name))
ireq = install_req_from_line("{0}".format(name))
else:
ireq = shims.InstallRequirement.from_line("{0}=={1}".format(name, version))
pip_options = get_pip_options(sources=sources)
ireq = install_req_from_line("{0}=={1}".format(name, version))
getters = [
get_dependencies_from_cache,
get_dependencies_from_wheel_cache,
get_dependencies_from_json,
functools.partial(get_dependencies_from_index, pip_options=pip_options),
functools.partial(get_dependencies_from_index, sources=sources),
]
for getter in getters:
deps = getter(ireq)
@@ -345,7 +345,7 @@ def get_dependencies(ireq, sources=None, parent=None):
def get_dependencies_from_wheel_cache(ireq):
# type: (shims.InstallRequirement) -> Optional[Set[shims.InstallRequirement]]
# type: (InstallRequirement) -> Optional[Set[InstallRequirement]]
"""Retrieves dependencies for the given install requirement from the wheel
cache.
@@ -358,7 +358,7 @@ def get_dependencies_from_wheel_cache(ireq):
if ireq.editable or not is_pinned_requirement(ireq):
return
with _get_wheel_cache() as wheel_cache:
matches = wheel_cache.get(ireq.link, name_from_req(ireq.req))
matches = wheel_cache.get(ireq.link, name_from_req(ireq.req), ireq.markers)
if matches:
matches = set(matches)
if not DEPENDENCY_CACHE.get(ireq):
@@ -406,7 +406,7 @@ def get_dependencies_from_json(ireq):
if not requires_dist: # The API can return None for this.
return
for requires in requires_dist:
i = shims.InstallRequirement.from_line(requires)
i = install_req_from_line(requires)
# See above, we don't handle requirements with extras.
if not _marker_contains_extra(i):
yield format_requirement(i)
@@ -442,7 +442,7 @@ def get_dependencies_from_cache(ireq):
try:
broken = False
for line in cached:
dep_ireq = shims.InstallRequirement.from_line(line)
dep_ireq = install_req_from_line(line)
name = canonicalize_name(dep_ireq.name)
if _marker_contains_extra(dep_ireq):
broken = True # The "extra =" marker breaks everything.
@@ -464,6 +464,68 @@ def is_python(section):
return section.startswith("[") and ":" in section
def get_resolver(
finder, build_tracker, pip_options, session, directory, install_command=None
):
wheel_cache = WheelCache(pip_options.cache_dir, pip_options.format_control)
if install_command is None:
install_command = get_pip_command()
preparer = install_command.make_requirement_preparer(
temp_build_dir=directory,
options=pip_options,
build_tracker=build_tracker,
session=session,
finder=finder,
use_user_site=False,
)
resolver = install_command.make_resolver(
preparer=preparer,
finder=finder,
options=pip_options,
wheel_cache=wheel_cache,
use_user_site=False,
ignore_installed=True,
ignore_requires_python=pip_options.ignore_requires_python,
force_reinstall=pip_options.force_reinstall,
upgrade_strategy="to-satisfy-only",
use_pep517=pip_options.use_pep517,
)
return resolver
def resolve(ireq, sources, install_command, pip_options):
with global_tempdir_manager(), get_build_tracker() as build_tracker, TempDirectory() as directory:
session, finder = get_finder(
sources=sources, pip_command=install_command, pip_options=pip_options
)
resolver = get_resolver(
finder=finder,
build_tracker=build_tracker,
pip_options=pip_options,
session=session,
directory=directory,
install_command=install_command,
)
reqset = RequirementSet(install_command)
reqset.add_named_requirement(ireq)
resolver_args = []
resolver_args.append([ireq])
resolver_args.append(True) # check_supported_wheels
if getattr(reqset, "prepare_files", None):
reqset.prepare_files(finder)
result = reqset.requirements
reqset.cleanup_files()
return result
result_reqset = resolver.resolve(*resolver_args)
if result_reqset is None:
result_reqset = reqset
results = result_reqset.requirements
cleanup_fn = getattr(reqset, "cleanup_files", None)
if cleanup_fn is not None:
cleanup_fn()
return results
def get_dependencies_from_index(dep, sources=None, pip_options=None, wheel_cache=None):
"""Retrieves dependencies for the given install requirement from the pip
resolver.
@@ -475,14 +537,12 @@ def get_dependencies_from_index(dep, sources=None, pip_options=None, wheel_cache
:return: A set of dependency lines for generating new InstallRequirements.
:rtype: set(str) or None
"""
session, finder = get_finder(sources=sources, pip_options=pip_options)
install_command = get_pip_command()
if pip_options is None:
pip_options = get_pip_options(sources=sources, pip_command=install_command)
dep.is_direct = True
requirements = None
setup_requires = {}
with temp_environ(), ExitStack() as stack:
if not wheel_cache:
wheel_cache = stack.enter_context(_get_wheel_cache())
with temp_environ():
os.environ["PIP_EXISTS_ACTION"] = "i"
if dep.editable and not dep.prepared and not dep.req:
setup_info = SetupInfo.from_ireq(dep)
@@ -490,7 +550,12 @@ def get_dependencies_from_index(dep, sources=None, pip_options=None, wheel_cache
setup_requires.update(results["setup_requires"])
requirements = set(results["requires"].values())
else:
results = shims.resolve(dep)
results = resolve(
dep,
sources=sources,
install_command=install_command,
pip_options=pip_options,
)
requirements = [v for v in results.values() if v.name != dep.name]
requirements = set([format_requirement(r) for r in requirements])
if not dep.editable and is_pinned_requirement(dep) and requirements is not None:
@@ -537,16 +602,14 @@ def get_finder(sources=None, pip_command=None, pip_options=None):
"""
if not pip_command:
pip_command = shims.InstallCommand()
pip_command = get_pip_command()
if not sources:
sources = [{"url": "https://pypi.org/simple", "name": "pypi", "verify_ssl": True}]
if not pip_options:
pip_options = get_pip_options(sources=sources, pip_command=pip_command)
session = pip_command._build_session(pip_options)
atexit.register(session.close)
finder = shims.get_package_finder(
shims.InstallCommand(), options=pip_options, session=session
)
finder = get_package_finder(get_pip_command(), options=pip_options, session=session)
return session, finder
@@ -576,41 +639,32 @@ def start_resolver(finder=None, session=None, wheel_cache=None):
_build_dir = create_tracked_tempdir(fs_str("build"))
_source_dir = create_tracked_tempdir(fs_str("source"))
pip_options.src_dir = _source_dir
try:
with ExitStack() as ctx:
ctx.enter_context(shims.global_tempdir_manager())
with global_tempdir_manager(), get_build_tracker() as build_tracker:
if not wheel_cache:
wheel_cache = ctx.enter_context(_get_wheel_cache())
_ensure_dir(fs_str(os.path.join(wheel_cache.cache_dir, "wheels")))
preparer = ctx.enter_context(
shims.make_preparer(
options=pip_options,
finder=finder,
session=session,
build_dir=_build_dir,
src_dir=_source_dir,
download_dir=download_dir,
wheel_download_dir=WHEEL_DOWNLOAD_DIR,
progress_bar="off",
build_isolation=False,
install_cmd=pip_command,
)
)
resolver = shims.get_resolver(
finder=finder,
ignore_dependencies=False,
ignore_requires_python=True,
preparer=preparer,
session=session,
wheel_cache = _get_wheel_cache()
_ensure_dir(str(os.path.join(wheel_cache.cache_dir, "wheels")))
preparer = pip_command.make_requirement_preparer(
temp_build_dir=_build_dir,
options=pip_options,
install_cmd=pip_command,
wheel_cache=wheel_cache,
force_reinstall=True,
ignore_installed=True,
upgrade_strategy="to-satisfy-only",
isolated=False,
build_tracker=build_tracker,
session=session,
finder=finder,
use_user_site=False,
)
resolver = pip_command.make_resolver(
preparer=preparer,
finder=finder,
options=pip_options,
wheel_cache=wheel_cache,
use_user_site=False,
ignore_installed=True,
ignore_requires_python=pip_options.ignore_requires_python,
force_reinstall=pip_options.force_reinstall,
upgrade_strategy="to-satisfy-only",
use_pep517=pip_options.use_pep517,
)
yield resolver
finally:
session.close()
+128 -129
View File
@@ -16,148 +16,147 @@ from pipenv.vendor.vistir.path import rmtree
logger = logging.getLogger(__name__)
try: # Required for pip>=22.1
from typing import Dict, Iterable, List, Optional
from typing import Dict, Iterable, List, Optional
from pipenv.patched.pip._internal.models.link import Link
from pipenv.patched.pip._internal.network.download import Downloader
from pipenv.patched.pip._internal.operations.prepare import (
File,
get_file_url,
get_http_url,
unpack_vcs_link,
)
from pipenv.patched.pip._internal.utils.hashes import Hashes
from pipenv.patched.pip._internal.utils.unpacking import unpack_file
from pipenv.patched.pip._internal.models.link import Link
from pipenv.patched.pip._internal.network.download import Downloader
from pipenv.patched.pip._internal.operations.prepare import (
File,
get_file_url,
get_http_url,
unpack_vcs_link,
)
from pipenv.patched.pip._internal.utils.hashes import Hashes
from pipenv.patched.pip._internal.utils.unpacking import unpack_file
def is_socket(path):
# type: (str) -> bool
return stat.S_ISSOCK(os.lstat(path).st_mode)
def copy2_fixed(src, dest):
# type: (str, str) -> None
"""Wrap shutil.copy2() but map errors copying socket files to
SpecialFileError as expected.
def is_socket(path):
# type: (str) -> bool
return stat.S_ISSOCK(os.lstat(path).st_mode)
See also https://bugs.python.org/issue37700.
"""
try:
shutil.copy2(src, dest)
except OSError:
for f in [src, dest]:
try:
is_socket_file = is_socket(f)
except OSError:
# An error has already occurred. Another error here is not
# a problem and we can ignore it.
pass
else:
if is_socket_file:
raise shutil.SpecialFileError(
"`{f}` is a socket".format(**locals())
)
raise
def copy2_fixed(src, dest):
# type: (str, str) -> None
"""Wrap shutil.copy2() but map errors copying socket files to
SpecialFileError as expected.
def _copy2_ignoring_special_files(src: str, dest: str) -> None:
"""Copying special files is not supported, but as a convenience to
users we skip errors copying them.
See also https://bugs.python.org/issue37700.
"""
try:
shutil.copy2(src, dest)
except OSError:
for f in [src, dest]:
try:
is_socket_file = is_socket(f)
except OSError:
# An error has already occurred. Another error here is not
# a problem and we can ignore it.
pass
else:
if is_socket_file:
raise shutil.SpecialFileError("`{f}` is a socket".format(**locals()))
This supports tools that may create e.g. socket files in the
project source directory.
"""
try:
copy2_fixed(src, dest)
except shutil.SpecialFileError as e:
# SpecialFileError may be raised due to either the source or
# destination. If the destination was the cause then we would actually
# care, but since the destination directory is deleted prior to
# copy we ignore all of them assuming it is caused by the source.
logger.warning(
"Ignoring special file error '%s' encountered copying %s to %s.",
str(e),
src,
dest,
)
raise
def _copy_source_tree(source: str, target: str) -> None:
target_abspath = os.path.abspath(target)
target_basename = os.path.basename(target_abspath)
target_dirname = os.path.dirname(target_abspath)
def ignore(d: str, names: List[str]) -> List[str]:
skipped: List[str] = []
if d == source:
# Pulling in those directories can potentially be very slow,
# exclude the following directories if they appear in the top
# level dir (and only it).
# See discussion at https://github.com/pypa/pip/pull/6770
skipped += [".tox", ".nox"]
if os.path.abspath(d) == target_dirname:
# Prevent an infinite recursion if the target is in source.
# This can happen when TMPDIR is set to ${PWD}/...
# and we copy PWD to TMPDIR.
skipped += [target_basename]
return skipped
def _copy2_ignoring_special_files(src: str, dest: str) -> None:
"""Copying special files is not supported, but as a convenience to users we
skip errors copying them.
shutil.copytree(
source,
target,
ignore=ignore,
symlinks=True,
copy_function=_copy2_ignoring_special_files,
This supports tools that may create e.g. socket files in the project
source directory.
"""
try:
copy2_fixed(src, dest)
except shutil.SpecialFileError as e:
# SpecialFileError may be raised due to either the source or
# destination. If the destination was the cause then we would actually
# care, but since the destination directory is deleted prior to
# copy we ignore all of them assuming it is caused by the source.
logger.warning(
"Ignoring special file error '%s' encountered copying %s to %s.",
str(e),
src,
dest,
)
def old_unpack_url(
link: Link,
location: str,
download: Downloader,
verbosity: int,
download_dir: Optional[str] = None,
hashes: Optional[Hashes] = None,
) -> Optional[File]:
"""Unpack link into location, downloading if required.
:param hashes: A Hashes object, one of whose embedded hashes must match,
or HashMismatch will be raised. If the Hashes is empty, no matches are
required, and unhashable types of requirements (like VCS ones, which
would ordinarily raise HashUnsupported) are allowed.
"""
# non-editable vcs urls
if link.is_vcs:
unpack_vcs_link(link, location, verbosity=verbosity)
return None
def _copy_source_tree(source: str, target: str) -> None:
target_abspath = os.path.abspath(target)
target_basename = os.path.basename(target_abspath)
target_dirname = os.path.dirname(target_abspath)
# Once out-of-tree-builds are no longer supported, could potentially
# replace the below condition with `assert not link.is_existing_dir`
# - unpack_url does not need to be called for in-tree-builds.
#
# As further cleanup, _copy_source_tree and accompanying tests can
# be removed.
#
# TODO when use-deprecated=out-of-tree-build is removed
if link.is_existing_dir():
if os.path.isdir(location):
rmtree(location)
_copy_source_tree(link.file_path, location)
return None
def ignore(d: str, names: List[str]) -> List[str]:
skipped: List[str] = []
if d == source:
# Pulling in those directories can potentially be very slow,
# exclude the following directories if they appear in the top
# level dir (and only it).
# See discussion at https://github.com/pypa/pip/pull/6770
skipped += [".tox", ".nox"]
if os.path.abspath(d) == target_dirname:
# Prevent an infinite recursion if the target is in source.
# This can happen when TMPDIR is set to ${PWD}/...
# and we copy PWD to TMPDIR.
skipped += [target_basename]
return skipped
# file urls
if link.is_file:
file = get_file_url(link, download_dir, hashes=hashes)
# http urls
else:
file = get_http_url(
link,
download,
download_dir,
hashes=hashes,
)
# unpack the archive to the build dir location. even when only downloading
# archives, they have to be unpacked to parse dependencies, except wheels
if not link.is_wheel:
unpack_file(file.path, location, file.content_type)
return file
shutil.copytree(
source,
target,
ignore=ignore,
symlinks=True,
copy_function=_copy2_ignoring_special_files,
)
except ImportError:
raise
def old_unpack_url(
link: Link,
location: str,
download: Downloader,
verbosity: int,
download_dir: Optional[str] = None,
hashes: Optional[Hashes] = None,
) -> Optional[File]:
"""Unpack link into location, downloading if required.
:param hashes: A Hashes object, one of whose embedded hashes must match,
or HashMismatch will be raised. If the Hashes is empty, no matches are
required, and unhashable types of requirements (like VCS ones, which
would ordinarily raise HashUnsupported) are allowed.
"""
# non-editable vcs urls
if link.is_vcs:
unpack_vcs_link(link, location, verbosity=verbosity)
return None
# Once out-of-tree-builds are no longer supported, could potentially
# replace the below condition with `assert not link.is_existing_dir`
# - unpack_url does not need to be called for in-tree-builds.
#
# As further cleanup, _copy_source_tree and accompanying tests can
# be removed.
#
# TODO when use-deprecated=out-of-tree-build is removed
if link.is_existing_dir():
if os.path.isdir(location):
rmtree(location)
_copy_source_tree(link.file_path, location)
return None
# file urls
if link.is_file:
file = get_file_url(link, download_dir, hashes=hashes)
# http urls
else:
file = get_http_url(
link,
download,
download_dir,
hashes=hashes,
)
# unpack the archive to the build dir location. even when only downloading
# archives, they have to be unpacked to parse dependencies, except wheels
if not link.is_wheel:
unpack_file(file.path, location, file.content_type)
return file
+71 -79
View File
@@ -1,7 +1,3 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function
import collections
import copy
import os
@@ -14,7 +10,6 @@ from urllib import parse as urllib_parse
from urllib.parse import unquote
import pipenv.vendor.attr as attr
import pipenv.vendor.pip_shims as pip_shims
from pipenv.vendor.pyparsing.core import cached_property
from pipenv.patched.pip._vendor.packaging.markers import Marker
from pipenv.patched.pip._vendor.packaging.requirements import Requirement as PackagingRequirement
@@ -25,6 +20,17 @@ from pipenv.patched.pip._vendor.packaging.specifiers import (
SpecifierSet,
)
from pipenv.patched.pip._vendor.packaging.utils import canonicalize_name
from pipenv.patched.pip._vendor.packaging.version import parse
from pipenv.patched.pip._internal.models.link import Link
from pipenv.patched.pip._internal.models.wheel import Wheel
from pipenv.patched.pip._internal.req.constructors import (
_strip_extras,
install_req_from_editable,
install_req_from_line,
)
from pipenv.patched.pip._internal.req.req_install import InstallRequirement
from pipenv.patched.pip._internal.utils.temp_dir import global_tempdir_manager
from pipenv.patched.pip._internal.utils.urls import path_to_url, url_to_path
from pipenv.vendor.vistir.contextmanagers import temp_path
from pipenv.vendor.vistir.misc import dedup
from pipenv.vendor.vistir.path import (
@@ -47,6 +53,7 @@ from ..utils import (
is_vcs,
strip_ssh_from_git_uri,
)
from .dependencies import AbstractDependency, get_abstract_dependencies, get_dependencies
from .markers import normalize_marker_str
from .setup_info import (
SetupInfo,
@@ -100,12 +107,8 @@ if MYPY_RUNNING:
Union,
)
from pipenv.vendor.pip_shims.shims import (
InstallationCandidate,
InstallRequirement,
Link,
PackageFinder,
)
from pipenv.patched.pip._internal.index.package_finder import PackageFinder
from pipenv.patched.pip._internal.models.candidate import InstallationCandidate
RequirementType = TypeVar(
"RequirementType", covariant=True, bound=PackagingRequirement
@@ -113,7 +116,6 @@ if MYPY_RUNNING:
F = TypeVar("F", "FileRequirement", "VCSRequirement", covariant=True)
from urllib.parse import SplitResult
from .dependencies import AbstractDependency
from .vcs import VCSRepository
NON_STRING_ITERABLE = Union[List, Set, Tuple]
@@ -330,7 +332,7 @@ class Line(object):
if not line:
if self.is_path or self.is_file:
if not self.path and self.url is not None:
line = pip_shims.shims.url_to_path(self.url)
line = url_to_path(self.url)
else:
line = self.path
if self.extras:
@@ -572,14 +574,14 @@ class Line(object):
strip_ssh=self.parsed_url.is_implicit_ssh,
)
except ValueError:
self.line, extras = pip_shims.shims._strip_extras(self.line)
self.line, extras = _strip_extras(self.line)
else:
self.line, extras = pip_shims.shims._strip_extras(self.line)
self.line, extras = _strip_extras(self.line)
extras_set = set() # type: Set[STRING_TYPE]
if extras is not None:
extras_set = set(parse_extras(extras))
if self._name:
self._name, name_extras = pip_shims.shims._strip_extras(self._name)
self._name, name_extras = _strip_extras(self._name)
if name_extras:
name_extras = set(parse_extras(name_extras))
extras_set |= name_extras
@@ -660,7 +662,7 @@ class Line(object):
if self.link is None:
return False
return not self.link.is_vcs
return getattr(self.link, "is_vcs", False)
@property
def is_vcs(self):
@@ -767,7 +769,7 @@ class Line(object):
@property
def ireq(self):
# type: () -> Optional[pip_shims.InstallRequirement]
# type: () -> Optional[InstallRequirement]
if self._ireq is None:
self.parse_ireq()
return self._ireq
@@ -791,7 +793,7 @@ class Line(object):
def get_setup_info(self):
# type: () -> SetupInfo
setup_info = None
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
setup_info = SetupInfo.from_ireq(self.ireq, subdir=self.subdirectory)
if not setup_info.name:
setup_info.get_info()
@@ -897,7 +899,7 @@ class Line(object):
ireq = self.ireq
wheel_kwargs = self.wheel_kwargs.copy()
wheel_kwargs["src_dir"] = repo.checkout_directory
with pip_shims.shims.global_tempdir_manager(), temp_path():
with global_tempdir_manager(), temp_path():
ireq.ensure_has_source_dir(wheel_kwargs["src_dir"])
sys.path = [repo.checkout_directory, "", ".", get_python_lib(plat_specific=0)]
setupinfo = SetupInfo.create(
@@ -913,13 +915,13 @@ class Line(object):
# type: () -> InstallRequirement
line = self.line_for_ireq
if self.editable:
ireq = pip_shims.shims.install_req_from_editable(line)
ireq = install_req_from_editable(line)
else:
ireq = pip_shims.shims.install_req_from_line(line)
ireq = install_req_from_line(line)
if self.is_named:
ireq = pip_shims.shims.install_req_from_line(self.line)
ireq = install_req_from_line(self.line)
if self.is_file or self.is_remote_url:
ireq.link = pip_shims.shims.Link(expand_env_variables(self.link.url))
ireq.link = Link(expand_env_variables(self.link.url))
if self.extras and not ireq.extras:
ireq.extras = set(self.extras)
if self.parsed_marker is not None and not ireq.markers:
@@ -939,9 +941,7 @@ class Line(object):
def _parse_wheel(self):
# type: () -> Optional[STRING_TYPE]
if not self.is_wheel:
pass
from pipenv.vendor.pip_shims.shims import Wheel
return
_wheel = Wheel(self.link.filename)
name = _wheel.name
version = _wheel.version
@@ -1029,7 +1029,7 @@ class Line(object):
if self.is_local:
name = self._parse_name_from_path()
if name is not None:
name, extras = pip_shims.shims._strip_extras(name)
name, extras = _strip_extras(name)
if extras is not None and not self.extras:
self.extras = tuple(sorted(set(parse_extras(extras))))
self._name = name
@@ -1066,7 +1066,7 @@ class Line(object):
if self.ref and self._requirement is not None:
self._requirement.revision = self.ref
if self._vcsrepo is not None:
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
self._requirement.revision = self._vcsrepo.get_commit_hash()
return self._requirement
@@ -1127,7 +1127,7 @@ class Line(object):
or (os.path.exists(self.line) or os.path.isabs(self.line))
)
):
url = pip_shims.shims.path_to_url(os.path.abspath(self.line))
url = path_to_url(os.path.abspath(self.line))
self._parsed_url = parsed_url = URI.parse(url)
elif any(
[
@@ -1524,14 +1524,12 @@ class FileRequirement(object):
if parsed_url.scheme == "file" and parsed_url.path:
# This is a "file://" URI. Use url_to_path and path_to_url to
# ensure the path is absolute. Also we need to build relpath.
path = Path(
pip_shims.shims.url_to_path(urllib_parse.urlunsplit(parsed_url))
).as_posix()
path = Path(url_to_path(urllib_parse.urlunsplit(parsed_url))).as_posix()
try:
relpath = get_converted_relative_path(path)
except ValueError:
relpath = None
uri = pip_shims.shims.path_to_url(path)
uri = path_to_url(path)
else:
# This is a remote URI. Simply use it.
path = None
@@ -1615,20 +1613,20 @@ class FileRequirement(object):
self._parsed_line._setup_info
and not self._parsed_line._setup_info.name
):
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
self._parsed_line._setup_info.get_info()
self._setup_info = self.parsed_line._setup_info
elif self.parsed_line and (
self.parsed_line.ireq and not self.parsed_line.is_wheel
):
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
self._setup_info = SetupInfo.from_ireq(
self.parsed_line.ireq, subdir=self.subdirectory
)
else:
if self.link and not self.link.is_wheel:
self._setup_info = Line(self.line_part).setup_info
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
self._setup_info.get_info()
return self._setup_info
@@ -1644,7 +1642,7 @@ class FileRequirement(object):
# type: () -> STRING_TYPE
if self.path and not self.uri:
self._uri_scheme = "path"
return pip_shims.shims.path_to_url(os.path.abspath(self.path))
return path_to_url(os.path.abspath(self.path))
elif (
getattr(self, "req", None)
and self.req is not None
@@ -1666,8 +1664,7 @@ class FileRequirement(object):
return self.setup_info.name
@link.default
def get_link(self):
# type: () -> pip_shims.shims.Link
def get_link(self) -> Link:
target = "{0}".format(self.uri)
if hasattr(self, "name") and not self._has_hashed_name:
target = "{0}#egg={1}".format(target, self.name)
@@ -1676,7 +1673,7 @@ class FileRequirement(object):
@req.default
def get_requirement(self):
# type: () -> RequirementType
# type () -> RequirementType
if self.name is None:
if self._parsed_line is not None and self._parsed_line.name is not None:
self.name = self._parsed_line.name
@@ -1709,7 +1706,7 @@ class FileRequirement(object):
uri = getattr(self, "uri", None)
if uri is None:
if getattr(self, "path", None) and self.path is not None:
uri = pip_shims.shims.path_to_url(os.path.abspath(self.path))
uri = path_to_url(os.path.abspath(self.path))
elif (
getattr(self, "req", None)
and self.req is not None
@@ -1788,7 +1785,7 @@ class FileRequirement(object):
uri_scheme = "file"
if not uri:
uri = pip_shims.shims.path_to_url(path)
uri = path_to_url(path)
link_info = None # type: Optional[LinkInfo]
if uri and isinstance(uri, str):
link_info = cls.get_link_from_line(uri)
@@ -1849,7 +1846,7 @@ class FileRequirement(object):
seed = None # type: Optional[STRING_TYPE]
if self.link is not None:
link_url = self.link.url_without_fragment
is_vcs = getattr(self.link, "is_vcs", not self.link.is_artifact)
is_vcs = getattr(self.link, "is_vcs", False)
if self._uri_scheme and self._uri_scheme == "path":
# We may need any one of these for passing to pip
seed = self.path or link_url or self.uri
@@ -1898,7 +1895,7 @@ class FileRequirement(object):
key_match = next(iter(k for k in collision_order if k in pipfile_dict.keys()))
is_vcs = None
if self.link is not None:
is_vcs = getattr(self.link, "is_vcs", not self.link.is_artifact)
is_vcs = getattr(self.link, "is_vcs", False)
if self._uri_scheme:
dict_key = self._uri_scheme
target_key = dict_key if dict_key in pipfile_dict else key_match
@@ -1950,14 +1947,14 @@ class VCSRequirement(FileRequirement):
_repo = attr.ib(default=None) # type: Optional[VCSRepository]
_base_line = attr.ib(default=None) # type: Optional[STRING_TYPE]
name = attr.ib() # type: STRING_TYPE
link = attr.ib() # type: Optional[pip_shims.shims.Link]
link = attr.ib() # type: Optional[Link]
req = attr.ib() # type: Optional[RequirementType]
def __attrs_post_init__(self):
# type: () -> None
if not self.uri:
if self.path:
self.uri = pip_shims.shims.path_to_url(self.path)
self.uri = path_to_url(self.path)
if self.uri is not None:
split = urllib_parse.urlsplit(self.uri)
scheme, rest = split[0], split[1:]
@@ -1979,9 +1976,8 @@ class VCSRequirement(FileRequirement):
raise ValueError("No valid url found for requirement {0!r}".format(self))
@link.default
def get_link(self):
# type: () -> pip_shims.shims.Link
uri = self.uri if self.uri else pip_shims.shims.path_to_url(self.path)
def get_link(self) -> Link:
uri = self.uri if self.uri else path_to_url(self.path)
vcs_uri = build_vcs_uri(
self.vcs,
add_ssh_scheme_to_git_uri(uri),
@@ -2014,12 +2010,12 @@ class VCSRequirement(FileRequirement):
def setup_info(self):
if self._parsed_line and self._parsed_line.setup_info:
if not self._parsed_line.setup_info.name:
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
self._parsed_line._setup_info.get_info()
return self._parsed_line.setup_info
subdir = self.subdirectory or self.parsed_line.subdirectory
if self._repo:
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
self._setup_info = SetupInfo.from_ireq(
Line(self._repo.checkout_directory).ireq, subdir=subdir
)
@@ -2027,7 +2023,7 @@ class VCSRequirement(FileRequirement):
return self._setup_info
ireq = self.parsed_line.ireq
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
self._setup_info = SetupInfo.from_ireq(ireq, subdir=subdir)
return self._setup_info
@@ -2124,7 +2120,7 @@ class VCSRequirement(FileRequirement):
if self.is_local:
path = self.path
if not path:
path = pip_shims.shims.url_to_path(self.uri)
path = url_to_path(self.uri)
if path and os.path.exists(path):
checkout_dir = os.path.abspath(path)
return checkout_dir
@@ -2169,7 +2165,7 @@ class VCSRequirement(FileRequirement):
def get_commit_hash(self):
# type: () -> STRING_TYPE
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
hash_ = self.repo.get_commit_hash()
return hash_
@@ -2196,7 +2192,7 @@ class VCSRequirement(FileRequirement):
self.req = self.parsed_line.requirement
else:
self.req = self.get_requirement()
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
revision = self.req.revision = vcsrepo.get_commit_hash()
# Remove potential ref in the end of uri after ref is parsed
@@ -2274,7 +2270,7 @@ class VCSRequirement(FileRequirement):
else:
creation_args["path"] = target
if os.path.isabs(target):
creation_args["uri"] = pip_shims.shims.path_to_url(target)
creation_args["uri"] = path_to_url(target)
elif key in pipfile_keys:
creation_args[key] = pipfile[key]
creation_args["name"] = name
@@ -2343,7 +2339,7 @@ class VCSRequirement(FileRequirement):
@property
def pipfile_part(self):
# type: () -> Dict[S, Dict[S, Union[List[S], S, bool, RequirementType, pip_shims.shims.Link]]]
# type: () -> Dict[S, Dict[S, Union[List[S], S, bool, RequirementType, Link]]]
excludes = [
"_repo",
"_base_line",
@@ -2368,7 +2364,7 @@ class VCSRequirement(FileRequirement):
name = self.name = self.setup_info.name
if "vcs" in pipfile_dict:
pipfile_dict = self._choose_vcs_source(pipfile_dict)
name, _ = pip_shims.shims._strip_extras(name)
name, _ = _strip_extras(name)
return {name: pipfile_dict} # type: ignore
@@ -2400,7 +2396,7 @@ class Requirement(object):
_line_instance = attr.ib(default=None, eq=False, order=False) # type: Optional[Line]
_ireq = attr.ib(
default=None, eq=False, order=False
) # type: Optional[pip_shims.InstallRequirement]
) # type: Optional[InstallRequirement]
def __hash__(self):
return hash(self.as_line())
@@ -2521,13 +2517,18 @@ class Requirement(object):
def get_line_instance(self):
# type: () -> Line
line_parts = []
local_editable = False
if self.req:
if self.req.line_part.startswith("-e "):
local_editable = True
line_parts.extend(self.req.line_part.split(" ", 1))
else:
line_parts.append(self.req.line_part)
if not self.is_vcs and not self.vcs and self.extras_as_pip:
line_parts.append(self.extras_as_pip)
if self.is_file_or_url and not local_editable:
line_parts.append(f"#egg={self.extras_as_pip}")
else:
line_parts.append(self.extras_as_pip)
if self._specifiers and not (self.is_file_or_url or self.is_vcs):
line_parts.append(self._specifiers)
if self.markers:
@@ -2623,7 +2624,7 @@ class Requirement(object):
if self.req is not None and (
not isinstance(self.req, NamedRequirement) and self.req.is_local
):
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
setup_info = self.run_requires()
build_backend = setup_info.get("build_backend")
return build_backend
@@ -2669,7 +2670,7 @@ class Requirement(object):
@lru_cache()
def from_line(cls, line):
# type: (AnyStr) -> Requirement
if isinstance(line, pip_shims.shims.InstallRequirement):
if isinstance(line, InstallRequirement):
line = format_requirement(line)
parsed_line = Line(line)
r = (
@@ -2828,7 +2829,7 @@ class Requirement(object):
return LegacySpecifier(self.specifiers)
def get_version(self):
return pip_shims.shims.parse_version(self.get_specifier().version)
return parse(self.get_specifier().version)
def get_requirement(self):
req_line = self.req.req.line
@@ -2937,7 +2938,7 @@ class Requirement(object):
def ireq(self):
return self.as_ireq()
def get_dependencies(self, sources=None):
def dependencies(self, sources=None):
"""Retrieve the dependencies of the current requirement.
Retrieves dependencies of the current requirement. This only works on pinned
@@ -2948,16 +2949,13 @@ class Requirement(object):
:return: A set of requirement strings of the dependencies of this requirement.
:rtype: set(str)
"""
from .dependencies import get_dependencies
if not sources:
sources = [
{"name": "pypi", "url": "https://pypi.org/simple", "verify_ssl": True}
]
return get_dependencies(self.as_ireq(), sources=sources)
def get_abstract_dependencies(self, sources=None):
def abstract_dependencies(self, sources=None):
"""Retrieve the abstract dependencies of this requirement.
Returns the abstract dependencies of the current requirement in order to resolve.
@@ -2968,12 +2966,6 @@ class Requirement(object):
:rtype: list[ :class:`~requirementslib.models.dependency.AbstractDependency` ]
"""
from .dependencies import (
AbstractDependency,
get_abstract_dependencies,
get_dependencies,
)
if not self.abstract_dep:
parent = getattr(self, "parent", None)
self.abstract_dep = AbstractDependency.from_requirement(self, parent=parent)
@@ -2982,11 +2974,11 @@ class Requirement(object):
{"url": "https://pypi.org/simple", "name": "pypi", "verify_ssl": True}
]
if is_pinned_requirement(self.ireq):
deps = self.get_dependencies()
deps = self.dependencies()
else:
ireq = sorted(self.find_all_matches(), key=lambda k: k.version)
deps = get_dependencies(ireq.pop(), sources=sources)
return get_abstract_dependencies(deps, sources=sources, parent=self.abstract_dep)
deps = get_dependencies(ireq.pop())
return get_abstract_dependencies(deps, parent=self.abstract_dep)
def find_all_matches(self, sources=None, finder=None):
# type: (Optional[List[Dict[S, Union[S, bool]]]], Optional[PackageFinder]) -> List[InstallationCandidate]
@@ -3018,7 +3010,7 @@ class Requirement(object):
from .dependencies import get_finder
finder = get_finder(sources=sources)
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
info = SetupInfo.from_requirement(self, finder=finder)
if info is None:
return {}
+2 -4
View File
@@ -1,8 +1,8 @@
# -*- coding=utf-8 -*-
from contextlib import contextmanager
import pipenv.vendor.attr as attr
from pipenv.vendor.pip_shims.shims import Wheel
from pipenv.patched.pip._internal.models.wheel import Wheel
from pipenv.patched.pip._internal.vcs.versioncontrol import VcsSupport
from .cache import HashCache
from .utils import format_requirement, is_pinned_requirement, version_from_ireq
@@ -199,8 +199,6 @@ class DependencyResolver(object):
if ireq.editable:
return set()
from pipenv.vendor.pip_shims import VcsSupport
vcs = VcsSupport()
if (
ireq.link
+30 -64
View File
@@ -1,6 +1,3 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function
import ast
import atexit
import configparser
@@ -23,8 +20,10 @@ from pipenv.vendor.distlib.wheel import Wheel
from pipenv.patched.pip._vendor.packaging.markers import Marker
from pipenv.patched.pip._vendor.packaging.specifiers import SpecifierSet
from pipenv.patched.pip._vendor.packaging.version import parse
from pipenv.vendor.pip_shims import shims
from pipenv.vendor.pip_shims.utils import call_function_with_correct_args
from pipenv.patched.pip._internal.commands.install import InstallCommand
from pipenv.patched.pip._internal.network.download import Downloader
from pipenv.patched.pip._internal.utils.temp_dir import global_tempdir_manager
from pipenv.patched.pip._internal.utils.urls import url_to_path
from pipenv.vendor.platformdirs import user_cache_dir
from pipenv.vendor.vistir.contextmanagers import cd, temp_path
from pipenv.vendor.vistir.misc import run
@@ -32,6 +31,8 @@ from pipenv.vendor.vistir.path import create_tracked_tempdir, ensure_mkdir_p, mk
from ..environment import MYPY_RUNNING
from ..exceptions import RequirementError
from ..utils import get_pip_command
from .old_pip_utils import old_unpack_url
from .utils import (
get_default_pyproject_backend,
get_name_variants,
@@ -46,12 +47,6 @@ try:
except ModuleNotFoundError:
pkg_resources_requirements = None
try:
from setuptools.dist import Distribution, distutils
except ImportError:
import distutils
from distutils.core import Distribution
from contextlib import ExitStack
from os import scandir
@@ -73,10 +68,16 @@ if MYPY_RUNNING:
import pipenv.patched.pip._vendor.requests as requests
from pipenv.patched.pip._vendor.packaging.requirements import Requirement as PackagingRequirement
from pipenv.vendor.pip_shims.shims import InstallRequirement, PackageFinder
from pipenv.patched.pip._internal.index.package_finder import PackageFinder
from pipenv.patched.pip._internal.req.req_install import InstallRequirement
from pkg_resources import DistInfoDistribution, EggInfoDistribution, PathMetadata
from pkg_resources import Requirement as PkgResourcesRequirement
try:
from setuptools.dist import Distribution
except ImportError:
from distutils.core import Distribution
TRequirement = TypeVar("TRequirement")
RequirementType = TypeVar(
"RequirementType", covariant=True, bound=PackagingRequirement
@@ -546,26 +547,6 @@ def parse_setup_cfg(path: str) -> "Dict[str, Any]":
return SetupReader.read_setup_cfg(Path(path))
@contextlib.contextmanager
def _suppress_distutils_logs():
# type: () -> Generator[None, None, None]
"""Hack to hide noise generated by `setup.py develop`.
There isn't a good way to suppress them now, so let's monky-patch.
See https://bugs.python.org/issue25392.
"""
f = distutils.log.Log._log
def _log(log, level, msg, args):
if level >= distutils.log.ERROR:
f(log, level, msg, args)
distutils.log.Log._log = _log
yield
distutils.log.Log._log = f
def build_pep517(source_dir, build_dir, config_settings=None, dist_type="wheel"):
if config_settings is None:
config_settings = {}
@@ -902,7 +883,7 @@ def run_setup(script_path, egg_base=None):
target_cwd = os.path.dirname(os.path.abspath(script_path))
if egg_base is None:
egg_base = os.path.join(target_cwd, "reqlib-metadata")
with temp_path(), cd(target_cwd), _suppress_distutils_logs():
with temp_path(), cd(target_cwd):
# This is for you, Hynek
# see https://github.com/hynek/environ_config/blob/69b1c8a/setup.py
args = ["egg_info"]
@@ -1510,10 +1491,10 @@ build-backend = "{1}"
return None
stack = ExitStack()
if not session:
cmd = shims.InstallCommand()
cmd = get_pip_command()
options, _ = cmd.parser.parse_args([])
session = cmd._build_session(options)
stack.enter_context(shims.global_tempdir_manager())
stack.enter_context(global_tempdir_manager())
vcs, uri = split_vcs_method_from_uri(ireq.link.url_without_fragment)
parsed = urlparse(uri)
if "file" in parsed.scheme:
@@ -1528,19 +1509,17 @@ build-backend = "{1}"
is_file = True
if "file:/" in uri and "file:///" not in uri:
uri = uri.replace("file:/", "file:///")
path = shims.url_to_path(uri)
path = url_to_path(uri)
kwargs = _prepare_wheel_building_kwargs(ireq)
is_artifact_or_vcs = getattr(
ireq.link, "is_vcs", getattr(ireq.link, "is_artifact", False)
)
is_vcs = True if vcs else is_artifact_or_vcs
download_dir = None
if not (ireq.editable and is_file and is_vcs):
if ireq.is_wheel:
only_download = True
download_dir = kwargs["wheel_download_dir"]
else:
only_download = False
download_dir = kwargs["download_dir"]
elif path is not None and os.path.isdir(path):
raise RequirementError(
@@ -1561,32 +1540,19 @@ build-backend = "{1}"
"autodelete": False,
"parallel_builds": True,
}
call_function_with_correct_args(build_location_func, **build_kwargs)
build_location_func(**build_kwargs)
ireq.ensure_has_source_dir(kwargs["src_dir"])
try: # Support for pip >= 21.1
from pipenv.patched.pip._internal.network.download import Downloader
from pipenv.vendor.requirementslib.models.old_pip_utils import old_unpack_url
location = None
if getattr(ireq, "source_dir", None):
location = ireq.source_dir
old_unpack_url(
link=ireq.link,
location=location,
download=Downloader(session, "off"),
verbosity=1,
download_dir=download_dir,
hashes=ireq.hashes(True),
)
except ImportError:
shims.shim_unpack(
download_dir=download_dir,
ireq=ireq,
only_download=only_download,
session=session,
hashes=ireq.hashes(False),
)
location = None
if getattr(ireq, "source_dir", None):
location = ireq.source_dir
old_unpack_url(
link=ireq.link,
location=location,
download=Downloader(session, "off"),
verbosity=1,
download_dir=download_dir,
hashes=ireq.hashes(True),
)
created = cls.create(
ireq.source_dir, subdirectory=subdir, ireq=ireq, kwargs=kwargs, stack=stack
)
+6 -9
View File
@@ -7,7 +7,8 @@ from urllib.parse import unquote_plus
import pipenv.vendor.attr as attr
from pipenv.vendor.orderedmultidict import omdict
from pipenv.vendor.pip_shims import shims
from pipenv.patched.pip._internal.models.link import Link
from pipenv.patched.pip._internal.req.constructors import _strip_extras
from pipenv.patched.pip._vendor.urllib3.util import parse_url as urllib3_parse
from pipenv.patched.pip._vendor.urllib3.util.url import Url
@@ -18,8 +19,6 @@ from .utils import extras_to_string, parse_extras
if MYPY_RUNNING:
from typing import Dict, Optional, Text, Tuple, TypeVar, Union
from shims import Link
_T = TypeVar("_T")
STRING_TYPE = Union[bytes, str, Text]
S = TypeVar("S", bytes, str, Text)
@@ -139,7 +138,7 @@ class URI(object):
if key == "egg":
from .utils import parse_extras
name, stripped_extras = shims._strip_extras(val)
name, stripped_extras = _strip_extras(val)
if stripped_extras:
extras = tuple(parse_extras(stripped_extras))
elif key == "subdirectory":
@@ -370,9 +369,7 @@ class URI(object):
@property
def as_link(self):
# type: () -> Link
link = shims.Link(
self.to_string(escape_password=False, strip_ssh=False, direct=False)
)
link = Link(self.to_string(escape_password=False, strip_ssh=False, direct=False))
return link
@property
@@ -480,14 +477,14 @@ def update_url_name_and_fragment(name_with_extras, ref, parsed_dict):
if name_with_extras:
fragment = "" # type: Optional[str]
parsed_extras = ()
name, extras = shims._strip_extras(name_with_extras)
name, extras = _strip_extras(name_with_extras)
if extras:
parsed_extras = parsed_extras + tuple(parse_extras(extras))
if parsed_dict["fragment"] is not None:
fragment = "{0}".format(parsed_dict["fragment"])
if fragment.startswith("egg="):
_, _, fragment_part = fragment.partition("=")
fragment_name, fragment_extras = shims._strip_extras(fragment_part)
fragment_name, fragment_extras = _strip_extras(fragment_part)
name = name if name else fragment_name
if fragment_extras:
parsed_extras = parsed_extras + tuple(parse_extras(fragment_extras))
+6 -23
View File
@@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function
import io
import os
import re
@@ -16,6 +13,8 @@ from pipenv.vendor.attr import validators
from pipenv.patched.pip._vendor.packaging.markers import InvalidMarker, Marker, Op, Value, Variable
from pipenv.patched.pip._vendor.packaging.specifiers import InvalidSpecifier, Specifier, SpecifierSet
from pipenv.patched.pip._vendor.packaging.version import parse as parse_version
from pipenv.patched.pip._internal.models.link import Link
from pipenv.patched.pip._internal.req.constructors import install_req_from_line
from pipenv.vendor.plette.models import Package, PackageCollection
from pipenv.vendor.tomlkit.container import Container
from pipenv.vendor.tomlkit.items import AoT, Array, Bool, InlineTable, Item, String, Table
@@ -50,7 +49,6 @@ if MYPY_RUNNING:
from pipenv.patched.pip._vendor.packaging.markers import Value as PkgResourcesValue
from pipenv.patched.pip._vendor.packaging.markers import Variable as PkgResourcesVariable
from pipenv.patched.pip._vendor.packaging.requirements import Requirement as PackagingRequirement
from pipenv.vendor.pip_shims.shims import Link
from pkg_resources import Requirement as PkgResourcesRequirement
from pipenv.patched.pip._vendor.urllib3.util.url import Url
@@ -115,7 +113,6 @@ def create_link(link):
if not isinstance(link, str):
raise TypeError("must provide a string to instantiate a new link")
from pipenv.vendor.pip_shims.shims import Link # noqa: F811
return Link(link)
@@ -309,11 +306,11 @@ def convert_direct_url_to_url(direct_url):
# type: (AnyStr) -> AnyStr
"""Converts direct URLs to standard, link-style URLs.
Given a direct url as defined by *PEP 508*, convert to a :class:`~pip_shims.shims.Link`
Given a direct url as defined by *PEP 508*, convert to a :class:`Link`
compatible URL by moving the name and extras into an **egg_fragment**.
:param str direct_url: A pep-508 compliant direct url.
:return: A reformatted URL for use with Link objects and :class:`~pip_shims.shims.InstallRequirement` objects.
:return: A reformatted URL for use with Link objects and :class:`InstallRequirement` objects.
:rtype: AnyStr
"""
direct_match = DIRECT_URL_RE.match(direct_url) # type: Optional[Match]
@@ -350,10 +347,10 @@ def convert_url_to_direct_url(url, name=None):
# type: (AnyStr, Optional[AnyStr]) -> AnyStr
"""Converts normal link-style URLs to direct urls.
Given a :class:`~pip_shims.shims.Link` compatible URL, convert to a direct url as
Given a :class:`Link` compatible URL, convert to a direct url as
defined by *PEP 508* by extracting the name and extras from the **egg_fragment**.
:param AnyStr url: A :class:`~pip_shims.shims.InstallRequirement` compliant URL.
:param AnyStr url: A :class:`InstallRequirement` compliant URL.
:param Optiona[AnyStr] name: A name to use in case the supplied URL doesn't provide one.
:return: A pep-508 compliant direct url.
:rtype: AnyStr
@@ -871,11 +868,6 @@ def make_install_requirement(
:return: A generated InstallRequirement
:rtype: :class:`~pipenv.patched.pip._internal.req.req_install.InstallRequirement`
"""
# If no extras are specified, the extras string is blank
from pipenv.vendor.pip_shims.shims import install_req_from_line
extras_string = ""
requirement_string = "{0}".format(name)
if extras:
# Sort extras for stability
@@ -1026,12 +1018,3 @@ def expand_env_variables(line):
return value if value else match.group()
return re.sub(r"\$\{([A-Z0-9_]+)\}", replace_with_env, line)
SETUPTOOLS_SHIM = (
"import setuptools, tokenize;__file__=%r;"
"f=getattr(tokenize, 'open', open)(__file__);"
"code=f.read().replace('\\r\\n', '\\n');"
"f.close();"
"exec(compile(code, __file__, 'exec'))"
)
+12 -46
View File
@@ -1,12 +1,10 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function
import importlib
import os
import sys
import pipenv.vendor.attr as attr
import pipenv.vendor.pip_shims as pip_shims
from pipenv.patched.pip._internal.utils.temp_dir import global_tempdir_manager
from pipenv.patched.pip._internal.vcs.versioncontrol import VcsSupport
from ..environment import MYPY_RUNNING
from .url import URI
@@ -41,7 +39,6 @@ class VCSRepository(object):
default_run_args = self.monkeypatch_pip()
else:
default_run_args = self.DEFAULT_RUN_ARGS
from pipenv.vendor.pip_shims.shims import VcsSupport
VCS_SUPPORT = VcsSupport()
backend = VCS_SUPPORT.get_backend(self.vcs_type)
@@ -58,29 +55,13 @@ class VCSRepository(object):
url = url.split("+")[1]
return url.startswith("file")
def obtain(self, verbosity=1):
# type: () -> None
lt_pip_19_2 = (
pip_shims.parsed_pip_version.parsed_version < pip_shims.parse_version("19.2")
)
gte_pip_22_0 = (
pip_shims.parsed_pip_version.parsed_version >= pip_shims.parse_version("22.0")
)
if lt_pip_19_2:
self.repo_backend = self.repo_backend(self.url)
def obtain(self, verbosity=1) -> None:
if os.path.exists(
self.checkout_directory
) and not self.repo_backend.is_repository_directory(self.checkout_directory):
self.repo_backend.unpack(self.checkout_directory)
elif not os.path.exists(self.checkout_directory):
if lt_pip_19_2:
self.repo_backend.obtain(self.checkout_directory)
elif gte_pip_22_0:
self.repo_backend.obtain(
self.checkout_directory, self.parsed_url, verbosity
)
else: # at least Pip 19.2 but not quite pip 22.x
self.repo_backend.obtain(self.checkout_directory, self.parsed_url)
self.repo_backend.obtain(self.checkout_directory, self.parsed_url, verbosity)
else:
if self.ref:
self.checkout_ref(self.ref)
@@ -102,39 +83,24 @@ class VCSRepository(object):
def update(self, ref):
# type: (str) -> None
target_ref = self.repo_backend.make_rev_options(ref)
if pip_shims.parse_version(pip_shims.pip_version) > pip_shims.parse_version(
"18.0"
):
self.repo_backend.update(self.checkout_directory, self.url, target_ref)
else:
self.repo_backend.update(self.checkout_directory, target_ref)
self.repo_backend.update(self.checkout_directory, self.url, target_ref)
self.commit_sha = self.get_commit_hash()
def get_commit_hash(self, ref=None):
# type: (Optional[str]) -> str
with pip_shims.shims.global_tempdir_manager():
with global_tempdir_manager():
return self.repo_backend.get_revision(self.checkout_directory)
@classmethod
def monkeypatch_pip(cls):
# type: () -> Tuple[Any, ...]
from pipenv.vendor.pip_shims.compat import get_allowed_args
target_module = pip_shims.shims.VcsSupport.__module__
target_module = VcsSupport.__module__
pip_vcs = importlib.import_module(target_module)
args, kwargs = get_allowed_args(pip_vcs.VersionControl.run_command)
run_command_defaults = pip_vcs.VersionControl.run_command.__defaults__
if "show_stdout" not in args and "show_stdout" not in kwargs:
new_defaults = run_command_defaults
else:
# set the default to not write stdout, the first option sets this value
new_defaults = [False] + list(run_command_defaults)[1:]
new_defaults = tuple(new_defaults)
try:
pip_vcs.VersionControl.run_command.__defaults__ = new_defaults
except AttributeError:
pip_vcs.VersionControl.run_command.__func__.__defaults__ = new_defaults
run_command_defaults = pip_vcs.VersionControl.run_command.__func__.__defaults__
# set the default to not write stdout, the first option sets this value
new_defaults = [False] + list(run_command_defaults)[1:]
new_defaults = tuple(new_defaults)
pip_vcs.VersionControl.run_command.__func__.__defaults__ = new_defaults
sys.modules[target_module] = pip_vcs
cls.DEFAULT_RUN_ARGS = new_defaults
return new_defaults
+48 -21
View File
@@ -10,8 +10,10 @@ from urllib.parse import urlparse, urlsplit, urlunparse
import pipenv.vendor.tomlkit as tomlkit
import pipenv.vendor.vistir as vistir
from pipenv.vendor.pip_shims import shims
from pipenv.vendor.vistir.compat import fs_decode
from pipenv.patched.pip._internal.commands.install import InstallCommand
from pipenv.patched.pip._internal.models.target_python import TargetPython
from pipenv.patched.pip._internal.utils.filetypes import is_archive_file
from pipenv.patched.pip._internal.utils.misc import is_installable_dir
from pipenv.vendor.vistir.path import ensure_mkdir_p, is_valid_url
from .environment import MYPY_RUNNING
@@ -71,20 +73,6 @@ VCS_SCHEMES = [
]
def is_installable_dir(path):
# type: (STRING_TYPE) -> bool
if shims.is_installable_dir(path):
return True
pyproject_path = os.path.join(path, "pyproject.toml")
if os.path.exists(pyproject_path):
pyproject = Path(pyproject_path)
pyproject_toml = tomlkit.loads(pyproject.read_text())
build_system = pyproject_toml.get("build-system", {}).get("build-backend", "")
if build_system:
return True
return False
def strip_ssh_from_git_uri(uri):
# type: (S) -> S
"""Return git+ssh:// formatted URI to git+git@ format."""
@@ -164,8 +152,8 @@ def convert_entry_to_path(path):
elif "path" in path:
path = path["path"]
if not os.name == "nt":
return fs_decode(path)
return Path(fs_decode(path)).as_posix()
return os.fsdecode(path)
return Path(os.fsdecode(path)).as_posix()
def is_installable_file(path):
@@ -194,19 +182,19 @@ def is_installable_file(path):
or (len(parsed.scheme) == 1 and os.name == "nt")
)
if parsed.scheme and parsed.scheme == "file":
path = fs_decode(vistir.path.url_to_path(path))
path = os.fsdecode(vistir.path.url_to_path(path))
normalized_path = vistir.path.normalize_path(path)
if is_local and not os.path.exists(normalized_path):
return False
is_archive = shims.is_archive_file(normalized_path)
is_archive = is_archive_file(normalized_path)
is_local_project = os.path.isdir(normalized_path) and is_installable_dir(
normalized_path
)
if is_local and is_local_project or is_archive:
return True
if not is_local and shims.is_archive_file(parsed.path):
if not is_local and is_archive_file(parsed.path):
return True
return False
@@ -280,6 +268,35 @@ def prepare_pip_source_args(sources, pip_args=None):
return pip_args
def get_package_finder(
install_cmd=None,
options=None,
session=None,
platform=None,
python_versions=None,
abi=None,
implementation=None,
ignore_requires_python=None,
):
"""Reduced Shim for compatibility to generate package finders."""
py_version_info = None
if python_versions:
py_version_info_python = max(python_versions)
py_version_info = tuple([int(part) for part in py_version_info_python])
target_python = TargetPython(
platforms=[platform] if platform else None,
py_version_info=py_version_info,
abis=[abi] if abi else None,
implementation=implementation,
)
return install_cmd._build_package_finder(
options=options,
session=session,
target_python=target_python,
ignore_requires_python=ignore_requires_python,
)
@ensure_mkdir_p(mode=0o777)
def _ensure_dir(path):
return path
@@ -656,3 +673,13 @@ def merge_items(target_list, sourced=False):
if not sourced:
return ret
return ret, source_map
def get_pip_command() -> InstallCommand:
# Use pip's parser for pip.conf management and defaults.
# General options (find_links, index_url, extra_index_url, trusted_host,
# and pre) are deferred to pip.
pip_command = InstallCommand(
name="InstallCommand", summary="requirementslib pip Install command."
)
return pip_command
+1 -2
View File
@@ -12,7 +12,6 @@ markupsafe==2.0.1
orderedmultidict==1.0.1
parse==1.19.0
pexpect==4.8.0
pip-shims==0.7.3
pipdeptree==2.2.1
platformdirs==2.4.0
plette[validation]==0.2.3
@@ -21,7 +20,7 @@ pyparsing==3.0.9
python-dateutil==2.8.2
python-dotenv==0.19.0
pythonfinder==1.2.10
requirementslib==1.6.9
requirementslib==2.0.0
shellingham==1.4.0
six==1.16.0
termcolor==1.1.0
+1
View File
@@ -62,6 +62,7 @@ LIBRARY_RENAMES = {
"requests": "pipenv.patched.pip._vendor.requests",
"packaging": "pipenv.patched.pip._vendor.packaging",
"urllib3": "pipenv.patched.pip._vendor.urllib3",
"zipp": "pipenv.vendor.zipp",
}
GLOBAL_REPLACEMENT = [
@@ -40,16 +40,3 @@ index 0ba06c52..6fdb59b7 100644
class LinkCandidate(_InstallRequirementBackedCandidate):
diff --git a/pipenv/patched/pip/_internal/locations/__init__.py b/pipenv/patched/pip/_internal/locations/__init__.py
index 23eaea64..fe5dd5b4 100644
--- a/pipenv/patched/pip/_internal/locations/__init__.py
+++ b/pipenv/patched/pip/_internal/locations/__init__.py
@@ -41,7 +41,7 @@ logger = logging.getLogger(__name__)
_PLATLIBDIR: str = getattr(sys, "platlibdir", "lib")
-_USE_SYSCONFIG_DEFAULT = sys.version_info >= (3, 10)
+_USE_SYSCONFIG_DEFAULT = sys.version_info >= (3, 7)
def _should_use_sysconfig() -> bool:
-22
View File
@@ -1,22 +0,0 @@
diff --git a/pipenv/vendor/pip_shims/__init__.py b/pipenv/vendor/pip_shims/__init__.py
index c66a075a..febea8df 100644
--- a/pipenv/vendor/pip_shims/__init__.py
+++ b/pipenv/vendor/pip_shims/__init__.py
@@ -31,10 +31,15 @@ __version__ = "0.7.0"
if "pip_shims" in sys.modules:
# mainly to keep a reference to the old module on hand so it doesn't get
# weakref'd away
- old_module = sys.modules["pip_shims"]
+ if __name__ != "pip_shims":
+ del sys.modules["pip_shims"]
-module = sys.modules["pip_shims"] = shims._new()
+if __name__ in sys.modules:
+ old_module = sys.modules[__name__]
+
+
+module = sys.modules["pip_shims"] = sys.modules[__name__] = shims._new()
module.shims = shims
module.__dict__.update(
{
+1 -1
View File
@@ -61,7 +61,7 @@ DEP_PIP_PAIRS = [
"extras": ["voice"],
}
},
"https://github.com/Rapptz/discord.py/archive/async.zip#egg=discord.py[voice]",
"https://github.com/Rapptz/discord.py/archive/async.zip#egg=[voice]",
),
(
{