Try switching from delegator to subprocess

This commit is contained in:
Frost Ming
2021-07-28 16:39:36 +08:00
parent 0255cc6839
commit a82bbb7b27
22 changed files with 305 additions and 703 deletions
+8 -9
View File
@@ -1,4 +1,5 @@
import os import os
from pipenv.utils import subprocess_run
import sys import sys
from click import ( from click import (
@@ -9,7 +10,7 @@ from ..__version__ import __version__
from .._compat import fix_utf8 from .._compat import fix_utf8
from ..exceptions import PipenvOptionsError from ..exceptions import PipenvOptionsError
from ..patched import crayons from ..patched import crayons
from ..vendor import click_completion, delegator from ..vendor import click_completion
from .options import ( from .options import (
CONTEXT_SETTINGS, PipenvGroup, code_option, common_options, deploy_option, CONTEXT_SETTINGS, PipenvGroup, code_option, common_options, deploy_option,
general_options, install_options, lock_options, pass_state, general_options, install_options, lock_options, pass_state,
@@ -641,18 +642,16 @@ def run_open(state, module, *args, **kwargs):
three=state.three, python=state.python, three=state.three, python=state.python,
validate=False, pypi_mirror=state.pypi_mirror, validate=False, pypi_mirror=state.pypi_mirror,
) )
c = delegator.run( c = subprocess_run(
'{0} -c "import {1}; print({1}.__file__);"'.format(which("python"), module) which("python"), "-c", "import {0}; print({0}.__file__)".format(module)
) )
try: if c.returncode:
assert c.return_code == 0
except AssertionError:
echo(crayons.red("Module not found!")) echo(crayons.red("Module not found!"))
sys.exit(1) sys.exit(1)
if "__init__.py" in c.out: if "__init__.py" in c.stdout:
p = os.path.dirname(c.out.strip().rstrip("cdo")) p = os.path.dirname(c.stdout.strip().rstrip("cdo"))
else: else:
p = c.out.strip().rstrip("cdo") p = c.stdout.strip().rstrip("cdo")
echo(crayons.normal(f"Opening {p!r} in your EDITOR.", bold=True)) echo(crayons.normal(f"Opening {p!r} in your EDITOR.", bold=True))
inline_activate_virtual_environment() inline_activate_virtual_environment()
edit(filename=p) edit(filename=p)
+4
View File
@@ -43,6 +43,10 @@ class Script(object):
def args(self): def args(self):
return self._parts[1:] return self._parts[1:]
@property
def cmd_args(self):
return self._parts
def extend(self, extra_args): def extend(self, extra_args):
self._parts.extend(extra_args) self._parts.extend(extra_args)
+73 -75
View File
@@ -8,32 +8,30 @@ import warnings
import click import click
import delegator
import dotenv import dotenv
import pipfile import pipfile
import vistir import vistir
from click_completion import init as init_completion from click_completion import init as init_completion
from . import environments, exceptions, pep508checker, progress from pipenv import environments, exceptions, pep508checker, progress
from ._compat import decode_for_output, fix_utf8 from pipenv._compat import decode_for_output, fix_utf8
from .cmdparse import Script from pipenv.environments import (
from .environments import (
PIP_EXISTS_ACTION, PIPENV_CACHE_DIR, PIPENV_COLORBLIND, PIP_EXISTS_ACTION, PIPENV_CACHE_DIR, PIPENV_COLORBLIND,
PIPENV_DEFAULT_PYTHON_VERSION, PIPENV_DONT_USE_PYENV, PIPENV_DONT_USE_ASDF, PIPENV_DEFAULT_PYTHON_VERSION, PIPENV_DONT_USE_PYENV, PIPENV_DONT_USE_ASDF,
PIPENV_HIDE_EMOJIS, PIPENV_MAX_SUBPROCESS, PIPENV_PYUP_API_KEY, PIPENV_HIDE_EMOJIS, PIPENV_MAX_SUBPROCESS, PIPENV_PYUP_API_KEY,
PIPENV_RESOLVE_VCS, PIPENV_SHELL_FANCY, PIPENV_SKIP_VALIDATION, PIPENV_YES, PIPENV_RESOLVE_VCS, PIPENV_SHELL_FANCY, PIPENV_SKIP_VALIDATION, PIPENV_YES,
SESSION_IS_INTERACTIVE, is_type_checking SESSION_IS_INTERACTIVE, is_type_checking
) )
from .patched import crayons from pipenv.patched import crayons
from .project import Project from pipenv.project import Project
from .utils import ( from pipenv.utils import (
convert_deps_to_pip, create_spinner, download_file, convert_deps_to_pip, create_spinner, download_file,
escape_grouped_arguments, find_python, find_windows_executable, escape_grouped_arguments, find_python, find_windows_executable,
get_canonical_names, get_source_list, interrupt_handled_subprocess, get_canonical_names, get_source_list, interrupt_handled_subprocess,
is_pinned, is_python_command, is_required_version, is_star, is_valid_url, is_pinned, is_python_command, is_required_version, is_star, is_valid_url,
parse_indexes, pep423_name, prepare_pip_source_args, proper_case, parse_indexes, pep423_name, prepare_pip_source_args, proper_case,
python_version, run_command, venv_resolve_deps python_version, run_command, subprocess_run, venv_resolve_deps
) )
@@ -454,7 +452,7 @@ def ensure_python(three=None, python=None):
else: else:
sp.ok(environments.PIPENV_SPINNER_OK_TEXT.format("Success!")) sp.ok(environments.PIPENV_SPINNER_OK_TEXT.format("Success!"))
# Print the results, in a beautiful blue... # Print the results, in a beautiful blue...
click.echo(crayons.cyan(c.out), err=True) click.echo(crayons.cyan(c.stdout), err=True)
# Clear the pythonfinder caches # Clear the pythonfinder caches
from .vendor.pythonfinder import Finder from .vendor.pythonfinder import Finder
finder = Finder(system=False, global_search=True) finder = Finder(system=False, global_search=True)
@@ -661,21 +659,21 @@ def do_where(virtualenv=False, bare=True):
def _cleanup_procs(procs, failed_deps_queue, retry=True): def _cleanup_procs(procs, failed_deps_queue, retry=True):
while not procs.empty(): while not procs.empty():
c = procs.get() c = procs.get()
if not c.blocking: c.wait()
c.block()
failed = False failed = False
if c.return_code != 0: if c.returncode != 0:
failed = True failed = True
if "Ignoring" in c.out: out, err = c.communicate()
click.echo(crayons.yellow(c.out.strip())) if "Ignoring" in out:
click.echo(crayons.yellow(out.strip()))
elif environments.is_verbose(): elif environments.is_verbose():
click.echo(crayons.cyan(c.out.strip() or c.err.strip())) click.echo(crayons.cyan(out.strip() or err.strip()))
# The Installation failed... # The Installation failed...
if failed: if failed:
# If there is a mismatch in installed locations or the install fails # If there is a mismatch in installed locations or the install fails
# due to wrongful disabling of pep517, we should allow for # due to wrongful disabling of pep517, we should allow for
# additional passes at installation # additional passes at installation
if "does not match installed location" in c.err: if "does not match installed location" in err:
project.environment.expand_egg_links() project.environment.expand_egg_links()
click.echo("{}".format( click.echo("{}".format(
crayons.yellow( crayons.yellow(
@@ -686,14 +684,14 @@ def _cleanup_procs(procs, failed_deps_queue, retry=True):
)) ))
dep = c.dep.copy() dep = c.dep.copy()
dep.use_pep517 = True dep.use_pep517 = True
elif "Disabling PEP 517 processing is invalid" in c.err: elif "Disabling PEP 517 processing is invalid" in err:
dep = c.dep.copy() dep = c.dep.copy()
dep.use_pep517 = True dep.use_pep517 = True
elif not retry: elif not retry:
# The Installation failed... # The Installation failed...
# We echo both c.out and c.err because pip returns error details on out. # We echo both c.stdout and c.stderr because pip returns error details on out.
err = c.err.strip().splitlines() if c.err else [] err = err.strip().splitlines() if err else []
out = c.out.strip().splitlines() if c.out else [] out = out.strip().splitlines() if out else []
err_lines = [line for message in [out, err] for line in message] err_lines = [line for message in [out, err] for line in message]
# Return the subprocess' return code. # Return the subprocess' return code.
raise exceptions.InstallError(c.dep.name, extra=err_lines) raise exceptions.InstallError(c.dep.name, extra=err_lines)
@@ -785,7 +783,7 @@ def batch_install(deps_list, procs, failed_deps_queue,
# if dep.is_vcs or dep.editable: # if dep.is_vcs or dep.editable:
is_sequential = sequential_deps and dep.name in sequential_dep_names is_sequential = sequential_deps and dep.name in sequential_dep_names
if is_sequential: if is_sequential:
c.block() c.wait()
procs.put(c) procs.put(c)
if procs.full() or procs.qsize() == len(deps_list) or is_sequential: if procs.full() or procs.qsize() == len(deps_list) or is_sequential:
@@ -1022,12 +1020,13 @@ def get_downloads_info(names_map, section):
# Get the version info from the filenames. # Get the version info from the filenames.
version = parse_download_fname(fname, name) version = parse_download_fname(fname, name)
# Get the hash of each file. # Get the hash of each file.
cmd = '{} hash "{}"'.format( cmd = [
escape_grouped_arguments(which_pip()), escape_grouped_arguments(which_pip()),
"hash",
os.sep.join([project.download_location, fname]), os.sep.join([project.download_location, fname]),
) ]
c = delegator.run(cmd) c = subprocess_run(cmd)
hash = c.out.split("--hash=")[1].strip() hash = c.stdout.split("--hash=")[1].strip()
# Verify we're adding the correct version from Pipfile # Verify we're adding the correct version from Pipfile
# and not one from a dependency. # and not one from a dependency.
specified_version = p[section].get(name, "") specified_version = p[section].get(name, "")
@@ -1177,17 +1176,18 @@ def do_purge(bare=False, downloads=False, allow_global=False):
fix_utf8(f"Found {len(to_remove)} installed package(s), purging...") fix_utf8(f"Found {len(to_remove)} installed package(s), purging...")
) )
command = "{} uninstall {} -y".format( command = [
escape_grouped_arguments(which_pip(allow_global=allow_global)), escape_grouped_arguments(which_pip(allow_global=allow_global)),
"uninstall", "-y",
" ".join(to_remove), " ".join(to_remove),
) ]
if environments.is_verbose(): if environments.is_verbose():
click.echo(f"$ {command}") click.echo(f"$ {' '.join(command)}")
c = delegator.run(command) c = subprocess_run(command)
if c.return_code != 0: if c.returncode != 0:
raise exceptions.UninstallError(installed, command, c.out + c.err, c.return_code) raise exceptions.UninstallError(installed, ' '.join(command), c.stdout + c.stderr, c.returncode)
if not bare: if not bare:
click.echo(crayons.cyan(c.out)) click.echo(crayons.cyan(c.stdout))
click.echo(crayons.green("Environment now purged and fresh!")) click.echo(crayons.green("Environment now purged and fresh!"))
return installed return installed
@@ -1485,7 +1485,7 @@ def pip_install(
err=True, err=True,
) )
pip_command = [which_pip(allow_global=allow_global), "install"] pip_command = [which("python", allow_global=allow_global), "-m", "pip", "install"]
pip_args = get_pip_args( pip_args = get_pip_args(
pre=pre, verbose=environments.is_verbose(), upgrade=True, pre=pre, verbose=environments.is_verbose(), upgrade=True,
selective_upgrade=selective_upgrade, no_use_pep517=not use_pep517, selective_upgrade=selective_upgrade, no_use_pep517=not use_pep517,
@@ -1498,7 +1498,7 @@ def pip_install(
pip_command.extend(line) pip_command.extend(line)
pip_command.extend(prepare_pip_source_args(sources)) pip_command.extend(prepare_pip_source_args(sources))
if environments.is_verbose(): if environments.is_verbose():
click.echo(f"$ {pip_command}", err=True) click.echo(f"$ {' '.join(pip_command)}", err=True)
cache_dir = vistir.compat.Path(PIPENV_CACHE_DIR) cache_dir = vistir.compat.Path(PIPENV_CACHE_DIR)
DEFAULT_EXISTS_ACTION = "w" DEFAULT_EXISTS_ACTION = "w"
if selective_upgrade: if selective_upgrade:
@@ -1519,10 +1519,7 @@ def pip_install(
pip_config.update( pip_config.update(
{"PIP_SRC": vistir.misc.fs_str(src_dir)} {"PIP_SRC": vistir.misc.fs_str(src_dir)}
) )
cmd = Script.parse(pip_command) c = subprocess_run(pip_command, block=block, env=pip_config)
pip_command = cmd.cmdify()
c = None
c = delegator.run(pip_command, block=block, env=pip_config)
c.env = pip_config c.env = pip_config
return c return c
@@ -1537,14 +1534,15 @@ def pip_download(package_name):
), ),
} }
for source in project.sources: for source in project.sources:
cmd = '{} download "{}" -i {} -d {}'.format( cmd = [
escape_grouped_arguments(which_pip()), escape_grouped_arguments(which_pip()),
"download",
package_name, package_name,
source["url"], "-i", source["url"],
project.download_location, "-d", project.download_location,
) ]
c = delegator.run(cmd, env=pip_config) c = subprocess_run(cmd, env=pip_config)
if c.return_code == 0: if c.returncode == 0:
break break
return c return c
@@ -1616,10 +1614,10 @@ def system_which(command, mult=False):
}) })
result = None result = None
try: try:
c = delegator.run(f"{_which} {command}") c = subprocess_run([_which, command])
try: try:
# Which Not found... # Which Not found...
if c.return_code == 127: if c.returncode == 127:
click.echo( click.echo(
"{}: the {} system utility is required for Pipenv to find Python installations properly." "{}: the {} system utility is required for Pipenv to find Python installations properly."
"\n Please install it.".format( "\n Please install it.".format(
@@ -1627,7 +1625,7 @@ def system_which(command, mult=False):
), ),
err=True, err=True,
) )
assert c.return_code == 0 assert c.returncode == 0
except AssertionError: except AssertionError:
result = fallback_which(command, allow_global=True) result = fallback_which(command, allow_global=True)
except TypeError: except TypeError:
@@ -2116,19 +2114,19 @@ def do_install(
extra_indexes=extra_index_url, extra_indexes=extra_index_url,
pypi_mirror=pypi_mirror, pypi_mirror=pypi_mirror,
) )
if not c.ok: if c.returncode:
sp.write_err( sp.write_err(
"{} An error occurred while installing {}!".format( "{} An error occurred while installing {}!".format(
crayons.red("Error: ", bold=True), crayons.green(pkg_line) crayons.red("Error: ", bold=True), crayons.green(pkg_line)
), ),
) )
sp.write_err( sp.write_err(
vistir.compat.fs_str(f"Error text: {c.out}") vistir.compat.fs_str(f"Error text: {c.stdout}")
) )
sp.write_err(crayons.cyan(vistir.compat.fs_str(format_pip_error(c.err)))) sp.write_err(crayons.cyan(vistir.compat.fs_str(format_pip_error(c.stderr))))
if environments.is_verbose(): if environments.is_verbose():
sp.write_err(crayons.cyan(vistir.compat.fs_str(format_pip_output(c.out)))) sp.write_err(crayons.cyan(vistir.compat.fs_str(format_pip_output(c.stdout))))
if "setup.py egg_info" in c.err: if "setup.py egg_info" in c.stderr:
sp.write_err(vistir.compat.fs_str( sp.write_err(vistir.compat.fs_str(
"This is likely caused by a bug in {}. " "This is likely caused by a bug in {}. "
"Report this to its maintainers.".format( "Report this to its maintainers.".format(
@@ -2301,8 +2299,8 @@ def do_uninstall(
pip_path = which_pip(allow_global=system) pip_path = which_pip(allow_global=system)
cmd = [pip_path, "uninstall", package_name, "-y"] cmd = [pip_path, "uninstall", package_name, "-y"]
c = run_command(cmd) c = run_command(cmd)
click.echo(crayons.cyan(c.out)) click.echo(crayons.cyan(c.stdout))
if c.return_code != 0: if c.returncode != 0:
failure = True failure = True
if not failure and pipfile_remove: if not failure and pipfile_remove:
in_packages = project.get_package_name_in_pipfile(package_name, dev=False) in_packages = project.get_package_name_in_pipfile(package_name, dev=False)
@@ -2611,14 +2609,14 @@ def do_check(
# Run the PEP 508 checker in the virtualenv. # Run the PEP 508 checker in the virtualenv.
cmd = _cmd + [vistir.compat.Path(pep508checker_path).as_posix()] cmd = _cmd + [vistir.compat.Path(pep508checker_path).as_posix()]
c = run_command(cmd) c = run_command(cmd)
if c.return_code is not None: if c.returncode is not None:
try: try:
results = simplejson.loads(c.out.strip()) results = simplejson.loads(c.stdout.strip())
except JSONDecodeError: except JSONDecodeError:
click.echo("{}\n{}\n{}".format( click.echo("{}\n{}\n{}".format(
crayons.white(decode_for_output("Failed parsing pep508 results: "), bold=True), crayons.white(decode_for_output("Failed parsing pep508 results: "), bold=True),
c.out.strip(), c.stdout.strip(),
c.err.strip() c.stderr.strip()
)) ))
sys.exit(1) sys.exit(1)
# Load the pipfile. # Load the pipfile.
@@ -2681,11 +2679,11 @@ def do_check(
c = run_command(cmd, catch_exceptions=False) c = run_command(cmd, catch_exceptions=False)
if output == "default": if output == "default":
try: try:
results = simplejson.loads(c.out) results = simplejson.loads(c.stdout)
except (ValueError, JSONDecodeError): except (ValueError, JSONDecodeError):
raise exceptions.JSONParseError(c.out, c.err) raise exceptions.JSONParseError(c.stdout, c.stderr)
except Exception: except Exception:
raise exceptions.PipenvCmdError(c.cmd, c.out, c.err, c.return_code) raise exceptions.PipenvCmdError(' '.join(c.args), c.stdout, c.stderr, c.returncode)
for (package, resolved, installed, description, vuln) in results: for (package, resolved, installed, description, vuln) in results:
click.echo( click.echo(
"{}: {} {} resolved ({} installed)!".format( "{}: {} {} resolved ({} installed)!".format(
@@ -2697,14 +2695,14 @@ def do_check(
) )
click.echo(f"{description}") click.echo(f"{description}")
click.echo() click.echo()
if c.ok: if c.returncode == 0:
click.echo(crayons.green("All good!")) click.echo(crayons.green("All good!"))
sys.exit(0) sys.exit(0)
else: else:
sys.exit(1) sys.exit(1)
else: else:
click.echo(c.out) click.echo(c.stdout)
sys.exit(c.return_code) sys.exit(c.returncode)
def do_graph(bare=False, json=False, json_tree=False, reverse=False): def do_graph(bare=False, json=False, json_tree=False, reverse=False):
@@ -2786,9 +2784,9 @@ def do_graph(bare=False, json=False, json_tree=False, reverse=False):
if json: if json:
data = [] data = []
try: try:
parsed = simplejson.loads(c.out.strip()) parsed = simplejson.loads(c.stdout.strip())
except JSONDecodeError: except JSONDecodeError:
raise exceptions.JSONParseError(c.out, c.err) raise exceptions.JSONParseError(c.stdout, c.stderr)
else: else:
for d in parsed: for d in parsed:
if d["package"]["key"] not in BAD_PACKAGES: if d["package"]["key"] not in BAD_PACKAGES:
@@ -2809,15 +2807,15 @@ def do_graph(bare=False, json=False, json_tree=False, reverse=False):
return obj return obj
try: try:
parsed = simplejson.loads(c.out.strip()) parsed = simplejson.loads(c.stdout.strip())
except JSONDecodeError: except JSONDecodeError:
raise exceptions.JSONParseError(c.out, c.err) raise exceptions.JSONParseError(c.stdout, c.stderr)
else: else:
data = traverse(parsed) data = traverse(parsed)
click.echo(simplejson.dumps(data, indent=4)) click.echo(simplejson.dumps(data, indent=4))
sys.exit(0) sys.exit(0)
else: else:
for line in c.out.strip().split("\n"): for line in c.stdout.strip().split("\n"):
# Ignore bad packages as top level. # Ignore bad packages as top level.
# TODO: This should probably be a "==" in + line.partition # TODO: This should probably be a "==" in + line.partition
if line.split("==")[0] in BAD_PACKAGES and not reverse: if line.split("==")[0] in BAD_PACKAGES and not reverse:
@@ -2830,17 +2828,17 @@ def do_graph(bare=False, json=False, json_tree=False, reverse=False):
else: else:
click.echo(crayons.normal(line, bold=False)) click.echo(crayons.normal(line, bold=False))
else: else:
click.echo(c.out) click.echo(c.stdout)
if c.return_code != 0: if c.returncode != 0:
click.echo( click.echo(
"{} {}".format( "{} {}".format(
crayons.red("ERROR: ", bold=True), crayons.red("ERROR: ", bold=True),
crayons.white(f"{c.err}"), crayons.white(f"{c.stderr}"),
), ),
err=True, err=True,
) )
# Return its return code. # Return its return code.
sys.exit(c.return_code) sys.exit(c.returncode)
def do_sync( def do_sync(
@@ -2928,6 +2926,6 @@ def do_clean(
# Uninstall the package. # Uninstall the package.
cmd = [which_pip(), "uninstall", apparent_bad_package, "-y"] cmd = [which_pip(), "uninstall", apparent_bad_package, "-y"]
c = run_command(cmd) c = run_command(cmd)
if c.return_code != 0: if c.returncode != 0:
failure = True failure = True
sys.exit(int(failure)) sys.exit(int(failure))
+9 -10
View File
@@ -4,9 +4,9 @@ import re
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from .environments import PIPENV_INSTALL_TIMEOUT from pipenv.environments import PIPENV_INSTALL_TIMEOUT
from .vendor import attr, delegator from pipenv.vendor import attr
from .utils import find_windows_executable from pipenv.utils import find_windows_executable, subprocess_run
@attr.s @attr.s
@@ -59,8 +59,8 @@ class InstallerNotFound(RuntimeError):
class InstallerError(RuntimeError): class InstallerError(RuntimeError):
def __init__(self, desc, c): def __init__(self, desc, c):
super().__init__(desc) super().__init__(desc)
self.out = c.out self.out = c.stdout
self.err = c.err self.err = c.stderr
class Installer(metaclass=ABCMeta): class Installer(metaclass=ABCMeta):
@@ -114,14 +114,13 @@ class Installer(metaclass=ABCMeta):
raise InstallerNotFound() raise InstallerNotFound()
def _run(self, *args, **kwargs): def _run(self, *args, **kwargs):
timeout = kwargs.pop('timeout', delegator.TIMEOUT) timeout = kwargs.pop('timeout', 30)
if kwargs: if kwargs:
k = list(kwargs.keys())[0] k = list(kwargs.keys())[0]
raise TypeError(f'unexpected keyword argument {k!r}') raise TypeError(f'unexpected keyword argument {k!r}')
args = (self.cmd,) + tuple(args) args = (self.cmd,) + tuple(args)
c = delegator.run(args, block=False, timeout=timeout) c = subprocess_run(args, timeout=timeout)
c.block() if c.returncode != 0:
if c.return_code != 0:
raise InstallerError(f'failed to run {args}', c) raise InstallerError(f'failed to run {args}', c)
return c return c
@@ -201,7 +200,7 @@ class Asdf(Installer):
def iter_installable_versions(self): def iter_installable_versions(self):
"""Iterate through CPython versions available for asdf to install. """Iterate through CPython versions available for asdf to install.
""" """
for name in self._run('list-all', 'python').out.splitlines(): for name in self._run('list-all', 'python').stdout.splitlines():
try: try:
version = Version.parse(name.strip()) version = Version.parse(name.strip())
except ValueError: except ValueError:
+64 -91
View File
@@ -1,3 +1,4 @@
import subprocess
import contextlib import contextlib
import errno import errno
import logging import logging
@@ -21,6 +22,7 @@ import parse
import tomlkit import tomlkit
from . import environments from . import environments
from ._compat import DEFAULT_ENCODING
from .exceptions import PipenvCmdError, PipenvUsageError, RequirementError, ResolutionFailure from .exceptions import PipenvCmdError, PipenvUsageError, RequirementError, ResolutionFailure
from .pep508checker import lookup from .pep508checker import lookup
from .vendor.packaging.markers import Marker from .vendor.packaging.markers import Marker
@@ -129,7 +131,6 @@ def run_command(cmd, *args, **kwargs):
:raises: exceptions.PipenvCmdError :raises: exceptions.PipenvCmdError
""" """
from pipenv.vendor import delegator
from ._compat import decode_for_output from ._compat import decode_for_output
from .cmdparse import Script from .cmdparse import Script
catch_exceptions = kwargs.pop("catch_exceptions", True) catch_exceptions = kwargs.pop("catch_exceptions", True)
@@ -140,21 +141,16 @@ def run_command(cmd, *args, **kwargs):
if "env" not in kwargs: if "env" not in kwargs:
kwargs["env"] = os.environ.copy() kwargs["env"] = os.environ.copy()
kwargs["env"]["PYTHONIOENCODING"] = "UTF-8" kwargs["env"]["PYTHONIOENCODING"] = "UTF-8"
try: command = [cmd.command, *cmd.args]
cmd_string = cmd.cmdify()
except TypeError:
click_echo(f"Error turning command into string: {cmd}", err=True)
sys.exit(1)
if environments.is_verbose(): if environments.is_verbose():
click_echo(f"Running command: $ {cmd_string}") click_echo(f"Running command: $ {cmd.cmdify()}")
c = delegator.run(cmd_string, *args, **kwargs) c = subprocess_run(command, *args, **kwargs)
return_code = c.return_code
if environments.is_verbose(): if environments.is_verbose():
click_echo("Command output: {}".format( click_echo("Command output: {}".format(
crayons.cyan(decode_for_output(c.out)) crayons.cyan(decode_for_output(c.stdout))
), err=True) ), err=True)
if not c.ok and catch_exceptions: if c.returncode and catch_exceptions:
raise PipenvCmdError(cmd_string, c.out, c.err, return_code) raise PipenvCmdError(cmd.cmdify(), c.stdout, c.stderr, c.returncode)
return c return c
@@ -1131,50 +1127,30 @@ def create_spinner(text, nospin=None, spinner_name=None):
def resolve(cmd, sp): def resolve(cmd, sp):
from .cmdparse import Script from .cmdparse import Script
from .vendor import delegator
from .vendor.pexpect.exceptions import EOF, TIMEOUT
from .vendor.vistir.compat import to_native_string
from .vendor.vistir.misc import echo from .vendor.vistir.misc import echo
EOF.__module__ = "pexpect.exceptions"
from ._compat import decode_output from ._compat import decode_output
c = delegator.run(Script.parse(cmd).cmdify(), block=False, env=os.environ.copy()) c = subprocess_run(Script.parse(cmd).cmd_args, block=False, env=os.environ.copy())
if environments.is_verbose(): err = ""
c.subprocess.logfile = sys.stderr for line in iter(c.stderr.readline, ""):
_out = decode_output("") line = decode_output(line)
result = None err += line
out = to_native_string("") if environments.is_verbose() and line.rstrip():
while True: sp.hide_and_write(line.rstrip())
result = None
try: c.wait()
result = c.expect("\n", timeout=environments.PIPENV_INSTALL_TIMEOUT) returncode = c.poll()
except TIMEOUT: out = c.stdout.read()
pass if returncode != 0:
except EOF:
break
except KeyboardInterrupt:
c.kill()
break
if result:
_out = c.subprocess.before
_out = decode_output(f"{_out}")
out += _out
# sp.text = to_native_string("{0}".format(_out[:100]))
if environments.is_verbose():
sp.hide_and_write(out.splitlines()[-1].rstrip())
else:
break
c.block()
if c.return_code != 0:
sp.red.fail(environments.PIPENV_SPINNER_FAIL_TEXT.format( sp.red.fail(environments.PIPENV_SPINNER_FAIL_TEXT.format(
"Locking Failed!" "Locking Failed!"
)) ))
echo(c.out.strip(), err=True) echo(out.strip(), err=True)
if not environments.is_verbose(): if not environments.is_verbose():
echo(out, err=True) echo(err, err=True)
sys.exit(c.return_code) sys.exit(returncode)
if environments.is_verbose(): if environments.is_verbose():
echo(c.err.strip(), err=True) echo(out.strip(), err=True)
return c return subprocess.CompletedProcess(c.args, returncode, out, err)
def get_locked_dep(dep, pipfile_section, prefer_pipfile=True): def get_locked_dep(dep, pipfile_section, prefer_pipfile=True):
@@ -1334,21 +1310,21 @@ def venv_resolve_deps(
os.environ["PIPENV_PACKAGES"] = str("\n".join(constraints)) os.environ["PIPENV_PACKAGES"] = str("\n".join(constraints))
sp.write(decode_for_output("Resolving dependencies...")) sp.write(decode_for_output("Resolving dependencies..."))
c = resolve(cmd, sp) c = resolve(cmd, sp)
results = c.out.strip() results = c.stdout.strip()
if c.ok: if c.returncode == 0:
sp.green.ok(environments.PIPENV_SPINNER_OK_TEXT.format("Success!")) sp.green.ok(environments.PIPENV_SPINNER_OK_TEXT.format("Success!"))
if not environments.is_verbose() and c.out.strip(): if not environments.is_verbose() and c.stdout.strip():
click_echo(crayons.yellow(f"Warning: {c.out.strip()}"), err=True) click_echo(crayons.yellow(f"Warning: {c.stdout.strip()}"), err=True)
else: else:
sp.red.fail(environments.PIPENV_SPINNER_FAIL_TEXT.format("Locking Failed!")) sp.red.fail(environments.PIPENV_SPINNER_FAIL_TEXT.format("Locking Failed!"))
click_echo(f"Output: {c.out.strip()}", err=True) click_echo(f"Output: {c.stdout.strip()}", err=True)
click_echo(f"Error: {c.err.strip()}", err=True) click_echo(f"Error: {c.stderr.strip()}", err=True)
try: try:
with open(target_file.name) as fh: with open(target_file.name) as fh:
results = json.load(fh) results = json.load(fh)
except (IndexError, JSONDecodeError): except (IndexError, JSONDecodeError):
click_echo(c.out.strip(), err=True) click_echo(c.stdout.strip(), err=True)
click_echo(c.err.strip(), err=True) click_echo(c.stderr.strip(), err=True)
if os.path.exists(target_file.name): if os.path.exists(target_file.name):
os.unlink(target_file.name) os.unlink(target_file.name)
raise RuntimeError("There was a problem with locking.") raise RuntimeError("There was a problem with locking.")
@@ -1723,14 +1699,13 @@ def temp_path():
def load_path(python): def load_path(python):
from ._compat import Path from pathlib import Path
import delegator
import json import json
python = Path(python).as_posix() python = Path(python).as_posix()
json_dump_commmand = '"import json, sys; print(json.dumps(sys.path));"' json_dump_commmand = '"import json, sys; print(json.dumps(sys.path));"'
c = delegator.run(f'"{python}" -c {json_dump_commmand}') c = subprocess_run([python, "-c", json_dump_commmand])
if c.return_code == 0: if c.returncode == 0:
return json.loads(c.out.strip()) return json.loads(c.stdout.strip())
else: else:
return [] return []
@@ -2248,35 +2223,6 @@ def is_python_command(line):
return False return False
# def make_marker_from_specifier(spec):
# # type: (str) -> Optional[Marker]
# """Given a python version specifier, create a marker
# :param spec: A specifier
# :type spec: str
# :return: A new marker
# :rtype: Optional[:class:`packaging.marker.Marker`]
# """
# from .vendor.packaging.markers import Marker
# from .vendor.packaging.specifiers import SpecifierSet, Specifier
# from .vendor.requirementslib.models.markers import cleanup_pyspecs, format_pyversion
# if not any(spec.startswith(k) for k in Specifier._operators.keys()):
# if spec.strip().lower() in ["any", "<any>", "*"]:
# return None
# spec = "=={0}".format(spec)
# elif spec.startswith("==") and spec.count("=") > 3:
# spec = "=={0}".format(spec.lstrip("="))
# if not spec:
# return None
# marker_segments = []
# print(spec)
# for marker_segment in cleanup_pyspecs(spec):
# print(marker_segment)
# marker_segments.append(format_pyversion(marker_segment))
# marker_str = " and ".join(marker_segments)
# return Marker(marker_str)
@contextlib.contextmanager @contextlib.contextmanager
def interrupt_handled_subprocess( def interrupt_handled_subprocess(
cmd, verbose=False, return_object=True, write_to_stdout=False, combine_stderr=True, cmd, verbose=False, return_object=True, write_to_stdout=False, combine_stderr=True,
@@ -2312,3 +2258,30 @@ def interrupt_handled_subprocess(
os.kill(obj.pid, signal.SIGINT) os.kill(obj.pid, signal.SIGINT)
obj.wait() obj.wait()
raise raise
def subprocess_run(
args, *, block=True, text=True, capture_output=True,
encoding=DEFAULT_ENCODING, env=None, **other_kwargs
):
"""A backward compatible version of subprocess.run().
It outputs text with default encoding, and store all outputs in the returned object instead of
printing onto stdout.
"""
if env is not None:
env = dict(os.environ, **env)
other_kwargs['env'] = env
if block:
return subprocess.run(
args, universal_newlines=text, capture_output=capture_output,
encoding=encoding, **other_kwargs
)
else:
if capture_output:
other_kwargs['stdout'] = subprocess.PIPE
other_kwargs['stderr'] = subprocess.PIPE
return subprocess.Popen(
args, universal_newlines=text,
encoding=encoding, **other_kwargs
)
-341
View File
@@ -1,341 +0,0 @@
import os
import subprocess
import shlex
import signal
import sys
import locale
import errno
from pexpect.popen_spawn import PopenSpawn
import pexpect
pexpect.EOF.__module__ = "pexpect.exceptions"
# Include `unicode` in STR_TYPES for Python 2.X
try:
STR_TYPES = (str, unicode)
except NameError:
STR_TYPES = (str,)
TIMEOUT = 30
def pid_exists(pid):
"""Check whether pid exists in the current process table."""
if pid == 0:
# According to "man 2 kill" PID 0 has a special meaning:
# it refers to <<every process in the process group of the
# calling process>> so we don't want to go any further.
# If we get here it means this UNIX platform *does* have
# a process with id 0.
return True
try:
os.kill(pid, 0)
except OSError as err:
if err.errno == errno.ESRCH:
# ESRCH == No such process
return False
elif err.errno == errno.EPERM:
# EPERM clearly means there's a process to deny access to
return True
else:
# According to "man 2 kill" possible error values are
# (EINVAL, EPERM, ESRCH) therefore we should never get
# here. If we do let's be explicit in considering this
# an error.
raise err
else:
return True
class Command(object):
def __init__(self, cmd, timeout=TIMEOUT):
super(Command, self).__init__()
self.cmd = cmd
self.timeout = timeout
self.subprocess = None
self.blocking = None
self.was_run = False
self.__out = None
self.__err = None
def __repr__(self):
return "<Command {!r}>".format(self.cmd)
@property
def _popen_args(self):
return self.cmd
@property
def _default_popen_kwargs(self):
return {
"env": os.environ.copy(),
"stdin": subprocess.PIPE,
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE,
"shell": True,
"universal_newlines": True,
"bufsize": 0,
}
@property
def _default_pexpect_kwargs(self):
encoding = "utf-8"
if sys.platform == "win32":
default_encoding = locale.getdefaultlocale()[1]
if default_encoding is not None:
encoding = default_encoding
return {"env": os.environ.copy(), "encoding": encoding, "timeout": self.timeout}
@property
def _uses_subprocess(self):
return isinstance(self.subprocess, subprocess.Popen)
@property
def _uses_pexpect(self):
return isinstance(self.subprocess, PopenSpawn)
@property
def std_out(self):
return self.subprocess.stdout
@property
def ok(self):
return self.return_code == 0
@property
def _pexpect_out(self):
if self.subprocess.encoding:
result = ""
else:
result = b""
if self.subprocess.before:
result += self.subprocess.before
if self.subprocess.after and self.subprocess.after not in (pexpect.EOF, pexpect.TIMEOUT):
try:
result += self.subprocess.after
except (pexpect.EOF, pexpect.TIMEOUT):
pass
result += self.subprocess.read()
return result
@property
def out(self):
"""Std/out output (cached)"""
if self.__out is not None:
return self.__out
if self._uses_subprocess:
self.__out = self.std_out.read()
else:
self.__out = self._pexpect_out
return self.__out
@property
def std_err(self):
return self.subprocess.stderr
@property
def err(self):
"""Std/err output (cached)"""
if self.__err is not None:
return self.__err
if self._uses_subprocess:
self.__err = self.std_err.read()
return self.__err
else:
return self._pexpect_out
@property
def pid(self):
"""The process' PID."""
# Support for pexpect's functionality.
if hasattr(self.subprocess, "proc"):
return self.subprocess.proc.pid
# Standard subprocess method.
return self.subprocess.pid
@property
def is_alive(self):
"""Is the process alive?"""
return pid_exists(self.pid)
@property
def return_code(self):
# Support for pexpect's functionality.
if self._uses_pexpect:
return self.subprocess.exitstatus
# Standard subprocess method.
return self.subprocess.returncode
@property
def std_in(self):
return self.subprocess.stdin
def run(self, block=True, binary=False, cwd=None, env=None):
"""Runs the given command, with or without pexpect functionality enabled."""
self.blocking = block
# Use subprocess.
if self.blocking:
popen_kwargs = self._default_popen_kwargs.copy()
del popen_kwargs["stdin"]
popen_kwargs["universal_newlines"] = not binary
if cwd:
popen_kwargs["cwd"] = cwd
if env:
popen_kwargs["env"].update(env)
s = subprocess.Popen(self._popen_args, **popen_kwargs)
# Otherwise, use pexpect.
else:
pexpect_kwargs = self._default_pexpect_kwargs.copy()
if binary:
pexpect_kwargs["encoding"] = None
if cwd:
pexpect_kwargs["cwd"] = cwd
if env:
pexpect_kwargs["env"].update(env)
# Enable Python subprocesses to work with expect functionality.
pexpect_kwargs["env"]["PYTHONUNBUFFERED"] = "1"
s = PopenSpawn(self._popen_args, **pexpect_kwargs)
self.subprocess = s
self.was_run = True
def expect(self, pattern, timeout=-1):
"""Waits on the given pattern to appear in std_out"""
if self.blocking:
raise RuntimeError("expect can only be used on non-blocking commands.")
try:
self.subprocess.expect(pattern=pattern, timeout=timeout)
except pexpect.EOF:
pass
def send(self, s, end=os.linesep, signal=False):
"""Sends the given string or signal to std_in."""
if self.blocking:
raise RuntimeError("send can only be used on non-blocking commands.")
if not signal:
if self._uses_subprocess:
return self.subprocess.communicate(s + end)
else:
return self.subprocess.send(s + end)
else:
self.subprocess.send_signal(s)
def terminate(self):
self.subprocess.terminate()
def kill(self):
if self._uses_pexpect:
self.subprocess.kill(signal.SIGINT)
else:
self.subprocess.send_signal(signal.SIGINT)
def block(self):
"""Blocks until process is complete."""
if self._uses_subprocess:
# consume stdout and stderr
if self.blocking:
try:
stdout, stderr = self.subprocess.communicate()
self.__out = stdout
self.__err = stderr
except ValueError:
pass # Don't read from finished subprocesses.
else:
self.subprocess.stdin.close()
self.std_out.close()
self.std_err.close()
self.subprocess.wait()
else:
self.subprocess.sendeof()
try:
self.subprocess.wait()
finally:
if self.subprocess.proc.stdout:
self.subprocess.proc.stdout.close()
def pipe(self, command, timeout=None, cwd=None):
"""Runs the current command and passes its output to the next
given process.
"""
if not timeout:
timeout = self.timeout
if not self.was_run:
self.run(block=False, cwd=cwd)
data = self.out
if timeout:
c = Command(command, timeout)
else:
c = Command(command)
c.run(block=False, cwd=cwd)
if data:
c.send(data)
c.block()
return c
def _expand_args(command):
"""Parses command strings and returns a Popen-ready list."""
# Prepare arguments.
if isinstance(command, STR_TYPES):
if sys.version_info[0] == 2:
splitter = shlex.shlex(command.encode("utf-8"))
elif sys.version_info[0] == 3:
splitter = shlex.shlex(command)
else:
splitter = shlex.shlex(command.encode("utf-8"))
splitter.whitespace = "|"
splitter.whitespace_split = True
command = []
while True:
token = splitter.get_token()
if token:
command.append(token)
else:
break
command = list(map(shlex.split, command))
return command
def chain(command, timeout=TIMEOUT, cwd=None, env=None):
commands = _expand_args(command)
data = None
for command in commands:
c = run(command, block=False, timeout=timeout, cwd=cwd, env=env)
if data:
c.send(data)
c.subprocess.sendeof()
data = c.out
return c
def run(command, block=True, binary=False, timeout=TIMEOUT, cwd=None, env=None):
c = Command(command, timeout=timeout)
c.run(block=block, binary=binary, cwd=cwd, env=env)
if block:
c.block()
return c
-21
View File
@@ -1,21 +0,0 @@
The MIT License (MIT)
Copyright 2018 Kenneth Reitz
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
-1
View File
@@ -9,7 +9,6 @@ click-completion==0.5.2
click-didyoumean==0.0.3 click-didyoumean==0.0.3
click==8.0.1 click==8.0.1
colorama==0.4.4 colorama==0.4.4
delegator.py==0.1.1
distlib==0.3.2 distlib==0.3.2
docopt==0.6.2 docopt==0.6.2
dparse==0.5.1 dparse==0.5.1
+11 -13
View File
@@ -2,6 +2,7 @@ import errno
import json import json
import logging import logging
import os import os
from pipenv.utils import subprocess_run
import shutil import shutil
import sys import sys
import warnings import warnings
@@ -13,7 +14,7 @@ import requests
from pipenv._compat import Path from pipenv._compat import Path
from pipenv.exceptions import VirtualenvActivationException from pipenv.exceptions import VirtualenvActivationException
from pipenv.vendor import delegator, toml, tomlkit from pipenv.vendor import toml, tomlkit
from pipenv.vendor.vistir.compat import ( from pipenv.vendor.vistir.compat import (
FileNotFoundError, PermissionError, ResourceWarning, TemporaryDirectory, FileNotFoundError, PermissionError, ResourceWarning, TemporaryDirectory,
fs_encode, fs_str fs_encode, fs_str
@@ -66,8 +67,8 @@ def check_github_ssh():
# GitHub does not provide shell access.' if ssh keys are available and # GitHub does not provide shell access.' if ssh keys are available and
# registered with GitHub. Otherwise, the command will fail with # registered with GitHub. Otherwise, the command will fail with
# return_code=255 and say 'Permission denied (publickey).' # return_code=255 and say 'Permission denied (publickey).'
c = delegator.run('ssh -o StrictHostKeyChecking=no -o CheckHostIP=no -T git@github.com', timeout=30) c = subprocess_run('ssh -o StrictHostKeyChecking=no -o CheckHostIP=no -T git@github.com', timeout=30, shell=True)
res = True if c.return_code == 1 else False res = True if c.returncode == 1 else False
except KeyboardInterrupt: except KeyboardInterrupt:
warnings.warn( warnings.warn(
"KeyboardInterrupt while checking GitHub ssh access", RuntimeWarning "KeyboardInterrupt while checking GitHub ssh access", RuntimeWarning
@@ -87,11 +88,8 @@ def check_github_ssh():
def check_for_mercurial(): def check_for_mercurial():
c = delegator.run("hg --help") c = subprocess_run("hg --help", shell=True)
if c.return_code != 0: return c.returncode == 0
return False
else:
return True
TESTS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) TESTS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -385,8 +383,8 @@ class _PipenvInstance:
with TemporaryDirectory(prefix='pipenv-', suffix='-cache') as tempdir: with TemporaryDirectory(prefix='pipenv-', suffix='-cache') as tempdir:
os.environ['PIPENV_CACHE_DIR'] = fs_str(tempdir.name) os.environ['PIPENV_CACHE_DIR'] = fs_str(tempdir.name)
c = delegator.run( c = subprocess_run(
f'pipenv {cmd}', block=block, f'pipenv {cmd}', block=block, shell=True,
cwd=os.path.abspath(self.path), env=os.environ.copy() cwd=os.path.abspath(self.path), env=os.environ.copy()
) )
if 'PIPENV_CACHE_DIR' in os.environ: if 'PIPENV_CACHE_DIR' in os.environ:
@@ -398,9 +396,9 @@ class _PipenvInstance:
# Pretty output for failing tests. # Pretty output for failing tests.
if block: if block:
print(f'$ pipenv {cmd}') print(f'$ pipenv {cmd}')
print(c.out) print(c.stdout)
print(c.err, file=sys.stderr) print(c.stderr, file=sys.stderr)
if c.return_code != 0: if c.returncode != 0:
print("Command failed...") print("Command failed...")
# Where the action happens. # Where the action happens.
+35 -35
View File
@@ -15,18 +15,18 @@ from pipenv.utils import normalize_drive
def test_pipenv_where(PipenvInstance): def test_pipenv_where(PipenvInstance):
with PipenvInstance() as p: with PipenvInstance() as p:
c = p.pipenv("--where") c = p.pipenv("--where")
assert c.ok assert c.returncode == 0
assert normalize_drive(p.path) in c.out assert normalize_drive(p.path) in c.stdout
@pytest.mark.cli @pytest.mark.cli
def test_pipenv_venv(PipenvInstance): def test_pipenv_venv(PipenvInstance):
with PipenvInstance() as p: with PipenvInstance() as p:
c = p.pipenv('--python python') c = p.pipenv('--python python')
assert c.ok assert c.returncode == 0
c = p.pipenv('--venv') c = p.pipenv('--venv')
assert c.ok assert c.returncode == 0
venv_path = c.out.strip() venv_path = c.stdout.strip()
assert os.path.isdir(venv_path) assert os.path.isdir(venv_path)
@@ -34,10 +34,10 @@ def test_pipenv_venv(PipenvInstance):
def test_pipenv_py(PipenvInstance): def test_pipenv_py(PipenvInstance):
with PipenvInstance() as p: with PipenvInstance() as p:
c = p.pipenv('--python python') c = p.pipenv('--python python')
assert c.ok assert c.returncode == 0
c = p.pipenv('--py') c = p.pipenv('--py')
assert c.ok assert c.returncode == 0
python = c.out.strip() python = c.stdout.strip()
assert os.path.basename(python).startswith('python') assert os.path.basename(python).startswith('python')
@@ -46,12 +46,12 @@ def test_pipenv_site_packages(PipenvInstance):
with PipenvInstance() as p: with PipenvInstance() as p:
c = p.pipenv('--python python --site-packages') c = p.pipenv('--python python --site-packages')
assert c.return_code == 0 assert c.return_code == 0
assert 'Making site-packages available' in c.err assert 'Making site-packages available' in c.stderr
# no-global-site-packages.txt under stdlib dir should not exist. # no-global-site-packages.txt under stdlib dir should not exist.
c = p.pipenv('run python -c "import sysconfig; print(sysconfig.get_path(\'stdlib\'))"') c = p.pipenv('run python -c "import sysconfig; print(sysconfig.get_path(\'stdlib\'))"')
assert c.return_code == 0 assert c.return_code == 0
stdlib_path = c.out.strip() stdlib_path = c.stdout.strip()
assert not os.path.isfile(os.path.join(stdlib_path, 'no-global-site-packages.txt')) assert not os.path.isfile(os.path.join(stdlib_path, 'no-global-site-packages.txt'))
@@ -59,23 +59,23 @@ def test_pipenv_site_packages(PipenvInstance):
def test_pipenv_support(PipenvInstance): def test_pipenv_support(PipenvInstance):
with PipenvInstance() as p: with PipenvInstance() as p:
c = p.pipenv('--support') c = p.pipenv('--support')
assert c.ok assert c.returncode == 0
assert c.out assert c.stdout
@pytest.mark.cli @pytest.mark.cli
def test_pipenv_rm(PipenvInstance): def test_pipenv_rm(PipenvInstance):
with PipenvInstance() as p: with PipenvInstance() as p:
c = p.pipenv('--python python') c = p.pipenv('--python python')
assert c.ok assert c.returncode == 0
c = p.pipenv('--venv') c = p.pipenv('--venv')
assert c.ok assert c.returncode == 0
venv_path = c.out.strip() venv_path = c.stdout.strip()
assert os.path.isdir(venv_path) assert os.path.isdir(venv_path)
c = p.pipenv('--rm') c = p.pipenv('--rm')
assert c.ok assert c.returncode == 0
assert c.out assert c.stdout
assert not os.path.isdir(venv_path) assert not os.path.isdir(venv_path)
@@ -83,7 +83,7 @@ def test_pipenv_rm(PipenvInstance):
def test_pipenv_graph(PipenvInstance): def test_pipenv_graph(PipenvInstance):
with PipenvInstance() as p: with PipenvInstance() as p:
c = p.pipenv('install tablib') c = p.pipenv('install tablib')
assert c.ok assert c.returncode == 0
graph = p.pipenv("graph") graph = p.pipenv("graph")
assert graph.ok assert graph.ok
assert "tablib" in graph.out assert "tablib" in graph.out
@@ -99,14 +99,14 @@ def test_pipenv_graph(PipenvInstance):
def test_pipenv_graph_reverse(PipenvInstance): def test_pipenv_graph_reverse(PipenvInstance):
with PipenvInstance() as p: with PipenvInstance() as p:
c = p.pipenv('install tablib==0.13.0') c = p.pipenv('install tablib==0.13.0')
assert c.ok assert c.returncode == 0
c = p.pipenv('graph --reverse') c = p.pipenv('graph --reverse')
assert c.ok assert c.returncode == 0
output = c.out output = c.stdout
c = p.pipenv('graph --reverse --json') c = p.pipenv('graph --reverse --json')
assert c.return_code == 1 assert c.return_code == 1
assert 'Warning: Using both --reverse and --json together is not supported.' in c.err assert 'Warning: Using both --reverse and --json together is not supported.' in c.stderr
requests_dependency = [ requests_dependency = [
('backports.csv', 'backports.csv'), ('backports.csv', 'backports.csv'),
@@ -145,14 +145,14 @@ def test_pipenv_check(PipenvInstance):
p.pipenv('install requests==1.0.0') p.pipenv('install requests==1.0.0')
c = p.pipenv('check') c = p.pipenv('check')
assert c.return_code != 0 assert c.return_code != 0
assert 'requests' in c.out assert 'requests' in c.stdout
c = p.pipenv('uninstall requests') c = p.pipenv('uninstall requests')
assert c.ok assert c.returncode == 0
c = p.pipenv('install six') c = p.pipenv('install six')
assert c.ok assert c.returncode == 0
c = p.pipenv('check --ignore 35015') c = p.pipenv('check --ignore 35015')
assert c.return_code == 0 assert c.return_code == 0
assert 'Ignoring' in c.err assert 'Ignoring' in c.stderr
@pytest.mark.cli @pytest.mark.cli
@@ -166,7 +166,7 @@ def test_pipenv_clean_pip_no_warnings(PipenvInstance):
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv('clean') c = p.pipenv('clean')
assert c.return_code == 0 assert c.return_code == 0
assert c.out, f"{c.out} -- STDERR: {c.err}" assert c.stdout, f"{c.stdout} -- STDERR: {c.stderr}"
@pytest.mark.cli @pytest.mark.cli
@@ -180,7 +180,7 @@ def test_pipenv_clean_pip_warnings(PipenvInstance):
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv('clean') c = p.pipenv('clean')
assert c.return_code == 0 assert c.return_code == 0
assert c.err assert c.stderr
@pytest.mark.cli @pytest.mark.cli
@@ -205,8 +205,8 @@ pyver = "which python"
""".strip() """.strip()
f.write(contents) f.write(contents)
c = p.pipenv('scripts') c = p.pipenv('scripts')
assert 'pyver' in c.out assert 'pyver' in c.stdout
assert 'which python' in c.out assert 'which python' in c.stdout
@pytest.mark.cli @pytest.mark.cli
@@ -219,7 +219,7 @@ def test_help(PipenvInstance):
def test_man(PipenvInstance): def test_man(PipenvInstance):
with PipenvInstance() as p: with PipenvInstance() as p:
c = p.pipenv('--man') c = p.pipenv('--man')
assert c.return_code == 0 or c.err assert c.return_code == 0, c.stderr
@pytest.mark.cli @pytest.mark.cli
@@ -259,8 +259,8 @@ import flask
assert all(pkg in p.pipfile['packages'] for pkg in ['requests', 'click', 'flask']), p.pipfile["packages"] assert all(pkg in p.pipfile['packages'] for pkg in ['requests', 'click', 'flask']), p.pipfile["packages"]
c = p.pipenv('check --unused .') c = p.pipenv('check --unused .')
assert 'click' not in c.out assert 'click' not in c.stdout
assert 'flask' not in c.out assert 'flask' not in c.stdout
@pytest.mark.cli @pytest.mark.cli
@@ -268,7 +268,7 @@ def test_pipenv_clear(PipenvInstance):
with PipenvInstance() as p: with PipenvInstance() as p:
c = p.pipenv('--clear') c = p.pipenv('--clear')
assert c.return_code == 0 assert c.return_code == 0
assert 'Clearing caches' in c.out assert 'Clearing caches' in c.stdout
@pytest.mark.cli @pytest.mark.cli
@@ -276,7 +276,7 @@ def test_pipenv_three(PipenvInstance):
with PipenvInstance() as p: with PipenvInstance() as p:
c = p.pipenv('--three') c = p.pipenv('--three')
assert c.return_code == 0 assert c.return_code == 0
assert 'Successfully created virtual environment' in c.err assert 'Successfully created virtual environment' in c.stderr
@pytest.mark.outdated @pytest.mark.outdated
+2 -2
View File
@@ -65,7 +65,7 @@ def test_venv_file(venv_name, PipenvInstance):
c = p.pipenv('--venv') c = p.pipenv('--venv')
assert c.return_code == 0 assert c.return_code == 0
venv_loc = Path(c.out.strip()).absolute() venv_loc = Path(c.stdout.strip()).absolute()
assert venv_loc.exists() assert venv_loc.exists()
assert venv_loc.joinpath('.project').exists() assert venv_loc.joinpath('.project').exists()
venv_path = normalize_drive(venv_loc.as_posix()) venv_path = normalize_drive(venv_loc.as_posix())
@@ -96,7 +96,7 @@ def test_venv_file_with_path(PipenvInstance):
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv('--venv') c = p.pipenv('--venv')
assert c.return_code == 0 assert c.return_code == 0
venv_loc = Path(c.out.strip()) venv_loc = Path(c.stdout.strip())
assert venv_loc.joinpath('.project').exists() assert venv_loc.joinpath('.project').exists()
assert venv_loc == Path(venv_path.name) assert venv_loc == Path(venv_path.name)
+11 -12
View File
@@ -5,8 +5,7 @@ import pytest
from flaky import flaky from flaky import flaky
from pipenv._compat import Path, TemporaryDirectory from pipenv._compat import Path, TemporaryDirectory
from pipenv.utils import temp_environ from pipenv.utils import subprocess_run, temp_environ
from pipenv.vendor import delegator
@pytest.mark.setup @pytest.mark.setup
@@ -288,9 +287,9 @@ def test_requirements_to_pipfile(PipenvInstance, pypi):
c = p.pipenv("install") c = p.pipenv("install")
assert c.return_code == 0 assert c.return_code == 0
print(c.out) print(c.stdout)
print(c.err) print(c.stderr)
print(delegator.run("ls -l").out) print(subprocess_run(["ls", "-l"]).stdout)
# assert stuff in pipfile # assert stuff in pipfile
assert "requests" in p.pipfile["packages"] assert "requests" in p.pipfile["packages"]
@@ -327,7 +326,7 @@ fake_package = "<0.12"
""".strip() """.strip()
f.write(contents) f.write(contents)
c = p.pipenv("install") c = p.pipenv("install")
assert c.ok assert c.returncode == 0
assert "fake_package" in p.pipfile["packages"] assert "fake_package" in p.pipfile["packages"]
assert "fake-package" in p.lockfile["default"] assert "fake-package" in p.lockfile["default"]
assert "six" in p.pipfile["packages"] assert "six" in p.pipfile["packages"]
@@ -384,7 +383,7 @@ def test_editable_no_args(PipenvInstance):
with PipenvInstance() as p: with PipenvInstance() as p:
c = p.pipenv("install -e") c = p.pipenv("install -e")
assert c.return_code != 0 assert c.return_code != 0
assert "Error: Option '-e' requires an argument" in c.err assert "Error: Option '-e' requires an argument" in c.stderr
@pytest.mark.basic @pytest.mark.basic
@@ -405,7 +404,7 @@ def test_install_venv_project_directory(PipenvInstance):
assert c.return_code == 0 assert c.return_code == 0
venv_loc = None venv_loc = None
for line in c.err.splitlines(): for line in c.stderr.splitlines():
if line.startswith("Virtualenv location:"): if line.startswith("Virtualenv location:"):
venv_loc = Path(line.split(":", 1)[-1].strip()) venv_loc = Path(line.split(":", 1)[-1].strip())
assert venv_loc is not None assert venv_loc is not None
@@ -421,8 +420,8 @@ def test_system_and_deploy_work(PipenvInstance):
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv("--rm") c = p.pipenv("--rm")
assert c.return_code == 0 assert c.return_code == 0
c = delegator.run("virtualenv .venv") c = subprocess_run(["virtualenv", ".venv"])
assert c.return_code == 0 assert c.returncode == 0
c = p.pipenv("install --system --deploy") c = p.pipenv("install --system --deploy")
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv("--rm") c = p.pipenv("--rm")
@@ -456,7 +455,7 @@ def test_install_creates_pipfile(PipenvInstance):
def test_install_non_exist_dep(PipenvInstance): def test_install_non_exist_dep(PipenvInstance):
with PipenvInstance(chdir=True) as p: with PipenvInstance(chdir=True) as p:
c = p.pipenv("install dateutil") c = p.pipenv("install dateutil")
assert not c.ok assert c.returncode
assert "dateutil" not in p.pipfile["packages"] assert "dateutil" not in p.pipfile["packages"]
@@ -465,7 +464,7 @@ def test_install_non_exist_dep(PipenvInstance):
def test_install_package_with_dots(PipenvInstance): def test_install_package_with_dots(PipenvInstance):
with PipenvInstance(chdir=True) as p: with PipenvInstance(chdir=True) as p:
c = p.pipenv("install backports.html") c = p.pipenv("install backports.html")
assert c.ok assert c.returncode == 0
assert "backports.html" in p.pipfile["packages"] assert "backports.html" in p.pipfile["packages"]
+2 -2
View File
@@ -24,7 +24,7 @@ fake_package = {version = "*", markers="os_name=='splashwear'"}
c = p.pipenv('install') c = p.pipenv('install')
assert c.return_code == 0 assert c.return_code == 0
assert 'Ignoring' in c.out assert 'Ignoring' in c.stdout
assert 'markers' in p.lockfile['default']['fake-package'], p.lockfile["default"] assert 'markers' in p.lockfile['default']['fake-package'], p.lockfile["default"]
c = p.pipenv('run python -c "import fake_package;"') c = p.pipenv('run python -c "import fake_package;"')
@@ -73,7 +73,7 @@ fake-package = {version = "*", os_name = "== 'splashwear'"}
c = p.pipenv('install') c = p.pipenv('install')
assert c.return_code == 0 assert c.return_code == 0
assert 'Ignoring' in c.out assert 'Ignoring' in c.stdout
assert 'markers' in p.lockfile['default']['fake-package'] assert 'markers' in p.lockfile['default']['fake-package']
c = p.pipenv('run python -c "import fake_package;"') c = p.pipenv('run python -c "import fake_package;"')
+3 -4
View File
@@ -8,7 +8,6 @@ from flaky import flaky
from pipenv._compat import Path from pipenv._compat import Path
from pipenv.utils import mkdir_p, temp_environ from pipenv.utils import mkdir_p, temp_environ
from pipenv.vendor import delegator
@pytest.mark.extras @pytest.mark.extras
@@ -369,7 +368,7 @@ def test_multiple_editable_packages_should_not_race(PipenvInstance, testsroot):
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv('run python -c "import requests, flask, six, jinja2"') c = p.pipenv('run python -c "import requests, flask, six, jinja2"')
assert c.return_code == 0, c.err assert c.return_code == 0, c.stderr
@pytest.mark.outdated @pytest.mark.outdated
@@ -380,8 +379,8 @@ def test_outdated_should_compare_postreleases_without_failing(PipenvInstance):
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv("update --outdated") c = p.pipenv("update --outdated")
assert c.return_code == 0 assert c.return_code == 0
assert "Skipped Update" in c.err assert "Skipped Update" in c.stderr
p._pipfile.update("ibm-db-sa-py3", "*") p._pipfile.update("ibm-db-sa-py3", "*")
c = p.pipenv("update --outdated") c = p.pipenv("update --outdated")
assert c.return_code != 0 assert c.return_code != 0
assert "out-of-date" in c.out assert "out-of-date" in c.stdout
+7 -8
View File
@@ -5,9 +5,8 @@ import pytest
from flaky import flaky from flaky import flaky
import delegator
from pipenv._compat import Path from pipenv._compat import Path
from pipenv.utils import subprocess_run
@flaky @flaky
@@ -127,10 +126,10 @@ def test_local_vcs_urls_work(PipenvInstance, tmpdir):
six_dir = tmpdir.join("six") six_dir = tmpdir.join("six")
six_path = Path(six_dir.strpath) six_path = Path(six_dir.strpath)
with PipenvInstance(chdir=True) as p: with PipenvInstance(chdir=True) as p:
c = delegator.run( c = subprocess_run(
"git clone https://github.com/benjaminp/six.git {0}".format(six_dir.strpath) ["git", "clone", "https://github.com/benjaminp/six.git", six_dir.strpath]
) )
assert c.return_code == 0 assert c.returncode == 0
c = p.pipenv("install git+{0}#egg=six".format(six_path.as_uri())) c = p.pipenv("install git+{0}#egg=six".format(six_path.as_uri()))
assert c.return_code == 0 assert c.return_code == 0
@@ -216,9 +215,9 @@ def test_install_local_vcs_not_in_lockfile(PipenvInstance):
with PipenvInstance(chdir=True) as p: with PipenvInstance(chdir=True) as p:
# six_path = os.path.join(p.path, "six") # six_path = os.path.join(p.path, "six")
six_path = p._pipfile.get_fixture_path("git/six/").as_posix() six_path = p._pipfile.get_fixture_path("git/six/").as_posix()
c = delegator.run("git clone {0} ./six".format(six_path)) c = subprocess_run(["git", "clone", six_path, "./six"])
assert c.return_code == 0 assert c.returncode == 0
c = p.pipenv("install -e ./six".format(six_path)) c = p.pipenv("install -e ./six")
assert c.return_code == 0 assert c.return_code == 0
six_key = list(p.pipfile["packages"].keys())[0] six_key = list(p.pipfile["packages"].keys())[0]
# we don't need the rest of the test anymore, this just works on its own # we don't need the rest of the test anymore, this just works on its own
+31 -31
View File
@@ -51,7 +51,7 @@ flask = "==0.12.2"
assert d.return_code == 0 assert d.return_code == 0
for req in req_list: for req in req_list:
assert req in c.out assert req in c.stdout
for req in dev_req_list: for req in dev_req_list:
assert req in d.out assert req in d.out
@@ -102,8 +102,8 @@ def test_keep_outdated_doesnt_remove_lockfile_entries(PipenvInstance):
p._pipfile.add("requests", "==2.18.4") p._pipfile.add("requests", "==2.18.4")
p._pipfile.add("colorama", {"version": "*", "markers": "os_name=='FakeOS'"}) p._pipfile.add("colorama", {"version": "*", "markers": "os_name=='FakeOS'"})
c = p.pipenv("install") c = p.pipenv("install")
assert c.ok assert c.returncode == 0
assert "doesn't match your environment, its dependencies won't be resolved." in c.err assert "doesn't match your environment, its dependencies won't be resolved." in c.stderr
p._pipfile.add("six", "*") p._pipfile.add("six", "*")
p.pipenv("lock --keep-outdated") p.pipenv("lock --keep-outdated")
assert "colorama" in p.lockfile["default"] assert "colorama" in p.lockfile["default"]
@@ -115,11 +115,11 @@ def test_resolve_skip_unmatched_requirements(PipenvInstance):
with PipenvInstance(chdir=True) as p: with PipenvInstance(chdir=True) as p:
p._pipfile.add("missing-package", {"markers": "os_name=='FakeOS'"}) p._pipfile.add("missing-package", {"markers": "os_name=='FakeOS'"})
c = p.pipenv("lock") c = p.pipenv("lock")
assert c.ok assert c.returncode == 0
assert ( assert (
"Could not find a version of missing-package; " "Could not find a version of missing-package; "
"os_name == 'FakeOS' that matches your environment" "os_name == 'FakeOS' that matches your environment"
) in c.err ) in c.stderr
@pytest.mark.lock @pytest.mark.lock
@@ -128,10 +128,10 @@ def test_keep_outdated_doesnt_upgrade_pipfile_pins(PipenvInstance):
with PipenvInstance(chdir=True) as p: with PipenvInstance(chdir=True) as p:
p._pipfile.add("urllib3", "==1.21.1") p._pipfile.add("urllib3", "==1.21.1")
c = p.pipenv("install") c = p.pipenv("install")
assert c.ok assert c.returncode == 0
p._pipfile.add("requests", "==2.18.4") p._pipfile.add("requests", "==2.18.4")
c = p.pipenv("lock --keep-outdated") c = p.pipenv("lock --keep-outdated")
assert c.ok assert c.returncode == 0
assert "requests" in p.lockfile["default"] assert "requests" in p.lockfile["default"]
assert "urllib3" in p.lockfile["default"] assert "urllib3" in p.lockfile["default"]
assert p.lockfile["default"]["requests"]["version"] == "==2.18.4" assert p.lockfile["default"]["requests"]["version"] == "==2.18.4"
@@ -142,7 +142,7 @@ def test_keep_outdated_doesnt_upgrade_pipfile_pins(PipenvInstance):
def test_keep_outdated_keeps_markers_not_removed(PipenvInstance): def test_keep_outdated_keeps_markers_not_removed(PipenvInstance):
with PipenvInstance(chdir=True) as p: with PipenvInstance(chdir=True) as p:
c = p.pipenv("install six click") c = p.pipenv("install six click")
assert c.ok assert c.returncode == 0
lockfile = Path(p.lockfile_path) lockfile = Path(p.lockfile_path)
lockfile_content = lockfile.read_text() lockfile_content = lockfile.read_text()
lockfile_json = json.loads(lockfile_content) lockfile_json = json.loads(lockfile_content)
@@ -150,7 +150,7 @@ def test_keep_outdated_keeps_markers_not_removed(PipenvInstance):
lockfile_json["default"]["six"]["markers"] = "python_version >= '2.7'" lockfile_json["default"]["six"]["markers"] = "python_version >= '2.7'"
lockfile.write_text(to_text(json.dumps(lockfile_json))) lockfile.write_text(to_text(json.dumps(lockfile_json)))
c = p.pipenv("lock --keep-outdated") c = p.pipenv("lock --keep-outdated")
assert c.ok assert c.returncode == 0
assert p.lockfile["default"]["six"].get("markers", "") == "python_version >= '2.7'" assert p.lockfile["default"]["six"].get("markers", "") == "python_version >= '2.7'"
@@ -160,17 +160,17 @@ def test_keep_outdated_doesnt_update_satisfied_constraints(PipenvInstance):
with PipenvInstance(chdir=True) as p: with PipenvInstance(chdir=True) as p:
p._pipfile.add("requests", "==2.18.4") p._pipfile.add("requests", "==2.18.4")
c = p.pipenv("install") c = p.pipenv("install")
assert c.ok assert c.returncode == 0
p._pipfile.add("requests", "*") p._pipfile.add("requests", "*")
assert p.pipfile["packages"]["requests"] == "*" assert p.pipfile["packages"]["requests"] == "*"
c = p.pipenv("lock --keep-outdated") c = p.pipenv("lock --keep-outdated")
assert c.ok assert c.returncode == 0
assert "requests" in p.lockfile["default"] assert "requests" in p.lockfile["default"]
assert "urllib3" in p.lockfile["default"] assert "urllib3" in p.lockfile["default"]
# ensure this didn't update requests # ensure this didn't update requests
assert p.lockfile["default"]["requests"]["version"] == "==2.18.4" assert p.lockfile["default"]["requests"]["version"] == "==2.18.4"
c = p.pipenv("lock") c = p.pipenv("lock")
assert c.ok assert c.returncode == 0
assert p.lockfile["default"]["requests"]["version"] != "==2.18.4" assert p.lockfile["default"]["requests"]["version"] != "==2.18.4"
@@ -270,7 +270,7 @@ requests = {version = "*", extras = ["socks"]}
c = p.pipenv('lock -r') c = p.pipenv('lock -r')
assert c.return_code == 0 assert c.return_code == 0
assert "extra == 'socks'" not in c.out.strip() assert "extra == 'socks'" not in c.stdout.strip()
@pytest.mark.lock @pytest.mark.lock
@@ -354,8 +354,8 @@ requests = "*"
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv('lock -r') c = p.pipenv('lock -r')
assert c.return_code == 0 assert c.return_code == 0
assert '-i https://pypi.org/simple' in c.out.strip() assert '-i https://pypi.org/simple' in c.stdout.strip()
assert '--extra-index-url https://test.pypi.org/simple' in c.out.strip() assert '--extra-index-url https://test.pypi.org/simple' in c.stdout.strip()
@pytest.mark.lock @pytest.mark.lock
@@ -391,9 +391,9 @@ fake-package = "*"
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv(f'lock -r --pypi-mirror {mirror_url}') c = p.pipenv(f'lock -r --pypi-mirror {mirror_url}')
assert c.return_code == 0 assert c.return_code == 0
assert f'-i {mirror_url}' in c.out.strip() assert f'-i {mirror_url}' in c.stdout.strip()
assert '--extra-index-url https://test.pypi.org/simple' in c.out.strip() assert '--extra-index-url https://test.pypi.org/simple' in c.stdout.strip()
assert f'--extra-index-url {mirror_url}' not in c.out.strip() assert f'--extra-index-url {mirror_url}' not in c.stdout.strip()
@pytest.mark.lock @pytest.mark.lock
@@ -418,7 +418,7 @@ def test_outdated_setuptools_with_pep517_legacy_build_meta_is_updated(PipenvInst
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv("run python -c 'import setuptools; print(setuptools.__version__)'") c = p.pipenv("run python -c 'import setuptools; print(setuptools.__version__)'")
assert c.return_code == 0 assert c.return_code == 0
assert c.out.strip() == "40.2.0" assert c.stdout.strip() == "40.2.0"
c = p.pipenv("install legacy-backend-package") c = p.pipenv("install legacy-backend-package")
assert c.return_code == 0 assert c.return_code == 0
assert "vistir" in p.lockfile["default"] assert "vistir" in p.lockfile["default"]
@@ -441,7 +441,7 @@ def test_outdated_setuptools_with_pep517_cython_import_in_setuppy(PipenvInstance
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv("run python -c 'import setuptools; print(setuptools.__version__)'") c = p.pipenv("run python -c 'import setuptools; print(setuptools.__version__)'")
assert c.return_code == 0 assert c.return_code == 0
assert c.out.strip() == "40.2.0" assert c.stdout.strip() == "40.2.0"
c = p.pipenv("install cython-import-package") c = p.pipenv("install cython-import-package")
assert c.return_code == 0 assert c.return_code == 0
assert "vistir" in p.lockfile["default"] assert "vistir" in p.lockfile["default"]
@@ -594,7 +594,7 @@ django = "*"
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv('run python --version') c = p.pipenv('run python --version')
assert c.return_code == 0 assert c.return_code == 0
py_version = c.err.splitlines()[-1].strip().split()[-1] py_version = c.stderr.splitlines()[-1].strip().split()[-1]
django_version = '==2.0.6' if py_version.startswith('3') else '==1.11.13' django_version = '==2.0.6' if py_version.startswith('3') else '==1.11.13'
assert py_version == '2.7.14' assert py_version == '2.7.14'
assert p.lockfile['default']['django']['version'] == django_version assert p.lockfile['default']['django']['version'] == django_version
@@ -608,7 +608,7 @@ def test_lockfile_corrupted(PipenvInstance):
f.write('{corrupted}') f.write('{corrupted}')
c = p.pipenv('install') c = p.pipenv('install')
assert c.return_code == 0 assert c.return_code == 0
assert 'Pipfile.lock is corrupted' in c.err assert 'Pipfile.lock is corrupted' in c.stderr
assert p.lockfile['_meta'] assert p.lockfile['_meta']
@@ -620,7 +620,7 @@ def test_lockfile_with_empty_dict(PipenvInstance):
f.write('{}') f.write('{}')
c = p.pipenv('install') c = p.pipenv('install')
assert c.return_code == 0 assert c.return_code == 0
assert 'Pipfile.lock is corrupted' in c.err assert 'Pipfile.lock is corrupted' in c.stderr
assert p.lockfile['_meta'] assert p.lockfile['_meta']
@@ -653,9 +653,9 @@ def test_lock_no_warnings(PipenvInstance):
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv('run python -c "import warnings; warnings.warn(\\"This is a warning\\", DeprecationWarning); print(\\"hello\\")"') c = p.pipenv('run python -c "import warnings; warnings.warn(\\"This is a warning\\", DeprecationWarning); print(\\"hello\\")"')
assert c.return_code == 0 assert c.return_code == 0
assert "Warning" in c.err assert "Warning" in c.stderr
assert "Warning" not in c.out assert "Warning" not in c.stdout
assert "hello" in c.out assert "hello" in c.stdout
@pytest.mark.lock @pytest.mark.lock
@@ -673,9 +673,9 @@ def test_lock_missing_cache_entries_gets_all_hashes(PipenvInstance, tmpdir):
p._pipfile.add("pathlib2", "*") p._pipfile.add("pathlib2", "*")
assert "pathlib2" in p.pipfile["packages"] assert "pathlib2" in p.pipfile["packages"]
c = p.pipenv("install") c = p.pipenv("install")
assert c.return_code == 0, (c.err, ("\n".join([f"{k}: {v}\n" for k, v in os.environ.items()]))) assert c.return_code == 0, (c.stderr, ("\n".join([f"{k}: {v}\n" for k, v in os.environ.items()])))
c = p.pipenv("lock --clear") c = p.pipenv("lock --clear")
assert c.return_code == 0, c.err assert c.return_code == 0, c.stderr
assert "pathlib2" in p.lockfile["default"] assert "pathlib2" in p.lockfile["default"]
assert "scandir" in p.lockfile["default"] assert "scandir" in p.lockfile["default"]
assert isinstance(p.lockfile["default"]["scandir"]["hashes"], list) assert isinstance(p.lockfile["default"]["scandir"]["hashes"], list)
@@ -765,7 +765,7 @@ def test_lock_nested_vcs_direct_url(PipenvInstance):
def test_lock_package_with_wildcard_version(PipenvInstance): def test_lock_package_with_wildcard_version(PipenvInstance):
with PipenvInstance(chdir=True) as p: with PipenvInstance(chdir=True) as p:
c = p.pipenv("install 'six==1.11.*'") c = p.pipenv("install 'six==1.11.*'")
assert c.ok assert c.returncode == 0
assert "six" in p.pipfile["packages"] assert "six" in p.pipfile["packages"]
assert p.pipfile["packages"]["six"] == "==1.11.*" assert p.pipfile["packages"]["six"] == "==1.11.*"
assert "six" in p.lockfile["default"] assert "six" in p.lockfile["default"]
@@ -778,8 +778,8 @@ def test_lock_package_with_wildcard_version(PipenvInstance):
def test_default_lock_overwrite_dev_lock(PipenvInstance): def test_default_lock_overwrite_dev_lock(PipenvInstance):
with PipenvInstance(chdir=True) as p: with PipenvInstance(chdir=True) as p:
c = p.pipenv("install 'click==6.7'") c = p.pipenv("install 'click==6.7'")
assert c.ok assert c.returncode == 0
c = p.pipenv("install -d flask") c = p.pipenv("install -d flask")
assert c.ok assert c.returncode == 0
assert p.lockfile["default"]["click"]["version"] == "==6.7" assert p.lockfile["default"]["click"]["version"] == "==6.7"
assert p.lockfile["develop"]["click"]["version"] == "==6.7" assert p.lockfile["develop"]["click"]["version"] == "==6.7"
+7 -8
View File
@@ -8,8 +8,7 @@ import os
import pytest import pytest
from pipenv.project import Project from pipenv.project import Project
from pipenv.utils import temp_environ from pipenv.utils import subprocess_run, temp_environ
from pipenv.vendor import delegator
@pytest.mark.code @pytest.mark.code
@@ -40,8 +39,8 @@ pytest = "==4.6.9"
f.write(contents) f.write(contents)
c = p.pipenv('install --verbose') c = p.pipenv('install --verbose')
if c.return_code != 0: if c.return_code != 0:
assert c.err == '' or c.err is None assert c.stderr == '' or c.stderr is None
assert c.out == '' assert c.stdout == ''
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv('lock') c = p.pipenv('lock')
assert c.return_code == 0 assert c.return_code == 0
@@ -74,7 +73,7 @@ def test_update_locks(PipenvInstance):
assert p.lockfile['default']['jdcal']['version'] == '==1.4' assert p.lockfile['default']['jdcal']['version'] == '==1.4'
c = p.pipenv('run pip freeze') c = p.pipenv('run pip freeze')
assert c.return_code == 0 assert c.return_code == 0
lines = c.out.splitlines() lines = c.stdout.splitlines()
assert 'jdcal==1.4' in [l.strip() for l in lines] assert 'jdcal==1.4' in [l.strip() for l in lines]
@@ -82,8 +81,8 @@ def test_update_locks(PipenvInstance):
@pytest.mark.proper_names @pytest.mark.proper_names
def test_proper_names_unamanged_virtualenv(PipenvInstance): def test_proper_names_unamanged_virtualenv(PipenvInstance):
with PipenvInstance(chdir=True): with PipenvInstance(chdir=True):
c = delegator.run('python -m virtualenv .venv') c = subprocess_run(['python', '-m', 'virtualenv', '.venv'])
assert c.return_code == 0 assert c.returncode == 0
project = Project() project = Project()
assert project.proper_names == [] assert project.proper_names == []
@@ -98,7 +97,7 @@ def test_directory_with_leading_dash(raw_venv, PipenvInstance):
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv('--venv') c = p.pipenv('--venv')
assert c.return_code == 0 assert c.return_code == 0
venv_path = c.out.strip() venv_path = c.stdout.strip()
assert os.path.isdir(venv_path) assert os.path.isdir(venv_path)
# Manually clean up environment, since PipenvInstance assumes that # Manually clean up environment, since PipenvInstance assumes that
# the virutalenv is in the project directory. # the virutalenv is in the project directory.
+23 -25
View File
@@ -1,4 +1,3 @@
import io
import os import os
import tarfile import tarfile
@@ -6,9 +5,8 @@ import pytest
from pipenv.patched import pipfile from pipenv.patched import pipfile
from pipenv.project import Project from pipenv.project import Project
from pipenv.utils import temp_environ from pipenv.utils import subprocess_run, temp_environ
from pipenv.vendor.vistir.path import is_in_path, normalize_path from pipenv.vendor.vistir.path import is_in_path, normalize_path
from pipenv.vendor.delegator import run as delegator_run
@pytest.mark.project @pytest.mark.project
@@ -172,40 +170,40 @@ def test_include_editable_packages(PipenvInstance, testsroot, pathlib_tmpdir):
@pytest.mark.virtualenv @pytest.mark.virtualenv
def test_run_in_virtualenv_with_global_context(PipenvInstance, virtualenv): def test_run_in_virtualenv_with_global_context(PipenvInstance, virtualenv):
with PipenvInstance(chdir=True, venv_root=virtualenv.as_posix(), ignore_virtualenvs=False, venv_in_project=False) as p: with PipenvInstance(chdir=True, venv_root=virtualenv.as_posix(), ignore_virtualenvs=False, venv_in_project=False) as p:
c = delegator_run( c = subprocess_run(
"pipenv run pip freeze", cwd=os.path.abspath(p.path), ["pipenv", "run", "pip", "freeze"], cwd=os.path.abspath(p.path),
env=os.environ.copy() env=os.environ.copy()
) )
assert c.return_code == 0, (c.out, c.err) assert c.returncode == 0, (c.stdout, c.stderr)
assert 'Creating a virtualenv' not in c.err, c.err assert 'Creating a virtualenv' not in c.stderr, c.stderr
project = Project() project = Project()
assert project.virtualenv_location == virtualenv.as_posix(), ( assert project.virtualenv_location == virtualenv.as_posix(), (
project.virtualenv_location, virtualenv.as_posix() project.virtualenv_location, virtualenv.as_posix()
) )
c = delegator_run( c = subprocess_run(
f"pipenv run pip install -i {p.index_url} click", ["pipenv", "run", "pip", "install", "-i", p.index_url, "click"],
cwd=os.path.abspath(p.path), cwd=os.path.abspath(p.path),
env=os.environ.copy() env=os.environ.copy()
) )
assert c.return_code == 0, (c.out, c.err) assert c.returncode == 0, (c.stdout, c.stderr)
assert "Courtesy Notice" in c.err, (c.out, c.err) assert "Courtesy Notice" in c.stderr, (c.stdout, c.stderr)
c = delegator_run( c = subprocess_run(
f"pipenv install -i {p.index_url} six", ["pipenv", "install", "-i", p.index_url, "six"],
cwd=os.path.abspath(p.path), env=os.environ.copy() cwd=os.path.abspath(p.path), env=os.environ.copy()
) )
assert c.return_code == 0, (c.out, c.err) assert c.returncode == 0, (c.stdout, c.stderr)
c = delegator_run( c = subprocess_run(
'pipenv run python -c "import click;print(click.__file__)"', ['pipenv', 'run', 'python', '-c', 'import click;print(click.__file__)'],
cwd=os.path.abspath(p.path), env=os.environ.copy() cwd=os.path.abspath(p.path), env=os.environ.copy()
) )
assert c.return_code == 0, (c.out, c.err) assert c.returncode == 0, (c.stdout, c.stderr)
assert is_in_path(c.out.strip(), str(virtualenv)), (c.out.strip(), str(virtualenv)) assert is_in_path(c.stdout.strip(), str(virtualenv)), (c.stdout.strip(), str(virtualenv))
c = delegator_run( c = subprocess_run(
"pipenv clean --dry-run", cwd=os.path.abspath(p.path), ["pipenv", "clean", "--dry-run"], cwd=os.path.abspath(p.path),
env=os.environ.copy() env=os.environ.copy()
) )
assert c.return_code == 0, (c.out, c.err) assert c.returncode == 0, (c.stdout, c.stderr)
assert "click" in c.out, c.out assert "click" in c.stdout, c.stdout
@pytest.mark.project @pytest.mark.project
@@ -214,7 +212,7 @@ def test_run_in_virtualenv(PipenvInstance):
with PipenvInstance(chdir=True) as p: with PipenvInstance(chdir=True) as p:
c = p.pipenv('run pip freeze') c = p.pipenv('run pip freeze')
assert c.return_code == 0 assert c.return_code == 0
assert 'Creating a virtualenv' in c.err assert 'Creating a virtualenv' in c.stderr
project = Project() project = Project()
c = p.pipenv("run pip install click") c = p.pipenv("run pip install click")
assert c.return_code == 0 assert c.return_code == 0
@@ -222,12 +220,12 @@ def test_run_in_virtualenv(PipenvInstance):
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv('run python -c "import click;print(click.__file__)"') c = p.pipenv('run python -c "import click;print(click.__file__)"')
assert c.return_code == 0 assert c.return_code == 0
assert normalize_path(c.out.strip()).startswith( assert normalize_path(c.stdout.strip()).startswith(
normalize_path(str(project.virtualenv_location)) normalize_path(str(project.virtualenv_location))
) )
c = p.pipenv("clean --dry-run") c = p.pipenv("clean --dry-run")
assert c.return_code == 0 assert c.return_code == 0
assert "click" in c.out assert "click" in c.stdout
@pytest.mark.project @pytest.mark.project
@pytest.mark.sources @pytest.mark.sources
+8 -8
View File
@@ -15,7 +15,7 @@ def test_env(PipenvInstance):
c = p.pipenv('run python -c "import os; print(os.environ[\'HELLO\'])"') c = p.pipenv('run python -c "import os; print(os.environ[\'HELLO\'])"')
assert c.return_code == 0 assert c.return_code == 0
assert 'WORLD' in c.out assert 'WORLD' in c.stdout
@pytest.mark.run @pytest.mark.run
@@ -38,15 +38,15 @@ multicommand = "bash -c \"cd docs && make html\""
c = p.pipenv('run printfoo') c = p.pipenv('run printfoo')
assert c.return_code == 0 assert c.return_code == 0
assert c.out == 'foo\n' assert c.stdout == 'foo\n'
assert c.err == '' assert c.stderr == ''
c = p.pipenv('run notfoundscript') c = p.pipenv('run notfoundscript')
assert c.return_code == 1 assert c.return_code == 1
assert c.out == '' assert c.stdout == ''
if os.name != 'nt': # TODO: Implement this message for Windows. if os.name != 'nt': # TODO: Implement this message for Windows.
assert 'Error' in c.err assert 'Error' in c.stderr
assert 'randomthingtotally (from notfoundscript)' in c.err assert 'randomthingtotally (from notfoundscript)' in c.stderr
project = Project() project = Project()
@@ -61,6 +61,6 @@ multicommand = "bash -c \"cd docs && make html\""
with temp_environ(): with temp_environ():
os.environ['HELLO'] = 'WORLD' os.environ['HELLO'] = 'WORLD'
c = p.pipenv("run scriptwithenv") c = p.pipenv("run scriptwithenv")
assert c.ok assert c.returncode == 0
if os.name != "nt": # This doesn't work on CI windows. if os.name != "nt": # This doesn't work on CI windows.
assert c.out.strip() == "WORLD" assert c.stdout.strip() == "WORLD"
+2 -2
View File
@@ -17,7 +17,7 @@ def test_sync_error_without_lockfile(PipenvInstance):
c = p.pipenv('sync') c = p.pipenv('sync')
assert c.return_code != 0 assert c.return_code != 0
assert 'Pipfile.lock not found!' in c.err assert 'Pipfile.lock not found!' in c.stderr
@pytest.mark.sync @pytest.mark.sync
@@ -110,4 +110,4 @@ requests = "*"
c = p.pipenv('sync --sequential --verbose') c = p.pipenv('sync --sequential --verbose')
for package in p.lockfile['default']: for package in p.lockfile['default']:
assert f'Successfully installed {package}' in c.out assert f'Successfully installed {package}' in c.stdout
+2 -2
View File
@@ -105,7 +105,7 @@ def test_uninstall_all_local_files(PipenvInstance, testsroot):
assert c.return_code == 0 assert c.return_code == 0
c = p.pipenv("uninstall --all") c = p.pipenv("uninstall --all")
assert c.return_code == 0 assert c.return_code == 0
assert "tablib" in c.out assert "tablib" in c.stdout
# Uninstall --all is not supposed to remove things from the pipfile # Uninstall --all is not supposed to remove things from the pipfile
# Note that it didn't before, but that instead local filenames showed as hashes # Note that it didn't before, but that instead local filenames showed as hashes
assert "tablib" in p.pipfile["packages"] assert "tablib" in p.pipfile["packages"]
@@ -193,4 +193,4 @@ def test_uninstall_missing_parameters(PipenvInstance):
c = p.pipenv("uninstall") c = p.pipenv("uninstall")
assert c.return_code != 0 assert c.return_code != 0
assert "No package provided!" in c.err assert "No package provided!" in c.stderr
+3 -3
View File
@@ -20,7 +20,7 @@ def test_case_changes_windows(PipenvInstance):
# Canonical venv location. # Canonical venv location.
c = p.pipenv('--venv') c = p.pipenv('--venv')
assert c.return_code == 0 assert c.return_code == 0
virtualenv_location = c.out.strip() virtualenv_location = c.stdout.strip()
# Dance around to change the casing of the project directory. # Dance around to change the casing of the project directory.
target = p.path.upper() target = p.path.upper()
@@ -33,7 +33,7 @@ def test_case_changes_windows(PipenvInstance):
# Ensure the incorrectly-cased project can find the correct venv. # Ensure the incorrectly-cased project can find the correct venv.
c = p.pipenv('--venv') c = p.pipenv('--venv')
assert c.return_code == 0 assert c.return_code == 0
assert c.out.strip().lower() == virtualenv_location.lower() assert c.stdout.strip().lower() == virtualenv_location.lower()
@pytest.mark.files @pytest.mark.files
@@ -78,4 +78,4 @@ def test_pipenv_clean_windows(PipenvInstance):
c = p.pipenv('clean --dry-run') c = p.pipenv('clean --dry-run')
assert c.return_code == 0 assert c.return_code == 0
assert 'click' in c.out.strip() assert 'click' in c.stdout.strip()