Try switching from delegator to subprocess

This commit is contained in:
Frost Ming
2021-07-28 16:39:36 +08:00
parent 0255cc6839
commit a82bbb7b27
22 changed files with 305 additions and 703 deletions
+8 -9
View File
@@ -1,4 +1,5 @@
import os
from pipenv.utils import subprocess_run
import sys
from click import (
@@ -9,7 +10,7 @@ from ..__version__ import __version__
from .._compat import fix_utf8
from ..exceptions import PipenvOptionsError
from ..patched import crayons
from ..vendor import click_completion, delegator
from ..vendor import click_completion
from .options import (
CONTEXT_SETTINGS, PipenvGroup, code_option, common_options, deploy_option,
general_options, install_options, lock_options, pass_state,
@@ -641,18 +642,16 @@ def run_open(state, module, *args, **kwargs):
three=state.three, python=state.python,
validate=False, pypi_mirror=state.pypi_mirror,
)
c = delegator.run(
'{0} -c "import {1}; print({1}.__file__);"'.format(which("python"), module)
c = subprocess_run(
which("python"), "-c", "import {0}; print({0}.__file__)".format(module)
)
try:
assert c.return_code == 0
except AssertionError:
if c.returncode:
echo(crayons.red("Module not found!"))
sys.exit(1)
if "__init__.py" in c.out:
p = os.path.dirname(c.out.strip().rstrip("cdo"))
if "__init__.py" in c.stdout:
p = os.path.dirname(c.stdout.strip().rstrip("cdo"))
else:
p = c.out.strip().rstrip("cdo")
p = c.stdout.strip().rstrip("cdo")
echo(crayons.normal(f"Opening {p!r} in your EDITOR.", bold=True))
inline_activate_virtual_environment()
edit(filename=p)
+4
View File
@@ -43,6 +43,10 @@ class Script(object):
def args(self):
return self._parts[1:]
@property
def cmd_args(self):
return self._parts
def extend(self, extra_args):
self._parts.extend(extra_args)
+73 -75
View File
@@ -8,32 +8,30 @@ import warnings
import click
import delegator
import dotenv
import pipfile
import vistir
from click_completion import init as init_completion
from . import environments, exceptions, pep508checker, progress
from ._compat import decode_for_output, fix_utf8
from .cmdparse import Script
from .environments import (
from pipenv import environments, exceptions, pep508checker, progress
from pipenv._compat import decode_for_output, fix_utf8
from pipenv.environments import (
PIP_EXISTS_ACTION, PIPENV_CACHE_DIR, PIPENV_COLORBLIND,
PIPENV_DEFAULT_PYTHON_VERSION, PIPENV_DONT_USE_PYENV, PIPENV_DONT_USE_ASDF,
PIPENV_HIDE_EMOJIS, PIPENV_MAX_SUBPROCESS, PIPENV_PYUP_API_KEY,
PIPENV_RESOLVE_VCS, PIPENV_SHELL_FANCY, PIPENV_SKIP_VALIDATION, PIPENV_YES,
SESSION_IS_INTERACTIVE, is_type_checking
)
from .patched import crayons
from .project import Project
from .utils import (
from pipenv.patched import crayons
from pipenv.project import Project
from pipenv.utils import (
convert_deps_to_pip, create_spinner, download_file,
escape_grouped_arguments, find_python, find_windows_executable,
get_canonical_names, get_source_list, interrupt_handled_subprocess,
is_pinned, is_python_command, is_required_version, is_star, is_valid_url,
parse_indexes, pep423_name, prepare_pip_source_args, proper_case,
python_version, run_command, venv_resolve_deps
python_version, run_command, subprocess_run, venv_resolve_deps
)
@@ -454,7 +452,7 @@ def ensure_python(three=None, python=None):
else:
sp.ok(environments.PIPENV_SPINNER_OK_TEXT.format("Success!"))
# Print the results, in a beautiful blue...
click.echo(crayons.cyan(c.out), err=True)
click.echo(crayons.cyan(c.stdout), err=True)
# Clear the pythonfinder caches
from .vendor.pythonfinder import Finder
finder = Finder(system=False, global_search=True)
@@ -661,21 +659,21 @@ def do_where(virtualenv=False, bare=True):
def _cleanup_procs(procs, failed_deps_queue, retry=True):
while not procs.empty():
c = procs.get()
if not c.blocking:
c.block()
c.wait()
failed = False
if c.return_code != 0:
if c.returncode != 0:
failed = True
if "Ignoring" in c.out:
click.echo(crayons.yellow(c.out.strip()))
out, err = c.communicate()
if "Ignoring" in out:
click.echo(crayons.yellow(out.strip()))
elif environments.is_verbose():
click.echo(crayons.cyan(c.out.strip() or c.err.strip()))
click.echo(crayons.cyan(out.strip() or err.strip()))
# The Installation failed...
if failed:
# If there is a mismatch in installed locations or the install fails
# due to wrongful disabling of pep517, we should allow for
# additional passes at installation
if "does not match installed location" in c.err:
if "does not match installed location" in err:
project.environment.expand_egg_links()
click.echo("{}".format(
crayons.yellow(
@@ -686,14 +684,14 @@ def _cleanup_procs(procs, failed_deps_queue, retry=True):
))
dep = c.dep.copy()
dep.use_pep517 = True
elif "Disabling PEP 517 processing is invalid" in c.err:
elif "Disabling PEP 517 processing is invalid" in err:
dep = c.dep.copy()
dep.use_pep517 = True
elif not retry:
# The Installation failed...
# We echo both c.out and c.err because pip returns error details on out.
err = c.err.strip().splitlines() if c.err else []
out = c.out.strip().splitlines() if c.out else []
# We echo both c.stdout and c.stderr because pip returns error details on out.
err = err.strip().splitlines() if err else []
out = out.strip().splitlines() if out else []
err_lines = [line for message in [out, err] for line in message]
# Return the subprocess' return code.
raise exceptions.InstallError(c.dep.name, extra=err_lines)
@@ -785,7 +783,7 @@ def batch_install(deps_list, procs, failed_deps_queue,
# if dep.is_vcs or dep.editable:
is_sequential = sequential_deps and dep.name in sequential_dep_names
if is_sequential:
c.block()
c.wait()
procs.put(c)
if procs.full() or procs.qsize() == len(deps_list) or is_sequential:
@@ -1022,12 +1020,13 @@ def get_downloads_info(names_map, section):
# Get the version info from the filenames.
version = parse_download_fname(fname, name)
# Get the hash of each file.
cmd = '{} hash "{}"'.format(
cmd = [
escape_grouped_arguments(which_pip()),
"hash",
os.sep.join([project.download_location, fname]),
)
c = delegator.run(cmd)
hash = c.out.split("--hash=")[1].strip()
]
c = subprocess_run(cmd)
hash = c.stdout.split("--hash=")[1].strip()
# Verify we're adding the correct version from Pipfile
# and not one from a dependency.
specified_version = p[section].get(name, "")
@@ -1177,17 +1176,18 @@ def do_purge(bare=False, downloads=False, allow_global=False):
fix_utf8(f"Found {len(to_remove)} installed package(s), purging...")
)
command = "{} uninstall {} -y".format(
command = [
escape_grouped_arguments(which_pip(allow_global=allow_global)),
"uninstall", "-y",
" ".join(to_remove),
)
]
if environments.is_verbose():
click.echo(f"$ {command}")
c = delegator.run(command)
if c.return_code != 0:
raise exceptions.UninstallError(installed, command, c.out + c.err, c.return_code)
click.echo(f"$ {' '.join(command)}")
c = subprocess_run(command)
if c.returncode != 0:
raise exceptions.UninstallError(installed, ' '.join(command), c.stdout + c.stderr, c.returncode)
if not bare:
click.echo(crayons.cyan(c.out))
click.echo(crayons.cyan(c.stdout))
click.echo(crayons.green("Environment now purged and fresh!"))
return installed
@@ -1485,7 +1485,7 @@ def pip_install(
err=True,
)
pip_command = [which_pip(allow_global=allow_global), "install"]
pip_command = [which("python", allow_global=allow_global), "-m", "pip", "install"]
pip_args = get_pip_args(
pre=pre, verbose=environments.is_verbose(), upgrade=True,
selective_upgrade=selective_upgrade, no_use_pep517=not use_pep517,
@@ -1498,7 +1498,7 @@ def pip_install(
pip_command.extend(line)
pip_command.extend(prepare_pip_source_args(sources))
if environments.is_verbose():
click.echo(f"$ {pip_command}", err=True)
click.echo(f"$ {' '.join(pip_command)}", err=True)
cache_dir = vistir.compat.Path(PIPENV_CACHE_DIR)
DEFAULT_EXISTS_ACTION = "w"
if selective_upgrade:
@@ -1519,10 +1519,7 @@ def pip_install(
pip_config.update(
{"PIP_SRC": vistir.misc.fs_str(src_dir)}
)
cmd = Script.parse(pip_command)
pip_command = cmd.cmdify()
c = None
c = delegator.run(pip_command, block=block, env=pip_config)
c = subprocess_run(pip_command, block=block, env=pip_config)
c.env = pip_config
return c
@@ -1537,14 +1534,15 @@ def pip_download(package_name):
),
}
for source in project.sources:
cmd = '{} download "{}" -i {} -d {}'.format(
cmd = [
escape_grouped_arguments(which_pip()),
"download",
package_name,
source["url"],
project.download_location,
)
c = delegator.run(cmd, env=pip_config)
if c.return_code == 0:
"-i", source["url"],
"-d", project.download_location,
]
c = subprocess_run(cmd, env=pip_config)
if c.returncode == 0:
break
return c
@@ -1616,10 +1614,10 @@ def system_which(command, mult=False):
})
result = None
try:
c = delegator.run(f"{_which} {command}")
c = subprocess_run([_which, command])
try:
# Which Not found...
if c.return_code == 127:
if c.returncode == 127:
click.echo(
"{}: the {} system utility is required for Pipenv to find Python installations properly."
"\n Please install it.".format(
@@ -1627,7 +1625,7 @@ def system_which(command, mult=False):
),
err=True,
)
assert c.return_code == 0
assert c.returncode == 0
except AssertionError:
result = fallback_which(command, allow_global=True)
except TypeError:
@@ -2116,19 +2114,19 @@ def do_install(
extra_indexes=extra_index_url,
pypi_mirror=pypi_mirror,
)
if not c.ok:
if c.returncode:
sp.write_err(
"{} An error occurred while installing {}!".format(
crayons.red("Error: ", bold=True), crayons.green(pkg_line)
),
)
sp.write_err(
vistir.compat.fs_str(f"Error text: {c.out}")
vistir.compat.fs_str(f"Error text: {c.stdout}")
)
sp.write_err(crayons.cyan(vistir.compat.fs_str(format_pip_error(c.err))))
sp.write_err(crayons.cyan(vistir.compat.fs_str(format_pip_error(c.stderr))))
if environments.is_verbose():
sp.write_err(crayons.cyan(vistir.compat.fs_str(format_pip_output(c.out))))
if "setup.py egg_info" in c.err:
sp.write_err(crayons.cyan(vistir.compat.fs_str(format_pip_output(c.stdout))))
if "setup.py egg_info" in c.stderr:
sp.write_err(vistir.compat.fs_str(
"This is likely caused by a bug in {}. "
"Report this to its maintainers.".format(
@@ -2301,8 +2299,8 @@ def do_uninstall(
pip_path = which_pip(allow_global=system)
cmd = [pip_path, "uninstall", package_name, "-y"]
c = run_command(cmd)
click.echo(crayons.cyan(c.out))
if c.return_code != 0:
click.echo(crayons.cyan(c.stdout))
if c.returncode != 0:
failure = True
if not failure and pipfile_remove:
in_packages = project.get_package_name_in_pipfile(package_name, dev=False)
@@ -2611,14 +2609,14 @@ def do_check(
# Run the PEP 508 checker in the virtualenv.
cmd = _cmd + [vistir.compat.Path(pep508checker_path).as_posix()]
c = run_command(cmd)
if c.return_code is not None:
if c.returncode is not None:
try:
results = simplejson.loads(c.out.strip())
results = simplejson.loads(c.stdout.strip())
except JSONDecodeError:
click.echo("{}\n{}\n{}".format(
crayons.white(decode_for_output("Failed parsing pep508 results: "), bold=True),
c.out.strip(),
c.err.strip()
c.stdout.strip(),
c.stderr.strip()
))
sys.exit(1)
# Load the pipfile.
@@ -2681,11 +2679,11 @@ def do_check(
c = run_command(cmd, catch_exceptions=False)
if output == "default":
try:
results = simplejson.loads(c.out)
results = simplejson.loads(c.stdout)
except (ValueError, JSONDecodeError):
raise exceptions.JSONParseError(c.out, c.err)
raise exceptions.JSONParseError(c.stdout, c.stderr)
except Exception:
raise exceptions.PipenvCmdError(c.cmd, c.out, c.err, c.return_code)
raise exceptions.PipenvCmdError(' '.join(c.args), c.stdout, c.stderr, c.returncode)
for (package, resolved, installed, description, vuln) in results:
click.echo(
"{}: {} {} resolved ({} installed)!".format(
@@ -2697,14 +2695,14 @@ def do_check(
)
click.echo(f"{description}")
click.echo()
if c.ok:
if c.returncode == 0:
click.echo(crayons.green("All good!"))
sys.exit(0)
else:
sys.exit(1)
else:
click.echo(c.out)
sys.exit(c.return_code)
click.echo(c.stdout)
sys.exit(c.returncode)
def do_graph(bare=False, json=False, json_tree=False, reverse=False):
@@ -2786,9 +2784,9 @@ def do_graph(bare=False, json=False, json_tree=False, reverse=False):
if json:
data = []
try:
parsed = simplejson.loads(c.out.strip())
parsed = simplejson.loads(c.stdout.strip())
except JSONDecodeError:
raise exceptions.JSONParseError(c.out, c.err)
raise exceptions.JSONParseError(c.stdout, c.stderr)
else:
for d in parsed:
if d["package"]["key"] not in BAD_PACKAGES:
@@ -2809,15 +2807,15 @@ def do_graph(bare=False, json=False, json_tree=False, reverse=False):
return obj
try:
parsed = simplejson.loads(c.out.strip())
parsed = simplejson.loads(c.stdout.strip())
except JSONDecodeError:
raise exceptions.JSONParseError(c.out, c.err)
raise exceptions.JSONParseError(c.stdout, c.stderr)
else:
data = traverse(parsed)
click.echo(simplejson.dumps(data, indent=4))
sys.exit(0)
else:
for line in c.out.strip().split("\n"):
for line in c.stdout.strip().split("\n"):
# Ignore bad packages as top level.
# TODO: This should probably be a "==" in + line.partition
if line.split("==")[0] in BAD_PACKAGES and not reverse:
@@ -2830,17 +2828,17 @@ def do_graph(bare=False, json=False, json_tree=False, reverse=False):
else:
click.echo(crayons.normal(line, bold=False))
else:
click.echo(c.out)
if c.return_code != 0:
click.echo(c.stdout)
if c.returncode != 0:
click.echo(
"{} {}".format(
crayons.red("ERROR: ", bold=True),
crayons.white(f"{c.err}"),
crayons.white(f"{c.stderr}"),
),
err=True,
)
# Return its return code.
sys.exit(c.return_code)
sys.exit(c.returncode)
def do_sync(
@@ -2928,6 +2926,6 @@ def do_clean(
# Uninstall the package.
cmd = [which_pip(), "uninstall", apparent_bad_package, "-y"]
c = run_command(cmd)
if c.return_code != 0:
if c.returncode != 0:
failure = True
sys.exit(int(failure))
+9 -10
View File
@@ -4,9 +4,9 @@ import re
from abc import ABCMeta, abstractmethod
from .environments import PIPENV_INSTALL_TIMEOUT
from .vendor import attr, delegator
from .utils import find_windows_executable
from pipenv.environments import PIPENV_INSTALL_TIMEOUT
from pipenv.vendor import attr
from pipenv.utils import find_windows_executable, subprocess_run
@attr.s
@@ -59,8 +59,8 @@ class InstallerNotFound(RuntimeError):
class InstallerError(RuntimeError):
def __init__(self, desc, c):
super().__init__(desc)
self.out = c.out
self.err = c.err
self.out = c.stdout
self.err = c.stderr
class Installer(metaclass=ABCMeta):
@@ -114,14 +114,13 @@ class Installer(metaclass=ABCMeta):
raise InstallerNotFound()
def _run(self, *args, **kwargs):
timeout = kwargs.pop('timeout', delegator.TIMEOUT)
timeout = kwargs.pop('timeout', 30)
if kwargs:
k = list(kwargs.keys())[0]
raise TypeError(f'unexpected keyword argument {k!r}')
args = (self.cmd,) + tuple(args)
c = delegator.run(args, block=False, timeout=timeout)
c.block()
if c.return_code != 0:
c = subprocess_run(args, timeout=timeout)
if c.returncode != 0:
raise InstallerError(f'failed to run {args}', c)
return c
@@ -201,7 +200,7 @@ class Asdf(Installer):
def iter_installable_versions(self):
"""Iterate through CPython versions available for asdf to install.
"""
for name in self._run('list-all', 'python').out.splitlines():
for name in self._run('list-all', 'python').stdout.splitlines():
try:
version = Version.parse(name.strip())
except ValueError:
+64 -91
View File
@@ -1,3 +1,4 @@
import subprocess
import contextlib
import errno
import logging
@@ -21,6 +22,7 @@ import parse
import tomlkit
from . import environments
from ._compat import DEFAULT_ENCODING
from .exceptions import PipenvCmdError, PipenvUsageError, RequirementError, ResolutionFailure
from .pep508checker import lookup
from .vendor.packaging.markers import Marker
@@ -129,7 +131,6 @@ def run_command(cmd, *args, **kwargs):
:raises: exceptions.PipenvCmdError
"""
from pipenv.vendor import delegator
from ._compat import decode_for_output
from .cmdparse import Script
catch_exceptions = kwargs.pop("catch_exceptions", True)
@@ -140,21 +141,16 @@ def run_command(cmd, *args, **kwargs):
if "env" not in kwargs:
kwargs["env"] = os.environ.copy()
kwargs["env"]["PYTHONIOENCODING"] = "UTF-8"
try:
cmd_string = cmd.cmdify()
except TypeError:
click_echo(f"Error turning command into string: {cmd}", err=True)
sys.exit(1)
command = [cmd.command, *cmd.args]
if environments.is_verbose():
click_echo(f"Running command: $ {cmd_string}")
c = delegator.run(cmd_string, *args, **kwargs)
return_code = c.return_code
click_echo(f"Running command: $ {cmd.cmdify()}")
c = subprocess_run(command, *args, **kwargs)
if environments.is_verbose():
click_echo("Command output: {}".format(
crayons.cyan(decode_for_output(c.out))
crayons.cyan(decode_for_output(c.stdout))
), err=True)
if not c.ok and catch_exceptions:
raise PipenvCmdError(cmd_string, c.out, c.err, return_code)
if c.returncode and catch_exceptions:
raise PipenvCmdError(cmd.cmdify(), c.stdout, c.stderr, c.returncode)
return c
@@ -1131,50 +1127,30 @@ def create_spinner(text, nospin=None, spinner_name=None):
def resolve(cmd, sp):
from .cmdparse import Script
from .vendor import delegator
from .vendor.pexpect.exceptions import EOF, TIMEOUT
from .vendor.vistir.compat import to_native_string
from .vendor.vistir.misc import echo
EOF.__module__ = "pexpect.exceptions"
from ._compat import decode_output
c = delegator.run(Script.parse(cmd).cmdify(), block=False, env=os.environ.copy())
if environments.is_verbose():
c.subprocess.logfile = sys.stderr
_out = decode_output("")
result = None
out = to_native_string("")
while True:
result = None
try:
result = c.expect("\n", timeout=environments.PIPENV_INSTALL_TIMEOUT)
except TIMEOUT:
pass
except EOF:
break
except KeyboardInterrupt:
c.kill()
break
if result:
_out = c.subprocess.before
_out = decode_output(f"{_out}")
out += _out
# sp.text = to_native_string("{0}".format(_out[:100]))
if environments.is_verbose():
sp.hide_and_write(out.splitlines()[-1].rstrip())
else:
break
c.block()
if c.return_code != 0:
c = subprocess_run(Script.parse(cmd).cmd_args, block=False, env=os.environ.copy())
err = ""
for line in iter(c.stderr.readline, ""):
line = decode_output(line)
err += line
if environments.is_verbose() and line.rstrip():
sp.hide_and_write(line.rstrip())
c.wait()
returncode = c.poll()
out = c.stdout.read()
if returncode != 0:
sp.red.fail(environments.PIPENV_SPINNER_FAIL_TEXT.format(
"Locking Failed!"
))
echo(c.out.strip(), err=True)
echo(out.strip(), err=True)
if not environments.is_verbose():
echo(out, err=True)
sys.exit(c.return_code)
echo(err, err=True)
sys.exit(returncode)
if environments.is_verbose():
echo(c.err.strip(), err=True)
return c
echo(out.strip(), err=True)
return subprocess.CompletedProcess(c.args, returncode, out, err)
def get_locked_dep(dep, pipfile_section, prefer_pipfile=True):
@@ -1334,21 +1310,21 @@ def venv_resolve_deps(
os.environ["PIPENV_PACKAGES"] = str("\n".join(constraints))
sp.write(decode_for_output("Resolving dependencies..."))
c = resolve(cmd, sp)
results = c.out.strip()
if c.ok:
results = c.stdout.strip()
if c.returncode == 0:
sp.green.ok(environments.PIPENV_SPINNER_OK_TEXT.format("Success!"))
if not environments.is_verbose() and c.out.strip():
click_echo(crayons.yellow(f"Warning: {c.out.strip()}"), err=True)
if not environments.is_verbose() and c.stdout.strip():
click_echo(crayons.yellow(f"Warning: {c.stdout.strip()}"), err=True)
else:
sp.red.fail(environments.PIPENV_SPINNER_FAIL_TEXT.format("Locking Failed!"))
click_echo(f"Output: {c.out.strip()}", err=True)
click_echo(f"Error: {c.err.strip()}", err=True)
click_echo(f"Output: {c.stdout.strip()}", err=True)
click_echo(f"Error: {c.stderr.strip()}", err=True)
try:
with open(target_file.name) as fh:
results = json.load(fh)
except (IndexError, JSONDecodeError):
click_echo(c.out.strip(), err=True)
click_echo(c.err.strip(), err=True)
click_echo(c.stdout.strip(), err=True)
click_echo(c.stderr.strip(), err=True)
if os.path.exists(target_file.name):
os.unlink(target_file.name)
raise RuntimeError("There was a problem with locking.")
@@ -1723,14 +1699,13 @@ def temp_path():
def load_path(python):
from ._compat import Path
import delegator
from pathlib import Path
import json
python = Path(python).as_posix()
json_dump_commmand = '"import json, sys; print(json.dumps(sys.path));"'
c = delegator.run(f'"{python}" -c {json_dump_commmand}')
if c.return_code == 0:
return json.loads(c.out.strip())
c = subprocess_run([python, "-c", json_dump_commmand])
if c.returncode == 0:
return json.loads(c.stdout.strip())
else:
return []
@@ -2248,35 +2223,6 @@ def is_python_command(line):
return False
# def make_marker_from_specifier(spec):
# # type: (str) -> Optional[Marker]
# """Given a python version specifier, create a marker
# :param spec: A specifier
# :type spec: str
# :return: A new marker
# :rtype: Optional[:class:`packaging.marker.Marker`]
# """
# from .vendor.packaging.markers import Marker
# from .vendor.packaging.specifiers import SpecifierSet, Specifier
# from .vendor.requirementslib.models.markers import cleanup_pyspecs, format_pyversion
# if not any(spec.startswith(k) for k in Specifier._operators.keys()):
# if spec.strip().lower() in ["any", "<any>", "*"]:
# return None
# spec = "=={0}".format(spec)
# elif spec.startswith("==") and spec.count("=") > 3:
# spec = "=={0}".format(spec.lstrip("="))
# if not spec:
# return None
# marker_segments = []
# print(spec)
# for marker_segment in cleanup_pyspecs(spec):
# print(marker_segment)
# marker_segments.append(format_pyversion(marker_segment))
# marker_str = " and ".join(marker_segments)
# return Marker(marker_str)
@contextlib.contextmanager
def interrupt_handled_subprocess(
cmd, verbose=False, return_object=True, write_to_stdout=False, combine_stderr=True,
@@ -2312,3 +2258,30 @@ def interrupt_handled_subprocess(
os.kill(obj.pid, signal.SIGINT)
obj.wait()
raise
def subprocess_run(
args, *, block=True, text=True, capture_output=True,
encoding=DEFAULT_ENCODING, env=None, **other_kwargs
):
"""A backward compatible version of subprocess.run().
It outputs text with default encoding, and store all outputs in the returned object instead of
printing onto stdout.
"""
if env is not None:
env = dict(os.environ, **env)
other_kwargs['env'] = env
if block:
return subprocess.run(
args, universal_newlines=text, capture_output=capture_output,
encoding=encoding, **other_kwargs
)
else:
if capture_output:
other_kwargs['stdout'] = subprocess.PIPE
other_kwargs['stderr'] = subprocess.PIPE
return subprocess.Popen(
args, universal_newlines=text,
encoding=encoding, **other_kwargs
)
-341
View File
@@ -1,341 +0,0 @@
import os
import subprocess
import shlex
import signal
import sys
import locale
import errno
from pexpect.popen_spawn import PopenSpawn
import pexpect
pexpect.EOF.__module__ = "pexpect.exceptions"
# Include `unicode` in STR_TYPES for Python 2.X
try:
STR_TYPES = (str, unicode)
except NameError:
STR_TYPES = (str,)
TIMEOUT = 30
def pid_exists(pid):
"""Check whether pid exists in the current process table."""
if pid == 0:
# According to "man 2 kill" PID 0 has a special meaning:
# it refers to <<every process in the process group of the
# calling process>> so we don't want to go any further.
# If we get here it means this UNIX platform *does* have
# a process with id 0.
return True
try:
os.kill(pid, 0)
except OSError as err:
if err.errno == errno.ESRCH:
# ESRCH == No such process
return False
elif err.errno == errno.EPERM:
# EPERM clearly means there's a process to deny access to
return True
else:
# According to "man 2 kill" possible error values are
# (EINVAL, EPERM, ESRCH) therefore we should never get
# here. If we do let's be explicit in considering this
# an error.
raise err
else:
return True
class Command(object):
def __init__(self, cmd, timeout=TIMEOUT):
super(Command, self).__init__()
self.cmd = cmd
self.timeout = timeout
self.subprocess = None
self.blocking = None
self.was_run = False
self.__out = None
self.__err = None
def __repr__(self):
return "<Command {!r}>".format(self.cmd)
@property
def _popen_args(self):
return self.cmd
@property
def _default_popen_kwargs(self):
return {
"env": os.environ.copy(),
"stdin": subprocess.PIPE,
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE,
"shell": True,
"universal_newlines": True,
"bufsize": 0,
}
@property
def _default_pexpect_kwargs(self):
encoding = "utf-8"
if sys.platform == "win32":
default_encoding = locale.getdefaultlocale()[1]
if default_encoding is not None:
encoding = default_encoding
return {"env": os.environ.copy(), "encoding": encoding, "timeout": self.timeout}
@property
def _uses_subprocess(self):
return isinstance(self.subprocess, subprocess.Popen)
@property
def _uses_pexpect(self):
return isinstance(self.subprocess, PopenSpawn)
@property
def std_out(self):
return self.subprocess.stdout
@property
def ok(self):
return self.return_code == 0
@property
def _pexpect_out(self):
if self.subprocess.encoding:
result = ""
else:
result = b""
if self.subprocess.before:
result += self.subprocess.before
if self.subprocess.after and self.subprocess.after not in (pexpect.EOF, pexpect.TIMEOUT):
try:
result += self.subprocess.after
except (pexpect.EOF, pexpect.TIMEOUT):
pass
result += self.subprocess.read()
return result
@property
def out(self):
"""Std/out output (cached)"""
if self.__out is not None:
return self.__out
if self._uses_subprocess:
self.__out = self.std_out.read()
else:
self.__out = self._pexpect_out
return self.__out
@property
def std_err(self):
return self.subprocess.stderr
@property
def err(self):
"""Std/err output (cached)"""
if self.__err is not None:
return self.__err
if self._uses_subprocess:
self.__err = self.std_err.read()
return self.__err
else:
return self._pexpect_out
@property
def pid(self):
"""The process' PID."""
# Support for pexpect's functionality.
if hasattr(self.subprocess, "proc"):
return self.subprocess.proc.pid
# Standard subprocess method.
return self.subprocess.pid
@property
def is_alive(self):
"""Is the process alive?"""
return pid_exists(self.pid)
@property
def return_code(self):
# Support for pexpect's functionality.
if self._uses_pexpect:
return self.subprocess.exitstatus
# Standard subprocess method.
return self.subprocess.returncode
@property
def std_in(self):
return self.subprocess.stdin
def run(self, block=True, binary=False, cwd=None, env=None):
"""Runs the given command, with or without pexpect functionality enabled."""
self.blocking = block
# Use subprocess.
if self.blocking:
popen_kwargs = self._default_popen_kwargs.copy()
del popen_kwargs["stdin"]
popen_kwargs["universal_newlines"] = not binary
if cwd:
popen_kwargs["cwd"] = cwd
if env:
popen_kwargs["env"].update(env)
s = subprocess.Popen(self._popen_args, **popen_kwargs)
# Otherwise, use pexpect.
else:
pexpect_kwargs = self._default_pexpect_kwargs.copy()
if binary:
pexpect_kwargs["encoding"] = None
if cwd:
pexpect_kwargs["cwd"] = cwd
if env:
pexpect_kwargs["env"].update(env)
# Enable Python subprocesses to work with expect functionality.
pexpect_kwargs["env"]["PYTHONUNBUFFERED"] = "1"
s = PopenSpawn(self._popen_args, **pexpect_kwargs)
self.subprocess = s
self.was_run = True
def expect(self, pattern, timeout=-1):
"""Waits on the given pattern to appear in std_out"""
if self.blocking:
raise RuntimeError("expect can only be used on non-blocking commands.")
try:
self.subprocess.expect(pattern=pattern, timeout=timeout)
except pexpect.EOF:
pass
def send(self, s, end=os.linesep, signal=False):
"""Sends the given string or signal to std_in."""
if self.blocking:
raise RuntimeError("send can only be used on non-blocking commands.")
if not signal:
if self._uses_subprocess:
return self.subprocess.communicate(s + end)
else:
return self.subprocess.send(s + end)
else:
self.subprocess.send_signal(s)
def terminate(self):
self.subprocess.terminate()
def kill(self):
if self._uses_pexpect:
self.subprocess.kill(signal.SIGINT)
else:
self.subprocess.send_signal(signal.SIGINT)
def block(self):
"""Blocks until process is complete."""
if self._uses_subprocess:
# consume stdout and stderr
if self.blocking:
try:
stdout, stderr = self.subprocess.communicate()
self.__out = stdout
self.__err = stderr
except ValueError:
pass # Don't read from finished subprocesses.
else:
self.subprocess.stdin.close()
self.std_out.close()
self.std_err.close()
self.subprocess.wait()
else:
self.subprocess.sendeof()
try:
self.subprocess.wait()
finally:
if self.subprocess.proc.stdout:
self.subprocess.proc.stdout.close()
def pipe(self, command, timeout=None, cwd=None):
"""Runs the current command and passes its output to the next
given process.
"""
if not timeout:
timeout = self.timeout
if not self.was_run:
self.run(block=False, cwd=cwd)
data = self.out
if timeout:
c = Command(command, timeout)
else:
c = Command(command)
c.run(block=False, cwd=cwd)
if data:
c.send(data)
c.block()
return c
def _expand_args(command):
"""Parses command strings and returns a Popen-ready list."""
# Prepare arguments.
if isinstance(command, STR_TYPES):
if sys.version_info[0] == 2:
splitter = shlex.shlex(command.encode("utf-8"))
elif sys.version_info[0] == 3:
splitter = shlex.shlex(command)
else:
splitter = shlex.shlex(command.encode("utf-8"))
splitter.whitespace = "|"
splitter.whitespace_split = True
command = []
while True:
token = splitter.get_token()
if token:
command.append(token)
else:
break
command = list(map(shlex.split, command))
return command
def chain(command, timeout=TIMEOUT, cwd=None, env=None):
commands = _expand_args(command)
data = None
for command in commands:
c = run(command, block=False, timeout=timeout, cwd=cwd, env=env)
if data:
c.send(data)
c.subprocess.sendeof()
data = c.out
return c
def run(command, block=True, binary=False, timeout=TIMEOUT, cwd=None, env=None):
c = Command(command, timeout=timeout)
c.run(block=block, binary=binary, cwd=cwd, env=env)
if block:
c.block()
return c
-21
View File
@@ -1,21 +0,0 @@
The MIT License (MIT)
Copyright 2018 Kenneth Reitz
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
-1
View File
@@ -9,7 +9,6 @@ click-completion==0.5.2
click-didyoumean==0.0.3
click==8.0.1
colorama==0.4.4
delegator.py==0.1.1
distlib==0.3.2
docopt==0.6.2
dparse==0.5.1
+11 -13
View File
@@ -2,6 +2,7 @@ import errno
import json
import logging
import os
from pipenv.utils import subprocess_run
import shutil
import sys
import warnings
@@ -13,7 +14,7 @@ import requests
from pipenv._compat import Path
from pipenv.exceptions import VirtualenvActivationException
from pipenv.vendor import delegator, toml, tomlkit
from pipenv.vendor import toml, tomlkit
from pipenv.vendor.vistir.compat import (
FileNotFoundError, PermissionError, ResourceWarning, TemporaryDirectory,
fs_encode, fs_str
@@ -66,8 +67,8 @@ def check_github_ssh():
# GitHub does not provide shell access.' if ssh keys are available and
# registered with GitHub. Otherwise, the command will fail with
# return_code=255 and say 'Permission denied (publickey).'
c = delegator.run('ssh -o StrictHostKeyChecking=no -o CheckHostIP=no -T git@github.com', timeout=30)
res = True if c.return_code == 1 else False
c = subprocess_run('ssh -o StrictHostKeyChecking=no -o CheckHostIP=no -T git@github.com', timeout=30, shell=True)
res = True if c.returncode == 1 else False
except KeyboardInterrupt:
warnings.warn(
"KeyboardInterrupt while checking GitHub ssh access", RuntimeWarning
@@ -87,11 +88,8 @@ def check_github_ssh():
def check_for_mercurial():
c = delegator.run("hg --help")
if c.return_code != 0:
return False
else:
return True
c = subprocess_run("hg --help", shell=True)
return c.returncode == 0
TESTS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -385,8 +383,8 @@ class _PipenvInstance:
with TemporaryDirectory(prefix='pipenv-', suffix='-cache') as tempdir:
os.environ['PIPENV_CACHE_DIR'] = fs_str(tempdir.name)
c = delegator.run(
f'pipenv {cmd}', block=block,
c = subprocess_run(
f'pipenv {cmd}', block=block, shell=True,
cwd=os.path.abspath(self.path), env=os.environ.copy()
)
if 'PIPENV_CACHE_DIR' in os.environ:
@@ -398,9 +396,9 @@ class _PipenvInstance:
# Pretty output for failing tests.
if block:
print(f'$ pipenv {cmd}')
print(c.out)
print(c.err, file=sys.stderr)
if c.return_code != 0:
print(c.stdout)
print(c.stderr, file=sys.stderr)
if c.returncode != 0:
print("Command failed...")
# Where the action happens.
+35 -35
View File
@@ -15,18 +15,18 @@ from pipenv.utils import normalize_drive
def test_pipenv_where(PipenvInstance):
with PipenvInstance() as p:
c = p.pipenv("--where")
assert c.ok
assert normalize_drive(p.path) in c.out
assert c.returncode == 0
assert normalize_drive(p.path) in c.stdout
@pytest.mark.cli
def test_pipenv_venv(PipenvInstance):
with PipenvInstance() as p:
c = p.pipenv('--python python')
assert c.ok
assert c.returncode == 0
c = p.pipenv('--venv')
assert c.ok
venv_path = c.out.strip()
assert c.returncode == 0
venv_path = c.stdout.strip()
assert os.path.isdir(venv_path)
@@ -34,10 +34,10 @@ def test_pipenv_venv(PipenvInstance):
def test_pipenv_py(PipenvInstance):
with PipenvInstance() as p:
c = p.pipenv('--python python')
assert c.ok
assert c.returncode == 0
c = p.pipenv('--py')
assert c.ok
python = c.out.strip()
assert c.returncode == 0
python = c.stdout.strip()
assert os.path.basename(python).startswith('python')
@@ -46,12 +46,12 @@ def test_pipenv_site_packages(PipenvInstance):
with PipenvInstance() as p:
c = p.pipenv('--python python --site-packages')
assert c.return_code == 0
assert 'Making site-packages available' in c.err
assert 'Making site-packages available' in c.stderr
# no-global-site-packages.txt under stdlib dir should not exist.
c = p.pipenv('run python -c "import sysconfig; print(sysconfig.get_path(\'stdlib\'))"')
assert c.return_code == 0
stdlib_path = c.out.strip()
stdlib_path = c.stdout.strip()
assert not os.path.isfile(os.path.join(stdlib_path, 'no-global-site-packages.txt'))
@@ -59,23 +59,23 @@ def test_pipenv_site_packages(PipenvInstance):
def test_pipenv_support(PipenvInstance):
with PipenvInstance() as p:
c = p.pipenv('--support')
assert c.ok
assert c.out
assert c.returncode == 0
assert c.stdout
@pytest.mark.cli
def test_pipenv_rm(PipenvInstance):
with PipenvInstance() as p:
c = p.pipenv('--python python')
assert c.ok
assert c.returncode == 0
c = p.pipenv('--venv')
assert c.ok
venv_path = c.out.strip()
assert c.returncode == 0
venv_path = c.stdout.strip()
assert os.path.isdir(venv_path)
c = p.pipenv('--rm')
assert c.ok
assert c.out
assert c.returncode == 0
assert c.stdout
assert not os.path.isdir(venv_path)
@@ -83,7 +83,7 @@ def test_pipenv_rm(PipenvInstance):
def test_pipenv_graph(PipenvInstance):
with PipenvInstance() as p:
c = p.pipenv('install tablib')
assert c.ok
assert c.returncode == 0
graph = p.pipenv("graph")
assert graph.ok
assert "tablib" in graph.out
@@ -99,14 +99,14 @@ def test_pipenv_graph(PipenvInstance):
def test_pipenv_graph_reverse(PipenvInstance):
with PipenvInstance() as p:
c = p.pipenv('install tablib==0.13.0')
assert c.ok
assert c.returncode == 0
c = p.pipenv('graph --reverse')
assert c.ok
output = c.out
assert c.returncode == 0
output = c.stdout
c = p.pipenv('graph --reverse --json')
assert c.return_code == 1
assert 'Warning: Using both --reverse and --json together is not supported.' in c.err
assert 'Warning: Using both --reverse and --json together is not supported.' in c.stderr
requests_dependency = [
('backports.csv', 'backports.csv'),
@@ -145,14 +145,14 @@ def test_pipenv_check(PipenvInstance):
p.pipenv('install requests==1.0.0')
c = p.pipenv('check')
assert c.return_code != 0
assert 'requests' in c.out
assert 'requests' in c.stdout
c = p.pipenv('uninstall requests')
assert c.ok
assert c.returncode == 0
c = p.pipenv('install six')
assert c.ok
assert c.returncode == 0
c = p.pipenv('check --ignore 35015')
assert c.return_code == 0
assert 'Ignoring' in c.err
assert 'Ignoring' in c.stderr
@pytest.mark.cli
@@ -166,7 +166,7 @@ def test_pipenv_clean_pip_no_warnings(PipenvInstance):
assert c.return_code == 0
c = p.pipenv('clean')
assert c.return_code == 0
assert c.out, f"{c.out} -- STDERR: {c.err}"
assert c.stdout, f"{c.stdout} -- STDERR: {c.stderr}"
@pytest.mark.cli
@@ -180,7 +180,7 @@ def test_pipenv_clean_pip_warnings(PipenvInstance):
assert c.return_code == 0
c = p.pipenv('clean')
assert c.return_code == 0
assert c.err
assert c.stderr
@pytest.mark.cli
@@ -205,8 +205,8 @@ pyver = "which python"
""".strip()
f.write(contents)
c = p.pipenv('scripts')
assert 'pyver' in c.out
assert 'which python' in c.out
assert 'pyver' in c.stdout
assert 'which python' in c.stdout
@pytest.mark.cli
@@ -219,7 +219,7 @@ def test_help(PipenvInstance):
def test_man(PipenvInstance):
with PipenvInstance() as p:
c = p.pipenv('--man')
assert c.return_code == 0 or c.err
assert c.return_code == 0, c.stderr
@pytest.mark.cli
@@ -259,8 +259,8 @@ import flask
assert all(pkg in p.pipfile['packages'] for pkg in ['requests', 'click', 'flask']), p.pipfile["packages"]
c = p.pipenv('check --unused .')
assert 'click' not in c.out
assert 'flask' not in c.out
assert 'click' not in c.stdout
assert 'flask' not in c.stdout
@pytest.mark.cli
@@ -268,7 +268,7 @@ def test_pipenv_clear(PipenvInstance):
with PipenvInstance() as p:
c = p.pipenv('--clear')
assert c.return_code == 0
assert 'Clearing caches' in c.out
assert 'Clearing caches' in c.stdout
@pytest.mark.cli
@@ -276,7 +276,7 @@ def test_pipenv_three(PipenvInstance):
with PipenvInstance() as p:
c = p.pipenv('--three')
assert c.return_code == 0
assert 'Successfully created virtual environment' in c.err
assert 'Successfully created virtual environment' in c.stderr
@pytest.mark.outdated
+2 -2
View File
@@ -65,7 +65,7 @@ def test_venv_file(venv_name, PipenvInstance):
c = p.pipenv('--venv')
assert c.return_code == 0
venv_loc = Path(c.out.strip()).absolute()
venv_loc = Path(c.stdout.strip()).absolute()
assert venv_loc.exists()
assert venv_loc.joinpath('.project').exists()
venv_path = normalize_drive(venv_loc.as_posix())
@@ -96,7 +96,7 @@ def test_venv_file_with_path(PipenvInstance):
assert c.return_code == 0
c = p.pipenv('--venv')
assert c.return_code == 0
venv_loc = Path(c.out.strip())
venv_loc = Path(c.stdout.strip())
assert venv_loc.joinpath('.project').exists()
assert venv_loc == Path(venv_path.name)
+11 -12
View File
@@ -5,8 +5,7 @@ import pytest
from flaky import flaky
from pipenv._compat import Path, TemporaryDirectory
from pipenv.utils import temp_environ
from pipenv.vendor import delegator
from pipenv.utils import subprocess_run, temp_environ
@pytest.mark.setup
@@ -288,9 +287,9 @@ def test_requirements_to_pipfile(PipenvInstance, pypi):
c = p.pipenv("install")
assert c.return_code == 0
print(c.out)
print(c.err)
print(delegator.run("ls -l").out)
print(c.stdout)
print(c.stderr)
print(subprocess_run(["ls", "-l"]).stdout)
# assert stuff in pipfile
assert "requests" in p.pipfile["packages"]
@@ -327,7 +326,7 @@ fake_package = "<0.12"
""".strip()
f.write(contents)
c = p.pipenv("install")
assert c.ok
assert c.returncode == 0
assert "fake_package" in p.pipfile["packages"]
assert "fake-package" in p.lockfile["default"]
assert "six" in p.pipfile["packages"]
@@ -384,7 +383,7 @@ def test_editable_no_args(PipenvInstance):
with PipenvInstance() as p:
c = p.pipenv("install -e")
assert c.return_code != 0
assert "Error: Option '-e' requires an argument" in c.err
assert "Error: Option '-e' requires an argument" in c.stderr
@pytest.mark.basic
@@ -405,7 +404,7 @@ def test_install_venv_project_directory(PipenvInstance):
assert c.return_code == 0
venv_loc = None
for line in c.err.splitlines():
for line in c.stderr.splitlines():
if line.startswith("Virtualenv location:"):
venv_loc = Path(line.split(":", 1)[-1].strip())
assert venv_loc is not None
@@ -421,8 +420,8 @@ def test_system_and_deploy_work(PipenvInstance):
assert c.return_code == 0
c = p.pipenv("--rm")
assert c.return_code == 0
c = delegator.run("virtualenv .venv")
assert c.return_code == 0
c = subprocess_run(["virtualenv", ".venv"])
assert c.returncode == 0
c = p.pipenv("install --system --deploy")
assert c.return_code == 0
c = p.pipenv("--rm")
@@ -456,7 +455,7 @@ def test_install_creates_pipfile(PipenvInstance):
def test_install_non_exist_dep(PipenvInstance):
with PipenvInstance(chdir=True) as p:
c = p.pipenv("install dateutil")
assert not c.ok
assert c.returncode
assert "dateutil" not in p.pipfile["packages"]
@@ -465,7 +464,7 @@ def test_install_non_exist_dep(PipenvInstance):
def test_install_package_with_dots(PipenvInstance):
with PipenvInstance(chdir=True) as p:
c = p.pipenv("install backports.html")
assert c.ok
assert c.returncode == 0
assert "backports.html" in p.pipfile["packages"]
+2 -2
View File
@@ -24,7 +24,7 @@ fake_package = {version = "*", markers="os_name=='splashwear'"}
c = p.pipenv('install')
assert c.return_code == 0
assert 'Ignoring' in c.out
assert 'Ignoring' in c.stdout
assert 'markers' in p.lockfile['default']['fake-package'], p.lockfile["default"]
c = p.pipenv('run python -c "import fake_package;"')
@@ -73,7 +73,7 @@ fake-package = {version = "*", os_name = "== 'splashwear'"}
c = p.pipenv('install')
assert c.return_code == 0
assert 'Ignoring' in c.out
assert 'Ignoring' in c.stdout
assert 'markers' in p.lockfile['default']['fake-package']
c = p.pipenv('run python -c "import fake_package;"')
+3 -4
View File
@@ -8,7 +8,6 @@ from flaky import flaky
from pipenv._compat import Path
from pipenv.utils import mkdir_p, temp_environ
from pipenv.vendor import delegator
@pytest.mark.extras
@@ -369,7 +368,7 @@ def test_multiple_editable_packages_should_not_race(PipenvInstance, testsroot):
assert c.return_code == 0
c = p.pipenv('run python -c "import requests, flask, six, jinja2"')
assert c.return_code == 0, c.err
assert c.return_code == 0, c.stderr
@pytest.mark.outdated
@@ -380,8 +379,8 @@ def test_outdated_should_compare_postreleases_without_failing(PipenvInstance):
assert c.return_code == 0
c = p.pipenv("update --outdated")
assert c.return_code == 0
assert "Skipped Update" in c.err
assert "Skipped Update" in c.stderr
p._pipfile.update("ibm-db-sa-py3", "*")
c = p.pipenv("update --outdated")
assert c.return_code != 0
assert "out-of-date" in c.out
assert "out-of-date" in c.stdout
+7 -8
View File
@@ -5,9 +5,8 @@ import pytest
from flaky import flaky
import delegator
from pipenv._compat import Path
from pipenv.utils import subprocess_run
@flaky
@@ -127,10 +126,10 @@ def test_local_vcs_urls_work(PipenvInstance, tmpdir):
six_dir = tmpdir.join("six")
six_path = Path(six_dir.strpath)
with PipenvInstance(chdir=True) as p:
c = delegator.run(
"git clone https://github.com/benjaminp/six.git {0}".format(six_dir.strpath)
c = subprocess_run(
["git", "clone", "https://github.com/benjaminp/six.git", six_dir.strpath]
)
assert c.return_code == 0
assert c.returncode == 0
c = p.pipenv("install git+{0}#egg=six".format(six_path.as_uri()))
assert c.return_code == 0
@@ -216,9 +215,9 @@ def test_install_local_vcs_not_in_lockfile(PipenvInstance):
with PipenvInstance(chdir=True) as p:
# six_path = os.path.join(p.path, "six")
six_path = p._pipfile.get_fixture_path("git/six/").as_posix()
c = delegator.run("git clone {0} ./six".format(six_path))
assert c.return_code == 0
c = p.pipenv("install -e ./six".format(six_path))
c = subprocess_run(["git", "clone", six_path, "./six"])
assert c.returncode == 0
c = p.pipenv("install -e ./six")
assert c.return_code == 0
six_key = list(p.pipfile["packages"].keys())[0]
# we don't need the rest of the test anymore, this just works on its own
+31 -31
View File
@@ -51,7 +51,7 @@ flask = "==0.12.2"
assert d.return_code == 0
for req in req_list:
assert req in c.out
assert req in c.stdout
for req in dev_req_list:
assert req in d.out
@@ -102,8 +102,8 @@ def test_keep_outdated_doesnt_remove_lockfile_entries(PipenvInstance):
p._pipfile.add("requests", "==2.18.4")
p._pipfile.add("colorama", {"version": "*", "markers": "os_name=='FakeOS'"})
c = p.pipenv("install")
assert c.ok
assert "doesn't match your environment, its dependencies won't be resolved." in c.err
assert c.returncode == 0
assert "doesn't match your environment, its dependencies won't be resolved." in c.stderr
p._pipfile.add("six", "*")
p.pipenv("lock --keep-outdated")
assert "colorama" in p.lockfile["default"]
@@ -115,11 +115,11 @@ def test_resolve_skip_unmatched_requirements(PipenvInstance):
with PipenvInstance(chdir=True) as p:
p._pipfile.add("missing-package", {"markers": "os_name=='FakeOS'"})
c = p.pipenv("lock")
assert c.ok
assert c.returncode == 0
assert (
"Could not find a version of missing-package; "
"os_name == 'FakeOS' that matches your environment"
) in c.err
) in c.stderr
@pytest.mark.lock
@@ -128,10 +128,10 @@ def test_keep_outdated_doesnt_upgrade_pipfile_pins(PipenvInstance):
with PipenvInstance(chdir=True) as p:
p._pipfile.add("urllib3", "==1.21.1")
c = p.pipenv("install")
assert c.ok
assert c.returncode == 0
p._pipfile.add("requests", "==2.18.4")
c = p.pipenv("lock --keep-outdated")
assert c.ok
assert c.returncode == 0
assert "requests" in p.lockfile["default"]
assert "urllib3" in p.lockfile["default"]
assert p.lockfile["default"]["requests"]["version"] == "==2.18.4"
@@ -142,7 +142,7 @@ def test_keep_outdated_doesnt_upgrade_pipfile_pins(PipenvInstance):
def test_keep_outdated_keeps_markers_not_removed(PipenvInstance):
with PipenvInstance(chdir=True) as p:
c = p.pipenv("install six click")
assert c.ok
assert c.returncode == 0
lockfile = Path(p.lockfile_path)
lockfile_content = lockfile.read_text()
lockfile_json = json.loads(lockfile_content)
@@ -150,7 +150,7 @@ def test_keep_outdated_keeps_markers_not_removed(PipenvInstance):
lockfile_json["default"]["six"]["markers"] = "python_version >= '2.7'"
lockfile.write_text(to_text(json.dumps(lockfile_json)))
c = p.pipenv("lock --keep-outdated")
assert c.ok
assert c.returncode == 0
assert p.lockfile["default"]["six"].get("markers", "") == "python_version >= '2.7'"
@@ -160,17 +160,17 @@ def test_keep_outdated_doesnt_update_satisfied_constraints(PipenvInstance):
with PipenvInstance(chdir=True) as p:
p._pipfile.add("requests", "==2.18.4")
c = p.pipenv("install")
assert c.ok
assert c.returncode == 0
p._pipfile.add("requests", "*")
assert p.pipfile["packages"]["requests"] == "*"
c = p.pipenv("lock --keep-outdated")
assert c.ok
assert c.returncode == 0
assert "requests" in p.lockfile["default"]
assert "urllib3" in p.lockfile["default"]
# ensure this didn't update requests
assert p.lockfile["default"]["requests"]["version"] == "==2.18.4"
c = p.pipenv("lock")
assert c.ok
assert c.returncode == 0
assert p.lockfile["default"]["requests"]["version"] != "==2.18.4"
@@ -270,7 +270,7 @@ requests = {version = "*", extras = ["socks"]}
c = p.pipenv('lock -r')
assert c.return_code == 0
assert "extra == 'socks'" not in c.out.strip()
assert "extra == 'socks'" not in c.stdout.strip()
@pytest.mark.lock
@@ -354,8 +354,8 @@ requests = "*"
assert c.return_code == 0
c = p.pipenv('lock -r')
assert c.return_code == 0
assert '-i https://pypi.org/simple' in c.out.strip()
assert '--extra-index-url https://test.pypi.org/simple' in c.out.strip()
assert '-i https://pypi.org/simple' in c.stdout.strip()
assert '--extra-index-url https://test.pypi.org/simple' in c.stdout.strip()
@pytest.mark.lock
@@ -391,9 +391,9 @@ fake-package = "*"
assert c.return_code == 0
c = p.pipenv(f'lock -r --pypi-mirror {mirror_url}')
assert c.return_code == 0
assert f'-i {mirror_url}' in c.out.strip()
assert '--extra-index-url https://test.pypi.org/simple' in c.out.strip()
assert f'--extra-index-url {mirror_url}' not in c.out.strip()
assert f'-i {mirror_url}' in c.stdout.strip()
assert '--extra-index-url https://test.pypi.org/simple' in c.stdout.strip()
assert f'--extra-index-url {mirror_url}' not in c.stdout.strip()
@pytest.mark.lock
@@ -418,7 +418,7 @@ def test_outdated_setuptools_with_pep517_legacy_build_meta_is_updated(PipenvInst
assert c.return_code == 0
c = p.pipenv("run python -c 'import setuptools; print(setuptools.__version__)'")
assert c.return_code == 0
assert c.out.strip() == "40.2.0"
assert c.stdout.strip() == "40.2.0"
c = p.pipenv("install legacy-backend-package")
assert c.return_code == 0
assert "vistir" in p.lockfile["default"]
@@ -441,7 +441,7 @@ def test_outdated_setuptools_with_pep517_cython_import_in_setuppy(PipenvInstance
assert c.return_code == 0
c = p.pipenv("run python -c 'import setuptools; print(setuptools.__version__)'")
assert c.return_code == 0
assert c.out.strip() == "40.2.0"
assert c.stdout.strip() == "40.2.0"
c = p.pipenv("install cython-import-package")
assert c.return_code == 0
assert "vistir" in p.lockfile["default"]
@@ -594,7 +594,7 @@ django = "*"
assert c.return_code == 0
c = p.pipenv('run python --version')
assert c.return_code == 0
py_version = c.err.splitlines()[-1].strip().split()[-1]
py_version = c.stderr.splitlines()[-1].strip().split()[-1]
django_version = '==2.0.6' if py_version.startswith('3') else '==1.11.13'
assert py_version == '2.7.14'
assert p.lockfile['default']['django']['version'] == django_version
@@ -608,7 +608,7 @@ def test_lockfile_corrupted(PipenvInstance):
f.write('{corrupted}')
c = p.pipenv('install')
assert c.return_code == 0
assert 'Pipfile.lock is corrupted' in c.err
assert 'Pipfile.lock is corrupted' in c.stderr
assert p.lockfile['_meta']
@@ -620,7 +620,7 @@ def test_lockfile_with_empty_dict(PipenvInstance):
f.write('{}')
c = p.pipenv('install')
assert c.return_code == 0
assert 'Pipfile.lock is corrupted' in c.err
assert 'Pipfile.lock is corrupted' in c.stderr
assert p.lockfile['_meta']
@@ -653,9 +653,9 @@ def test_lock_no_warnings(PipenvInstance):
assert c.return_code == 0
c = p.pipenv('run python -c "import warnings; warnings.warn(\\"This is a warning\\", DeprecationWarning); print(\\"hello\\")"')
assert c.return_code == 0
assert "Warning" in c.err
assert "Warning" not in c.out
assert "hello" in c.out
assert "Warning" in c.stderr
assert "Warning" not in c.stdout
assert "hello" in c.stdout
@pytest.mark.lock
@@ -673,9 +673,9 @@ def test_lock_missing_cache_entries_gets_all_hashes(PipenvInstance, tmpdir):
p._pipfile.add("pathlib2", "*")
assert "pathlib2" in p.pipfile["packages"]
c = p.pipenv("install")
assert c.return_code == 0, (c.err, ("\n".join([f"{k}: {v}\n" for k, v in os.environ.items()])))
assert c.return_code == 0, (c.stderr, ("\n".join([f"{k}: {v}\n" for k, v in os.environ.items()])))
c = p.pipenv("lock --clear")
assert c.return_code == 0, c.err
assert c.return_code == 0, c.stderr
assert "pathlib2" in p.lockfile["default"]
assert "scandir" in p.lockfile["default"]
assert isinstance(p.lockfile["default"]["scandir"]["hashes"], list)
@@ -765,7 +765,7 @@ def test_lock_nested_vcs_direct_url(PipenvInstance):
def test_lock_package_with_wildcard_version(PipenvInstance):
with PipenvInstance(chdir=True) as p:
c = p.pipenv("install 'six==1.11.*'")
assert c.ok
assert c.returncode == 0
assert "six" in p.pipfile["packages"]
assert p.pipfile["packages"]["six"] == "==1.11.*"
assert "six" in p.lockfile["default"]
@@ -778,8 +778,8 @@ def test_lock_package_with_wildcard_version(PipenvInstance):
def test_default_lock_overwrite_dev_lock(PipenvInstance):
with PipenvInstance(chdir=True) as p:
c = p.pipenv("install 'click==6.7'")
assert c.ok
assert c.returncode == 0
c = p.pipenv("install -d flask")
assert c.ok
assert c.returncode == 0
assert p.lockfile["default"]["click"]["version"] == "==6.7"
assert p.lockfile["develop"]["click"]["version"] == "==6.7"
+7 -8
View File
@@ -8,8 +8,7 @@ import os
import pytest
from pipenv.project import Project
from pipenv.utils import temp_environ
from pipenv.vendor import delegator
from pipenv.utils import subprocess_run, temp_environ
@pytest.mark.code
@@ -40,8 +39,8 @@ pytest = "==4.6.9"
f.write(contents)
c = p.pipenv('install --verbose')
if c.return_code != 0:
assert c.err == '' or c.err is None
assert c.out == ''
assert c.stderr == '' or c.stderr is None
assert c.stdout == ''
assert c.return_code == 0
c = p.pipenv('lock')
assert c.return_code == 0
@@ -74,7 +73,7 @@ def test_update_locks(PipenvInstance):
assert p.lockfile['default']['jdcal']['version'] == '==1.4'
c = p.pipenv('run pip freeze')
assert c.return_code == 0
lines = c.out.splitlines()
lines = c.stdout.splitlines()
assert 'jdcal==1.4' in [l.strip() for l in lines]
@@ -82,8 +81,8 @@ def test_update_locks(PipenvInstance):
@pytest.mark.proper_names
def test_proper_names_unamanged_virtualenv(PipenvInstance):
with PipenvInstance(chdir=True):
c = delegator.run('python -m virtualenv .venv')
assert c.return_code == 0
c = subprocess_run(['python', '-m', 'virtualenv', '.venv'])
assert c.returncode == 0
project = Project()
assert project.proper_names == []
@@ -98,7 +97,7 @@ def test_directory_with_leading_dash(raw_venv, PipenvInstance):
assert c.return_code == 0
c = p.pipenv('--venv')
assert c.return_code == 0
venv_path = c.out.strip()
venv_path = c.stdout.strip()
assert os.path.isdir(venv_path)
# Manually clean up environment, since PipenvInstance assumes that
# the virutalenv is in the project directory.
+23 -25
View File
@@ -1,4 +1,3 @@
import io
import os
import tarfile
@@ -6,9 +5,8 @@ import pytest
from pipenv.patched import pipfile
from pipenv.project import Project
from pipenv.utils import temp_environ
from pipenv.utils import subprocess_run, temp_environ
from pipenv.vendor.vistir.path import is_in_path, normalize_path
from pipenv.vendor.delegator import run as delegator_run
@pytest.mark.project
@@ -172,40 +170,40 @@ def test_include_editable_packages(PipenvInstance, testsroot, pathlib_tmpdir):
@pytest.mark.virtualenv
def test_run_in_virtualenv_with_global_context(PipenvInstance, virtualenv):
with PipenvInstance(chdir=True, venv_root=virtualenv.as_posix(), ignore_virtualenvs=False, venv_in_project=False) as p:
c = delegator_run(
"pipenv run pip freeze", cwd=os.path.abspath(p.path),
c = subprocess_run(
["pipenv", "run", "pip", "freeze"], cwd=os.path.abspath(p.path),
env=os.environ.copy()
)
assert c.return_code == 0, (c.out, c.err)
assert 'Creating a virtualenv' not in c.err, c.err
assert c.returncode == 0, (c.stdout, c.stderr)
assert 'Creating a virtualenv' not in c.stderr, c.stderr
project = Project()
assert project.virtualenv_location == virtualenv.as_posix(), (
project.virtualenv_location, virtualenv.as_posix()
)
c = delegator_run(
f"pipenv run pip install -i {p.index_url} click",
c = subprocess_run(
["pipenv", "run", "pip", "install", "-i", p.index_url, "click"],
cwd=os.path.abspath(p.path),
env=os.environ.copy()
)
assert c.return_code == 0, (c.out, c.err)
assert "Courtesy Notice" in c.err, (c.out, c.err)
c = delegator_run(
f"pipenv install -i {p.index_url} six",
assert c.returncode == 0, (c.stdout, c.stderr)
assert "Courtesy Notice" in c.stderr, (c.stdout, c.stderr)
c = subprocess_run(
["pipenv", "install", "-i", p.index_url, "six"],
cwd=os.path.abspath(p.path), env=os.environ.copy()
)
assert c.return_code == 0, (c.out, c.err)
c = delegator_run(
'pipenv run python -c "import click;print(click.__file__)"',
assert c.returncode == 0, (c.stdout, c.stderr)
c = subprocess_run(
['pipenv', 'run', 'python', '-c', 'import click;print(click.__file__)'],
cwd=os.path.abspath(p.path), env=os.environ.copy()
)
assert c.return_code == 0, (c.out, c.err)
assert is_in_path(c.out.strip(), str(virtualenv)), (c.out.strip(), str(virtualenv))
c = delegator_run(
"pipenv clean --dry-run", cwd=os.path.abspath(p.path),
assert c.returncode == 0, (c.stdout, c.stderr)
assert is_in_path(c.stdout.strip(), str(virtualenv)), (c.stdout.strip(), str(virtualenv))
c = subprocess_run(
["pipenv", "clean", "--dry-run"], cwd=os.path.abspath(p.path),
env=os.environ.copy()
)
assert c.return_code == 0, (c.out, c.err)
assert "click" in c.out, c.out
assert c.returncode == 0, (c.stdout, c.stderr)
assert "click" in c.stdout, c.stdout
@pytest.mark.project
@@ -214,7 +212,7 @@ def test_run_in_virtualenv(PipenvInstance):
with PipenvInstance(chdir=True) as p:
c = p.pipenv('run pip freeze')
assert c.return_code == 0
assert 'Creating a virtualenv' in c.err
assert 'Creating a virtualenv' in c.stderr
project = Project()
c = p.pipenv("run pip install click")
assert c.return_code == 0
@@ -222,12 +220,12 @@ def test_run_in_virtualenv(PipenvInstance):
assert c.return_code == 0
c = p.pipenv('run python -c "import click;print(click.__file__)"')
assert c.return_code == 0
assert normalize_path(c.out.strip()).startswith(
assert normalize_path(c.stdout.strip()).startswith(
normalize_path(str(project.virtualenv_location))
)
c = p.pipenv("clean --dry-run")
assert c.return_code == 0
assert "click" in c.out
assert "click" in c.stdout
@pytest.mark.project
@pytest.mark.sources
+8 -8
View File
@@ -15,7 +15,7 @@ def test_env(PipenvInstance):
c = p.pipenv('run python -c "import os; print(os.environ[\'HELLO\'])"')
assert c.return_code == 0
assert 'WORLD' in c.out
assert 'WORLD' in c.stdout
@pytest.mark.run
@@ -38,15 +38,15 @@ multicommand = "bash -c \"cd docs && make html\""
c = p.pipenv('run printfoo')
assert c.return_code == 0
assert c.out == 'foo\n'
assert c.err == ''
assert c.stdout == 'foo\n'
assert c.stderr == ''
c = p.pipenv('run notfoundscript')
assert c.return_code == 1
assert c.out == ''
assert c.stdout == ''
if os.name != 'nt': # TODO: Implement this message for Windows.
assert 'Error' in c.err
assert 'randomthingtotally (from notfoundscript)' in c.err
assert 'Error' in c.stderr
assert 'randomthingtotally (from notfoundscript)' in c.stderr
project = Project()
@@ -61,6 +61,6 @@ multicommand = "bash -c \"cd docs && make html\""
with temp_environ():
os.environ['HELLO'] = 'WORLD'
c = p.pipenv("run scriptwithenv")
assert c.ok
assert c.returncode == 0
if os.name != "nt": # This doesn't work on CI windows.
assert c.out.strip() == "WORLD"
assert c.stdout.strip() == "WORLD"
+2 -2
View File
@@ -17,7 +17,7 @@ def test_sync_error_without_lockfile(PipenvInstance):
c = p.pipenv('sync')
assert c.return_code != 0
assert 'Pipfile.lock not found!' in c.err
assert 'Pipfile.lock not found!' in c.stderr
@pytest.mark.sync
@@ -110,4 +110,4 @@ requests = "*"
c = p.pipenv('sync --sequential --verbose')
for package in p.lockfile['default']:
assert f'Successfully installed {package}' in c.out
assert f'Successfully installed {package}' in c.stdout
+2 -2
View File
@@ -105,7 +105,7 @@ def test_uninstall_all_local_files(PipenvInstance, testsroot):
assert c.return_code == 0
c = p.pipenv("uninstall --all")
assert c.return_code == 0
assert "tablib" in c.out
assert "tablib" in c.stdout
# Uninstall --all is not supposed to remove things from the pipfile
# Note that it didn't before, but that instead local filenames showed as hashes
assert "tablib" in p.pipfile["packages"]
@@ -193,4 +193,4 @@ def test_uninstall_missing_parameters(PipenvInstance):
c = p.pipenv("uninstall")
assert c.return_code != 0
assert "No package provided!" in c.err
assert "No package provided!" in c.stderr
+3 -3
View File
@@ -20,7 +20,7 @@ def test_case_changes_windows(PipenvInstance):
# Canonical venv location.
c = p.pipenv('--venv')
assert c.return_code == 0
virtualenv_location = c.out.strip()
virtualenv_location = c.stdout.strip()
# Dance around to change the casing of the project directory.
target = p.path.upper()
@@ -33,7 +33,7 @@ def test_case_changes_windows(PipenvInstance):
# Ensure the incorrectly-cased project can find the correct venv.
c = p.pipenv('--venv')
assert c.return_code == 0
assert c.out.strip().lower() == virtualenv_location.lower()
assert c.stdout.strip().lower() == virtualenv_location.lower()
@pytest.mark.files
@@ -78,4 +78,4 @@ def test_pipenv_clean_windows(PipenvInstance):
c = p.pipenv('clean --dry-run')
assert c.return_code == 0
assert 'click' in c.out.strip()
assert 'click' in c.stdout.strip()