Re-vendor vistir and requirementslib

- Fix issues parsing direct dependency URLs
- Fix vistir `rmtree` error handling automated attempts at `chmod`
  invocation which may not always be permitted
- Fix nested direct dependency parsing
- Fixes #4226
- Fixes #3964

Signed-off-by: Dan Ryan <dan.ryan@canonical.com>
This commit is contained in:
Dan Ryan
2020-05-14 20:15:07 -04:00
parent cb6cdb9644
commit c4a165bce3
21 changed files with 872 additions and 424 deletions
+2 -2
View File
@@ -13,8 +13,8 @@ See <http://github.com/ActiveState/appdirs> for details and usage.
# - Mac OS X: http://developer.apple.com/documentation/MacOSX/Conceptual/BPFileSystem/index.html
# - XDG spec for Un*x: http://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
__version_info__ = (1, 4, 3)
__version__ = '.'.join(map(str, __version_info__))
__version__ = "1.4.4"
__version_info__ = tuple(int(segment) for segment in __version__.split("."))
import sys
+1 -1
View File
@@ -76,4 +76,4 @@ from .utils import open_file
# literals.
disable_unicode_literals_warning = False
__version__ = "7.1.1"
__version__ = "7.1.2"
-4
View File
@@ -174,8 +174,6 @@ if PY2:
iteritems = lambda x: x.iteritems()
range_type = xrange
from pipes import quote as shlex_quote
def is_bytes(x):
return isinstance(x, (buffer, bytearray))
@@ -284,8 +282,6 @@ else:
isidentifier = lambda x: x.isidentifier()
iteritems = lambda x: iter(x.items())
from shlex import quote as shlex_quote
def is_bytes(x):
return isinstance(x, (bytes, memoryview, bytearray))
+9 -13
View File
@@ -17,7 +17,6 @@ from ._compat import int_types
from ._compat import isatty
from ._compat import open_stream
from ._compat import range_type
from ._compat import shlex_quote
from ._compat import strip_ansi
from ._compat import term_len
from ._compat import WIN
@@ -346,10 +345,7 @@ def pager(generator, color=None):
fd, filename = tempfile.mkstemp()
os.close(fd)
try:
if (
hasattr(os, "system")
and os.system("more {}".format(shlex_quote(filename))) == 0
):
if hasattr(os, "system") and os.system('more "{}"'.format(filename)) == 0:
return _pipepager(generator, "more", color)
return _nullpager(stdout, generator, color)
finally:
@@ -418,7 +414,7 @@ def _tempfilepager(generator, cmd, color):
with open_stream(filename, "wb")[0] as f:
f.write(text.encode(encoding))
try:
os.system("{} {}".format(shlex_quote(cmd), shlex_quote(filename)))
os.system('{} "{}"'.format(cmd, filename))
finally:
os.unlink(filename)
@@ -463,9 +459,7 @@ class Editor(object):
environ = None
try:
c = subprocess.Popen(
"{} {}".format(shlex_quote(editor), shlex_quote(filename)),
env=environ,
shell=True,
'{} "{}"'.format(editor, filename), env=environ, shell=True,
)
exit_code = c.wait()
if exit_code != 0:
@@ -536,16 +530,18 @@ def open_url(url, wait=False, locate=False):
elif WIN:
if locate:
url = _unquote_file(url)
args = "explorer /select,{}".format(shlex_quote(url))
args = 'explorer /select,"{}"'.format(_unquote_file(url.replace('"', "")))
else:
args = 'start {} "" {}'.format("/WAIT" if wait else "", shlex_quote(url))
args = 'start {} "" "{}"'.format(
"/WAIT" if wait else "", url.replace('"', "")
)
return os.system(args)
elif CYGWIN:
if locate:
url = _unquote_file(url)
args = "cygstart {}".format(shlex_quote(os.path.dirname(url)))
args = 'cygstart "{}"'.format(os.path.dirname(url).replace('"', ""))
else:
args = "cygstart {} {}".format("-w" if wait else "", shlex_quote(url))
args = 'cygstart {} "{}"'.format("-w" if wait else "", url.replace('"', ""))
return os.system(args)
try:
+1 -1
View File
@@ -10,7 +10,7 @@ from .models.lockfile import Lockfile
from .models.pipfile import Pipfile
from .models.requirements import Requirement
__version__ = "1.5.7"
__version__ = "1.5.8"
logger = logging.getLogger(__name__)
+24 -17
View File
@@ -57,6 +57,7 @@ if MYPY_RUNNING:
Command,
)
from packaging.requirements import Requirement as PackagingRequirement
from packaging.markers import Marker
TRequirement = TypeVar("TRequirement")
RequirementType = TypeVar(
@@ -71,9 +72,14 @@ PKGS_DOWNLOAD_DIR = fs_str(os.path.join(CACHE_DIR, "pkgs"))
WHEEL_DOWNLOAD_DIR = fs_str(os.path.join(CACHE_DIR, "wheels"))
DEPENDENCY_CACHE = DependencyCache()
WHEEL_CACHE = pip_shims.shims.WheelCache(
CACHE_DIR, pip_shims.shims.FormatControl(set(), set())
)
@contextlib.contextmanager
def _get_wheel_cache():
with pip_shims.shims.global_tempdir_manager():
yield pip_shims.shims.WheelCache(
CACHE_DIR, pip_shims.shims.FormatControl(set(), set())
)
def _get_filtered_versions(ireq, versions, prereleases):
@@ -351,6 +357,7 @@ def get_dependencies(ireq, sources=None, parent=None):
def get_dependencies_from_wheel_cache(ireq):
# type: (pip_shims.shims.InstallRequirement) -> Optional[Set[pip_shims.shims.InstallRequirement]]
"""Retrieves dependencies for the given install requirement from the wheel cache.
:param ireq: A single InstallRequirement
@@ -361,13 +368,14 @@ def get_dependencies_from_wheel_cache(ireq):
if ireq.editable or not is_pinned_requirement(ireq):
return
matches = WHEEL_CACHE.get(ireq.link, name_from_req(ireq.req))
if matches:
matches = set(matches)
if not DEPENDENCY_CACHE.get(ireq):
DEPENDENCY_CACHE[ireq] = [format_requirement(m) for m in matches]
return matches
return
with _get_wheel_cache() as wheel_cache:
matches = wheel_cache.get(ireq.link, name_from_req(ireq.req))
if matches:
matches = set(matches)
if not DEPENDENCY_CACHE.get(ireq):
DEPENDENCY_CACHE[ireq] = [format_requirement(m) for m in matches]
return matches
return None
def _marker_contains_extra(ireq):
@@ -477,12 +485,12 @@ def get_dependencies_from_index(dep, sources=None, pip_options=None, wheel_cache
"""
session, finder = get_finder(sources=sources, pip_options=pip_options)
if not wheel_cache:
wheel_cache = WHEEL_CACHE
dep.is_direct = True
requirements = None
setup_requires = {}
with temp_environ():
with temp_environ(), ExitStack() as stack:
if not wheel_cache:
wheel_cache = stack.enter_context(_get_wheel_cache())
os.environ["PIP_EXISTS_ACTION"] = "i"
if dep.editable and not dep.prepared and not dep.req:
setup_info = SetupInfo.from_ireq(dep)
@@ -570,10 +578,6 @@ def start_resolver(finder=None, session=None, wheel_cache=None):
if not session:
session = pip_command._build_session(pip_options)
if not wheel_cache:
wheel_cache = WHEEL_CACHE
_ensure_dir(fs_str(os.path.join(wheel_cache.cache_dir, "wheels")))
download_dir = PKGS_DOWNLOAD_DIR
_ensure_dir(download_dir)
@@ -582,6 +586,9 @@ def start_resolver(finder=None, session=None, wheel_cache=None):
try:
with ExitStack() as ctx:
ctx.enter_context(pip_shims.shims.global_tempdir_manager())
if not wheel_cache:
wheel_cache = ctx.enter_context(_get_wheel_cache())
_ensure_dir(fs_str(os.path.join(wheel_cache.cache_dir, "wheels")))
preparer = ctx.enter_context(
pip_shims.shims.make_preparer(
options=pip_options,
+2 -2
View File
@@ -25,7 +25,7 @@ if MYPY_RUNNING:
STRING_TYPE = Union[str, bytes, Text]
MAX_VERSIONS = {2: 7, 3: 11, 4: 0}
MAX_VERSIONS = {1: 7, 2: 7, 3: 11, 4: 0}
DEPRECATED_VERSIONS = ["3.0", "3.1", "3.2", "3.3"]
@@ -557,7 +557,7 @@ def _split_specifierset_str(specset_str, prefix="=="):
else:
values = [v.strip() for v in specset_str.split(",")]
if prefix == "!=" and any(v in values for v in DEPRECATED_VERSIONS):
values = DEPRECATED_VERSIONS[:]
values += DEPRECATED_VERSIONS[:]
for value in sorted(values):
specifiers.add(Specifier("{0}{1}".format(prefix, value)))
return specifiers
+76 -73
View File
@@ -164,8 +164,7 @@ class Line(object):
self.parsed_marker = None # type: Optional[Marker]
self.preferred_scheme = None # type: Optional[STRING_TYPE]
self._requirement = None # type: Optional[PackagingRequirement]
self.is_direct_url = False # type: bool
self._parsed_url = None # type: Optional[urllib_parse.ParseResult]
self._parsed_url = None # type: Optional[URI]
self._setup_cfg = None # type: Optional[STRING_TYPE]
self._setup_py = None # type: Optional[STRING_TYPE]
self._pyproject_toml = None # type: Optional[STRING_TYPE]
@@ -567,17 +566,22 @@ class Line(object):
:rtype: :class:`~Line`
"""
extras = None
if "@" in self.line or self.is_vcs or self.is_url:
line = "{0}".format(self.line)
uri = URI.parse(line)
name = uri.name
if name:
self._name = name
if uri.host and uri.path and uri.scheme:
self.line = uri.to_string(
escape_password=False, direct=False, strip_ssh=uri.is_implicit_ssh
)
else:
line = "{0}".format(self.line)
if any([self.is_vcs, self.is_url, "@" in line]):
try:
if self.parsed_url.name:
self._name = self.parsed_url.name
if (
self.parsed_url.host
and self.parsed_url.path
and self.parsed_url.scheme
):
self.line = self.parsed_url.to_string(
escape_password=False,
direct=False,
strip_ssh=self.parsed_url.is_implicit_ssh,
)
except ValueError:
self.line, extras = pip_shims.shims._strip_extras(self.line)
else:
self.line, extras = pip_shims.shims._strip_extras(self.line)
@@ -596,36 +600,13 @@ class Line(object):
def get_url(self):
# type: () -> STRING_TYPE
"""Sets ``self.name`` if given a **PEP-508** style URL"""
line = self.line
try:
parsed = URI.parse(line)
line = parsed.to_string(escape_password=False, direct=False, strip_ref=True)
return self.parsed_url.to_string(
escape_password=False, direct=False, strip_ref=True
)
except ValueError:
pass
else:
self._parsed_url = parsed
return line
if self.vcs is not None and self.line.startswith("{0}+".format(self.vcs)):
_, _, _parseable = self.line.partition("+")
parsed = urllib_parse.urlparse(add_ssh_scheme_to_git_uri(_parseable))
line, _ = split_ref_from_uri(line)
else:
parsed = urllib_parse.urlparse(add_ssh_scheme_to_git_uri(line))
if "@" in self.line and parsed.scheme == "":
name, _, url = self.line.partition("@")
if self._name is None:
url = url.strip()
self._name = name.strip()
if is_valid_url(url):
self.is_direct_url = True
line = url.strip()
parsed = urllib_parse.urlparse(line)
url_path = parsed.path
if "@" in url_path:
url_path, _, _ = url_path.rpartition("@")
parsed = parsed._replace(path=url_path)
self._parsed_url = parsed
return line
return self.line
@property
def name(self):
@@ -655,20 +636,16 @@ class Line(object):
@property
def url(self):
# type: () -> Optional[STRING_TYPE]
if self.uri is not None:
url = add_ssh_scheme_to_git_uri(self.uri)
else:
url = getattr(self.link, "url_without_fragment", None)
if url is not None:
url = add_ssh_scheme_to_git_uri(unquote(url))
if url is not None and self._parsed_url is None:
if self.vcs is not None:
_, _, _parseable = url.partition("+")
self._parsed_url = urllib_parse.urlparse(_parseable)
if self.is_vcs:
# strip the ref from the url
url, _ = split_ref_from_uri(url)
return url
try:
return self.parsed_url.to_string(
escape_password=False,
strip_ref=True,
strip_name=True,
strip_subdir=True,
strip_ssh=False,
)
except ValueError:
return None
@property
def link(self):
@@ -711,9 +688,14 @@ class Line(object):
@property
def is_url(self):
# type: () -> bool
url = self.get_url()
if is_valid_url(url) or is_file_url(url):
return True
# url = self.get_url()
# if is_valid_url(url) or is_file_url(url):
# return True
# return False
try:
return bool(self.parsed_url)
except ValueError:
return False
return False
@property
@@ -739,8 +721,11 @@ class Line(object):
def is_file_url(self):
# type: () -> bool
url = self.get_url()
parsed_url_scheme = self._parsed_url.scheme if self._parsed_url else ""
if url and is_file_url(self.get_url()) or parsed_url_scheme == "file":
try:
parsed_url_scheme = self.parsed_url.scheme
except ValueError:
parsed_url_scheme = ""
if url and is_file_url(url) or parsed_url_scheme == "file":
return True
return False
@@ -850,6 +835,21 @@ class Line(object):
self._vcsrepo = self._get_vcsrepo()
return self._vcsrepo
@property
def parsed_url(self):
# type: () -> URI
if self._parsed_url is None:
self._parsed_url = URI.parse(self.line)
return self._parsed_url
@property
def is_direct_url(self):
# type: () -> bool
try:
return self.is_url and self.parsed_url.is_direct_url
except ValueError:
return self.is_url and bool(DIRECT_URL_RE.match(self.line))
@cached_property
def metadata(self):
# type: () -> Dict[Any, Any]
@@ -886,8 +886,8 @@ class Line(object):
ireq = self.ireq
wheel_kwargs = self.wheel_kwargs.copy()
wheel_kwargs["src_dir"] = repo.checkout_directory
ireq.ensure_has_source_dir(wheel_kwargs["src_dir"])
with pip_shims.shims.global_tempdir_manager(), temp_path():
ireq.ensure_has_source_dir(wheel_kwargs["src_dir"])
sys.path = [repo.checkout_directory, "", ".", get_python_lib(plat_specific=0)]
setupinfo = SetupInfo.create(
repo.checkout_directory,
@@ -1052,10 +1052,10 @@ class Line(object):
# else:
# req.link = self.link
if self.ref and self._requirement is not None:
self._requirement.revision = self.ref
if self._vcsrepo is not None:
self._requirement.revision = self._vcsrepo.get_commit_hash()
else:
self._requirement.revision = self.ref
with pip_shims.shims.global_tempdir_manager():
self._requirement.revision = self._vcsrepo.get_commit_hash()
return self._requirement
def parse_requirement(self):
@@ -1112,7 +1112,7 @@ class Line(object):
or (os.path.exists(self.line) or os.path.isabs(self.line))
):
url = pip_shims.shims.path_to_url(os.path.abspath(self.line))
parsed_url = URI.parse(url)
self._parsed_url = parsed_url = URI.parse(url)
elif is_valid_url(self.line) or is_vcs(self.line) or is_file_url(self.line):
parsed_url = URI.parse(self.line)
if parsed_url is not None:
@@ -2114,21 +2114,18 @@ class VCSRequirement(FileRequirement):
def get_commit_hash(self):
# type: () -> STRING_TYPE
hash_ = None
hash_ = self.repo.get_commit_hash()
with pip_shims.shims.global_tempdir_manager():
hash_ = self.repo.get_commit_hash()
return hash_
def update_repo(self, src_dir=None, ref=None):
# type: (Optional[STRING_TYPE], Optional[STRING_TYPE]) -> STRING_TYPE
if ref:
self.ref = ref
else:
if self.ref:
ref = self.ref
repo_hash = None
if not self.is_local and ref is not None:
self.repo.checkout_ref(ref)
repo_hash = self.repo.get_commit_hash()
if not self.is_local and self.ref is not None:
self.repo.checkout_ref(self.ref)
repo_hash = self.get_commit_hash()
if self.req:
self.req.revision = repo_hash
return repo_hash
@@ -2144,7 +2141,8 @@ class VCSRequirement(FileRequirement):
self.req = self.parsed_line.requirement
else:
self.req = self.get_requirement()
revision = self.req.revision = vcsrepo.get_commit_hash()
with pip_shims.shims.global_tempdir_manager():
revision = self.req.revision = vcsrepo.get_commit_hash()
# Remove potential ref in the end of uri after ref is parsed
if self.link and "@" in self.link.show_url and self.uri and "@" in self.uri:
@@ -3095,3 +3093,8 @@ def named_req_from_parsed_line(parsed_line):
parsed_line=parsed_line,
)
return NamedRequirement.from_line(parsed_line.line)
if __name__ == "__main__":
line = Line("vistir@ git+https://github.com/sarugaku/vistir.git@master")
print(line)
+55 -31
View File
@@ -23,9 +23,10 @@ import six
from appdirs import user_cache_dir
from distlib.wheel import Wheel
from packaging.markers import Marker
from pip_shims.utils import call_function_with_correct_args
from six.moves import configparser
from six.moves.urllib.parse import unquote, urlparse, urlunparse
from vistir.compat import FileNotFoundError, Iterable, Mapping, Path, lru_cache
from vistir.compat import FileNotFoundError, Iterable, Mapping, Path, finalize, lru_cache
from vistir.contextmanagers import cd, temp_path
from vistir.misc import run
from vistir.path import create_tracked_tempdir, ensure_mkdir_p, mkdir_p, rmtree
@@ -1111,29 +1112,34 @@ class Extra(object):
return {self.name: tuple([r.requirement for r in self.requirements])}
@attr.s(slots=True, cmp=True, hash=True)
@attr.s(slots=True, eq=True, hash=True)
class SetupInfo(object):
name = attr.ib(default=None, cmp=True) # type: STRING_TYPE
base_dir = attr.ib(default=None, cmp=True, hash=False) # type: STRING_TYPE
_version = attr.ib(default=None, cmp=True) # type: STRING_TYPE
name = attr.ib(default=None, eq=True) # type: STRING_TYPE
base_dir = attr.ib(default=None, eq=True, hash=False) # type: STRING_TYPE
_version = attr.ib(default=None, eq=True) # type: STRING_TYPE
_requirements = attr.ib(
type=frozenset, factory=frozenset, cmp=True, hash=True
type=frozenset, factory=frozenset, eq=True, hash=True
) # type: Optional[frozenset]
build_requires = attr.ib(default=None, cmp=True) # type: Optional[Tuple]
build_backend = attr.ib(cmp=True) # type: STRING_TYPE
setup_requires = attr.ib(default=None, cmp=True) # type: Optional[Tuple]
build_requires = attr.ib(default=None, eq=True) # type: Optional[Tuple]
build_backend = attr.ib(eq=True) # type: STRING_TYPE
setup_requires = attr.ib(default=None, eq=True) # type: Optional[Tuple]
python_requires = attr.ib(
default=None, cmp=True
default=None, eq=True
) # type: Optional[packaging.specifiers.SpecifierSet]
_extras_requirements = attr.ib(default=None, cmp=True) # type: Optional[Tuple]
setup_cfg = attr.ib(type=Path, default=None, cmp=True, hash=False)
setup_py = attr.ib(type=Path, default=None, cmp=True, hash=False)
pyproject = attr.ib(type=Path, default=None, cmp=True, hash=False)
_extras_requirements = attr.ib(default=None, eq=True) # type: Optional[Tuple]
setup_cfg = attr.ib(type=Path, default=None, eq=True, hash=False)
setup_py = attr.ib(type=Path, default=None, eq=True, hash=False)
pyproject = attr.ib(type=Path, default=None, eq=True, hash=False)
ireq = attr.ib(
default=None, cmp=True, hash=False
default=None, eq=True, hash=False
) # type: Optional[InstallRequirement]
extra_kwargs = attr.ib(default=attr.Factory(dict), type=dict, cmp=False, hash=False)
extra_kwargs = attr.ib(default=attr.Factory(dict), type=dict, eq=False, hash=False)
metadata = attr.ib(default=None) # type: Optional[Tuple[STRING_TYPE]]
stack = attr.ib(default=None, eq=False) # type: Optional[ExitStack]
_finalizer = attr.ib(default=None, eq=False) # type: Any
def __attrs_post_init__(self):
self._finalizer = finalize(self, self.stack.close)
@build_backend.default
def get_build_backend(self):
@@ -1586,10 +1592,13 @@ build-backend = "{1}"
return None
if ireq.link.is_wheel:
return None
if not finder:
from .dependencies import get_finder
session, finder = get_finder()
stack = ExitStack()
if not session:
cmd = pip_shims.shims.InstallCommand()
options, _ = cmd.parser.parse_args([])
session = cmd._build_session(options)
finder = cmd._build_package_finder(options, session)
tempdir_manager = stack.enter_context(pip_shims.shims.global_tempdir_manager())
vcs, uri = split_vcs_method_from_uri(unquote(ireq.link.url_without_fragment))
parsed = urlparse(uri)
if "file" in parsed.scheme:
@@ -1599,7 +1608,9 @@ build-backend = "{1}"
parsed = parsed._replace(path=url_path)
uri = urlunparse(parsed)
path = None
is_file = False
if ireq.link.scheme == "file" or uri.startswith("file://"):
is_file = True
if "file:/" in uri and "file:///" not in uri:
uri = uri.replace("file:/", "file:///")
path = pip_shims.shims.url_to_path(uri)
@@ -1608,7 +1619,11 @@ build-backend = "{1}"
ireq.link, "is_vcs", getattr(ireq.link, "is_artifact", False)
)
is_vcs = True if vcs else is_artifact_or_vcs
if not (ireq.editable and pip_shims.shims.is_file_url(ireq.link) and is_vcs):
if is_file and not is_vcs and path is not None and os.path.isdir(path):
target = os.path.join(kwargs["src_dir"], os.path.basename(path))
shutil.copytree(path, target)
ireq.source_dir = target
if not (ireq.editable and is_file and is_vcs):
if ireq.is_wheel:
only_download = True
download_dir = kwargs["wheel_download_dir"]
@@ -1624,27 +1639,33 @@ build-backend = "{1}"
build_location_func = getattr(ireq, "build_location", None)
if build_location_func is None:
build_location_func = getattr(ireq, "ensure_build_location", None)
build_location_func(kwargs["build_dir"])
ireq.ensure_has_source_dir(kwargs["src_dir"])
src_dir = ireq.source_dir
with pip_shims.shims.global_tempdir_manager():
if not ireq.source_dir:
build_kwargs = {"build_dir": kwargs["build_dir"], "autodelete": False}
call_function_with_correct_args(build_location_func, **build_kwargs)
ireq.ensure_has_source_dir(kwargs["src_dir"])
src_dir = ireq.source_dir
pip_shims.shims.shim_unpack(
link=ireq.link,
location=kwargs["src_dir"],
download_dir=download_dir,
ireq=ireq,
only_download=only_download,
session=session,
hashes=ireq.hashes(False),
progress_bar="off",
)
created = cls.create(
kwargs["src_dir"], subdirectory=subdir, ireq=ireq, kwargs=kwargs
ireq.source_dir, subdirectory=subdir, ireq=ireq, kwargs=kwargs, stack=stack
)
return created
@classmethod
def create(cls, base_dir, subdirectory=None, ireq=None, kwargs=None):
# type: (AnyStr, Optional[AnyStr], Optional[InstallRequirement], Optional[Dict[AnyStr, AnyStr]]) -> Optional[SetupInfo]
def create(
cls,
base_dir, # type: str
subdirectory=None, # type: Optional[str]
ireq=None, # type: Optional[InstallRequirement]
kwargs=None, # type: Optional[Dict[str, str]]
stack=None, # type: Optional[ExitStack]
):
# type: (...) -> Optional[SetupInfo]
if not base_dir or base_dir is None:
return None
@@ -1661,6 +1682,9 @@ build-backend = "{1}"
creation_kwargs["pyproject"] = pyproject
creation_kwargs["setup_py"] = setup_py
creation_kwargs["setup_cfg"] = setup_cfg
if stack is None:
stack = ExitStack()
creation_kwargs["stack"] = stack
if ireq:
creation_kwargs["ireq"] = ireq
created = cls(**creation_kwargs)
+16 -5
View File
@@ -108,12 +108,18 @@ class URI(object):
query_dict = omdict()
queries = query.split("&")
query_items = []
subdirectory = self.subdirectory if self.subdirectory else None
for q in queries:
key, _, val = q.partition("=")
val = unquote_plus(val.replace("+", " "))
query_items.append((key, val))
if key == "subdirectory" and not subdirectory:
subdirectory = val
else:
query_items.append((key, val))
query_dict.load(query_items)
return attr.evolve(self, query_dict=query_dict, query=query)
return attr.evolve(
self, query_dict=query_dict, subdirectory=subdirectory, query=query
)
def _parse_fragment(self):
# type: () -> URI
@@ -187,7 +193,10 @@ class URI(object):
subdir = None
if "&subdirectory" in url_part:
url_part, _, subdir = url_part.rpartition("&")
subdir = "&{0}".format(subdir.strip())
if "#egg=" not in url_part:
subdir = "#{0}".format(subdir.strip())
else:
subdir = "&{0}".format(subdir.strip())
return url_part.strip(), subdir
@classmethod
@@ -295,9 +304,11 @@ class URI(object):
query = ""
if self.query:
query = "{query}?{self.query}".format(query=query, self=self)
subdir_prefix = "#"
if not direct:
if self.name and not strip_name:
fragment = "#egg={self.name_with_extras}".format(self=self)
subdir_prefix = "&"
elif not strip_name and (
self.extras and self.scheme and self.scheme.startswith("file")
):
@@ -308,8 +319,8 @@ class URI(object):
fragment = ""
query = "{query}{fragment}".format(query=query, fragment=fragment)
if self.subdirectory and not strip_subdir:
query = "{query}&subdirectory={self.subdirectory}".format(
query=query, self=self
query = "{query}{subdir_prefix}subdirectory={self.subdirectory}".format(
query=query, subdir_prefix=subdir_prefix, self=self
)
host_port_path = self.get_host_port_path(strip_ref=strip_ref)
url = "{self.scheme}://{auth}{host_port_path}{query}".format(
+2 -1
View File
@@ -106,7 +106,8 @@ class VCSRepository(object):
def get_commit_hash(self, ref=None):
# type: (Optional[str]) -> str
return self.repo_backend.get_revision(self.checkout_directory)
with pip_shims.shims.global_tempdir_manager():
return self.repo_backend.get_revision(self.checkout_directory)
@classmethod
def monkeypatch_pip(cls):
+1 -1
View File
@@ -121,7 +121,7 @@ def strip_ssh_from_git_uri(uri):
def add_ssh_scheme_to_git_uri(uri):
# type: (S) -> S
"""Cleans VCS uris from pipenv.patched.notpip format"""
"""Cleans VCS uris from pip format"""
if isinstance(uri, six.string_types):
# Add scheme for parsing purposes, this is also what pip does
if uri.startswith("git+") and "://" not in uri:
+4 -4
View File
@@ -1,7 +1,7 @@
appdirs==1.4.3
appdirs==1.4.4
backports.shutil_get_terminal_size==1.0.0
backports.weakref==1.0.post1
click==7.1.1
click==7.1.2
click-completion==0.5.2
click-didyoumean==0.0.3
colorama==0.4.3
@@ -26,7 +26,7 @@ requests==2.23.0
idna==2.9
urllib3==1.25.9
certifi==2020.4.5.1
requirementslib==1.5.7
requirementslib==1.5.8
attrs==19.3.0
distlib==0.3.0
packaging==20.3
@@ -38,7 +38,7 @@ six==1.14.0
semver==2.9.0
toml==0.10.0
cached-property==1.5.1
vistir==0.5.0
vistir==0.5.1
pip-shims==0.5.2
contextlib2==0.6.0.post1
funcsigs==1.0.2
+1 -1
View File
@@ -36,7 +36,7 @@ from .misc import (
from .path import create_tracked_tempdir, create_tracked_tempfile, mkdir_p, rmtree
from .spin import create_spinner
__version__ = "0.5.0"
__version__ = "0.5.1"
__all__ = [
+15 -12
View File
@@ -60,7 +60,7 @@ from ctypes import (
py_object,
windll,
)
from ctypes.wintypes import LPCWSTR, LPWSTR
from ctypes.wintypes import HANDLE, LPCWSTR, LPWSTR
from itertools import count
import msvcrt
@@ -83,19 +83,18 @@ if IS_TYPE_CHECKING:
c_ssize_p = POINTER(c_ssize_t)
kernel32 = windll.kernel32
GetStdHandle = kernel32.GetStdHandle
ReadConsoleW = kernel32.ReadConsoleW
WriteConsoleW = kernel32.WriteConsoleW
GetLastError = kernel32.GetLastError
GetConsoleCursorInfo = kernel32.GetConsoleCursorInfo
SetConsoleCursorInfo = kernel32.SetConsoleCursorInfo
GetCommandLineW = WINFUNCTYPE(LPWSTR)(("GetCommandLineW", windll.kernel32))
CommandLineToArgvW = WINFUNCTYPE(POINTER(LPWSTR), LPCWSTR, POINTER(c_int))(
("CommandLineToArgvW", windll.shell32)
)
kernel32 = windll.kernel32
GetLastError = kernel32.GetLastError
GetCommandLineW = WINFUNCTYPE(LPWSTR)(("GetCommandLineW", windll.kernel32))
GetConsoleCursorInfo = kernel32.GetConsoleCursorInfo
GetStdHandle = kernel32.GetStdHandle
LocalFree = WINFUNCTYPE(ctypes.c_void_p, ctypes.c_void_p)(("LocalFree", windll.kernel32))
ReadConsoleW = kernel32.ReadConsoleW
SetConsoleCursorInfo = kernel32.SetConsoleCursorInfo
WriteConsoleW = kernel32.WriteConsoleW
# XXX: Added for cursor hiding on windows
STDOUT_HANDLE_ID = ctypes.c_ulong(-11)
@@ -354,7 +353,11 @@ if PY2:
def _get_windows_argv():
argc = c_int(0)
argv_unicode = CommandLineToArgvW(GetCommandLineW(), byref(argc))
argv = [argv_unicode[i] for i in range(0, argc.value)]
try:
argv = [argv_unicode[i] for i in range(0, argc.value)]
finally:
LocalFree(argv_unicode)
del argv_unicode
if not hasattr(sys, "frozen"):
argv = argv[1:]
+1 -1
View File
@@ -35,7 +35,7 @@ if six.PY3:
_unichr = chr
bytes_chr = lambda code: bytes((code,))
else:
_unichr = unichr
_unichr = unichr # type: ignore
bytes_chr = chr
+65 -40
View File
@@ -29,11 +29,22 @@ __all__ = [
"TemporaryDirectory",
"NamedTemporaryFile",
"to_native_string",
"Iterable",
"Mapping",
"Sequence",
"Set",
"Hashable",
"MutableMapping",
"Container",
"Iterator",
"KeysView",
"ItemsView",
"MappingView",
"Iterable",
"Set",
"Sequence",
"Sized",
"ValuesView",
"MutableSet",
"MutableSequence",
"Callable",
"fs_encode",
"fs_decode",
"_fs_encode_errors",
@@ -45,18 +56,55 @@ if sys.version_info >= (3, 5): # pragma: no cover
else: # pragma: no cover
from pipenv.vendor.pathlib2 import Path
if six.PY3: # pragma: no cover
if sys.version_info >= (3, 4): # pragma: no cover
# Only Python 3.4+ is supported
from functools import lru_cache, partialmethod
from tempfile import NamedTemporaryFile
from shutil import get_terminal_size
from weakref import finalize
from collections.abc import (
Mapping,
Hashable,
MutableMapping,
Container,
Iterator,
KeysView,
ItemsView,
MappingView,
Iterable,
Set,
Sequence,
Sized,
ValuesView,
MutableSet,
MutableSequence,
Callable,
)
else: # pragma: no cover
# Only Python 2.7 is supported
from pipenv.vendor.backports.functools_lru_cache import lru_cache
from .backports.functools import partialmethod # type: ignore
from pipenv.vendor.backports.shutil_get_terminal_size import get_terminal_size
from .backports.functools import partialmethod # type: ignore
from .backports.surrogateescape import register_surrogateescape
from collections import (
Mapping,
Hashable,
MutableMapping,
Container,
Iterator,
KeysView,
ItemsView,
MappingView,
Iterable,
Set,
Sequence,
Sized,
ValuesView,
MutableSet,
MutableSequence,
Callable,
)
register_surrogateescape()
NamedTemporaryFile = _NamedTemporaryFile
@@ -76,7 +124,7 @@ if six.PY2: # pragma: no cover
pass
class FileNotFoundError(IOError):
"""No such file or directory"""
"""No such file or directory."""
def __init__(self, *args, **kwargs):
self.errno = errno.ENOENT
@@ -95,7 +143,7 @@ if six.PY2: # pragma: no cover
super(TimeoutError, self).__init__(*args, **kwargs)
class IsADirectoryError(OSError):
"""The command does not work on directories"""
"""The command does not work on directories."""
def __init__(self, *args, **kwargs):
self.errno = errno.EISDIR
@@ -118,24 +166,6 @@ else: # pragma: no cover
)
from io import StringIO
six.add_move(
six.MovedAttribute("Iterable", "collections", "collections.abc")
) # type: ignore
six.add_move(
six.MovedAttribute("Mapping", "collections", "collections.abc")
) # type: ignore
six.add_move(
six.MovedAttribute("Sequence", "collections", "collections.abc")
) # type: ignore
six.add_move(six.MovedAttribute("Set", "collections", "collections.abc")) # type: ignore
six.add_move(
six.MovedAttribute("ItemsView", "collections", "collections.abc")
) # type: ignore
# fmt: off
from six.moves import ItemsView, Iterable, Mapping, Sequence, Set # type: ignore # noqa # isort:skip
# fmt: on
if not sys.warnoptions:
warnings.simplefilter("default", ResourceWarning)
@@ -213,7 +243,7 @@ class TemporaryDirectory(object):
def is_bytes(string):
"""Check if a string is a bytes instance
"""Check if a string is a bytes instance.
:param Union[str, bytes] string: A string that may be string or bytes like
:return: Whether the provided string is a bytes type or not
@@ -227,7 +257,7 @@ def is_bytes(string):
def fs_str(string):
"""Encodes a string into the proper filesystem encoding
"""Encodes a string into the proper filesystem encoding.
Borrowed from pip-tools
"""
@@ -239,8 +269,7 @@ def fs_str(string):
def _get_path(path):
"""
Fetch the string value from a path-like object
"""Fetch the string value from a path-like object.
Returns **None** if there is no string value.
"""
@@ -324,8 +353,7 @@ def _chunks(b, indexes):
def fs_encode(path):
"""
Encode a filesystem path to the proper filesystem encoding
"""Encode a filesystem path to the proper filesystem encoding.
:param Union[str, bytes] path: A string-like path
:returns: A bytes-encoded filesystem path representation
@@ -349,8 +377,7 @@ def fs_encode(path):
def fs_decode(path):
"""
Decode a filesystem path using the proper filesystem encoding
"""Decode a filesystem path using the proper filesystem encoding.
:param path: The filesystem path to decode from bytes or string
:return: The filesystem path, decoded with the determined encoding
@@ -376,17 +403,15 @@ def fs_decode(path):
if sys.version_info[0] < 3: # pragma: no cover
_fs_encode_errors = "surrogateescape"
_fs_encode_errors = "surrogatepass" if sys.platform == "win32" else "surrogateescape"
_fs_decode_errors = "surrogateescape"
_fs_encoding = "utf-8"
else: # pragma: no cover
_fs_encoding = "utf-8"
_fs_decode_errors = "surrogateescape"
if sys.platform.startswith("win"):
_fs_error_fn = None
if sys.version_info[:2] > (3, 4):
alt_strategy = "surrogatepass"
else:
alt_strategy = "surrogateescape"
_fs_encode_errors = "surrogatepass"
else:
if sys.version_info >= (3, 3):
_fs_encoding = sys.getfilesystemencoding()
@@ -394,8 +419,8 @@ else: # pragma: no cover
_fs_encoding = sys.getdefaultencoding()
alt_strategy = "surrogateescape"
_fs_error_fn = getattr(sys, "getfilesystemencodeerrors", None)
_fs_encode_errors = _fs_error_fn() if _fs_error_fn else alt_strategy
_fs_decode_errors = _fs_error_fn() if _fs_error_fn else alt_strategy
_fs_encode_errors = _fs_error_fn() if _fs_error_fn else alt_strategy
_fs_decode_errors = _fs_error_fn() if _fs_error_fn else _fs_decode_errors
_byte = chr if sys.version_info < (3,) else lambda i: bytes([i])
+73 -17
View File
@@ -9,9 +9,34 @@ from contextlib import closing, contextmanager
import six
from .compat import NamedTemporaryFile, Path
from .compat import IS_TYPE_CHECKING, NamedTemporaryFile, Path
from .path import is_file_url, is_valid_url, path_to_url, url_to_path
if IS_TYPE_CHECKING:
from typing import (
Any,
Bytes,
Callable,
ContextManager,
Dict,
IO,
Iterator,
Optional,
Union,
Text,
Tuple,
TypeVar,
)
from types import ModuleType
from requests import Session
from six.moves.http_client import HTTPResponse as Urllib_HTTPResponse
from urllib3.response import HTTPResponse as Urllib3_HTTPResponse
from .spin import VistirSpinner, DummySpinner
TSpinner = Union[VistirSpinner, DummySpinner]
_T = TypeVar("_T")
__all__ = [
"temp_environ",
"temp_path",
@@ -29,6 +54,7 @@ __all__ = [
# See https://github.com/berdario/pew/blob/master/pew/_utils.py#L82
@contextmanager
def temp_environ():
# type: () -> Iterator[None]
"""Allow the ability to set os.environ temporarily"""
environ = dict(os.environ)
try:
@@ -40,17 +66,30 @@ def temp_environ():
@contextmanager
def temp_path():
# type: () -> Iterator[None]
"""A context manager which allows the ability to set sys.path temporarily
>>> path_from_virtualenv = load_path("/path/to/venv/bin/python")
>>> print(sys.path)
['/home/user/.pyenv/versions/3.7.0/bin', '/home/user/.pyenv/versions/3.7.0/lib/python37.zip', '/home/user/.pyenv/versions/3.7.0/lib/python3.7', '/home/user/.pyenv/versions/3.7.0/lib/python3.7/lib-dynload', '/home/user/.pyenv/versions/3.7.0/lib/python3.7/site-packages']
[
'/home/user/.pyenv/versions/3.7.0/bin',
'/home/user/.pyenv/versions/3.7.0/lib/python37.zip',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7/lib-dynload',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7/site-packages'
]
>>> with temp_path():
sys.path = path_from_virtualenv
# Running in the context of the path above
run(["pip", "install", "stuff"])
>>> print(sys.path)
['/home/user/.pyenv/versions/3.7.0/bin', '/home/user/.pyenv/versions/3.7.0/lib/python37.zip', '/home/user/.pyenv/versions/3.7.0/lib/python3.7', '/home/user/.pyenv/versions/3.7.0/lib/python3.7/lib-dynload', '/home/user/.pyenv/versions/3.7.0/lib/python3.7/site-packages']
[
'/home/user/.pyenv/versions/3.7.0/bin',
'/home/user/.pyenv/versions/3.7.0/lib/python37.zip',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7/lib-dynload',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7/site-packages'
]
"""
path = [p for p in sys.path]
@@ -62,6 +101,7 @@ def temp_path():
@contextmanager
def cd(path):
# type: () -> Iterator[None]
"""Context manager to temporarily change working directories
:param str path: The directory to move into
@@ -88,6 +128,7 @@ def cd(path):
@contextmanager
def dummy_spinner(spin_type, text, **kwargs):
# type: (str, str, Any)
class FakeClass(object):
def __init__(self, text=""):
self.text = text
@@ -110,12 +151,13 @@ def dummy_spinner(spin_type, text, **kwargs):
@contextmanager
def spinner(
spinner_name=None,
start_text=None,
handler_map=None,
nospin=False,
write_to_stdout=True,
spinner_name=None, # type: Optional[str]
start_text=None, # type: Optional[str]
handler_map=None, # type: Optional[Dict[str, Callable]]
nospin=False, # type: bool
write_to_stdout=True, # type: bool
):
# type: (...) -> ContextManager[TSpinner]
"""Get a spinner object or a dummy spinner to wrap a context.
:param str spinner_name: A spinner type e.g. "dots" or "bouncingBar" (default: {"bouncingBar"})
@@ -165,6 +207,7 @@ def spinner(
@contextmanager
def atomic_open_for_write(target, binary=False, newline=None, encoding=None):
# type: (str, bool, Optional[str], Optional[str]) -> None
"""Atomically open `target` for writing.
This is based on Lektor's `atomic_open()` utility, but simplified a lot
@@ -173,8 +216,10 @@ def atomic_open_for_write(target, binary=False, newline=None, encoding=None):
:param str target: Target filename to write
:param bool binary: Whether to open in binary mode, default False
:param str newline: The newline character to use when writing, determined from system if not supplied
:param str encoding: The encoding to use when writing, defaults to system encoding
:param Optional[str] newline: The newline character to use when writing, determined
from system if not supplied.
:param Optional[str] encoding: The encoding to use when writing, defaults to system
encoding.
How this works:
@@ -234,7 +279,10 @@ def atomic_open_for_write(target, binary=False, newline=None, encoding=None):
delete=False,
)
# set permissions to 0644
os.chmod(f.name, stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
try:
os.chmod(f.name, stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
except OSError:
pass
try:
yield f
except BaseException:
@@ -254,13 +302,19 @@ def atomic_open_for_write(target, binary=False, newline=None, encoding=None):
@contextmanager
def open_file(link, session=None, stream=True):
def open_file(
link, # type: Union[_T, str]
session=None, # type: Optional[Session]
stream=True, # type: bool
):
# type: (...) -> ContextManager[Union[IO[bytes], Urllib3_HTTPResponse, Urllib_HTTPResponse]]
"""
Open local or remote file for reading.
:type link: pip._internal.index.Link or str
:type session: requests.Session
:param bool stream: Try to stream if remote, default True
:param pip._internal.index.Link link: A link object from resolving dependencies with
pip, or else a URL.
:param Optional[Session] session: A :class:`~requests.Session` instance
:param bool stream: Whether to stream the content if remote, default True
:raises ValueError: If link points to a local directory.
:return: a context manager to the opened file-like object
"""
@@ -286,7 +340,7 @@ def open_file(link, session=None, stream=True):
headers = {"Accept-Encoding": "identity"}
if not session:
try:
from requests import Session
from requests import Session # noqa
except ImportError:
session = None
else:
@@ -302,7 +356,7 @@ def open_file(link, session=None, stream=True):
yield result
finally:
if raw:
conn = getattr(raw, "_connection")
conn = raw._connection
if conn is not None:
conn.close()
result.close()
@@ -310,6 +364,7 @@ def open_file(link, session=None, stream=True):
@contextmanager
def replaced_stream(stream_name):
# type: (str) -> Iterator[IO[Text]]
"""
Context manager to temporarily swap out *stream_name* with a stream wrapper.
@@ -336,6 +391,7 @@ def replaced_stream(stream_name):
@contextmanager
def replaced_streams():
# type: () -> Iterator[Tuple[IO[Text], IO[Text]]]
"""
Context manager to replace both ``sys.stdout`` and ``sys.stderr`` using
``replaced_stream``
+375 -119
View File
@@ -1,19 +1,23 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import atexit
import io
import itertools
import json
import locale
import logging
import os
import subprocess
import sys
import threading
from collections import OrderedDict
from functools import partial
from itertools import islice, tee
from weakref import WeakKeyDictionary
import six
from six.moves.queue import Empty, Queue
from .cmdparse import Script
from .compat import (
@@ -21,6 +25,8 @@ from .compat import (
Path,
StringIO,
TimeoutError,
_fs_decode_errors,
_fs_encode_errors,
fs_str,
is_bytes,
partialmethod,
@@ -58,7 +64,7 @@ __all__ = [
if MYPY_RUNNING:
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, Generator, IO, List, Optional, Text, Tuple, Union
from .spin import VistirSpinner
@@ -66,8 +72,7 @@ def _get_logger(name=None, level="ERROR"):
# type: (Optional[str], str) -> logging.Logger
if not name:
name = __name__
if isinstance(level, six.string_types):
level = getattr(logging, level.upper())
level = getattr(logging, level.upper())
logger = logging.getLogger(name)
logger.setLevel(level)
formatter = logging.Formatter(
@@ -83,8 +88,9 @@ def shell_escape(cmd):
# type: (Union[str, List[str]]) -> str
"""Escape strings for use in :func:`~subprocess.Popen` and :func:`run`.
This is a passthrough method for instantiating a :class:`~vistir.cmdparse.Script`
object which can be used to escape commands to output as a single string.
This is a passthrough method for instantiating a
:class:`~vistir.cmdparse.Script` object which can be used to escape
commands to output as a single string.
"""
cmd = Script.parse(cmd)
return cmd.cmdify()
@@ -92,14 +98,25 @@ def shell_escape(cmd):
def unnest(elem):
# type: (Iterable) -> Any
"""Flatten an arbitrarily nested iterable
"""Flatten an arbitrarily nested iterable.
:param elem: An iterable to flatten
:type elem: :class:`~collections.Iterable`
>>> nested_iterable = (1234, (3456, 4398345, (234234)), (2396, (23895750, 9283798, 29384, (289375983275, 293759, 2347, (2098, 7987, 27599)))))
>>> nested_iterable = (
1234, (3456, 4398345, (234234)), (
2396, (
23895750, 9283798, 29384, (
289375983275, 293759, 2347, (
2098, 7987, 27599
)
)
)
)
)
>>> list(vistir.misc.unnest(nested_iterable))
[1234, 3456, 4398345, 234234, 2396, 23895750, 9283798, 29384, 289375983275, 293759, 2347, 2098, 7987, 27599]
[1234, 3456, 4398345, 234234, 2396, 23895750, 9283798, 29384, 289375983275, 293759,
2347, 2098, 7987, 27599]
"""
if isinstance(elem, Iterable) and not isinstance(elem, six.string_types):
@@ -127,14 +144,19 @@ def _is_iterable(elem):
def dedup(iterable):
# type: (Iterable) -> Iterable
"""Deduplicate an iterable object like iter(set(iterable)) but
order-reserved.
"""
"""Deduplicate an iterable object like iter(set(iterable)) but order-
preserved."""
return iter(OrderedDict.fromkeys(iterable))
def _spawn_subprocess(script, env=None, block=True, cwd=None, combine_stderr=True):
# type: (Union[str, List[str]], Optional[Dict[str, str], bool, Optional[str], bool]) -> subprocess.Popen
def _spawn_subprocess(
script, # type: Union[str, List[str]]
env=None, # type: Optional[Dict[str, str]]
block=True, # type: bool
cwd=None, # type: Optional[Union[str, Path]]
combine_stderr=True, # type: bool
):
# type: (...) -> subprocess.Popen
from distutils.spawn import find_executable
if not env:
@@ -147,6 +169,10 @@ def _spawn_subprocess(script, env=None, block=True, cwd=None, combine_stderr=Tru
"stderr": subprocess.PIPE if not combine_stderr else subprocess.STDOUT,
"shell": False,
}
if sys.version_info[:2] > (3, 5):
options.update({"universal_newlines": True, "encoding": "utf-8"})
elif os.name != "nt":
options["universal_newlines"] = True
if not block:
options["stdin"] = subprocess.PIPE
if cwd:
@@ -170,79 +196,295 @@ def _spawn_subprocess(script, env=None, block=True, cwd=None, combine_stderr=Tru
return subprocess.Popen(script.cmdify(), **options)
def _read_streams(stream_dict):
results = {}
for outstream in stream_dict.keys():
stream = stream_dict[outstream]
if not stream:
results[outstream] = None
continue
line = to_text(stream.readline())
if not line:
results[outstream] = None
continue
line = to_text("{0}".format(line.rstrip()))
results[outstream] = line
return results
class SubprocessStreamWrapper(object):
def __init__(
self,
display_stderr_maxlen=200, # type: int
display_line_for_loops=20, # type: int
subprocess=None, # type: subprocess.Popen
spinner=None, # type: Optional[VistirSpinner]
verbose=False, # type: bool
stdout_allowed=False, # type: bool
):
# type: (...) -> None
if subprocess is not None:
stdout_encoding = self.get_subprocess_encoding(subprocess, "stdout")
stderr_encoding = self.get_subprocess_encoding(subprocess, "stderr")
self.stdout_encoding = stdout_encoding or PREFERRED_ENCODING
self.stderr_encoding = stderr_encoding or PREFERRED_ENCODING
self.stdout_lines = []
self.text_stdout_lines = []
self.stderr_lines = []
self.text_stderr_lines = []
self.display_line = ""
self.display_line_loops_displayed = 0
self.display_line_shown_for_loops = display_line_for_loops
self.display_line_max_len = display_stderr_maxlen
self.spinner = spinner
self.stdout_allowed = stdout_allowed
self.verbose = verbose
self._iterated_stdout = None
self._iterated_stderr = None
self._subprocess = subprocess
self._queues = {
"streams": Queue(),
"lines": Queue(),
}
self._threads = {
stream_name: threading.Thread(
target=self.enqueue_stream,
args=(self._subprocess, stream_name, self._queues["streams"]),
)
for stream_name in ("stdout", "stderr")
}
self._threads["watcher"] = threading.Thread(
target=self.process_output_lines,
args=(self._queues["streams"], self._queues["lines"]),
)
self.start_threads()
def enqueue_stream(self, proc, stream_name, queue):
# type: (subprocess.Popen, str, Queue) -> None
if not getattr(proc, stream_name, None):
queue.put(("stderr", None))
else:
for line in iter(getattr(proc, stream_name).readline, ""):
queue.put((stream_name, line))
getattr(proc, stream_name).close()
def get_stream_results(cmd_instance, verbose, maxlen, spinner=None, stdout_allowed=False):
stream_results = {"stdout": [], "stderr": []}
streams = {"stderr": cmd_instance.stderr, "stdout": cmd_instance.stdout}
while True:
stream_contents = _read_streams(streams)
stdout_line = stream_contents["stdout"]
stderr_line = stream_contents["stderr"]
if not (stdout_line or stderr_line):
break
last_changed = 0
display_line = ""
for stream_name in stream_contents.keys():
if stream_contents[stream_name] and stream_name in stream_results:
line = stream_contents[stream_name]
stream_results[stream_name].append(line)
display_line = (
fs_str("{0}".format(line))
if stream_name == "stderr"
else display_line
)
if display_line and last_changed > 10:
last_changed = 0
display_line = ""
elif display_line:
last_changed += 1
if len(display_line) > maxlen:
display_line = "{0}...".format(display_line[:maxlen])
@property
def stderr(self):
return self._subprocess.stderr
@property
def stdout(self):
return self._subprocess.stdout
@classmethod
def get_subprocess_encoding(cls, cmd_instance, stream_name):
# type: (subprocess.Popen, str) -> Optional[str]
stream = getattr(cmd_instance, stream_name, None)
if stream is not None:
return get_output_encoding(getattr(stream, "encoding", None))
return None
@property
def stdout_iter(self):
if self._iterated_stdout is None and self.stdout:
self._iterated_stdout = iter(self.stdout.readline, "")
return self._iterated_stdout
@property
def stderr_iter(self):
if self._iterated_stderr is None and self.stderr:
self._iterated_stderr = iter(self.stderr.readline, "")
return self._iterated_stderr
def _decode_line(self, line, encoding):
# type: (Union[str, bytes], str) -> str
if isinstance(line, six.binary_type):
line = to_text(
line.decode(encoding, errors=_fs_decode_errors).encode(
"utf-8", errors=_fs_encode_errors
),
errors="backslashreplace",
)
else:
line = to_text(line, encoding=encoding, errors=_fs_encode_errors)
return line
def start_threads(self):
for thread in self._threads.values():
thread.daemon = True
thread.start()
@property
def subprocess(self):
return self._subprocess
@property
def out(self):
# type: () -> str
return getattr(self.subprocess, "out", "")
@out.setter
def out(self, value):
# type: (str) -> None
self._subprocess.out = value
@property
def err(self):
# type: () -> str
return getattr(self.subprocess, "err", "")
@err.setter
def err(self, value):
# type: (str) -> None
self._subprocess.err = value
def poll(self):
# type: () -> Optional[int]
return self.subprocess.poll()
def wait(self, timeout=None):
# type: (self, Optional[int]) -> Optional[int]
kwargs = {}
if sys.version_info[0] >= 3:
kwargs = {"timeout": timeout}
result = self._subprocess.wait(**kwargs)
self.gather_output()
return result
@property
def returncode(self):
# type: () -> Optional[int]
return self.subprocess.returncode
@property
def text_stdout(self):
return os.linesep.join(self.text_stdout_lines)
@property
def text_stderr(self):
return os.linesep.join(self.text_stderr_lines)
@property
def stderr_closed(self):
# type: () -> bool
return self.stderr is None or (self.stderr is not None and self.stderr.closed)
@property
def stdout_closed(self):
# type: () -> bool
return self.stdout is None or (self.stdout is not None and self.stdout.closed)
@property
def running(self):
# type: () -> bool
return any(t.is_alive() for t in self._threads.values()) or not all(
[self.stderr_closed, self.stdout_closed, self.subprocess_finished]
)
@property
def subprocess_finished(self):
if self._subprocess is None:
return False
return (
self._subprocess.poll() is not None or self._subprocess.returncode is not None
)
def update_display_line(self, new_line):
# type: () -> None
if self.display_line:
if new_line != self.display_line:
self.display_line_loops_displayed = 0
new_line = fs_str("{}".format(new_line))
if len(new_line) > self.display_line_max_len:
new_line = "{}...".format(new_line[: self.display_line_max_len])
self.display_line = new_line
elif self.display_line_loops_displayed >= self.display_line_shown_for_loops:
self.display_line = ""
self.display_line_loops_displayed = 0
else:
self.display_line_loops_displayed += 1
return None
@classmethod
def check_line_content(cls, line):
# type: (Optional[str]) -> bool
return line is not None and line != ""
def get_line(self, queue):
# type: (Queue) -> Tuple[Optional[str], ...]
stream, result = None, None
try:
stream, result = queue.get_nowait()
except Empty:
result = None
return stream, result
def process_output_lines(self, recv_queue, line_queue):
# type: (Queue, Queue) -> None
stream, line = self.get_line(recv_queue)
while self.poll() is None or line is not None:
if self.check_line_content(line):
line = to_text("{}".format(line).rstrip())
line_queue.put((stream, line))
stream, line = self.get_line(recv_queue)
def gather_output(self, spinner=None, stdout_allowed=False, verbose=False):
# type: (Optional[VistirSpinner], bool, bool) -> None
if not getattr(self._subprocess, "out", None):
self._subprocess.out = ""
if not getattr(self._subprocess, "err", None):
self._subprocess.err = ""
if not self._queues["streams"].empty():
self.process_output_lines(self._queues["streams"], self._queues["lines"])
while not self._queues["lines"].empty():
try:
stream_name, line = self._queues["lines"].get()
except Empty:
if not self._threads["watcher"].is_active():
break
pass
if stream_name == "stdout":
text_line = self._decode_line(line, self.stdout_encoding)
self.text_stdout_lines.append(text_line)
self.out += "{}\n".format(text_line)
if verbose:
use_stderr = not stdout_allowed or stream_name != "stdout"
if spinner:
target = spinner.stderr if use_stderr else spinner.stdout
spinner.hide_and_write(display_line, target=target)
else:
target = sys.stderr if use_stderr else sys.stdout
target.write(display_line)
target.flush()
if spinner:
spinner.text = to_native_string(
"{0} {1}".format(spinner.text, display_line)
_write_subprocess_result(
line, "stdout", spinner=spinner, stdout_allowed=stdout_allowed
)
continue
return stream_results
else:
text_err = self._decode_line(line, self.stderr_encoding)
self.text_stderr_lines.append(text_err)
self.update_display_line(line)
self.err += "{}\n".format(text_err)
_write_subprocess_result(
line, "stderr", spinner=spinner, stdout_allowed=stdout_allowed
)
if spinner:
spinner.text = to_native_string(
"{} {}".format(spinner.text, self.display_line)
)
self.out = self.out.strip()
self.err = self.err.strip()
def _write_subprocess_result(result, stream_name, spinner=None, stdout_allowed=False):
# type: (str, str, Optional[VistirSpinner], bool) -> None
if not stdout_allowed and stream_name == "stdout":
stream_name = "stderr"
if spinner:
spinner.hide_and_write(result, target=getattr(spinner, stream_name))
else:
target_stream = getattr(sys, stream_name)
target_stream.write(result)
target_stream.flush()
return None
def attach_stream_reader(
cmd_instance, verbose, maxlen, spinner=None, stdout_allowed=False
):
streams = SubprocessStreamWrapper(
subprocess=cmd_instance,
display_stderr_maxlen=maxlen,
spinner=spinner,
verbose=verbose,
stdout_allowed=stdout_allowed,
)
streams.gather_output(spinner=spinner, verbose=verbose, stdout_allowed=stdout_allowed)
return streams
def _handle_nonblocking_subprocess(c, spinner=None):
# type: (subprocess.Popen, VistirSpinner) -> subprocess.Popen
try:
while c.running:
c.wait()
finally:
if c.stdout:
c.stdout.close()
if c.stderr:
c.stderr.close()
if spinner:
if c.returncode > 0:
if c.returncode != 0:
spinner.fail(to_native_string("Failed...cleaning up..."))
if not os.name == "nt":
elif c.returncode == 0 and not os.name == "nt":
spinner.ok(to_native_string("✔ Complete"))
else:
spinner.ok(to_native_string("Complete"))
@@ -284,7 +526,7 @@ def _create_subprocess(
spinner_orig_text = spinner.text
if not spinner_orig_text and start_text is not None:
spinner_orig_text = start_text
stream_results = get_stream_results(
c = attach_stream_reader(
c,
verbose=verbose,
maxlen=display_limit,
@@ -292,10 +534,6 @@ def _create_subprocess(
stdout_allowed=write_to_stdout,
)
_handle_nonblocking_subprocess(c, spinner)
output = stream_results["stdout"]
err = stream_results["stderr"]
c.out = "\n".join(output) if output else ""
c.err = "\n".join(err) if err else ""
else:
try:
c.out, c.err = c.communicate()
@@ -303,10 +541,6 @@ def _create_subprocess(
c.terminate()
c.out, c.err = c.communicate()
raise
if not block:
c.wait()
c.out = to_text("{0}".format(c.out)) if c.out else fs_str("")
c.err = to_text("{0}".format(c.err)) if c.err else fs_str("")
if not return_object:
return c.out.strip(), c.err.strip()
return c
@@ -330,14 +564,19 @@ def run(
:param list cmd: A list representing the command you want to run.
:param dict env: Additional environment settings to pass through to the subprocess.
:param bool return_object: When True, returns the whole subprocess instance
:param bool block: When False, returns a potentially still-running :class:`subprocess.Popen` instance
:param bool block: When False, returns a potentially still-running
:class:`subprocess.Popen` instance
:param str cwd: Current working directory contect to use for spawning the subprocess.
:param bool verbose: Whether to print stdout in real time when non-blocking.
:param bool nospin: Whether to disable the cli spinner.
:param str spinner_name: The name of the spinner to use if enabled, defaults to bouncingBar
:param bool combine_stderr: Optionally merge stdout and stderr in the subprocess, false if nonblocking.
:param int dispay_limit: The max width of output lines to display when using a spinner.
:param bool write_to_stdout: Whether to write to stdout when using a spinner, default True.
:param str spinner_name: The name of the spinner to use if enabled, defaults to
bouncingBar
:param bool combine_stderr: Optionally merge stdout and stderr in the subprocess,
false if nonblocking.
:param int dispay_limit: The max width of output lines to display when using a
spinner.
:param bool write_to_stdout: Whether to write to stdout when using a spinner,
defaults to True.
:returns: A 2-tuple of (output, error) or a :class:`subprocess.Popen` object.
.. Warning:: Merging standard out and standarad error in a nonblocking subprocess
@@ -346,11 +585,13 @@ def run(
"""
_env = os.environ.copy()
_env["PYTHONIOENCODING"] = str("utf-8")
_env["PYTHONUTF8"] = str("1")
if env:
_env.update(env)
if six.PY2:
fs_encode = partial(to_bytes, encoding=locale_encoding)
_env = {fs_encode(k): fs_encode(v) for k, v in _env.items()}
_fs_encode = partial(to_bytes, encoding=locale_encoding)
_env = {_fs_encode(k): _fs_encode(v) for k, v in _env.items()}
else:
_env = {k: fs_str(v) for k, v in _env.items()}
if not spinner_name:
@@ -386,14 +627,21 @@ def run(
def load_path(python):
"""Load the :mod:`sys.path` from the given python executable's environment as json
"""Load the :mod:`sys.path` from the given python executable's environment
as json.
:param str python: Path to a valid python executable
:return: A python representation of the `sys.path` value of the given python executable.
:return: A python representation of the `sys.path` value of the given python
executable.
:rtype: list
>>> load_path("/home/user/.virtualenvs/requirementslib-5MhGuG3C/bin/python")
['', '/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python37.zip', '/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python3.7', '/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python3.7/lib-dynload', '/home/user/.pyenv/versions/3.7.0/lib/python3.7', '/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python3.7/site-packages', '/home/user/git/requirementslib/src']
['', '/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python37.zip',
'/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python3.7',
'/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python3.7/lib-dynload',
'/home/user/.pyenv/versions/3.7.0/lib/python3.7',
'/home/user/.virtualenvs/requirementslib-5MhGuG3C/lib/python3.7/site-packages',
'/home/user/git/requirementslib/src']
"""
python = Path(python).as_posix()
@@ -407,7 +655,7 @@ def load_path(python):
def partialclass(cls, *args, **kwargs):
"""Returns a partially instantiated class
"""Returns a partially instantiated class.
:return: A partial class instance
:rtype: cls
@@ -417,7 +665,15 @@ def partialclass(cls, *args, **kwargs):
<class '__main__.Source'>
>>> source(name="pypi")
>>> source.__dict__
mappingproxy({'__module__': '__main__', '__dict__': <attribute '__dict__' of 'Source' objects>, '__weakref__': <attribute '__weakref__' of 'Source' objects>, '__doc__': None, '__init__': functools.partialmethod(<function Source.__init__ at 0x7f23af429bf8>, , url='https://pypi.org/simple')})
mappingproxy({
'__module__': '__main__',
'__dict__': <attribute '__dict__' of 'Source' objects>,
'__weakref__': <attribute '__weakref__' of 'Source' objects>,
'__doc__': None,
'__init__': functools.partialmethod(
<function Source.__init__ at 0x7f23af429bf8>, , url='https://pypi.org/simple'
)
})
>>> new_source = source(name="pypi")
>>> new_source
<__main__.Source object at 0x7f23af189b38>
@@ -526,8 +782,8 @@ def to_text(string, encoding="utf-8", errors=None):
def divide(n, iterable):
"""
split an iterable into n groups, per https://more-itertools.readthedocs.io/en/latest/api.html#grouping
"""split an iterable into n groups, per https://more-
itertools.readthedocs.io/en/latest/api.html#grouping.
:param int n: Number of unique groups
:param iter iterable: An iterable to split up
@@ -578,11 +834,11 @@ except Exception:
def getpreferredencoding():
"""Determine the proper output encoding for terminal rendering"""
"""Determine the proper output encoding for terminal rendering."""
# Borrowed from Invoke
# (see https://github.com/pyinvoke/invoke/blob/93af29d/invoke/runners.py#L881)
_encoding = locale.getpreferredencoding(False)
_encoding = sys.getdefaultencoding() or locale.getpreferredencoding(False)
if six.PY2 and not sys.platform == "win32":
_default_encoding = locale.getdefaultlocale()[1]
if _default_encoding is not None:
@@ -594,8 +850,7 @@ PREFERRED_ENCODING = getpreferredencoding()
def get_output_encoding(source_encoding):
"""
Given a source encoding, determine the preferred output encoding.
"""Given a source encoding, determine the preferred output encoding.
:param str source_encoding: The encoding of the source material.
:returns: The output encoding to decode to.
@@ -630,11 +885,13 @@ def _encode(output, encoding=None, errors=None, translation_map=None):
def decode_for_output(output, target_stream=None, translation_map=None):
"""Given a string, decode it for output to a terminal
"""Given a string, decode it for output to a terminal.
:param str output: A string to print to a terminal
:param target_stream: A stream to write to, we will encode to target this stream if possible.
:param dict translation_map: A mapping of unicode character ordinals to replacement strings.
:param target_stream: A stream to write to, we will encode to target this stream if
possible.
:param dict translation_map: A mapping of unicode character ordinals to replacement
strings.
:return: A re-encoded string using the preferred encoding
:rtype: str
"""
@@ -657,8 +914,7 @@ def decode_for_output(output, target_stream=None, translation_map=None):
def get_canonical_encoding_name(name):
# type: (str) -> str
"""
Given an encoding name, get the canonical name from a codec lookup.
"""Given an encoding name, get the canonical name from a codec lookup.
:param str name: The name of the codec to lookup
:return: The canonical version of the codec name
@@ -696,8 +952,8 @@ def _get_binary_buffer(stream):
def get_wrapped_stream(stream, encoding=None, errors="replace"):
"""
Given a stream, wrap it in a `StreamWrapper` instance and return the wrapped stream.
"""Given a stream, wrap it in a `StreamWrapper` instance and return the
wrapped stream.
:param stream: A stream instance to wrap
:param str encoding: The encoding to use for the stream
@@ -712,7 +968,7 @@ def get_wrapped_stream(stream, encoding=None, errors="replace"):
if stream is not None and encoding is None:
encoding = "utf-8"
if not encoding:
encoding = get_output_encoding(stream)
encoding = get_output_encoding(getattr(stream, "encoding", None))
else:
encoding = get_canonical_encoding_name(encoding)
return StreamWrapper(stream, encoding, errors, line_buffering=True)
@@ -720,10 +976,8 @@ def get_wrapped_stream(stream, encoding=None, errors="replace"):
class StreamWrapper(io.TextIOWrapper):
"""
This wrapper class will wrap a provided stream and supply an interface
for compatibility.
"""
"""This wrapper class will wrap a provided stream and supply an interface
for compatibility."""
def __init__(self, stream, encoding, errors, line_buffering=True, **kwargs):
self._stream = stream = _StreamProvider(stream)
@@ -907,7 +1161,7 @@ def _cached_stream_lookup(stream_lookup_func, stream_resolution_func):
def get_text_stream(stream="stdout", encoding=None):
"""Retrieve a unicode stream wrapper around **sys.stdout** or **sys.stderr**.
"""Retrieve a utf-8 stream wrapper around **sys.stdout** or **sys.stderr**.
:param str stream: The name of the stream to wrap from the :mod:`sys` module.
:param str encoding: An optional encoding to use.
@@ -959,7 +1213,8 @@ TEXT_STREAMS = {
def replace_with_text_stream(stream_name):
"""Given a stream name, replace the target stream with a text-converted equivalent
"""Given a stream name, replace the target stream with a text-converted
equivalent.
:param str stream_name: The name of a target stream, such as **stdout** or **stderr**
:return: None
@@ -984,7 +1239,8 @@ def _can_use_color(stream=None, color=None):
def echo(text, fg=None, bg=None, style=None, file=None, err=False, color=None):
"""Write the given text to the provided stream or **sys.stdout** by default.
"""Write the given text to the provided stream or **sys.stdout** by
default.
Provides optional foreground and background colors from the ansi defaults:
**grey**, **red**, **green**, **yellow**, **blue**, **magenta**, **cyan**
@@ -1002,7 +1258,7 @@ def echo(text, fg=None, bg=None, style=None, file=None, err=False, color=None):
"""
if file and not hasattr(file, "write"):
raise TypeError("Expected a writable stream, received {0!r}".format(file))
raise TypeError("Expected a writable stream, received {!r}".format(file))
if not file:
if err:
file = _text_stderr()
+97 -77
View File
@@ -8,7 +8,9 @@ import os
import posixpath
import shutil
import stat
import sys
import time
import unicodedata
import warnings
import six
@@ -39,7 +41,27 @@ else:
if IS_TYPE_CHECKING:
from typing import Optional, Callable, Text, ByteString, AnyStr
from types import TracebackType
from typing import (
Any,
AnyStr,
ByteString,
Callable,
Generator,
Iterator,
List,
Optional,
Text,
Tuple,
Type,
Union,
)
if six.PY3:
TPath = os.PathLike
else:
TPath = Union[str, bytes]
TFunc = Callable[..., Any]
__all__ = [
"check_for_unc_path",
@@ -72,16 +94,18 @@ if os.name == "nt":
def unicode_path(path):
# type: (TPath) -> Text
# Paths are supposed to be represented as unicode here
if six.PY2 and not isinstance(path, six.text_type):
if six.PY2 and isinstance(path, six.binary_type):
return path.decode(_fs_encoding)
return path
def native_path(path):
if six.PY2 and not isinstance(path, bytes):
# type: (TPath) -> str
if six.PY2 and isinstance(path, six.text_type):
return path.encode(_fs_encoding)
return path
return str(path)
# once again thank you django...
@@ -91,20 +115,18 @@ if six.PY3 or os.name == "nt":
else:
def abspathu(path):
"""
Version of os.path.abspath that uses the unicode representation
of the current working directory, thus avoiding a UnicodeDecodeError
in join when the cwd has non-ASCII characters.
"""
# type: (TPath) -> Text
"""Version of os.path.abspath that uses the unicode representation of
the current working directory, thus avoiding a UnicodeDecodeError in
join when the cwd has non-ASCII characters."""
if not os.path.isabs(path):
path = os.path.join(os.getcwdu(), path)
return os.path.normpath(path)
def normalize_path(path):
# type: (AnyStr) -> AnyStr
"""
Return a case-normalized absolute variable-expanded path.
# type: (TPath) -> Text
"""Return a case-normalized absolute variable-expanded path.
:param str path: The non-normalized path
:return: A normalized, expanded, case-normalized path
@@ -121,9 +143,8 @@ def normalize_path(path):
def is_in_path(path, parent):
# type: (AnyStr, AnyStr) -> bool
"""
Determine if the provided full path is in the given parent root.
# type: (TPath, TPath) -> bool
"""Determine if the provided full path is in the given parent root.
:param str path: The full path to check the location of.
:param str parent: The parent path to check for membership in
@@ -131,11 +152,11 @@ def is_in_path(path, parent):
:rtype: bool
"""
return normalize_path(str(path)).startswith(normalize_path(str(parent)))
return normalize_path(path).startswith(normalize_path(parent))
def normalize_drive(path):
# type: (str) -> Text
# type: (TPath) -> Text
"""Normalize drive in path so they stay consistent.
This currently only affects local drives on Windows, which can be
@@ -144,8 +165,10 @@ def normalize_drive(path):
"""
from .misc import to_text
if os.name != "nt" or not isinstance(path, six.string_types):
return path
if os.name != "nt" or not (
isinstance(path, six.string_types) or getattr(path, "__fspath__", None)
):
return path # type: ignore
drive, tail = os.path.splitdrive(path)
# Only match (lower cased) local drives (e.g. 'c:'), not UNC mounts.
@@ -156,7 +179,7 @@ def normalize_drive(path):
def path_to_url(path):
# type: (str) -> Text
# type: (TPath) -> Text
"""Convert the supplied local path to a file uri.
:param str path: A string pointing to or representing a local path
@@ -169,7 +192,7 @@ def path_to_url(path):
from .misc import to_bytes
if not path:
return path
return path # type: ignore
normalized_path = Path(normalize_drive(os.path.abspath(path))).as_posix()
if os.name == "nt" and normalized_path[1] == ":":
drive, _, path = normalized_path.partition(":")
@@ -177,18 +200,17 @@ def path_to_url(path):
# XXX: actually part of a surrogate pair, but were just incidentally
# XXX: passed in as a piece of a filename
quoted_path = quote(fs_encode(path))
return fs_decode("file:///{0}:{1}".format(drive, quoted_path))
return fs_decode("file:///{}:{}".format(drive, quoted_path))
# XXX: This is also here to help deal with incidental dangling surrogates
# XXX: on linux, by making sure they are preserved during encoding so that
# XXX: we can urlencode the backslash correctly
bytes_path = to_bytes(normalized_path, errors="backslashreplace")
return fs_decode("file://{0}".format(quote(bytes_path)))
return fs_decode("file://{}".format(quote(bytes_path)))
def url_to_path(url):
# type: (str) -> ByteString
"""
Convert a valid file url to a local filesystem path
# type: (str) -> str
"""Convert a valid file url to a local filesystem path.
Follows logic taken from pip's equivalent function
"""
@@ -204,37 +226,41 @@ def url_to_path(url):
def is_valid_url(url):
"""Checks if a given string is an url"""
# type: (Union[str, bytes]) -> bool
"""Checks if a given string is an url."""
from .misc import to_text
if not url:
return url
return url # type: ignore
pieces = urllib_parse.urlparse(to_text(url))
return all([pieces.scheme, pieces.netloc])
def is_file_url(url):
"""Returns true if the given url is a file url"""
# type: (Any) -> bool
"""Returns true if the given url is a file url."""
from .misc import to_text
if not url:
return False
if not isinstance(url, six.string_types):
try:
url = getattr(url, "url")
url = url.url
except AttributeError:
raise ValueError("Cannot parse url from unknown type: {0!r}".format(url))
raise ValueError("Cannot parse url from unknown type: {!r}".format(url))
url = to_text(url, encoding="utf-8")
return urllib_parse.urlparse(url.lower()).scheme == "file"
def is_readonly_path(fn):
# type: (TPath) -> bool
"""Check if a provided path exists and is readonly.
Permissions check is `bool(path.stat & stat.S_IREAD)` or `not os.access(path, os.W_OK)`
Permissions check is `bool(path.stat & stat.S_IREAD)` or `not
os.access(path, os.W_OK)`
"""
fn = fs_encode(fn)
fn = fs_decode(fs_encode(fn))
if os.path.exists(fn):
file_stat = os.stat(fn).st_mode
return not bool(file_stat & stat.S_IWRITE) or not os.access(fn, os.W_OK)
@@ -242,47 +268,35 @@ def is_readonly_path(fn):
def mkdir_p(newdir, mode=0o777):
"""Recursively creates the target directory and all of its parents if they do not
already exist. Fails silently if they do.
# type: (TPath, int) -> None
"""Recursively creates the target directory and all of its parents if they
do not already exist. Fails silently if they do.
:param str newdir: The directory path to ensure
:raises: OSError if a file is encountered along the way
"""
# http://code.activestate.com/recipes/82465-a-friendly-mkdir/
newdir = fs_encode(newdir)
newdir = fs_decode(fs_encode(newdir))
if os.path.exists(newdir):
if not os.path.isdir(newdir):
raise OSError(
"a file with the same name as the desired dir, '{0}', already exists.".format(
"a file with the same name as the desired dir, '{}', already exists.".format(
fs_decode(newdir)
)
)
else:
head, tail = os.path.split(newdir)
# Make sure the tail doesn't point to the asame place as the head
curdir = fs_encode(".")
tail_and_head_match = (
os.path.relpath(tail, start=os.path.basename(head)) == curdir
)
if tail and not tail_and_head_match and not os.path.isdir(newdir):
target = os.path.join(head, tail)
if os.path.exists(target) and os.path.isfile(target):
raise OSError(
"A file with the same name as the desired dir, '{0}', already exists.".format(
fs_decode(newdir)
)
)
os.makedirs(os.path.join(head, tail), mode)
return None
os.makedirs(newdir, mode)
def ensure_mkdir_p(mode=0o777):
"""Decorator to ensure `mkdir_p` is called to the function's return value.
"""
# type: (int) -> Callable[Callable[..., Any], Callable[..., Any]]
"""Decorator to ensure `mkdir_p` is called to the function's return
value."""
def decorator(f):
# type: (Callable[..., Any]) -> Callable[..., Any]
@functools.wraps(f)
def decorated(*args, **kwargs):
# type: () -> str
path = f(*args, **kwargs)
mkdir_p(path, mode=mode)
return path
@@ -296,6 +310,7 @@ TRACKED_TEMPORARY_DIRECTORIES = []
def create_tracked_tempdir(*args, **kwargs):
# type: (Any, Any) -> str
"""Create a tracked temporary directory.
This uses `TemporaryDirectory`, but does not remove the directory when
@@ -313,6 +328,7 @@ def create_tracked_tempdir(*args, **kwargs):
def create_tracked_tempfile(*args, **kwargs):
# type: (Any, Any) -> str
"""Create a tracked temporary file.
This uses the `NamedTemporaryFile` construct, but does not remove the file
@@ -326,6 +342,7 @@ def create_tracked_tempfile(*args, **kwargs):
def _find_icacls_exe():
# type: () -> Optional[Text]
if os.name == "nt":
paths = [
os.path.expandvars(r"%windir%\{0}").format(subdir)
@@ -343,15 +360,14 @@ def _find_icacls_exe():
def set_write_bit(fn):
# type: (str) -> None
"""
Set read-write permissions for the current user on the target path. Fail silently
if the path doesn't exist.
"""Set read-write permissions for the current user on the target path. Fail
silently if the path doesn't exist.
:param str fn: The target filename or path
:return: None
"""
fn = fs_encode(fn)
fn = fs_decode(fs_encode(fn))
if not os.path.exists(fn):
return
file_stat = os.stat(fn).st_mode
@@ -367,9 +383,9 @@ def set_write_bit(fn):
c = run(
[
icacls_exe,
"''{0}''".format(fn),
"''{}''".format(fn),
"/grant",
"{0}:WD".format(user_sid),
"{}:WD".format(user_sid),
"/T",
"/C",
"/Q",
@@ -396,8 +412,7 @@ def set_write_bit(fn):
def rmtree(directory, ignore_errors=False, onerror=None):
# type: (str, bool, Optional[Callable]) -> None
"""
Stand-in for :func:`~shutil.rmtree` with additional error-handling.
"""Stand-in for :func:`~shutil.rmtree` with additional error-handling.
This version of `rmtree` handles read-only paths, especially in the case of index
files written by certain source control systems.
@@ -411,20 +426,20 @@ def rmtree(directory, ignore_errors=False, onerror=None):
Setting `ignore_errors=True` may cause this to silently fail to delete the path
"""
directory = fs_encode(directory)
directory = fs_decode(fs_encode(directory))
if onerror is None:
onerror = handle_remove_readonly
try:
shutil.rmtree(directory, ignore_errors=ignore_errors, onerror=onerror)
except (IOError, OSError, FileNotFoundError, PermissionError) as exc:
except (IOError, OSError, FileNotFoundError, PermissionError) as exc: # noqa:B014
# Ignore removal failures where the file doesn't exist
if exc.errno != errno.ENOENT:
raise
def _wait_for_files(path): # pragma: no cover
"""
Retry with backoff up to 1 second to delete files from a directory.
# type: (Union[str, TPath]) -> Optional[List[TPath]]
"""Retry with backoff up to 1 second to delete files from a directory.
:param str path: The path to crawl to delete files from
:return: A list of remaining paths or None
@@ -446,7 +461,7 @@ def _wait_for_files(path): # pragma: no cover
except FileNotFoundError as e:
if e.errno == errno.ENOENT:
return
except (OSError, IOError, PermissionError):
except (OSError, IOError, PermissionError): # noqa:B014
time.sleep(timeout)
timeout *= 2
remaining.append(path)
@@ -456,6 +471,7 @@ def _wait_for_files(path): # pragma: no cover
def handle_remove_readonly(func, path, exc):
# type: (Callable[..., str], TPath, Tuple[Type[OSError], OSError, TracebackType]) -> None
"""Error handler for shutil.rmtree.
Windows source repo folders are read-only by default, so this error handler
@@ -480,7 +496,7 @@ def handle_remove_readonly(func, path, exc):
set_write_bit(path)
try:
func(path)
except (
except ( # noqa:B014
OSError,
IOError,
FileNotFoundError,
@@ -503,7 +519,7 @@ def handle_remove_readonly(func, path, exc):
remaining = _wait_for_files(path)
try:
func(path)
except (OSError, IOError, FileNotFoundError, PermissionError) as e:
except (OSError, IOError, FileNotFoundError, PermissionError) as e: # noqa:B014
if e.errno in PERM_ERRORS:
if e.errno != errno.ENOENT: # File still exists
warnings.warn(default_warning_message.format(path), ResourceWarning)
@@ -513,10 +529,12 @@ def handle_remove_readonly(func, path, exc):
def walk_up(bottom):
# type: (Union[TPath, str]) -> Generator[Tuple[str, List[str], List[str]], None, None]
"""Mimic os.walk, but walk 'up' instead of down the directory tree.
From: https://gist.github.com/zdavkeos/1098474
"""
bottom = os.path.realpath(bottom)
bottom = os.path.realpath(str(bottom))
# Get files in current dir.
try:
names = os.listdir(bottom)
@@ -541,7 +559,8 @@ def walk_up(bottom):
def check_for_unc_path(path):
""" Checks to see if a pathlib `Path` object is a unc path or not"""
# type: (Path) -> bool
"""Checks to see if a pathlib `Path` object is a unc path or not."""
if (
os.name == "nt"
and len(path.drive) > 2
@@ -554,6 +573,7 @@ def check_for_unc_path(path):
def get_converted_relative_path(path, relative_to=None):
# type: (TPath, Optional[TPath]) -> str
"""Convert `path` to be relative.
Given a vague relative path, return the path relative to the given
@@ -609,11 +629,11 @@ def get_converted_relative_path(path, relative_to=None):
def safe_expandvars(value):
"""Call os.path.expandvars if value is a string, otherwise do nothing.
"""
# type: (TPath) -> str
"""Call os.path.expandvars if value is a string, otherwise do nothing."""
if isinstance(value, six.string_types):
return os.path.expandvars(value)
return value
return value # type: ignore
class _TrackedTempfileWrapper(_TemporaryFileWrapper, object):
+52 -2
View File
@@ -12,11 +12,31 @@ from io import StringIO
import colorama
import six
from .compat import to_native_string
from .compat import IS_TYPE_CHECKING, to_native_string
from .cursor import hide_cursor, show_cursor
from .misc import decode_for_output, to_text
from .termcolors import COLOR_MAP, COLORS, DISABLE_COLORS, colored
if IS_TYPE_CHECKING:
from typing import (
Any,
Callable,
ContextManager,
Dict,
IO,
Optional,
Text,
Type,
TypeVar,
Union,
)
TSignalMap = Dict[
Type[signal.SIGINT],
Callable[..., int, str, Union["DummySpinner", "VistirSpinner"]],
]
_T = TypeVar("_T", covariant=True)
try:
import yaspin
except ImportError: # pragma: no cover
@@ -66,6 +86,7 @@ decode_output = functools.partial(decode_for_output, translation_map=TRANSLATION
class DummySpinner(object):
def __init__(self, text="", **kwargs):
# type: (str, Any) -> None
if DISABLE_COLORS:
colorama.init()
self.text = to_native_string(decode_output(text)) if text else ""
@@ -108,6 +129,7 @@ class DummySpinner(object):
pass
def fail(self, exitcode=1, text="FAIL"):
# type: (int, str) -> None
if text is not None and text != "None":
if self.write_to_stdout:
self.write(text)
@@ -116,6 +138,7 @@ class DummySpinner(object):
self._close_output_buffer()
def ok(self, text="OK"):
# type: (str) -> int
if text is not None and text != "None":
if self.write_to_stdout:
self.write(text)
@@ -125,6 +148,7 @@ class DummySpinner(object):
return 0
def hide_and_write(self, text, target=None):
# type: (str, Optional[str]) -> None
if not target:
target = self.stdout
if text is None or isinstance(text, six.string_types) and text == "None":
@@ -136,6 +160,7 @@ class DummySpinner(object):
self._show_cursor(target=target)
def write(self, text=None):
# type: (Optional[str]) -> None
if not self.write_to_stdout:
return self.write_err(text)
if text is None or isinstance(text, six.string_types) and text == "None":
@@ -151,6 +176,7 @@ class DummySpinner(object):
stdout.write(CLEAR_LINE)
def write_err(self, text=None):
# type: (Optional[str]) -> None
if text is None or isinstance(text, six.string_types) and text == "None":
pass
text = to_text(text)
@@ -168,10 +194,12 @@ class DummySpinner(object):
@staticmethod
def _hide_cursor(target=None):
# type: (Optional[IO]) -> None
pass
@staticmethod
def _show_cursor(target=None):
# type: (Optional[IO]) -> None
pass
@@ -183,6 +211,7 @@ class VistirSpinner(SpinBase):
"A spinner class for handling spinners on windows and posix."
def __init__(self, *args, **kwargs):
# type: (Any, Any)
"""
Get a spinner object or a dummy spinner to wrap a context.
@@ -196,7 +225,7 @@ class VistirSpinner(SpinBase):
self.handler = handler
colorama.init()
sigmap = {}
sigmap = {} # type: TSignalMap
if handler:
sigmap.update({signal.SIGINT: handler, signal.SIGTERM: handler})
handler_map = kwargs.pop("handler_map", {})
@@ -218,11 +247,15 @@ class VistirSpinner(SpinBase):
self.out_buff = StringIO()
self.write_to_stdout = write_to_stdout
self.is_dummy = bool(yaspin is None)
self._stop_spin = None # type: Optional[threading.Event]
self._hide_spin = None # type: Optional[threading.Event]
self._spin_thread = None # type: Optional[threading.Thread]
super(VistirSpinner, self).__init__(*args, **kwargs)
if DISABLE_COLORS:
colorama.deinit()
def ok(self, text=u"OK", err=False):
# type: (str, bool) -> None
"""Set Ok (success) finalizer to a spinner."""
# Do not display spin text for ok state
self._text = None
@@ -232,6 +265,7 @@ class VistirSpinner(SpinBase):
self._freeze(_text, err=err)
def fail(self, text=u"FAIL", err=False):
# type: (str, bool) -> None
"""Set fail finalizer to a spinner."""
# Do not display spin text for fail state
self._text = None
@@ -241,6 +275,7 @@ class VistirSpinner(SpinBase):
self._freeze(_text, err=err)
def hide_and_write(self, text, target=None):
# type: (str, Optional[str]) -> None
if not target:
target = self.stdout
if text is None or isinstance(text, six.string_types) and text == u"None":
@@ -252,6 +287,7 @@ class VistirSpinner(SpinBase):
self._show_cursor(target=target)
def write(self, text): # pragma: no cover
# type: (str) -> None
if not self.write_to_stdout:
return self.write_err(text)
stdout = self.stdout
@@ -266,6 +302,7 @@ class VistirSpinner(SpinBase):
self.out_buff.write(text)
def write_err(self, text): # pragma: no cover
# type: (str) -> None
"""Write error text in the terminal without breaking the spinner."""
stderr = self.stderr
if self.stderr.closed:
@@ -279,6 +316,7 @@ class VistirSpinner(SpinBase):
self.out_buff.write(decode_output(text, target_stream=self.out_buff))
def start(self):
# type: () -> None
if self._sigmap:
self._register_signal_handlers()
@@ -292,6 +330,7 @@ class VistirSpinner(SpinBase):
self._spin_thread.start()
def stop(self):
# type: () -> None
if self._dfl_sigmap:
# Reset registered signal handlers to default ones
self._reset_signal_handlers()
@@ -314,6 +353,7 @@ class VistirSpinner(SpinBase):
self.out_buff.close()
def _freeze(self, final_text, err=False):
# type: (str, bool) -> None
"""Stop spinner, compose last frame and 'freeze' it."""
if not final_text:
final_text = ""
@@ -330,12 +370,14 @@ class VistirSpinner(SpinBase):
target.write(self._last_frame)
def _compose_color_func(self):
# type: () -> Callable[..., str]
fn = functools.partial(
colored, color=self._color, on_color=self._on_color, attrs=list(self._attrs)
)
return fn
def _compose_out(self, frame, mode=None):
# type: (str, Optional[str]) -> Text
# Ensure Unicode input
frame = to_text(frame)
@@ -355,6 +397,7 @@ class VistirSpinner(SpinBase):
return out
def _spin(self):
# type: () -> None
target = self.stdout if self.write_to_stdout else self.stderr
clear_fn = self._clear_line if self.write_to_stdout else self._clear_err
while not self._stop_spin.is_set():
@@ -379,6 +422,7 @@ class VistirSpinner(SpinBase):
target.write("\b")
def _register_signal_handlers(self):
# type: () -> None
# SIGKILL cannot be caught or ignored, and the receiving
# process cannot perform any clean-up upon receiving this
# signal.
@@ -411,31 +455,37 @@ class VistirSpinner(SpinBase):
signal.signal(sig, sig_handler)
def _reset_signal_handlers(self):
# type: () -> None
for sig, sig_handler in self._dfl_sigmap.items():
signal.signal(sig, sig_handler)
@staticmethod
def _hide_cursor(target=None):
# type: (Optional[IO]) -> None
if not target:
target = sys.stdout
hide_cursor(stream=target)
@staticmethod
def _show_cursor(target=None):
# type: (Optional[IO]) -> None
if not target:
target = sys.stdout
show_cursor(stream=target)
@staticmethod
def _clear_err():
# type: () -> None
sys.stderr.write(CLEAR_LINE)
@staticmethod
def _clear_line():
# type: () -> None
sys.stdout.write(CLEAR_LINE)
def create_spinner(*args, **kwargs):
# type: (Any, Any) -> Union[DummySpinner, VistirSpinner]
nospin = kwargs.pop("nospin", False)
use_yaspin = kwargs.pop("use_yaspin", not nospin)
if nospin or not use_yaspin: