Remove passa from vendors

This commit is contained in:
Frost Ming
2021-07-26 15:02:15 +08:00
parent d10502ce9b
commit 8c9039d0af
50 changed files with 4 additions and 3573 deletions
-1
View File
@@ -7,7 +7,6 @@ stdeb = {version="*", markers="sys_platform == 'linux'"}
jedi = "*"
isort = "*"
rope = "*"
passa = {git = "https://github.com/sarugaku/passa.git"}
sphinxcontrib-spelling = "<4.3.0"
[packages]
-13
View File
@@ -1,13 +0,0 @@
Copyright (c) 2018, Dan Ryan <dan@danryan.co> and Tzu-ping Chung <uranusjr@gmail.com>
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-7
View File
@@ -1,7 +0,0 @@
# -*- coding=utf-8 -*-
__all__ = [
'__version__'
]
__version__ = '0.3.1.dev0'
-6
View File
@@ -1,6 +0,0 @@
# -*- coding=utf-8 -*-
from .cli import main
if __name__ == '__main__':
main()
View File
-57
View File
@@ -1,57 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import itertools
import sys
def add_packages(packages=[], editables=[], project=None, dev=False, sync=False, clean=False):
from passa.models.lockers import PinReuseLocker
from passa.operations.lock import lock
lines = list(itertools.chain(
packages,
("-e {}".format(e) for e in editables),
))
project = project
for line in lines:
try:
project.add_line_to_pipfile(line, develop=dev)
except (TypeError, ValueError) as e:
print("Cannot add {line!r} to Pipfile: {error}".format(
line=line, error=str(e),
), file=sys.stderr)
return 2
prev_lockfile = project.lockfile
locker = PinReuseLocker(project)
success = lock(locker)
if not success:
return 1
project._p.write()
project._l.write()
print("Written to project at", project.root)
if not sync:
return
from passa.models.synchronizers import Synchronizer
from passa.operations.sync import sync
lockfile_diff = project.difference_lockfile(prev_lockfile)
default = any(lockfile_diff.default)
develop = any(lockfile_diff.develop)
syncer = Synchronizer(
project, default=default, develop=develop,
clean_unneeded=clean,
)
success = sync(syncer)
if not success:
return 1
print("Synchronized project at", project.root)
-16
View File
@@ -1,16 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
def clean(project, default=True, dev=False):
from passa.models.synchronizers import Cleaner
from passa.operations.sync import clean
cleaner = Cleaner(project, default=default, develop=dev)
success = clean(cleaner)
if not success:
return 1
print("Cleaned project at", project.root)
-93
View File
@@ -1,93 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import contextlib
import io
import itertools
import sys
import vistir.misc
def _source_as_lines(source, extra):
url = source["url"]
if extra:
lines = ["--extra-index-url {}".format(url)]
else:
lines = ["--index-url {}".format(url)]
if not source.get("verify_ssl", True):
lines = ["--trusted-host {}".format(url)]
return lines
def _requirement_as_line(requirement, sources, include_hashes):
if requirement.index:
sources = sources
else:
sources = None
line = vistir.misc.to_text(
requirement.as_line(sources=sources, include_hashes=include_hashes)
)
return line
@contextlib.contextmanager
def open_for_output(filename):
if filename is None:
yield sys.stdout
return
with io.open(filename, "w", encoding="utf-8", newline="\n") as f:
yield f
def freeze(project=None, default=True, dev=True, include_hashes=None, target=None):
from requirementslib import Requirement
lockfile = project.lockfile
if not lockfile:
print("Pipfile.lock is required to export.", file=sys.stderr)
return 1
section_names = []
if default:
section_names.append("default")
if dev:
section_names.append("develop")
requirements = [
Requirement.from_pipfile(key, entry._data)
for key, entry in itertools.chain.from_iterable(
lockfile.get(name, {}).items()
for name in section_names
)
]
if include_hashes is None:
include_hashes = all(r.is_named for r in requirements)
sources = lockfile.meta.sources._data
source_lines = list(vistir.misc.dedup(itertools.chain(
itertools.chain.from_iterable(
_source_as_lines(source, False)
for source in sources[:1]
),
itertools.chain.from_iterable(
_source_as_lines(source, True)
for source in sources[1:]
),
)))
requirement_lines = sorted(vistir.misc.dedup(
_requirement_as_line(requirement, sources, include_hashes)
for requirement in requirements
))
with open_for_output(target) as f:
for line in source_lines:
f.write(line)
f.write("\n")
f.write("\n")
for line in requirement_lines:
f.write(line)
f.write("\n")
-59
View File
@@ -1,59 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import io
import os
from pip_shims import Command as PipCommand, cmdoptions
import plette
import six
import vistir
class PipCmd(PipCommand):
name = "PipCmd"
def get_sources(urls, trusted_hosts):
trusted_hosts = [six.moves.urllib.parse.urlparse(url).netloc for url in trusted_hosts]
sources = []
for url in urls:
parsed_url = six.moves.urllib.parse.urlparse(url)
netloc = parsed_url.netloc
if '@' in netloc:
_, _, netloc = netloc.rpartition('@')
name, _, _ = netloc.partition('.') # Just use the domain name as the source name
verify_ssl = True
if netloc in trusted_hosts:
verify_ssl = False
sources.append({"url": url, "name": name, "verify_ssl": verify_ssl})
return sources
def init_project(root=None, python_version=None):
pipfile_path = os.path.join(root, "Pipfile")
if os.path.isfile(pipfile_path):
raise RuntimeError("{0!r} is already a Pipfile project".format(root))
if not os.path.exists(root):
vistir.path.mkdir_p(root, mode=0o755)
pip_command = PipCmd()
cmdoptions.make_option_group(cmdoptions.index_group, pip_command.parser)
parsed, _ = pip_command.parser.parse_args([])
index_urls = [parsed.index_url] + parsed.extra_index_urls
sources = get_sources(index_urls, parsed.trusted_hosts)
data = {
"sources": sources,
"packages": {},
"dev-packages": {},
}
if python_version:
data["requires"] = {"python_version": python_version}
return create_project(pipfile_path=pipfile_path, data=data)
def create_project(pipfile_path, data={}):
pipfile = plette.pipfiles.Pipfile(data=data)
with io.open(pipfile_path, "w") as fh:
pipfile.dump(fh)
print("Successfully created new pipfile at {0!r}".format(pipfile_path))
return 0
-32
View File
@@ -1,32 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
def install(project=None, check=True, dev=False, clean=True):
from passa.models.lockers import BasicLocker
from passa.operations.lock import lock
project = project
if not check or not project.is_synced():
locker = BasicLocker(project)
success = lock(locker)
if not success:
return 1
project._l.write()
print("Written to project at", project.root)
from passa.models.synchronizers import Synchronizer
from passa.operations.sync import sync
syncer = Synchronizer(
project, default=True, develop=dev,
clean_unneeded=clean,
)
success = sync(syncer)
if not success:
return 1
print("Synchronized project at", project.root)
-17
View File
@@ -1,17 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
def lock(project=None):
from passa.models.lockers import BasicLocker
from passa.operations.lock import lock
project = project
locker = BasicLocker(project)
success = lock(locker)
if not success:
return
project._l.write()
print("Written to project at", project.root)
-38
View File
@@ -1,38 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
def remove(project=None, only="default", packages=[], clean=True):
from passa.models.lockers import PinReuseLocker
from passa.operations.lock import lock
default = (only != "dev")
develop = (only != "default")
project = project
project.remove_keys_from_pipfile(
packages, default=default, develop=develop,
)
locker = PinReuseLocker(project)
success = lock(locker)
if not success:
return 1
project._p.write()
project._l.write()
print("Written to project at", project.root)
if not clean:
return
from passa.models.synchronizers import Cleaner
from passa.operations.sync import clean
cleaner = Cleaner(project, default=True, develop=True)
success = clean(cleaner)
if not success:
return 1
print("Cleaned project at", project.root)
-20
View File
@@ -1,20 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
def sync(project=None, dev=False, clean=True):
from passa.models.synchronizers import Synchronizer
from passa.operations.sync import sync
project = project
syncer = Synchronizer(
project, default=True, develop=dev,
clean_unneeded=clean,
)
success = sync(syncer)
if not success:
return 1
print("Synchronized project at", project.root)
-52
View File
@@ -1,52 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import sys
def upgrade(project=None, strategy="only-if-needed", sync=True, packages=[]):
from passa.models.lockers import EagerUpgradeLocker, PinReuseLocker
from passa.operations.lock import lock
for package in packages:
if not project.contains_key_in_pipfile(package):
print("{package!r} not found in Pipfile".format(
package=package,
), file=sys.stderr)
return 2
project.remove_keys_from_lockfile(packages)
prev_lockfile = project.lockfile
if strategy == "eager":
locker = EagerUpgradeLocker(project, packages)
else:
locker = PinReuseLocker(project)
success = lock(locker)
if not success:
return 1
project._l.write()
print("Written to project at", project.root)
if not sync:
return
from passa.operations.sync import sync
from passa.models.synchronizers import Synchronizer
lockfile_diff = project.difference_lockfile(prev_lockfile)
default = bool(any(lockfile_diff.default))
develop = bool(any(lockfile_diff.develop))
syncer = Synchronizer(
project, default=default, develop=develop,
clean_unneeded=False,
)
success = sync(syncer)
if not success:
return 1
print("Synchronized project at", project.root)
-49
View File
@@ -1,49 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import argparse
import importlib
import pkgutil
import sys
from passa import __version__
CURRENT_MODULE_PATH = sys.modules[__name__].__path__
def main(argv=None):
root_parser = argparse.ArgumentParser(
prog="passa",
description="Pipfile project management tool.",
)
root_parser.add_argument(
"--version",
action="version",
version="%(prog)s, version {}".format(__version__),
help="show the version and exit",
)
subparsers = root_parser.add_subparsers()
for _, name, _ in pkgutil.iter_modules(CURRENT_MODULE_PATH, "."):
module = importlib.import_module(name, __name__)
try:
klass = module.Command
except AttributeError:
continue
parser = subparsers.add_parser(klass.name, help=klass.description)
command = klass(parser)
parser.set_defaults(func=command.run)
options = root_parser.parse_args(argv)
try:
f = options.func
except AttributeError:
root_parser.print_help()
result = -1
else:
result = f(options)
if result is not None:
sys.exit(result)
-61
View File
@@ -1,61 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import argparse
import os
import sys
from .options import project
class BaseCommand(object):
"""A CLI command.
"""
name = None
description = None
default_arguments = [project]
arguments = []
def __init__(self, parser=None):
if not parser:
parser = argparse.ArgumentParser(
prog=os.path.basename(sys.argv[0]),
description="Base argument parser for passa"
)
self.parser = parser
self.add_arguments()
@classmethod
def build_parser(cls):
parser = argparse.ArgumentParser(
prog="passa {}".format(cls.name),
description=cls.description,
)
return cls(parser)
@classmethod
def run_parser(cls):
parser = cls.build_parser()
parser()
def __call__(self, argv=None):
options = self.parser.parse_args(argv)
result = self.main(options)
if result is not None:
sys.exit(result)
def add_default_arguments(self):
for arg in self.default_arguments:
arg.add_to_parser(self.parser)
def add_arguments(self):
self.add_default_arguments()
for arg in self.arguments:
arg.add_to_parser(self.parser)
def main(self, options):
return self.run(options)
def run(self, options):
raise NotImplementedError
-29
View File
@@ -1,29 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from ..actions.add import add_packages
from ._base import BaseCommand
from .options import package_group
class Command(BaseCommand):
name = "add"
description = "Add packages to project."
arguments = [package_group]
def run(self, options):
if not options.editables and not options.packages:
self.parser.error("Must supply either a requirement or --editable")
return add_packages(
packages=options.packages,
editables=options.editables,
project=options.project,
dev=options.dev,
sync=options.sync
)
if __name__ == "__main__":
Command.run_parser()
-21
View File
@@ -1,21 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from ..actions.clean import clean
from ._base import BaseCommand
from .options import dev, no_default
class Command(BaseCommand):
name = "clean"
description = "Uninstall unlisted packages from the environment."
arguments = [dev, no_default]
def run(self, options):
return clean(project=options.project, default=options.default, dev=options.dev)
if __name__ == "__main__":
Command.run_parser()
-24
View File
@@ -1,24 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from ..actions.freeze import freeze
from ._base import BaseCommand
from .options import dev, include_hashes_group, no_default, target
class Command(BaseCommand):
name = "freeze"
description = "Export project depenencies to requirements.txt."
arguments = [dev, no_default, target, include_hashes_group]
def run(self, options):
return freeze(
project=options.project, default=options.default, dev=options.dev,
include_hashes=options.include_hashes
)
if __name__ == "__main__":
Command.run_parser()
-32
View File
@@ -1,32 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import argparse
import os
from ..actions.init import init_project
from ._base import BaseCommand
from .options import new_project_group
class Command(BaseCommand):
name = "init"
description = "Create a new project."
default_arguments = []
arguments = [new_project_group]
def run(self, options):
pipfile_path = os.path.join(options.project, "Pipfile")
if os.path.exists(pipfile_path):
raise argparse.ArgumentError(
"{0!r} is already a Pipfile project".format(options.project),
)
return init_project(
root=options.project, python_version=options.python_version
)
if __name__ == "__main__":
Command.run_parser()
-22
View File
@@ -1,22 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from ..actions.install import install
from ._base import BaseCommand
from .options import dev, no_check, no_clean
class Command(BaseCommand):
name = "install"
description = "Generate Pipfile.lock to synchronize the environment."
arguments = [no_check, dev, no_clean]
def run(self, options):
return install(project=options.project, check=options.check, dev=options.dev,
clean=options.clean)
if __name__ == "__main__":
Command.run_parser()
-18
View File
@@ -1,18 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from ..actions.lock import lock
from ._base import BaseCommand
class Command(BaseCommand):
name = "lock"
description = "Generate Pipfile.lock."
def run(self, options):
return lock(project=options.project)
if __name__ == "__main__":
Command.run_parser()
-156
View File
@@ -1,156 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import
import argparse
import os
import sys
import tomlkit.exceptions
import passa.models.projects
import vistir
PYTHON_VERSION = ".".join(str(v) for v in sys.version_info[:2])
class Project(passa.models.projects.Project):
def __init__(self, root, *args, **kwargs):
root = vistir.compat.Path(root).absolute()
pipfile = root.joinpath("Pipfile")
if not pipfile.is_file():
raise argparse.ArgumentError(
"project", "{0!r} is not a Pipfile project".format(root),
)
try:
super(Project, self).__init__(root.as_posix(), *args, **kwargs)
except tomlkit.exceptions.ParseError as e:
raise argparse.ArgumentError(
"project", "failed to parse Pipfile: {0!r}".format(str(e)),
)
def __name__(self):
return "Project Root"
class Option(object):
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def add_to_parser(self, parser):
parser.add_argument(*self.args, **self.kwargs)
def add_to_group(self, group):
group.add_argument(*self.args, **self.kwargs)
class ArgumentGroup(object):
def __init__(
self, name, parser=None,
is_mutually_exclusive=False,
required=None, options=None):
self.name = name
self.options = options or []
self.parser = parser
self.required = required
self.is_mutually_exclusive = is_mutually_exclusive
self.argument_group = None
def add_to_parser(self, parser):
group = None
if self.is_mutually_exclusive:
group = parser.add_mutually_exclusive_group(required=self.required)
else:
group = parser.add_argument_group()
for option in self.options:
option.add_to_group(group)
self.argument_group = group
self.parser = parser
project = Option(
"--project", metavar="project", default=os.getcwd(), type=Project,
help="path to project root (directory containing Pipfile)",
)
new_project = Option(
"--project", metavar="project", default=os.getcwd(), type=str,
help="path to project root (directory containing Pipfile)",
)
python_version = Option(
"--py-version", "--python-version", "--requires-python", metavar="python-version",
dest="python_version", default=PYTHON_VERSION, type=str,
help="required minor python version for the project"
)
packages = Option(
"packages", metavar="package", nargs="*",
help="requirement to add (can be used multiple times)",
)
editable = Option(
'-e', '--editable', dest='editables', nargs="*", default=[], metavar='path/vcs',
help="editable requirement to add (can be used multiple times)",
)
dev = Option(
"--dev", action="store_true", default=False,
help="Use [dev-packages] for install/freeze/uninstall operations",
)
no_sync = Option(
"--no-sync", dest="sync", action="store_false", default=True,
help="do not synchronize the environment",
)
target = Option(
"-t", "--target", default=None,
help="file to export into (default is to print to stdout)"
)
no_default = Option(
"--no-default", dest="default", action="store_false", default=True,
help="do not include default packages when exporting, importing, or cleaning"
)
include_hashes = Option(
"--include-hashes", dest="include_hashes", action="store_true",
help="output hashes in requirements.txt (default is to guess)",
)
no_include_hashes = Option(
"--no-include-hashes", dest="include_hashes", action="store_false",
help="do not output hashes in requirements.txt (default is to guess)",
)
no_check = Option(
"--no-check", dest="check", action="store_false", default=True,
help="do not check if Pipfile.lock is up to date, always resolve",
)
no_clean = Option(
"--no-clean", dest="clean", action="store_false", default=True,
help="do not remove packages not specified in Pipfile.lock",
)
dev_only = Option(
"--dev", dest="only", action="store_const", const="dev",
help="only try to modify [dev-packages]",
)
default_only = Option(
"--default", dest="only", action="store_const", const="default",
help="only try to modify [default]",
)
strategy = Option(
"--strategy", choices=["eager", "only-if-needed"], default="only-if-needed",
help="how dependency upgrading is handled",
)
include_hashes_group = ArgumentGroup("include_hashes", is_mutually_exclusive=True, options=[include_hashes, no_include_hashes])
dev_group = ArgumentGroup("dev", is_mutually_exclusive="True", options=[dev_only, default_only])
package_group = ArgumentGroup("packages", options=[packages, editable, dev, no_sync])
new_project_group = ArgumentGroup("new-project", options=[new_project, python_version])
-22
View File
@@ -1,22 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from ..actions.remove import remove
from ._base import BaseCommand
from .options import dev_group, no_clean, packages
class Command(BaseCommand):
name = "remove"
description = "Remove packages from project."
arguments = [dev_group, no_clean, packages]
def run(self, options):
return remove(project=options.project, only=options.only,
packages=options.packages, clean=options.clean)
if __name__ == "__main__":
Command.run_parser()
-21
View File
@@ -1,21 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from ..actions.sync import sync
from ._base import BaseCommand
from .options import dev, no_clean
class Command(BaseCommand):
name = "sync"
description = "Install Pipfile.lock into the environment."
arguments = [dev, no_clean]
def run(self, options):
return sync(project=options.project, dev=options.dev, clean=options.clean)
if __name__ == "__main__":
Command.run_parser()
-21
View File
@@ -1,21 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from ..actions.upgrade import upgrade
from ._base import BaseCommand
from .options import no_clean, no_sync, packages, strategy
class Command(BaseCommand):
name = "upgrade"
description = "Upgrade packages in project."
arguments = [packages, strategy, no_clean, no_sync]
def run(self, options):
return upgrade(project=options.project, strategy=options.strategy,
sync=options.sync, packages=options.packages)
if __name__ == "__main__":
Command.run_parser()
View File
-397
View File
@@ -1,397 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import contextlib
import io
import itertools
import distutils.log
import os
import distlib.database
import distlib.scripts
import distlib.wheel
import packaging.utils
import pip_shims
import setuptools.dist
import six
import vistir
from ..models.caches import CACHE_DIR
from ._pip_shims import VCS_SUPPORT, build_wheel as _build_wheel, unpack_url
from .utils import filter_sources
@vistir.path.ensure_mkdir_p(mode=0o775)
def _get_src_dir():
src = os.environ.get("PIP_SRC")
if src:
return src
virtual_env = os.environ.get("VIRTUAL_ENV")
if virtual_env:
return os.path.join(virtual_env, "src")
return os.path.join(os.getcwd(), "src") # Match pip's behavior.
def _prepare_wheel_building_kwargs(ireq):
download_dir = os.path.join(CACHE_DIR, "pkgs")
vistir.mkdir_p(download_dir)
wheel_download_dir = os.path.join(CACHE_DIR, "wheels")
vistir.mkdir_p(wheel_download_dir)
if ireq.source_dir is not None:
src_dir = ireq.source_dir
elif ireq.editable:
src_dir = _get_src_dir()
else:
src_dir = vistir.path.create_tracked_tempdir(prefix='passa-src')
# This logic matches pip's behavior, although I don't fully understand the
# intention. I guess the idea is to build editables in-place, otherwise out
# of the source tree?
if ireq.editable:
build_dir = src_dir
else:
build_dir = vistir.path.create_tracked_tempdir(prefix="passa-build")
return {
"build_dir": build_dir,
"src_dir": src_dir,
"download_dir": download_dir,
"wheel_download_dir": wheel_download_dir,
}
def _get_pip_index_urls(sources):
index_urls = []
trusted_hosts = []
for source in sources:
url = source.get("url")
if not url:
continue
index_urls.append(url)
if source.get("verify_ssl", True):
continue
host = six.moves.urllib.parse.urlparse(source["url"]).hostname
trusted_hosts.append(host)
return index_urls, trusted_hosts
class _PipCommand(pip_shims.Command):
name = "PipCommand"
def _get_pip_session(trusted_hosts):
cmd = _PipCommand()
options, _ = cmd.parser.parse_args([])
options.cache_dir = CACHE_DIR
options.trusted_hosts = trusted_hosts
session = cmd._build_session(options)
return session
def _get_finder(sources):
index_urls, trusted_hosts = _get_pip_index_urls(sources)
session = _get_pip_session(trusted_hosts)
finder = pip_shims.PackageFinder(
find_links=[],
index_urls=index_urls,
trusted_hosts=trusted_hosts,
allow_all_prereleases=True,
session=session,
)
return finder
def _get_wheel_cache():
format_control = pip_shims.FormatControl(set(), set())
wheel_cache = pip_shims.WheelCache(CACHE_DIR, format_control)
return wheel_cache
def _convert_hashes(values):
"""Convert Pipfile.lock hash lines into InstallRequirement option format.
The option format uses a str-list mapping. Keys are hash algorithms, and
the list contains all values of that algorithm.
"""
hashes = {}
if not values:
return hashes
for value in values:
try:
name, value = value.split(":", 1)
except ValueError:
name = "sha256"
if name not in hashes:
hashes[name] = []
hashes[name].append(value)
return hashes
class WheelBuildError(RuntimeError):
pass
def build_wheel(ireq, sources, hashes=None):
"""Build a wheel file for the InstallRequirement object.
An artifact is downloaded (or read from cache). If the artifact is not a
wheel, build one out of it. The dynamically built wheel is ephemeral; do
not depend on its existence after the returned wheel goes out of scope.
If `hashes` is truthy, it is assumed to be a list of hashes (as formatted
in Pipfile.lock) to be checked against the download.
Returns a `distlib.wheel.Wheel` instance. Raises a `WheelBuildError` (a
`RuntimeError` subclass) if the wheel cannot be built.
"""
kwargs = _prepare_wheel_building_kwargs(ireq)
finder = _get_finder(sources)
# Not for upgrade, hash not required. Hashes are not required here even
# when we provide them, because pip skips local wheel cache if we set it
# to True. Hashes are checked later if we need to download the file.
ireq.populate_link(finder, False, False)
# Ensure ireq.source_dir is set.
# This is intentionally set to build_dir, not src_dir. Comments from pip:
# [...] if filesystem packages are not marked editable in a req, a non
# deterministic error occurs when the script attempts to unpack the
# build directory.
# Also see comments in `_prepare_wheel_building_kwargs()` -- If the ireq
# is editable, build_dir is actually src_dir, making the build in-place.
ireq.ensure_has_source_dir(kwargs["build_dir"])
# Ensure the source is fetched. For wheels, it is enough to just download
# because we'll use them directly. For an sdist, we need to unpack so we
# can build it.
if not ireq.editable or not pip_shims.is_file_url(ireq.link):
if ireq.is_wheel:
only_download = True
download_dir = kwargs["wheel_download_dir"]
else:
only_download = False
download_dir = kwargs["download_dir"]
ireq.options["hashes"] = _convert_hashes(hashes)
unpack_url(
ireq.link, ireq.source_dir, download_dir,
only_download=only_download, session=finder.session,
hashes=ireq.hashes(False), progress_bar="off",
)
if ireq.is_wheel:
# If this is a wheel, use the downloaded thing.
output_dir = kwargs["wheel_download_dir"]
wheel_path = os.path.join(output_dir, ireq.link.filename)
else:
# Othereise we need to build an ephemeral wheel.
wheel_path = _build_wheel(
ireq, vistir.path.create_tracked_tempdir(prefix="ephem"),
finder, _get_wheel_cache(), kwargs,
)
if wheel_path is None or not os.path.exists(wheel_path):
raise WheelBuildError
return distlib.wheel.Wheel(wheel_path)
def _obtrain_ref(vcs_obj, src_dir, name, rev=None):
target_dir = os.path.join(src_dir, name)
target_rev = vcs_obj.make_rev_options(rev)
if not os.path.exists(target_dir):
vcs_obj.obtain(target_dir)
if (not vcs_obj.is_commit_id_equal(target_dir, rev) and
not vcs_obj.is_commit_id_equal(target_dir, target_rev)):
vcs_obj.update(target_dir, target_rev)
return vcs_obj.get_revision(target_dir)
def get_vcs_ref(requirement):
backend = VCS_SUPPORT.get_backend(requirement.vcs)
vcs = backend(url=requirement.req.vcs_uri)
src = _get_src_dir()
name = requirement.normalized_name
ref = _obtrain_ref(vcs, src, name, rev=requirement.req.ref)
return ref
def find_installation_candidates(ireq, sources):
finder = _get_finder(sources)
return finder.find_all_candidates(ireq.name)
class RequirementUninstaller(object):
"""A context manager to remove a package for the inner block.
This uses `UninstallPathSet` to control the workflow. If the inner block
exits correctly, the uninstallation is committed, otherwise rolled back.
"""
def __init__(self, ireq, auto_confirm, verbose):
self.ireq = ireq
self.pathset = None
self.auto_confirm = auto_confirm
self.verbose = verbose
def __enter__(self):
self.pathset = self.ireq.uninstall(
auto_confirm=self.auto_confirm,
verbose=self.verbose,
)
return self.pathset
def __exit__(self, exc_type, exc_value, traceback):
if self.pathset is None:
return
if exc_type is None:
self.pathset.commit()
else:
self.pathset.rollback()
def uninstall(name, **kwargs):
ireq = pip_shims.InstallRequirement.from_line(name)
return RequirementUninstaller(ireq, **kwargs)
@contextlib.contextmanager
def _suppress_distutils_logs():
"""Hack to hide noise generated by `setup.py develop`.
There isn't a good way to suppress them now, so let's monky-patch.
See https://bugs.python.org/issue25392.
"""
f = distutils.log.Log._log
def _log(log, level, msg, args):
if level >= distutils.log.ERROR:
f(log, level, msg, args)
distutils.log.Log._log = _log
yield
distutils.log.Log._log = f
class NoopInstaller(object):
"""An installer.
This class is not designed to be instantiated by itself, but used as a
common interface for subclassing.
An installer has two methods, `prepare()` and `install()`. Neither takes
arguments, and should be called in that order to prepare an installation
operation, and to actually install things.
"""
def prepare(self):
pass
def install(self):
pass
class EditableInstaller(NoopInstaller):
"""Installer to handle editable.
"""
def __init__(self, requirement):
ireq = requirement.as_ireq()
self.working_directory = ireq.setup_py_dir
self.setup_py = ireq.setup_py
def install(self):
with vistir.cd(self.working_directory), _suppress_distutils_logs():
# Access from Setuptools to ensure things are patched correctly.
setuptools.dist.distutils.core.run_setup(
self.setup_py, ["develop", "--no-deps"],
)
class WheelInstaller(NoopInstaller):
"""Installer by building a wheel.
The wheel is built during `prepare()`, and installed in `install()`.
"""
def __init__(self, requirement, sources, paths):
self.ireq = requirement.as_ireq()
self.sources = filter_sources(requirement, sources)
self.hashes = requirement.hashes or None
self.paths = paths
self.wheel = None
def prepare(self):
self.wheel = build_wheel(self.ireq, self.sources, self.hashes)
def install(self):
self.wheel.install(self.paths, distlib.scripts.ScriptMaker(None, None))
def _iter_egg_info_directories(root, name):
name = packaging.utils.canonicalize_name(name)
for parent, dirnames, filenames in os.walk(root):
matched_indexes = []
for i, dirname in enumerate(dirnames):
if not dirname.lower().endswith("egg-info"):
continue
egg_info_name = packaging.utils.canonicalize_name(dirname[:-9])
if egg_info_name != name:
continue
matched_indexes.append(i)
yield os.path.join(parent, dirname)
# Modify dirnames in-place to NOT look into egg-info directories.
# This is a documented behavior in stdlib.
for i in reversed(matched_indexes):
del dirnames[i]
def _read_pkg_info(directory):
path = os.path.join(directory, "PKG-INFO")
try:
with io.open(path, encoding="utf-8", errors="replace") as f:
return f.read()
except (IOError, OSError):
return None
def _find_egg_info(ireq):
"""Find this package's .egg-info directory.
Due to how sdists are designed, the .egg-info directory cannot be reliably
found without running setup.py to aggregate all configurations. This
function instead uses some heuristics to locate the egg-info directory
that most likely represents this package.
The best .egg-info directory's path is returned as a string. None is
returned if no matches can be found.
"""
root = ireq.setup_py_dir
directory_iterator = _iter_egg_info_directories(root, ireq.name)
try:
top_egg_info = next(directory_iterator)
except StopIteration: # No egg-info found. Wat.
return None
directory_iterator = itertools.chain([top_egg_info], directory_iterator)
# Read the sdist's PKG-INFO to determine which egg_info is best.
pkg_info = _read_pkg_info(root)
# PKG-INFO not readable. Just return whatever comes first, I guess.
if pkg_info is None:
return top_egg_info
# Walk the sdist to find the egg-info with matching PKG-INFO.
for directory in directory_iterator:
egg_pkg_info = _read_pkg_info(directory)
if egg_pkg_info == pkg_info:
return directory
# Nothing matches...? Use the first one we found, I guess.
return top_egg_info
def read_sdist_metadata(ireq):
egg_info_dir = _find_egg_info(ireq)
if not egg_info_dir:
return None
distribution = distlib.database.EggInfoDistribution(egg_info_dir)
return distribution.metadata
-61
View File
@@ -1,61 +0,0 @@
# -*- coding=utf-8 -*-
"""Shims to make the pip interface more consistent accross versions.
There are currently two members:
* VCS_SUPPORT is an instance of VcsSupport.
* build_wheel abstracts the process to build a wheel out of a bunch parameters.
* unpack_url wraps the actual function in pip to accept modern parameters.
"""
from __future__ import absolute_import, unicode_literals
import pip_shims
def _build_wheel_pre10(ireq, output_dir, finder, wheel_cache, kwargs):
kwargs.update({"wheel_cache": wheel_cache, "session": finder.session})
reqset = pip_shims.RequirementSet(**kwargs)
builder = pip_shims.WheelBuilder(reqset, finder)
return builder._build_one(ireq, output_dir)
def _build_wheel_modern(ireq, output_dir, finder, wheel_cache, kwargs):
"""Build a wheel.
* ireq: The InstallRequirement object to build
* output_dir: The directory to build the wheel in.
* finder: pip's internal Finder object to find the source out of ireq.
* kwargs: Various keyword arguments from `_prepare_wheel_building_kwargs`.
"""
kwargs.update({"progress_bar": "off", "build_isolation": False})
with pip_shims.RequirementTracker() as req_tracker:
if req_tracker:
kwargs["req_tracker"] = req_tracker
preparer = pip_shims.RequirementPreparer(**kwargs)
builder = pip_shims.WheelBuilder(finder, preparer, wheel_cache)
return builder._build_one(ireq, output_dir)
def _unpack_url_pre10(*args, **kwargs):
"""Shim for unpack_url in various pip versions.
pip before 10.0 does not accept `progress_bar` here. Simply drop it.
"""
kwargs.pop("progress_bar", None)
return pip_shims.unpack_url(*args, **kwargs)
PIP_VERSION = pip_shims.utils._parse(pip_shims.pip_version)
VERSION_10 = pip_shims.utils._parse("10")
VCS_SUPPORT = pip_shims.VcsSupport()
build_wheel = _build_wheel_modern
unpack_url = pip_shims.unpack_url
if PIP_VERSION < VERSION_10:
build_wheel = _build_wheel_pre10
unpack_url = _unpack_url_pre10
-86
View File
@@ -1,86 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import packaging.specifiers
import packaging.version
import requirementslib
from ._pip import find_installation_candidates, get_vcs_ref
def _filter_matching_python_requirement(candidates, required_python):
# TODO: This should also takes the parent's python_version and
# python_full_version markers, and only return matches with valid
# intersections. For example, if parent requires `python_version >= '3.0'`,
# this should not return entries with "Requires-Python: <3".
for c in candidates:
try:
requires_python = c.requires_python
except AttributeError:
requires_python = c.location.requires_python
if required_python and requires_python:
# Old specifications had people setting this to single digits
# which is effectively the same as '>=digit,<digit+1'
if requires_python.isdigit():
requires_python = '>={0},<{1}'.format(
requires_python, int(requires_python) + 1,
)
try:
specset = packaging.specifiers.SpecifierSet(requires_python)
except packaging.specifiers.InvalidSpecifier:
continue
if not specset.contains(required_python):
continue
yield c
def _copy_requirement(requirement):
# Markers are intentionally dropped here. They will be added to candidates
# after resolution, so we can perform marker aggregation.
new = requirement.copy()
new.markers = None
return new
def _requirement_from_metadata(name, version, extras, index):
# Markers are intentionally dropped here. They will be added to candidates
# after resolution, so we can perform marker aggregation.
r = requirementslib.Requirement.from_metadata(name, version, extras, None)
r.index = index
return r
def find_candidates(requirement, sources, requires_python, allow_prereleases):
# A non-named requirement has exactly one candidate that is itself. For
# VCS, we also lock the requirement to an exact ref.
if not requirement.is_named:
candidate = _copy_requirement(requirement)
if candidate.is_vcs:
candidate.req.ref = get_vcs_ref(candidate)
return [candidate]
ireq = requirement.as_ireq()
icans = find_installation_candidates(ireq, sources)
if requires_python:
matching_icans = list(_filter_matching_python_requirement(
icans, packaging.version.parse(requires_python),
))
icans = matching_icans or icans
versions = sorted(ireq.specifier.filter(
(c.version for c in icans), allow_prereleases,
))
if not allow_prereleases and not versions:
versions = sorted(ireq.specifier.filter(
(c.version for c in icans), True,
))
name = requirement.normalized_name
extras = requirement.extras
index = requirement.index
return [
_requirement_from_metadata(name, version, extras, index)
for version in versions
]
-273
View File
@@ -1,273 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import functools
import os
import sys
import packaging.specifiers
import packaging.utils
import packaging.version
import requests
import requirementslib
import six
from ..models.caches import DependencyCache, RequiresPythonCache
from ._pip import WheelBuildError, build_wheel, read_sdist_metadata
from .markers import contains_extra, get_contained_extras, get_without_extra
from .utils import get_pinned_version, is_pinned
DEPENDENCY_CACHE = DependencyCache()
REQUIRES_PYTHON_CACHE = RequiresPythonCache()
def _cached(f, **kwargs):
@functools.wraps(f)
def wrapped(ireq):
result = f(ireq, **kwargs)
if result is not None and is_pinned(ireq):
deps, requires_python = result
DEPENDENCY_CACHE[ireq] = deps
REQUIRES_PYTHON_CACHE[ireq] = requires_python
return result
return wrapped
def _is_cache_broken(line, parent_name):
dep_req = requirementslib.Requirement.from_line(line)
if contains_extra(dep_req.markers):
return True # The "extra =" marker breaks everything.
elif dep_req.normalized_name == parent_name:
return True # A package cannot depend on itself.
return False
def _get_dependencies_from_cache(ireq):
"""Retrieves dependencies for the requirement from the dependency cache.
"""
if os.environ.get("PASSA_IGNORE_LOCAL_CACHE"):
return
if ireq.editable:
return
try:
deps = DEPENDENCY_CACHE[ireq]
pyrq = REQUIRES_PYTHON_CACHE[ireq]
except KeyError:
return
# Preserving sanity: Run through the cache and make sure every entry if
# valid. If this fails, something is wrong with the cache. Drop it.
try:
packaging.specifiers.SpecifierSet(pyrq)
ireq_name = packaging.utils.canonicalize_name(ireq.name)
if any(_is_cache_broken(line, ireq_name) for line in deps):
broken = True
else:
broken = False
except Exception:
broken = True
if broken:
print("dropping broken cache for {0}".format(ireq.name))
del DEPENDENCY_CACHE[ireq]
del REQUIRES_PYTHON_CACHE[ireq]
return
return deps, pyrq
def _get_dependencies_from_json_url(url, session):
response = session.get(url)
response.raise_for_status()
info = response.json()["info"]
requires_python = info["requires_python"] or ""
try:
requirement_lines = info["requires_dist"]
except KeyError:
requirement_lines = info["requires"]
# The JSON API returns null both when there are not requirements, or the
# requirement list cannot be retrieved. We can't safely assume, so it's
# better to drop it and fall back to downloading the package.
try:
dependency_requirements_iterator = (
requirementslib.Requirement.from_line(line)
for line in requirement_lines
)
except TypeError:
return
dependencies = [
dep_req.as_line(include_hashes=False)
for dep_req in dependency_requirements_iterator
if not contains_extra(dep_req.markers)
]
return dependencies, requires_python
def _get_dependencies_from_json(ireq, sources):
"""Retrieves dependencies for the install requirement from the JSON API.
:param ireq: A single InstallRequirement
:type ireq: :class:`~pip._internal.req.req_install.InstallRequirement`
:return: A set of dependency lines for generating new InstallRequirements.
:rtype: set(str) or None
"""
if os.environ.get("PASSA_IGNORE_JSON_API"):
return
# It is technically possible to parse extras out of the JSON API's
# requirement format, but it is such a chore let's just use the simple API.
if ireq.extras:
return
try:
version = get_pinned_version(ireq)
except ValueError:
return
url_prefixes = [
proc_url[:-7] # Strip "/simple".
for proc_url in (
raw_url.rstrip("/")
for raw_url in (source.get("url", "") for source in sources)
)
if proc_url.endswith("/simple")
]
session = requests.session()
for prefix in url_prefixes:
url = "{prefix}/pypi/{name}/{version}/json".format(
prefix=prefix,
name=packaging.utils.canonicalize_name(ireq.name),
version=version,
)
try:
dependencies = _get_dependencies_from_json_url(url, session)
if dependencies is not None:
return dependencies
except Exception as e:
print("unable to read dependencies via {0} ({1})".format(url, e))
session.close()
return
def _read_requirements(metadata, extras):
"""Read wheel metadata to know what it depends on.
The `run_requires` attribute contains a list of dict or str specifying
requirements. For dicts, it may contain an "extra" key to specify these
requirements are for a specific extra. Unfortunately, not all fields are
specificed like this (I don't know why); some are specified with markers.
So we jump though these terrible hoops to know exactly what we need.
The extra extraction is not comprehensive. Tt assumes the marker is NEVER
something like `extra == "foo" and extra == "bar"`. I guess this never
makes sense anyway? Markers are just terrible.
"""
extras = extras or ()
requirements = []
for entry in metadata.run_requires:
if isinstance(entry, six.text_type):
entry = {"requires": [entry]}
extra = None
else:
extra = entry.get("extra")
if extra is not None and extra not in extras:
continue
for line in entry.get("requires", []):
r = requirementslib.Requirement.from_line(line)
if r.markers:
contained = get_contained_extras(r.markers)
if (contained and not any(e in contained for e in extras)):
continue
marker = get_without_extra(r.markers)
r.markers = str(marker) if marker else None
line = r.as_line(include_hashes=False)
requirements.append(line)
return requirements
def _read_requires_python(metadata):
"""Read wheel metadata to know the value of Requires-Python.
This is surprisingly poorly supported in Distlib. This function tries
several ways to get this information:
* Metadata 2.0: metadata.dictionary.get("requires_python") is not None
* Metadata 2.1: metadata._legacy.get("Requires-Python") is not None
* Metadata 1.2: metadata._legacy.get("Requires-Python") != "UNKNOWN"
"""
# TODO: Support more metadata formats.
value = metadata.dictionary.get("requires_python")
if value is not None:
return value
if metadata._legacy:
value = metadata._legacy.get("Requires-Python")
if value is not None and value != "UNKNOWN":
return value
return ""
def _get_dependencies_from_pip(ireq, sources):
"""Retrieves dependencies for the requirement from pipenv.patched.notpip internals.
The current strategy is to try the followings in order, returning the
first successful result.
1. Try to build a wheel out of the ireq, and read metadata out of it.
2. Read metadata out of the egg-info directory if it is present.
"""
extras = ireq.extras or ()
try:
wheel = build_wheel(ireq, sources)
except WheelBuildError:
# XXX: This depends on a side effect of `build_wheel`. This block is
# reached when it fails to build an sdist, where the sdist would have
# been downloaded, extracted into `ireq.source_dir`, and partially
# built (hopefully containing .egg-info).
metadata = read_sdist_metadata(ireq)
if not metadata:
raise
else:
metadata = wheel.metadata
requirements = _read_requirements(metadata, extras)
requires_python = _read_requires_python(metadata)
return requirements, requires_python
def get_dependencies(requirement, sources):
"""Get all dependencies for a given install requirement.
:param requirement: A requirement
:param sources: Pipfile-formatted sources
:type sources: list[dict]
"""
getters = [
_get_dependencies_from_cache,
_cached(_get_dependencies_from_json, sources=sources),
_cached(_get_dependencies_from_pip, sources=sources),
]
ireq = requirement.as_ireq()
last_exc = None
for getter in getters:
try:
result = getter(ireq)
except Exception as e:
last_exc = sys.exc_info()
continue
if result is not None:
deps, pyreq = result
reqs = [requirementslib.Requirement.from_line(d) for d in deps]
return reqs, pyreq
if last_exc:
six.reraise(*last_exc)
raise RuntimeError("failed to get dependencies for {}".format(
requirement.as_line(),
))
-61
View File
@@ -1,61 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import contextlib
from pip_shims import Wheel
def _wheel_supported(self, tags=None):
# Ignore current platform. Support everything.
return True
def _wheel_support_index_min(self, tags=None):
# All wheels are equal priority for sorting.
return 0
@contextlib.contextmanager
def _allow_all_wheels():
"""Monkey patch pip.Wheel to allow all wheels
The usual checks against platforms and Python versions are ignored to allow
fetching all available entries in PyPI. This also saves the candidate cache
and set a new one, or else the results from the previous non-patched calls
will interfere.
"""
original_wheel_supported = Wheel.supported
original_support_index_min = Wheel.support_index_min
Wheel.supported = _wheel_supported
Wheel.support_index_min = _wheel_support_index_min
yield
Wheel.supported = original_wheel_supported
Wheel.support_index_min = original_support_index_min
def get_hashes(cache, req):
if req.is_vcs:
return set()
ireq = req.as_ireq()
if ireq.editable:
return set()
if req.is_file_or_url:
# TODO: Get the hash of the linked artifact?
return set()
if not ireq.is_pinned:
return set()
with _allow_all_wheels():
matching_candidates = req.find_all_matches()
return {
cache.get_hash(candidate.location)
for candidate in matching_candidates
}
-101
View File
@@ -1,101 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
from packaging.markers import Marker
def _strip_extra(elements):
"""Remove the "extra == ..." operands from the list.
This is not a comprehensive implementation, but relies on an important
characteristic of metadata generation: The "extra == ..." operand is always
associated with an "and" operator. This means that we can simply remove the
operand and the "and" operator associated with it.
"""
extra_indexes = []
for i, element in enumerate(elements):
if isinstance(element, list):
cancelled = _strip_extra(element)
if cancelled:
extra_indexes.append(i)
elif isinstance(element, tuple) and element[0].value == "extra":
extra_indexes.append(i)
for i in reversed(extra_indexes):
del elements[i]
if i > 0 and elements[i - 1] == "and":
# Remove the "and" before it.
del elements[i - 1]
elif elements:
# This shouldn't ever happen, but is included for completeness.
# If there is not an "and" before this element, try to remove the
# operator after it.
del elements[0]
return (not elements)
def get_without_extra(marker):
"""Build a new marker without the `extra == ...` part.
The implementation relies very deep into packaging's internals, but I don't
have a better way now (except implementing the whole thing myself).
This could return `None` if the `extra == ...` part is the only one in the
input marker.
"""
# TODO: Why is this very deep in the internals? Why is a better solution
# implementing it yourself when someone is already maintaining a codebase
# for this? It's literally a grammar implementation that is required to
# meet the demands of a pep... -d
if not marker:
return None
marker = Marker(str(marker))
elements = marker._markers
_strip_extra(elements)
if elements:
return marker
return None
def _markers_collect_extras(markers, collection):
# Optimization: the marker element is usually appended at the end.
for el in reversed(markers):
if (isinstance(el, tuple) and
el[0].value == "extra" and
el[1].value == "=="):
collection.add(el[2].value)
elif isinstance(el, list):
_markers_collect_extras(el, collection)
def get_contained_extras(marker):
"""Collect "extra == ..." operands from a marker.
Returns a list of str. Each str is a speficied extra in this marker.
"""
if not marker:
return set()
marker = Marker(str(marker))
extras = set()
_markers_collect_extras(marker._markers, extras)
return extras
def _markers_contains_extra(markers):
# Optimization: the marker element is usually appended at the end.
for element in reversed(markers):
if isinstance(element, tuple) and element[0].value == "extra":
return True
elif isinstance(element, list):
if _markers_contains_extra(element):
return True
return False
def contains_extra(marker):
"""Check whehter a marker contains an "extra == ..." operand.
"""
if not marker:
return False
marker = Marker(str(marker))
return _markers_contains_extra(marker._markers)
-90
View File
@@ -1,90 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import resolvelib
from .traces import trace_graph
def print_title(text):
print('\n{:=^84}\n'.format(text))
def print_requirement(r, end='\n'):
print('{:>40}'.format(r.as_line(include_hashes=False)), end=end)
def print_dependency(state, key):
print_requirement(state.mapping[key], end='')
parents = sorted(
state.graph.iter_parents(key),
key=lambda n: (-1, '') if n is None else (ord(n[0].lower()), n),
)
for i, p in enumerate(parents):
if p is None:
line = '(user)'
else:
line = state.mapping[p].as_line(include_hashes=False)
if i == 0:
padding = ' <= '
else:
padding = ' ' * 44
print('{pad}{line}'.format(pad=padding, line=line))
class StdOutReporter(resolvelib.BaseReporter):
"""Simple reporter that prints things to stdout.
"""
def __init__(self, requirements):
super(StdOutReporter, self).__init__()
self.requirements = requirements
def starting(self):
self._prev = None
print_title(' User requirements ')
for r in self.requirements:
print_requirement(r)
def ending_round(self, index, state):
print_title(' Round {} '.format(index))
mapping = state.mapping
if self._prev is None:
difference = set(mapping.keys())
changed = set()
else:
difference = set(mapping.keys()) - set(self._prev.keys())
changed = set(
k for k, v in mapping.items()
if k in self._prev and self._prev[k] != v
)
self._prev = mapping
if difference:
print('New pins: ')
for k in difference:
print_dependency(state, k)
print()
if changed:
print('Changed pins:')
for k in changed:
print_dependency(state, k)
print()
def ending(self, state):
print_title(" STABLE PINS ")
path_lists = trace_graph(state.graph)
for k in sorted(state.mapping):
print(state.mapping[k].as_line(include_hashes=False))
paths = path_lists[k]
for path in paths:
if path == [None]:
print(' User requirement')
continue
print(' ', end='')
for v in reversed(path[1:]):
line = state.mapping[v].as_line(include_hashes=False)
print(' <=', line, end='')
print()
print()
-136
View File
@@ -1,136 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import itertools
import operator
from packaging.specifiers import SpecifierSet, Specifier
from vistir.misc import dedup
def _tuplize_version(version):
return tuple(int(x) for x in version.split("."))
def _format_version(version):
return ".".join(str(i) for i in version)
# Prefer [x,y) ranges.
REPLACE_RANGES = {">": ">=", "<=": "<"}
def _format_pyspec(specifier):
if isinstance(specifier, str):
if not any(op in specifier for op in Specifier._operators.keys()):
specifier = "=={0}".format(specifier)
specifier = Specifier(specifier)
if specifier.operator == "==" and specifier.version.endswith(".*"):
specifier = Specifier("=={0}".format(specifier.version[:-2]))
try:
op = REPLACE_RANGES[specifier.operator]
except KeyError:
return specifier
version = specifier.version.replace(".*", "")
curr_tuple = _tuplize_version(version)
try:
next_tuple = (curr_tuple[0], curr_tuple[1] + 1)
except IndexError:
next_tuple = (curr_tuple[0], 1)
specifier = Specifier("{0}{1}".format(op, _format_version(next_tuple)))
return specifier
def _get_specs(specset):
if isinstance(specset, Specifier):
specset = str(specset)
if isinstance(specset, str):
specset = SpecifierSet(specset.replace(".*", ""))
return [
(spec._spec[0], _tuplize_version(spec._spec[1]))
for spec in getattr(specset, "_specs", [])
]
def _group_by_op(specs):
specs = [_get_specs(x) for x in list(specs)]
flattened = [(op, version) for spec in specs for op, version in spec]
specs = sorted(flattened, key=operator.itemgetter(1))
grouping = itertools.groupby(specs, key=operator.itemgetter(0))
return grouping
def cleanup_pyspecs(specs, joiner="or"):
specs = {_format_pyspec(spec) for spec in specs}
# for != operator we want to group by version
# if all are consecutive, join as a list
results = set()
for op, versions in _group_by_op(specs):
versions = [version[1] for version in versions]
versions = sorted(dedup(versions))
# if we are doing an or operation, we need to use the min for >=
# this way OR(>=2.6, >=2.7, >=3.6) picks >=2.6
# if we do an AND operation we need to use MAX to be more selective
if op in (">", ">="):
if joiner == "or":
results.add((op, _format_version(min(versions))))
else:
results.add((op, _format_version(max(versions))))
# we use inverse logic here so we will take the max value if we are
# using OR but the min value if we are using AND
elif op in ("<=", "<"):
if joiner == "or":
results.add((op, _format_version(max(versions))))
else:
results.add((op, _format_version(min(versions))))
# leave these the same no matter what operator we use
elif op in ("!=", "==", "~="):
version_list = sorted(
"{0}".format(_format_version(version))
for version in versions
)
version = ", ".join(version_list)
if len(version_list) == 1:
results.add((op, version))
elif op == "!=":
results.add(("not in", version))
elif op == "==":
results.add(("in", version))
else:
specifier = SpecifierSet(",".join(sorted(
"{0}".format(op, v) for v in version_list
)))._specs
for s in specifier:
results &= (specifier._spec[0], specifier._spec[1])
else:
if len(version) == 1:
results.add((op, version))
else:
specifier = SpecifierSet("{0}".format(version))._specs
for s in specifier:
results |= (specifier._spec[0], specifier._spec[1])
return results
def pyspec_from_markers(marker):
if marker._markers[0][0] != 'python_version':
return
op = marker._markers[0][1].value
version = marker._markers[0][2].value
specset = set()
if op == "in":
specset.update(
Specifier("=={0}".format(v.strip()))
for v in version.split(",")
)
elif op == "not in":
specset.update(
Specifier("!={0}".format(v.strip()))
for v in version.split(",")
)
else:
specset.add(Specifier("".join([op, version])))
if specset:
return specset
return None
-40
View File
@@ -1,40 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
def _trace_visit_vertex(graph, current, target, visited, path, paths):
if current == target:
paths.append(path)
return
for v in graph.iter_children(current):
if v == current or v in visited:
continue
next_path = path + [current]
next_visited = visited | {current}
_trace_visit_vertex(graph, v, target, next_visited, next_path, paths)
def trace_graph(graph):
"""Build a collection of "traces" for each package.
A trace is a list of names that eventually leads to the package. For
example, if A and B are root dependencies, A depends on C and D, B
depends on C, and C depends on D, the return value would be like::
{
None: [],
"A": [None],
"B": [None],
"C": [[None, "A"], [None, "B"]],
"D": [[None, "B", "C"], [None, "A"]],
}
"""
result = {None: []}
for vertex in graph:
result[vertex] = []
for root in graph.iter_children(None):
paths = []
_trace_visit_vertex(graph, root, vertex, {None}, [None], paths)
result[vertex].extend(paths)
return result
-118
View File
@@ -1,118 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
def identify_requirment(r):
"""Produce an identifier for a requirement to use in the resolver.
Note that we are treating the same package with different extras as
distinct. This allows semantics like "I only want this extra in
development, not production".
This also makes the resolver's implementation much simpler, with the minor
costs of possibly needing a few extra resolution steps if we happen to have
the same package apprearing multiple times.
"""
return "{0}{1}".format(r.normalized_name, r.extras_as_pip)
def get_pinned_version(ireq):
"""Get the pinned version of an InstallRequirement.
An InstallRequirement is considered pinned if:
- Is not editable
- It has exactly one specifier
- That specifier is "=="
- The version does not contain a wildcard
Examples:
django==1.8 # pinned
django>1.8 # NOT pinned
django~=1.8 # NOT pinned
django==1.* # NOT pinned
Raises `TypeError` if the input is not a valid InstallRequirement, or
`ValueError` if the InstallRequirement is not pinned.
"""
try:
specifier = ireq.specifier
except AttributeError:
raise TypeError("Expected InstallRequirement, not {}".format(
type(ireq).__name__,
))
if ireq.editable:
raise ValueError("InstallRequirement is editable")
if not specifier:
raise ValueError("InstallRequirement has no version specification")
if len(specifier._specs) != 1:
raise ValueError("InstallRequirement has multiple specifications")
op, version = next(iter(specifier._specs))._spec
if op not in ('==', '===') or version.endswith('.*'):
raise ValueError("InstallRequirement not pinned (is {0!r})".format(
op + version,
))
return version
def is_pinned(ireq):
"""Returns whether an InstallRequirement is a "pinned" requirement.
An InstallRequirement is considered pinned if:
- Is not editable
- It has exactly one specifier
- That specifier is "=="
- The version does not contain a wildcard
Examples:
django==1.8 # pinned
django>1.8 # NOT pinned
django~=1.8 # NOT pinned
django==1.* # NOT pinned
"""
try:
get_pinned_version(ireq)
except (TypeError, ValueError):
return False
return True
def filter_sources(requirement, sources):
"""Returns a filtered list of sources for this requirement.
This considers the index specified by the requirement, and returns only
matching source entries if there is at least one.
"""
if not sources or not requirement.index:
return sources
filtered_sources = [
source for source in sources
if source.get("name") == requirement.index
]
return filtered_sources or sources
def get_allow_prereleases(requirement, global_setting):
# TODO: Implement per-package prereleases flag. (pypa/pipenv#1696)
return global_setting
def are_requirements_equal(this, that):
return (
this.as_line(include_hashes=False) ==
that.as_line(include_hashes=False)
)
def strip_extras(requirement):
"""Returns a new requirement object with extras removed.
"""
line = requirement.as_line()
new = type(requirement).from_line(line)
new.extras = None
return new
View File
-214
View File
@@ -1,214 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import copy
import hashlib
import json
import os
import sys
import appdirs
import pip_shims
import requests
import vistir
from ..internals._pip_shims import VCS_SUPPORT
from ..internals.utils import get_pinned_version
CACHE_DIR = os.environ.get("PASSA_CACHE_DIR", appdirs.user_cache_dir("passa"))
class HashCache(pip_shims.SafeFileCache):
"""Caches hashes of PyPI artifacts so we do not need to re-download them.
Hashes are only cached when the URL appears to contain a hash in it and the
cache key includes the hash value returned from the server). This ought to
avoid ssues where the location on the server changes.
"""
def __init__(self, *args, **kwargs):
session = kwargs.pop('session', requests.session())
self.session = session
kwargs.setdefault('directory', os.path.join(CACHE_DIR, 'hash-cache'))
super(HashCache, self).__init__(*args, **kwargs)
def get_hash(self, location):
# If there is no location hash (i.e., md5, sha256, etc.), we don't want
# to store it.
hash_value = None
orig_scheme = location.scheme
new_location = copy.deepcopy(location)
if orig_scheme in VCS_SUPPORT.all_schemes:
new_location.url = new_location.url.split("+", 1)[-1]
can_hash = new_location.hash
if can_hash:
# hash url WITH fragment
hash_value = self.get(new_location.url)
if not hash_value:
hash_value = self._get_file_hash(new_location)
hash_value = hash_value.encode('utf8')
if can_hash:
self.set(new_location.url, hash_value)
return hash_value.decode('utf8')
def _get_file_hash(self, location):
h = hashlib.new(pip_shims.FAVORITE_HASH)
with vistir.open_file(location, self.session) as fp:
for chunk in iter(lambda: fp.read(8096), b""):
h.update(chunk)
return ":".join([h.name, h.hexdigest()])
# pip-tools's dependency cache implementation.
class CorruptCacheError(Exception):
def __init__(self, path):
self.path = path
def __str__(self):
lines = [
'The dependency cache seems to have been corrupted.',
'Inspect, or delete, the following file:',
' {}'.format(self.path),
]
return os.linesep.join(lines)
def _key_from_req(req):
"""Get an all-lowercase version of the requirement's name."""
if hasattr(req, 'key'):
# from pkg_resources, such as installed dists for pip-sync
key = req.key
else:
# from packaging, such as install requirements from requirements.txt
key = req.name
key = key.replace('_', '-').lower()
return key
def _read_cache_file(cache_file_path):
with open(cache_file_path, 'r') as cache_file:
try:
doc = json.load(cache_file)
except ValueError:
raise CorruptCacheError(cache_file_path)
# Check version and load the contents
assert doc['__format__'] == 1, 'Unknown cache file format'
return doc['dependencies']
class _JSONCache(object):
"""A persistent cache backed by a JSON file.
The cache file is written to the appropriate user cache dir for the
current platform, i.e.
~/.cache/pip-tools/depcache-pyX.Y.json
Where X.Y indicates the Python version.
"""
filename_format = None
def __init__(self, cache_dir=CACHE_DIR):
vistir.mkdir_p(cache_dir)
python_version = ".".join(str(digit) for digit in sys.version_info[:2])
cache_filename = self.filename_format.format(
python_version=python_version,
)
self._cache_file = os.path.join(cache_dir, cache_filename)
self._cache = None
@property
def cache(self):
"""The dictionary that is the actual in-memory cache.
This property lazily loads the cache from disk.
"""
if self._cache is None:
self.read_cache()
return self._cache
def as_cache_key(self, ireq):
"""Given a requirement, return its cache key.
This behavior is a little weird in order to allow backwards
compatibility with cache files. For a requirement without extras, this
will return, for example::
("ipython", "2.1.0")
For a requirement with extras, the extras will be comma-separated and
appended to the version, inside brackets, like so::
("ipython", "2.1.0[nbconvert,notebook]")
"""
extras = tuple(sorted(ireq.extras))
if not extras:
extras_string = ""
else:
extras_string = "[{}]".format(",".join(extras))
name = _key_from_req(ireq.req)
version = get_pinned_version(ireq)
return name, "{}{}".format(version, extras_string)
def read_cache(self):
"""Reads the cached contents into memory.
"""
if os.path.exists(self._cache_file):
self._cache = _read_cache_file(self._cache_file)
else:
self._cache = {}
def write_cache(self):
"""Writes the cache to disk as JSON.
"""
doc = {
'__format__': 1,
'dependencies': self._cache,
}
with open(self._cache_file, 'w') as f:
json.dump(doc, f, sort_keys=True)
def clear(self):
self._cache = {}
self.write_cache()
def __contains__(self, ireq):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
return pkgversion_and_extras in self.cache.get(pkgname, {})
def __getitem__(self, ireq):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
return self.cache[pkgname][pkgversion_and_extras]
def __setitem__(self, ireq, values):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
self.cache.setdefault(pkgname, {})
self.cache[pkgname][pkgversion_and_extras] = values
self.write_cache()
def __delitem__(self, ireq):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
try:
del self.cache[pkgname][pkgversion_and_extras]
except KeyError:
return
self.write_cache()
def get(self, ireq, default=None):
pkgname, pkgversion_and_extras = self.as_cache_key(ireq)
return self.cache.get(pkgname, {}).get(pkgversion_and_extras, default)
class DependencyCache(_JSONCache):
"""Cache the dependency of cancidates.
"""
filename_format = "depcache-py{python_version}.json"
class RequiresPythonCache(_JSONCache):
"""Cache a candidate's Requires-Python information.
"""
filename_format = "pyreqcache-py{python_version}.json"
-214
View File
@@ -1,214 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import itertools
import resolvelib
import plette
import requirementslib
import vistir
from ..internals.hashes import get_hashes
from ..internals.reporters import StdOutReporter
from ..internals.traces import trace_graph
from ..internals.utils import identify_requirment
from .caches import HashCache
from .metadata import set_metadata
from .providers import BasicProvider, EagerUpgradeProvider, PinReuseProvider
def _get_requirements(model, section_name):
"""Produce a mapping of identifier: requirement from the section.
"""
if not model:
return {}
return {identify_requirment(r): r for r in (
requirementslib.Requirement.from_pipfile(name, package._data)
for name, package in model.get(section_name, {}).items()
)}
def _get_requires_python(pipfile):
try:
requires = pipfile.requires
except AttributeError:
return ""
try:
return requires.python_full_version
except AttributeError:
pass
try:
return requires.python_version
except AttributeError:
return ""
def _collect_derived_entries(state, traces, identifiers):
"""Produce a mapping containing all candidates derived from `identifiers`.
`identifiers` should provide a collection of requirement identifications
from a section (i.e. `packages` or `dev-packages`). This function uses
`trace` to filter out candidates in the state that are present because of
an entry in that collection.
"""
identifiers = set(identifiers)
if not identifiers:
return {}
entries = {}
extras = {}
for identifier, requirement in state.mapping.items():
routes = {trace[1] for trace in traces[identifier] if len(trace) > 1}
if identifier not in identifiers and not (identifiers & routes):
continue
name = requirement.normalized_name
if requirement.extras:
# Aggregate extras from multiple routes so we can produce their
# union in the lock file. (sarugaku/passa#24)
try:
extras[name].extend(requirement.extras)
except KeyError:
extras[name] = list(requirement.extras)
entries[name] = next(iter(requirement.as_pipfile().values()))
for name, ext in extras.items():
entries[name]["extras"] = ext
return entries
class AbstractLocker(object):
"""Helper class to produce a new lock file for a project.
This is not intended for instantiation. You should use one of its concrete
subclasses instead. The class contains logic to:
* Prepare a project for locking
* Perform the actually resolver invocation
* Convert resolver output into lock file format
* Update the project to have the new lock file
"""
def __init__(self, project):
self.project = project
self.default_requirements = _get_requirements(
project.pipfile, "packages",
)
self.develop_requirements = _get_requirements(
project.pipfile, "dev-packages",
)
# This comprehension dance ensures we merge packages from both
# sections, and definitions in the default section win.
self.requirements = {k: r for k, r in itertools.chain(
self.develop_requirements.items(),
self.default_requirements.items(),
)}.values()
self.sources = [s._data.copy() for s in project.pipfile.sources]
self.allow_prereleases = bool(
project.pipfile.get("pipenv", {}).get("allow_prereleases", False),
)
self.requires_python = _get_requires_python(project.pipfile)
def __repr__(self):
return "<{0} @ {1!r}>".format(type(self).__name__, self.project.root)
def get_provider(self):
raise NotImplementedError
def get_reporter(self):
# TODO: Build SpinnerReporter, and use this only in verbose mode.
return StdOutReporter(self.requirements)
def lock(self):
"""Lock specified (abstract) requirements into (concrete) candidates.
The locking procedure consists of four stages:
* Resolve versions and dependency graph (powered by ResolveLib).
* Walk the graph to determine "why" each candidate came to be, i.e.
what top-level requirements result in a given candidate.
* Populate hashes for resolved candidates.
* Populate markers based on dependency specifications of each
candidate, and the dependency graph.
"""
provider = self.get_provider()
reporter = self.get_reporter()
resolver = resolvelib.Resolver(provider, reporter)
with vistir.cd(self.project.root):
state = resolver.resolve(self.requirements)
traces = trace_graph(state.graph)
hash_cache = HashCache()
for r in state.mapping.values():
if not r.hashes:
r.hashes = get_hashes(hash_cache, r)
set_metadata(
state.mapping, traces,
provider.fetched_dependencies,
provider.collected_requires_pythons,
)
lockfile = plette.Lockfile.with_meta_from(self.project.pipfile)
lockfile["default"] = _collect_derived_entries(
state, traces, self.default_requirements,
)
lockfile["develop"] = _collect_derived_entries(
state, traces, self.develop_requirements,
)
self.project.lockfile = lockfile
class BasicLocker(AbstractLocker):
"""Basic concrete locker.
This takes a project, generates a lock file from its Pipfile, and sets
the lock file property to the project.
"""
def get_provider(self):
return BasicProvider(
self.requirements, self.sources,
self.requires_python, self.allow_prereleases,
)
class PinReuseLocker(AbstractLocker):
"""A specialized locker to handle re-locking based on existing pins.
See :class:`.providers.PinReuseProvider` for more information.
"""
def __init__(self, project):
super(PinReuseLocker, self).__init__(project)
pins = _get_requirements(project.lockfile, "develop")
pins.update(_get_requirements(project.lockfile, "default"))
for pin in pins.values():
pin.markers = None
self.preferred_pins = pins
def get_provider(self):
return PinReuseProvider(
self.preferred_pins, self.requirements, self.sources,
self.requires_python, self.allow_prereleases,
)
class EagerUpgradeLocker(PinReuseLocker):
"""A specialized locker to handle the "eager" upgrade strategy.
See :class:`.providers.EagerUpgradeProvider` for more
information.
"""
def __init__(self, tracked_names, *args, **kwargs):
super(EagerUpgradeLocker, self).__init__(*args, **kwargs)
self.tracked_names = tracked_names
def get_provider(self):
return EagerUpgradeProvider(
self.tracked_names, self.preferred_pins,
self.requirements, self.sources,
self.requires_python, self.allow_prereleases,
)
-169
View File
@@ -1,169 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import copy
import itertools
import packaging.markers
import packaging.specifiers
import vistir
import vistir.misc
from ..internals.markers import get_without_extra
from ..internals.specifiers import cleanup_pyspecs, pyspec_from_markers
def dedup_markers(s):
# TODO: Implement better logic.
deduped = sorted(vistir.misc.dedup(s))
return deduped
class MetaSet(object):
"""Representation of a "metadata set".
This holds multiple metadata representaions. Each metadata representation
includes a marker, and a specifier set of Python versions required.
"""
def __init__(self):
self.markerset = frozenset()
self.pyspecset = packaging.specifiers.SpecifierSet()
def __repr__(self):
return "MetaSet(markerset={0!r}, pyspecset={1!r})".format(
",".join(sorted(self.markerset)), str(self.pyspecset),
)
def __str__(self):
pyspecs = set()
markerset = set()
for m in self.markerset:
marker_specs = pyspec_from_markers(packaging.markers.Marker(m))
if marker_specs:
pyspecs.add(marker_specs)
else:
markerset.add(m)
if pyspecs:
self.pyspecset._specs &= pyspecs
self.markerset = frozenset(markerset)
return " and ".join(dedup_markers(itertools.chain(
# Make sure to always use the same quotes so we can dedup properly.
(
"{0}".format(ms) if " or " in ms else ms
for ms in (str(m).replace('"', "'") for m in self.markerset)
),
(
"python_version {0[0]} '{0[1]}'".format(spec)
for spec in cleanup_pyspecs(self.pyspecset)
),
)))
def __bool__(self):
return bool(self.markerset or self.pyspecset)
def __nonzero__(self): # Python 2.
return self.__bool__()
def __or__(self, pair):
marker, specset = pair
markerset = set(self.markerset)
if marker:
marker_specs = pyspec_from_markers(marker)
if not marker_specs:
markerset.add(str(marker))
else:
specset._specs &= marker_specs
metaset = MetaSet()
metaset.markerset = frozenset(markerset)
# TODO: Implement some logic to clean up dups like '3.0.*' and '3.0'.
metaset.pyspecset &= self.pyspecset & specset
return metaset
def _build_metasets(dependencies, pythons, key, trace, all_metasets):
all_parent_metasets = []
for route in trace:
parent = route[-1]
try:
parent_metasets = all_metasets[parent]
except KeyError: # Parent not calculated yet. Wait for it.
return
all_parent_metasets.append((parent, parent_metasets))
metaset_iters = []
for parent, parent_metasets in all_parent_metasets:
r = dependencies[parent][key]
python = pythons[key]
metaset = (
get_without_extra(r.markers),
packaging.specifiers.SpecifierSet(python),
)
metaset_iters.append(
parent_metaset | metaset
for parent_metaset in parent_metasets
)
return list(itertools.chain.from_iterable(metaset_iters))
def _calculate_metasets_mapping(dependencies, pythons, traces):
all_metasets = {None: [MetaSet()]}
del traces[None]
while traces:
new_metasets = {}
for key, trace in traces.items():
assert key not in all_metasets, key # Sanity check for debug.
metasets = _build_metasets(
dependencies, pythons, key, trace, all_metasets,
)
if metasets is None:
continue
new_metasets[key] = metasets
if not new_metasets:
break # No progress? Deadlocked. Give up.
all_metasets.update(new_metasets)
for key in new_metasets:
del traces[key]
return all_metasets
def _format_metasets(metasets):
# If there is an unconditional route, this needs to be unconditional.
if not metasets or not all(metasets):
return None
# This extra str(Marker()) call helps simplify the expression.
return str(packaging.markers.Marker(" or ".join(
"{0}".format(s) if " and " in s else s
for s in dedup_markers(str(metaset) for metaset in metasets
if metaset)
)))
def set_metadata(candidates, traces, dependencies, pythons):
"""Add "metadata" to candidates based on the dependency tree.
Metadata for a candidate includes markers and a specifier for Python
version requirements.
:param candidates: A key-candidate mapping. Candidates in the mapping will
have their markers set.
:param traces: A graph trace (produced by `traces.trace_graph`) providing
information about dependency relationships between candidates.
:param dependencies: A key-collection mapping containing what dependencies
each candidate in `candidates` requested.
:param pythons: A key-str mapping containing Requires-Python information
of each candidate.
Keys in mappings and entries in the trace are identifiers of a package, as
implemented by the `identify` method of the resolver's provider.
The candidates are modified in-place.
"""
metasets_mapping = _calculate_metasets_mapping(
dependencies, pythons, copy.deepcopy(traces),
)
for key, candidate in candidates.items():
candidate.markers = _format_metasets(metasets_mapping[key])
-241
View File
@@ -1,241 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import collections
import io
import os
from pipenv.vendor import attr
import packaging.markers
import packaging.utils
import plette
import plette.models
import six
import tomlkit
SectionDifference = collections.namedtuple("SectionDifference", [
"inthis", "inthat",
])
FileDifference = collections.namedtuple("FileDifference", [
"default", "develop",
])
def _are_pipfile_entries_equal(a, b):
a = {k: v for k, v in a.items() if k not in ("markers", "hashes", "hash")}
b = {k: v for k, v in b.items() if k not in ("markers", "hashes", "hash")}
if a != b:
return False
try:
marker_eval_a = packaging.markers.Marker(a["markers"]).evaluate()
except (AttributeError, KeyError, TypeError, ValueError):
marker_eval_a = True
try:
marker_eval_b = packaging.markers.Marker(b["markers"]).evaluate()
except (AttributeError, KeyError, TypeError, ValueError):
marker_eval_b = True
return marker_eval_a == marker_eval_b
DEFAULT_NEWLINES = "\n"
def preferred_newlines(f):
if isinstance(f.newlines, six.text_type):
return f.newlines
return DEFAULT_NEWLINES
@attr.s
class ProjectFile(object):
"""A file in the Pipfile project.
"""
location = attr.ib()
line_ending = attr.ib()
model = attr.ib()
@classmethod
def read(cls, location, model_cls, invalid_ok=False):
try:
with io.open(location, encoding="utf-8") as f:
model = model_cls.load(f)
line_ending = preferred_newlines(f)
except Exception:
if not invalid_ok:
raise
model = None
line_ending = DEFAULT_NEWLINES
return cls(location=location, line_ending=line_ending, model=model)
def write(self):
kwargs = {"encoding": "utf-8", "newline": self.line_ending}
with io.open(self.location, "w", **kwargs) as f:
self.model.dump(f)
def dumps(self):
strio = six.StringIO()
self.model.dump(strio)
return strio.getvalue()
@attr.s
class Project(object):
root = attr.ib()
_p = attr.ib(init=False)
_l = attr.ib(init=False)
def __attrs_post_init__(self):
self.root = root = os.path.abspath(self.root)
self._p = ProjectFile.read(
os.path.join(root, "Pipfile"),
plette.Pipfile,
)
self._l = ProjectFile.read(
os.path.join(root, "Pipfile.lock"),
plette.Lockfile,
invalid_ok=True,
)
@property
def pipfile(self):
return self._p.model
@property
def pipfile_location(self):
return self._p.location
@property
def lockfile(self):
return self._l.model
@property
def lockfile_location(self):
return self._l.location
@lockfile.setter
def lockfile(self, new):
self._l.model = new
def is_synced(self):
return self.lockfile and self.lockfile.is_up_to_date(self.pipfile)
def _get_pipfile_section(self, develop, insert=True):
name = "dev-packages" if develop else "packages"
try:
section = self.pipfile[name]
except KeyError:
section = plette.models.PackageCollection(tomlkit.table())
if insert:
self.pipfile[name] = section
return section
def contains_key_in_pipfile(self, key):
sections = [
self._get_pipfile_section(develop=False, insert=False),
self._get_pipfile_section(develop=True, insert=False),
]
return any(
(packaging.utils.canonicalize_name(name) ==
packaging.utils.canonicalize_name(key))
for section in sections
for name in section
)
def add_line_to_pipfile(self, line, develop):
from requirementslib import Requirement
requirement = Requirement.from_line(line)
section = self._get_pipfile_section(develop=develop)
key = requirement.normalized_name
entry = next(iter(requirement.as_pipfile().values()))
if isinstance(entry, dict):
# HACK: TOMLKit prefers to expand tables by default, but we
# always want inline tables here. Also tomlkit.inline_table
# does not have `update()`.
table = tomlkit.inline_table()
for k, v in entry.items():
table[k] = v
entry = table
section[key] = entry
def remove_keys_from_pipfile(self, keys, default, develop):
keys = {packaging.utils.canonicalize_name(key) for key in keys}
sections = []
if default:
sections.append(self._get_pipfile_section(
develop=False, insert=False,
))
if develop:
sections.append(self._get_pipfile_section(
develop=True, insert=False,
))
for section in sections:
removals = set()
for name in section:
if packaging.utils.canonicalize_name(name) in keys:
removals.add(name)
for key in removals:
del section._data[key]
def remove_keys_from_lockfile(self, keys):
keys = {packaging.utils.canonicalize_name(key) for key in keys}
removed = False
for section_name in ("default", "develop"):
try:
section = self.lockfile[section_name]
except KeyError:
continue
removals = set()
for name in section:
if packaging.utils.canonicalize_name(name) in keys:
removals.add(name)
removed = removed or bool(removals)
for key in removals:
del section._data[key]
if removed:
# HACK: The lock file no longer represents the Pipfile at this
# point. Set the hash to an arbitrary invalid value.
self.lockfile.meta.hash = plette.models.Hash({"__invalid__": ""})
def difference_lockfile(self, lockfile):
"""Generate a difference between the current and given lockfiles.
Returns a 2-tuple containing differences in default in develop
sections.
Each element is a 2-tuple of dicts. The first, `inthis`, contains
entries only present in the current lockfile; the second, `inthat`,
contains entries only present in the given one.
If a key exists in both this and that, but the values differ, the key
is present in both dicts, pointing to values from each file.
"""
diff_data = {
"default": SectionDifference({}, {}),
"develop": SectionDifference({}, {}),
}
for section_name, section_diff in diff_data.items():
try:
this = self.lockfile[section_name]._data
except (KeyError, TypeError):
this = {}
try:
that = lockfile[section_name]._data
except (KeyError, TypeError):
that = {}
for key, this_value in this.items():
try:
that_value = that[key]
except KeyError:
section_diff.inthis[key] = this_value
continue
if not _are_pipfile_entries_equal(this_value, that_value):
section_diff.inthis[key] = this_value
section_diff.inthat[key] = that_value
for key, that_value in that.items():
if key not in this:
section_diff.inthat[key] = that_value
return FileDifference(**diff_data)
-198
View File
@@ -1,198 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import os
import resolvelib
from ..internals.candidates import find_candidates
from ..internals.dependencies import get_dependencies
from ..internals.utils import (
filter_sources, get_allow_prereleases, identify_requirment, strip_extras,
)
PROTECTED_PACKAGE_NAMES = {"pip", "setuptools"}
class BasicProvider(resolvelib.AbstractProvider):
"""Provider implementation to interface with `requirementslib.Requirement`.
"""
def __init__(self, root_requirements, sources,
requires_python, allow_prereleases):
self.sources = sources
self.requires_python = requires_python
self.allow_prereleases = bool(allow_prereleases)
self.invalid_candidates = set()
# Remember requirements of each pinned candidate. The resolver calls
# `get_dependencies()` only when it wants to repin, so the last time
# the dependencies we got when it is last called on a package, are
# the set used by the resolver. We use this later to trace how a given
# dependency is specified by a package.
self.fetched_dependencies = {None: {
self.identify(r): r for r in root_requirements
}}
# Should Pipfile's requires.python_[full_]version be included?
self.collected_requires_pythons = {None: ""}
def identify(self, dependency):
return identify_requirment(dependency)
def get_preference(self, resolution, candidates, information):
# TODO: Provide better sorting logic. This simply resolve the ones with
# less choices first. Not sophisticated, but sounds reasonable?
return len(candidates)
def find_matches(self, requirement):
sources = filter_sources(requirement, self.sources)
candidates = find_candidates(
requirement, sources, self.requires_python,
get_allow_prereleases(requirement, self.allow_prereleases),
)
return candidates
def is_satisfied_by(self, requirement, candidate):
# A non-named requirement has exactly one candidate, as implemented in
# `find_matches()`. Since pip does not yet implement URL based lookup
# (PEP 508) yet, it must match unless there are duplicated entries in
# Pipfile. If there is, the user takes the blame. (sarugaku/passa#34)
if not requirement.is_named:
return True
# A non-named candidate can only come from a non-named requirement,
# which, since pip does not implement URL based lookup (PEP 508) yet,
# can only come from Pipfile. Assume the user knows what they're doing,
# and use it without checking. (sarugaku/passa#34)
if not candidate.is_named:
return True
# Optimization: Everything matches if there are no specifiers.
if not requirement.specifiers:
return True
# We can't handle old version strings before PEP 440. Drop them all.
# Practically this shouldn't be a problem if the user is specifying a
# remotely reasonable dependency not from before 2013.
candidate_line = candidate.as_line(include_hashes=False)
if candidate_line in self.invalid_candidates:
return False
try:
version = candidate.get_specifier().version
except (TypeError, ValueError):
print('ignoring invalid version from {!r}'.format(candidate_line))
self.invalid_candidates.add(candidate_line)
return False
return requirement.as_ireq().specifier.contains(version)
def get_dependencies(self, candidate):
sources = filter_sources(candidate, self.sources)
try:
dependencies, requires_python = get_dependencies(
candidate, sources=sources,
)
except Exception as e:
if os.environ.get("PASSA_NO_SUPPRESS_EXCEPTIONS"):
raise
print("failed to get dependencies for {0!r}: {1}".format(
candidate.as_line(include_hashes=False), e,
))
dependencies = []
requires_python = ""
# Exclude protected packages from the list. This prevents those
# packages from being locked, unless the user is actually working on
# them, and explicitly lists them as top-level requirements -- those
# packages are not added via this code path. (sarugaku/passa#15)
dependencies = [
dependency for dependency in dependencies
if dependency.normalized_name not in PROTECTED_PACKAGE_NAMES
]
if candidate.extras:
# HACK: If this candidate has extras, add the original candidate
# (same pinned version, no extras) as its dependency. This ensures
# the same package with different extras (treated as distinct by
# the resolver) have the same version. (sarugaku/passa#4)
dependencies.append(strip_extras(candidate))
candidate_key = self.identify(candidate)
self.fetched_dependencies[candidate_key] = {
self.identify(r): r for r in dependencies
}
self.collected_requires_pythons[candidate_key] = requires_python
return dependencies
class PinReuseProvider(BasicProvider):
"""A provider that reuses preferred pins if possible.
This is used to implement "add", "remove", and "only-if-needed upgrade",
where already-pinned candidates in Pipfile.lock should be preferred.
"""
def __init__(self, preferred_pins, *args, **kwargs):
super(PinReuseProvider, self).__init__(*args, **kwargs)
self.preferred_pins = preferred_pins
def find_matches(self, requirement):
candidates = super(PinReuseProvider, self).find_matches(requirement)
try:
# Add the preferred pin. Remember the resolve prefer candidates
# at the end of the list, so the most preferred should be last.
candidates.append(self.preferred_pins[self.identify(requirement)])
except KeyError:
pass
return candidates
class EagerUpgradeProvider(PinReuseProvider):
"""A specialized provider to handle an "eager" upgrade strategy.
An eager upgrade tries to upgrade not only packages specified, but also
their dependencies (recursively). This contrasts to the "only-if-needed"
default, which only promises to upgrade the specified package, and
prevents touching anything else if at all possible.
The provider is implemented as to keep track of all dependencies of the
specified packages to upgrade, and free their pins when it has a chance.
"""
def __init__(self, tracked_names, *args, **kwargs):
super(EagerUpgradeProvider, self).__init__(*args, **kwargs)
self.tracked_names = set(tracked_names)
for name in tracked_names:
self.preferred_pins.pop(name, None)
# HACK: Set this special flag to distinguish preferred pins from
# regular, to tell the resolver to NOT use them for tracked packages.
for pin in self.preferred_pins.values():
pin._preferred_by_provider = True
def is_satisfied_by(self, requirement, candidate):
# If this is a tracking package, tell the resolver out of using the
# preferred pin, and into a "normal" candidate selection process.
if (self.identify(requirement) in self.tracked_names and
getattr(candidate, "_preferred_by_provider", False)):
return False
return super(EagerUpgradeProvider, self).is_satisfied_by(
requirement, candidate,
)
def get_dependencies(self, candidate):
# If this package is being tracked for upgrade, remove pins of its
# dependencies, and start tracking these new packages.
dependencies = super(EagerUpgradeProvider, self).get_dependencies(
candidate,
)
if self.identify(candidate) in self.tracked_names:
for dependency in dependencies:
name = self.identify(dependency)
self.tracked_names.add(name)
self.preferred_pins.pop(name, None)
return dependencies
def get_preference(self, resolution, candidates, information):
# Resolve tracking packages so we have a chance to unpin them first.
name = self.identify(candidates[0])
if name in self.tracked_names:
return -1
return len(candidates)
-214
View File
@@ -1,214 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, unicode_literals
import collections
import contextlib
import os
import sys
import sysconfig
import pkg_resources
import packaging.markers
import packaging.version
import requirementslib
from ..internals._pip import uninstall, EditableInstaller, WheelInstaller
def _is_installation_local(name):
"""Check whether the distribution is in the current Python installation.
This is used to distinguish packages seen by a virtual environment. A venv
may be able to see global packages, but we don't want to mess with them.
"""
loc = os.path.normcase(pkg_resources.working_set.by_key[name].location)
pre = os.path.normcase(sys.prefix)
return os.path.commonprefix([loc, pre]) == pre
def _is_up_to_date(distro, version):
# This is done in strings to avoid type mismatches caused by vendering.
return str(version) == str(packaging.version.parse(distro.version))
GroupCollection = collections.namedtuple("GroupCollection", [
"uptodate", "outdated", "noremove", "unneeded",
])
def _group_installed_names(packages):
"""Group locally installed packages based on given specifications.
`packages` is a name-package mapping that are used as baseline to
determine how the installed package should be grouped.
Returns a 3-tuple of disjoint sets, all containing names of installed
packages:
* `uptodate`: These match the specifications.
* `outdated`: These installations are specified, but don't match the
specifications in `packages`.
* `unneeded`: These are installed, but not specified in `packages`.
"""
groupcoll = GroupCollection(set(), set(), set(), set())
for distro in pkg_resources.working_set:
name = distro.key
try:
package = packages[name]
except KeyError:
groupcoll.unneeded.add(name)
continue
r = requirementslib.Requirement.from_pipfile(name, package)
if not r.is_named:
# Always mark non-named. I think pip does something similar?
groupcoll.outdated.add(name)
elif not _is_up_to_date(distro, r.get_version()):
groupcoll.outdated.add(name)
else:
groupcoll.uptodate.add(name)
return groupcoll
@contextlib.contextmanager
def _remove_package(name):
if name is None or not _is_installation_local(name):
yield None
return
with uninstall(name, auto_confirm=True, verbose=False) as uninstaller:
yield uninstaller
def _get_packages(lockfile, default, develop):
# Don't need to worry about duplicates because only extras can differ.
# Extras don't matter because they only affect dependencies, and we
# don't install dependencies anyway!
packages = {}
if default:
packages.update(lockfile.default._data)
if develop:
packages.update(lockfile.develop._data)
return packages
def _build_paths():
"""Prepare paths for distlib.wheel.Wheel to install into.
"""
paths = sysconfig.get_paths()
return {
"prefix": sys.prefix,
"data": paths["data"],
"scripts": paths["scripts"],
"headers": paths["include"],
"purelib": paths["purelib"],
"platlib": paths["platlib"],
}
PROTECTED_FROM_CLEAN = {"setuptools", "pip", "wheel"}
def _clean(names):
cleaned = set()
for name in names:
if name in PROTECTED_FROM_CLEAN:
continue
with _remove_package(name) as uninst:
if uninst:
cleaned.add(name)
return cleaned
class Synchronizer(object):
"""Helper class to install packages from a project's lock file.
"""
def __init__(self, project, default, develop, clean_unneeded):
self._root = project.root # Only for repr.
self.packages = _get_packages(project.lockfile, default, develop)
self.sources = project.lockfile.meta.sources._data
self.paths = _build_paths()
self.clean_unneeded = clean_unneeded
def __repr__(self):
return "<{0} @ {1!r}>".format(type(self).__name__, self._root)
def sync(self):
groupcoll = _group_installed_names(self.packages)
installed = set()
updated = set()
cleaned = set()
# TODO: Show a prompt to confirm cleaning. We will need to implement a
# reporter pattern for this as well.
if self.clean_unneeded:
names = _clean(groupcoll.unneeded)
cleaned.update(names)
# TODO: Specify installation order? (pypa/pipenv#2274)
installers = []
for name, package in self.packages.items():
r = requirementslib.Requirement.from_pipfile(name, package)
name = r.normalized_name
if name in groupcoll.uptodate:
continue
markers = r.markers
if markers and not packaging.markers.Marker(markers).evaluate():
continue
r.markers = None
if r.editable:
installer = EditableInstaller(r)
else:
installer = WheelInstaller(r, self.sources, self.paths)
try:
installer.prepare()
except Exception as e:
if os.environ.get("PASSA_NO_SUPPRESS_EXCEPTIONS"):
raise
print("failed to prepare {0!r}: {1}".format(
r.as_line(include_hashes=False), e,
))
else:
installers.append((name, installer))
for name, installer in installers:
if name in groupcoll.outdated:
name_to_remove = name
else:
name_to_remove = None
try:
with _remove_package(name_to_remove):
installer.install()
except Exception as e:
if os.environ.get("PASSA_NO_SUPPRESS_EXCEPTIONS"):
raise
print("failed to install {0!r}: {1}".format(
r.as_line(include_hashes=False), e,
))
continue
if name in groupcoll.outdated or name in groupcoll.noremove:
updated.add(name)
else:
installed.add(name)
return installed, updated, cleaned
class Cleaner(object):
"""Helper class to clean packages not in a project's lock file.
"""
def __init__(self, project, default, develop):
self._root = project.root # Only for repr.
self.packages = _get_packages(project.lockfile, default, develop)
def __repr__(self):
return "<{0} @ {1!r}>".format(type(self).__name__, self._root)
def clean(self):
groupcoll = _group_installed_names(self.packages)
cleaned = _clean(groupcoll.unneeded)
return cleaned
View File
-28
View File
@@ -1,28 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
from resolvelib import NoVersionsAvailable, ResolutionImpossible
from passa.internals.reporters import print_requirement
def lock(locker):
success = False
try:
locker.lock()
except NoVersionsAvailable as e:
print("\nCANNOT RESOLVE. NO CANDIDATES FOUND FOR:")
print("{:>40}".format(e.requirement.as_line(include_hashes=False)))
if e.parent:
line = e.parent.as_line(include_hashes=False)
print("{:>41}".format("(from {})".format(line)))
else:
print("{:>41}".format("(user)"))
except ResolutionImpossible as e:
print("\nCANNOT RESOLVE.\nOFFENDING REQUIREMENTS:")
for r in e.requirements:
print_requirement(r)
else:
success = True
return success
-23
View File
@@ -1,23 +0,0 @@
# -*- coding=utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
def sync(syncer):
print("Starting synchronization")
installed, updated, cleaned = syncer.sync()
if cleaned:
print("Uninstalled: {}".format(", ".join(sorted(cleaned))))
if installed:
print("Installed: {}".format(", ".join(sorted(installed))))
if updated:
print("Updated: {}".format(", ".join(sorted(updated))))
return True
def clean(cleaner):
print("Cleaning")
cleaned = cleaner.clean()
if cleaned:
print("Uninstalled: {}".format(", ".join(sorted(cleaned))))
return True
+1 -2
View File
@@ -6,9 +6,8 @@ from pathlib import Path
import invoke
from . import release, vendoring
from .vendoring import vendor_passa
ROOT = Path(".").parent.parent.absolute()
ns = invoke.Collection(vendoring, release, release.clean_mdchangelog, vendor_passa.vendor_passa)
ns = invoke.Collection(vendoring, release, release.clean_mdchangelog)
-20
View File
@@ -1,20 +0,0 @@
import invoke
from pipenv._compat import TemporaryDirectory
from . import _get_vendor_dir, log
@invoke.task
def vendor_passa(ctx):
with TemporaryDirectory(prefix='passa') as passa_dir:
vendor_dir = _get_vendor_dir(ctx).absolute().as_posix()
ctx.run("git clone https://github.com/sarugaku/passa.git {0}".format(passa_dir.name))
with ctx.cd("{0}".format(passa_dir.name)):
# ctx.run("git checkout 0.3.0")
ctx.run("pip install plette[validation] requirementslib distlib pip-shims -q --exists-action=i")
log("Packing Passa")
ctx.run("invoke pack")
log("Moving pack to vendor dir!")
ctx.run("mv pack/passa.zip {0}".format(vendor_dir))
log("Successfully vendored passa!")
+3
View File
@@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta:__legacy__"