Merge pull request #5468 from pypa/remove-yaspin-spinner

Remove yaspin spinner
This commit is contained in:
Oz N Tiram
2022-11-18 10:27:21 +01:00
committed by GitHub
18 changed files with 162 additions and 555 deletions
+2
View File
@@ -0,0 +1,2 @@
* Replace yaspin spinner with rich spinner.
* Bump vistir version to 0.7.4
+5 -2
View File
@@ -80,8 +80,8 @@ def cli(
site_packages=None,
**kwargs,
):
from pipenv.patched.pip._vendor import rich
from pipenv.utils.shell import system_which
from pipenv.utils.spinner import create_spinner
load_dot_env(state.project, quiet=state.quiet)
@@ -188,7 +188,10 @@ def cli(
)
)
)
with create_spinner(text="Running...", setting=state.project.s):
console = rich.console.Console()
# TODO: add state.project.s to spinner status
with console.status("Running..."):
# Remove the virtualenv.
cleanup_virtualenv(state.project, bare=True)
return 0
+43 -34
View File
@@ -24,6 +24,7 @@ from pipenv.patched.pip._internal.req.constructors import (
)
from pipenv.patched.pip._internal.req.req_file import parse_requirements
from pipenv.patched.pip._internal.utils.misc import split_auth_from_netloc
from pipenv.patched.pip._vendor import rich
from pipenv.patched.pip._vendor.packaging.utils import canonicalize_name
from pipenv.project import Project
from pipenv.utils.constants import MYPY_RUNNING
@@ -52,7 +53,6 @@ from pipenv.utils.shell import (
subprocess_run,
system_which,
)
from pipenv.utils.spinner import create_spinner
from pipenv.vendor import click, plette, vistir
from pipenv.vendor.requirementslib.models.requirements import Requirement
@@ -94,6 +94,10 @@ else:
STARTING_LABEL = " "
console = rich.console.Console()
err = rich.console.Console(stderr=True)
def do_clear(project):
from pipenv.patched.pip._internal import locations
@@ -245,14 +249,16 @@ def ensure_pipfile(project, validate=True, skip_requirements=False, system=False
)
# Create a Pipfile...
project.create_pipfile(python=python)
with create_spinner("Importing requirements...", project.s) as sp:
with console.status(
"Importing requirements...", spinner=project.s.PIPENV_SPINNER
) as st:
# Import requirements.txt.
try:
import_requirements(project)
except Exception:
sp.fail(environments.PIPENV_SPINNER_FAIL_TEXT.format("Failed..."))
err.print(environments.PIPENV_SPINNER_FAIL_TEXT.format("Failed..."))
else:
sp.ok(environments.PIPENV_SPINNER_OK_TEXT.format("Success!"))
st.update(environments.PIPENV_SPINNER_OK_TEXT.format("Success!"))
# Warn the user of side-effects.
click.echo(
"{0}: Your {1} now contains pinned versions, if your {2} did. \n"
@@ -398,17 +404,18 @@ def ensure_python(project, three=None, python=None):
click.style("...", bold=True),
)
)
with create_spinner("Installing python...", project.s) as sp:
# TOOD: pass project settings to console.status
with console.status("Installing python...") as st:
try:
c = installer.install(version)
except InstallerError as e:
sp.fail(
err.print(
environments.PIPENV_SPINNER_FAIL_TEXT.format("Failed...")
)
click.echo(fix_utf8("Something went wrong..."), err=True)
click.secho(e.err, fg="cyan", err=True)
else:
sp.ok(environments.PIPENV_SPINNER_OK_TEXT.format("Success!"))
st(environments.PIPENV_SPINNER_OK_TEXT.format("Success!"))
# Print the results, in a beautiful blue...
click.secho(c.stdout, fg="cyan", err=True)
# Clear the pythonfinder caches
@@ -1003,20 +1010,22 @@ def do_create_virtualenv(project, python=None, site_packages=None, pypi_mirror=N
# Actually create the virtualenv.
error = None
with create_spinner("Creating virtual environment...", project.s) as sp:
with console.status(
"Creating virtual environment...", spinner=project.s.PIPENV_SPINNER
):
c = subprocess_run(cmd, env=pip_config)
click.secho(f"{c.stdout}", fg="cyan", err=True)
if c.returncode != 0:
error = (
c.stderr if project.s.is_verbose() else exceptions.prettify_exc(c.stderr)
)
sp.fail(
err.print(
environments.PIPENV_SPINNER_FAIL_TEXT.format(
"Failed creating virtual environment"
)
)
else:
sp.green.ok(
console.print(
environments.PIPENV_SPINNER_OK_TEXT.format(
"Successfully created virtual environment!"
)
@@ -2284,6 +2293,7 @@ def do_install(
extra_pip_args=extra_pip_args,
categories=categories,
)
for pkg_line in pkg_list:
click.secho(
fix_utf8(f"Installing {pkg_line}..."),
@@ -2291,29 +2301,29 @@ def do_install(
bold=True,
)
# pip install:
with vistir.contextmanagers.temp_environ(), create_spinner(
"Installing...", project.s
) as sp:
with vistir.contextmanagers.temp_environ(), console.status(
"Installing...", spinner=project.s.PIPENV_SPINNER
) as st:
if not system:
os.environ["PIP_USER"] = "0"
if "PYTHONHOME" in os.environ:
del os.environ["PYTHONHOME"]
sp.text = f"Resolving {pkg_line}..."
st.update(f"Resolving {pkg_line}...")
try:
pkg_requirement = Requirement.from_line(pkg_line)
except ValueError as e:
sp.write_err("{}: {}".format(click.style("WARNING", fg="red"), e))
sp.red.fail(
err.print("{}: {}".format(click.style("WARNING", fg="red"), e))
err.print(
environments.PIPENV_SPINNER_FAIL_TEXT.format(
"Installation Failed"
)
)
sys.exit(1)
sp.text = "Installing..."
st.update("Installing...")
try:
sp.text = f"Installing {pkg_requirement.name}..."
st.update(f"Installing {pkg_requirement.name}...")
if project.s.is_verbose():
sp.hide_and_write(
st.update(
f"Installing package: {pkg_requirement.as_line(include_hashes=False)}"
)
c = pip_install(
@@ -2332,34 +2342,32 @@ def do_install(
extra_pip_args=extra_pip_args,
)
if c.returncode:
sp.write_err(
err.print(
"{} An error occurred while installing {}!".format(
click.style("Error: ", fg="red", bold=True),
click.style(pkg_line, fg="green"),
),
)
sp.write_err(f"Error text: {c.stdout}")
sp.write_err(click.style(format_pip_error(c.stderr), fg="cyan"))
err.print(f"Error text: {c.stdout}")
err.print(click.style(format_pip_error(c.stderr), fg="cyan"))
if project.s.is_verbose():
sp.write_err(
click.style(format_pip_output(c.stdout), fg="cyan")
)
err.print(click.style(format_pip_output(c.stdout), fg="cyan"))
if "setup.py egg_info" in c.stderr:
sp.write_err(
err.print(
"This is likely caused by a bug in {}. "
"Report this to its maintainers.".format(
click.style(pkg_requirement.name, fg="green")
)
)
sp.red.fail(
err.print(
environments.PIPENV_SPINNER_FAIL_TEXT.format(
"Installation Failed"
)
)
sys.exit(1)
except (ValueError, RuntimeError) as e:
sp.write_err("{}: {}".format(click.style("WARNING", fg="red"), e))
sp.red.fail(
err.print("{}: {}".format(click.style("WARNING", fg="red"), e))
err.print(
environments.PIPENV_SPINNER_FAIL_TEXT.format(
"Installation Failed",
)
@@ -2371,7 +2379,7 @@ def do_install(
and not pkg_requirement.editable
and not project.s.PIPENV_RESOLVE_VCS
):
sp.write_err(
err.print(
"{}: You installed a VCS dependency in non-editable mode. "
"This will work fine, but sub-dependencies will not be resolved by {}."
"\n To enable this sub-dependency functionality, specify that this dependency is editable."
@@ -2388,7 +2396,7 @@ def do_install(
pipfile_sections = "[dev-packages]"
else:
pipfile_sections = "[packages]"
sp.write(
st.update(
"{} {} {} {}{}".format(
click.style("Adding", bold=True),
click.style(f"{pkg_requirement.name}", fg="green", bold=True),
@@ -2416,18 +2424,19 @@ def do_install(
except ValueError:
import traceback
sp.write_err(
err.print(
"{} {}".format(
click.style("Error:", fg="red", bold=True),
traceback.format_exc(),
)
)
sp.fail(
err.print(
environments.PIPENV_SPINNER_FAIL_TEXT.format(
"Failed adding package to Pipfile"
)
)
sp.ok(
# ok has a nice v in front, should do something similir with rich
st.update(
environments.PIPENV_SPINNER_OK_TEXT.format("Installation Succeeded")
)
# Update project settings with pre preference.
+13 -6
View File
@@ -218,14 +218,21 @@ class Setting:
if PIPENV_IS_CI:
self.PIPENV_NOSPIN = True
pipenv_spinner = "dots" if not os.name == "nt" else "bouncingBar"
self.PIPENV_SPINNER = get_from_env(
"SPINNER", check_for_negation=False, default=pipenv_spinner
)
if self.PIPENV_NOSPIN:
from pipenv.patched.pip._vendor.rich import _spinners
_spinners.SPINNERS[None] = {"interval": 80, "frames": " "}
self.PIPENV_SPINNER = None
else:
pipenv_spinner = "dots" if not os.name == "nt" else "bouncingBar"
self.PIPENV_SPINNER = get_from_env(
"SPINNER", check_for_negation=False, default=pipenv_spinner
)
"""Sets the default spinner type.
Spinners are identical to the ``node.js`` spinners and can be found at
https://github.com/sindresorhus/cli-spinners
You can see which spinners are available by running::
$ python -m pipenv.patched.pip._vendor.rich.spinner
"""
pipenv_pipfile = get_from_env("PIPFILE", check_for_negation=False)
+13 -16
View File
@@ -11,7 +11,6 @@ from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple, Union
from pipenv import environments
from pipenv._compat import decode_for_output
from pipenv.exceptions import RequirementError, ResolutionFailure
from pipenv.patched.pip._internal.cache import WheelCache
from pipenv.patched.pip._internal.commands.install import InstallCommand
@@ -27,7 +26,7 @@ from pipenv.patched.pip._internal.req.constructors import (
from pipenv.patched.pip._internal.req.req_file import parse_requirements
from pipenv.patched.pip._internal.utils.hashes import FAVORITE_HASH
from pipenv.patched.pip._internal.utils.temp_dir import global_tempdir_manager
from pipenv.patched.pip._vendor import pkg_resources
from pipenv.patched.pip._vendor import pkg_resources, rich
from pipenv.project import Project
from pipenv.vendor import click
from pipenv.vendor.requirementslib import Requirement
@@ -59,7 +58,9 @@ from .indexes import parse_indexes, prepare_pip_source_args
from .internet import _get_requests_session, is_pypi_url
from .locking import format_requirement_for_lockfile, prepare_lockfile
from .shell import make_posix, subprocess_run, temp_environ
from .spinner import create_spinner
console = rich.console.Console()
err = rich.console.Console(stderr=True)
def get_package_finder(
@@ -911,7 +912,7 @@ def actually_resolve_deps(
return (results, hashes, resolver.markers_lookup, resolver, resolver.skipped)
def resolve(cmd, sp, project):
def resolve(cmd, st, project):
from pipenv._compat import decode_output
from pipenv.cmdparse import Script
from pipenv.vendor.vistir.misc import echo
@@ -925,13 +926,13 @@ def resolve(cmd, sp, project):
continue
err += line
if is_verbose:
sp.hide_and_write(line.rstrip())
st.update(line.rstrip())
c.wait()
returncode = c.poll()
out = c.stdout.read()
if returncode != 0:
sp.red.fail(environments.PIPENV_SPINNER_FAIL_TEXT.format("Locking Failed!"))
st.update(environments.PIPENV_SPINNER_FAIL_TEXT.format("Locking Failed!"))
echo(out.strip(), err=True)
if not is_verbose:
echo(err, err=True)
@@ -1026,14 +1027,12 @@ def venv_resolve_deps(
os.environ.pop("PIPENV_SITE_DIR", None)
if keep_outdated:
os.environ["PIPENV_KEEP_OUTDATED"] = "1"
with create_spinner(
text=decode_for_output("Locking..."), setting=project.s
) as sp:
with console.status("Locking...", spinner=project.s.PIPENV_SPINNER) as st:
# This conversion is somewhat slow on local and file-type requirements since
# we now download those requirements / make temporary folders to perform
# dependency resolution on them, so we are including this step inside the
# spinner context manager for the UX improvement
sp.write(decode_for_output("Building requirements..."))
st.update("Building requirements...")
deps = convert_deps_to_pip(deps, project, include_index=True)
constraints = set(deps)
with tempfile.NamedTemporaryFile(
@@ -1042,16 +1041,14 @@ def venv_resolve_deps(
constraints_file.write(str("\n".join(constraints)))
cmd.append("--constraints-file")
cmd.append(constraints_file.name)
sp.write(decode_for_output("Resolving dependencies..."))
c = resolve(cmd, sp, project=project)
st.update("Resolving dependencies...")
c = resolve(cmd, st, project=project)
if c.returncode == 0:
sp.green.ok(environments.PIPENV_SPINNER_OK_TEXT.format("Success!"))
st.update(environments.PIPENV_SPINNER_OK_TEXT.format("Success!"))
if not project.s.is_verbose() and c.stderr.strip():
click.echo(click.style(f"Warning: {c.stderr.strip()}"), err=True)
else:
sp.red.fail(
environments.PIPENV_SPINNER_FAIL_TEXT.format("Locking Failed!")
)
st.update(environments.PIPENV_SPINNER_FAIL_TEXT.format("Locking Failed!"))
click.echo(f"Output: {c.stdout.strip()}", err=True)
click.echo(f"Error: {c.stderr.strip()}", err=True)
try:
-18
View File
@@ -1,18 +0,0 @@
import contextlib
@contextlib.contextmanager
def create_spinner(text, setting, nospin=None, spinner_name=None):
from pipenv.vendor.vistir import spin
if not spinner_name:
spinner_name = setting.PIPENV_SPINNER
if nospin is None:
nospin = setting.PIPENV_NOSPIN
with spin.create_spinner(
spinner_name=spinner_name,
start_text=text,
nospin=nospin,
write_to_stdout=False,
) as sp:
yield sp
+1 -1
View File
@@ -19,6 +19,6 @@ shellingham==1.5.0
termcolor==1.1.0
toml==0.10.2
tomlkit==0.9.2
vistir==0.6.1
vistir==0.7.4
wheel==0.37.1
yaspin==2.0.0
+1 -10
View File
@@ -1,13 +1,5 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
from .compat import (
NamedTemporaryFile,
StringIO,
TemporaryDirectory,
partialmethod,
to_native_string,
)
from .contextmanagers import (
atomic_open_for_write,
cd,
@@ -36,7 +28,7 @@ from .misc import (
from .path import create_tracked_tempdir, create_tracked_tempfile, mkdir_p, rmtree
from .spin import create_spinner
__version__ = "0.6.1"
__version__ = "0.7.4"
__all__ = [
@@ -58,7 +50,6 @@ __all__ = [
"create_spinner",
"create_tracked_tempdir",
"create_tracked_tempfile",
"to_native_string",
"decode_for_output",
"to_text",
"to_bytes",
+2 -2
View File
@@ -44,6 +44,7 @@ import io
import os
import sys
import time
import typing
import zlib
from ctypes import (
POINTER,
@@ -65,7 +66,6 @@ from itertools import count
import msvcrt
from .compat import IS_TYPE_CHECKING
from .misc import StreamWrapper, run, to_text
try:
@@ -77,7 +77,7 @@ except ImportError:
pythonapi = None
if IS_TYPE_CHECKING:
if typing.TYPE_CHECKING:
from typing import Text
-320
View File
@@ -1,320 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import codecs
import errno
import os
import sys
import warnings
from tempfile import mkdtemp
__all__ = [
"Path",
"get_terminal_size",
"finalize",
"partialmethod",
"JSONDecodeError",
"FileNotFoundError",
"ResourceWarning",
"PermissionError",
"is_type_checking",
"IS_TYPE_CHECKING",
"IsADirectoryError",
"fs_str",
"lru_cache",
"TemporaryDirectory",
"NamedTemporaryFile",
"to_native_string",
"samefile",
"Mapping",
"Hashable",
"MutableMapping",
"Container",
"Iterator",
"KeysView",
"ItemsView",
"MappingView",
"Iterable",
"Set",
"Sequence",
"Sized",
"ValuesView",
"MutableSet",
"MutableSequence",
"Callable",
"fs_encode",
"fs_decode",
"_fs_encode_errors",
"_fs_decode_errors",
]
from pathlib import Path
from functools import lru_cache, partialmethod
from tempfile import NamedTemporaryFile
from shutil import get_terminal_size
from weakref import finalize
from collections.abc import (
Mapping,
Hashable,
MutableMapping,
Container,
Iterator,
KeysView,
ItemsView,
MappingView,
Iterable,
Set,
Sequence,
Sized,
ValuesView,
MutableSet,
MutableSequence,
Callable,
)
from os.path import samefile
from json import JSONDecodeError
from builtins import (
ResourceWarning,
FileNotFoundError,
PermissionError,
IsADirectoryError,
FileExistsError,
TimeoutError,
)
from io import StringIO
if not sys.warnoptions:
warnings.simplefilter("default", ResourceWarning)
def is_type_checking():
try:
from typing import TYPE_CHECKING
except ImportError:
return False
return TYPE_CHECKING
IS_TYPE_CHECKING = os.environ.get("MYPY_RUNNING", is_type_checking())
class TemporaryDirectory(object):
"""
Create and return a temporary directory. This has the same
behavior as mkdtemp but can be used as a context manager. For
example:
with TemporaryDirectory() as tmpdir:
...
Upon exiting the context, the directory and everything contained
in it are removed.
"""
def __init__(self, suffix="", prefix=None, dir=None):
if "RAM_DISK" in os.environ:
import uuid
name = uuid.uuid4().hex
dir_name = os.path.join(os.environ["RAM_DISK"].strip(), name)
os.mkdir(dir_name)
self.name = dir_name
else:
suffix = suffix if suffix else ""
if not prefix:
self.name = mkdtemp(suffix=suffix, dir=dir)
else:
self.name = mkdtemp(suffix, prefix, dir)
self._finalizer = finalize(
self,
self._cleanup,
self.name,
warn_message="Implicitly cleaning up {!r}".format(self),
)
@classmethod
def _rmtree(cls, name):
from .path import rmtree
rmtree(name)
@classmethod
def _cleanup(cls, name, warn_message):
cls._rmtree(name)
warnings.warn(warn_message, ResourceWarning)
def __repr__(self):
return "<{} {!r}>".format(self.__class__.__name__, self.name)
def __enter__(self):
return self
def __exit__(self, exc, value, tb):
self.cleanup()
def cleanup(self):
if self._finalizer.detach():
self._rmtree(self.name)
def is_bytes(string):
"""Check if a string is a bytes instance.
:param Union[str, bytes] string: A string that may be string or bytes like
:return: Whether the provided string is a bytes type or not
:rtype: bool
"""
if isinstance(string, (bytes, memoryview, bytearray)): # noqa
return True
return False
def fs_str(string):
"""Encodes a string into the proper filesystem encoding.
Borrowed from pip-tools
"""
if isinstance(string, str):
return string
assert not isinstance(string, bytes)
return string.encode(_fs_encoding)
def _get_path(path):
"""Fetch the string value from a path-like object.
Returns **None** if there is no string value.
"""
if isinstance(path, (str, bytes)):
return path
path_type = type(path)
try:
path_repr = path_type.__fspath__(path)
except AttributeError:
return
if isinstance(path_repr, (str, bytes)):
return path_repr
return
# copied from the os backport which in turn copied this from
# the pyutf8 package --
# URL: https://github.com/etrepum/pyutf8/blob/master/pyutf8/ref.py
#
def _invalid_utf8_indexes(bytes):
skips = []
i = 0
len_bytes = len(bytes)
while i < len_bytes:
c1 = bytes[i]
if c1 < 0x80:
# U+0000 - U+007F - 7 bits
i += 1
continue
try:
c2 = bytes[i + 1]
if (c1 & 0xE0 == 0xC0) and (c2 & 0xC0 == 0x80):
# U+0080 - U+07FF - 11 bits
c = ((c1 & 0x1F) << 6) | (c2 & 0x3F)
if c < 0x80: # pragma: no cover
# Overlong encoding
skips.extend([i, i + 1]) # pragma: no cover
i += 2
continue
c3 = bytes[i + 2]
if (c1 & 0xF0 == 0xE0) and (c2 & 0xC0 == 0x80) and (c3 & 0xC0 == 0x80):
# U+0800 - U+FFFF - 16 bits
c = ((((c1 & 0x0F) << 6) | (c2 & 0x3F)) << 6) | (c3 & 0x3F)
if (c < 0x800) or (0xD800 <= c <= 0xDFFF):
# Overlong encoding or surrogate.
skips.extend([i, i + 1, i + 2])
i += 3
continue
c4 = bytes[i + 3]
if (
(c1 & 0xF8 == 0xF0)
and (c2 & 0xC0 == 0x80)
and (c3 & 0xC0 == 0x80)
and (c4 & 0xC0 == 0x80)
):
# U+10000 - U+10FFFF - 21 bits
c = ((((((c1 & 0x0F) << 6) | (c2 & 0x3F)) << 6) | (c3 & 0x3F)) << 6) | (
c4 & 0x3F
)
if (c < 0x10000) or (c > 0x10FFFF): # pragma: no cover
# Overlong encoding or invalid code point.
skips.extend([i, i + 1, i + 2, i + 3])
i += 4
continue
except IndexError:
pass
skips.append(i)
i += 1
return skips
def fs_encode(path):
"""Encode a filesystem path to the proper filesystem encoding.
:param Union[str, bytes] path: A string-like path
:returns: A bytes-encoded filesystem path representation
"""
path = _get_path(path)
if path is None:
raise TypeError("expected a valid path to encode")
if isinstance(path, str):
return path.encode(_fs_encoding, _fs_encode_errors)
return path
def fs_decode(path):
"""Decode a filesystem path using the proper filesystem encoding.
:param path: The filesystem path to decode from bytes or string
:return: The filesystem path, decoded with the determined encoding
:rtype: Text
"""
path = _get_path(path)
if path is None:
raise TypeError("expected a valid path to decode")
if isinstance(path, bytes):
import array
indexes = _invalid_utf8_indexes(array.array(str("B"), path))
if indexes and os.name == "nt":
return path.decode(_fs_encoding, "surrogateescape")
return path.decode(_fs_encoding, _fs_decode_errors)
return path
_fs_encoding = "utf-8"
_fs_decode_errors = "surrogateescape"
if sys.platform.startswith("win"):
_fs_error_fn = None
_fs_encode_errors = "surrogatepass"
else:
if sys.version_info >= (3, 3):
_fs_encoding = sys.getfilesystemencoding()
if not _fs_encoding:
_fs_encoding = sys.getdefaultencoding()
alt_strategy = "surrogateescape"
_fs_error_fn = getattr(sys, "getfilesystemencodeerrors", None)
_fs_encode_errors = _fs_error_fn() if _fs_error_fn else alt_strategy
_fs_decode_errors = _fs_error_fn() if _fs_error_fn else _fs_decode_errors
_byte = chr if sys.version_info < (3,) else lambda i: bytes([i])
def to_native_string(string):
from .misc import to_text, to_bytes
return to_text(string)
+5 -4
View File
@@ -1,18 +1,19 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import io
import os
import stat
import sys
import typing
from contextlib import closing, contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile
from urllib import request
from .compat import IS_TYPE_CHECKING, NamedTemporaryFile, Path
from .path import is_file_url, is_valid_url, path_to_url, url_to_path
if IS_TYPE_CHECKING:
if typing.TYPE_CHECKING:
from typing import (
Any,
Bytes,
-6
View File
@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function
from .compat import IS_TYPE_CHECKING
MYPY_RUNNING = IS_TYPE_CHECKING
+37 -81
View File
@@ -11,29 +11,21 @@ import os
import subprocess
import sys
import threading
import typing
import warnings
from collections import OrderedDict
from functools import partial
from itertools import islice, tee
from weakref import WeakKeyDictionary
from queue import Empty, Queue
from typing import Iterable
from .cmdparse import Script
from .compat import (
Iterable,
Path,
StringIO,
TimeoutError,
_fs_decode_errors,
_fs_encode_errors,
fs_str,
is_bytes,
partialmethod,
to_native_string,
)
from .contextmanagers import spinner as spinner
from .environment import MYPY_RUNNING
from .termcolors import ANSI_REMOVAL_RE, colorize
_fs_encode_errors = "surrogatepass"
if os.name != "nt":
@@ -62,7 +54,7 @@ __all__ = [
]
if MYPY_RUNNING:
if typing.TYPE_CHECKING:
from typing import Any, Dict, Generator, IO, List, Optional, Text, Tuple, Union
from .spin import VistirSpinner
@@ -145,6 +137,11 @@ def dedup(iterable):
# type: (Iterable) -> Iterable
"""Deduplicate an iterable object like iter(set(iterable)) but order-
preserved."""
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Use instead: sorted(iter(dict.fromkeys(iterable)))'),
DeprecationWarning, stacklevel=2)
return iter(OrderedDict.fromkeys(iterable))
@@ -381,7 +378,7 @@ class SubprocessStreamWrapper(object):
if self.display_line:
if new_line != self.display_line:
self.display_line_loops_displayed = 0
new_line = fs_str("{}".format(new_line))
new_line = "{}".format(new_line)
if len(new_line) > self.display_line_max_len:
new_line = "{}...".format(new_line[: self.display_line_max_len])
self.display_line = new_line
@@ -447,9 +444,7 @@ class SubprocessStreamWrapper(object):
line, "stderr", spinner=spinner, stdout_allowed=stdout_allowed
)
if spinner:
spinner.text = to_native_string(
"{} {}".format(spinner.text, self.display_line)
)
spinner.text = "{} {}".format(spinner.text, self.display_line)
self.out = self.out.strip()
self.err = self.err.strip()
@@ -486,11 +481,11 @@ def _handle_nonblocking_subprocess(c, spinner=None):
c.wait()
if spinner:
if c.returncode != 0:
spinner.fail(to_native_string("Failed...cleaning up..."))
spinner.fail("Failed...cleaning up...")
elif c.returncode == 0 and not os.name == "nt":
spinner.ok(to_native_string("✔ Complete"))
spinner.ok("✔ Complete")
else:
spinner.ok(to_native_string("Complete"))
spinner.ok("Complete")
return c
@@ -524,7 +519,7 @@ def _create_subprocess(
formatted_tb = "".join(traceback.format_exception(*sys.exc_info()))
sys.stderr.write(
"Error while executing command %s:" % to_native_string(" ".join(cmd._parts))
"Error while executing command %s:" % " ".join(cmd._parts)
)
sys.stderr.write(formatted_tb)
raise exc
@@ -600,7 +595,7 @@ def run(
if env:
_env.update(env)
_env = {k: fs_str(v) for k, v in _env.items()}
_env = {k: v for k, v in _env.items()}
if not spinner_name:
spinner_name = "bouncingBar"
@@ -647,7 +642,10 @@ def load_path(python):
'/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python3.7/site-packages',
'/home/user/git/requirementslib/src']
"""
warnings.warn(
'This function is deprecated and will be removed in version 0.8.',
DeprecationWarning, stacklevel=2)
from pathlib import Path
python = Path(python).as_posix()
out, err = run(
[python, "-c", "import json, sys; print(json.dumps(sys.path))"], nospin=True
@@ -684,7 +682,7 @@ def partialclass(cls, *args, **kwargs):
>>> new_source.__dict__
{'url': 'https://pypi.org/simple', 'verify_ssl': True, 'name': 'pypi'}
"""
from functools import partialmethod
name_attrs = [
n
for n in (getattr(cls, name, str(cls)) for name in ("__name__", "__qualname__"))
@@ -787,7 +785,10 @@ def divide(n, iterable):
:return: a list of new iterables derived from the original iterable
:rtype: list
"""
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Use instead: more_itertools.divide(n, iterable)))'),
DeprecationWarning, stacklevel=2)
seq = tuple(iterable)
q, r = divmod(len(seq), n)
@@ -809,6 +810,11 @@ def take(n, iterable):
from https://github.com/erikrose/more-itertools/blob/master/more_itertools/recipes.py
"""
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Use instead: list(islice(iterable, n))'),
DeprecationWarning, stacklevel=2)
return list(islice(iterable, n))
@@ -820,6 +826,10 @@ def chunked(n, iterable):
from https://github.com/erikrose/more-itertools/blob/master/more_itertools/more.py
"""
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Use instead: more_itertools.chunked(iterable, n)))'),
DeprecationWarning, stacklevel=2)
return iter(partial(take, n, iter(iterable)), [])
@@ -893,7 +903,6 @@ def decode_for_output(output, target_stream=None, translation_map=None):
try:
output = _encode(output, encoding=encoding, translation_map=translation_map)
except (UnicodeDecodeError, UnicodeEncodeError):
output = to_native_string(output)
output = _encode(
output, encoding=encoding, errors="replace", translation_map=translation_map
)
@@ -1205,56 +1214,3 @@ def _can_use_color(stream=None, color=None):
stream = sys.stdin
return _isatty(stream)
return bool(color)
def echo(text, fg=None, bg=None, style=None, file=None, err=False, color=None):
"""Write the given text to the provided stream or **sys.stdout** by
default.
Provides optional foreground and background colors from the ansi defaults:
**grey**, **red**, **green**, **yellow**, **blue**, **magenta**, **cyan**
or **white**.
Available styles include **bold**, **dark**, **underline**, **blink**, **reverse**,
**concealed**
:param str text: Text to write
:param str fg: Foreground color to use (default: None)
:param str bg: Foreground color to use (default: None)
:param str style: Style to use (default: None)
:param stream file: File to write to (default: None)
:param bool color: Whether to force color (i.e. ANSI codes are in the text)
"""
if file and not hasattr(file, "write"):
raise TypeError("Expected a writable stream, received {!r}".format(file))
if not file:
if err:
file = _text_stderr()
else:
file = _text_stdout()
if text and not isinstance(text, (str, bytes, bytearray)):
text = str(text)
text = "" if not text else text
if isinstance(text, str):
text += "\n"
else:
text += b"\n"
if text and is_bytes(text):
buffer = _get_binary_buffer(file)
if buffer is not None:
file.flush()
buffer.write(text)
buffer.flush()
return
if text and not is_bytes(text):
can_use_color = _can_use_color(file, color=color)
if any([fg, bg, style]):
text = colorize(text, fg=fg, bg=bg, attrs=style)
if not can_use_color or (os.name == "nt" and not _wrap_for_color):
text = ANSI_REMOVAL_RE.sub("", text)
elif os.name == "nt" and _wrap_for_color and not _is_wrapped_for_color(file):
file = _wrap_for_color(file, color=color)
if text:
file.write(text)
file.flush()
+23 -40
View File
@@ -10,43 +10,28 @@ import posixpath
import shutil
import stat
import sys
import typing
import time
import unicodedata
import warnings
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import Optional, Callable
from urllib import parse as urllib_parse
from urllib import request as urllib_request
from .compat import (
IS_TYPE_CHECKING,
FileNotFoundError,
Path,
PermissionError,
ResourceWarning,
TemporaryDirectory,
_fs_encoding,
NamedTemporaryFile,
finalize,
fs_decode,
fs_encode,
)
from urllib.parse import quote
# fmt: off
from urllib.parse import quote_from_bytes as quote
# fmt: on
if IS_TYPE_CHECKING:
if typing.TYPE_CHECKING:
from types import TracebackType
from typing import (
Any,
AnyStr,
ByteString,
Callable,
Generator,
Iterator,
List,
Optional,
Text,
Tuple,
Type,
@@ -56,6 +41,7 @@ if IS_TYPE_CHECKING:
TPath = os.PathLike
TFunc = Callable[..., Any]
__all__ = [
"check_for_unc_path",
"get_converted_relative_path",
@@ -188,13 +174,13 @@ def path_to_url(path):
# XXX: This enables us to handle half-surrogates that were never
# XXX: actually part of a surrogate pair, but were just incidentally
# XXX: passed in as a piece of a filename
quoted_path = quote(fs_encode(path))
return fs_decode("file:///{}:{}".format(drive, quoted_path))
quoted_path = quote(path, errors="backslashreplace")
return "file:///{}:{}".format(drive, quoted_path)
# XXX: This is also here to help deal with incidental dangling surrogates
# XXX: on linux, by making sure they are preserved during encoding so that
# XXX: we can urlencode the backslash correctly
bytes_path = to_bytes(normalized_path, errors="backslashreplace")
return fs_decode("file://{}".format(quote(bytes_path)))
# bytes_path = to_bytes(normalized_path, errors="backslashreplace")
return "file://{}".format(quote(path, errors="backslashreplace"))
def url_to_path(url):
@@ -248,8 +234,6 @@ def is_readonly_path(fn):
Permissions check is `bool(path.stat & stat.S_IREAD)` or `not
os.access(path, os.W_OK)`
"""
fn = fs_decode(fs_encode(fn))
if os.path.exists(fn):
file_stat = os.stat(fn).st_mode
return not bool(file_stat & stat.S_IWRITE) or not os.access(fn, os.W_OK)
@@ -257,6 +241,10 @@ def is_readonly_path(fn):
def mkdir_p(newdir, mode=0o777):
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Use os.makedirs instead'), DeprecationWarning, stacklevel=2)
# This exists in shutil already
# type: (TPath, int) -> None
"""Recursively creates the target directory and all of its parents if they
do not already exist. Fails silently if they do.
@@ -264,12 +252,11 @@ def mkdir_p(newdir, mode=0o777):
:param str newdir: The directory path to ensure
:raises: OSError if a file is encountered along the way
"""
newdir = fs_decode(fs_encode(newdir))
if os.path.exists(newdir):
if not os.path.isdir(newdir):
raise OSError(
"a file with the same name as the desired dir, '{}', already exists.".format(
fs_decode(newdir)
newdir
)
)
return None
@@ -280,7 +267,10 @@ def ensure_mkdir_p(mode=0o777):
# type: (int) -> Callable[Callable[..., Any], Callable[..., Any]]
"""Decorator to ensure `mkdir_p` is called to the function's return
value."""
warnings.warn('This function is deprecated and will be removed in version 0.8.',
DeprecationWarning, stacklevel=2)
# This exists in shutil already
def decorator(f):
# type: (Callable[..., Any]) -> Callable[..., Any]
@functools.wraps(f)
@@ -289,9 +279,7 @@ def ensure_mkdir_p(mode=0o777):
path = f(*args, **kwargs)
mkdir_p(path, mode=mode)
return path
return decorated
return decorator
@@ -346,16 +334,13 @@ def _find_icacls_exe():
return None
def set_write_bit(fn):
# type: (str) -> None
def set_write_bit(fn: str) -> None:
"""Set read-write permissions for the current user on the target path. Fail
silently if the path doesn't exist.
:param str fn: The target filename or path
:return: None
"""
fn = fs_decode(fs_encode(fn))
if not os.path.exists(fn):
return
file_stat = os.stat(fn).st_mode
@@ -411,8 +396,9 @@ def set_write_bit(fn):
set_write_bit(file_)
def rmtree(directory, ignore_errors=False, onerror=None):
# type: (str, bool, Optional[Callable]) -> None
def rmtree(directory: str,
ignore_errors: bool = False,
onerror: Optional[Callable] = None) -> None :
"""Stand-in for :func:`~shutil.rmtree` with additional error-handling.
This version of `rmtree` handles read-only paths, especially in the case of index
@@ -427,7 +413,6 @@ def rmtree(directory, ignore_errors=False, onerror=None):
Setting `ignore_errors=True` may cause this to silently fail to delete the path
"""
directory = fs_decode(fs_encode(directory))
if onerror is None:
onerror = handle_remove_readonly
try:
@@ -485,8 +470,6 @@ def handle_remove_readonly(func, path, exc):
This function will call check :func:`is_readonly_path` before attempting to call
:func:`set_write_bit` on the target path and try again.
"""
# Check for read-only attribute
from .compat import ResourceWarning, FileNotFoundError, PermissionError
PERM_ERRORS = (errno.EACCES, errno.EPERM, errno.ENOENT)
default_warning_message = "Unable to remove file due to permissions restriction: {!r}"
+14 -10
View File
@@ -1,22 +1,22 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function
import functools
import os
import signal
import sys
import threading
import time
import typing
import warnings
from io import StringIO
import pipenv.vendor.colorama as colorama
from .compat import IS_TYPE_CHECKING, to_native_string
from .cursor import hide_cursor, show_cursor
from .misc import decode_for_output, to_text
from .termcolors import COLOR_MAP, COLORS, DISABLE_COLORS, colored
if IS_TYPE_CHECKING:
if typing.TYPE_CHECKING:
from typing import (
Any,
Callable,
@@ -38,17 +38,17 @@ if IS_TYPE_CHECKING:
try:
import pipenv.vendor.yaspin as yaspin
except ImportError: # pragma: no cover
yaspin = None
Spinners = None
SpinBase = None
else: # pragma: no cover
import yaspin.spinners
import yaspin.core
Spinners = yaspin.spinners.Spinners
SpinBase = yaspin.core.Yaspin
except ImportError: # pragma: no cover
yaspin = None
Spinners = None
SpinBase = None
if os.name == "nt": # pragma: no cover
def handler(signum, frame, spinner):
@@ -88,7 +88,7 @@ class DummySpinner(object):
# type: (str, Any) -> None
if DISABLE_COLORS:
colorama.init()
self.text = to_native_string(decode_output(text)) if text else ""
self.text = decode_output(text) if text else ""
self.stdout = kwargs.get("stdout", sys.stdout)
self.stderr = kwargs.get("stderr", sys.stderr)
self.out_buff = StringIO()
@@ -484,6 +484,10 @@ class VistirSpinner(SpinBase):
def create_spinner(*args, **kwargs):
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Consider using yaspin directly instead, or user rich.status'),
DeprecationWarning, stacklevel=2)
# type: (Any, Any) -> Union[DummySpinner, VistirSpinner]
nospin = kwargs.pop("nospin", False)
use_yaspin = kwargs.pop("use_yaspin", not nospin)
+1 -3
View File
@@ -1,12 +1,10 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import os
import re
import pipenv.vendor.colorama as colorama
from .compat import to_native_string
from .misc import to_text as to_native_string
DISABLE_COLORS = os.getenv("CI", False) or os.getenv(
"ANSI_COLORS_DISABLED", os.getenv("VISTIR_DISABLE_COLORS", False)
+1 -1
View File
@@ -248,7 +248,7 @@ def test_pipenv_three(pipenv_instance_pypi):
with pipenv_instance_pypi() as p:
c = p.pipenv('--three')
assert c.returncode == 0
assert 'Successfully created virtual environment' in c.stderr
assert 'Successfully created virtual environment' in c.stdout
@pytest.mark.outdated
+1 -1
View File
@@ -135,7 +135,7 @@ hello = "echo $HELLO_VAR"
else:
c = p.pipenv('run hello')
assert c.returncode == 0
assert 'WORLD\n' == c.stdout
assert 'WORLD\n' in c.stdout
@pytest.mark.run