mirror of
https://github.com/kennethreitz/pipenv.git
synced 2026-06-05 22:50:18 +00:00
blah
This commit is contained in:
+227
@@ -0,0 +1,227 @@
|
||||
import os
|
||||
import subprocess
|
||||
import shlex
|
||||
|
||||
from pexpect.popen_spawn import PopenSpawn
|
||||
import daemon
|
||||
|
||||
|
||||
class Command(object):
|
||||
def __init__(self, cmd):
|
||||
super(Command, self).__init__()
|
||||
self.cmd = cmd
|
||||
self.subprocess = None
|
||||
self.blocking = None
|
||||
self.was_run = False
|
||||
self.__out = None
|
||||
|
||||
def __repr__(self):
|
||||
return '<Commmand {!r}>'.format(self.cmd)
|
||||
|
||||
@property
|
||||
def _popen_args(self):
|
||||
return self.cmd
|
||||
|
||||
@property
|
||||
def _default_popen_kwargs(self):
|
||||
return {
|
||||
'env': os.environ.copy(),
|
||||
'stdin': subprocess.PIPE,
|
||||
'stdout': subprocess.PIPE,
|
||||
'stderr': subprocess.PIPE,
|
||||
'shell': True,
|
||||
'universal_newlines': True,
|
||||
'bufsize': 0,
|
||||
}
|
||||
|
||||
@property
|
||||
def _default_pexpect_kwargs(self):
|
||||
return {
|
||||
'env': os.environ.copy(),
|
||||
}
|
||||
|
||||
@property
|
||||
def _uses_subprocess(self):
|
||||
return isinstance(self.subprocess, subprocess.Popen)
|
||||
|
||||
@property
|
||||
def _uses_pexpect(self):
|
||||
return isinstance(self.subprocess, PopenSpawn)
|
||||
|
||||
@property
|
||||
def std_out(self):
|
||||
return self.subprocess.stdout
|
||||
|
||||
@property
|
||||
def _pexpect_out(self):
|
||||
result = ''
|
||||
|
||||
if self.subprocess.before:
|
||||
result += self.subprocess.before
|
||||
|
||||
if isinstance(self.subprocess.after, str):
|
||||
result += self.subprocess.after
|
||||
|
||||
result += self.subprocess.read()
|
||||
return result
|
||||
|
||||
@property
|
||||
def out(self):
|
||||
"""Std/out output (cached), as well as stderr for non-blocking runs."""
|
||||
if self.__out:
|
||||
return self.__out
|
||||
|
||||
if self._uses_subprocess:
|
||||
self.__out = self.std_out.read()
|
||||
else:
|
||||
self.__out = self._pexpect_out
|
||||
|
||||
return self.__out
|
||||
|
||||
@property
|
||||
def std_err(self):
|
||||
return self.subprocess.stderr
|
||||
|
||||
@property
|
||||
def err(self):
|
||||
if self._uses_subprocess:
|
||||
return self.std_err.read()
|
||||
else:
|
||||
return self._pexpect_out
|
||||
|
||||
@property
|
||||
def pid(self):
|
||||
"""The process' PID."""
|
||||
# Support for pexpect's functionality.
|
||||
if hasattr(self.subprocess, 'proc'):
|
||||
return self.subprocess.proc.pid
|
||||
# Standard subprocess method.
|
||||
return self.subprocess.pid
|
||||
|
||||
@property
|
||||
def return_code(self):
|
||||
return self.subprocess.returncode
|
||||
|
||||
@property
|
||||
def std_in(self):
|
||||
return self.subprocess.stdin
|
||||
|
||||
def run(self, block=True):
|
||||
"""Runs the given command, with or without pexpect functionality enabled."""
|
||||
self.blocking = block
|
||||
|
||||
# Use subprocess.
|
||||
if self.blocking:
|
||||
s = subprocess.Popen(self._popen_args, **self._default_popen_kwargs)
|
||||
|
||||
# Otherwise, use pexpect.
|
||||
else:
|
||||
s = PopenSpawn(self._popen_args, **self._default_pexpect_kwargs)
|
||||
self.subprocess = s
|
||||
self.was_run = True
|
||||
|
||||
def expect(self, pattern, timeout=-1):
|
||||
"""Waits on the given pattern to appear in std_out"""
|
||||
|
||||
if self.blocking:
|
||||
raise RuntimeError('expect can only be used on non-blocking commands.')
|
||||
|
||||
self.subprocess.expect(pattern=pattern, timeout=timeout)
|
||||
|
||||
def send(self, s, end=os.linesep, signal=False):
|
||||
"""Sends the given string or signal to std_in."""
|
||||
|
||||
if self.blocking:
|
||||
raise RuntimeError('send can only be used on non-blocking commands.')
|
||||
|
||||
if not signal:
|
||||
if self._uses_subprocess:
|
||||
return self.subprocess.communicate(s + end)
|
||||
else:
|
||||
return self.subprocess.send(s + end)
|
||||
else:
|
||||
self.subprocess.send_signal(s)
|
||||
|
||||
def terminate(self):
|
||||
self.subprocess.terminate()
|
||||
|
||||
def kill(self):
|
||||
self.subprocess.kill()
|
||||
|
||||
def block(self):
|
||||
"""Blocks until process is complete."""
|
||||
self.subprocess.wait()
|
||||
|
||||
def daemonize(self):
|
||||
"""Daemonizes a non-blocking process."""
|
||||
if self.blocking:
|
||||
raise RuntimeError('daemonize can only be used on non-blocking commands.')
|
||||
|
||||
with daemon.DaemonContext():
|
||||
self.block()
|
||||
|
||||
def pipe(self, command):
|
||||
"""Runs the current command and passes its output to the next
|
||||
given process.
|
||||
"""
|
||||
if not self.was_run:
|
||||
self.run(block=False)
|
||||
|
||||
data = self.out
|
||||
|
||||
c = Command(command)
|
||||
c.run(block=False)
|
||||
if data:
|
||||
c.send(data)
|
||||
c.subprocess.sendeof()
|
||||
c.block()
|
||||
return c
|
||||
|
||||
|
||||
def _expand_args(command):
|
||||
"""Parses command strings and returns a Popen-ready list."""
|
||||
|
||||
# Prepare arguments.
|
||||
if isinstance(command, (str, unicode)):
|
||||
splitter = shlex.shlex(command.encode('utf-8'))
|
||||
splitter.whitespace = '|'
|
||||
splitter.whitespace_split = True
|
||||
command = []
|
||||
|
||||
while True:
|
||||
token = splitter.get_token()
|
||||
if token:
|
||||
command.append(token)
|
||||
else:
|
||||
break
|
||||
|
||||
command = list(map(shlex.split, command))
|
||||
|
||||
return command
|
||||
|
||||
|
||||
def chain(command):
|
||||
commands = _expand_args(command)
|
||||
data = None
|
||||
|
||||
for command in commands:
|
||||
|
||||
c = run(command, block=False)
|
||||
|
||||
if data:
|
||||
c.send(data)
|
||||
c.subprocess.sendeof()
|
||||
|
||||
data = c.out
|
||||
|
||||
return c
|
||||
|
||||
|
||||
def run(command, block=True):
|
||||
c = Command(command)
|
||||
c.run(block=block)
|
||||
|
||||
if block:
|
||||
c.block()
|
||||
|
||||
return c
|
||||
@@ -0,0 +1,32 @@
|
||||
import delegator
|
||||
import click
|
||||
import crayons
|
||||
|
||||
|
||||
def ensure_latest_pip():
|
||||
|
||||
# Ensure that pip is installed.
|
||||
c = delegator.run('pip install pip')
|
||||
|
||||
# Check if version is out of date.
|
||||
if 'however' in c.err:
|
||||
# If version is out of date, update.
|
||||
print crayons.yellow('Pip is out of date... updating to latest.')
|
||||
c = delegator.run('pip install pip --upgrade', block=False)
|
||||
print crayons.blue(c.out)
|
||||
|
||||
def ensure_virtualenv():
|
||||
c = delegator.run('pip install virtualenv')
|
||||
print c.out
|
||||
|
||||
|
||||
@click.command()
|
||||
def main():
|
||||
# Ensure that pip is installed and up-to-date.
|
||||
ensure_latest_pip()
|
||||
|
||||
# Ensure that virtualenv is installed.
|
||||
ensure_virtualenv()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -0,0 +1,21 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__all__ = [
|
||||
"__title__", "__summary__", "__uri__", "__version__", "__author__",
|
||||
"__email__", "__license__", "__copyright__",
|
||||
]
|
||||
|
||||
__title__ = "pipfile"
|
||||
__summary__ = ""
|
||||
__uri__ = "https://github.com/pypa/pipfile"
|
||||
|
||||
__version__ = "16.0.dev0"
|
||||
|
||||
__author__ = "Kenneth Reitz and individual contributors"
|
||||
__email__ = "me@kennethreitz.org"
|
||||
|
||||
__license__ = "BSD or Apache License, Version 2.0"
|
||||
__copyright__ = "Copyright 2017 %s" % __author__
|
||||
@@ -0,0 +1,11 @@
|
||||
# This file is dual licensed under the terms of the Apache License, Version
|
||||
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
||||
# for complete details.
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
from .__about__ import (
|
||||
__author__, __copyright__, __email__, __license__, __summary__, __title__,
|
||||
__uri__, __version__
|
||||
)
|
||||
|
||||
from .api import load, Pipfile
|
||||
@@ -0,0 +1,54 @@
|
||||
import _ctypes
|
||||
import json
|
||||
import re
|
||||
from collections import OrderedDict
|
||||
|
||||
# http://stackoverflow.com/questions/13249415/can-i-implement-custom-indentation-for-pretty-printing-in-python-s-json-module
|
||||
#
|
||||
def di(obj_id):
|
||||
# from http://stackoverflow.com/a/15012814/355230
|
||||
""" Reverse of id() function. """
|
||||
return _ctypes.PyObj_FromPtr(obj_id)
|
||||
|
||||
class NoIndent(object):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
def __repr__(self):
|
||||
if isinstance(self.value, OrderedDict):
|
||||
return json.dumps(self.value)
|
||||
if not isinstance(self.value, list):
|
||||
return repr(self.value)
|
||||
else: # the sort the representation of any dicts in the list
|
||||
reps = ('{{{}}}'.format(', '.join(('{!r}:{}'.format(
|
||||
k, v) for k, v in sorted(v.items()))))
|
||||
if isinstance(v, dict) else repr(v) for v in self.value)
|
||||
|
||||
return '[' + ', '.join(reps) + ']'
|
||||
|
||||
class NoIndentEncoder(json.JSONEncoder):
|
||||
FORMAT_SPEC = "@@{}@@"
|
||||
regex = re.compile(FORMAT_SPEC.format(r"(\d+)"))
|
||||
|
||||
def default(self, obj):
|
||||
if not isinstance(obj, NoIndent):
|
||||
return super(NoIndentEncoder, self).default(obj)
|
||||
return self.FORMAT_SPEC.format(id(obj))
|
||||
|
||||
def encode(self, obj):
|
||||
format_spec = self.FORMAT_SPEC # local var to expedite access
|
||||
result = super(NoIndentEncoder, self).encode(obj)
|
||||
for match in self.regex.finditer(result):
|
||||
id = int(match.group(1))
|
||||
result = result.replace('"{}"'.format(format_spec.format(id)),
|
||||
repr(di(int(id))))
|
||||
return result
|
||||
|
||||
|
||||
def dumps(obj):
|
||||
"""Returns specific data in a specific format."""
|
||||
obj['_meta']['requires'] = [NoIndent(i) for i in obj['_meta']['requires']]
|
||||
obj['_meta']['sources'] = [NoIndent(i) for i in obj['_meta']['sources']]
|
||||
obj['default'] = [NoIndent(i) for i in obj['default']]
|
||||
obj['develop'] = [NoIndent(i) for i in obj['develop']]
|
||||
|
||||
return json.dumps(obj, sort_keys=True, cls=NoIndentEncoder, indent=4, separators=(',', ': '))
|
||||
+196
@@ -0,0 +1,196 @@
|
||||
import toml
|
||||
|
||||
import json
|
||||
import hashlib
|
||||
import platform
|
||||
import sys
|
||||
import os
|
||||
from collections import OrderedDict
|
||||
|
||||
from . import _json
|
||||
|
||||
def format_full_version(info):
|
||||
version = '{0.major}.{0.minor}.{0.micro}'.format(info)
|
||||
kind = info.releaselevel
|
||||
if kind != 'final':
|
||||
version += kind[0] + str(info.serial)
|
||||
return version
|
||||
|
||||
|
||||
def walk_up(bottom):
|
||||
"""mimic os.walk, but walk 'up' instead of down the directory tree.
|
||||
From: https://gist.github.com/zdavkeos/1098474
|
||||
"""
|
||||
|
||||
bottom = os.path.realpath(bottom)
|
||||
|
||||
# get files in current dir
|
||||
try:
|
||||
names = os.listdir(bottom)
|
||||
except Exception as e:
|
||||
print e
|
||||
return
|
||||
|
||||
dirs, nondirs = [], []
|
||||
for name in names:
|
||||
if os.path.isdir(os.path.join(bottom, name)):
|
||||
dirs.append(name)
|
||||
else:
|
||||
nondirs.append(name)
|
||||
|
||||
yield bottom, dirs, nondirs
|
||||
|
||||
new_path = os.path.realpath(os.path.join(bottom, '..'))
|
||||
|
||||
# see if we are at the top
|
||||
if new_path == bottom:
|
||||
return
|
||||
|
||||
for x in walk_up(new_path):
|
||||
yield x
|
||||
|
||||
|
||||
|
||||
class PipfileParser(object):
|
||||
def __init__(self, filename='Pipfile'):
|
||||
self.filename = filename
|
||||
self.sources = []
|
||||
self.groups = OrderedDict({
|
||||
'default': [],
|
||||
'develop': []
|
||||
})
|
||||
self.group_stack = ['default']
|
||||
self.requirements = []
|
||||
|
||||
def __repr__(self):
|
||||
return '<PipfileParser path={0!r}'.format(self.filename)
|
||||
|
||||
def parse(self):
|
||||
# Open the Pipfile.
|
||||
with open(self.filename) as f:
|
||||
content = f.read()
|
||||
|
||||
# Load the default configuration.
|
||||
default_config = {
|
||||
u'source': [{u'url': u'https://pypi.org/', u'verify_ssl': True}],
|
||||
u'packages': {},
|
||||
u'requires': {},
|
||||
u'dev-packages': {}
|
||||
}
|
||||
|
||||
config = {}
|
||||
config.update(default_config)
|
||||
|
||||
# Load the Pipfile's configuration.
|
||||
config = toml.loads(content)
|
||||
|
||||
# Structure the data for output.
|
||||
data = OrderedDict({
|
||||
'_meta': {
|
||||
'sources': config['source'],
|
||||
'requires': config['requires']
|
||||
},
|
||||
})
|
||||
|
||||
# TODO: Validate given data here.
|
||||
self.groups['default'] = config['packages']
|
||||
self.groups['development'] = config['packages']
|
||||
|
||||
# Update the data structure with group information.
|
||||
data.update(self.groups)
|
||||
return data
|
||||
|
||||
|
||||
class Pipfile(object):
|
||||
def __init__(self, filename):
|
||||
super(Pipfile, self).__init__()
|
||||
self.filename = filename
|
||||
self.data = None
|
||||
|
||||
@staticmethod
|
||||
def find(max_depth=3):
|
||||
"""Returns the path of a Pipfile in parent directories."""
|
||||
i = 0
|
||||
for c, d, f in walk_up(os.getcwd()):
|
||||
i += 1
|
||||
|
||||
if i < max_depth:
|
||||
if 'Pipfile':
|
||||
return '{}/Pipfile'.format(c)
|
||||
|
||||
@classmethod
|
||||
def load(klass, filename):
|
||||
"""Load a Pipfile from a given filename."""
|
||||
p = PipfileParser(filename=filename)
|
||||
pipfile = klass(filename=filename)
|
||||
pipfile.data = p.parse()
|
||||
return pipfile
|
||||
|
||||
@property
|
||||
def hash(self):
|
||||
"""Returns the SHA256 of the pipfile."""
|
||||
return hashlib.sha256(self.contents).hexdigest()
|
||||
|
||||
@property
|
||||
def contents(self):
|
||||
"""Returns the contents of the pipfile."""
|
||||
with open(self.filename, 'r') as f:
|
||||
return f.read()
|
||||
|
||||
def freeze(self):
|
||||
"""Returns a JSON representation of the Pipfile."""
|
||||
data = self.data
|
||||
data['_meta']['Pipfile-sha256'] = self.hash
|
||||
# return _json.dumps(data)
|
||||
return json.dumps(data)
|
||||
|
||||
def assert_requirements(self):
|
||||
""""Asserts PEP 508 specifiers."""
|
||||
|
||||
# Support for 508's implementation_version.
|
||||
if hasattr(sys, 'implementation'):
|
||||
implementation_version = format_full_version(sys.implementation.version)
|
||||
else:
|
||||
implementation_version = "0"
|
||||
|
||||
# Default to cpython for 2.7.
|
||||
if hasattr(sys, 'implementation'):
|
||||
implementation_name = sys.implementation.name
|
||||
else:
|
||||
implementation_name = 'cpython'
|
||||
|
||||
lookup = {
|
||||
'os_name': os.name,
|
||||
'sys_platform': sys.platform,
|
||||
'platform_machine': platform.machine(),
|
||||
'platform_python_implementation': platform.python_implementation(),
|
||||
'platform_release': platform.release(),
|
||||
'platform_system': platform.system(),
|
||||
'platform_version': platform.version(),
|
||||
'python_version': platform.python_version()[:3],
|
||||
'python_full_version': platform.python_version(),
|
||||
'implementation_name': implementation_name,
|
||||
'implementation_version': implementation_version
|
||||
}
|
||||
|
||||
# Assert each specified requirement.
|
||||
for requirement in self.data['_meta']['requires']:
|
||||
marker = requirement['marker']
|
||||
specifier = requirement['specifier']
|
||||
|
||||
if marker in lookup:
|
||||
try:
|
||||
assert lookup[marker] == specifier
|
||||
except AssertionError:
|
||||
raise AssertionError('Specifier {!r} does not match {!r}.'.format(marker, specifier))
|
||||
|
||||
|
||||
def load(pipfile_path=None):
|
||||
"""Loads a pipfile from a given path.
|
||||
If none is provided, one will try to be found.
|
||||
"""
|
||||
|
||||
if pipfile_path is None:
|
||||
pipfile_path = Pipfile.find()
|
||||
|
||||
return Pipfile.load(filename=pipfile_path)
|
||||
@@ -0,0 +1,5 @@
|
||||
click==6.7
|
||||
daemon==1.1
|
||||
delegator.py==0.0.1
|
||||
pexpect==4.2.1
|
||||
ptyprocess==0.5.1
|
||||
Reference in New Issue
Block a user