Update vendored deps

Signed-off-by: Dan Ryan <dan@danryan.co>
This commit is contained in:
Dan Ryan
2019-04-18 13:50:48 -04:00
parent 6c62d23cfc
commit ff4c8b0d5c
27 changed files with 2867 additions and 985 deletions
+5 -17
View File
@@ -36,23 +36,11 @@ try:
except Exception:
pass
from .vendor.vistir.misc import get_wrapped_stream
if sys.version_info >= (3, 0):
stdout = sys.stdout.buffer
stderr = sys.stderr.buffer
else:
stdout = sys.stdout
stderr = sys.stderr
sys.stderr = get_wrapped_stream(stderr)
sys.stdout = get_wrapped_stream(stdout)
from .vendor.colorama import AnsiToWin32
if os.name == "nt":
stderr_wrapper = AnsiToWin32(sys.stderr, autoreset=False, convert=None, strip=None)
stdout_wrapper = AnsiToWin32(sys.stdout, autoreset=False, convert=None, strip=None)
sys.stderr = stderr_wrapper.stream
sys.stdout = stdout_wrapper.stream
from .vendor.vistir.misc import replace_with_text_stream
from .vendor import colorama
replace_with_text_stream("stdout")
replace_with_text_stream("stderr")
colorama.init(wrap=False)
from .cli import cli
from . import resolver
+6 -4
View File
@@ -442,7 +442,7 @@ class Environment(object):
yield new_node
def reverse_dependencies(self):
from vistir.misc import unnest
from vistir.misc import unnest, chunked
rdeps = {}
for req in self.get_package_requirements():
for d in self.reverse_dependency(req):
@@ -454,18 +454,20 @@ class Environment(object):
"required": d["required_version"]
}
}
parents = set(d.get("parent", []))
parents = tuple(d.get("parent", ()))
pkg[name]["parents"] = parents
if rdeps.get(name):
if not (rdeps[name].get("required") or rdeps[name].get("installed")):
rdeps[name].update(pkg[name])
rdeps[name]["parents"] = rdeps[name].get("parents", set()) | parents
rdeps[name]["parents"] = rdeps[name].get("parents", ()) + parents
else:
rdeps[name] = pkg[name]
for k in list(rdeps.keys()):
entry = rdeps[k]
if entry.get("parents"):
rdeps[k]["parents"] = set([p for p in unnest(entry["parents"])])
rdeps[k]["parents"] = set([
p for p, version in chunked(2, unnest(entry["parents"]))
])
return rdeps
def get_working_set(self):
+171 -50
View File
@@ -133,12 +133,104 @@ class Entry(object):
del entry_dict["name"]
return entry_dict
def get_cleaned_dict(self):
if self.is_updated:
@classmethod
def parse_pyparsing_exprs(cls, expr_iterable):
from pipenv.vendor.pyparsing import Literal, MatchFirst
keys = []
expr_list = []
expr = expr_iterable.copy()
if isinstance(expr, Literal) or (
expr.__class__.__name__ == Literal.__name__
):
keys.append(expr.match)
elif isinstance(expr, MatchFirst) or (
expr.__class__.__name__ == MatchFirst.__name__
):
expr_list = expr.exprs
elif isinstance(expr, list):
expr_list = expr
if expr_list:
for part in expr_list:
keys.extend(cls.parse_pyparsing_exprs(part))
return keys
@classmethod
def get_markers_from_dict(cls, entry_dict):
from pipenv.vendor.packaging import markers as packaging_markers
from pipenv.vendor.requirementslib.models.markers import normalize_marker_str
marker_keys = cls.parse_pyparsing_exprs(packaging_markers.VARIABLE)
markers = set()
keys_in_dict = [k for k in marker_keys if k in entry_dict]
markers = {
normalize_marker_str("{k} {v}".format(k=k, v=entry_dict.pop(k)))
for k in keys_in_dict
}
if "markers" in entry_dict:
markers.add(normalize_marker_str(entry_dict["markers"]))
if None in markers:
markers.remove(None)
if markers:
entry_dict["markers"] = " and ".join(list(markers))
else:
markers = None
return markers, entry_dict
@property
def markers(self):
self._markers, self.entry_dict = self.get_markers_from_dict(self.entry_dict)
return self._markers
@markers.setter
def markers(self, markers):
if not markers:
marker_str = self.marker_to_str(markers)
if marker_str:
self._entry = self.entry.merge_markers(marker_str)
self._markers = self.marker_to_str(self._entry.markers)
entry_dict = self.entry_dict.copy()
entry_dict["markers"] = self.marker_to_str(self._entry.markers)
self.entry_dict = entry_dict
@property
def original_markers(self):
original_markers, lockfile_dict = self.get_markers_from_dict(
self.lockfile_dict
)
self.lockfile_dict = lockfile_dict
self._original_markers = self.marker_to_str(original_markers)
return self._original_markers
@staticmethod
def marker_to_str(marker):
from pipenv.vendor.requirementslib.models.markers import normalize_marker_str
if not marker:
return None
from pipenv.vendor import six
from pipenv.vendor.vistir.compat import Mapping
marker_str = None
if isinstance(marker, Mapping):
marker_dict, _ = Entry.get_markers_from_dict(marker)
if marker_dict:
marker_str = "{0}".format(marker_dict.popitem()[1])
elif isinstance(marker, (list, set, tuple)):
marker_str = " and ".join([normalize_marker_str(m) for m in marker if m])
elif isinstance(marker, six.string_types):
marker_str = "{0}".format(normalize_marker_str(marker))
if isinstance(marker_str, six.string_types):
return marker_str
return None
def get_cleaned_dict(self, keep_outdated=False):
if keep_outdated and self.is_updated:
self.validate_constraints()
self.ensure_least_updates_possible()
elif not keep_outdated:
self.validate_constraints()
if self.entry.extras != self.lockfile_entry.extras:
self._entry.req.extras.extend(self.lockfile_entry.req.extras)
entry_extras = list(self.entry.extras)
if self.lockfile_entry.extras:
entry_extras.extend(list(self.lockfile_entry.extras))
self._entry.req.extras = entry_extras
self.entry_dict["extras"] = self.entry.extras
entry_hashes = set(self.entry.hashes)
locked_hashes = set(self.lockfile_entry.hashes)
@@ -202,10 +294,10 @@ class Entry(object):
def clean_specifier(specifier):
from pipenv.vendor.packaging.specifiers import Specifier
if not any(specifier.startswith(k) for k in Specifier._operators.keys()):
if specifier.strip().lower() in ["any", "*"]:
if specifier.strip().lower() in ["any", "<any>", "*"]:
return "*"
specifier = "=={0}".format(specifier)
elif specifier.startswith("==") and specifier.count("=") > 2:
elif specifier.startswith("==") and specifier.count("=") > 3:
specifier = "=={0}".format(specifier.lstrip("="))
return specifier
@@ -255,7 +347,7 @@ class Entry(object):
if not self._requires:
self._requires = next(iter(
self.project.environment.get_package_requirements(self.name)
), None)
), {})
return self._requires
@property
@@ -284,21 +376,25 @@ class Entry(object):
return True
def get_dependency(self, name):
return next(iter(
dep for dep in self.requirements.get("dependencies", [])
if dep.get("package_name", "") == name
), {})
if self.requirements:
return next(iter(
dep for dep in self.requirements.get("dependencies", [])
if dep and dep.get("package_name", "") == name
), {})
return {}
def get_parent_deps(self, unnest=False):
from pipenv.vendor.packaging.specifiers import Specifier
parents = []
for spec in self.reverse_deps.get(self.normalized_name, {}).get("parents", set()):
spec_index = next(iter(c for c in Specifier._operators if c in spec), None)
spec_match = next(iter(c for c in Specifier._operators if c in spec), None)
name = spec
parent = None
if spec_index is not None:
specifier = self.clean_specifier(spec[spec_index:])
name = spec[:spec_index]
if spec_match is not None:
spec_index = spec.index(spec_match)
specifier = self.clean_specifier(spec[spec_index:len(spec_match)]).strip()
name_start = spec_index + len(spec_match)
name = spec[name_start:].strip()
parent = self.create_parent(name, specifier)
else:
name = spec
@@ -373,7 +469,7 @@ class Entry(object):
if c and c.name == self.entry.name
}
pipfile_constraint = self.get_pipfile_constraint()
if pipfile_constraint:
if pipfile_constraint and not (self.pipfile_entry.editable or pipfile_constraint.editable):
constraints.add(pipfile_constraint)
return constraints
@@ -415,10 +511,16 @@ class Entry(object):
required = self.clean_specifier(required)
parent_requires = self.make_requirement(self.name, required)
parent_dependencies.add("{0} => {1} ({2})".format(p.name, self.name, required))
if not parent_requires.requirement.specifier.contains(self.original_version):
# use pre=True here or else prereleases dont satisfy constraints
if parent_requires.requirement.specifier and (
not parent_requires.requirement.specifier.contains(self.original_version, prereleases=True)
):
can_use_original = False
if not parent_requires.requirement.specifier.contains(self.updated_version):
has_mismatch = True
if parent_requires.requirement.specifier and (
not parent_requires.requirement.specifier.contains(self.updated_version, prereleases=True)
):
if not self.entry.editable and self.updated_version != self.original_version:
has_mismatch = True
if has_mismatch and not can_use_original:
from pipenv.exceptions import DependencyConflict
msg = (
@@ -500,6 +602,23 @@ class Entry(object):
return super(Entry, self).__getattribute__(key)
def clean_results(results, resolver, project, dev=False):
if not project.lockfile_exists:
return results
lockfile = project.lockfile_content
section = "develop" if dev else "default"
pipfile_section = "dev-packages" if dev else "packages"
reverse_deps = project.environment.reverse_dependencies()
new_results = [r for r in results if r["name"] not in lockfile[section]]
for result in results:
name = result.get("name")
entry_dict = result.copy()
entry = Entry(name, entry_dict, project, resolver, reverse_deps=reverse_deps, dev=dev)
entry_dict = entry.get_cleaned_dict(keep_outdated=False)
new_results.append(entry_dict)
return new_results
def clean_outdated(results, resolver, project, dev=False):
from pipenv.vendor.requirementslib.models.requirements import Requirement
if not project.lockfile_exists:
@@ -520,24 +639,29 @@ def clean_outdated(results, resolver, project, dev=False):
# TODO: Should this be the case for all locking?
if entry.was_editable and not entry.is_editable:
continue
# if the entry has not changed versions since the previous lock,
# don't introduce new markers since that is more restrictive
if entry.has_markers and not entry.had_markers and not entry.is_updated:
del entry.entry_dict["markers"]
entry._entry.req.req.marker = None
entry._entry.markers = ""
# do make sure we retain the original markers for entries that are not changed
elif entry.had_markers and not entry.has_markers and not entry.is_updated:
if entry._entry and entry._entry.req and entry._entry.req.req and (
entry.lockfile_entry and entry.lockfile_entry.req and
entry.lockfile_entry.req.req and entry.lockfile_entry.req.req.marker
):
entry._entry.req.req.marker = entry.lockfile_entry.req.req.marker
if entry.lockfile_entry and entry.lockfile_entry.markers:
entry._entry.markers = entry.lockfile_entry.markers
if entry.lockfile_dict and "markers" in entry.lockfile_dict:
entry.entry_dict["markers"] = entry.lockfile_dict["markers"]
entry_dict = entry.get_cleaned_dict()
lockfile_entry = lockfile[section].get(name, None)
if not lockfile_entry:
alternate_section = "develop" if not dev else "default"
if name in lockfile[alternate_section]:
lockfile_entry = lockfile[alternate_section][name]
if lockfile_entry and not entry.is_updated:
old_markers = next(iter(m for m in (
entry.lockfile_entry.markers, lockfile_entry.get("markers", None)
) if m is not None), None)
new_markers = entry_dict.get("markers", None)
if old_markers:
old_markers = Entry.marker_to_str(old_markers)
if old_markers and not new_markers:
entry.markers = old_markers
elif new_markers and not old_markers:
del entry.entry_dict["markers"]
entry._entry.req.req.marker = None
entry._entry.markers = None
# if the entry has not changed versions since the previous lock,
# don't introduce new markers since that is more restrictive
# if entry.has_markers and not entry.had_markers and not entry.is_updated:
# do make sure we retain the original markers for entries that are not changed
entry_dict = entry.get_cleaned_dict(keep_outdated=True)
new_results.append(entry_dict)
return new_results
@@ -582,6 +706,8 @@ def resolve_packages(pre, clear, verbose, system, write, requirements_dir, packa
)
def resolve(packages, pre, project, sources, clear, system, requirements_dir=None):
from pipenv.patched.piptools import logging as piptools_logging
piptools_logging.log.verbosity = 1 if verbose else 0
return resolve_deps(
packages,
which,
@@ -611,6 +737,8 @@ def resolve_packages(pre, clear, verbose, system, write, requirements_dir, packa
)
if keep_outdated:
results = clean_outdated(results, resolver, project)
else:
results = clean_results(results, resolver, project)
if write:
with open(write, "w") as fh:
if not results:
@@ -646,26 +774,19 @@ def main():
_patch_path(pipenv_site=parsed.pipenv_site)
import warnings
from pipenv.vendor.vistir.compat import ResourceWarning
from pipenv.vendor.vistir.misc import get_wrapped_stream
from pipenv.vendor.vistir.misc import replace_with_text_stream
warnings.simplefilter("ignore", category=ResourceWarning)
import six
if six.PY3:
stdout = sys.stdout.buffer
stderr = sys.stderr.buffer
else:
stdout = sys.stdout
stderr = sys.stderr
sys.stderr = get_wrapped_stream(stderr)
sys.stdout = get_wrapped_stream(stdout)
replace_with_text_stream("stdout")
replace_with_text_stream("stderr")
from pipenv.vendor import colorama
if os.name == "nt" and (
all(getattr(stream, method, None) for stream in [sys.stdout, sys.stderr] for method in ["write", "isatty"]) and
all(stream.isatty() for stream in [sys.stdout, sys.stderr])
):
stderr_wrapper = colorama.AnsiToWin32(sys.stderr, autoreset=False, convert=None, strip=None)
stdout_wrapper = colorama.AnsiToWin32(sys.stdout, autoreset=False, convert=None, strip=None)
sys.stderr = stderr_wrapper.stream
sys.stdout = stdout_wrapper.stream
# stderr_wrapper = colorama.AnsiToWin32(sys.stderr, autoreset=False, convert=None, strip=None)
# stdout_wrapper = colorama.AnsiToWin32(sys.stdout, autoreset=False, convert=None, strip=None)
# sys.stderr = stderr_wrapper.stream
# sys.stdout = stdout_wrapper.stream
colorama.init(wrap=False)
elif os.name != "nt":
colorama.init()
+294 -92
View File
@@ -38,6 +38,8 @@ from .vendor.urllib3 import util as urllib3_util
if environments.MYPY_RUNNING:
from typing import Tuple, Dict, Any, List, Union, Optional, Text
from .vendor.requirementslib.models.requirements import Requirement, Line
from .vendor.packaging.markers import Marker
from .vendor.packaging.specifiers import Specifier
from .project import Project
@@ -292,11 +294,14 @@ def get_pipenv_sitedir():
class Resolver(object):
def __init__(self, constraints, req_dir, project, sources, clear=False, pre=False):
def __init__(
self, constraints, req_dir, project, sources, index_lookup=None,
markers_lookup=None, skipped=None, clear=False, pre=False
):
from pipenv.patched.piptools import logging as piptools_logging
if environments.is_verbose():
logging.log.verbose = True
piptools_logging.log.verbose = True
piptools_logging.log.verbosity = environments.PIPENV_VERBOSITY
self.initial_constraints = constraints
self.req_dir = req_dir
self.project = project
@@ -306,6 +311,11 @@ class Resolver(object):
self.clear = clear
self.pre = pre
self.results = None
self.markers_lookup = markers_lookup if markers_lookup is not None else {}
self.index_lookup = index_lookup if index_lookup is not None else {}
self.skipped = skipped if skipped is not None else {}
self.markers = {}
self.requires_python_markers = {}
self._pip_args = None
self._constraints = None
self._parsed_constraints = None
@@ -437,6 +447,7 @@ class Resolver(object):
except TypeError:
raise RequirementError(req=req)
setup_info = req.req.setup_info
setup_info.get_info()
locked_deps[pep423_name(name)] = entry
requirements = [v for v in getattr(setup_info, "requires", {}).values()]
for r in requirements:
@@ -446,20 +457,20 @@ class Resolver(object):
continue
line = _requirement_to_str_lowercase_name(r)
new_req, _, _ = cls.parse_line(line)
if r.marker and not r.marker.evaluate():
new_constraints = {}
_, new_entry = req.pipfile_entry
new_lock = {
pep423_name(new_req.normalized_name): new_entry
}
else:
new_constraints, new_lock = cls.get_deps_from_req(new_req)
locked_deps.update(new_lock)
constraints |= new_constraints
else:
if r is not None:
line = _requirement_to_str_lowercase_name(r)
constraints.add(line)
if r.marker and not r.marker.evaluate():
new_constraints = {}
_, new_entry = req.pipfile_entry
new_lock = {
pep423_name(new_req.normalized_name): new_entry
}
else:
new_constraints, new_lock = cls.get_deps_from_req(new_req)
locked_deps.update(new_lock)
constraints |= new_constraints
# if there is no marker or there is a valid marker, add the constraint line
elif r and (not r.marker or (r.marker and r.marker.evaluate())):
line = _requirement_to_str_lowercase_name(r)
constraints.add(line)
# ensure the top level entry remains as provided
# note that we shouldn't pin versions for editable vcs deps
if (not req.is_vcs or (req.is_vcs and not req.editable)):
@@ -477,10 +488,49 @@ class Resolver(object):
req.req.setup_path is not None and os.path.exists(req.req.setup_path)):
constraints.add(req.constraint_line)
else:
# if the dependency isn't installable, don't add it to constraints
# and instead add it directly to the lock
if req and req.requirement and (
req.requirement.marker and not req.requirement.marker.evaluate()
):
return constraints, locked_deps
constraints.add(req.constraint_line)
return constraints, locked_deps
return constraints, locked_deps
@classmethod
def create(
cls,
deps, # type: List[str]
index_lookup=None, # type: Dict[str, str]
markers_lookup=None, # type: Dict[str, str]
project=None, # type: Project
sources=None, # type: List[str]
req_dir=None, # type: str
clear=False, # type: bool
pre=False # type: bool
):
# type: (...) -> "Resolver"
from pipenv.vendor.vistir.path import create_tracked_tempdir
if not req_dir:
req_dir = create_tracked_tempdir(suffix="-requirements", prefix="pipenv-")
if index_lookup is None:
index_lookup = {}
if markers_lookup is None:
markers_lookup = {}
if project is None:
from pipenv.core import project
project = project
if sources is None:
sources = project.sources
constraints, skipped, index_lookup, markers_lookup = cls.get_metadata(
deps, index_lookup, markers_lookup, project, sources,
)
return Resolver(
constraints, req_dir, project, sources, index_lookup=index_lookup,
markers_lookup=markers_lookup, skipped=skipped, clear=clear, pre=pre
)
@property
def pip_command(self):
if self._pip_command is None:
@@ -602,6 +652,35 @@ class Resolver(object):
self.resolved_tree.update(results)
return self.resolved_tree
@lru_cache(maxsize=1024)
def fetch_candidate(self, ireq):
candidates = self.repository.find_all_candidates(ireq.name)
matched_version = next(iter(sorted(
ireq.specifier.filter((c.version for c in candidates), True), reverse=True)
), None)
if matched_version:
matched_candidate = next(iter(
c for c in candidates if c.version == matched_version
))
return matched_candidate
return None
def resolve_constraints(self):
new_tree = set()
for result in self.resolved_tree:
if result.markers:
self.markers[result.name] = result.markers
else:
candidate = self.fetch_candidate(result)
if getattr(candidate, "requires_python", None):
marker = make_marker_from_specifier(candidate.requires_python)
self.markers[result.name] = marker
result.markers = marker
if result.req:
result.req.marker = marker
new_tree.add(result)
self.resolved_tree = new_tree
@classmethod
def prepend_hash_types(cls, checksums):
cleaned_checksums = []
@@ -720,6 +799,87 @@ class Resolver(object):
self.hashes[ireq] = self.get_hash(ireq, ireq_hashes=ireq_hashes)
return self.hashes
def _clean_skipped_result(self, req, value):
ref = None
if req.is_vcs:
ref = req.commit_hash
ireq = req.as_ireq()
entry = value.copy()
entry["name"] = req.name
if entry.get("editable", False) and entry.get("version"):
del entry["version"]
ref = ref if ref is not None else entry.get("ref")
if ref:
entry["ref"] = ref
if self._should_include_hash(ireq):
collected_hashes = self.collect_hashes(ireq)
if collected_hashes:
entry["hashes"] = sorted(set(collected_hashes))
return req.name, entry
def clean_results(self):
from pipenv.vendor.requirementslib.models.requirements import Requirement
reqs = [(Requirement.from_ireq(ireq), ireq) for ireq in self.resolved_tree]
results = {}
for req, ireq in reqs:
if (req.vcs and req.editable and not req.is_direct_url):
continue
collected_hashes = self.collect_hashes(ireq)
req = req.add_hashes(collected_hashes)
if not collected_hashes and self._should_include_hash(ireq):
discovered_hashes = self.hashes.get(ireq, set()) | self.get_hash(ireq)
if discovered_hashes:
req = req.add_hashes(discovered_hashes)
self.hashes[ireq] = collected_hashes = discovered_hashes
if collected_hashes:
collected_hashes = sorted(set(collected_hashes))
name, entry = format_requirement_for_lockfile(
req, self.markers_lookup, self.index_lookup, collected_hashes
)
if name in results:
results[name].update(entry)
else:
results[name] = entry
for k in list(self.skipped.keys()):
req = Requirement.from_pipfile(k, self.skipped[k])
name, entry = self._clean_skipped_result(req, self.skipped[k])
if name in results:
results[name].update(entry)
else:
results[name] = entry
results = list(results.values())
return results
def format_requirement_for_lockfile(req, markers_lookup, index_lookup, hashes=None):
if req.specifiers:
version = str(req.get_version())
else:
version = None
index = index_lookup.get(req.normalized_name)
markers = markers_lookup.get(req.normalized_name)
req.index = index
name, pf_entry = req.pipfile_entry
name = pep423_name(req.name)
entry = {}
if isinstance(pf_entry, six.string_types):
entry["version"] = pf_entry.lstrip("=")
else:
entry.update(pf_entry)
if version is not None:
entry["version"] = version
if req.line_instance.is_direct_url:
entry["file"] = req.req.uri
if hashes:
entry["hashes"] = sorted(set(hashes))
entry["name"] = name
if index: # and index != next(iter(project.sources), {}).get("name"):
entry.update({"index": index})
if markers:
entry.update({"markers": markers})
entry = translate_markers(entry)
return name, entry
def _show_warning(message, category, filename, lineno, line):
warnings.showwarning(message=message, category=category, filename=filename,
@@ -738,87 +898,93 @@ def actually_resolve_deps(
req_dir=None,
):
from pipenv.vendor.vistir.path import create_tracked_tempdir
from pipenv.vendor.requirementslib.models.requirements import Requirement
if not req_dir:
req_dir = create_tracked_tempdir(suffix="-requirements", prefix="pipenv-")
warning_list = []
with warnings.catch_warnings(record=True) as warning_list:
constraints, skipped, index_lookup, markers_lookup = Resolver.get_metadata(
deps, index_lookup, markers_lookup, project, sources,
resolver = Resolver.create(
deps, index_lookup, markers_lookup, project, sources, req_dir, clear, pre
)
resolver = Resolver(constraints, req_dir, project, sources, clear=clear, pre=pre)
resolved_tree = resolver.resolve()
resolver.resolve()
hashes = resolver.resolve_hashes()
reqs = [(Requirement.from_ireq(ireq), ireq) for ireq in resolved_tree]
results = {}
for req, ireq in reqs:
if (req.vcs and req.editable and not req.is_direct_url):
continue
collected_hashes = resolver.collect_hashes(ireq)
if collected_hashes:
req = req.add_hashes(collected_hashes)
elif resolver._should_include_hash(ireq):
existing_hashes = hashes.get(ireq, set())
discovered_hashes = existing_hashes | resolver.get_hash(ireq)
if discovered_hashes:
req = req.add_hashes(discovered_hashes)
resolver.hashes[ireq] = discovered_hashes
if req.specifiers:
version = str(req.get_version())
else:
version = None
index = index_lookup.get(req.normalized_name)
markers = markers_lookup.get(req.normalized_name)
req.index = index
name, pf_entry = req.pipfile_entry
name = pep423_name(req.name)
entry = {}
if isinstance(pf_entry, six.string_types):
entry["version"] = pf_entry.lstrip("=")
else:
entry.update(pf_entry)
if version is not None:
entry["version"] = version
if req.line_instance.is_direct_url:
entry["file"] = req.req.uri
if collected_hashes:
entry["hashes"] = sorted(set(collected_hashes))
entry["name"] = name
if index: # and index != next(iter(project.sources), {}).get("name"):
entry.update({"index": index})
if markers:
entry.update({"markers": markers})
entry = translate_markers(entry)
if name in results:
results[name].update(entry)
else:
results[name] = entry
for k in list(skipped.keys()):
req = Requirement.from_pipfile(k, skipped[k])
ref = None
if req.is_vcs:
ref = req.commit_hash
ireq = req.as_ireq()
entry = skipped[k].copy()
entry["name"] = req.name
ref = ref if ref is not None else entry.get("ref")
if ref:
entry["ref"] = ref
if resolver._should_include_hash(ireq):
collected_hashes = resolver.collect_hashes(ireq)
if collected_hashes:
entry["hashes"] = sorted(set(collected_hashes))
if k in results:
results[k].update(entry)
else:
results[k] = entry
results = list(results.values())
resolver.resolve_constraints()
results = resolver.clean_results()
# constraints, skipped, index_lookup, markers_lookup = Resolver.get_metadata(
# deps, index_lookup, markers_lookup, project, sources,
# )
# resolver = Resolver(constraints, req_dir, project, sources, clear=clear, pre=pre)
# resolved_tree = resolver.resolve()
# hashes = resolver.resolve_hashes()
# reqs = [(Requirement.from_ireq(ireq), ireq) for ireq in resolved_tree]
# results = {}
# for req, ireq in reqs:
# if (req.vcs and req.editable and not req.is_direct_url):
# continue
# collected_hashes = resolver.collect_hashes(ireq)
# if collected_hashes:
# req = req.add_hashes(collected_hashes)
# elif resolver._should_include_hash(ireq):
# existing_hashes = hashes.get(ireq, set())
# discovered_hashes = existing_hashes | resolver.get_hash(ireq)
# if discovered_hashes:
# req = req.add_hashes(discovered_hashes)
# resolver.hashes[ireq] = discovered_hashes
# if req.specifiers:
# version = str(req.get_version())
# else:
# version = None
# index = index_lookup.get(req.normalized_name)
# markers = markers_lookup.get(req.normalized_name)
# req.index = index
# name, pf_entry = req.pipfile_entry
# name = pep423_name(req.name)
# entry = {}
# if isinstance(pf_entry, six.string_types):
# entry["version"] = pf_entry.lstrip("=")
# else:
# entry.update(pf_entry)
# if version is not None:
# entry["version"] = version
# if req.line_instance.is_direct_url:
# entry["file"] = req.req.uri
# if collected_hashes:
# entry["hashes"] = sorted(set(collected_hashes))
# entry["name"] = name
# if index: # and index != next(iter(project.sources), {}).get("name"):
# entry.update({"index": index})
# if markers:
# entry.update({"markers": markers})
# entry = translate_markers(entry)
# if name in results:
# results[name].update(entry)
# else:
# results[name] = entry
# for k in list(skipped.keys()):
# req = Requirement.from_pipfile(k, skipped[k])
# ref = None
# if req.is_vcs:
# ref = req.commit_hash
# ireq = req.as_ireq()
# entry = skipped[k].copy()
# entry["name"] = req.name
# ref = ref if ref is not None else entry.get("ref")
# if ref:
# entry["ref"] = ref
# if resolver._should_include_hash(ireq):
# collected_hashes = resolver.collect_hashes(ireq)
# if collected_hashes:
# entry["hashes"] = sorted(set(collected_hashes))
# if k in results:
# results[k].update(entry)
# else:
# results[k] = entry
# results = list(results.values())
for warning in warning_list:
_show_warning(warning.message, warning.category, warning.filename, warning.lineno,
warning.line)
return (results, hashes, markers_lookup, resolver, skipped)
return (results, hashes, resolver.markers_lookup, resolver, resolver.skipped)
@contextlib.contextmanager
@@ -845,29 +1011,37 @@ def resolve(cmd, sp):
EOF.__module__ = "pexpect.exceptions"
from ._compat import decode_output
c = delegator.run(Script.parse(cmd).cmdify(), block=False, env=os.environ.copy())
if environments.is_verbose():
c.subprocess.logfile = sys.stderr
_out = decode_output("")
result = None
out = to_native_string("")
while True:
result = None
try:
result = c.expect(u"\n", timeout=environments.PIPENV_INSTALL_TIMEOUT)
except (EOF, TIMEOUT):
pass
_out = c.subprocess.before
if _out is not None:
if _out:
_out = decode_output("{0}".format(_out))
out += _out
sp.text = to_native_string("{0}".format(_out[:100]))
if environments.is_verbose():
sp.hide_and_write(_out.rstrip())
if result is None:
# if environments.is_verbose():
# sp.hide_and_write(_out.rstrip())
if not result and not _out:
break
_out = to_native_string("")
c.block()
if c.return_code != 0:
sp.red.fail(environments.PIPENV_SPINNER_FAIL_TEXT.format(
"Locking Failed!"
))
click_echo(c.out.strip(), err=True)
if not environments.is_verbose():
click_echo(out, err=True)
click_echo(c.err.strip(), err=True)
sys.exit(c.return_code)
return c
@@ -1038,9 +1212,6 @@ def venv_resolve_deps(
raise RuntimeError("There was a problem with locking.")
if os.path.exists(target_file.name):
os.unlink(target_file.name)
if environments.is_verbose():
click_echo(results, err=True)
if lockfile_section not in lockfile:
lockfile[lockfile_section] = {}
prepare_lockfile(results, pipfile, lockfile[lockfile_section])
@@ -1639,7 +1810,7 @@ def clean_resolved_dep(dep, is_top_level=False, pipfile_entry=None):
lockfile = {}
# We use this to determine if there are any markers on top level packages
# So we can make sure those win out during resolution if the packages reoccur
if "version" in dep:
if "version" in dep and dep["version"] and not dep.get("editable", False):
version = "{0}".format(dep["version"])
if not version.startswith("=="):
version = "=={0}".format(version)
@@ -1939,3 +2110,34 @@ def is_python_command(line):
if line.startswith("py"):
return True
return False
def make_marker_from_specifier(spec):
# type: (str) -> Optional[Marker]
"""Given a python version specifier, create a marker
:param spec: A specifier
:type spec: str
:return: A new marker
:rtype: Optional[:class:`packaging.marker.Marker`]
"""
from .vendor.packaging.specifiers import SpecifierSet, Specifier
from .vendor.packaging.markers import Marker
from .vendor.requirementslib.models.markers import cleanup_pyspecs, format_pyversion
if not any(spec.startswith(k) for k in Specifier._operators.keys()):
if spec.strip().lower() in ["any", "<any>", "*"]:
return None
spec = "=={0}".format(spec)
elif spec.startswith("==") and spec.count("=") > 3:
spec = "=={0}".format(spec.lstrip("="))
specset = cleanup_pyspecs(SpecifierSet(spec))
marker_str = " and ".join([format_pyversion(pv) for pv in specset])
return Marker(marker_str)
# spec_match = next(iter(c for c in Specifier._operators if c in spec), None)
# if spec_match:
# spec_index = spec.index(spec_match)
# spec_end = spec_index + len(spec_match)
# op = spec[spec_index:spec_end].strip()
# version = spec[spec_end:].strip()
# spec = " {0} '{1}'".format(op, version)
# return Marker("python_version {0}".format(spec))
+5
View File
@@ -35,6 +35,11 @@ else:
IGNORE_UNSUPPORTED = bool(os.environ.get("PYTHONFINDER_IGNORE_UNSUPPORTED", False))
MYPY_RUNNING = os.environ.get("MYPY_RUNNING", is_type_checking())
SUBPROCESS_TIMEOUT = os.environ.get("PYTHONFINDER_SUBPROCESS_TIMEOUT", 5)
"""The default subprocess timeout for determining python versions
Set to **5** by default.
"""
def get_shim_paths():
+5 -4
View File
@@ -89,6 +89,9 @@ class BasePath(object):
self._children = {}
for key in list(self._pythons.keys()):
del self._pythons[key]
self._pythons = None
self._py_version = None
self.path = None
@property
def children(self):
@@ -315,10 +318,8 @@ class BaseFinder(object):
raise NotImplementedError
@classmethod
def create(
cls, *args, **kwargs # type: Type[BaseFinderType] # type: Any # type: Any
):
# type: (...) -> BaseFinderType
def create(cls, *args, **kwargs):
# type: (Any, Any) -> BaseFinderType
raise NotImplementedError
@property
+179 -78
View File
@@ -22,6 +22,7 @@ from ..environment import (
PYENV_INSTALLED,
PYENV_ROOT,
SHIM_PATHS,
get_shim_paths,
)
from ..exceptions import InvalidPythonVersion
from ..utils import (
@@ -76,10 +77,9 @@ class SystemPath(object):
path_order = attr.ib(default=attr.Factory(list)) # type: List[str]
python_version_dict = attr.ib() # type: DefaultDict[Tuple, List[PythonVersion]]
only_python = attr.ib(default=False, type=bool)
pyenv_finder = attr.ib(
default=None, validator=optional_instance_of("PythonFinder")
) # type: Optional[PythonFinder]
pyenv_finder = attr.ib(default=None) # type: Optional[PythonFinder]
asdf_finder = attr.ib(default=None) # type: Optional[PythonFinder]
windows_finder = attr.ib(default=None) # type: Optional[WindowsFinder]
system = attr.ib(default=False, type=bool)
_version_dict = attr.ib(
default=attr.Factory(defaultdict)
@@ -91,31 +91,63 @@ class SystemPath(object):
) # type: Dict[str, Union[WindowsFinder, PythonFinder]]
def _register_finder(self, finder_name, finder):
# type: (str, Union[WindowsFinder, PythonFinder]) -> None
# type: (str, Union[WindowsFinder, PythonFinder]) -> "SystemPath"
if finder_name not in self.__finders:
self.__finders[finder_name] = finder
return self
def clear_caches(self):
for key in ["executables", "python_executables", "version_dict", "path_entries"]:
if key in self.__dict__:
del self.__dict__[key]
self._executables = []
self._python_executables = {}
self.python_version_dict = defaultdict(list)
self._version_dict = defaultdict(list)
for finder in list(self.__finders.keys()):
del self.__finders[finder]
self.__finders = {}
return attr.evolve(
self,
executables=[],
python_executables={},
python_version_dict=defaultdict(list),
version_dict=defaultdict(list),
pyenv_finder=None,
windows_finder=None,
asdf_finder=None,
path_order=[],
paths=defaultdict(PathEntry),
)
def __del__(self):
self.clear_caches()
for key in ["executables", "python_executables", "version_dict", "path_entries"]:
try:
del self.__dict__[key]
except KeyError:
pass
for finder in list(self.__finders.keys()):
del self.__finders[finder]
self.__finders = {}
self._python_executables = {}
self._executables = []
self.python_version_dict = defaultdict(list)
self.version_dict = defaultdict(list)
self.path_order = []
self.pyenv_finder = None
self.asdf_finder = None
self.paths = defaultdict(PathEntry)
self.__finders = {}
@property
def finders(self):
# type: () -> List[str]
return [k for k in self.__finders.keys()]
@staticmethod
def check_for_pyenv():
return PYENV_INSTALLED or os.path.exists(normalize_path(PYENV_ROOT))
@staticmethod
def check_for_asdf():
return ASDF_INSTALLED or os.path.exists(normalize_path(ASDF_DATA_DIR))
@python_version_dict.default
def create_python_version_dict(self):
# type: () -> DefaultDict[Tuple, List[PythonVersion]]
@@ -168,35 +200,68 @@ class SystemPath(object):
self._version_dict[version].append(entry)
return self._version_dict
def __attrs_post_init__(self):
# type: () -> None
#: slice in pyenv
def _run_setup(self):
# type: () -> "SystemPath"
if not self.__class__ == SystemPath:
return
if os.name == "nt":
self._setup_windows()
if PYENV_INSTALLED:
self._setup_pyenv()
if ASDF_INSTALLED:
self._setup_asdf()
return self
new_instance = self
path_order = new_instance.path_order[:]
path_entries = self.paths.copy()
if self.global_search and "PATH" in os.environ:
path_order = path_order + os.environ["PATH"].split(os.pathsep)
path_instances = [
ensure_path(p.strip('"'))
for p in path_order
if not any(
is_in_path(normalize_path(str(p)), normalize_path(shim))
for shim in SHIM_PATHS
)
]
path_entries.update(
{
p.as_posix(): PathEntry.create(
path=p.absolute(), is_root=True, only_python=self.only_python
)
for p in path_instances
}
)
new_instance = attr.evolve(
new_instance,
path_order=[p.as_posix() for p in path_instances],
paths=path_entries,
)
if os.name == "nt" and "windows" not in self.finders:
new_instance = new_instance._setup_windows()
#: slice in pyenv
if self.check_for_pyenv() and "pyenv" not in self.finders:
new_instance = new_instance._setup_pyenv()
#: slice in asdf
if self.check_for_asdf() and "asdf" not in self.finders:
new_instance = new_instance._setup_asdf()
venv = os.environ.get("VIRTUAL_ENV")
if os.name == "nt":
bin_dir = "Scripts"
else:
bin_dir = "bin"
if venv and (self.system or self.global_search):
if venv and (new_instance.system or new_instance.global_search):
p = ensure_path(venv)
self.path_order = [(p / bin_dir).as_posix()] + self.path_order
self.paths[p] = self.get_path(p.joinpath(bin_dir))
if self.system:
path_order = [(p / bin_dir).as_posix()] + new_instance.path_order
new_instance = attr.evolve(new_instance, path_order=path_order)
paths = new_instance.paths.copy()
paths[p] = new_instance.get_path(p.joinpath(bin_dir))
new_instance = attr.evolve(new_instance, paths=paths)
if new_instance.system:
syspath = Path(sys.executable)
syspath_bin = syspath.parent
if syspath_bin.name != bin_dir and syspath_bin.joinpath(bin_dir).exists():
syspath_bin = syspath_bin / bin_dir
self.path_order = [syspath_bin.as_posix()] + self.path_order
self.paths[syspath_bin] = PathEntry.create(
path_order = [syspath_bin.as_posix()] + new_instance.path_order
paths = new_instance.paths.copy()
paths[syspath_bin] = PathEntry.create(
path=syspath_bin, is_root=True, only_python=False
)
new_instance = attr.evolve(new_instance, path_order=path_order, paths=paths)
return new_instance
def _get_last_instance(self, path):
# type: (str) -> int
@@ -210,7 +275,7 @@ class SystemPath(object):
return path_index
def _slice_in_paths(self, start_idx, paths):
# type: (int, List[Path]) -> None
# type: (int, List[Path]) -> "SystemPath"
before_path = [] # type: List[str]
after_path = [] # type: List[str]
if start_idx == 0:
@@ -220,29 +285,35 @@ class SystemPath(object):
else:
before_path = self.path_order[: start_idx + 1]
after_path = self.path_order[start_idx + 2 :]
self.path_order = before_path + [p.as_posix() for p in paths] + after_path
path_order = before_path + [p.as_posix() for p in paths] + after_path
if path_order == self.path_order:
return self
return attr.evolve(self, path_order=path_order)
def _remove_path(self, path):
# type: (str) -> None
# type: (str) -> "SystemPath"
path_copy = [p for p in reversed(self.path_order[:])]
new_order = []
target = normalize_path(path)
path_map = {normalize_path(pth): pth for pth in self.paths.keys()}
new_paths = self.paths.copy()
if target in path_map:
del self.paths[path_map[target]]
del new_paths[path_map[target]]
for current_path in path_copy:
normalized = normalize_path(current_path)
if normalized != target:
new_order.append(normalized)
new_order = [p for p in reversed(new_order)]
self.path_order = new_order
return attr.evolve(self, path_order=new_order, paths=new_paths)
def _setup_asdf(self):
# type: () -> None
# type: () -> "SystemPath"
if "asdf" in self.finders and self.asdf_finder is not None:
return self
from .python import PythonFinder
os_path = os.environ["PATH"].split(os.pathsep)
self.asdf_finder = PythonFinder.create(
asdf_finder = PythonFinder.create(
root=ASDF_DATA_DIR,
ignore_unsupported=True,
sort_function=parse_asdf_version_order,
@@ -252,20 +323,24 @@ class SystemPath(object):
try:
asdf_index = self._get_last_instance(ASDF_DATA_DIR)
except ValueError:
pyenv_index = 0 if is_in_path(next(iter(os_path), ""), PYENV_ROOT) else -1
asdf_index = 0 if is_in_path(next(iter(os_path), ""), ASDF_DATA_DIR) else -1
if asdf_index is None:
# we are in a virtualenv without global pyenv on the path, so we should
# not write pyenv to the path here
return
root_paths = [p for p in self.asdf_finder.roots]
self._slice_in_paths(asdf_index, [self.asdf_finder.root])
self.paths[self.asdf_finder.root] = self.asdf_finder
self.paths.update(self.asdf_finder.roots)
self._remove_path(normalize_path(os.path.join(ASDF_DATA_DIR, "shims")))
self._register_finder("asdf", self.asdf_finder)
return self
root_paths = [p for p in asdf_finder.roots]
new_instance = self._slice_in_paths(asdf_index, [asdf_finder.root])
paths = self.paths.copy()
paths[asdf_finder.root] = asdf_finder
paths.update(asdf_finder.roots)
return (
attr.evolve(new_instance, paths=paths, asdf_finder=asdf_finder)
._remove_path(normalize_path(os.path.join(ASDF_DATA_DIR, "shims")))
._register_finder("asdf", asdf_finder)
)
def reload_finder(self, finder_name):
# type: (str) -> None
# type: (str) -> "SystemPath"
if finder_name is None:
raise TypeError("Must pass a string as the name of the target finder")
finder_attr = "{0}_finder".format(finder_name)
@@ -286,19 +361,21 @@ class SystemPath(object):
finder_name == "asdf" and not ASDF_INSTALLED
):
# Don't allow loading of finders that aren't explicitly 'installed' as it were
pass
return self
setattr(self, finder_attr, None)
if finder_name in self.__finders:
del self.__finders[finder_name]
setup_fn()
return setup_fn()
def _setup_pyenv(self):
# type: () -> None
# type: () -> "SystemPath"
if "pyenv" in self.finders and self.pyenv_finder is not None:
return self
from .python import PythonFinder
os_path = os.environ["PATH"].split(os.pathsep)
self.pyenv_finder = PythonFinder.create(
pyenv_finder = PythonFinder.create(
root=PYENV_ROOT,
sort_function=parse_pyenv_version_order,
version_glob_path="versions/*",
@@ -312,25 +389,37 @@ class SystemPath(object):
if pyenv_index is None:
# we are in a virtualenv without global pyenv on the path, so we should
# not write pyenv to the path here
return
return self
root_paths = [p for p in self.pyenv_finder.roots]
self._slice_in_paths(pyenv_index, [self.pyenv_finder.root])
self.paths[self.pyenv_finder.root] = self.pyenv_finder
self.paths.update(self.pyenv_finder.roots)
self._remove_path(os.path.join(PYENV_ROOT, "shims"))
self._register_finder("pyenv", self.pyenv_finder)
root_paths = [p for p in pyenv_finder.roots]
new_instance = self._slice_in_paths(pyenv_index, [pyenv_finder.root])
paths = new_instance.paths.copy()
paths[pyenv_finder.root] = pyenv_finder
paths.update(pyenv_finder.roots)
return (
attr.evolve(new_instance, paths=paths, pyenv_finder=pyenv_finder)
._remove_path(os.path.join(PYENV_ROOT, "shims"))
._register_finder("pyenv", pyenv_finder)
)
def _setup_windows(self):
# type: () -> None
# type: () -> "SystemPath"
if "windows" in self.finders and self.windows_finder is not None:
return self
from .windows import WindowsFinder
self.windows_finder = WindowsFinder.create()
root_paths = (p for p in self.windows_finder.paths if p.is_root)
windows_finder = WindowsFinder.create()
root_paths = (p for p in windows_finder.paths if p.is_root)
path_addition = [p.path.as_posix() for p in root_paths]
self.path_order = self.path_order[:] + path_addition
self.paths.update({p.path: p for p in root_paths})
self._register_finder("windows", self.windows_finder)
new_path_order = self.path_order[:] + path_addition
new_paths = self.paths.copy()
new_paths.update({p.path: p for p in root_paths})
return attr.evolve(
self,
windows_finder=windows_finder,
path_order=new_path_order,
paths=new_paths,
)._register_finder("windows", windows_finder)
def get_path(self, path):
# type: (Union[str, Path]) -> PathType
@@ -350,7 +439,7 @@ class SystemPath(object):
return _path
def _get_paths(self):
# type: () -> Iterator
# type: () -> Generator[PathType, None, None]
for path in self.path_order:
try:
entry = self.get_path(path)
@@ -558,30 +647,44 @@ class SystemPath(object):
paths = [] # type: List[str]
if ignore_unsupported:
os.environ["PYTHONFINDER_IGNORE_UNSUPPORTED"] = fs_str("1")
if global_search:
if "PATH" in os.environ:
paths = os.environ["PATH"].split(os.pathsep)
# if global_search:
# if "PATH" in os.environ:
# paths = os.environ["PATH"].split(os.pathsep)
path_order = []
if path:
paths = [path] + paths
paths = [p for p in paths if not any(is_in_path(p, shim) for shim in SHIM_PATHS)]
_path_objects = [ensure_path(p.strip('"')) for p in paths]
paths = [p.as_posix() for p in _path_objects]
path_entries.update(
{
p.as_posix(): PathEntry.create(
path=p.absolute(), is_root=True, only_python=only_python
)
for p in _path_objects
}
)
return cls(
path_order = [path]
path_instance = ensure_path(path)
path_entries.update(
{
path_instance.as_posix(): PathEntry.create(
path=path_instance.absolute(),
is_root=True,
only_python=only_python,
)
}
)
# paths = [path] + paths
# paths = [p for p in paths if not any(is_in_path(p, shim) for shim in SHIM_PATHS)]
# _path_objects = [ensure_path(p.strip('"')) for p in paths]
# paths = [p.as_posix() for p in _path_objects]
# path_entries.update(
# {
# p.as_posix(): PathEntry.create(
# path=p.absolute(), is_root=True, only_python=only_python
# )
# for p in _path_objects
# }
# )
instance = cls(
paths=path_entries,
path_order=paths,
path_order=path_order,
only_python=only_python,
system=system,
global_search=global_search,
ignore_unsupported=ignore_unsupported,
)
instance = instance._run_setup()
return instance
@attr.s(slots=True)
@@ -603,8 +706,6 @@ class PathEntry(BasePath):
def _gen_children(self):
# type: () -> Iterator
from ..environment import get_shim_paths
shim_paths = get_shim_paths()
pass_name = self.name != self.path.name
pass_args = {"is_root": False, "only_python": self.only_python}
+4 -4
View File
@@ -229,10 +229,8 @@ class PythonFinder(BaseFinder, BasePath):
return self.pythons
@classmethod
def create(
cls, root, sort_function, version_glob_path=None, ignore_unsupported=True
): # type: ignore
# type: (Type[PythonFinder], str, Callable, Optional[str], bool) -> PythonFinder
def create(cls, root, sort_function, version_glob_path=None, ignore_unsupported=True):
# type: (str, Callable, Optional[str], bool) -> PythonFinder
root = ensure_path(root)
if not version_glob_path:
version_glob_path = "versions/*"
@@ -593,6 +591,8 @@ class PythonVersion(object):
raise TypeError("Must pass a valid path to parse.")
if not isinstance(path, six.string_types):
path = path.as_posix()
# if not looks_like_python(path):
# raise ValueError("Path %r does not look like a valid python path" % path)
try:
result_version = get_python_version(path)
except Exception:
+9 -4
View File
@@ -1,6 +1,7 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function
import importlib
import operator
import os
@@ -10,7 +11,6 @@ from vistir.compat import lru_cache
from . import environment
from .exceptions import InvalidPythonVersion
from .models import path as pyfinder_path
from .utils import Iterable, filter_pythons, version_re
if environment.MYPY_RUNNING:
@@ -68,6 +68,7 @@ class Finder(object):
def create_system_path(self):
# type: () -> SystemPath
pyfinder_path = importlib.import_module("pythonfinder.models.path")
return pyfinder_path.SystemPath.create(
path=self.path_prepend,
system=self.system,
@@ -84,8 +85,9 @@ class Finder(object):
"""
if self._system_path is not None:
self._system_path.clear_caches()
self._system_path = None
self._system_path = self._system_path.clear_caches()
self._system_path = None
pyfinder_path = importlib.import_module("pythonfinder.models.path")
six.moves.reload_module(pyfinder_path)
self._system_path = self.create_system_path()
@@ -95,8 +97,11 @@ class Finder(object):
self._system_path = self.create_system_path()
self.find_all_python_versions.cache_clear()
self.find_python_version.cache_clear()
self.reload_system_path()
if self._windows_finder is not None:
self._windows_finder = None
filter_pythons.cache_clear()
self.reload_system_path()
return self
@property
def system_path(self):
+20 -12
View File
@@ -6,13 +6,14 @@ import itertools
import os
import re
from fnmatch import fnmatch
from threading import Timer
import attr
import six
import vistir
from packaging.version import LegacyVersion, Version
from .environment import MYPY_RUNNING, PYENV_ROOT
from .environment import MYPY_RUNNING, PYENV_ROOT, SUBPROCESS_TIMEOUT
from .exceptions import InvalidPythonVersion
six.add_move(
@@ -37,11 +38,12 @@ if MYPY_RUNNING:
from .models.path import PathEntry
version_re = re.compile(
version_re_str = (
r"(?P<major>\d+)(?:\.(?P<minor>\d+))?(?:\.(?P<patch>(?<=\.)[0-9]+))?\.?"
r"(?:(?P<prerel>[abc]|rc|dev)(?:(?P<prerelversion>\d+(?:\.\d+)*))?)"
r"?(?P<postdev>(\.post(?P<post>\d+))?(\.dev(?P<dev>\d+))?)?"
)
version_re = re.compile(version_re_str)
PYTHON_IMPLEMENTATIONS = (
@@ -53,13 +55,19 @@ PYTHON_IMPLEMENTATIONS = (
"miniconda",
"stackless",
"activepython",
"pyston",
"micropython",
)
RE_MATCHER = re.compile(
r"(({0})(?:\d?(?:\.\d[cpm]{{0,3}}))?(?:-?[\d\.]+)*[^z])".format(
"|".join(PYTHON_IMPLEMENTATIONS)
)
KNOWN_EXTS = {"exe", "py", "fish", "sh", ""}
KNOWN_EXTS = KNOWN_EXTS | set(
filter(None, os.environ.get("PATHEXT", "").split(os.pathsep))
)
PY_MATCH_STR = r"((?P<implementation>{0})(?:\d?(?:\.\d[cpm]{{0,3}}))?(?:-?[\d\.]+)*[^z])".format(
"|".join(PYTHON_IMPLEMENTATIONS)
)
EXE_MATCH_STR = r"{0}(?:\.(?P<ext>{1}))?".format(PY_MATCH_STR, "|".join(KNOWN_EXTS))
RE_MATCHER = re.compile(r"({0}|{1})".format(version_re_str, PY_MATCH_STR))
EXE_MATCHER = re.compile(EXE_MATCH_STR)
RULES_BASE = [
"*{0}",
"*{0}?",
@@ -71,11 +79,6 @@ RULES_BASE = [
]
RULES = [rule.format(impl) for impl in PYTHON_IMPLEMENTATIONS for rule in RULES_BASE]
KNOWN_EXTS = {"exe", "py", "fish", "sh", ""}
KNOWN_EXTS = KNOWN_EXTS | set(
filter(None, os.environ.get("PATHEXT", "").split(os.pathsep))
)
MATCH_RULES = []
for rule in RULES:
MATCH_RULES.extend(
@@ -87,7 +90,11 @@ for rule in RULES:
def get_python_version(path):
# type: (str) -> str
"""Get python version string using subprocess from a given path."""
version_cmd = [path, "-c", "import sys; print(sys.version.split()[0])"]
version_cmd = [
path,
"-c",
"import sys; print('.'.join([str(i) for i in sys.version_info[:3]]))",
]
try:
c = vistir.misc.run(
version_cmd,
@@ -97,6 +104,7 @@ def get_python_version(path):
combine_stderr=False,
write_to_stdout=False,
)
timer = Timer(5, c.kill)
except OSError:
raise InvalidPythonVersion("%s is not a valid python path" % path)
if not c.out:
+1 -1
View File
@@ -10,7 +10,7 @@ from .models.lockfile import Lockfile
from .models.pipfile import Pipfile
from .models.requirements import Requirement
__version__ = "1.4.2"
__version__ = "1.4.3.dev0"
logger = logging.getLogger(__name__)
+509 -5
View File
@@ -1,19 +1,35 @@
# -*- coding: utf-8 -*-
import itertools
import operator
import attr
import distlib.markers
import packaging.version
import six
from packaging.markers import InvalidMarker, Marker
from packaging.specifiers import Specifier, SpecifierSet
from vistir.compat import Mapping, Set, lru_cache
from vistir.misc import _is_iterable, dedup
from ..exceptions import RequirementError
from .utils import filter_none, validate_markers
from ..environment import MYPY_RUNNING
from ..exceptions import RequirementError
from six.moves import reduce # isort:skip
if MYPY_RUNNING:
from typing import Optional, List
MAX_VERSIONS = {2: 7, 3: 10}
@attr.s
class PipenvMarkers(object):
"""System-level requirements - see PEP508 for more detail"""
os_name = attr.ib(
default=None, validator=attr.validators.optional(validate_markers)
)
os_name = attr.ib(default=None, validator=attr.validators.optional(validate_markers))
sys_platform = attr.ib(
default=None, validator=attr.validators.optional(validate_markers)
)
@@ -92,3 +108,491 @@ class PipenvMarkers(object):
pass
else:
return combined_marker
@lru_cache(maxsize=128)
def _tuplize_version(version):
return tuple(int(x) for x in filter(lambda i: i != "*", version.split(".")))
@lru_cache(maxsize=128)
def _format_version(version):
if not isinstance(version, six.string_types):
return ".".join(str(i) for i in version)
return version
# Prefer [x,y) ranges.
REPLACE_RANGES = {">": ">=", "<=": "<"}
@lru_cache(maxsize=128)
def _format_pyspec(specifier):
if isinstance(specifier, str):
if not any(op in specifier for op in Specifier._operators.keys()):
specifier = "=={0}".format(specifier)
specifier = Specifier(specifier)
version = specifier.version.replace(".*", "")
if ".*" in specifier.version:
specifier = Specifier("{0}{1}".format(specifier.operator, version))
try:
op = REPLACE_RANGES[specifier.operator]
except KeyError:
return specifier
curr_tuple = _tuplize_version(version)
try:
next_tuple = (curr_tuple[0], curr_tuple[1] + 1)
except IndexError:
next_tuple = (curr_tuple[0], 1)
if not next_tuple[1] <= MAX_VERSIONS[next_tuple[0]]:
if specifier.operator == "<" and curr_tuple[1] <= MAX_VERSIONS[next_tuple[0]]:
op = "<="
next_tuple = (next_tuple[0], curr_tuple[1])
else:
return specifier
specifier = Specifier("{0}{1}".format(op, _format_version(next_tuple)))
return specifier
@lru_cache(maxsize=128)
def _get_specs(specset):
if specset is None:
return
if isinstance(specset, Specifier) or not _is_iterable(specset):
new_specset = SpecifierSet()
specs = set()
specs.add(specset)
new_specset._specs = frozenset(specs)
specset = new_specset
if isinstance(specset, str):
specset = SpecifierSet(specset)
result = []
for spec in set(specset):
version = spec.version
op = spec.operator
if op in ("in", "not in"):
versions = version.split(",")
op = "==" if op == "in" else "!="
for ver in versions:
result.append((op, _tuplize_version(ver.strip())))
else:
result.append((spec.operator, _tuplize_version(spec.version)))
return sorted(result, key=operator.itemgetter(1))
@lru_cache(maxsize=128)
def _group_by_op(specs):
specs = [_get_specs(x) for x in list(specs)]
flattened = [(op, version) for spec in specs for op, version in spec]
specs = sorted(flattened)
grouping = itertools.groupby(specs, key=operator.itemgetter(0))
return grouping
@lru_cache(maxsize=128)
def cleanup_pyspecs(specs, joiner="or"):
specs = {_format_pyspec(spec) for spec in specs}
# for != operator we want to group by version
# if all are consecutive, join as a list
results = set()
for op, versions in _group_by_op(tuple(specs)):
versions = [version[1] for version in versions]
versions = sorted(dedup(versions))
# if we are doing an or operation, we need to use the min for >=
# this way OR(>=2.6, >=2.7, >=3.6) picks >=2.6
# if we do an AND operation we need to use MAX to be more selective
if op in (">", ">="):
if joiner == "or":
results.add((op, _format_version(min(versions))))
else:
results.add((op, _format_version(max(versions))))
# we use inverse logic here so we will take the max value if we are
# using OR but the min value if we are using AND
elif op in ("<=", "<"):
if joiner == "or":
results.add((op, _format_version(max(versions))))
else:
results.add((op, _format_version(min(versions))))
# leave these the same no matter what operator we use
elif op in ("!=", "==", "~="):
version_list = sorted(
"{0}".format(_format_version(version)) for version in versions
)
version = ", ".join(version_list)
if len(version_list) == 1:
results.add((op, version))
elif op == "!=":
results.add(("not in", version))
elif op == "==":
results.add(("in", version))
else:
specifier = SpecifierSet(
",".join(sorted("{0}{1}".format(op, v) for v in version_list))
)._specs
for s in specifier:
results.add((s._spec[0], s._spec[1]))
else:
if len(version) == 1:
results.add((op, version))
else:
specifier = SpecifierSet("{0}".format(version))._specs
for s in specifier:
results.add((s._spec[0], s._spec[1]))
return sorted(results, key=operator.itemgetter(1))
def fix_version_tuple(version_tuple):
op, version = version_tuple
max_major = max(MAX_VERSIONS.keys())
if version[0] > max_major:
return (op, (max_major, MAX_VERSIONS[max_major]))
max_allowed = MAX_VERSIONS[version[0]]
if op == "<" and version[1] > max_allowed and version[1] - 1 <= max_allowed:
op = "<="
version = (version[0], version[1] - 1)
return (op, version)
@lru_cache(maxsize=128)
def get_versions(specset, group_by_operator=True):
specs = [_get_specs(x) for x in list(tuple(specset))]
initial_sort_key = lambda k: (k[0], k[1])
initial_grouping_key = operator.itemgetter(0)
if not group_by_operator:
initial_grouping_key = operator.itemgetter(1)
initial_sort_key = operator.itemgetter(1)
version_tuples = sorted(
set((op, version) for spec in specs for op, version in spec), key=initial_sort_key
)
version_tuples = [fix_version_tuple(t) for t in version_tuples]
op_groups = [
(grp, list(map(operator.itemgetter(1), keys)))
for grp, keys in itertools.groupby(version_tuples, key=initial_grouping_key)
]
versions = [
(op, packaging.version.parse(".".join(str(v) for v in val)))
for op, vals in op_groups
for val in vals
]
return sorted(versions, key=operator.itemgetter(1))
def _ensure_marker(marker):
if not isinstance(marker, Marker):
return Marker(str(marker))
return marker
def gen_marker(mkr):
m = Marker("python_version == '1'")
m._markers.pop()
m._markers.append(mkr)
return m
def _strip_extra(elements):
"""Remove the "extra == ..." operands from the list."""
return _strip_marker_elem("extra", elements)
def _strip_pyversion(elements):
return _strip_marker_elem("python_version", elements)
def _strip_marker_elem(elem_name, elements):
"""Remove the supplied element from the marker.
This is not a comprehensive implementation, but relies on an important
characteristic of metadata generation: The element's operand is always
associated with an "and" operator. This means that we can simply remove the
operand and the "and" operator associated with it.
"""
extra_indexes = []
preceding_operators = ["and"] if elem_name == "extra" else ["and", "or"]
for i, element in enumerate(elements):
if isinstance(element, list):
cancelled = _strip_marker_elem(elem_name, element)
if cancelled:
extra_indexes.append(i)
elif isinstance(element, tuple) and element[0].value == elem_name:
extra_indexes.append(i)
for i in reversed(extra_indexes):
del elements[i]
if i > 0 and elements[i - 1] in preceding_operators:
# Remove the "and" before it.
del elements[i - 1]
elif elements:
# This shouldn't ever happen, but is included for completeness.
# If there is not an "and" before this element, try to remove the
# operator after it.
del elements[0]
return not elements
def _get_stripped_marker(marker, strip_func):
"""Build a new marker which is cleaned according to `strip_func`"""
if not marker:
return None
marker = _ensure_marker(marker)
elements = marker._markers
strip_func(elements)
if elements:
return marker
return None
def get_without_extra(marker):
"""Build a new marker without the `extra == ...` part.
The implementation relies very deep into packaging's internals, but I don't
have a better way now (except implementing the whole thing myself).
This could return `None` if the `extra == ...` part is the only one in the
input marker.
"""
return _get_stripped_marker(marker, _strip_extra)
def get_without_pyversion(marker):
"""Built a new marker without the `python_version` part.
This could return `None` if the `python_version` section is the only section in the
marker.
"""
return _get_stripped_marker(marker, _strip_pyversion)
def _markers_collect_extras(markers, collection):
# Optimization: the marker element is usually appended at the end.
for el in reversed(markers):
if isinstance(el, tuple) and el[0].value == "extra" and el[1].value == "==":
collection.add(el[2].value)
elif isinstance(el, list):
_markers_collect_extras(el, collection)
def _markers_collect_pyversions(markers, collection):
local_collection = []
marker_format_str = "{0}"
for i, el in enumerate(reversed(markers)):
if isinstance(el, tuple) and el[0].value == "python_version":
new_marker = str(gen_marker(el))
local_collection.append(marker_format_str.format(new_marker))
elif isinstance(el, list):
_markers_collect_pyversions(el, local_collection)
if local_collection:
# local_collection = "{0}".format(" ".join(local_collection))
collection.extend(local_collection)
def _markers_contains_extra(markers):
# Optimization: the marker element is usually appended at the end.
return _markers_contains_key(markers, "extra")
def _markers_contains_pyversion(markers):
return _markers_contains_key(markers, "python_version")
def _markers_contains_key(markers, key):
for element in reversed(markers):
if isinstance(element, tuple) and element[0].value == key:
return True
elif isinstance(element, list):
if _markers_contains_key(element, key):
return True
return False
@lru_cache(maxsize=128)
def get_contained_extras(marker):
"""Collect "extra == ..." operands from a marker.
Returns a list of str. Each str is a speficied extra in this marker.
"""
if not marker:
return set()
extras = set()
marker = _ensure_marker(marker)
_markers_collect_extras(marker._markers, extras)
return extras
def get_contained_pyversions(marker):
"""Collect all `python_version` operands from a marker.
"""
collection = []
if not marker:
return set()
marker = _ensure_marker(marker)
# Collect the (Variable, Op, Value) tuples and string joiners from the marker
_markers_collect_pyversions(marker._markers, collection)
marker_str = " and ".join(sorted(collection))
if not marker_str:
return set()
# Use the distlib dictionary parser to create a dictionary 'trie' which is a bit
# easier to reason about
marker_dict = distlib.markers.parse_marker(marker_str)[0]
version_set = set()
pyversions, _ = parse_marker_dict(marker_dict)
if isinstance(pyversions, set):
version_set.update(pyversions)
elif pyversions is not None:
version_set.add(pyversions)
# Each distinct element in the set was separated by an "and" operator in the marker
# So we will need to reduce them with an intersection here rather than a union
# in order to find the boundaries
versions = set()
if version_set:
versions = reduce(lambda x, y: x & y, version_set)
return versions
@lru_cache(maxsize=128)
def contains_extra(marker):
"""Check whehter a marker contains an "extra == ..." operand.
"""
if not marker:
return False
marker = _ensure_marker(marker)
return _markers_contains_extra(marker._markers)
@lru_cache(maxsize=128)
def contains_pyversion(marker):
"""Check whether a marker contains a python_version operand.
"""
if not marker:
return False
marker = _ensure_marker(marker)
return _markers_contains_pyversion(marker._markers)
def get_specset(marker_list):
# type: (List) -> Optional[SpecifierSet]
specset = set()
_last_str = "and"
for marker_parts in marker_list:
if isinstance(marker_parts, tuple):
variable, op, value = marker_parts
if variable.value != "python_version":
continue
if op.value == "in":
values = [v.strip() for v in value.value.split(",")]
specset.update(Specifier("=={0}".format(v)) for v in values)
elif op.value == "not in":
values = [v.strip() for v in value.value.split(",")]
bad_versions = ["3.0", "3.1", "3.2", "3.3"]
if len(values) >= 2 and any(v in values for v in bad_versions):
values = bad_versions
specset.update(
Specifier("!={0}".format(v.strip())) for v in sorted(bad_versions)
)
else:
specset.add(Specifier("{0}{1}".format(op.value, value.value)))
elif isinstance(marker_parts, list):
specset.update(get_specset(marker_parts))
elif isinstance(marker_parts, str):
_last_str = marker_parts
specifiers = SpecifierSet()
specifiers._specs = frozenset(specset)
return specifiers
def parse_marker_dict(marker_dict):
op = marker_dict["op"]
lhs = marker_dict["lhs"]
rhs = marker_dict["rhs"]
# This is where the spec sets for each side land if we have an "or" operator
side_spec_list = []
side_markers_list = []
finalized_marker = ""
# And if we hit the end of the parse tree we use this format string to make a marker
format_string = "{lhs} {op} {rhs}"
specset = SpecifierSet()
specs = set()
# Essentially we will iterate over each side of the parsed marker if either one is
# A mapping instance (i.e. a dictionary) and recursively parse and reduce the specset
# Union the "and" specs, intersect the "or"s to find the most appropriate range
if any(issubclass(type(side), Mapping) for side in (lhs, rhs)):
for side in (lhs, rhs):
side_specs = set()
side_markers = set()
if issubclass(type(side), Mapping):
merged_side_specs, merged_side_markers = parse_marker_dict(side)
side_specs.update(merged_side_specs)
side_markers.update(merged_side_markers)
else:
marker = _ensure_marker(side)
marker_parts = getattr(marker, "_markers", [])
if marker_parts[0][0].value == "python_version":
side_specs |= set(get_specset(marker_parts))
else:
side_markers.add(str(marker))
side_spec_list.append(side_specs)
side_markers_list.append(side_markers)
if op == "and":
# When we are "and"-ing things together, it probably makes the most sense
# to reduce them here into a single PySpec instance
specs = reduce(lambda x, y: set(x) | set(y), side_spec_list)
markers = reduce(lambda x, y: set(x) | set(y), side_markers_list)
if not specs and not markers:
return specset, finalized_marker
if markers and isinstance(markers, (tuple, list, Set)):
finalized_marker = Marker(" and ".join([m for m in markers if m]))
elif markers:
finalized_marker = str(markers)
specset._specs = frozenset(specs)
return specset, finalized_marker
# Actually when we "or" things as well we can also just turn them into a reduced
# set using this logic now
sides = reduce(lambda x, y: set(x) & set(y), side_spec_list)
finalized_marker = " or ".join(
[normalize_marker_str(m) for m in side_markers_list]
)
specset._specs = frozenset(sorted(sides))
return specset, finalized_marker
else:
# At the tip of the tree we are dealing with strings all around and they just need
# to be smashed together
specs = set()
if lhs == "python_version":
format_string = "{lhs}{op}{rhs}"
marker = Marker(format_string.format(**marker_dict))
marker_parts = getattr(marker, "_markers", [])
_set = get_specset(marker_parts)
if _set:
specs |= set(_set)
specset._specs = frozenset(specs)
return specset, finalized_marker
def format_pyversion(parts):
op, val = parts
return "python_version {0} '{1}'".format(op, val)
def normalize_marker_str(marker):
marker_str = ""
if not marker:
return None
if not isinstance(marker, Marker):
marker = _ensure_marker(marker)
pyversion = get_contained_pyversions(marker)
marker = get_without_pyversion(marker)
if pyversion:
parts = cleanup_pyspecs(pyversion)
marker_str = " and ".join([format_pyversion(pv) for pv in parts])
if marker:
if marker_str:
marker_str = "{0!s} and {1!s}".format(marker_str, marker)
else:
marker_str = "{0!s}".format(marker)
return marker_str.replace('"', "'")
+42 -26
View File
@@ -7,22 +7,21 @@ import os
import sys
import attr
import tomlkit
import plette.models.base
import plette.pipfiles
import tomlkit
from vistir.compat import FileNotFoundError, Path
from ..exceptions import RequirementError
from ..utils import is_editable, is_vcs, merge_items
from .project import ProjectFile
from .requirements import Requirement
from .utils import optional_instance_of, get_url_name
from .utils import get_url_name, optional_instance_of, tomlkit_value_to_python
from ..environment import MYPY_RUNNING
from ..exceptions import RequirementError
from ..utils import is_editable, is_vcs, merge_items
if MYPY_RUNNING:
from typing import Union, Any, Dict, Iterable, Mapping, List, Text
package_type = Dict[Text, Dict[Text, Union[List[Text], Text]]]
source_type = Dict[Text, Union[Text, bool]]
sources_type = Iterable[source_type]
@@ -46,7 +45,7 @@ def patch_plette():
def validate(cls, data):
# type: (Any, Dict[Text, Any]) -> None
if not cerberus: # Skip validation if Cerberus is not available.
if not cerberus: # Skip validation if Cerberus is not available.
return
schema = cls.__SCHEMA__
key = id(schema)
@@ -156,10 +155,12 @@ class Pipfile(object):
path = attr.ib(validator=is_path, type=Path)
projectfile = attr.ib(validator=is_projectfile, type=ProjectFile)
_pipfile = attr.ib(type=PipfileLoader)
_pyproject = attr.ib(default=attr.Factory(tomlkit.document), type=tomlkit.toml_document.TOMLDocument)
_pyproject = attr.ib(
default=attr.Factory(tomlkit.document), type=tomlkit.toml_document.TOMLDocument
)
build_system = attr.ib(default=attr.Factory(dict), type=dict)
requirements = attr.ib(default=attr.Factory(list), type=list)
dev_requirements = attr.ib(default=attr.Factory(list), type=list)
_requirements = attr.ib(default=attr.Factory(list), type=list)
_dev_requirements = attr.ib(default=attr.Factory(list), type=list)
@path.default
def _get_path(self):
@@ -188,7 +189,9 @@ class Pipfile(object):
deps.update(self.pipfile._data["dev-packages"])
if only:
return deps
return merge_items([deps, self.pipfile._data["packages"]])
return tomlkit_value_to_python(
merge_items([deps, self.pipfile._data["packages"]])
)
def get(self, k):
# type: (Text) -> Any
@@ -213,6 +216,7 @@ class Pipfile(object):
if "-" in k:
section, _, pkg_type = k.rpartition("-")
vals = getattr(pipfile.get(section, {}), "_data", {})
vals = tomlkit_value_to_python(vals)
if pkg_type == "vcs":
retval = {k: v for k, v in vals.items() if is_vcs(v)}
elif pkg_type == "editable":
@@ -254,11 +258,7 @@ class Pipfile(object):
:return: A project file with the model and location for interaction
:rtype: :class:`~requirementslib.models.project.ProjectFile`
"""
pf = ProjectFile.read(
path,
PipfileLoader,
invalid_ok=True
)
pf = ProjectFile.read(path, PipfileLoader, invalid_ok=True)
return pf
@classmethod
@@ -303,18 +303,10 @@ class Pipfile(object):
projectfile = cls.load_projectfile(path, create=create)
pipfile = projectfile.model
dev_requirements = [
Requirement.from_pipfile(k, getattr(v, "_data", v)) for k, v in pipfile.get("dev-packages", {}).items()
]
requirements = [
Requirement.from_pipfile(k, getattr(v, "_data", v)) for k, v in pipfile.get("packages", {}).items()
]
creation_args = {
"projectfile": projectfile,
"pipfile": pipfile,
"dev_requirements": dev_requirements,
"requirements": requirements,
"path": Path(projectfile.location)
"path": Path(projectfile.location),
}
return cls(**creation_args)
@@ -333,6 +325,30 @@ class Pipfile(object):
# type: () -> List[Requirement]
return self.requirements
@property
def dev_requirements(self):
# type: () -> List[Requirement]
if not self._dev_requirements:
packages = tomlkit_value_to_python(self.pipfile.get("dev-packages", {}))
self._dev_requirements = [
Requirement.from_pipfile(k, v)
for k, v in packages.items()
if v is not None
]
return self._dev_requirements
@property
def requirements(self):
# type: () -> List[Requirement]
if not self._requirements:
packages = tomlkit_value_to_python(self.pipfile.get("packages", {}))
self._requirements = [
Requirement.from_pipfile(k, v)
for k, v in packages.items()
if v is not None
]
return self._requirements
def _read_pyproject(self):
# type: () -> None
pyproject = self.path.parent.joinpath("pyproject.toml")
+135 -260
View File
@@ -40,7 +40,20 @@ from vistir.path import (
normalize_path,
)
from .setup_info import SetupInfo, _prepare_wheel_building_kwargs
from .markers import (
cleanup_pyspecs,
contains_pyversion,
format_pyversion,
get_contained_pyversions,
normalize_marker_str,
)
from .setup_info import (
SetupInfo,
_prepare_wheel_building_kwargs,
ast_parse_setup_py,
get_metadata,
parse_setup_cfg,
)
from .url import URI
from .utils import (
DIRECT_URL_RE,
@@ -74,6 +87,7 @@ from ..utils import (
VCS_LIST,
add_ssh_scheme_to_git_uri,
get_setup_paths,
is_installable_dir,
is_installable_file,
is_vcs,
strip_ssh_from_git_uri,
@@ -195,6 +209,18 @@ class Line(object):
except Exception:
return "<Line {0}>".format(self.__dict__.values())
@property
def name_and_specifier(self):
name_str, spec_str = "", ""
if self.name:
name_str = "{0}".format(self.name.lower())
extras_str = extras_to_string(self.extras)
if extras_str:
name_str = "{0}{1}".format(name_str, extras_str)
if self.specifier:
spec_str = "{0}".format(self.specifier)
return "{0}{1}".format(name_str, spec_str)
@classmethod
def split_hashes(cls, line):
# type: (S) -> Tuple[S, List[S]]
@@ -216,6 +242,8 @@ class Line(object):
def line_with_prefix(self):
# type: () -> STRING_TYPE
line = self.line
if self.is_named:
return self.name_and_specifier
extras_str = extras_to_string(self.extras)
if self.is_direct_url:
line = self.link.url
@@ -224,7 +252,7 @@ class Line(object):
line = self.link.url
if "git+file:/" in line and "git+file:///" not in line:
line = line.replace("git+file:/", "git+file:///")
else:
elif extras_str not in line:
line = "{0}{1}".format(line, extras_str)
if self.editable:
return "-e {0}".format(line)
@@ -487,10 +515,10 @@ class Line(object):
:returns: Nothing
:rtype: None
"""
line, hashes = self.split_hashes(self.line)
self.hashes = hashes
self.line = line
return self
def parse_extras(self):
# type: () -> None
@@ -499,7 +527,6 @@ class Line(object):
:returns: Nothing
:rtype: None
"""
extras = None
if "@" in self.line or self.is_vcs or self.is_url:
line = "{0}".format(self.line)
@@ -525,11 +552,11 @@ class Line(object):
extras_set |= name_extras
if extras_set is not None:
self.extras = tuple(sorted(extras_set))
return self
def get_url(self):
# type: () -> STRING_TYPE
"""Sets ``self.name`` if given a **PEP-508** style URL"""
line = self.line
try:
parsed = URI.parse(line)
@@ -569,6 +596,10 @@ class Line(object):
if self._name is None and not self.is_named and not self.is_wheel:
if self.setup_info:
self._name = self.setup_info.name
elif self.is_wheel:
self._name = self._parse_wheel()
if not self._name:
self._name = self.ireq.name
return self._name
@name.setter
@@ -781,6 +812,29 @@ class Line(object):
self._vcsrepo = self._get_vcsrepo()
return self._vcsrepo
@cached_property
def metadata(self):
# type: () -> Dict[Any, Any]
if self.is_local and is_installable_dir(self.path):
return get_metadata(self.path)
return {}
@cached_property
def parsed_setup_cfg(self):
# type: () -> Dict[Any, Any]
if self.is_local and is_installable_dir(self.path):
if self.setup_cfg:
return parse_setup_cfg(self.setup_cfg)
return {}
@cached_property
def parsed_setup_py(self):
# type: () -> Dict[Any, Any]
if self.is_local and is_installable_dir(self.path):
if self.setup_py:
return ast_parse_setup_py(self.setup_py)
return {}
@vcsrepo.setter
def vcsrepo(self, repo):
# type (VCSRepository) -> None
@@ -843,7 +897,6 @@ class Line(object):
def _parse_name_from_link(self):
# type: () -> Optional[STRING_TYPE]
if self.link is None:
return None
if getattr(self.link, "egg_fragment", None):
@@ -881,8 +934,29 @@ class Line(object):
self._specifier = "{0}{1}".format(specifier, version)
return name
def _parse_name_from_path(self):
# type: () -> Optional[S]
if self.path and self.is_local and is_installable_dir(self.path):
metadata = get_metadata(self.path)
if metadata:
name = metadata.get("name", "")
if name:
return name
parsed_setup_cfg = self.parsed_setup_cfg
if parsed_setup_cfg:
name = parsed_setup_cfg.get("name", "")
if name:
return name
parsed_setup_py = self.parsed_setup_py
if parsed_setup_py:
name = parsed_setup_py.get("name", "")
if name:
return name
return None
def parse_name(self):
# type: () -> None
# type: () -> "Line"
if self._name is None:
name = None
if self.link is not None:
@@ -895,13 +969,17 @@ class Line(object):
if "&" in name:
# subdirectory fragments might also be in here
name, _, _ = name.partition("&")
if self.is_named:
if name is None and self.is_named:
name = self._parse_name_from_line()
elif name is None and self.is_file or self.is_url or self.is_path:
if self.is_local:
name = self._parse_name_from_path()
if name is not None:
name, extras = pip_shims.shims._strip_extras(name)
if extras is not None and not self.extras:
self.extras = tuple(sorted(set(parse_extras(extras))))
self._name = name
return self
def _parse_requirement_from_vcs(self):
# type: () -> Optional[PackagingRequirement]
@@ -939,7 +1017,7 @@ class Line(object):
return self._requirement
def parse_requirement(self):
# type: () -> None
# type: () -> "Line"
if self._name is None:
self.parse_name()
if not self._name and not self.is_vcs and not self.is_named:
@@ -982,9 +1060,10 @@ class Line(object):
"dependencies. Please install remote dependency "
"in the form {0}#egg=<package-name>.".format(url)
)
return self
def parse_link(self):
# type: () -> None
# type: () -> "Line"
parsed_url = None # type: Optional[URI]
if not is_valid_url(self.line) and (
self.line.startswith("./")
@@ -1033,6 +1112,7 @@ class Line(object):
self._link = parsed_link
else:
self._link = link
return self
def parse_markers(self):
# type: () -> None
@@ -1110,8 +1190,7 @@ class Line(object):
def parse(self):
# type: () -> None
self.parse_hashes()
self.line, self.markers = split_markers_from_line(self.line)
self.line, self.markers = split_markers_from_line(self.parse_hashes().line)
self.parse_extras()
self.line = self.line.strip('"').strip("'").strip()
if self.line.startswith("git+file:/") and not self.line.startswith(
@@ -1184,8 +1263,8 @@ class NamedRequirement(object):
return cls(**creation_kwargs)
@classmethod
def from_pipfile(cls, name, pipfile): # type: S # type: TPIPFILE
# type: (...) -> NamedRequirement
def from_pipfile(cls, name, pipfile):
# type: (S, TPIPFILE) -> NamedRequirement
creation_args = {} # type: TPIPFILE
if hasattr(pipfile, "keys"):
attr_fields = [field.name for field in attr.fields(cls)]
@@ -1471,84 +1550,12 @@ class FileRequirement(object):
@name.default
def get_name(self):
# type: () -> STRING_TYPE
loc = self.path or self.uri
if loc and not self._uri_scheme:
self._uri_scheme = "path" if self.path else "file"
name = None # type: Optional[STRING_TYPE]
hashed_loc = None # type: Optional[STRING_TYPE]
hashed_name = None # type: Optional[STRING_TYPE]
if loc:
hashed_loc = hashlib.sha256(loc.encode("utf-8")).hexdigest()
hashed_name = hashed_loc[-7:]
if (
getattr(self, "req", None)
and self.req is not None
and getattr(self.req, "name")
and self.req.name is not None
):
if self.is_direct_url and self.req.name != hashed_name:
return self.req.name
if self.link and self.link.egg_fragment and self.link.egg_fragment != hashed_name:
if self.parsed_line and self.parsed_line.name:
return self.parsed_line.name
elif self.link and self.link.egg_fragment:
return self.link.egg_fragment
elif self.link and self.link.is_wheel:
from pip_shims import Wheel
self._has_hashed_name = False
return Wheel(self.link.filename).name
elif self.link and (
(self.link.scheme == "file" or self.editable)
or (self.path and self.setup_path and os.path.isfile(str(self.setup_path)))
):
_ireq = None # type: Optional[InstallRequirement]
target_path = "" # type: STRING_TYPE
if self.setup_py_dir:
target_path = Path(self.setup_py_dir).as_posix()
elif self.path:
target_path = Path(os.path.abspath(self.path)).as_posix()
if self.editable:
line = pip_shims.shims.path_to_url(target_path)
if self.extras:
line = "{0}[{1}]".format(line, ",".join(self.extras))
_ireq = pip_shims.shims.install_req_from_editable(line)
else:
line = target_path
if self.extras:
line = "{0}[{1}]".format(line, ",".join(self.extras))
_ireq = pip_shims.shims.install_req_from_line(line)
if getattr(self, "req", None) is not None:
_ireq.req = copy.deepcopy(self.req)
if self.extras and _ireq and not _ireq.extras:
_ireq.extras = set(self.extras)
from .setup_info import SetupInfo
subdir = getattr(self, "subdirectory", None)
if self.setup_info is not None:
setupinfo = self.setup_info
else:
setupinfo = SetupInfo.from_ireq(_ireq, subdir=subdir)
if setupinfo:
self._setup_info = setupinfo
self._setup_info.get_info()
setupinfo_dict = setupinfo.as_dict()
setup_name = setupinfo_dict.get("name", None)
if setup_name:
name = setup_name
self._has_hashed_name = False
build_requires = setupinfo_dict.get("build_requires")
build_backend = setupinfo_dict.get("build_backend")
if build_requires and not self.pyproject_requires:
self.pyproject_requires = tuple(build_requires)
if build_backend and not self.pyproject_backend:
self.pyproject_backend = build_backend
if not name or name.lower() == "unknown":
self._has_hashed_name = True
name = hashed_name
name_in_link = getattr(self.link, "egg_fragment", "") if self.link else ""
if not self._has_hashed_name and name_in_link != name and self.link is not None:
self.link = create_link("{0}#egg={1}".format(self.link.url, name))
if name is not None:
return name
return ""
elif self.setup_info and self.setup_info.name:
return self.setup_info.name
@link.default
def get_link(self):
@@ -1581,34 +1588,6 @@ class FileRequirement(object):
if req:
return req
req = init_requirement(normalize_name(self.name))
if req is None:
raise ValueError(
"Failed to generate a requirement: missing name for {0!r}".format(self)
)
req.editable = False
if self.link is not None:
req.line = self.link.url_without_fragment
elif self.uri is not None:
req.line = self.uri
else:
req.line = self.name
if self.path and self.link and self.link.scheme.startswith("file"):
req.local_file = True
req.path = self.path
if self.editable:
req.url = None
else:
req.url = self.link.url_without_fragment
else:
req.local_file = False
req.path = None
req.url = self.link.url_without_fragment
if self.editable:
req.editable = True
req.link = self.link
return req
@property
def parsed_line(self):
# type: () -> Optional[Line]
@@ -1639,11 +1618,9 @@ class FileRequirement(object):
if self.link is None:
return False
return (
any(
self.link.scheme.startswith(scheme)
for scheme in ("http", "https", "ftp", "ftps", "uri")
)
and (self.link.is_artifact or self.link.is_wheel)
self._parsed_line
and not self._parsed_line.is_local
and (self._parsed_line.is_artifact or self._parsed_line.is_wheel)
and not self.editable
)
@@ -1833,53 +1810,8 @@ class FileRequirement(object):
@classmethod
def from_line(cls, line, editable=None, extras=None, parsed_line=None):
# type: (AnyStr, Optional[bool], Optional[Tuple[AnyStr, ...]], Optional[Line]) -> F
line = line.strip('"').strip("'")
link = None
path = None
editable = line.startswith("-e ")
line = line.split(" ", 1)[1] if editable else line
setup_path = None
name = None
req = None
if not extras:
extras = ()
else:
extras = tuple(extras)
if not any([is_installable_file(line), is_valid_url(line), is_file_url(line)]):
try:
req = init_requirement(line)
except Exception:
raise RequirementError(
"Supplied requirement is not installable: {0!r}".format(line)
)
else:
name = getattr(req, "name", None)
line = getattr(req, "url", None)
vcs_type, prefer, relpath, path, uri, link = cls.get_link_from_line(line)
arg_dict = {
"path": relpath if relpath else path,
"uri": unquote(link.url_without_fragment),
"link": link,
"editable": editable,
"setup_path": setup_path,
"uri_scheme": prefer,
"line": line,
"extras": extras,
# "name": name,
}
if req is not None:
arg_dict["req"] = req
if parsed_line is not None:
arg_dict["parsed_line"] = parsed_line
if link and link.is_wheel:
from pip_shims import Wheel
arg_dict["name"] = Wheel(link.filename).name
elif name:
arg_dict["name"] = name
elif link.egg_fragment:
arg_dict["name"] = link.egg_fragment
return cls.create(**arg_dict)
parsed_line = Line(line)
file_req_from_parsed_line(parsed_line)
@classmethod
def from_pipfile(cls, name, pipfile):
@@ -1964,8 +1896,9 @@ class FileRequirement(object):
line = "{0}&subdirectory={1}".format(line, pipfile["subdirectory"])
if editable:
line = "-e {0}".format(line)
arg_dict["line"] = line
return cls.create(**arg_dict) # type: ignore
arg_dict["parsed_line"] = Line(line)
arg_dict["setup_info"] = arg_dict["parsed_line"].setup_info
return cls(**arg_dict) # type: ignore
@property
def line_part(self):
@@ -2344,7 +2277,7 @@ class VCSRequirement(FileRequirement):
)
if self.parsed_line and self._parsed_line:
self._parsed_line.vcsrepo = vcsrepo
if self.req:
if self.req and not self.editable:
self.req.specifier = SpecifierSet("=={0}".format(self.setup_info.version))
try:
yield self._repo
@@ -2407,83 +2340,8 @@ class VCSRequirement(FileRequirement):
@classmethod
def from_line(cls, line, editable=None, extras=None, parsed_line=None):
# type: (AnyStr, Optional[bool], Optional[Tuple[AnyStr, ...]], Optional[Line]) -> F
relpath = None
if parsed_line is None:
parsed_line = Line(line)
if editable:
parsed_line.editable = editable
if extras:
parsed_line.extras = extras
if line.startswith("-e "):
editable = True
line = line.split(" ", 1)[1]
if "@" in line:
parsed = urllib_parse.urlparse(add_ssh_scheme_to_git_uri(line))
if not parsed.scheme:
possible_name, _, line = line.partition("@")
possible_name = possible_name.strip()
line = line.strip()
possible_name, extras = pip_shims.shims._strip_extras(possible_name)
name = possible_name
line = "{0}#egg={1}".format(line, name)
vcs_type, prefer, relpath, path, uri, link = cls.get_link_from_line(line)
if not extras and link.egg_fragment:
name, extras = pip_shims.shims._strip_extras(link.egg_fragment)
else:
name, _ = pip_shims.shims._strip_extras(link.egg_fragment)
parsed_extras = None # type: Optional[List[STRING_TYPE]]
extras_tuple = None # type: Optional[Tuple[STRING_TYPE, ...]]
if not extras:
line, extras = pip_shims.shims._strip_extras(line)
if extras:
if isinstance(extras, six.string_types):
parsed_extras = parse_extras(extras)
if parsed_extras:
extras_tuple = tuple(parsed_extras)
subdirectory = link.subdirectory_fragment
ref = None
if uri:
uri, ref = split_ref_from_uri(uri)
if path is not None and "@" in path:
path, _ref = split_ref_from_uri(path)
if ref is None:
ref = _ref
if relpath and "@" in relpath:
relpath, ref = split_ref_from_uri(relpath)
creation_args = {
"name": name if name else parsed_line.name,
"path": relpath or path,
"editable": editable,
"extras": extras_tuple,
"link": link,
"vcs_type": vcs_type,
"line": line,
"uri": uri,
"uri_scheme": prefer,
"parsed_line": parsed_line,
}
if relpath:
creation_args["relpath"] = relpath
# return cls.create(**creation_args)
cls_inst = cls(
name=name,
ref=ref,
vcs=vcs_type,
subdirectory=subdirectory,
link=link,
path=relpath or path,
editable=editable,
uri=uri,
extras=extras_tuple if extras_tuple else tuple(),
base_line=line,
parsed_line=parsed_line,
)
if cls_inst.req and (
cls_inst._parsed_line.ireq and not cls_inst.parsed_line.ireq.req
):
cls_inst._parsed_line._ireq.req = cls_inst.req
return cls_inst
parsed_line = Line(line)
return vcs_req_from_parsed_line(parsed_line)
@property
def line_part(self):
@@ -3238,15 +3096,32 @@ class Requirement(object):
# type: (Union[AnyStr, Marker]) -> None
if not isinstance(markers, Marker):
markers = Marker(markers)
_markers = set() # type: Set[Marker]
if self.ireq and self.ireq.markers:
_markers.add(Marker(self.ireq.markers))
_markers.add(markers)
new_markers = Marker(" or ".join([str(m) for m in sorted(_markers)]))
self.markers = str(new_markers)
if self.req and self.req.req:
self.req.req.marker = new_markers
return
_markers = [] # type: List[Marker]
ireq = self.as_ireq()
if ireq and ireq.markers:
ireq_marker = ireq.markers
_markers.append(str(ireq_marker))
_markers.append(str(markers))
marker_str = " and ".join([normalize_marker_str(m) for m in _markers if m])
new_marker = Marker(marker_str)
line = copy.deepcopy(self._line_instance)
line.markers = marker_str
line.parsed_marker = new_marker
if getattr(line, "_requirement", None) is not None:
line._requirement.marker = new_marker
if getattr(line, "_ireq", None) is not None and line._ireq.req:
line._ireq.req.marker = new_marker
new_ireq = getattr(self, "ireq", None)
if new_ireq and new_ireq.req:
new_ireq.req.marker = new_marker
req = self.req
if req.req:
req_requirement = req.req
req_requirement.marker = new_marker
req = attr.evolve(req, req=req_requirement, parsed_line=line)
return attr.evolve(
self, markers=str(new_marker), ireq=new_ireq, req=req, line_instance=line
)
def file_req_from_parsed_line(parsed_line):
File diff suppressed because it is too large Load Diff
+6
View File
@@ -203,6 +203,12 @@ class URI(object):
fragment = ""
if parsed_dict["fragment"] is not None:
fragment = "{0}".format(parsed_dict["fragment"])
if fragment.startswith("egg="):
name, extras = pip_shims.shims._strip_extras(name_with_extras)
fragment_name, fragment_extras = pip_shims.shims._strip_extras(fragment)
if fragment_extras and not extras:
name_with_extras = "{0}{1}".format(name, fragment_extras)
fragment = ""
elif "&subdirectory" in parsed_dict["path"]:
path, fragment = cls.parse_subdirectory(parsed_dict["path"])
parsed_dict["path"] = path
+99 -19
View File
@@ -17,7 +17,10 @@ from first import first
from packaging.markers import InvalidMarker, Marker, Op, Value, Variable
from packaging.specifiers import InvalidSpecifier, Specifier, SpecifierSet
from packaging.version import parse as parse_version
from plette.models import Package, PackageCollection
from six.moves.urllib import parse as urllib_parse
from tomlkit.container import Container
from tomlkit.items import AoT, Array, Bool, InlineTable, Item, String, Table
from urllib3 import util as urllib3_util
from vistir.compat import lru_cache
from vistir.misc import dedup
@@ -62,9 +65,13 @@ if MYPY_RUNNING:
MarkerTuple = Tuple[TVariable, TOp, TValue]
TRequirement = Union[PackagingRequirement, PkgResourcesRequirement]
STRING_TYPE = Union[bytes, str, Text]
TOML_DICT_TYPES = Union[Container, Package, PackageCollection, Table, InlineTable]
S = TypeVar("S", bytes, str, Text)
TOML_DICT_OBJECTS = (Container, Package, Table, InlineTable, PackageCollection)
TOML_DICT_NAMES = [o.__class__.__name__ for o in TOML_DICT_OBJECTS]
HASH_STRING = " --hash={0}"
ALPHA_NUMERIC = r"[{0}{1}]".format(string.ascii_letters, string.digits)
@@ -111,6 +118,60 @@ def create_link(link):
return Link(link)
def tomlkit_value_to_python(toml_value):
# type: (Union[Array, AoT, TOML_DICT_TYPES, Item]) -> Union[List, Dict]
value_type = type(toml_value).__name__
if (
isinstance(toml_value, TOML_DICT_OBJECTS + (dict,))
or value_type in TOML_DICT_NAMES
):
return tomlkit_dict_to_python(toml_value)
elif isinstance(toml_value, AoT) or value_type == "AoT":
return [tomlkit_value_to_python(val) for val in toml_value._body]
elif isinstance(toml_value, Array) or value_type == "Array":
return [tomlkit_value_to_python(val) for val in list(toml_value)]
elif isinstance(toml_value, String) or value_type == "String":
return "{0!s}".format(toml_value)
elif isinstance(toml_value, Bool) or value_type == "Bool":
return toml_value.value
elif isinstance(toml_value, Item):
return toml_value.value
return toml_value
def tomlkit_dict_to_python(toml_dict):
# type: (TOML_DICT_TYPES) -> Dict
value_type = type(toml_dict).__name__
if toml_dict is None:
raise TypeError("Invalid type NoneType when converting toml dict to python")
converted = None # type: Optional[Dict]
if isinstance(toml_dict, (InlineTable, Table)) or value_type in (
"InlineTable",
"Table",
):
converted = toml_dict.value
elif isinstance(toml_dict, (Package, PackageCollection)) or value_type in (
"Package, PackageCollection"
):
converted = toml_dict._data
if isinstance(converted, Container) or type(converted).__name__ == "Container":
converted = converted.value
elif isinstance(toml_dict, Container) or value_type == "Container":
converted = toml_dict.value
elif isinstance(toml_dict, dict):
converted = toml_dict.copy()
else:
raise TypeError(
"Invalid type for conversion: expected Container, Dict, or Table, "
"got {0!r}".format(toml_dict)
)
if isinstance(converted, dict):
return {k: tomlkit_value_to_python(v) for k, v in converted.items()}
elif isinstance(converted, (TOML_DICT_OBJECTS)) or value_type in TOML_DICT_NAMES:
return tomlkit_dict_to_python(converted)
return converted
def get_url_name(url):
# type: (AnyStr) -> AnyStr
"""
@@ -142,7 +203,12 @@ def init_requirement(name):
def extras_to_string(extras):
# type: (Iterable[S]) -> S
"""Turn a list of extras into a string"""
"""Turn a list of extras into a string
:param List[str]] extras: a list of extras to format
:return: A string of extras
:rtype: str
"""
if isinstance(extras, six.string_types):
if extras.startswith("["):
return extras
@@ -155,8 +221,11 @@ def extras_to_string(extras):
def parse_extras(extras_str):
# type: (AnyStr) -> List[AnyStr]
"""
Turn a string of extras into a parsed extras list
"""Turn a string of extras into a parsed extras list
:param str extras_str: An extras string
:return: A sorted list of extras
:rtype: List[str]
"""
from pkg_resources import Requirement
@@ -167,8 +236,11 @@ def parse_extras(extras_str):
def specs_to_string(specs):
# type: (List[Union[STRING_TYPE, Specifier]]) -> AnyStr
"""
Turn a list of specifier tuples into a string
"""Turn a list of specifier tuples into a string
:param List[Union[Specifier, str]] specs: a list of specifiers to format
:return: A string of specifiers
:rtype: str
"""
if specs:
@@ -212,7 +284,8 @@ def build_vcs_uri(
def convert_direct_url_to_url(direct_url):
# type: (AnyStr) -> AnyStr
"""
"""Converts direct URLs to standard, link-style URLs
Given a direct url as defined by *PEP 508*, convert to a :class:`~pip_shims.shims.Link`
compatible URL by moving the name and extras into an **egg_fragment**.
@@ -253,6 +326,8 @@ def convert_direct_url_to_url(direct_url):
def convert_url_to_direct_url(url, name=None):
# type: (AnyStr, Optional[AnyStr]) -> AnyStr
"""
Converts normal link-style URLs to direct urls.
Given a :class:`~pip_shims.shims.Link` compatible URL, convert to a direct url as
defined by *PEP 508* by extracting the name and extras from the **egg_fragment**.
@@ -303,7 +378,7 @@ def get_version(pipfile_entry):
if str(pipfile_entry) == "{}" or is_star(pipfile_entry):
return ""
elif hasattr(pipfile_entry, "keys") and "version" in pipfile_entry:
if hasattr(pipfile_entry, "keys") and "version" in pipfile_entry:
if is_star(pipfile_entry.get("version")):
return ""
return pipfile_entry.get("version", "").strip().lstrip("(").rstrip(")")
@@ -316,6 +391,8 @@ def get_version(pipfile_entry):
def strip_extras_markers_from_requirement(req):
# type: (TRequirement) -> TRequirement
"""
Strips extras markers from requirement instances.
Given a :class:`~packaging.requirements.Requirement` instance with markers defining
*extra == 'name'*, strip out the extras from the markers and return the cleaned
requirement
@@ -389,7 +466,6 @@ def get_pyproject(path):
:return: A 2 tuple of build requirements and the build backend
:rtype: Optional[Tuple[List[AnyStr], AnyStr]]
"""
if not path:
return
from vistir.compat import Path
@@ -519,8 +595,7 @@ def key_from_req(req):
def _requirement_to_str_lowercase_name(requirement):
"""
Formats a packaging.requirements.Requirement with a lowercase name.
"""Formats a packaging.requirements.Requirement with a lowercase name.
This is simply a copy of
https://github.com/pypa/packaging/blob/16.8/packaging/requirements.py#L109-L124
@@ -531,7 +606,6 @@ def _requirement_to_str_lowercase_name(requirement):
important stuff that should not be lower-cased (such as the marker). See
this issue for more information: https://github.com/pypa/pipenv/issues/2113.
"""
parts = [requirement.name.lower()]
if requirement.extras:
@@ -550,11 +624,15 @@ def _requirement_to_str_lowercase_name(requirement):
def format_requirement(ireq):
"""
"""Formats an `InstallRequirement` instance as a string.
Generic formatter for pretty printing InstallRequirements to the terminal
in a less verbose way than using its `__str__` method.
"""
:param :class:`InstallRequirement` ireq: A pip **InstallRequirement** instance.
:return: A formatted string for prettyprinting
:rtype: str
"""
if ireq.editable:
line = "-e {}".format(ireq.link)
else:
@@ -572,9 +650,13 @@ def format_requirement(ireq):
def format_specifier(ireq):
"""
Generic formatter for pretty printing the specifier part of
InstallRequirements to the terminal.
"""Generic formatter for pretty printing specifiers.
Pretty-prints specifiers from InstallRequirements for output to terminal.
:param :class:`InstallRequirement` ireq: A pip **InstallRequirement** instance.
:return: A string of specifiers in the given install requirement or <any>
:rtype: str
"""
# TODO: Ideally, this is carried over to the pip library itself
specs = ireq.specifier._specs if ireq.req is not None else []
@@ -583,8 +665,7 @@ def format_specifier(ireq):
def get_pinned_version(ireq):
"""
Get the pinned version of an InstallRequirement.
"""Get the pinned version of an InstallRequirement.
An InstallRequirement is considered pinned if:
@@ -602,7 +683,6 @@ def get_pinned_version(ireq):
Raises `TypeError` if the input is not a valid InstallRequirement, or
`ValueError` if the InstallRequirement is not pinned.
"""
try:
specifier = ireq.specifier
except AttributeError:
+1 -33
View File
@@ -132,7 +132,7 @@ def strip_ssh_from_git_uri(uri):
def add_ssh_scheme_to_git_uri(uri):
# type: (S) -> S
"""Cleans VCS uris from pipenv.patched.notpip format"""
"""Cleans VCS uris from pip format"""
if isinstance(uri, six.string_types):
# Add scheme for parsing purposes, this is also what pip does
if uri.startswith("git+") and "://" not in uri:
@@ -169,14 +169,6 @@ def is_editable(pipfile_entry):
return False
def multi_split(s, split):
# type: (S, Iterable[S]) -> List[S]
"""Splits on multiple given separators."""
for r in split:
s = s.replace(r, "|")
return [i for i in s.split("|") if len(i) > 0]
def is_star(val):
# type: (PipfileType) -> bool
return (isinstance(val, six.string_types) and val == "*") or (
@@ -318,30 +310,6 @@ def _ensure_dir(path):
return path
@contextlib.contextmanager
def ensure_setup_py(base):
# type: (STRING_TYPE) -> Generator[None, None, None]
if not base:
base = create_tracked_tempdir(prefix="requirementslib-setup")
base_dir = Path(base)
if base_dir.exists() and base_dir.name == "setup.py":
base_dir = base_dir.parent
elif not (base_dir.exists() and base_dir.is_dir()):
base_dir = base_dir.parent
if not (base_dir.exists() and base_dir.is_dir()):
base_dir = base_dir.parent
setup_py = base_dir.joinpath("setup.py")
is_new = False if setup_py.exists() else True
if not setup_py.exists():
setup_py.write_text(u"")
try:
yield
finally:
if is_new:
setup_py.unlink()
_UNSET = object()
_REMAP_EXIT = object()
+3 -1
View File
@@ -13,6 +13,7 @@ from .contextmanagers import (
cd,
open_file,
replaced_stream,
replaced_streams,
spinner,
temp_environ,
temp_path,
@@ -35,7 +36,7 @@ from .misc import (
from .path import create_tracked_tempdir, create_tracked_tempfile, mkdir_p, rmtree
from .spin import create_spinner
__version__ = "0.3.1"
__version__ = "0.4.0"
__all__ = [
@@ -68,6 +69,7 @@ __all__ = [
"get_wrapped_stream",
"StreamWrapper",
"replaced_stream",
"replaced_streams",
"show_cursor",
"hide_cursor",
]
+393
View File
@@ -0,0 +1,393 @@
# -*- coding: utf-8 -*-
# This Module is taken in full from the click project
# see https://github.com/pallets/click/blob/6cafd32/click/_winconsole.py
# Copyright © 2014 by the Pallets team.
# Some rights reserved.
# Redistribution and use in source and binary forms of the software as well as
# documentation, with or without modification, are permitted provided that the
# following conditions are met:
# Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this
# software without specific prior written permission.
# THIS SOFTWARE AND DOCUMENTATION IS PROVIDED BY THE COPYRIGHT HOLDERS AND
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
# NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE AND
# DOCUMENTATION, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# This module is based on the excellent work by Adam Bartoš who
# provided a lot of what went into the implementation here in
# the discussion to issue1602 in the Python bug tracker.
#
# There are some general differences in regards to how this works
# compared to the original patches as we do not need to patch
# the entire interpreter but just work in our little world of
# echo and prmopt.
import io
import os
import sys
import zlib
import time
import ctypes
import msvcrt
from ctypes import (
byref,
POINTER,
c_int,
c_char,
c_char_p,
c_void_p,
c_ssize_t,
c_ulong,
py_object,
Structure,
windll,
WINFUNCTYPE,
)
from ctypes.wintypes import LPWSTR, LPCWSTR
from six import PY2, text_type
from .misc import StreamWrapper
try:
from ctypes import pythonapi
PyObject_GetBuffer = pythonapi.PyObject_GetBuffer
PyBuffer_Release = pythonapi.PyBuffer_Release
except ImportError:
pythonapi = None
c_ssize_p = POINTER(c_ssize_t)
kernel32 = windll.kernel32
GetStdHandle = kernel32.GetStdHandle
ReadConsoleW = kernel32.ReadConsoleW
WriteConsoleW = kernel32.WriteConsoleW
GetLastError = kernel32.GetLastError
GetConsoleCursorInfo = kernel32.GetConsoleCursorInfo
SetConsoleCursorInfo = kernel32.SetConsoleCursorInfo
GetCommandLineW = WINFUNCTYPE(LPWSTR)(("GetCommandLineW", windll.kernel32))
CommandLineToArgvW = WINFUNCTYPE(POINTER(LPWSTR), LPCWSTR, POINTER(c_int))(
("CommandLineToArgvW", windll.shell32)
)
# XXX: Added for cursor hiding on windows
STDOUT_HANDLE_ID = ctypes.c_ulong(-11)
STDERR_HANDLE_ID = ctypes.c_ulong(-12)
STDIN_HANDLE = GetStdHandle(-10)
STDOUT_HANDLE = GetStdHandle(-11)
STDERR_HANDLE = GetStdHandle(-12)
STREAM_MAP = {0: STDIN_HANDLE, 1: STDOUT_HANDLE, 2: STDERR_HANDLE}
PyBUF_SIMPLE = 0
PyBUF_WRITABLE = 1
ERROR_SUCCESS = 0
ERROR_NOT_ENOUGH_MEMORY = 8
ERROR_OPERATION_ABORTED = 995
STDIN_FILENO = 0
STDOUT_FILENO = 1
STDERR_FILENO = 2
EOF = b"\x1a"
MAX_BYTES_WRITTEN = 32767
class Py_buffer(Structure):
_fields_ = [
("buf", c_void_p),
("obj", py_object),
("len", c_ssize_t),
("itemsize", c_ssize_t),
("readonly", c_int),
("ndim", c_int),
("format", c_char_p),
("shape", c_ssize_p),
("strides", c_ssize_p),
("suboffsets", c_ssize_p),
("internal", c_void_p),
]
if PY2:
_fields_.insert(-1, ("smalltable", c_ssize_t * 2))
# XXX: This was added for the use of cursors
class CONSOLE_CURSOR_INFO(Structure):
_fields_ = [("dwSize", ctypes.c_int), ("bVisible", ctypes.c_int)]
# On PyPy we cannot get buffers so our ability to operate here is
# serverly limited.
if pythonapi is None:
get_buffer = None
else:
def get_buffer(obj, writable=False):
buf = Py_buffer()
flags = PyBUF_WRITABLE if writable else PyBUF_SIMPLE
PyObject_GetBuffer(py_object(obj), byref(buf), flags)
try:
buffer_type = c_char * buf.len
return buffer_type.from_address(buf.buf)
finally:
PyBuffer_Release(byref(buf))
class _WindowsConsoleRawIOBase(io.RawIOBase):
def __init__(self, handle):
self.handle = handle
def isatty(self):
io.RawIOBase.isatty(self)
return True
class _WindowsConsoleReader(_WindowsConsoleRawIOBase):
def readable(self):
return True
def readinto(self, b):
bytes_to_be_read = len(b)
if not bytes_to_be_read:
return 0
elif bytes_to_be_read % 2:
raise ValueError(
"cannot read odd number of bytes from " "UTF-16-LE encoded console"
)
buffer = get_buffer(b, writable=True)
code_units_to_be_read = bytes_to_be_read // 2
code_units_read = c_ulong()
rv = ReadConsoleW(
self.handle, buffer, code_units_to_be_read, byref(code_units_read), None
)
if GetLastError() == ERROR_OPERATION_ABORTED:
# wait for KeyboardInterrupt
time.sleep(0.1)
if not rv:
raise OSError("Windows error: %s" % GetLastError())
if buffer[0] == EOF:
return 0
return 2 * code_units_read.value
class _WindowsConsoleWriter(_WindowsConsoleRawIOBase):
def writable(self):
return True
@staticmethod
def _get_error_message(errno):
if errno == ERROR_SUCCESS:
return "ERROR_SUCCESS"
elif errno == ERROR_NOT_ENOUGH_MEMORY:
return "ERROR_NOT_ENOUGH_MEMORY"
return "Windows error %s" % errno
def write(self, b):
bytes_to_be_written = len(b)
buf = get_buffer(b)
code_units_to_be_written = min(bytes_to_be_written, MAX_BYTES_WRITTEN) // 2
code_units_written = c_ulong()
WriteConsoleW(
self.handle, buf, code_units_to_be_written, byref(code_units_written), None
)
bytes_written = 2 * code_units_written.value
if bytes_written == 0 and bytes_to_be_written > 0:
raise OSError(self._get_error_message(GetLastError()))
return bytes_written
class ConsoleStream(object):
def __init__(self, text_stream, byte_stream):
self._text_stream = text_stream
self.buffer = byte_stream
@property
def name(self):
return self.buffer.name
def write(self, x):
if isinstance(x, text_type):
return self._text_stream.write(x)
try:
self.flush()
except Exception:
pass
return self.buffer.write(x)
def writelines(self, lines):
for line in lines:
self.write(line)
def __getattr__(self, name):
try:
return getattr(self._text_stream, name)
except io.UnsupportedOperation:
return getattr(self.buffer, name)
def isatty(self):
return self.buffer.isatty()
def __repr__(self):
return "<ConsoleStream name=%r encoding=%r>" % (self.name, self.encoding)
class WindowsChunkedWriter(object):
"""
Wraps a stream (such as stdout), acting as a transparent proxy for all
attribute access apart from method 'write()' which we wrap to write in
limited chunks due to a Windows limitation on binary console streams.
"""
def __init__(self, wrapped):
# double-underscore everything to prevent clashes with names of
# attributes on the wrapped stream object.
self.__wrapped = wrapped
def __getattr__(self, name):
return getattr(self.__wrapped, name)
def write(self, text):
total_to_write = len(text)
written = 0
while written < total_to_write:
to_write = min(total_to_write - written, MAX_BYTES_WRITTEN)
self.__wrapped.write(text[written : written + to_write])
written += to_write
_wrapped_std_streams = set()
def _wrap_std_stream(name):
# Python 2 & Windows 7 and below
if PY2 and sys.getwindowsversion()[:2] <= (6, 1) and name not in _wrapped_std_streams:
setattr(sys, name, WindowsChunkedWriter(getattr(sys, name)))
_wrapped_std_streams.add(name)
def _get_text_stdin(buffer_stream):
text_stream = StreamWrapper(
io.BufferedReader(_WindowsConsoleReader(STDIN_HANDLE)),
"utf-16-le",
"strict",
line_buffering=True,
)
return ConsoleStream(text_stream, buffer_stream)
def _get_text_stdout(buffer_stream):
text_stream = StreamWrapper(
io.BufferedWriter(_WindowsConsoleWriter(STDOUT_HANDLE)),
"utf-16-le",
"strict",
line_buffering=True,
)
return ConsoleStream(text_stream, buffer_stream)
def _get_text_stderr(buffer_stream):
text_stream = StreamWrapper(
io.BufferedWriter(_WindowsConsoleWriter(STDERR_HANDLE)),
"utf-16-le",
"strict",
line_buffering=True,
)
return ConsoleStream(text_stream, buffer_stream)
if PY2:
def _hash_py_argv():
return zlib.crc32("\x00".join(sys.argv[1:]))
_initial_argv_hash = _hash_py_argv()
def _get_windows_argv():
argc = c_int(0)
argv_unicode = CommandLineToArgvW(GetCommandLineW(), byref(argc))
argv = [argv_unicode[i] for i in range(0, argc.value)]
if not hasattr(sys, "frozen"):
argv = argv[1:]
while len(argv) > 0:
arg = argv[0]
if not arg.startswith("-") or arg == "-":
break
argv = argv[1:]
if arg.startswith(("-c", "-m")):
break
return argv[1:]
_stream_factories = {0: _get_text_stdin, 1: _get_text_stdout, 2: _get_text_stderr}
def _get_windows_console_stream(f, encoding, errors):
if (
get_buffer is not None
and encoding in ("utf-16-le", None)
and errors in ("strict", None)
and hasattr(f, "isatty")
and f.isatty()
):
if isinstance(f, ConsoleStream):
return f
func = _stream_factories.get(f.fileno())
if func is not None:
if not PY2:
f = getattr(f, "buffer", None)
if f is None:
return None
else:
# If we are on Python 2 we need to set the stream that we
# deal with to binary mode as otherwise the exercise if a
# bit moot. The same problems apply as for
# get_binary_stdin and friends from _compat.
msvcrt.setmode(f.fileno(), os.O_BINARY)
return func(f)
def hide_cursor():
cursor_info = CONSOLE_CURSOR_INFO()
GetConsoleCursorInfo(STDOUT_HANDLE, ctypes.byref(cursor_info))
cursor_info.visible = False
SetConsoleCursorInfo(STDOUT_HANDLE, ctypes.byref(cursor_info))
def show_cursor():
cursor_info = CONSOLE_CURSOR_INFO()
GetConsoleCursorInfo(STDOUT_HANDLE, ctypes.byref(cursor_info))
cursor_info.visible = True
SetConsoleCursorInfo(STDOUT_HANDLE, ctypes.byref(cursor_info))
def get_stream_handle(stream):
return STREAM_MAP.get(stream.fileno())
+26 -17
View File
@@ -42,33 +42,28 @@ __all__ = [
if sys.version_info >= (3, 5):
from pathlib import Path
from functools import lru_cache
else:
from pipenv.vendor.pathlib2 import Path
if six.PY3:
# Only Python 3.4+ is supported
from functools import lru_cache, partialmethod
from tempfile import NamedTemporaryFile
from shutil import get_terminal_size
from weakref import finalize
else:
# Only Python 2.7 is supported
from pipenv.vendor.backports.functools_lru_cache import lru_cache
if sys.version_info < (3, 3):
from .backports.functools import partialmethod # type: ignore
from pipenv.vendor.backports.shutil_get_terminal_size import get_terminal_size
NamedTemporaryFile = _NamedTemporaryFile
else:
from tempfile import NamedTemporaryFile
from shutil import get_terminal_size
try:
from weakref import finalize
except ImportError:
from pipenv.vendor.backports.weakref import finalize # type: ignore
try:
from functools import partialmethod
except Exception:
from .backports.functools import partialmethod # type: ignore
try:
# Introduced Python 3.5
from json import JSONDecodeError
except ImportError: # Old Pythons.
except ImportError:
JSONDecodeError = ValueError # type: ignore
if six.PY2:
@@ -205,6 +200,20 @@ class TemporaryDirectory(object):
self._rmtree(self.name)
def is_bytes(string):
"""Check if a string is a bytes instance
:param Union[str, bytes] string: A string that may be string or bytes like
:return: Whether the provided string is a bytes type or not
:rtype: bool
"""
if six.PY3 and isinstance(string, (bytes, memoryview, bytearray)): # noqa
return True
elif six.PY2 and isinstance(string, (buffer, bytearray)): # noqa
return True
return False
def fs_str(string):
"""Encodes a string into the proper filesystem encoding
+2
View File
@@ -21,6 +21,7 @@ __all__ = [
"spinner",
"dummy_spinner",
"replaced_stream",
"replaced_streams",
]
@@ -316,6 +317,7 @@ def replaced_stream(stream_name):
>>> sys.stdout.write("hello")
'hello'
"""
orig_stream = getattr(sys, stream_name)
new_stream = six.StringIO()
try:
+7 -23
View File
@@ -1,19 +1,10 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function
import ctypes
import os
import sys
__all__ = ["hide_cursor", "show_cursor"]
class CONSOLE_CURSOR_INFO(ctypes.Structure):
_fields_ = [("dwSize", ctypes.c_int), ("bVisible", ctypes.c_int)]
WIN_STDERR_HANDLE_ID = ctypes.c_ulong(-12)
WIN_STDOUT_HANDLE_ID = ctypes.c_ulong(-11)
__all__ = ["hide_cursor", "show_cursor", "get_stream_handle"]
def get_stream_handle(stream=sys.stdout):
@@ -26,10 +17,9 @@ def get_stream_handle(stream=sys.stdout):
"""
handle = stream
if os.name == "nt":
from ctypes import windll
from ._winconsole import get_stream_handle as get_win_stream_handle
handle_id = WIN_STDOUT_HANDLE_ID
handle = windll.kernel32.GetStdHandle(handle_id)
return get_win_stream_handle(stream)
return handle
@@ -44,12 +34,9 @@ def hide_cursor(stream=sys.stdout):
handle = get_stream_handle(stream=stream)
if os.name == "nt":
from ctypes import windll
from ._winconsole import hide_cursor
cursor_info = CONSOLE_CURSOR_INFO()
windll.kernel32.GetConsoleCursorInfo(handle, ctypes.byref(cursor_info))
cursor_info.visible = False
windll.kernel32.SetConsoleCursorInfo(handle, ctypes.byref(cursor_info))
hide_cursor()
else:
handle.write("\033[?25l")
handle.flush()
@@ -66,12 +53,9 @@ def show_cursor(stream=sys.stdout):
handle = get_stream_handle(stream=stream)
if os.name == "nt":
from ctypes import windll
from ._winconsole import show_cursor
cursor_info = CONSOLE_CURSOR_INFO()
windll.kernel32.GetConsoleCursorInfo(handle, ctypes.byref(cursor_info))
cursor_info.visible = True
windll.kernel32.SetConsoleCursorInfo(handle, ctypes.byref(cursor_info))
show_cursor()
else:
handle.write("\033[?25h")
handle.flush()
+258 -6
View File
@@ -11,12 +11,22 @@ import sys
from collections import OrderedDict
from functools import partial
from itertools import islice, tee
from weakref import WeakKeyDictionary
import six
from .cmdparse import Script
from .compat import Iterable, Path, StringIO, fs_str, partialmethod, to_native_string
from .compat import (
Iterable,
Path,
StringIO,
fs_str,
is_bytes,
partialmethod,
to_native_string,
)
from .contextmanagers import spinner as spinner
from .termcolors import ANSI_REMOVAL_RE, colorize
if os.name != "nt":
@@ -514,7 +524,7 @@ def chunked(n, iterable):
try:
locale_encoding = locale.getdefaultencoding()[1] or "ascii"
locale_encoding = locale.getdefaultlocale()[1] or "ascii"
except Exception:
locale_encoding = "ascii"
@@ -617,20 +627,47 @@ def get_canonical_encoding_name(name):
return codec.name
def get_wrapped_stream(stream):
def _is_binary_buffer(stream):
try:
stream.write(b"")
except Exception:
try:
stream.write("")
except Exception:
pass
return False
return True
def _get_binary_buffer(stream):
if six.PY3 and not _is_binary_buffer(stream):
stream = getattr(stream, "buffer", None)
if stream is not None and _is_binary_buffer(stream):
return stream
return stream
def get_wrapped_stream(stream, encoding=None, errors="replace"):
"""
Given a stream, wrap it in a `StreamWrapper` instance and return the wrapped stream.
:param stream: A stream instance to wrap
:param str encoding: The encoding to use for the stream
:param str errors: The error handler to use, default "replace"
:returns: A new, wrapped stream
:rtype: :class:`StreamWrapper`
"""
if stream is None:
raise TypeError("must provide a stream to wrap")
encoding = getattr(stream, "encoding", None)
encoding = get_output_encoding(encoding)
return StreamWrapper(stream, encoding, "replace", line_buffering=True)
stream = _get_binary_buffer(stream)
if stream is not None and encoding is None:
encoding = "utf-8"
if not encoding:
encoding = get_output_encoding(stream)
else:
encoding = get_canonical_encoding_name(encoding)
return StreamWrapper(stream, encoding, errors, line_buffering=True)
class StreamWrapper(io.TextIOWrapper):
@@ -656,9 +693,26 @@ class StreamWrapper(io.TextIOWrapper):
self.flush()
except Exception:
pass
# This is modified from the initial implementation to rely on
# our own decoding functionality to preserve unicode strings where
# possible
return self.buffer.write(str(x))
return io.TextIOWrapper.write(self, x)
else:
def write(self, x):
# try to use backslash and surrogate escape strategies before failing
old_errors = getattr(self, "_errors", self.errors)
self._errors = (
"backslashescape" if self.encoding != "mbcs" else "surrogateescape"
)
try:
return io.TextIOWrapper.write(self, to_text(x, errors=self._errors))
except UnicodeDecodeError:
self._errors = old_errors
return io.TextIOWrapper.write(self, to_text(x, errors=self._errors))
def writelines(self, lines):
for line in lines:
self.write(line)
@@ -720,3 +774,201 @@ class _StreamProvider(object):
except Exception:
return False
return True
# XXX: The approach here is inspired somewhat by click with details taken from various
# XXX: other sources. Specifically we are using a stream cache and stream wrapping
# XXX: techniques from click (loosely inspired for the most part, with many details)
# XXX: heavily modified to suit our needs
def _isatty(stream):
try:
is_a_tty = stream.isatty()
except Exception:
is_a_tty = False
return is_a_tty
_wrap_for_color = None
try:
import colorama
except ImportError:
colorama = None
_color_stream_cache = WeakKeyDictionary()
if os.name == "nt" or sys.platform.startswith("win"):
def _wrap_for_color(stream, allow_color=True):
if colorama is not None:
try:
cached = _color_stream_cache.get(stream)
except KeyError:
cached = None
if cached is not None:
return cached
if not _isatty(stream):
allow_color = False
_color_wrapper = colorama.AnsiToWin32(stream, strip=not allow_color)
result = _color_wrapper.stream
_write = result.write
def _write_with_color(s):
try:
return _write(s)
except Exception:
_color_wrapper.reset_all()
raise
result.write = _write_with_color
try:
_color_stream_cache[stream] = result
except Exception:
pass
return result
return stream
def _cached_stream_lookup(stream_lookup_func, stream_resolution_func):
stream_cache = WeakKeyDictionary()
def lookup():
stream = stream_lookup_func()
result = None
if stream in stream_cache:
result = stream_cache.get(stream, None)
if result is not None:
return result
result = stream_resolution_func()
try:
stream = stream_lookup_func()
stream_cache[stream] = result
except Exception:
pass
return result
return lookup
def get_text_stream(stream="stdout", encoding=None, allow_color=True):
"""Retrieve a unicode stream wrapper around **sys.stdout** or **sys.stderr**.
:param str stream: The name of the stream to wrap from the :mod:`sys` module.
:param str encoding: An optional encoding to use.
:return: A new :class:`~vistir.misc.StreamWrapper` instance around the stream
:rtype: `vistir.misc.StreamWrapper`
"""
stream_map = {"stdin": sys.stdin, "stdout": sys.stdout, "stderr": sys.stderr}
if os.name == "nt" or sys.platform.startswith("win"):
from ._winconsole import _get_windows_console_stream, _wrap_std_stream
else:
_get_windows_console_stream = lambda *args: None # noqa
_wrap_std_stream = lambda *args: None # noqa
if six.PY2 and stream != "stdin":
_wrap_std_stream(stream)
sys_stream = stream_map[stream]
windows_console = _get_windows_console_stream(sys_stream, encoding, None)
if windows_console is not None:
return windows_console
return get_wrapped_stream(sys_stream, encoding)
def get_text_stdout():
return get_text_stream("stdout")
def get_text_stderr():
return get_text_stream("stderr")
def get_text_stdin():
return get_text_stream("stdin")
TEXT_STREAMS = {
"stdin": get_text_stdin,
"stdout": get_text_stdout,
"stderr": get_text_stderr,
}
_text_stdin = _cached_stream_lookup(lambda: sys.stdin, get_text_stdin)
_text_stdout = _cached_stream_lookup(lambda: sys.stdout, get_text_stdout)
_text_stderr = _cached_stream_lookup(lambda: sys.stderr, get_text_stderr)
def replace_with_text_stream(stream_name):
"""Given a stream name, replace the target stream with a text-converted equivalent
:param str stream_name: The name of a target stream, such as **stdout** or **stderr**
:return: None
"""
new_stream = TEXT_STREAMS.get(stream_name)
if new_stream is not None:
new_stream = new_stream()
setattr(sys, stream_name, new_stream)
return None
def _can_use_color(stream=None, fg=None, bg=None, style=None):
if not any([fg, bg, style]):
if not stream:
stream = sys.stdin
return _isatty(stream)
return any([fg, bg, style])
def echo(text, fg=None, bg=None, style=None, file=None, err=False):
"""Write the given text to the provided stream or **sys.stdout** by default.
Provides optional foreground and background colors from the ansi defaults:
**grey**, **red**, **green**, **yellow**, **blue**, **magenta**, **cyan**
or **white**.
Available styles include **bold**, **dark**, **underline**, **blink**, **reverse**,
**concealed**
:param str text: Text to write
:param str fg: Foreground color to use (default: None)
:param str bg: Foreground color to use (default: None)
:param str style: Style to use (default: None)
:param stream file: File to write to (default: None)
"""
if file and not hasattr(file, "write"):
raise TypeError("Expected a writable stream, received {0!r}".format(file))
if not file:
if err:
file = _text_stderr()
else:
file = _text_stdout()
if text and not isinstance(text, (six.string_types, bytes, bytearray)):
text = six.text_type(text)
text = "" if not text else text
if isinstance(text, six.text_type):
text += "\n"
else:
text += b"\n"
if text and six.PY3 and is_bytes(text):
buffer = _get_binary_buffer(file)
if buffer is not None:
file.flush()
buffer.write(text)
buffer.flush()
return
if text and not is_bytes(text):
can_use_color = _can_use_color(file, fg=fg, bg=bg, style=style)
if os.name == "nt":
text = colorize(text, fg=fg, bg=bg, attrs=style)
file = _wrap_for_color(file, allow_color=can_use_color)
elif not can_use_color:
text = ANSI_REMOVAL_RE.sub("", text)
if text:
file.write(text)
file.flush()
+8 -15
View File
@@ -33,7 +33,6 @@ from .compat import (
if IS_TYPE_CHECKING:
from typing import Optional, Callable, Text, ByteString, AnyStr
__all__ = [
"check_for_unc_path",
"get_converted_relative_path",
@@ -423,16 +422,17 @@ def handle_remove_readonly(func, path, exc):
try:
func(path)
except (OSError, IOError, FileNotFoundError, PermissionError) as e:
if e.errno == errno.ENOENT:
return
elif e.errno in PERM_ERRORS:
if e.errno in PERM_ERRORS:
if e.errno == errno.ENOENT:
return
remaining = None
if os.path.isdir(path):
remaining =_wait_for_files(path)
remaining = _wait_for_files(path)
if remaining:
warnings.warn(default_warning_message.format(path), ResourceWarning)
else:
func(path, ignore_errors=True)
return
raise
if exc_exception.errno in PERM_ERRORS:
set_write_bit(path)
@@ -441,16 +441,9 @@ def handle_remove_readonly(func, path, exc):
func(path)
except (OSError, IOError, FileNotFoundError, PermissionError) as e:
if e.errno in PERM_ERRORS:
warnings.warn(default_warning_message.format(path), ResourceWarning)
pass
elif e.errno == errno.ENOENT: # File already gone
pass
else:
raise
else:
if e.errno != errno.ENOENT: # File still exists
warnings.warn(default_warning_message.format(path), ResourceWarning)
return
elif exc_exception.errno == errno.ENOENT:
pass
else:
raise exc_exception
+28 -43
View File
@@ -2,8 +2,10 @@
from __future__ import absolute_import, print_function, unicode_literals
import os
import re
import colorama
import six
from .compat import to_native_string
@@ -12,44 +14,14 @@ DISABLE_COLORS = os.getenv("CI", False) or os.getenv(
)
ATTRIBUTES = dict(
list(
zip(
["bold", "dark", "", "underline", "blink", "", "reverse", "concealed"],
list(range(1, 9)),
)
)
)
ATTRIBUTE_NAMES = ["bold", "dark", "", "underline", "blink", "", "reverse", "concealed"]
ATTRIBUTES = dict(zip(ATTRIBUTE_NAMES, range(1, 9)))
del ATTRIBUTES[""]
HIGHLIGHTS = dict(
list(
zip(
[
"on_grey",
"on_red",
"on_green",
"on_yellow",
"on_blue",
"on_magenta",
"on_cyan",
"on_white",
],
list(range(40, 48)),
)
)
)
COLORS = dict(
list(
zip(
["grey", "red", "green", "yellow", "blue", "magenta", "cyan", "white"],
list(range(30, 38)),
)
)
)
colors = ["grey", "red", "green", "yellow", "blue", "magenta", "cyan", "white"]
COLORS = dict(zip(colors, range(30, 38)))
HIGHLIGHTS = dict(zip(["on_{0}".format(c) for c in colors], range(40, 48)))
ANSI_REMOVAL_RE = re.compile(r"\033\[((?:\d|;)*)([a-zA-Z])")
COLOR_MAP = {
@@ -99,25 +71,36 @@ def colored(text, color=None, on_color=None, attrs=None):
colored('Hello, World!', 'red', 'on_grey', ['blue', 'blink'])
colored('Hello, World!', 'green')
"""
return colorize(text, fg=color, bg=on_color, attrs=attrs)
def colorize(text, fg=None, bg=None, attrs=None):
if os.getenv("ANSI_COLORS_DISABLED") is None:
style = "NORMAL"
if "bold" in attrs:
if attrs is not None and not isinstance(attrs, list):
_attrs = []
if isinstance(attrs, six.string_types):
_attrs.append(attrs)
else:
_attrs = list(attrs)
attrs = _attrs
if attrs and "bold" in attrs:
style = "BRIGHT"
attrs.remove("bold")
if color is not None:
color = color.upper()
if fg is not None:
fg = fg.upper()
text = to_native_string("%s%s%s%s%s") % (
to_native_string(getattr(colorama.Fore, color)),
to_native_string(getattr(colorama.Fore, fg)),
to_native_string(getattr(colorama.Style, style)),
to_native_string(text),
to_native_string(colorama.Fore.RESET),
to_native_string(colorama.Style.NORMAL),
)
if on_color is not None:
on_color = on_color.upper()
if bg is not None:
bg = bg.upper()
text = to_native_string("%s%s%s%s") % (
to_native_string(getattr(colorama.Back, on_color)),
to_native_string(getattr(colorama.Back, bg)),
to_native_string(text),
to_native_string(colorama.Back.RESET),
to_native_string(colorama.Style.NORMAL),
@@ -129,6 +112,8 @@ def colored(text, color=None, on_color=None, attrs=None):
text = fmt_str % (ATTRIBUTES[attr], text)
text += RESET
else:
text = ANSI_REMOVAL_RE.sub("", text)
return text
+3 -1
View File
@@ -155,10 +155,12 @@ def isolate(create_tmpdir):
home_dir = os.path.join(str(create_tmpdir()), "home")
os.makedirs(home_dir)
mkdir_p(os.path.join(home_dir, ".config", "git"))
with open(os.path.join(home_dir, ".config", "git", "config"), "wb") as fp:
git_config_file = os.path.join(home_dir, ".config", "git", "config")
with open(git_config_file, "wb") as fp:
fp.write(
b"[user]\n\tname = pipenv\n\temail = pipenv@pipenv.org\n"
)
os.environ["GIT_CONFIG"] = fs_str(git_config_file)
os.environ["GIT_CONFIG_NOSYSTEM"] = fs_str("1")
os.environ["GIT_AUTHOR_NAME"] = fs_str("pipenv")
os.environ["GIT_AUTHOR_EMAIL"] = fs_str("pipenv@pipenv.org")