Signed-off-by: Kenneth Reitz <me@kennethreitz.org>
This commit is contained in:
2018-03-14 17:22:40 -04:00
parent 2fa5b84378
commit 1103043bc3
6 changed files with 1053 additions and 1076 deletions
+3 -1
View File
@@ -12,11 +12,13 @@ click = "*"
pytest-pypy = {path = "./tests/pytest-pypi", editable = true}
pytest-tap = "*"
stdeb = {version="*", sys_platform="== 'linux'"}
black = {git = "https://github.com/ambv/black.git", python_version = "=='3.6'"}
black = {git = "https://github.com/ambv/black.git", editable = true}
[packages]
[scripts]
tests = "bash ./run-tests.sh"
Generated
+3 -3
View File
@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "6b77ca7e54fbb6febca620c0fb96c0b6ba17aed618c1eaa7e7cba8604a1b3f84"
"sha256": "014daa4b632edc332ac864fe8220872044c779155e75f3c0ef7e3a832b49001f"
},
"pipfile-spec": 6,
"requires": {},
@@ -44,8 +44,8 @@
"version": "==2.5.3"
},
"black": {
"git": "https://github.com/ambv/black.git",
"python_version": "=='3.6'"
"editable": true,
"git": "https://github.com/ambv/black.git"
},
"certifi": {
"hashes": [
+569 -641
View File
File diff suppressed because it is too large Load Diff
+211 -186
View File
@@ -1,186 +1,211 @@
# -*- coding: utf-8 -*-
"""
clint.textui.progress
~~~~~~~~~~~~~~~~~
This module provides the progressbar functionality.
"""
from __future__ import absolute_import
import os
import sys
import time
import crayons
STREAM = sys.stderr
MILL_TEMPLATE = '%s %s %i/%i\r'
DOTS_CHAR = '.'
if os.name != 'nt':
BAR_FILLED_CHAR = str(crayons.green('', bold=True))
BAR_EMPTY_CHAR = str(crayons.black(''))
else:
BAR_FILLED_CHAR = '='
BAR_EMPTY_CHAR = '-'
if (sys.version_info[0] >= 3) and (os.name != 'nt'):
BAR_TEMPLATE = u' %s%s%s %i/%i{0}\r'.format(crayons.black('%s'))
else:
if os.name == 'nt':
BAR_TEMPLATE = ' %s%s%s %i/%i - %s\r'
else:
BAR_TEMPLATE = ' %s%s%s %i/%i%s\r'
MILL_CHARS = ['|', '/', '-', '\\']
# How long to wait before recalculating the ETA
ETA_INTERVAL = 1
# How many intervals (excluding the current one) to calculate the simple moving
# average
ETA_SMA_WINDOW = 9
class Bar(object):
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.done()
return False # we're not suppressing exceptions
def __init__(self, label='', width=32, hide=None, empty_char=BAR_EMPTY_CHAR,
filled_char=BAR_FILLED_CHAR, expected_size=None, every=1):
self.label = label
self.width = width
self.hide = hide
# Only show bar in terminals by default (better for piping, logging etc.)
if hide is None:
try:
self.hide = not STREAM.isatty()
except AttributeError: # output does not support isatty()
self.hide = True
self.empty_char = empty_char
self.filled_char = filled_char
self.expected_size = expected_size
self.every = every
self.start = time.time()
self.ittimes = []
self.eta = 0
self.etadelta = time.time()
self.etadisp = self.format_time(self.eta)
self.last_progress = 0
if (self.expected_size):
self.show(0)
def show(self, progress, count=None):
if count is not None:
self.expected_size = count
if self.expected_size is None:
raise Exception("expected_size not initialized")
self.last_progress = progress
if (time.time() - self.etadelta) > ETA_INTERVAL:
self.etadelta = time.time()
self.ittimes = \
self.ittimes[-ETA_SMA_WINDOW:] + \
[-(self.start - time.time()) / (progress+1)]
self.eta = \
sum(self.ittimes) / float(len(self.ittimes)) * \
(self.expected_size - progress)
self.etadisp = self.format_time(self.eta)
x = int(self.width * progress / self.expected_size)
if not self.hide:
if ((progress % self.every) == 0 or # True every "every" updates
(progress == self.expected_size)): # And when we're done
STREAM.write(BAR_TEMPLATE % (
self.label, self.filled_char * x,
self.empty_char * (self.width - x), progress,
self.expected_size, self.etadisp))
STREAM.flush()
def done(self):
self.elapsed = time.time() - self.start
elapsed_disp = self.format_time(self.elapsed)
if not self.hide:
# Print completed bar with elapsed time
STREAM.write(BAR_TEMPLATE % (
self.label, self.filled_char * self.width,
self.empty_char * 0, self.last_progress,
self.expected_size, elapsed_disp))
STREAM.write('\n')
STREAM.flush()
def format_time(self, seconds):
return time.strftime('%H:%M:%S', time.gmtime(seconds))
def bar(it, label='', width=32, hide=None, empty_char=BAR_EMPTY_CHAR,
filled_char=BAR_FILLED_CHAR, expected_size=None, every=1):
"""Progress iterator. Wrap your iterables with it."""
count = len(it) if expected_size is None else expected_size
with Bar(label=label, width=width, hide=hide, empty_char=BAR_EMPTY_CHAR,
filled_char=BAR_FILLED_CHAR, expected_size=count, every=every) \
as bar:
for i, item in enumerate(it):
yield item
bar.show(i + 1)
def dots(it, label='', hide=None, every=1):
"""Progress iterator. Prints a dot for each item being iterated"""
count = 0
if not hide:
STREAM.write(label)
for i, item in enumerate(it):
if not hide:
if i % every == 0: # True every "every" updates
STREAM.write(DOTS_CHAR)
sys.stderr.flush()
count += 1
yield item
STREAM.write('\n')
STREAM.flush()
def mill(it, label='', hide=None, expected_size=None, every=1):
"""Progress iterator. Prints a mill while iterating over the items."""
def _mill_char(_i):
if _i >= count:
return ' '
else:
return MILL_CHARS[(_i // every) % len(MILL_CHARS)]
def _show(_i):
if not hide:
if ((_i % every) == 0 or # True every "every" updates
(_i == count)): # And when we're done
STREAM.write(MILL_TEMPLATE % (
label, _mill_char(_i), _i, count))
STREAM.flush()
count = len(it) if expected_size is None else expected_size
if count:
_show(0)
for i, item in enumerate(it):
yield item
_show(i + 1)
if not hide:
STREAM.write('\n')
STREAM.flush()
# -*- coding: utf-8 -*-
"""
clint.textui.progress
~~~~~~~~~~~~~~~~~
This module provides the progressbar functionality.
"""
from __future__ import absolute_import
import os
import sys
import time
import crayons
STREAM = sys.stderr
MILL_TEMPLATE = '%s %s %i/%i\r'
DOTS_CHAR = '.'
if os.name != 'nt':
BAR_FILLED_CHAR = str(crayons.green('', bold=True))
BAR_EMPTY_CHAR = str(crayons.black(''))
else:
BAR_FILLED_CHAR = '='
BAR_EMPTY_CHAR = '-'
if (sys.version_info[0] >= 3) and (os.name != 'nt'):
BAR_TEMPLATE = u' %s%s%s %i/%i{0}\r'.format(crayons.black('%s'))
else:
if os.name == 'nt':
BAR_TEMPLATE = ' %s%s%s %i/%i - %s\r'
else:
BAR_TEMPLATE = ' %s%s%s %i/%i%s\r'
MILL_CHARS = ['|', '/', '-', '\\']
# How long to wait before recalculating the ETA
ETA_INTERVAL = 1
# How many intervals (excluding the current one) to calculate the simple moving
# average
ETA_SMA_WINDOW = 9
class Bar(object):
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.done()
return False # we're not suppressing exceptions
def __init__(
self,
label='',
width=32,
hide=None,
empty_char=BAR_EMPTY_CHAR,
filled_char=BAR_FILLED_CHAR,
expected_size=None,
every=1,
):
self.label = label
self.width = width
self.hide = hide
# Only show bar in terminals by default (better for piping, logging etc.)
if hide is None:
try:
self.hide = not STREAM.isatty()
except AttributeError: # output does not support isatty()
self.hide = True
self.empty_char = empty_char
self.filled_char = filled_char
self.expected_size = expected_size
self.every = every
self.start = time.time()
self.ittimes = []
self.eta = 0
self.etadelta = time.time()
self.etadisp = self.format_time(self.eta)
self.last_progress = 0
if (self.expected_size):
self.show(0)
def show(self, progress, count=None):
if count is not None:
self.expected_size = count
if self.expected_size is None:
raise Exception("expected_size not initialized")
self.last_progress = progress
if (time.time() - self.etadelta) > ETA_INTERVAL:
self.etadelta = time.time()
self.ittimes = self.ittimes[-ETA_SMA_WINDOW:] + [
- (self.start - time.time()) / (progress + 1)
]
self.eta = sum(self.ittimes) / float(len(self.ittimes)) * (
self.expected_size - progress
)
self.etadisp = self.format_time(self.eta)
x = int(self.width * progress / self.expected_size)
if not self.hide:
if (
(progress % self.every) == 0 or # True every "every" updates
(progress == self.expected_size)
): # And when we're done
STREAM.write(
BAR_TEMPLATE %
(
self.label,
self.filled_char * x,
self.empty_char * (self.width - x),
progress,
self.expected_size,
self.etadisp,
)
)
STREAM.flush()
def done(self):
self.elapsed = time.time() - self.start
elapsed_disp = self.format_time(self.elapsed)
if not self.hide:
# Print completed bar with elapsed time
STREAM.write(
BAR_TEMPLATE %
(
self.label,
self.filled_char * self.width,
self.empty_char * 0,
self.last_progress,
self.expected_size,
elapsed_disp,
)
)
STREAM.write('\n')
STREAM.flush()
def format_time(self, seconds):
return time.strftime('%H:%M:%S', time.gmtime(seconds))
def bar(
it,
label='',
width=32,
hide=None,
empty_char=BAR_EMPTY_CHAR,
filled_char=BAR_FILLED_CHAR,
expected_size=None,
every=1,
):
"""Progress iterator. Wrap your iterables with it."""
count = len(it) if expected_size is None else expected_size
with Bar(
label=label,
width=width,
hide=hide,
empty_char=BAR_EMPTY_CHAR,
filled_char=BAR_FILLED_CHAR,
expected_size=count,
every=every,
) as bar:
for i, item in enumerate(it):
yield item
bar.show(i + 1)
def dots(it, label='', hide=None, every=1):
"""Progress iterator. Prints a dot for each item being iterated"""
count = 0
if not hide:
STREAM.write(label)
for i, item in enumerate(it):
if not hide:
if i % every == 0: # True every "every" updates
STREAM.write(DOTS_CHAR)
sys.stderr.flush()
count += 1
yield item
STREAM.write('\n')
STREAM.flush()
def mill(it, label='', hide=None, expected_size=None, every=1):
"""Progress iterator. Prints a mill while iterating over the items."""
def _mill_char(_i):
if _i >= count:
return ' '
else:
return MILL_CHARS[(_i // every) % len(MILL_CHARS)]
def _show(_i):
if not hide:
if (
(_i % every) == 0 or (_i == count) # True every "every" updates
): # And when we're done
STREAM.write(MILL_TEMPLATE % (label, _mill_char(_i), _i, count))
STREAM.flush()
count = len(it) if expected_size is None else expected_size
if count:
_show(0)
for i, item in enumerate(it):
yield item
_show(i + 1)
if not hide:
STREAM.write('\n')
STREAM.flush()
+76 -80
View File
@@ -14,10 +14,20 @@ import toml
from pip9 import ConfigOptionParser
from .utils import (
mkdir_p, convert_deps_from_pip, pep423_name, recase_file,
find_requirements, is_file, is_vcs, python_version, cleanup_toml,
is_installable_file, is_valid_url, normalize_drive, python_version,
escape_grouped_arguments
mkdir_p,
convert_deps_from_pip,
pep423_name,
recase_file,
find_requirements,
is_file,
is_vcs,
python_version,
cleanup_toml,
is_installable_file,
is_valid_url,
normalize_drive,
python_version,
escape_grouped_arguments,
)
from .environments import (
PIPENV_MAX_DEPTH,
@@ -26,12 +36,13 @@ from .environments import (
PIPENV_VIRTUALENV,
PIPENV_NO_INHERIT,
PIPENV_TEST_INDEX,
PIPENV_PYTHON
PIPENV_PYTHON,
)
if PIPENV_PIPFILE:
if not os.path.isfile(PIPENV_PIPFILE):
raise RuntimeError('Given PIPENV_PIPFILE is not found!')
else:
PIPENV_PIPFILE = normalize_drive(os.path.abspath(PIPENV_PIPFILE))
@@ -50,7 +61,6 @@ class Project(object):
self._original_dir = os.path.abspath(os.curdir)
self.which = which
self.python_version = python_version
# Hack to skip this during pipenv run, or -r.
if ('run' not in sys.argv) and chdir:
try:
@@ -75,13 +85,22 @@ class Project(object):
# When a vcs url is gven without editable it only appears as a key
# Eliminate any vcs, path, or url entries which are not editable
# Since pip-tools can't do deep resolution on them, even setuptools-installable ones
if (is_vcs(v) or is_vcs(k) or (is_installable_file(k) or is_installable_file(v)) or
any((prefix in v and
(os.path.isfile(v[prefix]) or is_valid_url(v[prefix])))
for prefix in ['path', 'file'])):
if (
is_vcs(v) or
is_vcs(k) or
(is_installable_file(k) or is_installable_file(v)) or
any(
(
prefix in v and
(os.path.isfile(v[prefix]) or is_valid_url(v[prefix]))
)
for prefix in ['path', 'file']
)
):
# If they are editable, do resolve them
if 'editable' not in v:
continue
else:
ps.update({k: v})
else:
@@ -90,13 +109,15 @@ class Project(object):
# Since these entries have no attributes we know they are not editable
# So we can safely exclude things that need to be editable in order to be resolved
# First exclude anything that is a vcs entry either in the key or value
if not (any(is_vcs(i) for i in [k, v]) or
# Then exclude any installable files that are not directories
# Because pip-tools can resolve setup.py for example
any(is_installable_file(i) for i in [k, v]) or
# Then exclude any URLs because they need to be editable also
# Things that are excluded can only be 'shallow resolved'
any(is_valid_url(i) for i in [k, v])):
if not (
any(is_vcs(i) for i in [k, v]) or
# Then exclude any installable files that are not directories
# Because pip-tools can resolve setup.py for example
any(is_installable_file(i) for i in [k, v]) or
# Then exclude any URLs because they need to be editable also
# Things that are excluded can only be 'shallow resolved'
any(is_valid_url(i) for i in [k, v])
):
ps.update({k: v})
return ps
@@ -113,7 +134,9 @@ class Project(object):
@property
def required_python_version(self):
if self.pipfile_exists:
required = self.parsed_pipfile.get('requires', {}).get('python_full_version')
required = self.parsed_pipfile.get('requires', {}).get(
'python_full_version'
)
if not required:
required = self.parsed_pipfile.get('requires', {}).get('python_version')
if required != "*":
@@ -123,6 +146,7 @@ class Project(object):
def project_directory(self):
if self.pipfile_location is not None:
return os.path.abspath(os.path.join(self.pipfile_location, os.pardir))
else:
return None
@@ -157,22 +181,19 @@ class Project(object):
# http://www.tldp.org/LDP/abs/html/special-chars.html#FIELDREF
# https://github.com/torvalds/linux/blob/2bfe01ef/include/uapi/linux/binfmts.h#L18
sanitized = re.sub(r'[ $`!*@"\\\r\n\t]', '_', self.name)[0:42]
# Hash the full path of the pipfile
hash = hashlib.sha256(self.pipfile_location.encode()).digest()[:6]
encoded_hash = base64.urlsafe_b64encode(hash).decode()
# If the pipfile was located at '/home/user/MY_PROJECT/Pipfile',
# the name of its virtualenv will be 'my-project-wyUfYPqE'
if PIPENV_PYTHON:
return sanitized + '-' + encoded_hash + '-' + PIPENV_PYTHON
else:
return sanitized + '-' + encoded_hash
@property
def virtualenv_location(self):
# if VIRTUAL_ENV is set, use that.
if PIPENV_VIRTUALENV:
return PIPENV_VIRTUALENV
@@ -183,12 +204,15 @@ class Project(object):
# The user wants the virtualenv in the project.
if not PIPENV_VENV_IN_PROJECT:
c = delegator.run('{0} -m pipenv.pew dir "{1}"'.format(escape_grouped_arguments(sys.executable), self.virtualenv_name))
c = delegator.run(
'{0} -m pipenv.pew dir "{1}"'.format(
escape_grouped_arguments(sys.executable), self.virtualenv_name
)
)
loc = c.out.strip()
# Default mode.
else:
loc = os.sep.join(self.pipfile_location.split(os.sep)[:-1] + ['.venv'])
self._virtualenv_location = loc
return loc
@@ -203,10 +227,8 @@ class Project(object):
if self._download_location is None:
loc = os.sep.join([self.virtualenv_location, 'downloads'])
self._download_location = loc
# Create the directory, if it doesn't exist.
mkdir_p(self._download_location)
return self._download_location
@property
@@ -214,10 +236,8 @@ class Project(object):
if self._proper_names_location is None:
loc = os.sep.join([self.virtualenv_location, 'pipenv-proper-names.txt'])
self._proper_names_location = loc
# Create the database, if it doesn't exist.
open(self._proper_names_location, 'a').close()
return self._proper_names_location
@property
@@ -241,7 +261,6 @@ class Project(object):
except RuntimeError:
loc = None
self._pipfile_location = normalize_drive(loc)
return self._pipfile_location
@property
@@ -252,7 +271,6 @@ class Project(object):
except RuntimeError:
loc = None
self._requirements_location = loc
return self._requirements_location
@property
@@ -260,25 +278,21 @@ class Project(object):
# Open the pipfile, read it into memory.
with open(self.pipfile_location) as f:
contents = f.read()
# If any outline tables are present...
if ('[packages.' in contents) or ('[dev-packages.' in contents):
data = toml.loads(contents)
# Convert all outline tables to inline tables.
for section in ('packages', 'dev-packages'):
for package in data.get(section, {}):
# Convert things to inline tables — fancy :)
if hasattr(data[section][package], 'keys'):
_data = data[section][package]
data[section][package] = toml._get_empty_inline_table(dict)
data[section][package].update(_data)
# We lose comments here, but it's for the best.)
try:
return contoml.loads(toml.dumps(data, preserve=True))
except RuntimeError:
return toml.loads(toml.dumps(data, preserve=True))
@@ -286,6 +300,7 @@ class Project(object):
# Fallback to toml parser, for large files.
try:
return contoml.loads(contents)
except Exception:
return toml.loads(contents)
@@ -295,12 +310,10 @@ class Project(object):
pfile = self.parsed_pipfile
for section in ('packages', 'dev-packages'):
p_section = pfile.get(section, {})
for key in list(p_section.keys()):
# Normalize key name to PEP 423.
norm_key = pep423_name(key)
p_section[norm_key] = p_section.pop(key)
return pfile
@property
@@ -313,23 +326,18 @@ class Project(object):
scripts = self.parsed_pipfile.get('scripts', {})
for (k, v) in scripts.items():
scripts[k] = shlex.split(v, posix=True)
return scripts
def update_settings(self, d):
settings = self.settings
changed = False
for new in d:
if new not in settings:
settings[new] = d[new]
changed = True
if changed:
p = self.parsed_pipfile
p['pipenv'] = settings
# Write the changes to disk.
self.write_toml(p)
@@ -338,14 +346,11 @@ class Project(object):
"""Pipfile.lock divided by PyPI and external dependencies."""
pfile = pipfile.load(self.pipfile_location)
lockfile = json.loads(pfile.lock())
for section in ('default', 'develop'):
lock_section = lockfile.get(section, {})
for key in list(lock_section.keys()):
norm_key = pep423_name(key)
lockfile[section][norm_key] = lock_section.pop(key)
return lockfile
@property
@@ -417,56 +422,55 @@ class Project(object):
config_parser = ConfigOptionParser(name=self.name)
install = dict(config_parser.get_config_section('install'))
indexes = install.get('extra-index-url', '').lstrip('\n').split('\n')
if PIPENV_TEST_INDEX:
sources = [{u'url': PIPENV_TEST_INDEX, u'verify_ssl': True, u'name': u'custom'}]
sources = [
{u'url': PIPENV_TEST_INDEX, u'verify_ssl': True, u'name': u'custom'}
]
else:
# Default source.
pypi_source = {u'url': u'https://pypi.python.org/simple', u'verify_ssl': True, u'name': 'pypi'}
pypi_source = {
u'url': u'https://pypi.python.org/simple',
u'verify_ssl': True,
u'name': 'pypi',
}
sources = [pypi_source]
for i, index in enumerate(indexes):
if not index:
continue
source_name = 'pip_index_{}'.format(i)
verify_ssl = index.startswith('https')
sources.append({u'url': index, u'verify_ssl': verify_ssl, u'name': source_name})
sources.append(
{u'url': index, u'verify_ssl': verify_ssl, u'name': source_name}
)
data = {
u'source': sources,
# Default packages.
u'packages': {},
u'dev-packages': {},
}
# Default requires.
required_python = python or self.which('python', self.virtualenv_location)
data[u'requires'] = {'python_version': python_version(required_python)[:len('2.7')]}
data[u'requires'] = {
'python_version': python_version(required_python)[: len('2.7')]
}
self.write_toml(data, 'Pipfile')
def write_toml(self, data, path=None):
"""Writes the given data structure out as TOML."""
if path is None:
path = self.pipfile_location
try:
formatted_data = contoml.dumps(data).rstrip()
except Exception:
for section in ('packages', 'dev-packages'):
for package in data[section]:
# Convert things to inline tables — fancy :)
if hasattr(data[section][package], 'keys'):
_data = data[section][package]
data[section][package] = toml._get_empty_inline_table(dict)
data[section][package].update(_data)
formatted_data = toml.dumps(data).rstrip()
formatted_data = cleanup_toml(formatted_data)
with open(path, 'w') as f:
f.write(formatted_data)
@@ -478,16 +482,25 @@ class Project(object):
sources_ = meta_.get('sources')
if sources_:
return sources_
if 'source' in self.parsed_pipfile:
return self.parsed_pipfile['source']
else:
return [{u'url': u'https://pypi.python.org/simple', u'verify_ssl': True, 'name': 'pypi'}]
return [
{
u'url': u'https://pypi.python.org/simple',
u'verify_ssl': True,
'name': 'pypi',
}
]
def get_source(self, name=None, url=None):
for source in self.sources:
if name:
if source.get('name') == name:
return source
elif url:
if source.get('url') in url:
return source
@@ -496,66 +509,49 @@ class Project(object):
"""Deletes the lockfile."""
try:
return os.remove(self.lockfile_location)
except OSError:
pass
def remove_package_from_pipfile(self, package_name, dev=False):
# Read and append Pipfile.
p = self._pipfile
package_name = pep423_name(package_name)
key = 'dev-packages' if dev else 'packages'
if key in p and package_name in p[key]:
del p[key][package_name]
# Write Pipfile.
self.write_toml(recase_file(p))
def add_package_to_pipfile(self, package_name, dev=False):
# Read and append Pipfile.
p = self._pipfile
# Don't re-capitalize file URLs or VCSs.
converted = convert_deps_from_pip(package_name)
converted = converted[[k for k in converted.keys()][0]]
if not (is_file(package_name) or is_vcs(converted) or 'path' in converted):
package_name = pep423_name(package_name)
key = 'dev-packages' if dev else 'packages'
# Set empty group if it doesn't exist yet.
if key not in p:
p[key] = {}
package = convert_deps_from_pip(package_name)
package_name = [k for k in package.keys()][0]
# Add the package to the group.
p[key][package_name] = package[package_name]
# Write Pipfile.
self.write_toml(p)
def add_index_to_pipfile(self, index):
"""Adds a given index to the Pipfile."""
# Read and append Pipfile.
p = self._pipfile
source = {'url': index, 'verify_ssl': True}
# Add the package to the group.
if 'source' not in p:
p['source'] = [source]
else:
p['source'].append(source)
# Write Pipfile.
self.write_toml(p)
+191 -165
View File
@@ -15,6 +15,7 @@ import requests
import six
import stat
import warnings
try:
from weakref import finalize
except ImportError:
@@ -22,11 +23,9 @@ except ImportError:
from backports.weakref import finalize
except ImportError:
pass
from time import time
logging.basicConfig(level=logging.ERROR)
try:
from urllib.parse import urlparse
except ImportError:
@@ -38,8 +37,6 @@ except ImportError:
from pathlib2 import Path
except ImportError:
pass
from distutils.spawn import find_executable
from contextlib import contextmanager
from pipenv.patched.piptools.resolver import Resolver
@@ -56,21 +53,22 @@ from .pep508checker import lookup
from .environments import SESSION_IS_INTERACTIVE, PIPENV_MAX_ROUNDS, PIPENV_CACHE_DIR
if six.PY2:
class ResourceWarning(Warning):
pass
specifiers = [k for k in lookup.keys()]
specifiers = [k for k in lookup.keys()]
# List of version control systems we support.
VCS_LIST = ('git', 'svn', 'hg', 'bzr')
SCHEME_LIST = ('http://', 'https://', 'ftp://', 'file://')
requests = requests.Session()
def get_requirement(dep):
from pip9.req.req_install import _strip_extras
import requirements
"""Pre-clean requirement strings passed to the requirements parser.
Ensures that we can accept both local and relative paths, file and VCS URIs,
@@ -148,22 +146,21 @@ def get_requirement(dep):
req.markers = markers
if extras:
# Bizarrely this is also what pip does...
req.extras = [r for r in requirements.parse('fakepkg{0}'.format(extras))][0].extras
req.extras = [r for r in requirements.parse('fakepkg{0}'.format(extras))][
0
].extras
return req
def cleanup_toml(tml):
toml = tml.split('\n')
new_toml = []
# Remove all empty lines from TOML.
for line in toml:
if line.strip():
new_toml.append(line)
toml = '\n'.join(new_toml)
new_toml = []
# Add newlines between TOML sections.
for i, line in enumerate(toml.split('\n')):
after = False
@@ -173,12 +170,10 @@ def cleanup_toml(tml):
# Insert a newline before the heading.
new_toml.append('\n')
after = True
new_toml.append(line)
# Insert a newline after the heading.
if after:
new_toml.append('')
# adding new line at the end of the TOML file
new_toml.append('')
toml = '\n'.join(new_toml)
@@ -193,6 +188,7 @@ def python_version(path_to_python):
c = delegator.run([path_to_python, '--version'], block=False)
except Exception:
return None
output = c.out.strip() or c.err.strip()
@parse.with_pattern(r'.*')
@@ -216,10 +212,10 @@ def escape_grouped_arguments(s):
"""
if s is None:
return None
# Additional escaping for windows paths
if os.name == 'nt':
s = "{}".format(s.replace("\\", "\\\\"))
return '"' + s.replace("'", "'\\''") + '"'
@@ -230,6 +226,7 @@ def clean_pkg_version(version):
class HackedPythonVersion(object):
"""A Beautiful hack, which allows us to tell pip which version of Python we're using."""
def __init__(self, python_version, python_path):
self.python_version = python_version
self.python_path = python_path
@@ -246,28 +243,29 @@ class HackedPythonVersion(object):
def prepare_pip_source_args(sources, pip_args=None):
if pip_args is None:
pip_args = []
if sources:
# Add the source to pip9.
pip_args.extend(['-i', sources[0]['url']])
# Trust the host if it's not verified.
if not sources[0].get('verify_ssl', True):
pip_args.extend(['--trusted-host', urlparse(sources[0]['url']).netloc.split(':')[0]])
pip_args.extend(
['--trusted-host', urlparse(sources[0]['url']).netloc.split(':')[0]]
)
# Add additional sources as extra indexes.
if len(sources) > 1:
for source in sources[1:]:
pip_args.extend(['--extra-index-url', source['url']])
# Trust the host if it's not verified.
if not source.get('verify_ssl', True):
pip_args.extend(['--trusted-host', urlparse(source['url']).netloc.split(':')[0]])
pip_args.extend(
['--trusted-host', urlparse(source['url']).netloc.split(':')[0]]
)
return pip_args
def actually_resolve_reps(deps, index_lookup, markers_lookup, project, sources, verbose, clear, pre):
def actually_resolve_reps(
deps, index_lookup, markers_lookup, project, sources, verbose, clear, pre
):
from pip9 import basecommand, req
from pip9._vendor import requests as pip_requests
@@ -276,52 +274,51 @@ def actually_resolve_reps(deps, index_lookup, markers_lookup, project, sources,
name = 'PipCommand'
constraints = []
req_dir = tempfile.mkdtemp(prefix='pipenv-', suffix='-requirements')
for dep in deps:
if dep:
if dep.startswith('-e '):
constraint = req.InstallRequirement.from_editable(dep[len('-e '):])
else:
fd, t = tempfile.mkstemp(prefix='pipenv-', suffix='-requirement.txt', dir=req_dir)
fd, t = tempfile.mkstemp(
prefix='pipenv-', suffix='-requirement.txt', dir=req_dir
)
with os.fdopen(fd, 'w') as f:
f.write(dep)
constraint = [c for c in req.parse_requirements(t, session=pip_requests)][0]
# extra_constraints = []
constraint = [
c for c in req.parse_requirements(t, session=pip_requests)
][
0
]
# extra_constraints = []
if ' -i ' in dep:
index_lookup[constraint.name] = project.get_source(url=dep.split(' -i ')[1]).get('name')
index_lookup[constraint.name] = project.get_source(
url=dep.split(' -i ')[1]
).get(
'name'
)
if constraint.markers:
markers_lookup[constraint.name] = str(constraint.markers).replace('"', "'")
markers_lookup[constraint.name] = str(constraint.markers).replace(
'"', "'"
)
constraints.append(constraint)
rmtree(req_dir)
pip_command = get_pip_command()
pip_args = []
if sources:
pip_args = prepare_pip_source_args(sources, pip_args)
if verbose:
print('Using pip: {0}'.format(' '.join(pip_args)))
pip_options, _ = pip_command.parse_args(pip_args)
session = pip_command._build_session(pip_options)
pypi = PyPIRepository(pip_options=pip_options, use_json=False, session=session)
if verbose:
logging.log.verbose = True
piptools_logging.log.verbose = True
resolved_tree = set()
resolver = Resolver(constraints=constraints, repository=pypi, clear_caches=clear, prereleases=pre)
resolver = Resolver(
constraints=constraints, repository=pypi, clear_caches=clear, prereleases=pre
)
# pre-resolve instead of iterating to avoid asking pypi for hashes of editable packages
try:
resolved_tree.update(resolver.resolve(max_rounds=PIPENV_MAX_ROUNDS))
@@ -332,32 +329,36 @@ def actually_resolve_reps(deps, index_lookup, markers_lookup, project, sources,
''.format(
crayons.red('Warning', bold=True),
crayons.red('$ pipenv install --skip-lock'),
crayons.red('$ pipenv graph')
crayons.red('$ pipenv graph'),
),
err=True)
err=True,
)
click.echo(crayons.blue(str(e)), err=True)
if 'no version found at all' in str(e):
click.echo(crayons.blue('Please check your version specifier and version number. See PEP440 for more information.'))
click.echo(
crayons.blue(
'Please check your version specifier and version number. See PEP440 for more information.'
)
)
raise RuntimeError
return resolved_tree, resolver
def venv_resolve_deps(deps, which, project, pre=False, verbose=False, clear=False):
from . import resolver
from .import resolver
import json
resolver = escape_grouped_arguments(resolver.__file__.rstrip('co'))
cmd = '{0} {1} {2} {3}'.format(escape_grouped_arguments(which('python')), resolver, '--pre' if pre else '', '--verbose' if verbose else '')
cmd = '{0} {1} {2} {3}'.format(
escape_grouped_arguments(which('python')),
resolver,
'--pre' if pre else '',
'--verbose' if verbose else '',
)
os.environ['PIPENV_PACKAGES'] = '\n'.join(deps)
c = delegator.run(cmd, block=True)
del os.environ['PIPENV_PACKAGES']
try:
assert c.return_code == 0
except AssertionError:
@@ -367,125 +368,138 @@ def venv_resolve_deps(deps, which, project, pre=False, verbose=False, clear=Fals
else:
click.echo(c.err[int(len(c.err) / 2) - 1:], err=True)
sys.exit(c.return_code)
if verbose:
click.echo(c.out.split('RESULTS:')[0], err=True)
try:
return json.loads(c.out.split('RESULTS:')[1].strip())
except IndexError:
raise RuntimeError('There was a problem with locking.')
def resolve_deps(deps, which, project, sources=None, verbose=False, python=False, clear=False, pre=False, allow_global=False):
def resolve_deps(
deps,
which,
project,
sources=None,
verbose=False,
python=False,
clear=False,
pre=False,
allow_global=False,
):
"""Given a list of dependencies, return a resolved list of dependencies,
using pip-tools -- and their hashes, using the warehouse API / pip9.
"""
index_lookup = {}
markers_lookup = {}
python_path = which('python', allow_global=allow_global)
backup_python_path = sys.executable
results = []
# First (proper) attempt:
with HackedPythonVersion(python_version=python, python_path=python_path):
try:
resolved_tree, resolver = actually_resolve_reps(deps, index_lookup, markers_lookup, project, sources, verbose, clear, pre)
resolved_tree, resolver = actually_resolve_reps(
deps,
index_lookup,
markers_lookup,
project,
sources,
verbose,
clear,
pre,
)
except RuntimeError:
# Don't exit here, like usual.
resolved_tree = None
# Second (last-resort) attempt:
if resolved_tree is None:
with HackedPythonVersion(python_version='.'.join([str(s) for s in sys.version_info[:3]]), python_path=backup_python_path):
with HackedPythonVersion(
python_version='.'.join([str(s) for s in sys.version_info[:3]]),
python_path=backup_python_path,
):
try:
# Attempt to resolve again, with different Python version information,
# particularly for particularly particular packages.
resolved_tree, resolver = actually_resolve_reps(deps, index_lookup, markers_lookup, project, sources, verbose, clear, pre)
resolved_tree, resolver = actually_resolve_reps(
deps,
index_lookup,
markers_lookup,
project,
sources,
verbose,
clear,
pre,
)
except RuntimeError:
sys.exit(1)
for result in resolved_tree:
if not result.editable:
name = pep423_name(result.name)
version = clean_pkg_version(result.specifier)
index = index_lookup.get(result.name)
if not markers_lookup.get(result.name):
markers = str(result.markers) if result.markers and 'extra' not in str(result.markers) else None
markers = str(result.markers) if result.markers and 'extra' not in str(
result.markers
) else None
else:
markers = markers_lookup.get(result.name)
collected_hashes = []
if 'python.org' in '|'.join([source['url'] for source in sources]):
try:
# Grab the hashes from the new warehouse API.
r = requests.get('https://pypi.org/pypi/{0}/json'.format(name), timeout=10)
r = requests.get(
'https://pypi.org/pypi/{0}/json'.format(name), timeout=10
)
api_releases = r.json()['releases']
cleaned_releases = {}
for api_version, api_info in api_releases.items():
cleaned_releases[clean_pkg_version(api_version)] = api_info
for release in cleaned_releases[version]:
collected_hashes.append(release['digests']['sha256'])
collected_hashes = ['sha256:' + s for s in collected_hashes]
except (ValueError, KeyError, ConnectionError) as e:
if verbose:
click.echo('{0}: Error generating hash for {1}'.format(crayons.red('Warning', bold=True), name))
click.echo(
'{0}: Error generating hash for {1}'.format(
crayons.red('Warning', bold=True), name
)
)
# Collect un-collectable hashes (should work with devpi).
try:
collected_hashes = collected_hashes + list(list(resolver.resolve_hashes([result]).items())[0][1])
collected_hashes = collected_hashes + list(
list(resolver.resolve_hashes([result]).items())[0][1]
)
except (ValueError, KeyError, ConnectionError, IndexError):
if verbose:
print('Error generating hash for {}'.format(name))
collected_hashes = list(set(collected_hashes))
d = {'name': name, 'version': version, 'hashes': collected_hashes}
if index:
d.update({'index': index})
if markers:
d.update({'markers': markers.replace('"', "'")})
results.append(d)
return results
def multi_split(s, split):
"""Splits on multiple given separators."""
for r in split:
s = s.replace(r, '|')
return [i for i in s.split('|') if len(i) > 0]
def convert_deps_from_pip(dep):
""""Converts a pip-formatted dependency to a Pipfile-formatted one."""
dependency = {}
req = get_requirement(dep)
extras = {'extras': req.extras}
# File installs.
if (req.uri or req.path or is_installable_file(req.name)) and not req.vcs:
# Assign a package name to the file, last 7 of it's sha256 hex digest.
if not req.uri and not req.path:
req.path = os.path.abspath(req.name)
hashable_path = req.uri if req.uri else req.path
req.name = hashlib.sha256(hashable_path.encode('utf-8')).hexdigest()
req.name = req.name[len(req.name) - 7:]
@@ -494,68 +508,56 @@ def convert_deps_from_pip(dep):
dependency[req.name] = {'file': hashable_path}
else:
dependency[req.name] = {'path': hashable_path}
if req.extras:
dependency[req.name].update(extras)
# Add --editable if applicable
if req.editable:
dependency[req.name].update({'editable': True})
# VCS Installs. Extra check for unparsed git over SSH
elif req.vcs or is_vcs(req.path):
if req.name is None:
raise ValueError('pipenv requires an #egg fragment for version controlled '
'dependencies. Please install remote dependency '
'in the form {0}#egg=<package-name>.'.format(req.uri))
raise ValueError(
'pipenv requires an #egg fragment for version controlled '
'dependencies. Please install remote dependency '
'in the form {0}#egg=<package-name>.'.format(req.uri)
)
# Set up this requirement as a proper VCS requirement if it was not
if not req.vcs and req.path.startswith(VCS_LIST):
req.vcs = [vcs for vcs in VCS_LIST if req.path.startswith(vcs)][0]
req.uri = '{0}'.format(req.path)
req.path = None
# Crop off the git+, etc part.
if req.uri.startswith('{0}+'.format(req.vcs)):
req.uri = req.uri[len(req.vcs) + 1:]
dependency.setdefault(req.name, {}).update({req.vcs: req.uri})
# Add --editable, if it's there.
if req.editable:
dependency[req.name].update({'editable': True})
# Add subdirectory, if it's there
if req.subdirectory:
dependency[req.name].update({'subdirectory': req.subdirectory})
# Add the specifier, if it was provided.
if req.revision:
dependency[req.name].update({'ref': req.revision})
# Extras: e.g. #egg=requests[security]
if req.extras:
dependency[req.name].update({'extras': req.extras})
elif req.extras or req.specs:
specs = None
# Comparison operators: e.g. Django>1.10
if req.specs:
r = multi_split(dep, '!=<>~')
specs = dep[len(r[0]):]
dependency[req.name] = specs
# Extras: e.g. requests[socks]
if req.extras:
dependency[req.name] = extras
if specs:
dependency[req.name].update({'version': specs})
# Bare dependencies: e.g. requests
else:
dependency[dep] = '*'
# Cleanup when there's multiple values, e.g. -e.
if len(dependency) > 1:
for key in dependency.copy():
@@ -563,45 +565,41 @@ def convert_deps_from_pip(dep):
del dependency[key]
return dependency
def is_star(val):
return isinstance(val, six.string_types) and val == '*'
def is_pinned(val):
return isinstance(val, six.string_types) and val.startswith('==')
def convert_deps_to_pip(deps, project=None, r=True, include_index=False):
""""Converts a Pipfile-formatted dependency to a pip-formatted one."""
dependencies = []
for dep in deps.keys():
# Default (e.g. '>1.10').
extra = deps[dep] if isinstance(deps[dep], six.string_types) else ''
version = ''
index = ''
# Get rid of '*'.
if is_star(deps[dep]) or str(extra) == '{}':
extra = ''
hash = ''
# Support for single hash (spec 1).
if 'hash' in deps[dep]:
hash = ' --hash={0}'.format(deps[dep]['hash'])
# Support for multiple hashes (spec 2).
if 'hashes' in deps[dep]:
hash = '{0} '.format(''.join([' --hash={0} '.format(h) for h in deps[dep]['hashes']]))
hash = '{0} '.format(
''.join([' --hash={0} '.format(h) for h in deps[dep]['hashes']])
)
# Support for extras (e.g. requests[socks])
if 'extras' in deps[dep]:
extra = '[{0}]'.format(','.join(deps[dep]['extras']))
if 'version' in deps[dep]:
if not is_star(deps[dep]['version']):
version = deps[dep]['version']
# For lockfile format.
if 'markers' in deps[dep]:
specs = '; {0}'.format(deps[dep]['markers'])
@@ -616,7 +614,6 @@ def convert_deps_to_pip(deps, project=None, r=True, include_index=False):
specs = '; {0}'.format(' and '.join(specs))
else:
specs = ''
if include_index and not is_file(deps[dep]) and not is_vcs(deps[dep]):
pip_src_args = []
if 'index' in deps[dep]:
@@ -625,52 +622,42 @@ def convert_deps_to_pip(deps, project=None, r=True, include_index=False):
pip_src_args = project.sources
pip_args = prepare_pip_source_args(pip_src_args)
index = ' '.join(pip_args)
# Support for version control
maybe_vcs = [vcs for vcs in VCS_LIST if vcs in deps[dep]]
vcs = maybe_vcs[0] if maybe_vcs else None
# Support for files.
if 'file' in deps[dep]:
extra = '{1}{0}'.format(extra, deps[dep]['file']).strip()
# Flag the file as editable if it is a local relative path
if 'editable' in deps[dep]:
dep = '-e '
else:
dep = ''
# Support for paths.
elif 'path' in deps[dep]:
extra = '{1}{0}'.format(extra, deps[dep]['path']).strip()
# Flag the file as editable if it is a local relative path
if 'editable' in deps[dep]:
dep = '-e '
else:
dep = ''
if vcs:
extra = '{0}+{1}'.format(vcs, deps[dep][vcs])
# Support for @refs.
if 'ref' in deps[dep]:
extra += '@{0}'.format(deps[dep]['ref'])
extra += '#egg={0}'.format(dep)
# Support for subdirectory
if 'subdirectory' in deps[dep]:
extra += '&subdirectory={0}'.format(deps[dep]['subdirectory'])
# Support for editable.
if 'editable' in deps[dep]:
# Support for --egg.
dep = '-e '
else:
dep = ''
s = '{0}{1}{2}{3}{4} {5}'.format(dep, extra, version, specs, hash, index).strip()
s = '{0}{1}{2}{3}{4} {5}'.format(dep, extra, version, specs, hash, index).strip(
)
dependencies.append(s)
if not r:
return dependencies
@@ -689,11 +676,15 @@ def mkdir_p(newdir):
- parent directory(ies) does not exist, make them as well
From: http://code.activestate.com/recipes/82465-a-friendly-mkdir/
"""
if os.path.isdir(newdir):
pass
elif os.path.isfile(newdir):
raise OSError("a file with the same name as the desired dir, '{0}', already exists.".format(newdir))
raise OSError(
"a file with the same name as the desired dir, '{0}', already exists.".format(
newdir
)
)
else:
head, tail = os.path.split(newdir)
if head and not os.path.isdir(head):
@@ -706,12 +697,12 @@ def is_required_version(version, specified_version):
"""Check to see if there's a hard requirement for version
number provided in the Pipfile.
"""
# Certain packages may be defined with multiple values.
if isinstance(specified_version, dict):
specified_version = specified_version.get('version', '')
if specified_version.startswith('=='):
return version.strip() == specified_version.split('==')[1].strip()
return True
@@ -733,12 +724,16 @@ def clean_git_uri(uri):
def is_vcs(pipfile_entry):
import requirements
"""Determine if dictionary entry from Pipfile is for a vcs dependency."""
"""Determine if dictionary entry from Pipfile is for a vcs dependency."""
if hasattr(pipfile_entry, 'keys'):
return any(key for key in pipfile_entry.keys() if key in VCS_LIST)
elif isinstance(pipfile_entry, six.string_types):
return bool(requirements.requirement.VCS_REGEX.match(clean_git_uri(pipfile_entry)))
return bool(
requirements.requirement.VCS_REGEX.match(clean_git_uri(pipfile_entry))
)
return False
@@ -746,10 +741,14 @@ def is_installable_file(path):
"""Determine if a path can potentially be installed"""
from pip9.utils import is_installable_dir
from pip9.utils.packaging import specifiers
if hasattr(path, 'keys') and any(key for key in path.keys() if key in ['file', 'path']):
if hasattr(path, 'keys') and any(
key for key in path.keys() if key in ['file', 'path']
):
path = urlparse(path['file']).path if 'file' in path else path['path']
if not isinstance(path, six.string_types) or path == '*':
return False
# If the string starts with a valid specifier operator, test if it is a valid
# specifier set before making a path object (to avoid breaking windows)
if any(path.startswith(spec) for spec in '!=<>~'):
@@ -760,14 +759,18 @@ def is_installable_file(path):
pass
else:
return False
if not os.path.exists(os.path.abspath(path)):
return False
lookup_path = Path(path)
absolute_path = '{0}'.format(lookup_path.absolute())
if lookup_path.is_dir() and is_installable_dir(absolute_path):
return True
elif lookup_path.is_file() and is_archive_file(absolute_path):
return True
return False
@@ -797,23 +800,26 @@ def pep440_version(version):
def pep423_name(name):
"""Normalize package name to PEP 423 style standard."""
name = name.lower()
if any(i not in name for i in (VCS_LIST+SCHEME_LIST)):
if any(i not in name for i in (VCS_LIST + SCHEME_LIST)):
return name.replace('_', '-')
else:
return name
def proper_case(package_name):
"""Properly case project name from pypi.org."""
# Hit the simple API.
r = requests.get('https://pypi.org/pypi/{0}/json'.format(package_name), timeout=0.3, stream=True)
r = requests.get(
'https://pypi.org/pypi/{0}/json'.format(package_name), timeout=0.3, stream=True
)
if not r.ok:
raise IOError('Unable to find package {0} in PyPI repository.'.format(package_name))
raise IOError(
'Unable to find package {0} in PyPI repository.'.format(package_name)
)
r = parse.parse('https://pypi.org/pypi/{name}/json', r.url)
good_name = r['name']
return good_name
@@ -867,15 +873,22 @@ def split_section(input_file, section_suffix, test_function):
def split_file(file_dict):
"""Split VCS and editable dependencies out from file."""
sections = {
'vcs': is_vcs,
'editable': lambda x: hasattr(x, 'keys') and x.get('editable')
'vcs': is_vcs, 'editable': lambda x: hasattr(x, 'keys') and x.get('editable')
}
for k, func in sections.items():
file_dict = split_section(file_dict, k, func)
return file_dict
def merge_deps(file_dict, project, dev=False, requirements=False, ignore_hashes=False, blocking=False, only=False):
def merge_deps(
file_dict,
project,
dev=False,
requirements=False,
ignore_hashes=False,
blocking=False,
only=False,
):
"""
Given a file_dict, merges dependencies and converts them to pip dependency lists.
:param dict file_dict: The result of calling :func:`pipenv.utils.split_file`
@@ -889,12 +902,18 @@ def merge_deps(file_dict, project, dev=False, requirements=False, ignore_hashes=
"""
deps = []
requirements_deps = []
for section in list(file_dict.keys()):
# Turn develop-vcs into ['develop', 'vcs']
section_name, suffix = section.rsplit('-', 1) if '-' in section and not section == 'dev-packages' else (section, None)
if not file_dict[section] or section_name not in ('dev-packages', 'packages', 'default', 'develop'):
section_name, suffix = section.rsplit(
'-', 1
) if '-' in section and not section == 'dev-packages' else (
section, None
)
if not file_dict[section] or section_name not in (
'dev-packages', 'packages', 'default', 'develop'
):
continue
is_dev = section_name in ('dev-packages', 'develop')
if is_dev and not dev:
continue
@@ -903,12 +922,13 @@ def merge_deps(file_dict, project, dev=False, requirements=False, ignore_hashes=
for k, v in file_dict[section]:
if 'hash' in v:
del v['hash']
# Block and ignore hashes for all suffixed sections (vcs/editable)
no_hashes = True if suffix else ignore_hashes
block = True if suffix else blocking
include_index = True if not suffix else False
converted = convert_deps_to_pip(file_dict[section], project, r=False, include_index=include_index)
converted = convert_deps_to_pip(
file_dict[section], project, r=False, include_index=include_index
)
deps.extend((d, no_hashes, block) for d in converted)
if dev and is_dev and requirements:
requirements_deps.extend((d, no_hashes, block) for d in converted)
@@ -917,15 +937,12 @@ def merge_deps(file_dict, project, dev=False, requirements=False, ignore_hashes=
def recase_file(file_dict):
"""Recase file before writing to output."""
if 'packages' in file_dict or 'dev-packages' in file_dict:
sections = ('packages', 'dev-packages')
elif 'default' in file_dict or 'develop' in file_dict:
sections = ('default', 'develop')
for section in sections:
file_section = file_dict.get(section, {})
# Try to properly case each key if we can.
for key in list(file_section.keys()):
try:
@@ -933,7 +950,6 @@ def recase_file(file_dict):
except IOError:
cased_key = key
file_section[cased_key] = file_section.pop(key)
return file_dict
@@ -957,10 +973,11 @@ def find_windows_executable(bin_path, exe_name):
exec_files = [filename for filename in exec_paths if os.path.isfile(filename)]
if exec_files:
return exec_files[0]
return find_executable(exe_name)
def get_converted_relative_path(path, relative_to=os.curdir):
def get_converted_relative_path(path, relative_to= os.curdir):
"""Given a vague relative path, return the path relative to the given location"""
return os.path.join('.', os.path.relpath(path, start=relative_to))
@@ -969,9 +986,7 @@ def walk_up(bottom):
"""Mimic os.walk, but walk 'up' instead of down the directory tree.
From: https://gist.github.com/zdavkeos/1098474
"""
bottom = os.path.realpath(bottom)
# Get files in current dir.
try:
names = os.listdir(bottom)
@@ -984,11 +999,9 @@ def walk_up(bottom):
dirs.append(name)
else:
nondirs.append(name)
yield bottom, dirs, nondirs
new_path = os.path.realpath(os.path.join(bottom, '..'))
# See if we are at the top.
if new_path == bottom:
return
@@ -999,19 +1012,20 @@ def walk_up(bottom):
def find_requirements(max_depth=3):
"""Returns the path of a Pipfile in parent directories."""
i = 0
for c, d, f in walk_up(os.getcwd()):
i += 1
if i < max_depth:
if 'requirements.txt':
r = os.path.join(c, 'requirements.txt')
if os.path.isfile(r):
return r
raise RuntimeError('No requirements.txt found!')
# Borrowed from pew to avoid importing pew which imports psutil
# See https://github.com/berdario/pew/blob/master/pew/_utils.py#L82
@contextmanager
@@ -1020,6 +1034,7 @@ def temp_environ():
environ = dict(os.environ)
try:
yield
finally:
os.environ.clear()
os.environ.update(environ)
@@ -1047,9 +1062,11 @@ def need_update_check():
p = os.sep.join((PIPENV_CACHE_DIR, '.pipenv_update_check'))
if not os.path.exists(p):
return True
out_of_date_time = time() - (24 * 60 * 60)
if os.path.isfile(p) and os.path.getmtime(p) <= out_of_date_time:
return True
else:
return False
@@ -1076,10 +1093,12 @@ def normalize_drive(path):
"""
if os.name != 'nt' or not isinstance(path, six.string_types):
return path
drive, tail = os.path.splitdrive(path)
# Only match (lower cased) local drives (e.g. 'c:'), not UNC mounts.
if drive.islower() and len(drive) == 2 and drive[1] == ':':
return '{}{}'.format(drive.upper(), tail)
return path
@@ -1090,6 +1109,7 @@ def is_readonly_path(fn):
"""
if os.path.exists(fn):
return (os.stat(fn).st_mode & stat.S_IREAD) or not os.access(fn, os.W_OK)
return False
@@ -1100,7 +1120,9 @@ def set_write_bit(fn):
def rmtree(directory, ignore_errors=False):
shutil.rmtree(directory, ignore_errors=ignore_errors, onerror=handle_remove_readonly)
shutil.rmtree(
directory, ignore_errors=ignore_errors, onerror=handle_remove_readonly
)
def handle_remove_readonly(func, path, exc):
@@ -1121,9 +1143,11 @@ def handle_remove_readonly(func, path, exc):
if e.errno in [errno.EACCES, errno.EPERM]:
warnings.warn(default_warning_message.format(path), ResourceWarning)
return
if exc_exception.errno in [errno.EACCES, errno.EPERM]:
warnings.warn(default_warning_message.format(path), ResourceWarning)
return
raise
@@ -1142,17 +1166,19 @@ class TemporaryDirectory(object):
def __init__(self, suffix=None, prefix=None, dir=None):
if 'RAM_DISK' in os.environ:
import uuid
name = uuid.uuid4().hex
dir_name = os.path.sep.join([os.environ['RAM_DISK'].strip(), name])
os.mkdir(dir_name)
self.name = dir_name
else:
self.name = tempfile.mkdtemp(suffix, prefix, dir)
self._finalizer = finalize(
self, self._cleanup, self.name,
warn_message="Implicitly cleaning up {!r}".format(self))
self,
self._cleanup,
self.name,
warn_message="Implicitly cleaning up {!r}".format(self),
)
@classmethod
def _cleanup(cls, name, warn_message):