diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 1730f0c7..63d6a496 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -66,7 +66,7 @@ jobs: with: python-version: "3.11" - run: | - python -m pip install --upgrade wheel invoke parver beautifulsoup4 vistir towncrier requests parse hatch-fancy-pypi-readme + python -m pip install --upgrade wheel invoke parver beautifulsoup4 towncrier requests parse hatch-fancy-pypi-readme python -m invoke vendoring.update tests: name: ${{matrix.os}} / ${{ matrix.python-version }} diff --git a/news/5672.vendor.rst b/news/5672.vendor.rst new file mode 100644 index 00000000..973fc371 --- /dev/null +++ b/news/5672.vendor.rst @@ -0,0 +1 @@ +Vendor in ``requirementslib==2.3.0`` which drops usage of ``vistir``. diff --git a/pipenv/__init__.py b/pipenv/__init__.py index e1335ec2..043c08e7 100644 --- a/pipenv/__init__.py +++ b/pipenv/__init__.py @@ -4,7 +4,6 @@ # | import os -import sys import warnings from pipenv.__version__ import __version__ # noqa @@ -13,50 +12,16 @@ from pipenv.patched.pip._vendor.urllib3.exceptions import DependencyWarning warnings.filterwarnings("ignore", category=DependencyWarning) warnings.filterwarnings("ignore", category=ResourceWarning) warnings.filterwarnings("ignore", category=UserWarning) -PIPENV_ROOT = os.path.abspath(os.path.dirname(os.path.realpath(__file__))) - -PIPENV_VENDOR = os.sep.join([PIPENV_ROOT, "vendor"]) -PIPENV_PATCHED = os.sep.join([PIPENV_ROOT, "patched"]) -# PIP_VENDOR = os.sep.join([PIPENV_ROOT, "patched", "pip", "_vendor"]) - -# sys.path.insert(0, PIP_VENDOR) -# Inject vendored directory into system path. -sys.path.insert(0, PIPENV_VENDOR) -# Inject patched directory into system path. -sys.path.insert(0, PIPENV_PATCHED) # Load patched pip instead of system pip os.environ["PIP_DISABLE_PIP_VERSION_CHECK"] = "1" -# Hack to make things work better. -try: - if "concurrency" in sys.modules: - del sys.modules["concurrency"] -except Exception: - pass -if "urllib3" in sys.modules: - del sys.modules["urllib3"] - if os.name == "nt": from pipenv.vendor import colorama - # Backward compatability with vistir - # These variables will be removed in vistir 0.8.0 no_color = False - for item in ("ANSI_COLORS_DISABLED", "VISTIR_DISABLE_COLORS"): - if os.getenv(item): - warnings.warn( - ( - f"Please do not use {item}, as it will be removed in future versions." - "\nUse NO_COLOR instead." - ), - DeprecationWarning, - stacklevel=2, - ) - no_color = True - if not os.getenv("NO_COLOR") or no_color: colorama.just_fix_windows_console() diff --git a/pipenv/environment.py b/pipenv/environment.py index 5e86a328..2a51a722 100644 --- a/pipenv/environment.py +++ b/pipenv/environment.py @@ -5,7 +5,6 @@ import importlib import importlib.util import itertools import json -import operator import os import site import sys @@ -14,17 +13,18 @@ from pathlib import Path from sysconfig import get_paths, get_python_version, get_scheme_names import pipenv -from pipenv import cmdparse from pipenv.patched.pip._internal.commands.install import InstallCommand from pipenv.patched.pip._internal.index.package_finder import PackageFinder -from pipenv.patched.pip._internal.req.req_uninstall import UninstallPathSet from pipenv.patched.pip._vendor import pkg_resources from pipenv.patched.pip._vendor.packaging.utils import canonicalize_name from pipenv.utils.funktools import chunked, unnest from pipenv.utils.indexes import prepare_pip_source_args from pipenv.utils.processes import subprocess_run -from pipenv.utils.shell import make_posix, normalize_path -from pipenv.vendor import click, vistir +from pipenv.utils.shell import make_posix +from pipenv.vendor import click +from pipenv.vendor.pythonfinder.utils import is_in_path +from pipenv.vendor.requirementslib.fileutils import normalize_path, temp_path +from pipenv.vendor.requirementslib.utils import temp_environ try: # this is only in Python3.8 and later @@ -490,7 +490,7 @@ class Environment: @cached_property def paths(self) -> Dict[str, str]: paths = {} - with vistir.contextmanagers.temp_environ(), vistir.contextmanagers.temp_path(): + with temp_environ(), temp_path(): os.environ["PYTHONIOENCODING"] = "utf-8" os.environ["PYTHONDONTWRITEBYTECODE"] = "1" paths = self.base_paths @@ -519,7 +519,7 @@ class Environment: prefixes = [ Path(prefix) for prefix in self.base_paths["libdirs"].split(os.pathsep) - if vistir.path.is_in_path(prefix, self.prefix.as_posix()) + if is_in_path(prefix, self.prefix.as_posix()) ] for loc in prefixes: if not loc.exists(): @@ -528,8 +528,7 @@ class Environment: if not pth.suffix == ".egg-link": continue contents = [ - vistir.path.normalize_path(line.strip()) - for line in pth.read_text().splitlines() + normalize_path(line.strip()) for line in pth.read_text().splitlines() ] pth.write_text("\n".join(contents)) @@ -796,53 +795,6 @@ class Environment: return True return False - def run(self, cmd, cwd=os.curdir): - """Run a command with :class:`~subprocess.Popen` in the context of the environment - - :param cmd: A command to run in the environment - :type cmd: str or list - :param str cwd: The working directory in which to execute the command, defaults to :data:`os.curdir` - :return: A finished command object - :rtype: :class:`~subprocess.Popen` - """ - - c = None - with self.activated(): - script = cmdparse.Script.parse(cmd) - c = vistir.misc.run( - script._parts, - return_object=True, - nospin=True, - cwd=cwd, - write_to_stdout=False, - ) - return c - - def run_py(self, cmd, cwd=os.curdir): - """Run a python command in the environment context. - - :param cmd: A command to run in the environment - runs with `python -c` - :type cmd: str or list - :param str cwd: The working directory in which to execute the command, defaults to :data:`os.curdir` - :return: A finished command object - :rtype: :class:`~subprocess.Popen` - """ - - c = None - if isinstance(cmd, str): - script = cmdparse.Script.parse(f"{self.python} -c {cmd}") - else: - script = cmdparse.Script.parse([self.python, "-c"] + list(cmd)) - with self.activated(): - c = vistir.misc.run( - script._parts, - return_object=True, - nospin=True, - cwd=cwd, - write_to_stdout=False, - ) - return c - def run_activate_this(self): """Runs the environment's inline activation script""" if self.is_venv: @@ -876,7 +828,7 @@ class Environment: original_path = sys.path original_prefix = sys.prefix prefix = self.prefix.as_posix() - with vistir.contextmanagers.temp_environ(), vistir.contextmanagers.temp_path(): + with temp_environ(), temp_path(): os.environ["PATH"] = os.pathsep.join( [ self.script_basedir, @@ -902,103 +854,3 @@ class Environment: finally: sys.path = original_path sys.prefix = original_prefix - - @cached_property - def finders(self): - from pipenv.vendor.pythonfinder import Finder - - finders = [ - Finder(path=self.base_paths["scripts"], global_search=gs, system=False) - for gs in (False, True) - ] - return finders - - @property - def finder(self): - return next(iter(self.finders), None) - - def which(self, search, as_path=True): - find = operator.methodcaller("which", search) - result = next(iter(filter(None, (find(finder) for finder in self.finders))), None) - if not result: - result = self._which(search) - else: - if as_path: - result = str(result.path) - return result - - def install(self, requirements): - if not isinstance(requirements, (tuple, list)): - requirements = [requirements] - with self.get_finder() as finder: - args = [] - for format_control in ("no_binary", "only_binary"): - formats = getattr(finder.format_control, format_control) - args.extend( - ( - "--" + format_control.replace("_", "-"), - ",".join(sorted(formats or {":none:"})), - ) - ) - if finder.index_urls: - args.extend(["-i", finder.index_urls[0]]) - for extra_index in finder.index_urls[1:]: - args.extend(["--extra-index-url", extra_index]) - else: - args.append("--no-index") - for link in finder.find_links: - args.extend(["--find-links", link]) - for _, host, _ in finder.secure_origins: - args.extend(["--trusted-host", host]) - if finder.allow_all_prereleases: - args.append("--pre") - if finder.process_dependency_links: - args.append("--process-dependency-links") - args.append("--") - args.extend(requirements) - out, _ = vistir.misc.run( - args, return_object=False, nospin=True, block=True, combine_stderr=False - ) - - @contextlib.contextmanager - def uninstall(self, pkgname, *args, **kwargs): - """A context manager which allows uninstallation of packages from the environment - - :param str pkgname: The name of a package to uninstall - - >>> env = Environment("/path/to/env/root") - >>> with env.uninstall("pytz", auto_confirm=True, verbose=False) as uninstaller: - cleaned = uninstaller.paths - >>> if cleaned: - print("uninstalled packages: %s" % cleaned) - """ - - auto_confirm = kwargs.pop("auto_confirm", True) - verbose = kwargs.pop("verbose", False) - with self.activated(): - monkey_patch = next( - iter( - dist - for dist in self.base_working_set - if dist.project_name == "recursive-monkey-patch" - ), - None, - ) - if monkey_patch: - monkey_patch.activate() - dist = next( - iter(d for d in self.get_working_set() if d.project_name == pkgname), None - ) - path_set = UninstallPathSet.from_dist(dist) - if path_set is not None: - path_set.remove(auto_confirm=auto_confirm, verbose=verbose) - try: - yield path_set - except Exception: - if path_set is not None: - path_set.rollback() - else: - if path_set is not None: - path_set.commit() - if path_set is None: - return diff --git a/pipenv/environments.py b/pipenv/environments.py index 6fb39a6e..3b8718bd 100644 --- a/pipenv/environments.py +++ b/pipenv/environments.py @@ -4,11 +4,9 @@ import pathlib import re import sys -from vistir.path import normalize_drive - from pipenv.patched.pip._vendor.platformdirs import user_cache_dir -from pipenv.utils.shell import env_to_bool, is_env_truthy -from pipenv.vendor.vistir.misc import _isatty +from pipenv.utils.shell import env_to_bool, is_env_truthy, isatty +from pipenv.vendor.requirementslib.fileutils import normalize_drive # HACK: avoid resolver.py uses the wrong byte code files. # I hope I can remove this one day. @@ -73,7 +71,7 @@ def normalize_pipfile_path(p): # https://bugs.python.org/issue22490 os.environ.pop("__PYVENV_LAUNCHER__", None) # Internal, to tell whether the command line session is interactive. -SESSION_IS_INTERACTIVE = _isatty(sys.stdout) +SESSION_IS_INTERACTIVE = isatty(sys.stdout) # TF_BUILD indicates to Azure pipelines it is a build step PIPENV_IS_CI = get_from_env("CI", prefix="", check_for_negation=False) or is_env_truthy( diff --git a/pipenv/exceptions.py b/pipenv/exceptions.py index 457895b9..e71d17d2 100644 --- a/pipenv/exceptions.py +++ b/pipenv/exceptions.py @@ -3,7 +3,6 @@ import sys from collections import namedtuple from traceback import format_tb -from pipenv import environments from pipenv.vendor import click from pipenv.vendor.click.exceptions import ClickException, FileError, UsageError @@ -31,6 +30,8 @@ KNOWN_EXCEPTIONS = [ def handle_exception(exc_type, exception, traceback, hook=sys.excepthook): + from pipenv import environments + if environments.Setting().is_verbose() or not issubclass(exc_type, ClickException): hook(exc_type, exception, traceback) else: diff --git a/pipenv/project.py b/pipenv/project.py index 3e7e8f53..75ee7d40 100644 --- a/pipenv/project.py +++ b/pipenv/project.py @@ -13,8 +13,6 @@ import urllib.parse from json.decoder import JSONDecodeError from pathlib import Path -import click - from pipenv.cmdparse import Script from pipenv.environment import Environment from pipenv.environments import Setting, is_in_virtualenv, normalize_pipfile_path @@ -30,6 +28,7 @@ from pipenv.utils.dependencies import ( python_version, ) from pipenv.utils.internet import get_url_name, is_pypi_url, is_valid_url, proper_case +from pipenv.utils.locking import atomic_open_for_write from pipenv.utils.shell import ( find_requirements, find_windows_executable, @@ -41,7 +40,7 @@ from pipenv.utils.shell import ( system_which, ) from pipenv.utils.toml import cleanup_toml, convert_toml_outline_tables -from pipenv.vendor import plette, toml, tomlkit, vistir +from pipenv.vendor import click, plette, toml, tomlkit from pipenv.vendor.requirementslib.models.utils import get_default_pyproject_backend try: @@ -842,9 +841,7 @@ class Project: """Write out the lockfile.""" s = self._lockfile_encoder.encode(content) open_kwargs = {"newline": self._lockfile_newlines, "encoding": "utf-8"} - with vistir.contextmanagers.atomic_open_for_write( - self.lockfile_location, **open_kwargs - ) as f: + with atomic_open_for_write(self.lockfile_location, **open_kwargs) as f: f.write(s) # Write newline at end of document. GH-319. # Only need '\n' here; the file object handles the rest. diff --git a/pipenv/resolver.py b/pipenv/resolver.py index 2dfffe04..234af7ca 100644 --- a/pipenv/resolver.py +++ b/pipenv/resolver.py @@ -691,8 +691,8 @@ def clean_outdated(results, resolver, project, category): def parse_packages(packages, pre, clear, system, requirements_dir=None): from pipenv.utils.indexes import parse_indexes + from pipenv.vendor.requirementslib.fileutils import cd, temp_path from pipenv.vendor.requirementslib.models.requirements import Requirement - from pipenv.vendor.vistir.contextmanagers import cd, temp_path parsed_packages = [] for package in packages: diff --git a/pipenv/routines/clear.py b/pipenv/routines/clear.py index 9a237342..86245b1c 100644 --- a/pipenv/routines/clear.py +++ b/pipenv/routines/clear.py @@ -1,7 +1,8 @@ import shutil from pipenv import environments -from pipenv.vendor import click, vistir +from pipenv.vendor import click +from pipenv.vendor.requirementslib.models.setup_info import handle_remove_readonly def do_clear(project): @@ -9,14 +10,12 @@ def do_clear(project): click.secho("Clearing caches...", bold=True) try: - shutil.rmtree( - project.s.PIPENV_CACHE_DIR, onerror=vistir.path.handle_remove_readonly - ) + shutil.rmtree(project.s.PIPENV_CACHE_DIR, onerror=handle_remove_readonly) # Other processes may be writing into this directory simultaneously. shutil.rmtree( locations.USER_CACHE_DIR, ignore_errors=environments.PIPENV_IS_CI, - onerror=vistir.path.handle_remove_readonly, + onerror=handle_remove_readonly, ) except OSError as e: # Ignore FileNotFoundError. This is needed for Python 2.7. diff --git a/pipenv/routines/install.py b/pipenv/routines/install.py index 90fdddc1..4b8b2bf8 100644 --- a/pipenv/routines/install.py +++ b/pipenv/routines/install.py @@ -3,6 +3,7 @@ import queue import sys import warnings from collections import defaultdict +from tempfile import NamedTemporaryFile from pipenv import environments, exceptions from pipenv.patched.pip._internal.exceptions import PipError @@ -22,8 +23,10 @@ from pipenv.utils.pipfile import ensure_pipfile from pipenv.utils.project import ensure_project from pipenv.utils.requirements import import_requirements from pipenv.utils.virtualenv import cleanup_virtualenv, do_create_virtualenv -from pipenv.vendor import click, vistir +from pipenv.vendor import click +from pipenv.vendor.requirementslib import fileutils from pipenv.vendor.requirementslib.models.requirements import Requirement +from pipenv.vendor.requirementslib.utils import temp_environ console = rich.console.Console() err = rich.console.Console(stderr=True) @@ -49,7 +52,7 @@ def do_install( extra_pip_args=None, categories=None, ): - requirements_directory = vistir.path.create_tracked_tempdir( + requirements_directory = fileutils.create_tracked_tempdir( suffix="-requirements", prefix="pipenv-" ) warnings.filterwarnings("default", category=ResourceWarning) @@ -109,7 +112,7 @@ def do_install( bold=True, err=True, ) - fd = vistir.path.create_tracked_tempfile( + fd = NamedTemporaryFile( prefix="pipenv-", suffix="-requirement.txt", dir=requirements_directory ) temp_reqs = fd.name @@ -235,7 +238,7 @@ def do_install( bold=True, ) # pip install: - with vistir.contextmanagers.temp_environ(), console.status( + with temp_environ(), console.status( "Installing...", spinner=project.s.PIPENV_SPINNER ) as st: if not system: @@ -426,7 +429,7 @@ def do_sync( ) # Install everything. - requirements_dir = vistir.path.create_tracked_tempdir( + requirements_dir = fileutils.create_tracked_tempdir( suffix="-requirements", prefix="pipenv-" ) if system: @@ -592,7 +595,7 @@ def batch_install_iteration( elif dep.is_vcs: is_artifact = True - with vistir.contextmanagers.temp_environ(): + with temp_environ(): if not allow_global: os.environ["PIP_USER"] = "0" if "PYTHONHOME" in os.environ: @@ -790,7 +793,7 @@ def do_init( if not deploy: ensure_pipfile(project, system=system) if not requirements_dir: - requirements_dir = vistir.path.create_tracked_tempdir( + requirements_dir = fileutils.create_tracked_tempdir( suffix="-requirements", prefix="pipenv-" ) # Write out the lockfile if it doesn't exist, but not if the Pipfile is being ignored diff --git a/pipenv/shells.py b/pipenv/shells.py index 447fffdf..24f53e62 100644 --- a/pipenv/shells.py +++ b/pipenv/shells.py @@ -9,7 +9,7 @@ from pathlib import Path from shutil import get_terminal_size from pipenv.vendor import shellingham -from pipenv.vendor.vistir.contextmanagers import temp_environ +from pipenv.vendor.requirementslib.utils import temp_environ ShellDetectionFailure = shellingham.ShellDetectionFailure diff --git a/pipenv/utils/dependencies.py b/pipenv/utils/dependencies.py index 17aa5f55..77275a61 100644 --- a/pipenv/utils/dependencies.py +++ b/pipenv/utils/dependencies.py @@ -1,9 +1,11 @@ import os from contextlib import contextmanager +from tempfile import NamedTemporaryFile from typing import Mapping, Sequence from pipenv.patched.pip._vendor.packaging.markers import Marker from pipenv.patched.pip._vendor.packaging.version import parse +from pipenv.vendor.requirementslib.fileutils import create_tracked_tempdir from pipenv.vendor.requirementslib.models.requirements import ( InstallRequirement, Requirement, @@ -314,15 +316,10 @@ def prepare_constraint_file( sources=None, pip_args=None, ): - from pipenv.vendor.vistir.path import ( - create_tracked_tempdir, - create_tracked_tempfile, - ) - if not directory: directory = create_tracked_tempdir(suffix="-requirements", prefix="pipenv-") - constraints_file = create_tracked_tempfile( + constraints_file = NamedTemporaryFile( mode="w", prefix="pipenv-", suffix="-constraints.txt", @@ -367,8 +364,6 @@ def is_editable(pipfile_entry): @contextmanager def locked_repository(requirement): - from pipenv.vendor.vistir.path import create_tracked_tempdir - if not requirement.is_vcs: return src_dir = create_tracked_tempdir(prefix="pipenv-", suffix="-src") diff --git a/pipenv/utils/locking.py b/pipenv/utils/locking.py index c4333b67..6dd9316b 100644 --- a/pipenv/utils/locking.py +++ b/pipenv/utils/locking.py @@ -1,3 +1,7 @@ +import os +import stat +from contextlib import contextmanager +from tempfile import NamedTemporaryFile from typing import Mapping from .dependencies import clean_resolved_dep, pep423_name, translate_markers @@ -84,3 +88,85 @@ def prepare_lockfile(results, pipfile, lockfile): else: lockfile[name] = lockfile_entry[name] return lockfile + + +@contextmanager +def atomic_open_for_write(target, binary=False, newline=None, encoding=None) -> None: + """Atomically open `target` for writing. + This is based on Lektor's `atomic_open()` utility, but simplified a lot + to handle only writing, and skip many multi-process/thread edge cases + handled by Werkzeug. + :param str target: Target filename to write + :param bool binary: Whether to open in binary mode, default False + :param Optional[str] newline: The newline character to use when writing, determined + from system if not supplied. + :param Optional[str] encoding: The encoding to use when writing, defaults to system + encoding. + How this works: + * Create a temp file (in the same directory of the actual target), and + yield for surrounding code to write to it. + * If some thing goes wrong, try to remove the temp file. The actual target + is not touched whatsoever. + * If everything goes well, close the temp file, and replace the actual + target with this new file. + .. code:: python + >>> fn = "test_file.txt" + >>> def read_test_file(filename=fn): + with open(filename, 'r') as fh: + print(fh.read().strip()) + >>> with open(fn, "w") as fh: + fh.write("this is some test text") + >>> read_test_file() + this is some test text + >>> def raise_exception_while_writing(filename): + with open(filename, "w") as fh: + fh.write("writing some new text") + raise RuntimeError("Uh oh, hope your file didn't get overwritten") + >>> raise_exception_while_writing(fn) + Traceback (most recent call last): + ... + RuntimeError: Uh oh, hope your file didn't get overwritten + >>> read_test_file() + writing some new text + >>> def raise_exception_while_writing(filename): + with atomic_open_for_write(filename) as fh: + fh.write("Overwriting all the text from before with even newer text") + raise RuntimeError("But did it get overwritten now?") + >>> raise_exception_while_writing(fn) + Traceback (most recent call last): + ... + RuntimeError: But did it get overwritten now? + >>> read_test_file() + writing some new text + """ + + mode = "w+b" if binary else "w" + f = NamedTemporaryFile( + dir=os.path.dirname(target), + prefix=".__atomic-write", + mode=mode, + encoding=encoding, + newline=newline, + delete=False, + ) + # set permissions to 0644 + try: + os.chmod(f.name, stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) + except OSError: + pass + try: + yield f + except BaseException: + f.close() + try: + os.remove(f.name) + except OSError: + pass + raise + else: + f.close() + try: + os.remove(target) # This is needed on Windows. + except OSError: + pass + os.rename(f.name, target) # No os.replace() on Python 2. diff --git a/pipenv/utils/pip.py b/pipenv/utils/pip.py index a937c781..c77bfb57 100644 --- a/pipenv/utils/pip.py +++ b/pipenv/utils/pip.py @@ -9,9 +9,10 @@ from pipenv.project import Project from pipenv.utils.dependencies import get_constraints_from_deps, prepare_constraint_file from pipenv.utils.indexes import get_source_list, prepare_pip_source_args from pipenv.utils.processes import subprocess_run -from pipenv.utils.shell import cmd_list_to_shell, normalize_path, project_python -from pipenv.vendor import click, vistir +from pipenv.utils.shell import cmd_list_to_shell, project_python +from pipenv.vendor import click from pipenv.vendor.requirementslib import Requirement +from pipenv.vendor.requirementslib.fileutils import create_tracked_tempdir, normalize_path def format_pip_output(out, r=None): @@ -93,9 +94,7 @@ def pip_install_deps( else: src_dir = os.getenv("PIP_SRC", os.getenv("PIP_SRC_DIR")) if not requirements_dir: - requirements_dir = vistir.path.create_tracked_tempdir( - prefix="pipenv", suffix="requirements" - ) + requirements_dir = create_tracked_tempdir(prefix="pipenv", suffix="requirements") standard_requirements = tempfile.NamedTemporaryFile( prefix="pipenv-", suffix="-hashed-reqs.txt", dir=requirements_dir, delete=False @@ -411,9 +410,7 @@ def write_requirement_to_file( include_hashes: bool = True, ) -> str: if not requirements_dir: - requirements_dir = vistir.path.create_tracked_tempdir( - prefix="pipenv", suffix="requirements" - ) + requirements_dir = create_tracked_tempdir(prefix="pipenv", suffix="requirements") line = requirement.line_instance.get_line( with_prefix=True, with_hashes=include_hashes, with_markers=True, as_list=False ) diff --git a/pipenv/utils/resolver.py b/pipenv/utils/resolver.py index 33a4ae47..ca480fa5 100644 --- a/pipenv/utils/resolver.py +++ b/pipenv/utils/resolver.py @@ -30,10 +30,9 @@ from pipenv.patched.pip._vendor import pkg_resources, rich from pipenv.project import Project from pipenv.vendor import click from pipenv.vendor.requirementslib import Requirement +from pipenv.vendor.requirementslib.fileutils import create_tracked_tempdir, open_file from pipenv.vendor.requirementslib.models.requirements import Line from pipenv.vendor.requirementslib.models.utils import DIRECT_URL_RE -from pipenv.vendor.vistir.contextmanagers import open_file -from pipenv.vendor.vistir.path import create_tracked_tempdir try: # this is only in Python3.8 and later diff --git a/pipenv/utils/shell.py b/pipenv/utils/shell.py index 36380ffe..27c6fcc8 100644 --- a/pipenv/utils/shell.py +++ b/pipenv/utils/shell.py @@ -11,15 +11,12 @@ from contextlib import contextmanager from functools import lru_cache from pathlib import Path -from pipenv.utils.constants import MYPY_RUNNING from pipenv.vendor import click +from pipenv.vendor.requirementslib.fileutils import normalize_drive, normalize_path from .constants import FALSE_VALUES, SCHEME_LIST, TRUE_VALUES from .processes import subprocess_run -if MYPY_RUNNING: - from typing import Text # noqa - @lru_cache() def make_posix(path: str) -> str: @@ -84,12 +81,6 @@ def path_to_url(path): return Path(normalize_drive(os.path.abspath(path))).as_uri() -def normalize_path(path): - return os.path.expandvars( - os.path.expanduser(os.path.normcase(os.path.normpath(os.path.abspath(str(path))))) - ) - - def get_windows_path(*args): """Sanitize a path for windows environments @@ -307,28 +298,6 @@ def is_python_command(line): return False -# TODO This code is basically a duplicate of pipenv.vendor.vistir.path.normalize_drive -# Proposal: Try removing this method and replacing usages in separate PR -def normalize_drive(path): - """Normalize drive in path so they stay consistent. - - This currently only affects local drives on Windows, which can be - identified with either upper or lower cased drive names. The case is - always converted to uppercase because it seems to be preferred. - - See: - """ - if os.name != "nt" or not isinstance(path, str): - return path - - drive, tail = os.path.splitdrive(path) - # Only match (lower cased) local drives (e.g. 'c:'), not UNC mounts. - if drive.islower() and len(drive) == 2 and drive[1] == ":": - return f"{drive.upper()}{tail}" - - return path - - @contextmanager def temp_path(): """Allow the ability to set os.environ temporarily""" @@ -466,3 +435,11 @@ def shorten_path(location, bold=False): if bold: short[-1] = str(click.style(short[-1], bold=True)) return os.sep.join(short) + + +def isatty(stream): + try: + is_a_tty = stream.isatty() + except Exception: # pragma: no cover + is_a_tty = False + return is_a_tty diff --git a/pipenv/vendor/pythonfinder/pythonfinder.py b/pipenv/vendor/pythonfinder/pythonfinder.py index 3d466bab..340f242a 100644 --- a/pipenv/vendor/pythonfinder/pythonfinder.py +++ b/pipenv/vendor/pythonfinder/pythonfinder.py @@ -73,7 +73,7 @@ class Finder(object): def create_system_path(self): # type: () -> SystemPath - pyfinder_path = importlib.import_module("pythonfinder.models.path") + pyfinder_path = importlib.import_module("pipenv.vendor.pythonfinder.models.path") return pyfinder_path.SystemPath.create( path=self.path_prepend, system=self.system, @@ -92,7 +92,7 @@ class Finder(object): if self._system_path is not None: self._system_path = self._system_path.clear_caches() self._system_path = None - pyfinder_path = importlib.import_module("pythonfinder.models.path") + pyfinder_path = importlib.import_module("pipenv.vendor.pythonfinder.models.path") importlib.reload(pyfinder_path) self._system_path = self.create_system_path() diff --git a/pipenv/vendor/requirementslib/__init__.py b/pipenv/vendor/requirementslib/__init__.py index 752a3cd1..366ffb86 100644 --- a/pipenv/vendor/requirementslib/__init__.py +++ b/pipenv/vendor/requirementslib/__init__.py @@ -5,7 +5,7 @@ from .models.lockfile import Lockfile from .models.pipfile import Pipfile from .models.requirements import Requirement -__version__ = "2.2.5" +__version__ = "2.3.0" logger = logging.getLogger(__name__) diff --git a/pipenv/vendor/requirementslib/fileutils.py b/pipenv/vendor/requirementslib/fileutils.py new file mode 100644 index 00000000..ec555c58 --- /dev/null +++ b/pipenv/vendor/requirementslib/fileutils.py @@ -0,0 +1,317 @@ +"""A collection for utilities for working with files and paths.""" +import atexit +import io +import os +import posixpath +import sys +import warnings +from contextlib import closing, contextmanager +from http.client import HTTPResponse as Urllib_HTTPResponse +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import IO, Any, ContextManager, Iterator, Optional, Text, TypeVar, Union +from urllib import parse as urllib_parse +from urllib import request as urllib_request +from urllib.parse import quote, urlparse + +from pipenv.patched.pip._vendor.requests import Session +from pipenv.patched.pip._vendor.urllib3.response import HTTPResponse as Urllib3_HTTPResponse + +_T = TypeVar("_T") + + +@contextmanager +def cd(path): + # type: () -> Iterator[None] + """Context manager to temporarily change working directories. + + :param str path: The directory to move into + >>> print(os.path.abspath(os.curdir)) + '/home/user/code/myrepo' + >>> with cd("/home/user/code/otherdir/subdir"): + ... print("Changed directory: %s" % os.path.abspath(os.curdir)) + Changed directory: /home/user/code/otherdir/subdir + >>> print(os.path.abspath(os.curdir)) + '/home/user/code/myrepo' + """ + if not path: + return + prev_cwd = Path.cwd().as_posix() + if isinstance(path, Path): + path = path.as_posix() + os.chdir(str(path)) + try: + yield + finally: + os.chdir(prev_cwd) + + +def is_file_url(url: Any) -> bool: + """Returns true if the given url is a file url.""" + if not url: + return False + if not isinstance(url, str): + try: + url = url.url + except AttributeError: + raise ValueError("Cannot parse url from unknown type: {!r}".format(url)) + return urllib_parse.urlparse(url.lower()).scheme == "file" + + +def is_valid_url(url: str) -> bool: + """Checks if a given string is an url.""" + pieces = urlparse(url) + return all([pieces.scheme, pieces.netloc]) + + +def url_to_path(url: str) -> str: + """Convert a valid file url to a local filesystem path. + + Follows logic taken from pip's equivalent function + """ + assert is_file_url(url), "Only file: urls can be converted to local paths" + _, netloc, path, _, _ = urllib_parse.urlsplit(url) + # Netlocs are UNC paths + if netloc: + netloc = "\\\\" + netloc + + path = urllib_request.url2pathname(netloc + path) + return urllib_parse.unquote(path) + + +if os.name == "nt": + # from click _winconsole.py + from ctypes import create_unicode_buffer, windll + + def get_long_path(short_path: Text) -> Text: + BUFFER_SIZE = 500 + buffer = create_unicode_buffer(BUFFER_SIZE) + get_long_path_name = windll.kernel32.GetLongPathNameW + get_long_path_name(short_path, buffer, BUFFER_SIZE) + return buffer.value + + +def normalize_path(path): + return os.path.expandvars( + os.path.expanduser(os.path.normcase(os.path.normpath(os.path.abspath(str(path))))) + ) + + + +def normalize_drive(path): + """Normalize drive in path so they stay consistent. + + This currently only affects local drives on Windows, which can be + identified with either upper or lower cased drive names. The case is + always converted to uppercase because it seems to be preferred. + + See: + """ + if os.name != "nt" or not isinstance(path, str): + return path + + drive, tail = os.path.splitdrive(path) + # Only match (lower cased) local drives (e.g. 'c:'), not UNC mounts. + if drive.islower() and len(drive) == 2 and drive[1] == ":": + return f"{drive.upper()}{tail}" + + return path + + +def path_to_url(path): + """Convert the supplied local path to a file uri. + + :param str path: A string pointing to or representing a local path + :return: A `file://` uri for the same location + :rtype: str + >>> path_to_url("/home/user/code/myrepo/myfile.zip") + 'file:///home/user/code/myrepo/myfile.zip' + """ + + if not path: + return path # type: ignore + normalized_path = Path(normalize_drive(os.path.abspath(path))).as_posix() + if os.name == "nt" and normalized_path[1] == ":": + drive, _, path = normalized_path.partition(":") + # XXX: This enables us to handle half-surrogates that were never + # XXX: actually part of a surrogate pair, but were just incidentally + # XXX: passed in as a piece of a filename + quoted_path = quote(path, errors="backslashreplace") + return "file:///{}:{}".format(drive, quoted_path) + # XXX: This is also here to help deal with incidental dangling surrogates + # XXX: on linux, by making sure they are preserved during encoding so that + # XXX: we can urlencode the backslash correctly + # bytes_path = to_bytes(normalized_path, errors="backslashreplace") + return "file://{}".format(quote(path, errors="backslashreplace")) + + +@contextmanager +def open_file( + link: Union[_T, str], session: Optional[Session] = None, stream: bool = True +) -> ContextManager[Union[IO[bytes], Urllib3_HTTPResponse, Urllib_HTTPResponse]]: + """Open local or remote file for reading. + + :param pipenv.patched.pip._internal.index.Link link: A link object from resolving dependencies with + pip, or else a URL. + :param Optional[Session] session: A :class:`~requests.Session` instance + :param bool stream: Whether to stream the content if remote, default True + :raises ValueError: If link points to a local directory. + :return: a context manager to the opened file-like object + """ + if not isinstance(link, str): + try: + link = link.url_without_fragment + except AttributeError: + raise ValueError("Cannot parse url from unknown type: {0!r}".format(link)) + + if not is_valid_url(link) and os.path.exists(link): + link = path_to_url(link) + + if is_file_url(link): + # Local URL + local_path = url_to_path(link) + if os.path.isdir(local_path): + raise ValueError("Cannot open directory for read: {}".format(link)) + else: + with io.open(local_path, "rb") as local_file: + yield local_file + else: + # Remote URL + headers = {"Accept-Encoding": "identity"} + if not session: + try: + from pipenv.patched.pip._vendor.requests import Session # noqa + except ImportError: + session = None + else: + session = Session() + if session is None: + with closing(urllib_request.urlopen(link)) as f: + yield f + else: + with session.get(link, headers=headers, stream=stream) as resp: + try: + raw = getattr(resp, "raw", None) + result = raw if raw else resp + yield result + finally: + if raw: + conn = raw._connection + if conn is not None: + conn.close() + result.close() + + +@contextmanager +def temp_path(): + # type: () -> Iterator[None] + """A context manager which allows the ability to set sys.path temporarily. + + >>> path_from_virtualenv = load_path("/path/to/venv/bin/python") + >>> print(sys.path) + [ + '/home/user/.pyenv/versions/3.7.0/bin', + '/home/user/.pyenv/versions/3.7.0/lib/python37.zip', + '/home/user/.pyenv/versions/3.7.0/lib/python3.7', + '/home/user/.pyenv/versions/3.7.0/lib/python3.7/lib-dynload', + '/home/user/.pyenv/versions/3.7.0/lib/python3.7/site-packages' + ] + >>> with temp_path(): + sys.path = path_from_virtualenv + # Running in the context of the path above + run(["pip", "install", "stuff"]) + >>> print(sys.path) + [ + '/home/user/.pyenv/versions/3.7.0/bin', + '/home/user/.pyenv/versions/3.7.0/lib/python37.zip', + '/home/user/.pyenv/versions/3.7.0/lib/python3.7', + '/home/user/.pyenv/versions/3.7.0/lib/python3.7/lib-dynload', + '/home/user/.pyenv/versions/3.7.0/lib/python3.7/site-packages' + ] + """ + path = [p for p in sys.path] + try: + yield + finally: + sys.path = [p for p in path] + + +TRACKED_TEMPORARY_DIRECTORIES = [] + + +def create_tracked_tempdir(*args: Any, **kwargs: Any) -> str: + """Create a tracked temporary directory. + + This uses `TemporaryDirectory`, but does not remove the directory + when the return value goes out of scope, instead registers a handler + to cleanup on program exit. The return value is the path to the + created directory. + """ + tempdir = TemporaryDirectory(*args, **kwargs) + TRACKED_TEMPORARY_DIRECTORIES.append(tempdir) + atexit.register(tempdir.cleanup) + warnings.simplefilter("ignore", ResourceWarning) + return tempdir.name + + +def check_for_unc_path(path): + # type: (Path) -> bool + """Checks to see if a pathlib `Path` object is a unc path or not.""" + if ( + os.name == "nt" + and len(path.drive) > 2 + and not path.drive[0].isalpha() + and path.drive[1] != ":" + ): + return True + else: + return False + + +def get_converted_relative_path(path, relative_to=None): + """Convert `path` to be relative. + + Given a vague relative path, return the path relative to the given + location. + + :param str path: The location of a target path + :param str relative_to: The starting path to build against, optional + :returns: A relative posix-style path with a leading `./` + + This performs additional conversion to ensure the result is of POSIX form, + and starts with `./`, or is precisely `.`. + + >>> os.chdir('/home/user/code/myrepo/myfolder') + >>> vistir.path.get_converted_relative_path('/home/user/code/file.zip') + './../../file.zip' + >>> vistir.path.get_converted_relative_path('/home/user/code/myrepo/myfolder/mysubfolder') + './mysubfolder' + >>> vistir.path.get_converted_relative_path('/home/user/code/myrepo/myfolder') + '.' + """ + if not relative_to: + relative_to = os.getcwd() + + start_path = Path(str(relative_to)) + try: + start = start_path.resolve() + except OSError: + start = start_path.absolute() + + # check if there is a drive letter or mount point + # if it is a mountpoint use the original absolute path + # instead of the unc path + if check_for_unc_path(start): + start = start_path.absolute() + + path = start.joinpath(str(path)).relative_to(start) + + # check and see if the path that was passed into the function is a UNC path + # and raise value error if it is not. + if check_for_unc_path(path): + raise ValueError("The path argument does not currently accept UNC paths") + + relpath_s = posixpath.normpath(path.as_posix()) + if not (relpath_s == "." or relpath_s.startswith("./")): + relpath_s = posixpath.join(".", relpath_s) + return relpath_s diff --git a/pipenv/vendor/requirementslib/models/cache.py b/pipenv/vendor/requirementslib/models/cache.py index dc8e17ac..5e686cb2 100644 --- a/pipenv/vendor/requirementslib/models/cache.py +++ b/pipenv/vendor/requirementslib/models/cache.py @@ -4,13 +4,13 @@ import hashlib import json import os -import pipenv.vendor.vistir as vistir from pipenv.patched.pip._internal.utils.hashes import FAVORITE_HASH from pipenv.patched.pip._internal.vcs.versioncontrol import VcsSupport from pipenv.patched.pip._vendor.cachecontrol.cache import DictCache from pipenv.patched.pip._vendor.packaging.requirements import Requirement from pipenv.patched.pip._vendor.platformdirs import user_cache_dir +from ..fileutils import open_file from .utils import as_tuple, get_pinned_version, key_from_req, lookup_table CACHE_DIR = os.environ.get("PIPENV_CACHE_DIR", user_cache_dir("pipenv")) @@ -30,7 +30,8 @@ class DependencyCache(object): ("ipython", "2.1.0") - For a requirement with extras, the extras will be comma-separated and appended to the version, inside brackets, + For a requirement with extras, the extras will be comma-separated and appended to the + version, inside brackets, like so: ("ipython", "2.1.0[nbconvert,notebook]") @@ -160,7 +161,7 @@ class HashCache(DictCache): def _get_file_hash(self, location): h = hashlib.new(FAVORITE_HASH) - with vistir.contextmanagers.open_file(location, self.session) as fp: + with open_file(location, self.session) as fp: for chunk in iter(lambda: fp.read(8096), b""): h.update(chunk) return ":".join([FAVORITE_HASH, h.hexdigest()]) diff --git a/pipenv/vendor/requirementslib/models/dependencies.py b/pipenv/vendor/requirementslib/models/dependencies.py index 0cb4cfd0..118b7f80 100644 --- a/pipenv/vendor/requirementslib/models/dependencies.py +++ b/pipenv/vendor/requirementslib/models/dependencies.py @@ -16,11 +16,15 @@ from pipenv.patched.pip._internal.utils.temp_dir import TempDirectory, global_te from pipenv.patched.pip._vendor.packaging.markers import Marker from pipenv.patched.pip._vendor.packaging.utils import canonicalize_name from pipenv.patched.pip._vendor.packaging.version import parse -from pipenv.vendor.vistir.contextmanagers import temp_environ -from pipenv.vendor.vistir.path import create_tracked_tempdir from ..environment import MYPY_RUNNING -from ..utils import get_package_finder, get_pip_command, prepare_pip_source_args +from ..fileutils import create_tracked_tempdir +from ..utils import ( + get_package_finder, + get_pip_command, + prepare_pip_source_args, + temp_environ, +) from .cache import CACHE_DIR, DependencyCache from .setup_info import SetupInfo from .utils import ( diff --git a/pipenv/vendor/requirementslib/models/metadata.py b/pipenv/vendor/requirementslib/models/metadata.py index dde559b8..5697c775 100644 --- a/pipenv/vendor/requirementslib/models/metadata.py +++ b/pipenv/vendor/requirementslib/models/metadata.py @@ -12,7 +12,6 @@ from typing import Sequence import pipenv.vendor.attr as attr import pipenv.patched.pip._vendor.requests as requests -import pipenv.vendor.vistir as vistir from pipenv.patched.pip._vendor.distlib import wheel from pipenv.patched.pip._vendor.distlib.metadata import Metadata from pipenv.patched.pip._vendor.packaging.markers import Marker @@ -22,6 +21,7 @@ from pipenv.patched.pip._vendor.packaging.tags import Tag from pipenv.patched.pip._vendor.packaging.version import _BaseVersion, parse from ..environment import MYPY_RUNNING +from ..fileutils import open_file from .markers import ( get_contained_extras, get_contained_pyversions, @@ -166,7 +166,7 @@ def get_remote_wheel_metadata(whl_file): # type: (str) -> Optional[Metadata] parsed_metadata = None data = io.BytesIO() - with vistir.contextmanagers.open_file(whl_file) as fp: + with open_file(whl_file) as fp: for chunk in iter(lambda: fp.read(8096), b""): data.write(chunk) with zipfile.ZipFile(data, mode="r", compression=zipfile.ZIP_DEFLATED) as zf: diff --git a/pipenv/vendor/requirementslib/models/requirements.py b/pipenv/vendor/requirementslib/models/requirements.py index 69603a3d..18deedcb 100644 --- a/pipenv/vendor/requirementslib/models/requirements.py +++ b/pipenv/vendor/requirementslib/models/requirements.py @@ -10,7 +10,6 @@ from urllib import parse as urllib_parse from urllib.parse import unquote import pipenv.vendor.attr as attr -from pipenv.patched.pip._vendor.pyparsing.core import cached_property from pipenv.patched.pip._internal.models.link import Link from pipenv.patched.pip._internal.models.wheel import Wheel from pipenv.patched.pip._internal.req.constructors import ( @@ -21,6 +20,7 @@ from pipenv.patched.pip._internal.req.constructors import ( from pipenv.patched.pip._internal.req.req_install import InstallRequirement from pipenv.patched.pip._internal.utils.temp_dir import global_tempdir_manager from pipenv.patched.pip._internal.utils.urls import path_to_url, url_to_path +from pipenv.patched.pip._vendor.distlib.util import cached_property from pipenv.patched.pip._vendor.packaging.markers import Marker from pipenv.patched.pip._vendor.packaging.requirements import Requirement as PackagingRequirement from pipenv.patched.pip._vendor.packaging.specifiers import ( @@ -31,17 +31,17 @@ from pipenv.patched.pip._vendor.packaging.specifiers import ( ) from pipenv.patched.pip._vendor.packaging.utils import canonicalize_name from pipenv.patched.pip._vendor.packaging.version import parse -from pipenv.vendor.vistir.contextmanagers import temp_path -from pipenv.vendor.vistir.path import ( + +from ..environment import MYPY_RUNNING +from ..exceptions import RequirementError +from ..fileutils import ( create_tracked_tempdir, get_converted_relative_path, is_file_url, is_valid_url, normalize_path, + temp_path, ) - -from ..environment import MYPY_RUNNING -from ..exceptions import RequirementError from ..funktools import dedup from ..utils import ( VCS_LIST, @@ -95,6 +95,7 @@ if MYPY_RUNNING: Dict, FrozenSet, Generator, + Iterator, List, Optional, Sequence, diff --git a/pipenv/vendor/requirementslib/models/setup_info.py b/pipenv/vendor/requirementslib/models/setup_info.py index 96d978af..55b2356e 100644 --- a/pipenv/vendor/requirementslib/models/setup_info.py +++ b/pipenv/vendor/requirementslib/models/setup_info.py @@ -2,15 +2,22 @@ import ast import atexit import configparser import contextlib +import errno +import locale import os import shutil +import stat import subprocess as sp import sys +import time +import warnings from collections.abc import Iterable, Mapping from contextlib import ExitStack from functools import lru_cache +from itertools import count from os import scandir from pathlib import Path +from typing import Callable, Optional from urllib.parse import parse_qs, urlparse, urlunparse from weakref import finalize @@ -31,11 +38,10 @@ from pipenv.patched.pip._vendor.pkg_resources import ( find_distributions, ) from pipenv.patched.pip._vendor.platformdirs import user_cache_dir -from pipenv.vendor.vistir.contextmanagers import cd, temp_path -from pipenv.vendor.vistir.path import create_tracked_tempdir, rmtree from ..environment import MYPY_RUNNING from ..exceptions import RequirementError +from ..fileutils import cd, create_tracked_tempdir, temp_path from ..utils import get_pip_command from .old_pip_utils import _copy_source_tree from .utils import ( @@ -54,7 +60,6 @@ if MYPY_RUNNING: Dict, Generator, List, - Optional, Sequence, Set, Text, @@ -126,6 +131,229 @@ class HookCaller(wrappers.Pep517HookCaller): self.backend_path = backend_path +def get_value_from_tuple(value, value_type): + try: + import winreg + except ImportError: + import _winreg as winreg + if value_type in (winreg.REG_SZ, winreg.REG_EXPAND_SZ): + if "\0" in value: + return value[: value.index("\0")] + return value + return None + + +def is_readonly_path(fn: os.PathLike) -> bool: + """check if a provided path exists and is readonly. + + permissions check is `bool(path.stat & stat.s_iread)` or `not + os.access(path, os.w_ok)` + """ + if os.path.exists(fn): + file_stat = os.stat(fn).st_mode + return not bool(file_stat & stat.s_iwrite) or not os.access(fn, os.w_ok) + return False + + +def query_registry_value(root, key_name, value): + try: + import winreg + except ImportError: + import _winreg as winreg + try: + with winreg.OpenKeyEx(root, key_name, 0, winreg.KEY_READ) as key: + return get_value_from_tuple(*winreg.QueryValueEx(key, value)) + except OSError: + return None + + +def _find_icacls_exe(): + if os.name == "nt": + paths = [ + os.path.expandvars(r"%windir%\{0}").format(subdir) + for subdir in ("system32", "SysWOW64") + ] + for path in paths: + icacls_path = next( + iter(fn for fn in os.listdir(path) if fn.lower() == "icacls.exe"), None + ) + if icacls_path is not None: + icacls_path = os.path.join(path, icacls_path) + return icacls_path + return None + + +def _walk_for_powershell(directory): + for _, dirs, files in os.walk(directory): + powershell = next( + iter(fn for fn in files if fn.lower() == "powershell.exe"), None + ) + if powershell is not None: + return os.path.join(directory, powershell) + for subdir in dirs: + powershell = _walk_for_powershell(os.path.join(directory, subdir)) + if powershell: + return powershell + return None + + +def _get_powershell_path(): + paths = [ + os.path.expandvars(r"%windir%\{0}\WindowsPowerShell").format(subdir) + for subdir in ("SysWOW64", "system32") + ] + powershell_path = next(iter(_walk_for_powershell(pth) for pth in paths), None) + if not powershell_path: + powershell_path = sp.run(["where", "powershell"]) + if powershell_path.stdout: + return powershell_path.stdout.strip() + + +def _get_sid_with_powershell(): + powershell_path = _get_powershell_path() + if not powershell_path: + return None + args = [ + powershell_path, + "-ExecutionPolicy", + "Bypass", + "-Command", + "Invoke-Expression '[System.Security.Principal.WindowsIdentity]::GetCurrent().user | Write-Host'", + ] + sid = sp.run(args, capture_output=True) + return sid.stdout.strip() + + +def _get_sid_from_registry(): + try: + import winreg + except ImportError: + import _winreg as winreg + var_names = ("%USERPROFILE%", "%HOME%") + current_user_home = next(iter(os.path.expandvars(v) for v in var_names if v), None) + root, subkey = ( + winreg.HKEY_LOCAL_MACHINE, + r"Software\Microsoft\Windows NT\CurrentVersion\ProfileList", + ) + subkey_names = [] + value = None + matching_key = None + try: + with winreg.OpenKeyEx(root, subkey, 0, winreg.KEY_READ) as key: + for i in count(): + key_name = winreg.EnumKey(key, i) + subkey_names.append(key_name) + value = query_registry_value( + root, r"{0}\{1}".format(subkey, key_name), "ProfileImagePath" + ) + if value and value.lower() == current_user_home.lower(): + matching_key = key_name + break + except OSError: + pass + if matching_key is not None: + return matching_key + + +def _get_current_user(): + fns = (_get_sid_from_registry, _get_sid_with_powershell) + for fn in fns: + result = fn() + if result: + return result + return None + + +def _wait_for_files(path): # pragma: no cover + """Retry with backoff up to 1 second to delete files from a directory. + + :param str path: The path to crawl to delete files from + :return: A list of remaining paths or None + :rtype: Optional[List[str]] + """ + timeout = 0.001 + remaining = [] + while timeout < 1.0: + remaining = [] + if os.path.isdir(path): + L = os.listdir(path) + for target in L: + _remaining = _wait_for_files(target) + if _remaining: + remaining.extend(_remaining) + continue + try: + os.unlink(path) + except FileNotFoundError as e: + if e.errno == errno.ENOENT: + return + except (OSError, IOError, PermissionError): # noqa:B014 + time.sleep(timeout) + timeout *= 2 + remaining.append(path) + else: + return + return remaining + + +def set_write_bit(fn: str) -> None: + """Set read-write permissions for the current user on the target path. Fail + silently if the path doesn't exist. + + :param str fn: The target filename or path + :return: None + """ + if not os.path.exists(fn): + return + file_stat = os.stat(fn).st_mode + os.chmod(fn, file_stat | stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + if os.name == "nt": + user_sid = _get_current_user() + icacls_exe = _find_icacls_exe() or "icacls" + + if user_sid: + c = sp.run( + [ + icacls_exe, + "''{}''".format(fn), + "/grant", + "{}:WD".format(user_sid), + "/T", + "/C", + "/Q", + ], + capture_output=True, + # 2020-06-12 Yukihiko Shinoda + # There are 3 way to get system default encoding in Stack Overflow. + # see: https://stackoverflow.com/questions/37506535/how-to-get-the-system-default-encoding-in-python-2-x + # I investigated these way by using Shift-JIS Windows. + # >>> import locale + # >>> locale.getpreferredencoding() + # "cp932" (Shift-JIS) + # >>> import sys + # >>> sys.getdefaultencoding() + # "utf-8" + # >>> sys.stdout.encoding + # "UTF8" + encoding=locale.getpreferredencoding(), + ) + if not c.err and c.returncode == 0: + return + + if not os.path.isdir(fn): + for path in [fn, os.path.dirname(fn)]: + try: + os.chflags(path, 0) + except AttributeError: + pass + return None + for root, dirs, files in os.walk(fn, topdown=False): + for dir_ in [os.path.join(root, d) for d in dirs]: + set_write_bit(dir_) + for file_ in [os.path.join(root, f) for f in files]: + set_write_bit(file_) + + def make_base_requirements(reqs): # type: (Sequence[STRING_TYPE]) -> Set[BaseRequirement] requirements = set() @@ -141,6 +369,96 @@ def make_base_requirements(reqs): return requirements +def handle_remove_readonly(func, path, exc): + """Error handler for shutil.rmtree. + + Windows source repo folders are read-only by default, so this error handler + attempts to set them as writeable and then proceed with deletion. + + :param function func: The caller function + :param str path: The target path for removal + :param Exception exc: The raised exception + + This function will call check :func:`is_readonly_path` before attempting to call + :func:`set_write_bit` on the target path and try again. + """ + + PERM_ERRORS = (errno.EACCES, errno.EPERM, errno.ENOENT) + default_warning_message = "Unable to remove file due to permissions restriction: {!r}" + # split the initial exception out into its type, exception, and traceback + exc_type, exc_exception, exc_tb = exc + if is_readonly_path(path): + # Apply write permission and call original function + set_write_bit(path) + try: + func(path) + except ( # noqa:B014 + OSError, + IOError, + FileNotFoundError, + PermissionError, + ) as e: # pragma: no cover + if e.errno in PERM_ERRORS: + if e.errno == errno.ENOENT: + return + remaining = None + if os.path.isdir(path): + remaining = _wait_for_files(path) + if remaining: + warnings.warn( + default_warning_message.format(path), + ResourceWarning, + stacklevel=2, + ) + else: + func(path, ignore_errors=True) + return + + if exc_exception.errno in PERM_ERRORS: + set_write_bit(path) + remaining = _wait_for_files(path) + try: + func(path) + except (OSError, IOError, FileNotFoundError, PermissionError) as e: # noqa:B014 + if e.errno in PERM_ERRORS: + if e.errno != errno.ENOENT: # File still exists + warnings.warn( + default_warning_message.format(path), + ResourceWarning, + stacklevel=2, + ) + return + else: + raise exc_exception + + +def rmtree( + directory: str, ignore_errors: bool = False, onerror: Optional[Callable] = None +) -> None: + """Stand-in for :func:`~shutil.rmtree` with additional error-handling. + + This version of `rmtree` handles read-only paths, especially in the case of index + files written by certain source control systems. + + :param str directory: The target directory to remove + :param bool ignore_errors: Whether to ignore errors, defaults to False + :param func onerror: An error handling function, defaults to :func:`handle_remove_readonly` + + .. note:: + + Setting `ignore_errors=True` may cause this to silently fail to delete the path + """ + + if onerror is None: + onerror = handle_remove_readonly + try: + shutil.rmtree(directory, ignore_errors=ignore_errors, onerror=onerror) + except (IOError, OSError, FileNotFoundError, PermissionError) as exc: # noqa:B014 + # Ignore removal failures where the file doesn't exist + if exc.errno != errno.ENOENT: + raise + + def suppress_unparsable(func, *args, **kwargs): try: return func(*args, **kwargs) @@ -158,7 +476,7 @@ class SetupReader: @classmethod def read_setup_py(cls, file: Path, raising: bool = True) -> "Dict[str, Any]": - with file.open(encoding="utf-8") as f: + with file.open(encoding="utf-8-sig") as f: content = f.read() body = ast.parse(content).body diff --git a/pipenv/vendor/requirementslib/models/utils.py b/pipenv/vendor/requirementslib/models/utils.py index b1ab8f76..a83c53ea 100644 --- a/pipenv/vendor/requirementslib/models/utils.py +++ b/pipenv/vendor/requirementslib/models/utils.py @@ -22,9 +22,9 @@ from pipenv.vendor.tomlkit.container import Container from pipenv.vendor.tomlkit.items import AoT, Array, Bool, InlineTable, Item, String, Table from pipenv.patched.pip._vendor.urllib3 import util as urllib3_util from pipenv.patched.pip._vendor.urllib3.util import parse_url as urllib3_parse -from pipenv.vendor.vistir.path import is_valid_url from ..environment import MYPY_RUNNING +from ..fileutils import is_valid_url from ..utils import SCHEME_LIST, VCS_LIST, is_star if MYPY_RUNNING: diff --git a/pipenv/vendor/requirementslib/utils.py b/pipenv/vendor/requirementslib/utils.py index a7fb37cc..f5800466 100644 --- a/pipenv/vendor/requirementslib/utils.py +++ b/pipenv/vendor/requirementslib/utils.py @@ -1,23 +1,54 @@ +# This Module is taken in part from the click project and expanded +# see https://github.com/pallets/click/blob/6cafd32/click/_winconsole.py +# Copyright © 2014 by the Pallets team. + +# Some rights reserved. + +# Redistribution and use in source and binary forms of the software as well as +# documentation, with or without modification, are permitted provided that the +# following conditions are met: +# Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# Neither the name of the copyright holder nor the names of its contributors +# may be used to endorse or promote products derived from this +# software without specific prior written permission. + +# THIS SOFTWARE AND DOCUMENTATION IS PROVIDED BY THE COPYRIGHT HOLDERS AND +# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT +# NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; +# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE AND +# DOCUMENTATION, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + import logging import os import sys from collections.abc import ItemsView, Mapping, Sequence, Set +from contextlib import contextmanager from pathlib import Path from urllib.parse import urlparse, urlsplit, urlunparse import pipenv.vendor.tomlkit as tomlkit -import pipenv.vendor.vistir as vistir from pipenv.patched.pip._internal.commands.install import InstallCommand from pipenv.patched.pip._internal.models.target_python import TargetPython from pipenv.patched.pip._internal.utils.filetypes import is_archive_file from pipenv.patched.pip._internal.utils.misc import is_installable_dir from pipenv.patched.pip._vendor.packaging import specifiers -from pipenv.vendor.vistir.path import is_valid_url from .environment import MYPY_RUNNING +from .fileutils import is_valid_url, normalize_path, url_to_path if MYPY_RUNNING: - from typing import Dict, List, Optional, Text, Tuple, TypeVar, Union + from typing import Dict, Iterator, List, Optional, Text, Tuple, TypeVar, Union STRING_TYPE = Union[bytes, str, Text] S = TypeVar("S", bytes, str, Text) @@ -145,7 +176,7 @@ def convert_entry_to_path(path): raise ValueError("missing path-like entry in supplied mapping {0!r}".format(path)) if "file" in path: - path = vistir.path.url_to_path(path["file"]) + path = url_to_path(path["file"]) elif "path" in path: path = path["path"] @@ -179,8 +210,8 @@ def is_installable_file(path): or (len(parsed.scheme) == 1 and os.name == "nt") ) if parsed.scheme and parsed.scheme == "file": - path = os.fsdecode(vistir.path.url_to_path(path)) - normalized_path = vistir.path.normalize_path(path) + path = os.fsdecode(url_to_path(path)) + normalized_path = normalize_path(path) if is_local and not os.path.exists(normalized_path): return False @@ -673,3 +704,17 @@ def get_pip_command() -> InstallCommand: name="InstallCommand", summary="requirementslib pip Install command." ) return pip_command + + +# Borrowed from Pew. +# See https://github.com/berdario/pew/blob/master/pew/_utils.py#L82 +@contextmanager +def temp_environ(): + # type: () -> Iterator[None] + """Allow the ability to set os.environ temporarily.""" + environ = dict(os.environ) + try: + yield + finally: + os.environ.clear() + os.environ.update(environ) diff --git a/pipenv/vendor/vendor.txt b/pipenv/vendor/vendor.txt index a9c3419d..9620b33b 100644 --- a/pipenv/vendor/vendor.txt +++ b/pipenv/vendor/vendor.txt @@ -12,9 +12,8 @@ plette[validation]==0.4.4 ptyprocess==0.7.0 python-dotenv==1.0.0 pythonfinder==1.3.2 -requirementslib==2.2.5 +requirementslib==2.3.0 ruamel.yaml==0.17.21 shellingham==1.5.0.post1 toml==0.10.2 tomlkit==0.11.7 -vistir==0.8.0 diff --git a/pipenv/vendor/vistir/LICENSE b/pipenv/vendor/vistir/LICENSE deleted file mode 100644 index e1a278e7..00000000 --- a/pipenv/vendor/vistir/LICENSE +++ /dev/null @@ -1,13 +0,0 @@ -Copyright (c) 2018, Dan Ryan - -Permission to use, copy, modify, and distribute this software for any -purpose with or without fee is hereby granted, provided that the above -copyright notice and this permission notice appear in all copies. - -THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR -ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES -WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN -ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF -OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. diff --git a/pipenv/vendor/vistir/__init__.py b/pipenv/vendor/vistir/__init__.py deleted file mode 100644 index 7014814e..00000000 --- a/pipenv/vendor/vistir/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -*- coding=utf-8 -*- - -__version__ = "0.8.0" diff --git a/pipenv/vendor/vistir/_winconsole.py b/pipenv/vendor/vistir/_winconsole.py deleted file mode 100644 index 922f6952..00000000 --- a/pipenv/vendor/vistir/_winconsole.py +++ /dev/null @@ -1,485 +0,0 @@ -# -*- coding: utf-8 -*- - -# This Module is taken in part from the click project and expanded -# see https://github.com/pallets/click/blob/6cafd32/click/_winconsole.py -# Copyright © 2014 by the Pallets team. - -# Some rights reserved. - -# Redistribution and use in source and binary forms of the software as well as -# documentation, with or without modification, are permitted provided that the -# following conditions are met: -# Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# Neither the name of the copyright holder nor the names of its contributors -# may be used to endorse or promote products derived from this -# software without specific prior written permission. - -# THIS SOFTWARE AND DOCUMENTATION IS PROVIDED BY THE COPYRIGHT HOLDERS AND -# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT -# NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; -# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR -# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE AND -# DOCUMENTATION, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -# This module is based on the excellent work by Adam Bartoš who -# provided a lot of what went into the implementation here in -# the discussion to issue1602 in the Python bug tracker. -# -# There are some general differences in regards to how this works -# compared to the original patches as we do not need to patch -# the entire interpreter but just work in our little world of -# echo and prmopt. - -import ctypes -import io -import os -import sys -import time -import typing -import zlib -from ctypes import ( - POINTER, - WINFUNCTYPE, - Structure, - byref, - c_char, - c_char_p, - c_int, - c_ssize_t, - c_ulong, - c_void_p, - create_unicode_buffer, - py_object, - windll, -) -from ctypes.wintypes import HANDLE, LPCWSTR, LPWSTR -from itertools import count - -import msvcrt - -from .misc import StreamWrapper, run, to_text - -try: - from ctypes import pythonapi - - PyObject_GetBuffer = pythonapi.PyObject_GetBuffer - PyBuffer_Release = pythonapi.PyBuffer_Release -except ImportError: - pythonapi = None - - -if typing.TYPE_CHECKING: - from typing import Text - - -c_ssize_p = POINTER(c_ssize_t) -CommandLineToArgvW = WINFUNCTYPE(POINTER(LPWSTR), LPCWSTR, POINTER(c_int))( - ("CommandLineToArgvW", windll.shell32) -) -kernel32 = windll.kernel32 -GetLastError = kernel32.GetLastError -GetCommandLineW = WINFUNCTYPE(LPWSTR)(("GetCommandLineW", windll.kernel32)) -GetConsoleCursorInfo = kernel32.GetConsoleCursorInfo -GetStdHandle = kernel32.GetStdHandle -LocalFree = WINFUNCTYPE(ctypes.c_void_p, ctypes.c_void_p)(("LocalFree", windll.kernel32)) -ReadConsoleW = kernel32.ReadConsoleW -SetConsoleCursorInfo = kernel32.SetConsoleCursorInfo -WriteConsoleW = kernel32.WriteConsoleW - -# XXX: Added for cursor hiding on windows -STDOUT_HANDLE_ID = ctypes.c_ulong(-11) -STDERR_HANDLE_ID = ctypes.c_ulong(-12) -STDIN_HANDLE = GetStdHandle(-10) -STDOUT_HANDLE = GetStdHandle(-11) -STDERR_HANDLE = GetStdHandle(-12) - -STREAM_MAP = {0: STDIN_HANDLE, 1: STDOUT_HANDLE, 2: STDERR_HANDLE} - - -PyBUF_SIMPLE = 0 -PyBUF_WRITABLE = 1 - -ERROR_SUCCESS = 0 -ERROR_NOT_ENOUGH_MEMORY = 8 -ERROR_OPERATION_ABORTED = 995 - -STDIN_FILENO = 0 -STDOUT_FILENO = 1 -STDERR_FILENO = 2 - -EOF = b"\x1a" -MAX_BYTES_WRITTEN = 32767 - - -class Py_buffer(Structure): - _fields_ = [ - ("buf", c_void_p), - ("obj", py_object), - ("len", c_ssize_t), - ("itemsize", c_ssize_t), - ("readonly", c_int), - ("ndim", c_int), - ("format", c_char_p), - ("shape", c_ssize_p), - ("strides", c_ssize_p), - ("suboffsets", c_ssize_p), - ("internal", c_void_p), - ] - - -# XXX: This was added for the use of cursors -class CONSOLE_CURSOR_INFO(Structure): - _fields_ = [("dwSize", ctypes.c_int), ("bVisible", ctypes.c_int)] - - -# On PyPy we cannot get buffers so our ability to operate here is -# serverly limited. -if pythonapi is None: - get_buffer = None -else: - - def get_buffer(obj, writable=False): - buf = Py_buffer() - flags = PyBUF_WRITABLE if writable else PyBUF_SIMPLE - PyObject_GetBuffer(py_object(obj), byref(buf), flags) - try: - buffer_type = c_char * buf.len - return buffer_type.from_address(buf.buf) - finally: - PyBuffer_Release(byref(buf)) - - -def get_long_path(short_path): - # type: (Text, str) -> Text - BUFFER_SIZE = 500 - buffer = create_unicode_buffer(BUFFER_SIZE) - get_long_path_name = windll.kernel32.GetLongPathNameW - get_long_path_name(to_text(short_path), buffer, BUFFER_SIZE) - return buffer.value - - -class _WindowsConsoleRawIOBase(io.RawIOBase): - def __init__(self, handle): - self.handle = handle - - def isatty(self): - io.RawIOBase.isatty(self) - return True - - -class _WindowsConsoleReader(_WindowsConsoleRawIOBase): - def readable(self): - return True - - def readinto(self, b): - bytes_to_be_read = len(b) - if not bytes_to_be_read: - return 0 - elif bytes_to_be_read % 2: - raise ValueError( - "cannot read odd number of bytes from " "UTF-16-LE encoded console" - ) - - buffer = get_buffer(b, writable=True) - code_units_to_be_read = bytes_to_be_read // 2 - code_units_read = c_ulong() - - rv = ReadConsoleW( - self.handle, buffer, code_units_to_be_read, byref(code_units_read), None - ) - if GetLastError() == ERROR_OPERATION_ABORTED: - # wait for KeyboardInterrupt - time.sleep(0.1) - if not rv: - raise OSError("Windows error: %s" % GetLastError()) - - if buffer[0] == EOF: - return 0 - return 2 * code_units_read.value - - -class _WindowsConsoleWriter(_WindowsConsoleRawIOBase): - def writable(self): - return True - - @staticmethod - def _get_error_message(errno): - if errno == ERROR_SUCCESS: - return "ERROR_SUCCESS" - elif errno == ERROR_NOT_ENOUGH_MEMORY: - return "ERROR_NOT_ENOUGH_MEMORY" - return "Windows error %s" % errno - - def write(self, b): - bytes_to_be_written = len(b) - buf = get_buffer(b) - code_units_to_be_written = min(bytes_to_be_written, MAX_BYTES_WRITTEN) // 2 - code_units_written = c_ulong() - - WriteConsoleW( - self.handle, buf, code_units_to_be_written, byref(code_units_written), None - ) - bytes_written = 2 * code_units_written.value - - if bytes_written == 0 and bytes_to_be_written > 0: - raise OSError(self._get_error_message(GetLastError())) - return bytes_written - - -class ConsoleStream(object): - def __init__(self, text_stream, byte_stream): - self._text_stream = text_stream - self.buffer = byte_stream - - @property - def name(self): - return self.buffer.name - - @property - def fileno(self): - return self.buffer.fileno - - def write(self, x): - if isinstance(x, str): - return self._text_stream.write(x) - try: - self.flush() - except Exception: - pass - return self.buffer.write(x) - - def writelines(self, lines): - for line in lines: - self.write(line) - - def __getattr__(self, name): - try: - return getattr(self._text_stream, name) - except io.UnsupportedOperation: - return getattr(self.buffer, name) - - def isatty(self): - return self.buffer.isatty() - - def __repr__(self): - return "" % (self.name, self.encoding) - - -class WindowsChunkedWriter(object): - """ - Wraps a stream (such as stdout), acting as a transparent proxy for all - attribute access apart from method 'write()' which we wrap to write in - limited chunks due to a Windows limitation on binary console streams. - """ - - def __init__(self, wrapped): - # double-underscore everything to prevent clashes with names of - # attributes on the wrapped stream object. - self.__wrapped = wrapped - - def __getattr__(self, name): - return getattr(self.__wrapped, name) - - def write(self, text): - total_to_write = len(text) - written = 0 - - while written < total_to_write: - to_write = min(total_to_write - written, MAX_BYTES_WRITTEN) - self.__wrapped.write(text[written : written + to_write]) - written += to_write - - -_wrapped_std_streams = set() - - -def _get_text_stdin(buffer_stream): - text_stream = StreamWrapper( - io.BufferedReader(_WindowsConsoleReader(STDIN_HANDLE)), - "utf-16-le", - "strict", - line_buffering=True, - ) - return ConsoleStream(text_stream, buffer_stream) - - -def _get_text_stdout(buffer_stream): - text_stream = StreamWrapper( - io.BufferedWriter(_WindowsConsoleWriter(STDOUT_HANDLE)), - "utf-16-le", - "strict", - line_buffering=True, - ) - return ConsoleStream(text_stream, buffer_stream) - - -def _get_text_stderr(buffer_stream): - text_stream = StreamWrapper( - io.BufferedWriter(_WindowsConsoleWriter(STDERR_HANDLE)), - "utf-16-le", - "strict", - line_buffering=True, - ) - return ConsoleStream(text_stream, buffer_stream) - - -_stream_factories = {0: _get_text_stdin, 1: _get_text_stdout, 2: _get_text_stderr} - - -def _get_windows_console_stream(f, encoding, errors): - if ( - get_buffer is not None - and encoding in ("utf-16-le", None) - and errors in ("strict", None) - and hasattr(f, "isatty") - and f.isatty() - ): - if isinstance(f, ConsoleStream): - return f - func = _stream_factories.get(f.fileno()) - if func is not None: - f = getattr(f, "buffer", None) - if f is None: - return None - else: - # If we are on Python 2 we need to set the stream that we - # deal with to binary mode as otherwise the exercise if a - # bit moot. The same problems apply as for - # get_binary_stdin and friends from _compat. - msvcrt.setmode(f.fileno(), os.O_BINARY) - return func(f) - - -def hide_cursor(): - cursor_info = CONSOLE_CURSOR_INFO() - GetConsoleCursorInfo(STDOUT_HANDLE, ctypes.byref(cursor_info)) - cursor_info.visible = False - SetConsoleCursorInfo(STDOUT_HANDLE, ctypes.byref(cursor_info)) - - -def show_cursor(): - cursor_info = CONSOLE_CURSOR_INFO() - GetConsoleCursorInfo(STDOUT_HANDLE, ctypes.byref(cursor_info)) - cursor_info.visible = True - SetConsoleCursorInfo(STDOUT_HANDLE, ctypes.byref(cursor_info)) - - -def get_stream_handle(stream): - return STREAM_MAP.get(stream.fileno()) - - -def _walk_for_powershell(directory): - for path, dirs, files in os.walk(directory): - powershell = next( - iter(fn for fn in files if fn.lower() == "powershell.exe"), None - ) - if powershell is not None: - return os.path.join(directory, powershell) - for subdir in dirs: - powershell = _walk_for_powershell(os.path.join(directory, subdir)) - if powershell: - return powershell - return None - - -def _get_powershell_path(): - paths = [ - os.path.expandvars(r"%windir%\{0}\WindowsPowerShell").format(subdir) - for subdir in ("SysWOW64", "system32") - ] - powershell_path = next(iter(_walk_for_powershell(pth) for pth in paths), None) - if not powershell_path: - powershell_path, _ = run( - ["where", "powershell"], block=True, nospin=True, return_object=False - ) - if powershell_path: - return powershell_path.strip() - return None - - -def _get_sid_with_powershell(): - powershell_path = _get_powershell_path() - if not powershell_path: - return None - args = [ - powershell_path, - "-ExecutionPolicy", - "Bypass", - "-Command", - "Invoke-Expression '[System.Security.Principal.WindowsIdentity]::GetCurrent().user | Write-Host'", - ] - sid, _ = run(args, nospin=True) - return sid.strip() - - -def _get_sid_from_registry(): - try: - import winreg - except ImportError: - import _winreg as winreg - var_names = ("%USERPROFILE%", "%HOME%") - current_user_home = next(iter(os.path.expandvars(v) for v in var_names if v), None) - root, subkey = ( - winreg.HKEY_LOCAL_MACHINE, - r"Software\Microsoft\Windows NT\CurrentVersion\ProfileList", - ) - subkey_names = [] - value = None - matching_key = None - try: - with winreg.OpenKeyEx(root, subkey, 0, winreg.KEY_READ) as key: - for i in count(): - key_name = winreg.EnumKey(key, i) - subkey_names.append(key_name) - value = query_registry_value( - root, r"{0}\{1}".format(subkey, key_name), "ProfileImagePath" - ) - if value and value.lower() == current_user_home.lower(): - matching_key = key_name - break - except OSError: - pass - if matching_key is not None: - return matching_key - - -def get_value_from_tuple(value, value_type): - try: - import winreg - except ImportError: - import _winreg as winreg - if value_type in (winreg.REG_SZ, winreg.REG_EXPAND_SZ): - if "\0" in value: - return value[: value.index("\0")] - return value - return None - - -def query_registry_value(root, key_name, value): - try: - import winreg - except ImportError: - import _winreg as winreg - try: - with winreg.OpenKeyEx(root, key_name, 0, winreg.KEY_READ) as key: - return get_value_from_tuple(*winreg.QueryValueEx(key, value)) - except OSError: - return None - - -def get_current_user(): - fns = (_get_sid_from_registry, _get_sid_with_powershell) - for fn in fns: - result = fn() - if result: - return result - return None diff --git a/pipenv/vendor/vistir/cmdparse.py b/pipenv/vendor/vistir/cmdparse.py deleted file mode 100644 index 704133e4..00000000 --- a/pipenv/vendor/vistir/cmdparse.py +++ /dev/null @@ -1,83 +0,0 @@ -# -*- coding=utf-8 -*- -from __future__ import absolute_import, unicode_literals - -import itertools -import re -import shlex - -__all__ = ["ScriptEmptyError", "Script"] - - -class ScriptEmptyError(ValueError): - pass - - -def _quote_if_contains(value, pattern): - if next(re.finditer(pattern, value), None): - return '"{0}"'.format(re.sub(r'(\\*)"', r'\1\1\\"', value)) - return value - - -class Script(object): - """Parse a script line (in Pipfile's [scripts] section). - - This always works in POSIX mode, even on Windows. - """ - - def __init__(self, command, args=None): - self._parts = [command] - if args: - self._parts.extend(args) - - @classmethod - def parse(cls, value): - if isinstance(value, str): - value = shlex.split(value) - if not value: - raise ScriptEmptyError(value) - return cls(value[0], value[1:]) - - def __repr__(self): - return "Script({0!r})".format(self._parts) - - @property - def command(self): - return self._parts[0] - - @property - def args(self): - return self._parts[1:] - - def extend(self, extra_args): - self._parts.extend(extra_args) - - def cmdify(self): - """Encode into a cmd-executable string. - - This re-implements CreateProcess's quoting logic to turn a list of - arguments into one single string for the shell to interpret. - - * All double quotes are escaped with a backslash. - * Existing backslashes before a quote are doubled, so they are all - escaped properly. - * Backslashes elsewhere are left as-is; cmd will interpret them - literally. - - The result is then quoted into a pair of double quotes to be grouped. - - An argument is intentionally not quoted if it does not contain - whitespaces. This is done to be compatible with Windows built-in - commands that don't work well with quotes, e.g. everything with `echo`, - and DOS-style (forward slash) switches. - - The intended use of this function is to pre-process an argument list - before passing it into ``subprocess.Popen(..., shell=True)``. - - See also: https://docs.python.org/3/library/subprocess.html#converting-argument-sequence - """ - return " ".join( - itertools.chain( - [_quote_if_contains(self.command, r"[\s^()]")], - (_quote_if_contains(arg, r"[\s^]") for arg in self.args), - ) - ) diff --git a/pipenv/vendor/vistir/contextmanagers.py b/pipenv/vendor/vistir/contextmanagers.py deleted file mode 100644 index 20b4d64d..00000000 --- a/pipenv/vendor/vistir/contextmanagers.py +++ /dev/null @@ -1,338 +0,0 @@ -# -*- coding=utf-8 -*- -import io -import os - -import stat -import sys -import typing - -from contextlib import closing, contextmanager -from pathlib import Path -from tempfile import NamedTemporaryFile -from urllib import request - -from .path import is_file_url, is_valid_url, path_to_url, url_to_path - -if typing.TYPE_CHECKING: - from typing import ( - Any, - Bytes, - Callable, - ContextManager, - Dict, - IO, - Iterator, - Optional, - Union, - Text, - Tuple, - TypeVar, - ) - from types import ModuleType - from pipenv.patched.pip._vendor.requests import Session - from http.client import HTTPResponse as Urllib_HTTPResponse - from pipenv.patched.pip._vendor.urllib3.response import HTTPResponse as Urllib3_HTTPResponse - from .spin import VistirSpinner, DummySpinner - - TSpinner = Union[VistirSpinner, DummySpinner] - _T = TypeVar("_T") - - -__all__ = [ - "temp_environ", - "temp_path", - "cd", - "atomic_open_for_write", - "open_file", - "replaced_stream", - "replaced_streams", -] - - -# Borrowed from Pew. -# See https://github.com/berdario/pew/blob/master/pew/_utils.py#L82 -@contextmanager -def temp_environ(): - # type: () -> Iterator[None] - """Allow the ability to set os.environ temporarily""" - environ = dict(os.environ) - try: - yield - finally: - os.environ.clear() - os.environ.update(environ) - - -@contextmanager -def temp_path(): - # type: () -> Iterator[None] - """A context manager which allows the ability to set sys.path temporarily - - >>> path_from_virtualenv = load_path("/path/to/venv/bin/python") - >>> print(sys.path) - [ - '/home/user/.pyenv/versions/3.7.0/bin', - '/home/user/.pyenv/versions/3.7.0/lib/python37.zip', - '/home/user/.pyenv/versions/3.7.0/lib/python3.7', - '/home/user/.pyenv/versions/3.7.0/lib/python3.7/lib-dynload', - '/home/user/.pyenv/versions/3.7.0/lib/python3.7/site-packages' - ] - >>> with temp_path(): - sys.path = path_from_virtualenv - # Running in the context of the path above - run(["pip", "install", "stuff"]) - >>> print(sys.path) - [ - '/home/user/.pyenv/versions/3.7.0/bin', - '/home/user/.pyenv/versions/3.7.0/lib/python37.zip', - '/home/user/.pyenv/versions/3.7.0/lib/python3.7', - '/home/user/.pyenv/versions/3.7.0/lib/python3.7/lib-dynload', - '/home/user/.pyenv/versions/3.7.0/lib/python3.7/site-packages' - ] - - """ - path = [p for p in sys.path] - try: - yield - finally: - sys.path = [p for p in path] - - -@contextmanager -def cd(path): - # type: () -> Iterator[None] - """Context manager to temporarily change working directories - - :param str path: The directory to move into - - >>> print(os.path.abspath(os.curdir)) - '/home/user/code/myrepo' - >>> with cd("/home/user/code/otherdir/subdir"): - ... print("Changed directory: %s" % os.path.abspath(os.curdir)) - Changed directory: /home/user/code/otherdir/subdir - >>> print(os.path.abspath(os.curdir)) - '/home/user/code/myrepo' - """ - if not path: - return - prev_cwd = Path.cwd().as_posix() - if isinstance(path, Path): - path = path.as_posix() - os.chdir(str(path)) - try: - yield - finally: - os.chdir(prev_cwd) - - -@contextmanager -def atomic_open_for_write(target, binary=False, newline=None, encoding=None): - # type: (str, bool, Optional[str], Optional[str]) -> None - """Atomically open `target` for writing. - - This is based on Lektor's `atomic_open()` utility, but simplified a lot - to handle only writing, and skip many multi-process/thread edge cases - handled by Werkzeug. - - :param str target: Target filename to write - :param bool binary: Whether to open in binary mode, default False - :param Optional[str] newline: The newline character to use when writing, determined - from system if not supplied. - :param Optional[str] encoding: The encoding to use when writing, defaults to system - encoding. - - How this works: - - * Create a temp file (in the same directory of the actual target), and - yield for surrounding code to write to it. - * If some thing goes wrong, try to remove the temp file. The actual target - is not touched whatsoever. - * If everything goes well, close the temp file, and replace the actual - target with this new file. - - .. code:: python - - >>> fn = "test_file.txt" - >>> def read_test_file(filename=fn): - with open(filename, 'r') as fh: - print(fh.read().strip()) - - >>> with open(fn, "w") as fh: - fh.write("this is some test text") - >>> read_test_file() - this is some test text - - >>> def raise_exception_while_writing(filename): - with open(filename, "w") as fh: - fh.write("writing some new text") - raise RuntimeError("Uh oh, hope your file didn't get overwritten") - - >>> raise_exception_while_writing(fn) - Traceback (most recent call last): - ... - RuntimeError: Uh oh, hope your file didn't get overwritten - >>> read_test_file() - writing some new text - - # Now try with vistir - >>> def raise_exception_while_writing(filename): - with vistir.contextmanagers.atomic_open_for_write(filename) as fh: - fh.write("Overwriting all the text from before with even newer text") - raise RuntimeError("But did it get overwritten now?") - - >>> raise_exception_while_writing(fn) - Traceback (most recent call last): - ... - RuntimeError: But did it get overwritten now? - - >>> read_test_file() - writing some new text - """ - - mode = "w+b" if binary else "w" - f = NamedTemporaryFile( - dir=os.path.dirname(target), - prefix=".__atomic-write", - mode=mode, - encoding=encoding, - newline=newline, - delete=False, - ) - # set permissions to 0644 - try: - os.chmod(f.name, stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) - except OSError: - pass - try: - yield f - except BaseException: - f.close() - try: - os.remove(f.name) - except OSError: - pass - raise - else: - f.close() - try: - os.remove(target) # This is needed on Windows. - except OSError: - pass - os.rename(f.name, target) # No os.replace() on Python 2. - - -@contextmanager -def open_file( - link, # type: Union[_T, str] - session=None, # type: Optional[Session] - stream=True, # type: bool -): - # type: (...) -> ContextManager[Union[IO[bytes], Urllib3_HTTPResponse, Urllib_HTTPResponse]] - """ - Open local or remote file for reading. - - :param pipenv.patched.pip._internal.index.Link link: A link object from resolving dependencies with - pip, or else a URL. - :param Optional[Session] session: A :class:`~requests.Session` instance - :param bool stream: Whether to stream the content if remote, default True - :raises ValueError: If link points to a local directory. - :return: a context manager to the opened file-like object - """ - if not isinstance(link, str): - try: - link = link.url_without_fragment - except AttributeError: - raise ValueError("Cannot parse url from unknown type: {0!r}".format(link)) - - if not is_valid_url(link) and os.path.exists(link): - link = path_to_url(link) - - if is_file_url(link): - # Local URL - local_path = url_to_path(link) - if os.path.isdir(local_path): - raise ValueError("Cannot open directory for read: {}".format(link)) - else: - with io.open(local_path, "rb") as local_file: - yield local_file - else: - # Remote URL - headers = {"Accept-Encoding": "identity"} - if not session: - try: - from pipenv.patched.pip._vendor.requests import Session # noqa - except ImportError: - session = None - else: - session = Session() - if session is None: - with closing(request.urlopen(link)) as f: - yield f - else: - with session.get(link, headers=headers, stream=stream) as resp: - try: - raw = getattr(resp, "raw", None) - result = raw if raw else resp - yield result - finally: - if raw: - conn = raw._connection - if conn is not None: - conn.close() - result.close() - - -@contextmanager -def replaced_stream(stream_name): - # type: (str) -> Iterator[IO[Text]] - """ - Context manager to temporarily swap out *stream_name* with a stream wrapper. - - :param str stream_name: The name of a sys stream to wrap - :returns: A ``StreamWrapper`` replacement, temporarily - - >>> orig_stdout = sys.stdout - >>> with replaced_stream("stdout") as stdout: - ... sys.stdout.write("hello") - ... assert stdout.getvalue() == "hello" - - >>> sys.stdout.write("hello") - 'hello' - """ - - orig_stream = getattr(sys, stream_name) - new_stream = io.StringIO() - try: - setattr(sys, stream_name, new_stream) - yield getattr(sys, stream_name) - finally: - setattr(sys, stream_name, orig_stream) - - -@contextmanager -def replaced_streams(): - # type: () -> Iterator[Tuple[IO[Text], IO[Text]]] - """ - Context manager to replace both ``sys.stdout`` and ``sys.stderr`` using - ``replaced_stream`` - - returns: *(stdout, stderr)* - - >>> import sys - >>> with vistir.contextmanagers.replaced_streams() as streams: - >>> stdout, stderr = streams - >>> sys.stderr.write("test") - >>> sys.stdout.write("hello") - >>> assert stdout.getvalue() == "hello" - >>> assert stderr.getvalue() == "test" - - >>> stdout.getvalue() - 'hello' - - >>> stderr.getvalue() - 'test' - """ - - with replaced_stream("stdout") as stdout: - with replaced_stream("stderr") as stderr: - yield (stdout, stderr) diff --git a/pipenv/vendor/vistir/cursor.py b/pipenv/vendor/vistir/cursor.py deleted file mode 100644 index bdb281f6..00000000 --- a/pipenv/vendor/vistir/cursor.py +++ /dev/null @@ -1,61 +0,0 @@ -# -*- coding=utf-8 -*- -from __future__ import absolute_import, print_function - -import os -import sys - -__all__ = ["hide_cursor", "show_cursor", "get_stream_handle"] - - -def get_stream_handle(stream=sys.stdout): - """ - Get the OS appropriate handle for the corresponding output stream. - - :param str stream: The the stream to get the handle for - :return: A handle to the appropriate stream, either a ctypes buffer - or **sys.stdout** or **sys.stderr**. - """ - handle = stream - if os.name == "nt": - from ._winconsole import get_stream_handle as get_win_stream_handle - - return get_win_stream_handle(stream) - return handle - - -def hide_cursor(stream=sys.stdout): - """ - Hide the console cursor on the given stream - - :param stream: The name of the stream to get the handle for - :return: None - :rtype: None - """ - - handle = get_stream_handle(stream=stream) - if os.name == "nt": - from ._winconsole import hide_cursor - - hide_cursor() - else: - handle.write("\033[?25l") - handle.flush() - - -def show_cursor(stream=sys.stdout): - """ - Show the console cursor on the given stream - - :param stream: The name of the stream to get the handle for - :return: None - :rtype: None - """ - - handle = get_stream_handle(stream=stream) - if os.name == "nt": - from ._winconsole import show_cursor - - show_cursor() - else: - handle.write("\033[?25h") - handle.flush() diff --git a/pipenv/vendor/vistir/misc.py b/pipenv/vendor/vistir/misc.py deleted file mode 100644 index 6f9abf44..00000000 --- a/pipenv/vendor/vistir/misc.py +++ /dev/null @@ -1,1119 +0,0 @@ -import io -import json -import locale -import logging -import os -import subprocess -import sys -import threading -import warnings - -from itertools import tee -from weakref import WeakKeyDictionary - -from queue import Empty, Queue -from typing import Iterable, List, Optional, Union - -from .cmdparse import Script - -_fs_encode_errors = "surrogatepass" - - -__all__ = [ - "shell_escape", - "unnest", - "run", - "load_path", - "partialclass", - "to_text", - "to_bytes", - "locale_encoding", - "getpreferredencoding", - "decode_for_output", - "get_canonical_encoding_name", - "get_wrapped_stream", - "StreamWrapper", -] - - -def _get_logger(name=None, level="ERROR"): - # type: (Optional[str], str) -> logging.Logger - if not name: - name = __name__ - level = getattr(logging, level.upper()) - logger = logging.getLogger(name) - logger.setLevel(level) - formatter = logging.Formatter( - "%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s" - ) - handler = logging.StreamHandler(stream=sys.stderr) - handler.setFormatter(formatter) - logger.addHandler(handler) - return logger - - -def shell_escape(cmd: Union[str, List[str]]) -> str: - """Escape strings for use in :func:`~subprocess.Popen` and :func:`run`. - - This is a passthrough method for instantiating a - :class:`~vistir.cmdparse.Script` object which can be used to escape - commands to output as a single string. - """ - cmd = Script.parse(cmd) - return cmd.cmdify() - - -def unnest(elem): - # type: (Iterable) -> Any - """Flatten an arbitrarily nested iterable. - - :param elem: An iterable to flatten - :type elem: :class:`~collections.Iterable` - - >>> nested_iterable = ( - 1234, (3456, 4398345, (234234)), ( - 2396, ( - 23895750, 9283798, 29384, ( - 289375983275, 293759, 2347, ( - 2098, 7987, 27599 - ) - ) - ) - ) - ) - >>> list(vistir.misc.unnest(nested_iterable)) - [1234, 3456, 4398345, 234234, 2396, 23895750, 9283798, 29384, 289375983275, 293759, - 2347, 2098, 7987, 27599] - """ - - if isinstance(elem, Iterable) and not isinstance(elem, str): - elem, target = tee(elem, 2) - else: - target = elem - if not target or not _is_iterable(target): - yield target - else: - for el in target: - if isinstance(el, Iterable) and not isinstance(el, str): - el, el_copy = tee(el, 2) - for sub in unnest(el_copy): - yield sub - else: - yield el - - -def _is_iterable(elem): - # type: (Any) -> bool - if getattr(elem, "__iter__", False) or isinstance(elem, Iterable): - return True - return False - - -def _spawn_subprocess( - script, # type: Union[str, List[str]] - env=None, # type: Optional[Dict[str, str]] - block=True, # type: bool - cwd=None, # type: Optional[Union[str, Path]] - combine_stderr=True, # type: bool - encoding="utf-8", # type: str -): - from shutil import which - - if os.name != "nt": - class WindowsError(OSError): - """this exception is only available on windows""" - - if not env: - env = os.environ.copy() - command = which(script.command) - options = { - "env": env, - "universal_newlines": True, - "stdout": subprocess.PIPE, - "stderr": subprocess.PIPE if not combine_stderr else subprocess.STDOUT, - "shell": False, - } - if sys.version_info[:2] > (3, 5): - options.update({"universal_newlines": True, "encoding": encoding}) - elif os.name != "nt": - options["universal_newlines"] = True - if not block: - options["stdin"] = subprocess.PIPE - if cwd: - options["cwd"] = cwd - # Command not found, maybe this is a shell built-in? - cmd = [command] + script.args - if not command: # Try to use CreateProcess directly if possible. - cmd = script.cmdify() - options["shell"] = True - - # Try to use CreateProcess directly if possible. Specifically catch - # Windows error 193 "Command is not a valid Win32 application" to handle - # a "command" that is non-executable. See pypa/pipenv#2727. - try: - return subprocess.Popen(cmd, **options) - except WindowsError as err: # pragma: no cover - if getattr(err, "winerror", 9999) != 193: - raise - options["shell"] = True - # Try shell mode to use Windows's file association for file launch. - return subprocess.Popen(script.cmdify(), **options) - - -class SubprocessStreamWrapper(object): - def __init__( - self, - display_stderr_maxlen=200, # type: int - display_line_for_loops=20, # type: int - subprocess=None, # type: subprocess.Popen - spinner=None, # type: Optional[VistirSpinner] - verbose=False, # type: bool - stdout_allowed=False, # type: bool - ): - # type: (...) -> None - stdout_encoding = None - stderr_encoding = None - preferred_encoding = getpreferredencoding() - if subprocess is not None: - stdout_encoding = self.get_subprocess_encoding(subprocess, "stdout") - stderr_encoding = self.get_subprocess_encoding(subprocess, "stderr") - self.stdout_encoding = stdout_encoding or preferred_encoding - self.stderr_encoding = stderr_encoding or preferred_encoding - self.stdout_lines = [] - self.text_stdout_lines = [] - self.stderr_lines = [] - self.text_stderr_lines = [] - self.display_line = "" - self.display_line_loops_displayed = 0 - self.display_line_shown_for_loops = display_line_for_loops - self.display_line_max_len = display_stderr_maxlen - self.spinner = spinner - self.stdout_allowed = stdout_allowed - self.verbose = verbose - self._iterated_stdout = None - self._iterated_stderr = None - self._subprocess = subprocess - self._queues = { - "streams": Queue(), - "lines": Queue(), - } - self._threads = { - stream_name: threading.Thread( - target=self.enqueue_stream, - args=(self._subprocess, stream_name, self._queues["streams"]), - ) - for stream_name in ("stdout", "stderr") - } - self._threads["watcher"] = threading.Thread( - target=self.process_output_lines, - args=(self._queues["streams"], self._queues["lines"]), - ) - self.start_threads() - - def enqueue_stream(self, proc, stream_name, queue): - # type: (subprocess.Popen, str, Queue) -> None - if not getattr(proc, stream_name, None): - queue.put(("stderr", None)) - else: - for line in iter(getattr(proc, stream_name).readline, ""): - queue.put((stream_name, line)) - getattr(proc, stream_name).close() - - @property - def stderr(self): - return self._subprocess.stderr - - @property - def stdout(self): - return self._subprocess.stdout - - @classmethod - def get_subprocess_encoding(cls, cmd_instance, stream_name): - # type: (subprocess.Popen, str) -> Optional[str] - stream = getattr(cmd_instance, stream_name, None) - if stream is not None: - return get_output_encoding(getattr(stream, "encoding", None)) - return None - - @property - def stdout_iter(self): - if self._iterated_stdout is None and self.stdout: - self._iterated_stdout = iter(self.stdout.readline, "") - return self._iterated_stdout - - @property - def stderr_iter(self): - if self._iterated_stderr is None and self.stderr: - self._iterated_stderr = iter(self.stderr.readline, "") - return self._iterated_stderr - - def _decode_line(self, line, encoding): - # type: (Union[str, bytes], str) -> str - if isinstance(line, bytes): - line = to_text( - line.decode(encoding, errors=_fs_decode_errors).encode( - "utf-8", errors=_fs_encode_errors - ), - errors="backslashreplace", - ) - else: - line = to_text(line, encoding=encoding, errors=_fs_encode_errors) - return line - - def start_threads(self): - for thread in self._threads.values(): - thread.daemon = True - thread.start() - - @property - def subprocess(self): - return self._subprocess - - @property - def out(self): - # type: () -> str - return getattr(self.subprocess, "out", "") - - @out.setter - def out(self, value): - # type: (str) -> None - self._subprocess.out = value - - @property - def err(self): - # type: () -> str - return getattr(self.subprocess, "err", "") - - @err.setter - def err(self, value): - # type: (str) -> None - self._subprocess.err = value - - def poll(self): - # type: () -> Optional[int] - return self.subprocess.poll() - - def wait(self, timeout=None): - # type: (self, Optional[int]) -> Optional[int] - kwargs = {} - if sys.version_info[0] >= 3: - kwargs = {"timeout": timeout} - result = self._subprocess.wait(**kwargs) - self.gather_output() - return result - - @property - def returncode(self): - # type: () -> Optional[int] - return self.subprocess.returncode - - @property - def text_stdout(self): - return os.linesep.join(self.text_stdout_lines) - - @property - def text_stderr(self): - return os.linesep.join(self.text_stderr_lines) - - @property - def stderr_closed(self): - # type: () -> bool - return self.stderr is None or (self.stderr is not None and self.stderr.closed) - - @property - def stdout_closed(self): - # type: () -> bool - return self.stdout is None or (self.stdout is not None and self.stdout.closed) - - @property - def running(self): - # type: () -> bool - return any(t.is_alive() for t in self._threads.values()) or not all( - [self.stderr_closed, self.stdout_closed, self.subprocess_finished] - ) - - @property - def subprocess_finished(self): - if self._subprocess is None: - return False - return ( - self._subprocess.poll() is not None or self._subprocess.returncode is not None - ) - - def update_display_line(self, new_line): - # type: () -> None - if self.display_line: - if new_line != self.display_line: - self.display_line_loops_displayed = 0 - new_line = "{}".format(new_line) - if len(new_line) > self.display_line_max_len: - new_line = "{}...".format(new_line[: self.display_line_max_len]) - self.display_line = new_line - elif self.display_line_loops_displayed >= self.display_line_shown_for_loops: - self.display_line = "" - self.display_line_loops_displayed = 0 - else: - self.display_line_loops_displayed += 1 - return None - - @classmethod - def check_line_content(cls, line): - # type: (Optional[str]) -> bool - return line is not None and line != "" - - def get_line(self, queue): - # type: (Queue) -> Tuple[Optional[str], ...] - stream, result = None, None - try: - stream, result = queue.get_nowait() - except Empty: - result = None - return stream, result - - def process_output_lines(self, recv_queue, line_queue): - # type: (Queue, Queue) -> None - stream, line = self.get_line(recv_queue) - while self.poll() is None or line is not None: - if self.check_line_content(line): - line = to_text("{}".format(line).rstrip()) - line_queue.put((stream, line)) - stream, line = self.get_line(recv_queue) - - def gather_output(self, spinner=None, stdout_allowed=False, verbose=False): - # type: (Optional[VistirSpinner], bool, bool) -> None - if not getattr(self._subprocess, "out", None): - self._subprocess.out = "" - if not getattr(self._subprocess, "err", None): - self._subprocess.err = "" - if not self._queues["streams"].empty(): - self.process_output_lines(self._queues["streams"], self._queues["lines"]) - while not self._queues["lines"].empty(): - try: - stream_name, line = self._queues["lines"].get() - except Empty: - if not self._threads["watcher"].is_active(): - break - pass - if stream_name == "stdout": - text_line = self._decode_line(line, self.stdout_encoding) - self.text_stdout_lines.append(text_line) - self.out += "{}\n".format(text_line) - if verbose: - _write_subprocess_result( - line, "stdout", spinner=spinner, stdout_allowed=stdout_allowed - ) - else: - text_err = self._decode_line(line, self.stderr_encoding) - self.text_stderr_lines.append(text_err) - self.update_display_line(line) - self.err += "{}\n".format(text_err) - _write_subprocess_result( - line, "stderr", spinner=spinner, stdout_allowed=stdout_allowed - ) - if spinner: - spinner.text = "{} {}".format(spinner.text, self.display_line) - self.out = self.out.strip() - self.err = self.err.strip() - - -def _write_subprocess_result(result, stream_name, spinner=None, stdout_allowed=False): - # type: (str, str, Optional[VistirSpinner], bool) -> None - if not stdout_allowed and stream_name == "stdout": - stream_name = "stderr" - if spinner: - spinner.hide_and_write(result, target=getattr(spinner, stream_name)) - else: - target_stream = getattr(sys, stream_name) - target_stream.write(result) - target_stream.flush() - return None - - -def attach_stream_reader( - cmd_instance, verbose, maxlen, spinner=None, stdout_allowed=False -): - streams = SubprocessStreamWrapper( - subprocess=cmd_instance, - display_stderr_maxlen=maxlen, - spinner=spinner, - verbose=verbose, - stdout_allowed=stdout_allowed, - ) - streams.gather_output(spinner=spinner, verbose=verbose, stdout_allowed=stdout_allowed) - return streams - - -def _handle_nonblocking_subprocess(c, spinner=None): - while c.running: - c.wait() - if spinner: - if c.returncode != 0: - spinner.fail("Failed...cleaning up...") - elif c.returncode == 0 and not os.name == "nt": - spinner.ok("✔ Complete") - else: - spinner.ok("Complete") - return c - - -def _create_subprocess( - cmd, - env=None, - block=True, - return_object=False, - cwd=os.curdir, - verbose=False, - spinner=None, - combine_stderr=False, - display_limit=200, - start_text="", - write_to_stdout=True, - encoding="utf-8", -): - if not env: - env = os.environ.copy() - try: - c = _spawn_subprocess( - cmd, - env=env, - block=block, - cwd=cwd, - combine_stderr=combine_stderr, - encoding=encoding, - ) - except Exception as exc: # pragma: no cover - import traceback - - formatted_tb = "".join(traceback.format_exception(*sys.exc_info())) - sys.stderr.write( - "Error while executing command %s:" % " ".join(cmd._parts) - ) - sys.stderr.write(formatted_tb) - raise exc - if not block: - c.stdin.close() - spinner_orig_text = "" - if spinner and getattr(spinner, "text", None) is not None: - spinner_orig_text = spinner.text - if not spinner_orig_text and start_text is not None: - spinner_orig_text = start_text - c = attach_stream_reader( - c, - verbose=verbose, - maxlen=display_limit, - spinner=spinner, - stdout_allowed=write_to_stdout, - ) - _handle_nonblocking_subprocess(c, spinner) - else: - try: - c.out, c.err = c.communicate() - except (SystemExit, KeyboardInterrupt, TimeoutError): # pragma: no cover - c.terminate() - c.out, c.err = c.communicate() - raise - if not return_object: - return c.out.strip(), c.err.strip() - return c - - -def run( - cmd, - env=None, - return_object=False, - block=True, - cwd=None, - verbose=False, - nospin=False, - spinner_name=None, - combine_stderr=True, - display_limit=200, - write_to_stdout=True, - encoding="utf-8", -): - """Use `subprocess.Popen` to get the output of a command and decode it. - - :param list cmd: A list representing the command you want to run. - :param dict env: Additional environment settings to pass through to the subprocess. - :param bool return_object: When True, returns the whole subprocess instance - :param bool block: When False, returns a potentially still-running - :class:`subprocess.Popen` instance - :param str cwd: Current working directory context to use for spawning the subprocess. - :param bool verbose: Whether to print stdout in real time when non-blocking. - :param bool nospin: Whether to disable the cli spinner. - :param str spinner_name: The name of the spinner to use if enabled, defaults to - bouncingBar - :param bool combine_stderr: Optionally merge stdout and stderr in the subprocess, - false if nonblocking. - :param int dispay_limit: The max width of output lines to display when using a - spinner. - :param bool write_to_stdout: Whether to write to stdout when using a spinner, - defaults to True. - :returns: A 2-tuple of (output, error) or a :class:`subprocess.Popen` object. - - .. Warning:: Merging standard out and standard error in a nonblocking subprocess - can cause errors in some cases and may not be ideal. Consider disabling - this functionality. - """ - - _env = os.environ.copy() - _env["PYTHONIOENCODING"] = str("utf-8") - _env["PYTHONUTF8"] = str("1") - if env: - _env.update(env) - - _env = {k: v for k, v in _env.items()} - if not spinner_name: - spinner_name = "bouncingBar" - - if not isinstance(cmd, Script): - cmd = Script.parse(cmd) - if block or not return_object: - combine_stderr = False - start_text = "" - return _create_subprocess( - cmd, - env=_env, - return_object=return_object, - block=block, - cwd=cwd, - verbose=verbose, - spinner=None, - combine_stderr=combine_stderr, - start_text=start_text, - write_to_stdout=write_to_stdout, - encoding=encoding, - ) - - -def load_path(python): - """Load the :mod:`sys.path` from the given python executable's environment - as json. - - :param str python: Path to a valid python executable - :return: A python representation of the `sys.path` value of the given python - executable. - :rtype: list - - >>> load_path("/home/user/.virtualenvs/requirementslib-5MhGuG3C/bin/python") - ['', '/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python37.zip', - '/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python3.7', - '/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python3.7/lib-dynload', - '/home/user/.pyenv/versions/3.7.0/lib/python3.7', - '/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python3.7/site-packages', - '/home/user/git/requirementslib/src'] - """ - warnings.warn( - 'This function is deprecated and will be removed in version 0.8.', - DeprecationWarning, stacklevel=2) - from pathlib import Path - python = Path(python).as_posix() - out, err = run( - [python, "-c", "import json, sys; print(json.dumps(sys.path))"], nospin=True - ) - if out: - return json.loads(out) - else: - return [] - - -def partialclass(cls, *args, **kwargs): - """Returns a partially instantiated class. - - :return: A partial class instance - :rtype: cls - - >>> source = partialclass(Source, url="https://pypi.org/simple") - >>> source - - >>> source(name="pypi") - >>> source.__dict__ - mappingproxy({ - '__module__': '__main__', - '__dict__': , - '__weakref__': , - '__doc__': None, - '__init__': functools.partialmethod( - , , url='https://pypi.org/simple' - ) - }) - >>> new_source = source(name="pypi") - >>> new_source - <__main__.Source object at 0x7f23af189b38> - >>> new_source.__dict__ - {'url': 'https://pypi.org/simple', 'verify_ssl': True, 'name': 'pypi'} - """ - from functools import partialmethod - name_attrs = [ - n - for n in (getattr(cls, name, str(cls)) for name in ("__name__", "__qualname__")) - if n is not None - ] - name_attrs = name_attrs[0] - type_ = type( - name_attrs, (cls,), {"__init__": partialmethod(cls.__init__, *args, **kwargs)} - ) - # Swiped from attrs.make_class - try: - type_.__module__ = sys._getframe(1).f_globals.get("__name__", "__main__") - except (AttributeError, ValueError): # pragma: no cover - pass # pragma: no cover - return type_ - - -# Borrowed from django -- force bytes and decode -- see link for details: -# https://github.com/django/django/blob/fc6b90b/django/utils/encoding.py#L112 -def to_bytes(string, encoding="utf-8", errors=None): - """Force a value to bytes. - - :param string: Some input that can be converted to a bytes. - :type string: str or bytes unicode or a memoryview subclass - :param encoding: The encoding to use for conversions, defaults to "utf-8" - :param encoding: str, optional - :return: Corresponding byte representation (for use in filesystem operations) - :rtype: bytes - """ - - unicode_name = get_canonical_encoding_name("utf-8") - if not errors: - if get_canonical_encoding_name(encoding) == unicode_name: - if os.name == "nt": - errors = "surrogatepass" - else: - errors = "surrogateescape" - else: - errors = "strict" - if isinstance(string, bytes): - if get_canonical_encoding_name(encoding) == unicode_name: - return string - else: - return string.decode(unicode_name).encode(encoding, errors) - elif isinstance(string, memoryview): - return string.tobytes() - elif not isinstance(string, str): # pragma: no cover - try: - return str(string).encode(encoding, errors) - except UnicodeEncodeError: - if isinstance(string, Exception): - return b" ".join(to_bytes(arg, encoding, errors) for arg in string) - return str(string).encode(encoding, errors) - else: - return string.encode(encoding, errors) - - -def to_text(string, encoding="utf-8", errors=None): - """Force a value to a text-type. - - :param string: Some input that can be converted to a unicode representation. - :type string: str or bytes unicode - :param encoding: The encoding to use for conversions, defaults to "utf-8" - :param encoding: str, optional - :return: The unicode representation of the string - :rtype: str - """ - - unicode_name = get_canonical_encoding_name("utf-8") - if not errors: - if get_canonical_encoding_name(encoding) == unicode_name: - if os.name == "nt": - errors = "surrogatepass" - else: - errors = "surrogateescape" - else: - errors = "strict" - if issubclass(type(string), str): - return string - try: - if not issubclass(type(string), str): - if isinstance(string, bytes): - string = str(string, encoding, errors) - else: - string = str(string) - - else: - string = string.decode(encoding, errors) - except UnicodeDecodeError: # pragma: no cover - string = " ".join(to_text(arg, encoding, errors) for arg in string) - return string - - -try: - locale_encoding = locale.getdefaultlocale()[1] or "ascii" -except Exception: - locale_encoding = "ascii" - - -def getpreferredencoding(): - """Determine the proper output encoding for terminal rendering.""" - - # Borrowed from Invoke - # (see https://github.com/pyinvoke/invoke/blob/93af29d/invoke/runners.py#L881) - _encoding = sys.getdefaultencoding() or locale.getpreferredencoding(False) - return _encoding - - -PREFERRED_ENCODING = getpreferredencoding() - - -def get_output_encoding(source_encoding): - """Given a source encoding, determine the preferred output encoding. - - :param str source_encoding: The encoding of the source material. - :returns: The output encoding to decode to. - :rtype: str - """ - - if source_encoding is not None: - if get_canonical_encoding_name(source_encoding) == "ascii": - return "utf-8" - return get_canonical_encoding_name(source_encoding) - return get_canonical_encoding_name(PREFERRED_ENCODING) - - -def _encode(output, encoding=None, errors=None, translation_map=None): - if encoding is None: - encoding = PREFERRED_ENCODING - try: - output = output.encode(encoding) - except (UnicodeDecodeError, UnicodeEncodeError): - if translation_map is not None: - output = output.translate(translation_map) - else: - output = to_text(output, encoding=encoding, errors=errors) - except AttributeError: - pass - return output - - -def decode_for_output(output, target_stream=None, translation_map=None): - """Given a string, decode it for output to a terminal. - - :param str output: A string to print to a terminal - :param target_stream: A stream to write to, we will encode to target this stream if - possible. - :param dict translation_map: A mapping of unicode character ordinals to replacement - strings. - :return: A re-encoded string using the preferred encoding - :rtype: str - """ - - if not isinstance(output, str): - return output - encoding = None - if target_stream is not None: - encoding = getattr(target_stream, "encoding", None) - encoding = get_output_encoding(encoding) - try: - output = _encode(output, encoding=encoding, translation_map=translation_map) - except (UnicodeDecodeError, UnicodeEncodeError): - output = _encode( - output, encoding=encoding, errors="replace", translation_map=translation_map - ) - return to_text(output, encoding=encoding, errors="replace") - - -def get_canonical_encoding_name(name): - # type: (str) -> str - """Given an encoding name, get the canonical name from a codec lookup. - - :param str name: The name of the codec to lookup - :return: The canonical version of the codec name - :rtype: str - """ - - import codecs - - try: - codec = codecs.lookup(name) - except LookupError: - return name - else: - return codec.name - - -def _is_binary_buffer(stream): - try: - stream.write(b"") - except Exception: - try: - stream.write("") - except Exception: - pass - return False - return True - - -def _get_binary_buffer(stream): - if not _is_binary_buffer(stream): - stream = getattr(stream, "buffer", None) - if stream is not None and _is_binary_buffer(stream): - return stream - return stream - - -def get_wrapped_stream(stream, encoding=None, errors="replace"): - """Given a stream, wrap it in a `StreamWrapper` instance and return the - wrapped stream. - - :param stream: A stream instance to wrap - :param str encoding: The encoding to use for the stream - :param str errors: The error handler to use, default "replace" - :returns: A new, wrapped stream - :rtype: :class:`StreamWrapper` - """ - - if stream is None: - raise TypeError("must provide a stream to wrap") - stream = _get_binary_buffer(stream) - if stream is not None and encoding is None: - encoding = "utf-8" - if not encoding: - encoding = get_output_encoding(getattr(stream, "encoding", None)) - else: - encoding = get_canonical_encoding_name(encoding) - return StreamWrapper(stream, encoding, errors, line_buffering=True) - - -class StreamWrapper(io.TextIOWrapper): - - """This wrapper class will wrap a provided stream and supply an interface - for compatibility.""" - - def __init__(self, stream, encoding, errors, line_buffering=True, **kwargs): - self._stream = stream = _StreamProvider(stream) - io.TextIOWrapper.__init__( - self, stream, encoding, errors, line_buffering=line_buffering, **kwargs - ) - - # borrowed from click's implementation of stream wrappers, see - # https://github.com/pallets/click/blob/6cafd32/click/_compat.py#L64 - - def write(self, x): - # try to use backslash and surrogate escape strategies before failing - self._errors = ( - "backslashescape" if self.encoding != "mbcs" else "surrogateescape" - ) - try: - return io.TextIOWrapper.write(self, to_text(x, errors=self._errors)) - except UnicodeDecodeError: - if self._errors != "surrogateescape": - self._errors = "surrogateescape" - else: - self._errors = "replace" - return io.TextIOWrapper.write(self, to_text(x, errors=self._errors)) - - def writelines(self, lines): - for line in lines: - self.write(line) - - def __del__(self): - try: - self.detach() - except Exception: - pass - - def isatty(self): - return self._stream.isatty() - - -# More things borrowed from click, this is because we are using `TextIOWrapper` instead of -# just a normal StringIO -class _StreamProvider(object): - def __init__(self, stream): - self._stream = stream - super(_StreamProvider, self).__init__() - - def __getattr__(self, name): - return getattr(self._stream, name) - - def read1(self, size): - fn = getattr(self._stream, "read1", None) - if fn is not None: - return fn(size) - return self._stream.read(size) - - def readable(self): - fn = getattr(self._stream, "readable", None) - if fn is not None: - return fn() - try: - self._stream.read(0) - except Exception: - return False - return True - - def writable(self): - fn = getattr(self._stream, "writable", None) - if fn is not None: - return fn() - try: - self._stream.write(b"") - except Exception: - return False - return True - - def seekable(self): - fn = getattr(self._stream, "seekable", None) - if fn is not None: - return fn() - try: - self._stream.seek(self._stream.tell()) - except Exception: - return False - return True - - -# XXX: The approach here is inspired somewhat by click with details taken from various -# XXX: other sources. Specifically we are using a stream cache and stream wrapping -# XXX: techniques from click (loosely inspired for the most part, with many details) -# XXX: heavily modified to suit our needs - - -def _isatty(stream): - try: - is_a_tty = stream.isatty() - except Exception: # pragma: no cover - is_a_tty = False - return is_a_tty - - -_wrap_for_color = None - -try: - import pipenv.vendor.colorama as colorama -except ImportError: - colorama = None - -_color_stream_cache = WeakKeyDictionary() - -if os.name == "nt" or sys.platform.startswith("win"): - - if colorama is not None: - - def _is_wrapped_for_color(stream): - return isinstance( - stream, (colorama.AnsiToWin32, colorama.ansitowin32.StreamWrapper) - ) - - def _wrap_for_color(stream, color=None): - try: - cached = _color_stream_cache.get(stream) - except KeyError: - cached = None - if cached is not None: - return cached - strip = not _can_use_color(stream, color) - _color_wrapper = colorama.AnsiToWin32(stream, strip=strip) - result = _color_wrapper.stream - _write = result.write - - def _write_with_color(s): - try: - return _write(s) - except Exception: - _color_wrapper.reset_all() - raise - - result.write = _write_with_color - try: - _color_stream_cache[stream] = result - except Exception: - pass - return result - - -def _cached_stream_lookup(stream_lookup_func, stream_resolution_func): - stream_cache = WeakKeyDictionary() - - def lookup(): - stream = stream_lookup_func() - result = None - if stream in stream_cache: - result = stream_cache.get(stream, None) - if result is not None: - return result - result = stream_resolution_func() - try: - stream = stream_lookup_func() - stream_cache[stream] = result - except Exception: - pass - return result - - return lookup - - -def get_text_stream(stream="stdout", encoding=None): - """Retrieve a utf-8 stream wrapper around **sys.stdout** or **sys.stderr**. - - :param str stream: The name of the stream to wrap from the :mod:`sys` module. - :param str encoding: An optional encoding to use. - :return: A new :class:`~vistir.misc.StreamWrapper` instance around the stream - :rtype: `vistir.misc.StreamWrapper` - """ - - stream_map = {"stdin": sys.stdin, "stdout": sys.stdout, "stderr": sys.stderr} - if os.name == "nt" or sys.platform.startswith("win"): - from ._winconsole import _get_windows_console_stream - - else: - _get_windows_console_stream = lambda *args: None # noqa - _wrap_std_stream = lambda *args: None # noqa - - sys_stream = stream_map[stream] - windows_console = _get_windows_console_stream(sys_stream, encoding, None) - if windows_console is not None: - if _can_use_color(windows_console): - return _wrap_for_color(windows_console) - return windows_console - return get_wrapped_stream(sys_stream, encoding) - - -def get_text_stdout(): - return get_text_stream("stdout") - - -def get_text_stderr(): - return get_text_stream("stderr") - - -def get_text_stdin(): - return get_text_stream("stdin") - - -_text_stdin = _cached_stream_lookup(lambda: sys.stdin, get_text_stdin) -_text_stdout = _cached_stream_lookup(lambda: sys.stdout, get_text_stdout) -_text_stderr = _cached_stream_lookup(lambda: sys.stderr, get_text_stderr) - - -TEXT_STREAMS = { - "stdin": get_text_stdin, - "stdout": get_text_stdout, - "stderr": get_text_stderr, -} - - -def replace_with_text_stream(stream_name): - """Given a stream name, replace the target stream with a text-converted - equivalent. - - :param str stream_name: The name of a target stream, such as **stdout** or **stderr** - :return: None - """ - new_stream = TEXT_STREAMS.get(stream_name) - if new_stream is not None: - new_stream = new_stream() - setattr(sys, stream_name, new_stream) - return None - - -def _can_use_color(stream=None, color=None): - from .termcolors import DISABLE_COLORS - - if DISABLE_COLORS: - return False - if not color: - if not stream: - stream = sys.stdin - return _isatty(stream) - return bool(color) diff --git a/pipenv/vendor/vistir/path.py b/pipenv/vendor/vistir/path.py deleted file mode 100644 index 2464cb23..00000000 --- a/pipenv/vendor/vistir/path.py +++ /dev/null @@ -1,618 +0,0 @@ -# -*- coding=utf-8 -*- -from __future__ import absolute_import, print_function, unicode_literals - -import atexit -import errno -import functools -import locale -import os -import posixpath -import shutil -import stat -import sys -import typing -import time -import unicodedata -import warnings - -from pathlib import Path -from tempfile import NamedTemporaryFile, TemporaryDirectory -from typing import Optional, Callable -from urllib import parse as urllib_parse -from urllib import request as urllib_request - -from urllib.parse import quote - -if typing.TYPE_CHECKING: - from types import TracebackType - from typing import ( - Any, - AnyStr, - ByteString, - Generator, - Iterator, - List, - Text, - Tuple, - Type, - Union, - ) - - TPath = os.PathLike - TFunc = Callable[..., Any] - - -__all__ = [ - "check_for_unc_path", - "get_converted_relative_path", - "handle_remove_readonly", - "normalize_path", - "is_in_path", - "is_file_url", - "is_readonly_path", - "is_valid_url", - "mkdir_p", - "ensure_mkdir_p", - "create_tracked_tempdir", - "create_tracked_tempfile", - "path_to_url", - "rmtree", - "safe_expandvars", - "set_write_bit", - "url_to_path", - "walk_up", -] - - -if os.name == "nt": - warnings.filterwarnings( - "ignore", - category=DeprecationWarning, - message="The Windows bytes API has been deprecated.*", - ) - - -def unicode_path(path): - # type: (TPath) -> Text - # Paths are supposed to be represented as unicode here - return path - - -def native_path(path): - # type: (TPath) -> str - return str(path) - - -# once again thank you django... -# https://github.com/django/django/blob/fc6b90b/django/utils/_os.py -if os.name == "nt": - abspathu = os.path.abspath -else: - - def abspathu(path): - # type: (TPath) -> Text - """Version of os.path.abspath that uses the unicode representation of - the current working directory, thus avoiding a UnicodeDecodeError in - join when the cwd has non-ASCII characters.""" - if not os.path.isabs(path): - path = os.path.join(os.getcwd(), path) - return os.path.normpath(path) - - -def normalize_path(path): - # type: (TPath) -> Text - """Return a case-normalized absolute variable-expanded path. - - :param str path: The non-normalized path - :return: A normalized, expanded, case-normalized path - :rtype: str - """ - - path = os.path.abspath(os.path.expandvars(os.path.expanduser(str(path)))) - if os.name == "nt" and os.path.exists(path): - from ._winconsole import get_long_path - - path = get_long_path(path) - - return os.path.normpath(os.path.normcase(path)) - - -def is_in_path(path, parent): - # type: (TPath, TPath) -> bool - """Determine if the provided full path is in the given parent root. - - :param str path: The full path to check the location of. - :param str parent: The parent path to check for membership in - :return: Whether the full path is a member of the provided parent. - :rtype: bool - """ - - return normalize_path(path).startswith(normalize_path(parent)) - - -def normalize_drive(path): - # type: (TPath) -> Text - """Normalize drive in path so they stay consistent. - - This currently only affects local drives on Windows, which can be - identified with either upper or lower cased drive names. The case is - always converted to uppercase because it seems to be preferred. - """ - from .misc import to_text - - if os.name != "nt" or not ( - isinstance(path, str) or getattr(path, "__fspath__", None) - ): - return path # type: ignore - - drive, tail = os.path.splitdrive(path) - # Only match (lower cased) local drives (e.g. 'c:'), not UNC mounts. - if drive.islower() and len(drive) == 2 and drive[1] == ":": - return "{}{}".format(drive.upper(), tail) - - return to_text(path, encoding="utf-8") - - -def path_to_url(path): - # type: (TPath) -> Text - """Convert the supplied local path to a file uri. - - :param str path: A string pointing to or representing a local path - :return: A `file://` uri for the same location - :rtype: str - - >>> path_to_url("/home/user/code/myrepo/myfile.zip") - 'file:///home/user/code/myrepo/myfile.zip' - """ - from .misc import to_bytes - - if not path: - return path # type: ignore - normalized_path = Path(normalize_drive(os.path.abspath(path))).as_posix() - if os.name == "nt" and normalized_path[1] == ":": - drive, _, path = normalized_path.partition(":") - # XXX: This enables us to handle half-surrogates that were never - # XXX: actually part of a surrogate pair, but were just incidentally - # XXX: passed in as a piece of a filename - quoted_path = quote(path, errors="backslashreplace") - return "file:///{}:{}".format(drive, quoted_path) - # XXX: This is also here to help deal with incidental dangling surrogates - # XXX: on linux, by making sure they are preserved during encoding so that - # XXX: we can urlencode the backslash correctly - # bytes_path = to_bytes(normalized_path, errors="backslashreplace") - return "file://{}".format(quote(path, errors="backslashreplace")) - - -def url_to_path(url): - # type: (str) -> str - """Convert a valid file url to a local filesystem path. - - Follows logic taken from pip's equivalent function - """ - - assert is_file_url(url), "Only file: urls can be converted to local paths" - _, netloc, path, _, _ = urllib_parse.urlsplit(url) - # Netlocs are UNC paths - if netloc: - netloc = "\\\\" + netloc - - path = urllib_request.url2pathname(netloc + path) - return urllib_parse.unquote(path) - - -def is_valid_url(url): - # type: (Union[str, bytes]) -> bool - """Checks if a given string is an url.""" - from .misc import to_text - - if not url: - return url # type: ignore - pieces = urllib_parse.urlparse(to_text(url)) - return all([pieces.scheme, pieces.netloc]) - - -def is_file_url(url): - # type: (Any) -> bool - """Returns true if the given url is a file url.""" - from .misc import to_text - - if not url: - return False - if not isinstance(url, str): - try: - url = url.url - except AttributeError: - raise ValueError("Cannot parse url from unknown type: {!r}".format(url)) - url = to_text(url, encoding="utf-8") - return urllib_parse.urlparse(url.lower()).scheme == "file" - - -def is_readonly_path(fn): - # type: (TPath) -> bool - """Check if a provided path exists and is readonly. - - Permissions check is `bool(path.stat & stat.S_IREAD)` or `not - os.access(path, os.W_OK)` - """ - if os.path.exists(fn): - file_stat = os.stat(fn).st_mode - return not bool(file_stat & stat.S_IWRITE) or not os.access(fn, os.W_OK) - return False - - -def mkdir_p(newdir, mode=0o777): - warnings.warn( - ('This function is deprecated and will be removed in version 0.8.' - 'Use os.makedirs instead'), DeprecationWarning, stacklevel=2) - # This exists in shutil already - # type: (TPath, int) -> None - """Recursively creates the target directory and all of its parents if they - do not already exist. Fails silently if they do. - - :param str newdir: The directory path to ensure - :raises: OSError if a file is encountered along the way - """ - if os.path.exists(newdir): - if not os.path.isdir(newdir): - raise OSError( - "a file with the same name as the desired dir, '{}', already exists.".format( - newdir - ) - ) - return None - os.makedirs(newdir, mode) - - -def ensure_mkdir_p(mode=0o777): - # type: (int) -> Callable[Callable[..., Any], Callable[..., Any]] - """Decorator to ensure `mkdir_p` is called to the function's return - value.""" - warnings.warn('This function is deprecated and will be removed in version 0.8.', - DeprecationWarning, stacklevel=2) - - # This exists in shutil already - def decorator(f): - # type: (Callable[..., Any]) -> Callable[..., Any] - @functools.wraps(f) - def decorated(*args, **kwargs): - # type: () -> str - path = f(*args, **kwargs) - mkdir_p(path, mode=mode) - return path - return decorated - return decorator - - -TRACKED_TEMPORARY_DIRECTORIES = [] - - -def create_tracked_tempdir(*args, **kwargs): - # type: (Any, Any) -> str - """Create a tracked temporary directory. - - This uses `TemporaryDirectory`, but does not remove the directory when - the return value goes out of scope, instead registers a handler to cleanup - on program exit. - - The return value is the path to the created directory. - """ - - tempdir = TemporaryDirectory(*args, **kwargs) - TRACKED_TEMPORARY_DIRECTORIES.append(tempdir) - atexit.register(tempdir.cleanup) - warnings.simplefilter("ignore", ResourceWarning) - return tempdir.name - - -def create_tracked_tempfile(*args, **kwargs): - # type: (Any, Any) -> str - """Create a tracked temporary file. - - This uses the `NamedTemporaryFile` construct, but does not remove the file - until the interpreter exits. - - The return value is the file object. - """ - - return NamedTemporaryFile(*args, **kwargs) - - -def _find_icacls_exe(): - # type: () -> Optional[Text] - if os.name == "nt": - paths = [ - os.path.expandvars(r"%windir%\{0}").format(subdir) - for subdir in ("system32", "SysWOW64") - ] - for path in paths: - icacls_path = next( - iter(fn for fn in os.listdir(path) if fn.lower() == "icacls.exe"), None - ) - if icacls_path is not None: - icacls_path = os.path.join(path, icacls_path) - return icacls_path - return None - - -def set_write_bit(fn: str) -> None: - """Set read-write permissions for the current user on the target path. Fail - silently if the path doesn't exist. - - :param str fn: The target filename or path - :return: None - """ - if not os.path.exists(fn): - return - file_stat = os.stat(fn).st_mode - os.chmod(fn, file_stat | stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - if os.name == "nt": - from ._winconsole import get_current_user - - user_sid = get_current_user() - icacls_exe = _find_icacls_exe() or "icacls" - from .misc import run - - if user_sid: - c = run( - [ - icacls_exe, - "''{}''".format(fn), - "/grant", - "{}:WD".format(user_sid), - "/T", - "/C", - "/Q", - ], - nospin=True, - return_object=True, - # 2020-06-12 Yukihiko Shinoda - # There are 3 way to get system default encoding in Stack Overflow. - # see: https://stackoverflow.com/questions/37506535/how-to-get-the-system-default-encoding-in-python-2-x - # I investigated these way by using Shift-JIS Windows. - # >>> import locale - # >>> locale.getpreferredencoding() - # "cp932" (Shift-JIS) - # >>> import sys - # >>> sys.getdefaultencoding() - # "utf-8" - # >>> sys.stdout.encoding - # "UTF8" - encoding=locale.getpreferredencoding(), - ) - if not c.err and c.returncode == 0: - return - - if not os.path.isdir(fn): - for path in [fn, os.path.dirname(fn)]: - try: - os.chflags(path, 0) - except AttributeError: - pass - return None - for root, dirs, files in os.walk(fn, topdown=False): - for dir_ in [os.path.join(root, d) for d in dirs]: - set_write_bit(dir_) - for file_ in [os.path.join(root, f) for f in files]: - set_write_bit(file_) - - -def rmtree(directory: str, - ignore_errors: bool = False, - onerror: Optional[Callable] = None) -> None : - """Stand-in for :func:`~shutil.rmtree` with additional error-handling. - - This version of `rmtree` handles read-only paths, especially in the case of index - files written by certain source control systems. - - :param str directory: The target directory to remove - :param bool ignore_errors: Whether to ignore errors, defaults to False - :param func onerror: An error handling function, defaults to :func:`handle_remove_readonly` - - .. note:: - - Setting `ignore_errors=True` may cause this to silently fail to delete the path - """ - - if onerror is None: - onerror = handle_remove_readonly - try: - shutil.rmtree(directory, ignore_errors=ignore_errors, onerror=onerror) - except (IOError, OSError, FileNotFoundError, PermissionError) as exc: # noqa:B014 - # Ignore removal failures where the file doesn't exist - if exc.errno != errno.ENOENT: - raise - - -def _wait_for_files(path): # pragma: no cover - # type: (Union[str, TPath]) -> Optional[List[TPath]] - """Retry with backoff up to 1 second to delete files from a directory. - - :param str path: The path to crawl to delete files from - :return: A list of remaining paths or None - :rtype: Optional[List[str]] - """ - timeout = 0.001 - remaining = [] - while timeout < 1.0: - remaining = [] - if os.path.isdir(path): - L = os.listdir(path) - for target in L: - _remaining = _wait_for_files(target) - if _remaining: - remaining.extend(_remaining) - continue - try: - os.unlink(path) - except FileNotFoundError as e: - if e.errno == errno.ENOENT: - return - except (OSError, IOError, PermissionError): # noqa:B014 - time.sleep(timeout) - timeout *= 2 - remaining.append(path) - else: - return - return remaining - - -def handle_remove_readonly(func, path, exc): - # type: (Callable[..., str], TPath, Tuple[Type[OSError], OSError, TracebackType]) -> None - """Error handler for shutil.rmtree. - - Windows source repo folders are read-only by default, so this error handler - attempts to set them as writeable and then proceed with deletion. - - :param function func: The caller function - :param str path: The target path for removal - :param Exception exc: The raised exception - - This function will call check :func:`is_readonly_path` before attempting to call - :func:`set_write_bit` on the target path and try again. - """ - - PERM_ERRORS = (errno.EACCES, errno.EPERM, errno.ENOENT) - default_warning_message = "Unable to remove file due to permissions restriction: {!r}" - # split the initial exception out into its type, exception, and traceback - exc_type, exc_exception, exc_tb = exc - if is_readonly_path(path): - # Apply write permission and call original function - set_write_bit(path) - try: - func(path) - except ( # noqa:B014 - OSError, - IOError, - FileNotFoundError, - PermissionError, - ) as e: # pragma: no cover - if e.errno in PERM_ERRORS: - if e.errno == errno.ENOENT: - return - remaining = None - if os.path.isdir(path): - remaining = _wait_for_files(path) - if remaining: - warnings.warn(default_warning_message.format(path), ResourceWarning) - else: - func(path, ignore_errors=True) - return - - if exc_exception.errno in PERM_ERRORS: - set_write_bit(path) - remaining = _wait_for_files(path) - try: - func(path) - except (OSError, IOError, FileNotFoundError, PermissionError) as e: # noqa:B014 - if e.errno in PERM_ERRORS: - if e.errno != errno.ENOENT: # File still exists - warnings.warn(default_warning_message.format(path), ResourceWarning) - return - else: - raise exc_exception - - -def walk_up(bottom): - # type: (Union[TPath, str]) -> Generator[Tuple[str, List[str], List[str]], None, None] - """Mimic os.walk, but walk 'up' instead of down the directory tree. - - From: https://gist.github.com/zdavkeos/1098474 - """ - bottom = os.path.realpath(str(bottom)) - # Get files in current dir. - try: - names = os.listdir(bottom) - except Exception: - return - - dirs, nondirs = [], [] - for name in names: - if os.path.isdir(os.path.join(bottom, name)): - dirs.append(name) - else: - nondirs.append(name) - yield bottom, dirs, nondirs - - new_path = os.path.realpath(os.path.join(bottom, "..")) - # See if we are at the top. - if new_path == bottom: - return - - for x in walk_up(new_path): - yield x - - -def check_for_unc_path(path): - # type: (Path) -> bool - """Checks to see if a pathlib `Path` object is a unc path or not.""" - if ( - os.name == "nt" - and len(path.drive) > 2 - and not path.drive[0].isalpha() - and path.drive[1] != ":" - ): - return True - else: - return False - - -def get_converted_relative_path(path, relative_to=None): - # type: (TPath, Optional[TPath]) -> str - """Convert `path` to be relative. - - Given a vague relative path, return the path relative to the given - location. - - :param str path: The location of a target path - :param str relative_to: The starting path to build against, optional - :returns: A relative posix-style path with a leading `./` - - This performs additional conversion to ensure the result is of POSIX form, - and starts with `./`, or is precisely `.`. - - >>> os.chdir('/home/user/code/myrepo/myfolder') - >>> vistir.path.get_converted_relative_path('/home/user/code/file.zip') - './../../file.zip' - >>> vistir.path.get_converted_relative_path('/home/user/code/myrepo/myfolder/mysubfolder') - './mysubfolder' - >>> vistir.path.get_converted_relative_path('/home/user/code/myrepo/myfolder') - '.' - """ - from .misc import to_text, to_bytes # noqa - - if not relative_to: - relative_to = os.getcwd() - - path = to_text(path, encoding="utf-8") - relative_to = to_text(relative_to, encoding="utf-8") - start_path = Path(relative_to) - try: - start = start_path.resolve() - except OSError: - start = start_path.absolute() - - # check if there is a drive letter or mount point - # if it is a mountpoint use the original absolute path - # instead of the unc path - if check_for_unc_path(start): - start = start_path.absolute() - - path = start.joinpath(path).relative_to(start) - - # check and see if the path that was passed into the function is a UNC path - # and raise value error if it is not. - if check_for_unc_path(path): - raise ValueError("The path argument does not currently accept UNC paths") - - relpath_s = to_text(posixpath.normpath(path.as_posix())) - if not (relpath_s == "." or relpath_s.startswith("./")): - relpath_s = posixpath.join(".", relpath_s) - return relpath_s - - -def safe_expandvars(value): - # type: (TPath) -> str - """Call os.path.expandvars if value is a string, otherwise do nothing.""" - if isinstance(value, str): - return os.path.expandvars(value) - return value # type: ignore diff --git a/pipenv/vendor/vistir/termcolors.py b/pipenv/vendor/vistir/termcolors.py deleted file mode 100644 index d5b34cf4..00000000 --- a/pipenv/vendor/vistir/termcolors.py +++ /dev/null @@ -1,123 +0,0 @@ -# -*- coding=utf-8 -*- -import os -import re - -import pipenv.vendor.colorama as colorama - -from .misc import to_text as to_native_string - -DISABLE_COLORS = os.getenv("CI", False) or os.getenv( - "ANSI_COLORS_DISABLED", os.getenv("VISTIR_DISABLE_COLORS", False) -) - - -ATTRIBUTE_NAMES = ["bold", "dark", "", "underline", "blink", "", "reverse", "concealed"] -ATTRIBUTES = dict(zip(ATTRIBUTE_NAMES, range(1, 9))) -del ATTRIBUTES[""] - -colors = ["grey", "red", "green", "yellow", "blue", "magenta", "cyan", "white"] -COLORS = dict(zip(colors, range(30, 38))) -HIGHLIGHTS = dict(zip(["on_{0}".format(c) for c in colors], range(40, 48))) -ANSI_REMOVAL_RE = re.compile(r"\033\[((?:\d|;)*)([a-zA-Z])") - - -COLOR_MAP = { - # name: type - "blink": "attrs", - "bold": "attrs", - "concealed": "attrs", - "dark": "attrs", - "reverse": "attrs", - "underline": "attrs", - "blue": "color", - "cyan": "color", - "green": "color", - "magenta": "color", - "red": "color", - "white": "color", - "yellow": "color", - "on_blue": "on_color", - "on_cyan": "on_color", - "on_green": "on_color", - "on_grey": "on_color", - "on_magenta": "on_color", - "on_red": "on_color", - "on_white": "on_color", - "on_yellow": "on_color", -} -COLOR_ATTRS = COLOR_MAP.keys() - - -RESET = colorama.Style.RESET_ALL - - -def colored(text, color=None, on_color=None, attrs=None): - """Colorize text using a reimplementation of the colorizer from - https://github.com/pavdmyt/yaspin so that it works on windows. - - Available text colors: - red, green, yellow, blue, magenta, cyan, white. - - Available text highlights: - on_red, on_green, on_yellow, on_blue, on_magenta, on_cyan, on_white. - - Available attributes: - bold, dark, underline, blink, reverse, concealed. - - Example: - colored('Hello, World!', 'red', 'on_grey', ['blue', 'blink']) - colored('Hello, World!', 'green') - """ - return colorize(text, fg=color, bg=on_color, attrs=attrs) - - -def colorize(text, fg=None, bg=None, attrs=None): - if os.getenv("ANSI_COLORS_DISABLED") is None: - style = "NORMAL" - if attrs is not None and not isinstance(attrs, list): - _attrs = [] - if isinstance(attrs, str): - _attrs.append(attrs) - else: - _attrs = list(attrs) - attrs = _attrs - if attrs and "bold" in attrs: - style = "BRIGHT" - attrs.remove("bold") - if fg is not None: - fg = fg.upper() - text = to_native_string("%s%s%s%s%s") % ( - to_native_string(getattr(colorama.Fore, fg)), - to_native_string(getattr(colorama.Style, style)), - to_native_string(text), - to_native_string(colorama.Fore.RESET), - to_native_string(colorama.Style.NORMAL), - ) - - if bg is not None: - bg = bg.upper() - text = to_native_string("%s%s%s%s") % ( - to_native_string(getattr(colorama.Back, bg)), - to_native_string(text), - to_native_string(colorama.Back.RESET), - to_native_string(colorama.Style.NORMAL), - ) - - if attrs is not None: - fmt_str = to_native_string("%s[%%dm%%s%s[9m") % (chr(27), chr(27)) - for attr in attrs: - text = fmt_str % (ATTRIBUTES[attr], text) - - text += RESET - else: - text = ANSI_REMOVAL_RE.sub("", text) - return text - - -def cprint(text, color=None, on_color=None, attrs=None, **kwargs): - """Print colorize text. - - It accepts arguments of print function. - """ - - print((colored(text, color, on_color, attrs)), **kwargs) diff --git a/tasks/release.py b/tasks/release.py index 8e796b64..69adb8ac 100644 --- a/tasks/release.py +++ b/tasks/release.py @@ -9,7 +9,7 @@ import invoke from parver import Version from pipenv.__version__ import __version__ -from pipenv.vendor.vistir.contextmanagers import temp_environ +from pipenv.vendor.requirementslib.utils import temp_environ from .vendoring import _get_git_root, drop_dir diff --git a/tasks/vendoring/__init__.py b/tasks/vendoring/__init__.py index b6851457..2f069aad 100644 --- a/tasks/vendoring/__init__.py +++ b/tasks/vendoring/__init__.py @@ -15,6 +15,8 @@ import invoke import requests from urllib3.util import parse_url as urllib3_parse +from pipenv.vendor.requirementslib.fileutils import open_file + TASK_NAME = "update" LIBRARY_DIRNAMES = { @@ -328,11 +330,6 @@ def post_install_cleanup(ctx, vendor_dir): drop_dir(vendor_dir / "colorama" / "tests") remove_all(vendor_dir.glob("toml.py")) - # this function is called twice hence try ... except ... - try: - (vendor_dir / "vistir" / "spin.py").unlink() - except FileNotFoundError: - pass @invoke.task @@ -770,8 +767,6 @@ def main(ctx, package=None, type=None): @invoke.task def vendor_artifact(ctx, package, version=None): - from pipenv.vendor.vistir.contextmanagers import open_file - simple = requests.get(f"https://pypi.org/simple/{package}/") pkg_str = f"{package}-{version}" soup = bs4.BeautifulSoup(simple.content) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 928ff397..276673d6 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -21,10 +21,9 @@ from pipenv.cli import cli from pipenv.exceptions import VirtualenvActivationException from pipenv.utils.processes import subprocess_run from pipenv.vendor import toml, tomlkit -from pipenv.vendor.vistir.contextmanagers import temp_environ -from pipenv.vendor.vistir.path import ( - create_tracked_tempdir, handle_remove_readonly -) +from pipenv.vendor.requirementslib.fileutils import create_tracked_tempdir +from pipenv.vendor.requirementslib.utils import temp_environ +from pipenv.vendor.requirementslib.models.setup_info import handle_remove_readonly log = logging.getLogger(__name__) warnings.simplefilter("default", category=ResourceWarning) @@ -137,7 +136,7 @@ def _create_tracked_dir(): @pytest.fixture -def vistir_tmpdir(): +def tracked_tmpdir(): temp_path = _create_tracked_dir() yield Path(temp_path) @@ -159,7 +158,7 @@ def local_tempdir(request): @pytest.fixture(name='create_tmpdir') -def vistir_tmpdir_factory(): +def tmpdir_factory(): def create_tmpdir(): return Path(_create_tracked_dir()) @@ -417,9 +416,9 @@ def _rmtree_func(path, ignore_errors=True, onerror=None): @pytest.fixture() -def pip_src_dir(request, vistir_tmpdir): +def pip_src_dir(request, tracked_tmpdir): old_src_dir = os.environ.get('PIP_SRC', '') - os.environ['PIP_SRC'] = vistir_tmpdir.as_posix() + os.environ['PIP_SRC'] = tracked_tmpdir.as_posix() def finalize(): os.environ['PIP_SRC'] = old_src_dir @@ -509,8 +508,8 @@ class VirtualEnv: @pytest.fixture() -def virtualenv(vistir_tmpdir): - with VirtualEnv(base_dir=vistir_tmpdir) as venv: +def virtualenv(tracked_tmpdir): + with VirtualEnv(base_dir=tracked_tmpdir) as venv: yield venv diff --git a/tests/integration/test_lock.py b/tests/integration/test_lock.py index 83b23353..cbf33639 100644 --- a/tests/integration/test_lock.py +++ b/tests/integration/test_lock.py @@ -5,7 +5,6 @@ from pathlib import Path import pytest from flaky import flaky -from pipenv.vendor.vistir.misc import to_text from pipenv.utils.shell import temp_environ @@ -193,7 +192,7 @@ def test_keep_outdated_keeps_markers_not_removed(pipenv_instance_pypi): lockfile_json = json.loads(lockfile_content) assert "six" in lockfile_json["default"] lockfile_json["default"]["six"]["markers"] = "python_version >= '2.7'" - lockfile.write_text(to_text(json.dumps(lockfile_json))) + lockfile.write_text(json.dumps(lockfile_json)) c = p.pipenv("lock --keep-outdated") assert c.returncode == 0 assert p.lockfile["default"]["six"].get("markers", "") == "python_version >= '2.7'" diff --git a/tests/integration/test_project.py b/tests/integration/test_project.py index 6616703a..ddd5436d 100644 --- a/tests/integration/test_project.py +++ b/tests/integration/test_project.py @@ -7,8 +7,9 @@ import pytest from pipenv.project import Project from pipenv.utils.shell import temp_environ -from pipenv.vendor.vistir.path import is_in_path, normalize_path from pipenv.vendor.plette import Pipfile +from pipenv.vendor.requirementslib.fileutils import normalize_path +from pipenv.vendor.pythonfinder.utils import is_in_path @pytest.mark.project diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py index 10c2f6bb..2702ea05 100644 --- a/tests/unit/test_core.py +++ b/tests/unit/test_core.py @@ -23,7 +23,7 @@ def test_suppress_nested_venv_warning(capsys, project): def test_load_dot_env_from_environment_variable_location(monkeypatch, capsys, project): with temp_environ(), monkeypatch.context() as m, TemporaryDirectory(prefix='pipenv-', suffix='') as tempdir: if os.name == "nt": - import click + from pipenv.vendor import click is_console = False m.setattr(click._winconsole, "_is_console", lambda x: is_console) dotenv_path = os.path.join(tempdir, 'test.env') @@ -40,7 +40,7 @@ def test_load_dot_env_from_environment_variable_location(monkeypatch, capsys, pr def test_doesnt_load_dot_env_if_disabled(monkeypatch, capsys, project): with temp_environ(), monkeypatch.context() as m, TemporaryDirectory(prefix='pipenv-', suffix='') as tempdir: if os.name == "nt": - import click + from pipenv.vendor import click is_console = False m.setattr(click._winconsole, "_is_console", lambda x: is_console) dotenv_path = os.path.join(tempdir, 'test.env') @@ -61,7 +61,7 @@ def test_doesnt_load_dot_env_if_disabled(monkeypatch, capsys, project): def test_load_dot_env_warns_if_file_doesnt_exist(monkeypatch, capsys, project): with temp_environ(), monkeypatch.context() as m, TemporaryDirectory(prefix='pipenv-', suffix='') as tempdir: if os.name == "nt": - import click + from pipenv.vendor import click is_console = False m.setattr(click._winconsole, "_is_console", lambda x: is_console) dotenv_path = os.path.join(tempdir, 'does-not-exist.env')