Merge pull request #5522 from pypa/bump-requirementslib

More vendor bumps
This commit is contained in:
Matt Davis
2022-12-06 20:34:25 -05:00
committed by GitHub
14 changed files with 205 additions and 613 deletions
+3
View File
@@ -0,0 +1,3 @@
* Bump version of requirementslib to 2.2.1
* Bump version of vistir to 0.7.5
* Bump version of colorama to 0.4.6
+16 -11
View File
@@ -38,21 +38,26 @@ except Exception:
if "urllib3" in sys.modules:
del sys.modules["urllib3"]
from pipenv.vendor.vistir.misc import get_text_stream
stdout = get_text_stream("stdout")
stderr = get_text_stream("stderr")
if os.name == "nt":
from pipenv.vendor.vistir.misc import _can_use_color, _wrap_for_color
from pipenv.vendor import colorama
if _can_use_color(stdout):
stdout = _wrap_for_color(stdout)
if _can_use_color(stderr):
stderr = _wrap_for_color(stderr)
# Backward compatability with vistir
no_color = False
for item in ("ANSI_COLORS_DISABLED", "VISTIR_DISABLE_COLORS", "CI"):
warnings.warn(
(
f"Please do not use {item}, as it will be removed in future versions."
"\nUse NO_COLOR instead."
),
DeprecationWarning,
stacklevel=2,
)
if os.getenv(item):
no_color = True
sys.stdout = stdout
sys.stderr = stderr
if not os.getenv("NO_COLOR") or no_color:
colorama.just_fix_windows_console()
from . import resolver # noqa
from .cli import cli
+3 -2
View File
@@ -1,6 +1,7 @@
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
from .initialise import init, deinit, reinit, colorama_text
from .initialise import init, deinit, reinit, colorama_text, just_fix_windows_console
from .ansi import Fore, Back, Style, Cursor
from .ansitowin32 import AnsiToWin32
__version__ = '0.4.4'
__version__ = '0.4.6'
+23 -4
View File
@@ -4,7 +4,7 @@ import sys
import os
from .ansi import AnsiFore, AnsiBack, AnsiStyle, Style, BEL
from .winterm import WinTerm, WinColor, WinStyle
from .winterm import enable_vt_processing, WinTerm, WinColor, WinStyle
from .win32 import windll, winapi_test
@@ -37,6 +37,12 @@ class StreamWrapper(object):
def __exit__(self, *args, **kwargs):
return self.__wrapped.__exit__(*args, **kwargs)
def __setstate__(self, state):
self.__dict__ = state
def __getstate__(self):
return self.__dict__
def write(self, text):
self.__convertor.write(text)
@@ -57,7 +63,9 @@ class StreamWrapper(object):
stream = self.__wrapped
try:
return stream.closed
except AttributeError:
# AttributeError in the case that the stream doesn't support being closed
# ValueError for the case that the stream has already been detached when atexit runs
except (AttributeError, ValueError):
return True
@@ -86,15 +94,22 @@ class AnsiToWin32(object):
# (e.g. Cygwin Terminal). In this case it's up to the terminal
# to support the ANSI codes.
conversion_supported = on_windows and winapi_test()
try:
fd = wrapped.fileno()
except Exception:
fd = -1
system_has_native_ansi = not on_windows or enable_vt_processing(fd)
have_tty = not self.stream.closed and self.stream.isatty()
need_conversion = conversion_supported and not system_has_native_ansi
# should we strip ANSI sequences from our output?
if strip is None:
strip = conversion_supported or (not self.stream.closed and not self.stream.isatty())
strip = need_conversion or not have_tty
self.strip = strip
# should we should convert ANSI sequences into win32 calls?
if convert is None:
convert = conversion_supported and not self.stream.closed and self.stream.isatty()
convert = need_conversion and have_tty
self.convert = convert
# dict of ansi codes to win32 functions and parameters
@@ -256,3 +271,7 @@ class AnsiToWin32(object):
if params[0] in '02':
winterm.set_title(params[1])
return text
def flush(self):
self.wrapped.flush()
+46 -5
View File
@@ -6,13 +6,27 @@ import sys
from .ansitowin32 import AnsiToWin32
orig_stdout = None
orig_stderr = None
def _wipe_internal_state_for_tests():
global orig_stdout, orig_stderr
orig_stdout = None
orig_stderr = None
wrapped_stdout = None
wrapped_stderr = None
global wrapped_stdout, wrapped_stderr
wrapped_stdout = None
wrapped_stderr = None
atexit_done = False
global atexit_done
atexit_done = False
global fixed_windows_console
fixed_windows_console = False
try:
# no-op if it wasn't registered
atexit.unregister(reset_all)
except AttributeError:
# python 2: no atexit.unregister. Oh well, we did our best.
pass
def reset_all():
@@ -55,6 +69,29 @@ def deinit():
sys.stderr = orig_stderr
def just_fix_windows_console():
global fixed_windows_console
if sys.platform != "win32":
return
if fixed_windows_console:
return
if wrapped_stdout is not None or wrapped_stderr is not None:
# Someone already ran init() and it did stuff, so we won't second-guess them
return
# On newer versions of Windows, AnsiToWin32.__init__ will implicitly enable the
# native ANSI support in the console as a side-effect. We only need to actually
# replace sys.stdout/stderr if we're in the old-style conversion mode.
new_stdout = AnsiToWin32(sys.stdout, convert=None, strip=None, autoreset=False)
if new_stdout.convert:
sys.stdout = new_stdout
new_stderr = AnsiToWin32(sys.stderr, convert=None, strip=None, autoreset=False)
if new_stderr.convert:
sys.stderr = new_stderr
fixed_windows_console = True
@contextlib.contextmanager
def colorama_text(*args, **kwargs):
init(*args, **kwargs)
@@ -78,3 +115,7 @@ def wrap_stream(stream, convert, strip, autoreset, wrap):
if wrapper.should_wrap():
stream = wrapper.stream
return stream
# Use this for initial setup as well, to reduce code duplication
_wipe_internal_state_for_tests()
+28
View File
@@ -4,6 +4,8 @@
STDOUT = -11
STDERR = -12
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
try:
import ctypes
from ctypes import LibraryLoader
@@ -89,6 +91,20 @@ else:
]
_SetConsoleTitleW.restype = wintypes.BOOL
_GetConsoleMode = windll.kernel32.GetConsoleMode
_GetConsoleMode.argtypes = [
wintypes.HANDLE,
POINTER(wintypes.DWORD)
]
_GetConsoleMode.restype = wintypes.BOOL
_SetConsoleMode = windll.kernel32.SetConsoleMode
_SetConsoleMode.argtypes = [
wintypes.HANDLE,
wintypes.DWORD
]
_SetConsoleMode.restype = wintypes.BOOL
def _winapi_test(handle):
csbi = CONSOLE_SCREEN_BUFFER_INFO()
success = _GetConsoleScreenBufferInfo(
@@ -150,3 +166,15 @@ else:
def SetConsoleTitle(title):
return _SetConsoleTitleW(title)
def GetConsoleMode(handle):
mode = wintypes.DWORD()
success = _GetConsoleMode(handle, byref(mode))
if not success:
raise ctypes.WinError()
return mode.value
def SetConsoleMode(handle, mode):
success = _SetConsoleMode(handle, mode)
if not success:
raise ctypes.WinError()
+27 -1
View File
@@ -1,7 +1,13 @@
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
from . import win32
try:
from msvcrt import get_osfhandle
except ImportError:
def get_osfhandle(_):
raise OSError("This isn't windows!")
from . import win32
# from wincon.h
class WinColor(object):
BLACK = 0
@@ -167,3 +173,23 @@ class WinTerm(object):
def set_title(self, title):
win32.SetConsoleTitle(title)
def enable_vt_processing(fd):
if win32.windll is None or not win32.winapi_test():
return False
try:
handle = get_osfhandle(fd)
mode = win32.GetConsoleMode(handle)
win32.SetConsoleMode(
handle,
mode | win32.ENABLE_VIRTUAL_TERMINAL_PROCESSING,
)
mode = win32.GetConsoleMode(handle)
if mode & win32.ENABLE_VIRTUAL_TERMINAL_PROCESSING:
return True
# Can get TypeError in testsuite where 'fd' is a Mock()
except (OSError, TypeError):
return False
+1 -1
View File
@@ -5,7 +5,7 @@ from .models.lockfile import Lockfile
from .models.pipfile import Pipfile
from .models.requirements import Requirement
__version__ = "2.2.0"
__version__ = "2.2.1"
logger = logging.getLogger(__name__)
+9 -24
View File
@@ -4,6 +4,7 @@ import configparser
import contextlib
import os
import shutil
import subprocess as sp
import sys
from collections.abc import Iterable, Mapping
from contextlib import ExitStack
@@ -30,7 +31,6 @@ from pipenv.patched.pip._vendor.pkg_resources import (
)
from pipenv.patched.pip._vendor.platformdirs import user_cache_dir
from pipenv.vendor.vistir.contextmanagers import cd, temp_path
from pipenv.vendor.vistir.misc import run
from pipenv.vendor.vistir.path import create_tracked_tempdir, rmtree
from ..environment import MYPY_RUNNING
@@ -94,16 +94,8 @@ def pep517_subprocess_runner(cmd, cwd=None, extra_environ=None):
if extra_environ:
env.update(extra_environ)
run(
cmd,
cwd=cwd,
env=env,
block=True,
combine_stderr=True,
return_object=False,
write_to_stdout=False,
nospin=True,
)
cmd_as_str = " ".join(cmd)
sp.run(cmd_as_str, cwd=cwd, env=env, stdout=sp.PIPE, stderr=sp.STDOUT, shell=True)
class BuildEnv(envbuild.BuildEnvironment):
@@ -117,14 +109,8 @@ class BuildEnv(envbuild.BuildEnvironment):
"--prefix",
self.path,
] + list(reqs)
run(
cmd,
block=True,
combine_stderr=True,
return_object=False,
write_to_stdout=False,
nospin=True,
)
sp.run(cmd, shell=True, stderr=sp.PIPE, stdout=sp.PIPE)
class HookCaller(wrappers.Pep517HookCaller):
@@ -892,13 +878,12 @@ def run_setup(script_path, egg_base=None):
# We couldn't import everything needed to run setup
except Exception:
python = os.environ.get("PIP_PYTHON_PATH", sys.executable)
out, _ = run(
sp.run(
[python, "setup.py"] + args,
cwd=target_cwd,
block=True,
combine_stderr=False,
return_object=False,
nospin=True,
stdout=sp.PIPE,
stderr=sp.PIPE,
)
finally:
_setup_stop_after = None
+3 -3
View File
@@ -2,7 +2,7 @@ attrs==22.1.0
cerberus==1.3.4
click-didyoumean==0.0.3
click==8.0.3
colorama==0.4.4
colorama==0.4.6
dparse==0.6.2
markupsafe==2.0.1
pexpect==4.8.0
@@ -12,9 +12,9 @@ ptyprocess==0.7.0
pyparsing==3.0.9
python-dotenv==0.19.0
pythonfinder==1.3.1
requirementslib==2.2.0
requirementslib==2.2.1
ruamel.yaml==0.17.21
shellingham==1.5.0
toml==0.10.2
tomlkit==0.9.2
vistir==0.7.4
vistir==0.7.5
+37 -61
View File
@@ -1,66 +1,42 @@
# -*- coding=utf-8 -*-
import importlib
from .contextmanagers import (
atomic_open_for_write,
cd,
open_file,
replaced_stream,
replaced_streams,
spinner,
temp_environ,
temp_path,
)
from .cursor import hide_cursor, show_cursor
from .misc import (
StreamWrapper,
chunked,
decode_for_output,
divide,
get_wrapped_stream,
load_path,
partialclass,
run,
shell_escape,
take,
to_bytes,
to_text,
)
from .path import create_tracked_tempdir, create_tracked_tempfile, mkdir_p, rmtree
from .spin import create_spinner
__newpaths = {
'create_spinner': 'vistir.spin',
'cd': 'vistir.contextmanagers',
'atomic_open_for_write': 'vistir.contextmanagers',
'open_file': 'vistir.contextmanagers',
'replaced_stream': 'vistir.contextmanagers',
'replaced_streams': 'vistir.contextmanagers',
'spinner': 'vistir.contextmanagers',
'temp_environ': 'vistir.contextmanagers',
'temp_path': 'vistir.contextmanagers',
'hide_cursor': 'vistir.cursor',
'show_cursor': 'vistir.cursor',
'StreamWrapper': 'vistir.misc',
'chunked':'vistir.misc',
'decode_for_output': 'vistir.misc',
'divide': 'vistir.misc',
'get_wrapped_stream': 'vistir.misc',
'load_path': 'vistir.misc',
'partialclass': 'vistir.misc',
'run': 'vistir.misc',
'shell_escape': 'vistir.misc',
'take': 'vistir.misc',
'to_bytes': 'vistir.misc',
'to_text': 'vistir.misc',
'create_tracked_tempdir': 'vistir.path',
'create_tracked_tempfile': 'vistir.path',
'mkdir_p': 'vistir.path',
'rmtree': 'vistir.path',
}
__version__ = "0.7.4"
from warnings import warn
def __getattr__(name):
warn((f"Importing {name} directly from vistir is deprecated.\nUse 'from {__newpaths[name]} import {name}' instead.\n"
"This import path will be removed in vistir 0.8"),
DeprecationWarning)
return getattr(importlib.import_module(__newpaths[name]), name)
__all__ = [
"shell_escape",
"load_path",
"run",
"partialclass",
"temp_environ",
"temp_path",
"cd",
"atomic_open_for_write",
"open_file",
"rmtree",
"mkdir_p",
"TemporaryDirectory",
"NamedTemporaryFile",
"partialmethod",
"spinner",
"create_spinner",
"create_tracked_tempdir",
"create_tracked_tempfile",
"decode_for_output",
"to_text",
"to_bytes",
"take",
"chunked",
"divide",
"StringIO",
"get_wrapped_stream",
"StreamWrapper",
"replaced_stream",
"replaced_streams",
"show_cursor",
"hide_cursor",
]
__version__ = "0.7.5"
-496
View File
@@ -1,496 +0,0 @@
# -*- coding=utf-8 -*-
import functools
import os
import signal
import sys
import threading
import time
import typing
import warnings
from io import StringIO
import pipenv.vendor.colorama as colorama
from .cursor import hide_cursor, show_cursor
from .misc import decode_for_output, to_text
from .termcolors import COLOR_MAP, COLORS, DISABLE_COLORS, colored
if typing.TYPE_CHECKING:
from typing import (
Any,
Callable,
ContextManager,
Dict,
IO,
Optional,
Text,
Type,
TypeVar,
Union,
)
TSignalMap = Dict[
Type[signal.SIGINT],
Callable[..., int, str, Union["DummySpinner", "VistirSpinner"]],
]
_T = TypeVar("_T", covariant=True)
try:
import yaspin
import yaspin.spinners
import yaspin.core
Spinners = yaspin.spinners.Spinners
SpinBase = yaspin.core.Yaspin
except ImportError: # pragma: no cover
yaspin = None
Spinners = None
SpinBase = None
if os.name == "nt": # pragma: no cover
def handler(signum, frame, spinner):
"""Signal handler, used to gracefully shut down the ``spinner`` instance
when specified signal is received by the process running the ``spinner``.
``signum`` and ``frame`` are mandatory arguments. Check ``signal.signal``
function for more details.
"""
spinner.fail()
spinner.stop()
else: # pragma: no cover
def handler(signum, frame, spinner):
"""Signal handler, used to gracefully shut down the ``spinner`` instance
when specified signal is received by the process running the ``spinner``.
``signum`` and ``frame`` are mandatory arguments. Check ``signal.signal``
function for more details.
"""
spinner.red.fail("")
spinner.stop()
CLEAR_LINE = chr(27) + "[K"
TRANSLATION_MAP = {10004: u"OK", 10008: u"x"}
decode_output = functools.partial(decode_for_output, translation_map=TRANSLATION_MAP)
class DummySpinner(object):
def __init__(self, text="", **kwargs):
# type: (str, Any) -> None
if DISABLE_COLORS:
colorama.init()
self.text = decode_output(text) if text else ""
self.stdout = kwargs.get("stdout", sys.stdout)
self.stderr = kwargs.get("stderr", sys.stderr)
self.out_buff = StringIO()
self.write_to_stdout = kwargs.get("write_to_stdout", False)
super(DummySpinner, self).__init__()
def __enter__(self):
if self.text and self.text != "None":
if self.write_to_stdout:
self.write(self.text)
return self
def __exit__(self, exc_type, exc_val, tb):
if exc_type:
import traceback
formatted_tb = traceback.format_exception(exc_type, exc_val, tb)
self.write_err("".join(formatted_tb))
self._close_output_buffer()
return False
def __getattr__(self, k): # pragma: no cover
try:
retval = super(DummySpinner, self).__getattribute__(k)
except AttributeError:
if k in COLOR_MAP.keys() or k.upper() in COLORS:
return self
raise
else:
return retval
def _close_output_buffer(self):
if self.out_buff and not self.out_buff.closed:
try:
self.out_buff.close()
except Exception:
pass
def fail(self, exitcode=1, text="FAIL"):
# type: (int, str) -> None
if text is not None and text != "None":
if self.write_to_stdout:
self.write(text)
else:
self.write_err(text)
self._close_output_buffer()
def ok(self, text="OK"):
# type: (str) -> int
if text is not None and text != "None":
if self.write_to_stdout:
self.write(text)
else:
self.write_err(text)
self._close_output_buffer()
return 0
def hide_and_write(self, text, target=None):
# type: (str, Optional[str]) -> None
if not target:
target = self.stdout
if text is None or isinstance(text, str) and text == "None":
pass
target.write(decode_output(u"\r", target_stream=target))
self._hide_cursor(target=target)
target.write(decode_output(u"{0}\n".format(text), target_stream=target))
target.write(CLEAR_LINE)
self._show_cursor(target=target)
def write(self, text=None):
# type: (Optional[str]) -> None
if not self.write_to_stdout:
return self.write_err(text)
if text is None or isinstance(text, str) and text == "None":
pass
if not self.stdout.closed:
stdout = self.stdout
else:
stdout = sys.stdout
stdout.write(decode_output(u"\r", target_stream=stdout))
text = to_text(text)
line = decode_output(u"{0}\n".format(text), target_stream=stdout)
stdout.write(line)
stdout.write(CLEAR_LINE)
def write_err(self, text=None):
# type: (Optional[str]) -> None
if text is None or isinstance(text, str) and text == "None":
pass
text = to_text(text)
if not self.stderr.closed:
stderr = self.stderr
else:
if sys.stderr.closed:
print(text)
return
stderr = sys.stderr
stderr.write(decode_output(u"\r", target_stream=stderr))
line = decode_output(u"{0}\n".format(text), target_stream=stderr)
stderr.write(line)
stderr.write(CLEAR_LINE)
@staticmethod
def _hide_cursor(target=None):
# type: (Optional[IO]) -> None
pass
@staticmethod
def _show_cursor(target=None):
# type: (Optional[IO]) -> None
pass
if SpinBase is None:
SpinBase = DummySpinner
class VistirSpinner(SpinBase):
"A spinner class for handling spinners on windows and posix."
def __init__(self, *args, **kwargs):
# type: (Any, Any)
"""
Get a spinner object or a dummy spinner to wrap a context.
Keyword Arguments:
:param str spinner_name: A spinner type e.g. "dots" or "bouncingBar" (default: {"bouncingBar"})
:param str start_text: Text to start off the spinner with (default: {None})
:param dict handler_map: Handler map for signals to be handled gracefully (default: {None})
:param bool nospin: If true, use the dummy spinner (default: {False})
:param bool write_to_stdout: Writes to stdout if true, otherwise writes to stderr (default: True)
"""
self.handler = handler
colorama.init()
sigmap = {} # type: TSignalMap
if handler:
sigmap.update({signal.SIGINT: handler, signal.SIGTERM: handler})
handler_map = kwargs.pop("handler_map", {})
if os.name == "nt":
sigmap[signal.SIGBREAK] = handler
else:
sigmap[signal.SIGALRM] = handler
if handler_map:
sigmap.update(handler_map)
spinner_name = kwargs.pop("spinner_name", "bouncingBar")
start_text = kwargs.pop("start_text", None)
_text = kwargs.pop("text", "Running...")
kwargs["text"] = start_text if start_text is not None else _text
kwargs["sigmap"] = sigmap
kwargs["spinner"] = getattr(Spinners, spinner_name, "")
write_to_stdout = kwargs.pop("write_to_stdout", True)
self.stdout = kwargs.pop("stdout", sys.stdout)
self.stderr = kwargs.pop("stderr", sys.stderr)
self.out_buff = StringIO()
self.write_to_stdout = write_to_stdout
self.is_dummy = bool(yaspin is None)
self._stop_spin = None # type: Optional[threading.Event]
self._hide_spin = None # type: Optional[threading.Event]
self._spin_thread = None # type: Optional[threading.Thread]
super(VistirSpinner, self).__init__(*args, **kwargs)
if DISABLE_COLORS:
colorama.deinit()
def ok(self, text=u"OK", err=False):
# type: (str, bool) -> None
"""Set Ok (success) finalizer to a spinner."""
# Do not display spin text for ok state
self._text = None
_text = to_text(text) if text else u"OK"
err = err or not self.write_to_stdout
self._freeze(_text, err=err)
def fail(self, text=u"FAIL", err=False):
# type: (str, bool) -> None
"""Set fail finalizer to a spinner."""
# Do not display spin text for fail state
self._text = None
_text = text if text else u"FAIL"
err = err or not self.write_to_stdout
self._freeze(_text, err=err)
def hide_and_write(self, text, target=None):
# type: (str, Optional[str]) -> None
if not target:
target = self.stdout
if text is None or isinstance(text, str) and text == u"None":
pass
target.write(decode_output(u"\r"))
self._hide_cursor(target=target)
target.write(decode_output(u"{0}\n".format(text)))
target.write(CLEAR_LINE)
self._show_cursor(target=target)
def write(self, text): # pragma: no cover
# type: (str) -> None
if not self.write_to_stdout:
return self.write_err(text)
stdout = self.stdout
if self.stdout.closed:
stdout = sys.stdout
stdout.write(decode_output(u"\r", target_stream=stdout))
stdout.write(decode_output(CLEAR_LINE, target_stream=stdout))
if text is None:
text = ""
text = decode_output(u"{0}\n".format(text), target_stream=stdout)
stdout.write(text)
self.out_buff.write(text)
def write_err(self, text): # pragma: no cover
# type: (str) -> None
"""Write error text in the terminal without breaking the spinner."""
stderr = self.stderr
if self.stderr.closed:
stderr = sys.stderr
stderr.write(decode_output(u"\r", target_stream=stderr))
stderr.write(decode_output(CLEAR_LINE, target_stream=stderr))
if text is None:
text = ""
text = decode_output(u"{0}\n".format(text), target_stream=stderr)
self.stderr.write(text)
self.out_buff.write(decode_output(text, target_stream=self.out_buff))
def start(self):
# type: () -> None
if self._sigmap:
self._register_signal_handlers()
target = self.stdout if self.write_to_stdout else self.stderr
if target.isatty():
self._hide_cursor(target=target)
self._stop_spin = threading.Event()
self._hide_spin = threading.Event()
self._spin_thread = threading.Thread(target=self._spin)
self._spin_thread.start()
def stop(self):
# type: () -> None
if self._dfl_sigmap:
# Reset registered signal handlers to default ones
self._reset_signal_handlers()
if self._spin_thread:
self._stop_spin.set()
self._spin_thread.join()
target = self.stdout if self.write_to_stdout else self.stderr
if target.isatty():
target.write("\r")
if self.write_to_stdout:
self._clear_line()
else:
self._clear_err()
if target.isatty():
self._show_cursor(target=target)
self.out_buff.close()
def _freeze(self, final_text, err=False):
# type: (str, bool) -> None
"""Stop spinner, compose last frame and 'freeze' it."""
if not final_text:
final_text = ""
target = self.stderr if err else self.stdout
if target.closed:
target = sys.stderr if err else sys.stdout
text = to_text(final_text)
last_frame = self._compose_out(text, mode="last")
self._last_frame = decode_output(last_frame, target_stream=target)
# Should be stopped here, otherwise prints after
# self._freeze call will mess up the spinner
self.stop()
target.write(self._last_frame)
def _compose_color_func(self):
# type: () -> Callable[..., str]
fn = functools.partial(
colored, color=self._color, on_color=self._on_color, attrs=list(self._attrs)
)
return fn
def _compose_out(self, frame, mode=None):
# type: (str, Optional[str]) -> Text
# Ensure Unicode input
frame = to_text(frame)
if self._text is None:
self._text = u""
text = to_text(self._text)
if self._color_func is not None:
frame = self._color_func(frame)
if self._side == "right":
frame, text = text, frame
# Mode
frame = to_text(frame)
if not mode:
out = u"\r{0} {1}".format(frame, text)
else:
out = u"{0} {1}\n".format(frame, text)
return out
def _spin(self):
# type: () -> None
target = self.stdout if self.write_to_stdout else self.stderr
clear_fn = self._clear_line if self.write_to_stdout else self._clear_err
while not self._stop_spin.is_set():
if self._hide_spin.is_set():
# Wait a bit to avoid wasting cycles
time.sleep(self._interval)
continue
# Compose output
spin_phase = next(self._cycle)
out = self._compose_out(spin_phase)
out = decode_output(out, target)
# Write
target.write(out)
clear_fn()
target.flush()
# Wait
time.sleep(self._interval)
target.write("\b")
def _register_signal_handlers(self):
# type: () -> None
# SIGKILL cannot be caught or ignored, and the receiving
# process cannot perform any clean-up upon receiving this
# signal.
try:
if signal.SIGKILL in self._sigmap.keys():
raise ValueError(
"Trying to set handler for SIGKILL signal. "
"SIGKILL cannot be cought or ignored in POSIX systems."
)
except AttributeError:
pass
for sig, sig_handler in self._sigmap.items():
# A handler for a particular signal, once set, remains
# installed until it is explicitly reset. Store default
# signal handlers for subsequent reset at cleanup phase.
dfl_handler = signal.getsignal(sig)
self._dfl_sigmap[sig] = dfl_handler
# ``signal.SIG_DFL`` and ``signal.SIG_IGN`` are also valid
# signal handlers and are not callables.
if callable(sig_handler):
# ``signal.signal`` accepts handler function which is
# called with two arguments: signal number and the
# interrupted stack frame. ``functools.partial`` solves
# the problem of passing spinner instance into the handler
# function.
sig_handler = functools.partial(sig_handler, spinner=self)
signal.signal(sig, sig_handler)
def _reset_signal_handlers(self):
# type: () -> None
for sig, sig_handler in self._dfl_sigmap.items():
signal.signal(sig, sig_handler)
@staticmethod
def _hide_cursor(target=None):
# type: (Optional[IO]) -> None
if not target:
target = sys.stdout
hide_cursor(stream=target)
@staticmethod
def _show_cursor(target=None):
# type: (Optional[IO]) -> None
if not target:
target = sys.stdout
show_cursor(stream=target)
@staticmethod
def _clear_err():
# type: () -> None
sys.stderr.write(CLEAR_LINE)
@staticmethod
def _clear_line():
# type: () -> None
sys.stdout.write(CLEAR_LINE)
def create_spinner(*args, **kwargs):
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Consider using yaspin directly instead, or user rich.status'),
DeprecationWarning, stacklevel=2)
# type: (Any, Any) -> Union[DummySpinner, VistirSpinner]
nospin = kwargs.pop("nospin", False)
use_yaspin = kwargs.pop("use_yaspin", not nospin)
if nospin or not use_yaspin:
return DummySpinner(*args, **kwargs)
return VistirSpinner(*args, **kwargs)
+7
View File
@@ -312,6 +312,7 @@ def install(ctx, vendor_dir, package=None):
def post_install_cleanup(ctx, vendor_dir):
log("Removing unused modules and files ...")
remove_all(vendor_dir.glob("*.dist-info"))
remove_all(vendor_dir.glob("*.egg-info"))
@@ -321,8 +322,14 @@ def post_install_cleanup(ctx, vendor_dir):
drop_dir(vendor_dir / "shutil_backports")
drop_dir(vendor_dir / "cerberus" / "tests")
drop_dir(vendor_dir / "cerberus" / "benchmarks")
drop_dir(vendor_dir / "colorama" / "tests")
remove_all(vendor_dir.glob("toml.py"))
# this function is called twice hence try ... except ...
try:
(vendor_dir / "vistir" / "spin.py").unlink()
except FileNotFoundError:
pass
@invoke.task
+2 -5
View File
@@ -6,6 +6,7 @@ import os
import shlex
import shutil
import traceback
import subprocess as sp
import sys
import warnings
from pathlib import Path
@@ -21,7 +22,6 @@ from pipenv.exceptions import VirtualenvActivationException
from pipenv.utils.processes import subprocess_run
from pipenv.vendor import toml, tomlkit
from pipenv.vendor.vistir.contextmanagers import temp_environ
from pipenv.vendor.vistir.misc import run
from pipenv.vendor.vistir.path import (
create_tracked_tempdir, handle_remove_readonly
)
@@ -498,10 +498,7 @@ class VirtualEnv:
cmd = [
python, "-m", "virtualenv", self.path.absolute().as_posix()
]
c = run(
cmd, verbose=False, return_object=True, write_to_stdout=False,
combine_stderr=False, block=True, nospin=True,
)
c = sp.run(cmd)
assert c.returncode == 0
def activate(self):