vendor: bump vistir to version 0.7.4

This commit is contained in:
Oz Tiram
2022-11-14 00:47:35 +01:00
parent 74b64f8bae
commit eff7aa0933
10 changed files with 84 additions and 477 deletions
+1 -1
View File
@@ -19,6 +19,6 @@ shellingham==1.5.0
termcolor==1.1.0
toml==0.10.2
tomlkit==0.9.2
vistir==0.6.1
vistir==0.7.4
wheel==0.37.1
yaspin==2.0.0
+1 -10
View File
@@ -1,13 +1,5 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
from .compat import (
NamedTemporaryFile,
StringIO,
TemporaryDirectory,
partialmethod,
to_native_string,
)
from .contextmanagers import (
atomic_open_for_write,
cd,
@@ -36,7 +28,7 @@ from .misc import (
from .path import create_tracked_tempdir, create_tracked_tempfile, mkdir_p, rmtree
from .spin import create_spinner
__version__ = "0.6.1"
__version__ = "0.7.4"
__all__ = [
@@ -58,7 +50,6 @@ __all__ = [
"create_spinner",
"create_tracked_tempdir",
"create_tracked_tempfile",
"to_native_string",
"decode_for_output",
"to_text",
"to_bytes",
+2 -2
View File
@@ -44,6 +44,7 @@ import io
import os
import sys
import time
import typing
import zlib
from ctypes import (
POINTER,
@@ -65,7 +66,6 @@ from itertools import count
import msvcrt
from .compat import IS_TYPE_CHECKING
from .misc import StreamWrapper, run, to_text
try:
@@ -77,7 +77,7 @@ except ImportError:
pythonapi = None
if IS_TYPE_CHECKING:
if typing.TYPE_CHECKING:
from typing import Text
-320
View File
@@ -1,320 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import codecs
import errno
import os
import sys
import warnings
from tempfile import mkdtemp
__all__ = [
"Path",
"get_terminal_size",
"finalize",
"partialmethod",
"JSONDecodeError",
"FileNotFoundError",
"ResourceWarning",
"PermissionError",
"is_type_checking",
"IS_TYPE_CHECKING",
"IsADirectoryError",
"fs_str",
"lru_cache",
"TemporaryDirectory",
"NamedTemporaryFile",
"to_native_string",
"samefile",
"Mapping",
"Hashable",
"MutableMapping",
"Container",
"Iterator",
"KeysView",
"ItemsView",
"MappingView",
"Iterable",
"Set",
"Sequence",
"Sized",
"ValuesView",
"MutableSet",
"MutableSequence",
"Callable",
"fs_encode",
"fs_decode",
"_fs_encode_errors",
"_fs_decode_errors",
]
from pathlib import Path
from functools import lru_cache, partialmethod
from tempfile import NamedTemporaryFile
from shutil import get_terminal_size
from weakref import finalize
from collections.abc import (
Mapping,
Hashable,
MutableMapping,
Container,
Iterator,
KeysView,
ItemsView,
MappingView,
Iterable,
Set,
Sequence,
Sized,
ValuesView,
MutableSet,
MutableSequence,
Callable,
)
from os.path import samefile
from json import JSONDecodeError
from builtins import (
ResourceWarning,
FileNotFoundError,
PermissionError,
IsADirectoryError,
FileExistsError,
TimeoutError,
)
from io import StringIO
if not sys.warnoptions:
warnings.simplefilter("default", ResourceWarning)
def is_type_checking():
try:
from typing import TYPE_CHECKING
except ImportError:
return False
return TYPE_CHECKING
IS_TYPE_CHECKING = os.environ.get("MYPY_RUNNING", is_type_checking())
class TemporaryDirectory(object):
"""
Create and return a temporary directory. This has the same
behavior as mkdtemp but can be used as a context manager. For
example:
with TemporaryDirectory() as tmpdir:
...
Upon exiting the context, the directory and everything contained
in it are removed.
"""
def __init__(self, suffix="", prefix=None, dir=None):
if "RAM_DISK" in os.environ:
import uuid
name = uuid.uuid4().hex
dir_name = os.path.join(os.environ["RAM_DISK"].strip(), name)
os.mkdir(dir_name)
self.name = dir_name
else:
suffix = suffix if suffix else ""
if not prefix:
self.name = mkdtemp(suffix=suffix, dir=dir)
else:
self.name = mkdtemp(suffix, prefix, dir)
self._finalizer = finalize(
self,
self._cleanup,
self.name,
warn_message="Implicitly cleaning up {!r}".format(self),
)
@classmethod
def _rmtree(cls, name):
from .path import rmtree
rmtree(name)
@classmethod
def _cleanup(cls, name, warn_message):
cls._rmtree(name)
warnings.warn(warn_message, ResourceWarning)
def __repr__(self):
return "<{} {!r}>".format(self.__class__.__name__, self.name)
def __enter__(self):
return self
def __exit__(self, exc, value, tb):
self.cleanup()
def cleanup(self):
if self._finalizer.detach():
self._rmtree(self.name)
def is_bytes(string):
"""Check if a string is a bytes instance.
:param Union[str, bytes] string: A string that may be string or bytes like
:return: Whether the provided string is a bytes type or not
:rtype: bool
"""
if isinstance(string, (bytes, memoryview, bytearray)): # noqa
return True
return False
def fs_str(string):
"""Encodes a string into the proper filesystem encoding.
Borrowed from pip-tools
"""
if isinstance(string, str):
return string
assert not isinstance(string, bytes)
return string.encode(_fs_encoding)
def _get_path(path):
"""Fetch the string value from a path-like object.
Returns **None** if there is no string value.
"""
if isinstance(path, (str, bytes)):
return path
path_type = type(path)
try:
path_repr = path_type.__fspath__(path)
except AttributeError:
return
if isinstance(path_repr, (str, bytes)):
return path_repr
return
# copied from the os backport which in turn copied this from
# the pyutf8 package --
# URL: https://github.com/etrepum/pyutf8/blob/master/pyutf8/ref.py
#
def _invalid_utf8_indexes(bytes):
skips = []
i = 0
len_bytes = len(bytes)
while i < len_bytes:
c1 = bytes[i]
if c1 < 0x80:
# U+0000 - U+007F - 7 bits
i += 1
continue
try:
c2 = bytes[i + 1]
if (c1 & 0xE0 == 0xC0) and (c2 & 0xC0 == 0x80):
# U+0080 - U+07FF - 11 bits
c = ((c1 & 0x1F) << 6) | (c2 & 0x3F)
if c < 0x80: # pragma: no cover
# Overlong encoding
skips.extend([i, i + 1]) # pragma: no cover
i += 2
continue
c3 = bytes[i + 2]
if (c1 & 0xF0 == 0xE0) and (c2 & 0xC0 == 0x80) and (c3 & 0xC0 == 0x80):
# U+0800 - U+FFFF - 16 bits
c = ((((c1 & 0x0F) << 6) | (c2 & 0x3F)) << 6) | (c3 & 0x3F)
if (c < 0x800) or (0xD800 <= c <= 0xDFFF):
# Overlong encoding or surrogate.
skips.extend([i, i + 1, i + 2])
i += 3
continue
c4 = bytes[i + 3]
if (
(c1 & 0xF8 == 0xF0)
and (c2 & 0xC0 == 0x80)
and (c3 & 0xC0 == 0x80)
and (c4 & 0xC0 == 0x80)
):
# U+10000 - U+10FFFF - 21 bits
c = ((((((c1 & 0x0F) << 6) | (c2 & 0x3F)) << 6) | (c3 & 0x3F)) << 6) | (
c4 & 0x3F
)
if (c < 0x10000) or (c > 0x10FFFF): # pragma: no cover
# Overlong encoding or invalid code point.
skips.extend([i, i + 1, i + 2, i + 3])
i += 4
continue
except IndexError:
pass
skips.append(i)
i += 1
return skips
def fs_encode(path):
"""Encode a filesystem path to the proper filesystem encoding.
:param Union[str, bytes] path: A string-like path
:returns: A bytes-encoded filesystem path representation
"""
path = _get_path(path)
if path is None:
raise TypeError("expected a valid path to encode")
if isinstance(path, str):
return path.encode(_fs_encoding, _fs_encode_errors)
return path
def fs_decode(path):
"""Decode a filesystem path using the proper filesystem encoding.
:param path: The filesystem path to decode from bytes or string
:return: The filesystem path, decoded with the determined encoding
:rtype: Text
"""
path = _get_path(path)
if path is None:
raise TypeError("expected a valid path to decode")
if isinstance(path, bytes):
import array
indexes = _invalid_utf8_indexes(array.array(str("B"), path))
if indexes and os.name == "nt":
return path.decode(_fs_encoding, "surrogateescape")
return path.decode(_fs_encoding, _fs_decode_errors)
return path
_fs_encoding = "utf-8"
_fs_decode_errors = "surrogateescape"
if sys.platform.startswith("win"):
_fs_error_fn = None
_fs_encode_errors = "surrogatepass"
else:
if sys.version_info >= (3, 3):
_fs_encoding = sys.getfilesystemencoding()
if not _fs_encoding:
_fs_encoding = sys.getdefaultencoding()
alt_strategy = "surrogateescape"
_fs_error_fn = getattr(sys, "getfilesystemencodeerrors", None)
_fs_encode_errors = _fs_error_fn() if _fs_error_fn else alt_strategy
_fs_decode_errors = _fs_error_fn() if _fs_error_fn else _fs_decode_errors
_byte = chr if sys.version_info < (3,) else lambda i: bytes([i])
def to_native_string(string):
from .misc import to_text, to_bytes
return to_text(string)
+5 -4
View File
@@ -1,18 +1,19 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import io
import os
import stat
import sys
import typing
from contextlib import closing, contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile
from urllib import request
from .compat import IS_TYPE_CHECKING, NamedTemporaryFile, Path
from .path import is_file_url, is_valid_url, path_to_url, url_to_path
if IS_TYPE_CHECKING:
if typing.TYPE_CHECKING:
from typing import (
Any,
Bytes,
-6
View File
@@ -1,6 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function
from .compat import IS_TYPE_CHECKING
MYPY_RUNNING = IS_TYPE_CHECKING
+37 -81
View File
@@ -11,29 +11,21 @@ import os
import subprocess
import sys
import threading
import typing
import warnings
from collections import OrderedDict
from functools import partial
from itertools import islice, tee
from weakref import WeakKeyDictionary
from queue import Empty, Queue
from typing import Iterable
from .cmdparse import Script
from .compat import (
Iterable,
Path,
StringIO,
TimeoutError,
_fs_decode_errors,
_fs_encode_errors,
fs_str,
is_bytes,
partialmethod,
to_native_string,
)
from .contextmanagers import spinner as spinner
from .environment import MYPY_RUNNING
from .termcolors import ANSI_REMOVAL_RE, colorize
_fs_encode_errors = "surrogatepass"
if os.name != "nt":
@@ -62,7 +54,7 @@ __all__ = [
]
if MYPY_RUNNING:
if typing.TYPE_CHECKING:
from typing import Any, Dict, Generator, IO, List, Optional, Text, Tuple, Union
from .spin import VistirSpinner
@@ -145,6 +137,11 @@ def dedup(iterable):
# type: (Iterable) -> Iterable
"""Deduplicate an iterable object like iter(set(iterable)) but order-
preserved."""
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Use instead: sorted(iter(dict.fromkeys(iterable)))'),
DeprecationWarning, stacklevel=2)
return iter(OrderedDict.fromkeys(iterable))
@@ -381,7 +378,7 @@ class SubprocessStreamWrapper(object):
if self.display_line:
if new_line != self.display_line:
self.display_line_loops_displayed = 0
new_line = fs_str("{}".format(new_line))
new_line = "{}".format(new_line)
if len(new_line) > self.display_line_max_len:
new_line = "{}...".format(new_line[: self.display_line_max_len])
self.display_line = new_line
@@ -447,9 +444,7 @@ class SubprocessStreamWrapper(object):
line, "stderr", spinner=spinner, stdout_allowed=stdout_allowed
)
if spinner:
spinner.text = to_native_string(
"{} {}".format(spinner.text, self.display_line)
)
spinner.text = "{} {}".format(spinner.text, self.display_line)
self.out = self.out.strip()
self.err = self.err.strip()
@@ -486,11 +481,11 @@ def _handle_nonblocking_subprocess(c, spinner=None):
c.wait()
if spinner:
if c.returncode != 0:
spinner.fail(to_native_string("Failed...cleaning up..."))
spinner.fail("Failed...cleaning up...")
elif c.returncode == 0 and not os.name == "nt":
spinner.ok(to_native_string("✔ Complete"))
spinner.ok("✔ Complete")
else:
spinner.ok(to_native_string("Complete"))
spinner.ok("Complete")
return c
@@ -524,7 +519,7 @@ def _create_subprocess(
formatted_tb = "".join(traceback.format_exception(*sys.exc_info()))
sys.stderr.write(
"Error while executing command %s:" % to_native_string(" ".join(cmd._parts))
"Error while executing command %s:" % " ".join(cmd._parts)
)
sys.stderr.write(formatted_tb)
raise exc
@@ -600,7 +595,7 @@ def run(
if env:
_env.update(env)
_env = {k: fs_str(v) for k, v in _env.items()}
_env = {k: v for k, v in _env.items()}
if not spinner_name:
spinner_name = "bouncingBar"
@@ -647,7 +642,10 @@ def load_path(python):
'/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python3.7/site-packages',
'/home/user/git/requirementslib/src']
"""
warnings.warn(
'This function is deprecated and will be removed in version 0.8.',
DeprecationWarning, stacklevel=2)
from pathlib import Path
python = Path(python).as_posix()
out, err = run(
[python, "-c", "import json, sys; print(json.dumps(sys.path))"], nospin=True
@@ -684,7 +682,7 @@ def partialclass(cls, *args, **kwargs):
>>> new_source.__dict__
{'url': 'https://pypi.org/simple', 'verify_ssl': True, 'name': 'pypi'}
"""
from functools import partialmethod
name_attrs = [
n
for n in (getattr(cls, name, str(cls)) for name in ("__name__", "__qualname__"))
@@ -787,7 +785,10 @@ def divide(n, iterable):
:return: a list of new iterables derived from the original iterable
:rtype: list
"""
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Use instead: more_itertools.divide(n, iterable)))'),
DeprecationWarning, stacklevel=2)
seq = tuple(iterable)
q, r = divmod(len(seq), n)
@@ -809,6 +810,11 @@ def take(n, iterable):
from https://github.com/erikrose/more-itertools/blob/master/more_itertools/recipes.py
"""
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Use instead: list(islice(iterable, n))'),
DeprecationWarning, stacklevel=2)
return list(islice(iterable, n))
@@ -820,6 +826,10 @@ def chunked(n, iterable):
from https://github.com/erikrose/more-itertools/blob/master/more_itertools/more.py
"""
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Use instead: more_itertools.chunked(iterable, n)))'),
DeprecationWarning, stacklevel=2)
return iter(partial(take, n, iter(iterable)), [])
@@ -893,7 +903,6 @@ def decode_for_output(output, target_stream=None, translation_map=None):
try:
output = _encode(output, encoding=encoding, translation_map=translation_map)
except (UnicodeDecodeError, UnicodeEncodeError):
output = to_native_string(output)
output = _encode(
output, encoding=encoding, errors="replace", translation_map=translation_map
)
@@ -1205,56 +1214,3 @@ def _can_use_color(stream=None, color=None):
stream = sys.stdin
return _isatty(stream)
return bool(color)
def echo(text, fg=None, bg=None, style=None, file=None, err=False, color=None):
"""Write the given text to the provided stream or **sys.stdout** by
default.
Provides optional foreground and background colors from the ansi defaults:
**grey**, **red**, **green**, **yellow**, **blue**, **magenta**, **cyan**
or **white**.
Available styles include **bold**, **dark**, **underline**, **blink**, **reverse**,
**concealed**
:param str text: Text to write
:param str fg: Foreground color to use (default: None)
:param str bg: Foreground color to use (default: None)
:param str style: Style to use (default: None)
:param stream file: File to write to (default: None)
:param bool color: Whether to force color (i.e. ANSI codes are in the text)
"""
if file and not hasattr(file, "write"):
raise TypeError("Expected a writable stream, received {!r}".format(file))
if not file:
if err:
file = _text_stderr()
else:
file = _text_stdout()
if text and not isinstance(text, (str, bytes, bytearray)):
text = str(text)
text = "" if not text else text
if isinstance(text, str):
text += "\n"
else:
text += b"\n"
if text and is_bytes(text):
buffer = _get_binary_buffer(file)
if buffer is not None:
file.flush()
buffer.write(text)
buffer.flush()
return
if text and not is_bytes(text):
can_use_color = _can_use_color(file, color=color)
if any([fg, bg, style]):
text = colorize(text, fg=fg, bg=bg, attrs=style)
if not can_use_color or (os.name == "nt" and not _wrap_for_color):
text = ANSI_REMOVAL_RE.sub("", text)
elif os.name == "nt" and _wrap_for_color and not _is_wrapped_for_color(file):
file = _wrap_for_color(file, color=color)
if text:
file.write(text)
file.flush()
+23 -40
View File
@@ -10,43 +10,28 @@ import posixpath
import shutil
import stat
import sys
import typing
import time
import unicodedata
import warnings
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import Optional, Callable
from urllib import parse as urllib_parse
from urllib import request as urllib_request
from .compat import (
IS_TYPE_CHECKING,
FileNotFoundError,
Path,
PermissionError,
ResourceWarning,
TemporaryDirectory,
_fs_encoding,
NamedTemporaryFile,
finalize,
fs_decode,
fs_encode,
)
from urllib.parse import quote
# fmt: off
from urllib.parse import quote_from_bytes as quote
# fmt: on
if IS_TYPE_CHECKING:
if typing.TYPE_CHECKING:
from types import TracebackType
from typing import (
Any,
AnyStr,
ByteString,
Callable,
Generator,
Iterator,
List,
Optional,
Text,
Tuple,
Type,
@@ -56,6 +41,7 @@ if IS_TYPE_CHECKING:
TPath = os.PathLike
TFunc = Callable[..., Any]
__all__ = [
"check_for_unc_path",
"get_converted_relative_path",
@@ -188,13 +174,13 @@ def path_to_url(path):
# XXX: This enables us to handle half-surrogates that were never
# XXX: actually part of a surrogate pair, but were just incidentally
# XXX: passed in as a piece of a filename
quoted_path = quote(fs_encode(path))
return fs_decode("file:///{}:{}".format(drive, quoted_path))
quoted_path = quote(path, errors="backslashreplace")
return "file:///{}:{}".format(drive, quoted_path)
# XXX: This is also here to help deal with incidental dangling surrogates
# XXX: on linux, by making sure they are preserved during encoding so that
# XXX: we can urlencode the backslash correctly
bytes_path = to_bytes(normalized_path, errors="backslashreplace")
return fs_decode("file://{}".format(quote(bytes_path)))
# bytes_path = to_bytes(normalized_path, errors="backslashreplace")
return "file://{}".format(quote(path, errors="backslashreplace"))
def url_to_path(url):
@@ -248,8 +234,6 @@ def is_readonly_path(fn):
Permissions check is `bool(path.stat & stat.S_IREAD)` or `not
os.access(path, os.W_OK)`
"""
fn = fs_decode(fs_encode(fn))
if os.path.exists(fn):
file_stat = os.stat(fn).st_mode
return not bool(file_stat & stat.S_IWRITE) or not os.access(fn, os.W_OK)
@@ -257,6 +241,10 @@ def is_readonly_path(fn):
def mkdir_p(newdir, mode=0o777):
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Use os.makedirs instead'), DeprecationWarning, stacklevel=2)
# This exists in shutil already
# type: (TPath, int) -> None
"""Recursively creates the target directory and all of its parents if they
do not already exist. Fails silently if they do.
@@ -264,12 +252,11 @@ def mkdir_p(newdir, mode=0o777):
:param str newdir: The directory path to ensure
:raises: OSError if a file is encountered along the way
"""
newdir = fs_decode(fs_encode(newdir))
if os.path.exists(newdir):
if not os.path.isdir(newdir):
raise OSError(
"a file with the same name as the desired dir, '{}', already exists.".format(
fs_decode(newdir)
newdir
)
)
return None
@@ -280,7 +267,10 @@ def ensure_mkdir_p(mode=0o777):
# type: (int) -> Callable[Callable[..., Any], Callable[..., Any]]
"""Decorator to ensure `mkdir_p` is called to the function's return
value."""
warnings.warn('This function is deprecated and will be removed in version 0.8.',
DeprecationWarning, stacklevel=2)
# This exists in shutil already
def decorator(f):
# type: (Callable[..., Any]) -> Callable[..., Any]
@functools.wraps(f)
@@ -289,9 +279,7 @@ def ensure_mkdir_p(mode=0o777):
path = f(*args, **kwargs)
mkdir_p(path, mode=mode)
return path
return decorated
return decorator
@@ -346,16 +334,13 @@ def _find_icacls_exe():
return None
def set_write_bit(fn):
# type: (str) -> None
def set_write_bit(fn: str) -> None:
"""Set read-write permissions for the current user on the target path. Fail
silently if the path doesn't exist.
:param str fn: The target filename or path
:return: None
"""
fn = fs_decode(fs_encode(fn))
if not os.path.exists(fn):
return
file_stat = os.stat(fn).st_mode
@@ -411,8 +396,9 @@ def set_write_bit(fn):
set_write_bit(file_)
def rmtree(directory, ignore_errors=False, onerror=None):
# type: (str, bool, Optional[Callable]) -> None
def rmtree(directory: str,
ignore_errors: bool = False,
onerror: Optional[Callable] = None) -> None :
"""Stand-in for :func:`~shutil.rmtree` with additional error-handling.
This version of `rmtree` handles read-only paths, especially in the case of index
@@ -427,7 +413,6 @@ def rmtree(directory, ignore_errors=False, onerror=None):
Setting `ignore_errors=True` may cause this to silently fail to delete the path
"""
directory = fs_decode(fs_encode(directory))
if onerror is None:
onerror = handle_remove_readonly
try:
@@ -485,8 +470,6 @@ def handle_remove_readonly(func, path, exc):
This function will call check :func:`is_readonly_path` before attempting to call
:func:`set_write_bit` on the target path and try again.
"""
# Check for read-only attribute
from .compat import ResourceWarning, FileNotFoundError, PermissionError
PERM_ERRORS = (errno.EACCES, errno.EPERM, errno.ENOENT)
default_warning_message = "Unable to remove file due to permissions restriction: {!r}"
+14 -10
View File
@@ -1,22 +1,22 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function
import functools
import os
import signal
import sys
import threading
import time
import typing
import warnings
from io import StringIO
import pipenv.vendor.colorama as colorama
from .compat import IS_TYPE_CHECKING, to_native_string
from .cursor import hide_cursor, show_cursor
from .misc import decode_for_output, to_text
from .termcolors import COLOR_MAP, COLORS, DISABLE_COLORS, colored
if IS_TYPE_CHECKING:
if typing.TYPE_CHECKING:
from typing import (
Any,
Callable,
@@ -38,17 +38,17 @@ if IS_TYPE_CHECKING:
try:
import pipenv.vendor.yaspin as yaspin
except ImportError: # pragma: no cover
yaspin = None
Spinners = None
SpinBase = None
else: # pragma: no cover
import yaspin.spinners
import yaspin.core
Spinners = yaspin.spinners.Spinners
SpinBase = yaspin.core.Yaspin
except ImportError: # pragma: no cover
yaspin = None
Spinners = None
SpinBase = None
if os.name == "nt": # pragma: no cover
def handler(signum, frame, spinner):
@@ -88,7 +88,7 @@ class DummySpinner(object):
# type: (str, Any) -> None
if DISABLE_COLORS:
colorama.init()
self.text = to_native_string(decode_output(text)) if text else ""
self.text = decode_output(text) if text else ""
self.stdout = kwargs.get("stdout", sys.stdout)
self.stderr = kwargs.get("stderr", sys.stderr)
self.out_buff = StringIO()
@@ -484,6 +484,10 @@ class VistirSpinner(SpinBase):
def create_spinner(*args, **kwargs):
warnings.warn(
('This function is deprecated and will be removed in version 0.8.'
'Consider using yaspin directly instead, or user rich.status'),
DeprecationWarning, stacklevel=2)
# type: (Any, Any) -> Union[DummySpinner, VistirSpinner]
nospin = kwargs.pop("nospin", False)
use_yaspin = kwargs.pop("use_yaspin", not nospin)
+1 -3
View File
@@ -1,12 +1,10 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import os
import re
import pipenv.vendor.colorama as colorama
from .compat import to_native_string
from .misc import to_text as to_native_string
DISABLE_COLORS = os.getenv("CI", False) or os.getenv(
"ANSI_COLORS_DISABLED", os.getenv("VISTIR_DISABLE_COLORS", False)