mirror of
https://github.com/kennethreitz/pipenv.git
synced 2026-06-05 22:50:18 +00:00
+3
-2
@@ -1,7 +1,8 @@
|
||||
from __future__ import print_function, absolute_import
|
||||
|
||||
__version__ = "0.1.2"
|
||||
__version__ = "0.1.4.dev0"
|
||||
|
||||
__all__ = ["Finder", "WindowsFinder", "SystemPath"]
|
||||
__all__ = ["Finder", "WindowsFinder", "SystemPath", "InvalidPythonVersion"]
|
||||
from .pythonfinder import Finder
|
||||
from .models import SystemPath, WindowsFinder
|
||||
from .exceptions import InvalidPythonVersion
|
||||
|
||||
+14
@@ -0,0 +1,14 @@
|
||||
# Taken from pip: https://github.com/pypa/pip/blob/95bcf8c5f6394298035a7332c441868f3b0169f4/src/pip/_vendor/Makefile
|
||||
all: clean vendor
|
||||
|
||||
clean:
|
||||
@# Delete vendored items
|
||||
find . -maxdepth 1 -mindepth 1 -type d -exec rm -rf {} \;
|
||||
|
||||
vendor:
|
||||
@# Install vendored libraries
|
||||
pip install -t . -r vendor.txt
|
||||
|
||||
@# Cleanup .egg-info directories
|
||||
rm -rf *.egg-info
|
||||
rm -rf *.dist-info
|
||||
@@ -0,0 +1 @@
|
||||
-e git+https://github.com/zooba/pep514tools.git@320e48745660b696e2dcaee888fc2e516b435e48#egg=pep514tools
|
||||
Vendored
+1
-1
@@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding=utf-8 -*-
|
||||
|
||||
from __future__ import print_function, absolute_import
|
||||
import click
|
||||
import crayons
|
||||
import sys
|
||||
|
||||
+1
@@ -1,4 +1,5 @@
|
||||
# -*- coding=utf-8 -*-
|
||||
from __future__ import print_function, absolute_import
|
||||
import os
|
||||
import platform
|
||||
import sys
|
||||
|
||||
+7
@@ -0,0 +1,7 @@
|
||||
# -*- coding=utf-8 -*-
|
||||
from __future__ import print_function, absolute_import
|
||||
|
||||
|
||||
class InvalidPythonVersion(Exception):
|
||||
"""Raised when parsing an invalid python version"""
|
||||
pass
|
||||
+11
-13
@@ -1,4 +1,5 @@
|
||||
# -*- coding=utf-8 -*-
|
||||
from __future__ import print_function, absolute_import
|
||||
import abc
|
||||
import operator
|
||||
import six
|
||||
@@ -33,16 +34,13 @@ class BasePath(object):
|
||||
:returns: :class:`~pythonfinder.models.PathEntry` instance.
|
||||
"""
|
||||
|
||||
valid_names = [
|
||||
valid_names = [name] + [
|
||||
"{0}.{1}".format(name, ext).lower() if ext else "{0}".format(name).lower()
|
||||
for ext in KNOWN_EXTS
|
||||
]
|
||||
finder = filter(operator.attrgetter("is_executable"), self.children.values())
|
||||
name_getter = operator.attrgetter("path.name")
|
||||
return next(
|
||||
(child for child in finder if name_getter(child).lower() in valid_names),
|
||||
None,
|
||||
)
|
||||
children = self.children
|
||||
found = next((children[(self.path / child).as_posix()] for child in valid_names if (self.path / child).as_posix() in children), None)
|
||||
return found
|
||||
|
||||
def find_python_version(self, major, minor=None, patch=None, pre=None, dev=None):
|
||||
"""Search or self for the specified Python version and return the first match.
|
||||
@@ -62,16 +60,16 @@ class BasePath(object):
|
||||
is_py = operator.attrgetter("is_python")
|
||||
py_version = operator.attrgetter("as_python")
|
||||
if not self.is_dir:
|
||||
if self.is_python:
|
||||
return self if version_matcher(self.as_python) else None
|
||||
if self.is_python and self.as_python.matches(major, minor=minor, patch=patch, pre=pre, dev=dev):
|
||||
return self
|
||||
return
|
||||
finder = (c for c in self.children.values() if is_py(c) and py_version(c))
|
||||
finder = ((child, child.as_python) for child in self.children.values() if child.is_python and child.as_python)
|
||||
py_filter = filter(
|
||||
None, filter(lambda c: version_matcher(py_version(c)), finder)
|
||||
None, filter(lambda child: version_matcher(child[1]), finder)
|
||||
)
|
||||
version_sort = operator.attrgetter("py_version.version")
|
||||
version_sort = operator.attrgetter("version")
|
||||
return next(
|
||||
(c for c in sorted(py_filter, key=version_sort, reverse=True)), None
|
||||
(c[0] for c in sorted(py_filter, key=lambda child: child[1].version, reverse=True)), None
|
||||
)
|
||||
|
||||
|
||||
|
||||
+64
-20
@@ -1,4 +1,5 @@
|
||||
# -*- coding=utf-8 -*-
|
||||
from __future__ import print_function, absolute_import
|
||||
import attr
|
||||
import operator
|
||||
import os
|
||||
@@ -7,12 +8,14 @@ from collections import defaultdict
|
||||
from . import BasePath
|
||||
from .python import PythonVersion
|
||||
from ..environment import PYENV_INSTALLED, PYENV_ROOT
|
||||
from ..exceptions import InvalidPythonVersion
|
||||
from ..utils import (
|
||||
optional_instance_of,
|
||||
filter_pythons,
|
||||
path_is_known_executable,
|
||||
is_python_name,
|
||||
ensure_path,
|
||||
fs_str
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -50,7 +53,7 @@ class SystemPath(object):
|
||||
for p in self.python_executables:
|
||||
try:
|
||||
version_object = PythonVersion.from_path(p)
|
||||
except ValueError:
|
||||
except (ValueError, InvalidPythonVersion):
|
||||
continue
|
||||
version_dict[version_object.version_tuple].append(version_object)
|
||||
return version_dict
|
||||
@@ -64,11 +67,11 @@ class SystemPath(object):
|
||||
if PYENV_INSTALLED:
|
||||
self._setup_pyenv()
|
||||
venv = os.environ.get('VIRTUAL_ENV')
|
||||
if os.name == 'nt':
|
||||
bin_dir = 'Scripts'
|
||||
else:
|
||||
bin_dir = 'bin'
|
||||
if venv:
|
||||
if os.name == 'nt':
|
||||
bin_dir = 'Scripts'
|
||||
else:
|
||||
bin_dir = 'bin'
|
||||
p = Path(venv)
|
||||
self.path_order = [(p / bin_dir).as_posix()] + self.path_order
|
||||
self.paths[p] = PathEntry.create(
|
||||
@@ -76,9 +79,12 @@ class SystemPath(object):
|
||||
)
|
||||
if self.system:
|
||||
syspath = Path(sys.executable)
|
||||
self.path_order = [syspath.parent.as_posix()] + self.path_order
|
||||
self.paths[syspath.parent.as_posix()] = PathEntry.create(
|
||||
path=syspath.parent, is_root=True, only_python=True
|
||||
syspath_bin = syspath.parent
|
||||
if syspath_bin.name != bin_dir and syspath_bin.joinpath(bin_dir).exists():
|
||||
syspath_bin = syspath_bin / bin_dir
|
||||
self.path_order = [syspath_bin.as_posix()] + self.path_order
|
||||
self.paths[syspath_bin.as_posix()] = PathEntry.create(
|
||||
path=syspath_bin, is_root=True, only_python=False
|
||||
)
|
||||
|
||||
def _setup_pyenv(self):
|
||||
@@ -111,12 +117,24 @@ class SystemPath(object):
|
||||
self.paths.update({p.path: p for p in root_paths})
|
||||
|
||||
def get_path(self, path):
|
||||
_path = self.paths.get(path)
|
||||
if not _path and path in self.path_order:
|
||||
self.paths[path] = PathEntry.create(
|
||||
path=path, is_root=True, only_python=self.only_python
|
||||
path = Path(path)
|
||||
_path = self.paths.get(path.as_posix())
|
||||
if not _path and path.as_posix() in self.path_order:
|
||||
self.paths[path.as_posix()] = PathEntry.create(
|
||||
path=path.resolve(), is_root=True, only_python=self.only_python
|
||||
)
|
||||
return self.paths.get(path)
|
||||
return self.paths.get(path.as_posix())
|
||||
|
||||
def find_all(self, executable):
|
||||
"""Search the path for an executable. Return all copies.
|
||||
|
||||
:param executable: Name of the executable
|
||||
:type executable: str
|
||||
:returns: List[PathEntry]
|
||||
"""
|
||||
sub_which = operator.methodcaller("which", name=executable)
|
||||
filtered = filter(None, (sub_which(self.get_path(k)) for k in self.path_order))
|
||||
return [f for f in filtered]
|
||||
|
||||
def which(self, executable):
|
||||
"""Search for an executable on the path.
|
||||
@@ -126,9 +144,33 @@ class SystemPath(object):
|
||||
:returns: :class:`~pythonfinder.models.PathEntry` object.
|
||||
"""
|
||||
sub_which = operator.methodcaller("which", name=executable)
|
||||
return next(
|
||||
(sub_which(self.get_path(k)) for k in self.path_order), None
|
||||
filtered = filter(None, (sub_which(self.get_path(k)) for k in self.path_order))
|
||||
return next((f for f in filtered), None)
|
||||
|
||||
def find_all_python_versions(self, major, minor=None, patch=None, pre=None, dev=None):
|
||||
"""Search for a specific python version on the path. Return all copies
|
||||
|
||||
:param major: Major python version to search for.
|
||||
:type major: int
|
||||
:param minor: Minor python version to search for, defaults to None
|
||||
:param minor: int, optional
|
||||
:param path: Patch python version to search for, defaults to None
|
||||
:param path: int, optional
|
||||
:return: A list of :class:`~pythonfinder.models.PathEntry` instances matching the version requested.
|
||||
:rtype: List[:class:`~pythonfinder.models.PathEntry`]
|
||||
"""
|
||||
|
||||
sub_finder = operator.methodcaller(
|
||||
"find_python_version", major, minor=minor, patch=patch, pre=pre, dev=dev
|
||||
)
|
||||
if os.name == "nt" and self.windows_finder:
|
||||
windows_finder_version = sub_finder(self.windows_finder)
|
||||
if windows_finder_version:
|
||||
return windows_finder_version
|
||||
paths = (self.get_path(k) for k in self.path_order)
|
||||
path_filter = filter(None, (sub_finder(p) for p in paths if p is not None))
|
||||
version_sort = operator.attrgetter("as_python.version")
|
||||
return [c for c in sorted(path_filter, key=version_sort, reverse=True)]
|
||||
|
||||
def find_python_version(self, major, minor=None, patch=None, pre=None, dev=None):
|
||||
"""Search for a specific python version on the path.
|
||||
@@ -150,8 +192,8 @@ class SystemPath(object):
|
||||
windows_finder_version = sub_finder(self.windows_finder)
|
||||
if windows_finder_version:
|
||||
return windows_finder_version
|
||||
paths = [self.get_path(k) for k in self.path_order]
|
||||
path_filter = filter(None, [sub_finder(p) for p in paths])
|
||||
paths = (self.get_path(k) for k in self.path_order)
|
||||
path_filter = filter(None, (sub_finder(p) for p in paths if p is not None))
|
||||
version_sort = operator.attrgetter("as_python.version")
|
||||
return next(
|
||||
(c for c in sorted(path_filter, key=version_sort, reverse=True)), None
|
||||
@@ -180,7 +222,7 @@ class SystemPath(object):
|
||||
path_entries.update(
|
||||
{
|
||||
p.as_posix(): PathEntry.create(
|
||||
path=p, is_root=True, only_python=only_python
|
||||
path=p.absolute(), is_root=True, only_python=only_python
|
||||
)
|
||||
for p in _path_objects
|
||||
}
|
||||
@@ -197,6 +239,9 @@ class PathEntry(BasePath):
|
||||
py_version = attr.ib(default=None)
|
||||
pythons = attr.ib(default=None)
|
||||
|
||||
def __str__(self):
|
||||
return fs_str('{0}'.format(self.path.as_posix()))
|
||||
|
||||
def _filter_children(self):
|
||||
if self.only_python:
|
||||
children = filter_pythons(self.path)
|
||||
@@ -219,9 +264,8 @@ class PathEntry(BasePath):
|
||||
if not self.py_version:
|
||||
try:
|
||||
from .python import PythonVersion
|
||||
|
||||
self.py_version = PythonVersion.from_path(self.path)
|
||||
except ValueError:
|
||||
except (ValueError, InvalidPythonVersion):
|
||||
self.py_version = None
|
||||
return self.py_version
|
||||
|
||||
|
||||
+2
-1
@@ -1,4 +1,5 @@
|
||||
# -*- coding=utf-8 -*-
|
||||
from __future__ import print_function, absolute_import
|
||||
import attr
|
||||
from collections import defaultdict
|
||||
from . import BaseFinder
|
||||
@@ -30,7 +31,7 @@ class PyenvFinder(BaseFinder):
|
||||
version.get("is_prerelease"),
|
||||
version.get("is_devrelease"),
|
||||
)
|
||||
versions[version_tuple] = VersionPath.create(path=p, only_python=True)
|
||||
versions[version_tuple] = VersionPath.create(path=p.resolve(), only_python=True)
|
||||
return versions
|
||||
|
||||
@classmethod
|
||||
|
||||
+2
-1
@@ -1,4 +1,5 @@
|
||||
# -*- coding=utf-8 -*-
|
||||
from __future__ import print_function, absolute_import
|
||||
import attr
|
||||
import copy
|
||||
import platform
|
||||
@@ -122,7 +123,7 @@ class PythonVersion(object):
|
||||
if not path.is_python:
|
||||
raise ValueError("Not a valid python path: %s" % path.path)
|
||||
return
|
||||
py_version, _ = get_python_version(str(path.path))
|
||||
py_version = get_python_version(str(path.path))
|
||||
instance_dict = cls.parse(py_version)
|
||||
if not isinstance(instance_dict.get("version"), Version):
|
||||
raise ValueError("Not a valid python path: %s" % path.path)
|
||||
|
||||
+6
-1
@@ -1,10 +1,12 @@
|
||||
# -*- coding=utf-8 -*-
|
||||
from __future__ import print_function, absolute_import
|
||||
import attr
|
||||
import operator
|
||||
from collections import defaultdict
|
||||
from . import BaseFinder
|
||||
from .path import PathEntry
|
||||
from .python import PythonVersion
|
||||
from ..exceptions import InvalidPythonVersion
|
||||
from ..utils import ensure_path
|
||||
|
||||
|
||||
@@ -35,7 +37,10 @@ class WindowsFinder(BaseFinder):
|
||||
path = None
|
||||
for version_object in env_versions:
|
||||
path = ensure_path(version_object.info.install_path.__getattr__(""))
|
||||
py_version = PythonVersion.from_windows_launcher(version_object)
|
||||
try:
|
||||
py_version = PythonVersion.from_windows_launcher(version_object)
|
||||
except InvalidPythonVersion:
|
||||
continue
|
||||
self.version_list.append(py_version)
|
||||
base_dir = PathEntry.create(
|
||||
path,
|
||||
|
||||
+16
-7
@@ -1,4 +1,5 @@
|
||||
# -*- coding=utf-8 -*-
|
||||
from __future__ import print_function, absolute_import
|
||||
import os
|
||||
import six
|
||||
from .models import SystemPath
|
||||
@@ -39,15 +40,23 @@ class Finder(object):
|
||||
and not dev
|
||||
and isinstance(major, six.string_types)
|
||||
):
|
||||
from .models import PythonVersion
|
||||
version_dict = {}
|
||||
if "." in major:
|
||||
from .models import PythonVersion
|
||||
|
||||
version_dict = PythonVersion.parse(major)
|
||||
major = version_dict["major"]
|
||||
minor = version_dict["minor"]
|
||||
patch = version_dict["patch"]
|
||||
pre = version_dict["is_prerelease"]
|
||||
dev = version_dict["is_devrelease"]
|
||||
elif len(major) == 1:
|
||||
version_dict = {
|
||||
'major': int(major),
|
||||
'minor': None,
|
||||
'patch': None,
|
||||
'is_prerelease': False,
|
||||
'is_devrelease': False
|
||||
}
|
||||
major = version_dict.get("major", major)
|
||||
minor = version_dict.get("minor", minor)
|
||||
patch = version_dict.get("patch", patch)
|
||||
pre = version_dict.get("is_prerelease", pre)
|
||||
dev = version_dict.get("is_devrelease", dev)
|
||||
if os.name == "nt":
|
||||
match = self.windows_finder.find_python_version(
|
||||
major, minor=minor, patch=patch, pre=pre, dev=dev
|
||||
|
||||
Vendored
+28
-7
@@ -1,9 +1,13 @@
|
||||
# -*- coding=utf-8 -*-
|
||||
from __future__ import print_function, absolute_import
|
||||
import attr
|
||||
import locale
|
||||
import os
|
||||
import six
|
||||
import subprocess
|
||||
import sys
|
||||
from fnmatch import fnmatch
|
||||
from .exceptions import InvalidPythonVersion
|
||||
|
||||
try:
|
||||
from pathlib import Path
|
||||
@@ -26,23 +30,26 @@ def _run(cmd):
|
||||
:returns: A 2-tuple of (output, error)
|
||||
"""
|
||||
encoding = locale.getdefaultlocale()[1] or "utf-8"
|
||||
env = os.environ.copy()
|
||||
c = subprocess.Popen(
|
||||
cmd,
|
||||
encoding=encoding,
|
||||
env=env,
|
||||
universal_newlines=True,
|
||||
env=os.environ.copy(),
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
output, err = c.communicate()
|
||||
return output.strip(), err.strip()
|
||||
out, err = c.communicate()
|
||||
return out.decode(encoding).strip(), err.decode(encoding).strip()
|
||||
|
||||
|
||||
def get_python_version(path):
|
||||
"""Get python version string using subprocess from a given path."""
|
||||
version_cmd = [path, "-c", "import sys; print(sys.version.split()[0])"]
|
||||
return _run(version_cmd)
|
||||
try:
|
||||
out, _ = _run(version_cmd)
|
||||
except OSError:
|
||||
raise InvalidPythonVersion("%s is not a valid python path" % path)
|
||||
if not out:
|
||||
raise InvalidPythonVersion("%s is not a valid python path" % path)
|
||||
return out
|
||||
|
||||
|
||||
def optional_instance_of(cls):
|
||||
@@ -111,3 +118,17 @@ def filter_pythons(path):
|
||||
if not path.is_dir():
|
||||
return path if path_is_python(path) else None
|
||||
return filter(lambda x: path_is_python(x), path.iterdir())
|
||||
|
||||
|
||||
def fs_str(string):
|
||||
"""Encodes a string into the proper filesystem encoding
|
||||
|
||||
Borrowed from pip-tools
|
||||
"""
|
||||
if isinstance(string, str):
|
||||
return string
|
||||
assert not isinstance(string, bytes)
|
||||
return string.encode(_fs_encoding)
|
||||
|
||||
|
||||
_fs_encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
|
||||
|
||||
Vendored
-13
@@ -1,13 +0,0 @@
|
||||
Copyright (c) 2018, Tzu-ping Chung <uranusjr@gmail.com>
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
+2
-4
@@ -1,12 +1,10 @@
|
||||
import importlib
|
||||
import os
|
||||
|
||||
|
||||
__version__ = '1.1.0'
|
||||
from ._core import ShellDetectionFailure
|
||||
|
||||
|
||||
class ShellDetectionFailure(EnvironmentError):
|
||||
pass
|
||||
__version__ = '1.2.3.dev0'
|
||||
|
||||
|
||||
def detect_shell(pid=None, max_depth=6):
|
||||
|
||||
@@ -5,3 +5,7 @@ SHELL_NAMES = {
|
||||
'cmd', 'powershell', 'pwsh', # Microsoft.
|
||||
'elvish', 'xonsh', # More exotic.
|
||||
}
|
||||
|
||||
|
||||
class ShellDetectionFailure(EnvironmentError):
|
||||
pass
|
||||
Vendored
+1
-1
@@ -11,7 +11,7 @@ from ctypes import (
|
||||
)
|
||||
from ctypes.wintypes import DWORD, LONG
|
||||
|
||||
from ._consts import SHELL_NAMES
|
||||
from ._core import SHELL_NAMES
|
||||
|
||||
|
||||
ERROR_NO_MORE_FILES = 18
|
||||
|
||||
Vendored
-56
@@ -1,56 +0,0 @@
|
||||
import collections
|
||||
import os
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from ._consts import SHELL_NAMES
|
||||
|
||||
|
||||
Process = collections.namedtuple('Process', 'args pid ppid')
|
||||
|
||||
|
||||
def _get_process_mapping():
|
||||
"""Try to look up the process tree via the output of `ps`.
|
||||
"""
|
||||
output = subprocess.check_output([
|
||||
'ps', '-ww', '-o', 'pid=', '-o', 'ppid=', '-o', 'args=',
|
||||
])
|
||||
if not isinstance(output, str):
|
||||
output = output.decode(sys.stdout.encoding)
|
||||
processes = {}
|
||||
for line in output.split('\n'):
|
||||
try:
|
||||
pid, ppid, args = line.strip().split(maxsplit=2)
|
||||
except ValueError:
|
||||
continue
|
||||
processes[pid] = Process(
|
||||
args=tuple(shlex.split(args)), pid=pid, ppid=ppid,
|
||||
)
|
||||
return processes
|
||||
|
||||
|
||||
def get_shell(pid=None, max_depth=6):
|
||||
"""Get the shell that the supplied pid or os.getpid() is running in.
|
||||
"""
|
||||
pid = str(pid or os.getpid())
|
||||
mapping = _get_process_mapping()
|
||||
login_shell = os.environ.get('SHELL', '')
|
||||
for _ in range(max_depth):
|
||||
try:
|
||||
proc = mapping[pid]
|
||||
except KeyError:
|
||||
break
|
||||
name = os.path.basename(proc.args[0]).lower()
|
||||
if name in SHELL_NAMES:
|
||||
return (name, proc.args[0])
|
||||
elif proc.args[0].startswith('-'):
|
||||
# This is the login shell. Use the SHELL environ if possible
|
||||
# because it provides better information.
|
||||
if login_shell:
|
||||
name = login_shell.lower()
|
||||
else:
|
||||
name = proc.args[0][1:].lower()
|
||||
return (os.path.basename(name), name)
|
||||
pid = proc.ppid # Go up one level.
|
||||
return None
|
||||
+47
-25
@@ -1,16 +1,50 @@
|
||||
import os
|
||||
import platform
|
||||
|
||||
from .._consts import SHELL_NAMES
|
||||
from .._core import SHELL_NAMES, ShellDetectionFailure
|
||||
from . import proc, ps
|
||||
|
||||
|
||||
def _get_process_mapping():
|
||||
system = platform.system()
|
||||
if system == 'Linux':
|
||||
from . import linux as impl
|
||||
"""Select a way to obtain process information from the system.
|
||||
|
||||
* `/proc` is used if supported.
|
||||
* The system `ps` utility is used as a fallback option.
|
||||
"""
|
||||
for impl in (proc, ps):
|
||||
try:
|
||||
mapping = impl.get_process_mapping()
|
||||
except EnvironmentError:
|
||||
continue
|
||||
return mapping
|
||||
raise ShellDetectionFailure('compatible proc fs or ps utility is required')
|
||||
|
||||
|
||||
def _iter_process_command(mapping, pid, max_depth):
|
||||
"""Iterator to traverse up the tree, yielding `argv[0]` of each process.
|
||||
"""
|
||||
for _ in range(max_depth):
|
||||
try:
|
||||
proc = mapping[pid]
|
||||
except KeyError: # We've reached the root process. Give up.
|
||||
break
|
||||
try:
|
||||
cmd = proc.args[0]
|
||||
except IndexError: # Process has no name? Whatever, ignore it.
|
||||
pass
|
||||
else:
|
||||
yield cmd
|
||||
pid = proc.ppid # Go up one level.
|
||||
|
||||
|
||||
def _get_login_shell(proc_cmd):
|
||||
"""Form shell information from the SHELL environment variable if possible.
|
||||
"""
|
||||
login_shell = os.environ.get('SHELL', '')
|
||||
if login_shell:
|
||||
proc_cmd = login_shell
|
||||
else:
|
||||
from . import _default as impl
|
||||
return impl.get_process_mapping()
|
||||
proc_cmd = proc_cmd[1:]
|
||||
return (os.path.basename(proc_cmd).lower(), proc_cmd)
|
||||
|
||||
|
||||
def get_shell(pid=None, max_depth=6):
|
||||
@@ -18,22 +52,10 @@ def get_shell(pid=None, max_depth=6):
|
||||
"""
|
||||
pid = str(pid or os.getpid())
|
||||
mapping = _get_process_mapping()
|
||||
login_shell = os.environ.get('SHELL', '')
|
||||
for _ in range(max_depth):
|
||||
try:
|
||||
proc = mapping[pid]
|
||||
except KeyError:
|
||||
break
|
||||
name = os.path.basename(proc.args[0]).lower()
|
||||
if name in SHELL_NAMES:
|
||||
return (name, proc.args[0])
|
||||
elif proc.args[0].startswith('-'):
|
||||
# This is the login shell. Use the SHELL environ if possible
|
||||
# because it provides better information.
|
||||
if login_shell:
|
||||
name = login_shell.lower()
|
||||
else:
|
||||
name = proc.args[0][1:].lower()
|
||||
return (os.path.basename(name), name)
|
||||
pid = proc.ppid # Go up one level.
|
||||
for proc_cmd in _iter_process_command(mapping, pid, max_depth):
|
||||
if proc_cmd.startswith('-'): # Login shell! Let's use this.
|
||||
return _get_login_shell(proc_cmd)
|
||||
name = os.path.basename(proc_cmd).lower()
|
||||
if name in SHELL_NAMES: # The inner-most (non-login) shell.
|
||||
return (name, proc_cmd)
|
||||
return None
|
||||
|
||||
+3
@@ -0,0 +1,3 @@
|
||||
import collections
|
||||
|
||||
Process = collections.namedtuple('Process', 'args pid ppid')
|
||||
-27
@@ -1,27 +0,0 @@
|
||||
import collections
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
Process = collections.namedtuple('Process', 'args pid ppid')
|
||||
|
||||
|
||||
def get_process_mapping():
|
||||
"""Try to look up the process tree via the output of `ps`.
|
||||
"""
|
||||
output = subprocess.check_output([
|
||||
'ps', '-ww', '-o', 'pid=', '-o', 'ppid=', '-o', 'args=',
|
||||
])
|
||||
if not isinstance(output, str):
|
||||
output = output.decode(sys.stdout.encoding)
|
||||
processes = {}
|
||||
for line in output.split('\n'):
|
||||
try:
|
||||
pid, ppid, args = line.strip().split(None, 2)
|
||||
except ValueError:
|
||||
continue
|
||||
processes[pid] = Process(
|
||||
args=tuple(shlex.split(args)), pid=pid, ppid=ppid,
|
||||
)
|
||||
return processes
|
||||
-35
@@ -1,35 +0,0 @@
|
||||
import os
|
||||
import re
|
||||
|
||||
from ._default import Process
|
||||
|
||||
|
||||
STAT_PPID = 3
|
||||
STAT_TTY = 6
|
||||
|
||||
|
||||
def get_process_mapping():
|
||||
"""Try to look up the process tree via Linux's /proc
|
||||
"""
|
||||
with open('/proc/{0}/stat'.format(os.getpid())) as f:
|
||||
self_tty = f.read().split()[STAT_TTY]
|
||||
processes = {}
|
||||
for pid in os.listdir('/proc'):
|
||||
if not pid.isdigit():
|
||||
continue
|
||||
try:
|
||||
stat = '/proc/{0}/stat'.format(pid)
|
||||
cmdline = '/proc/{0}/cmdline'.format(pid)
|
||||
with open(stat) as fstat, open(cmdline) as fcmdline:
|
||||
stat = re.findall(r'\(.+\)|\S+', fstat.read())
|
||||
cmd = fcmdline.read().split('\x00')[:-1]
|
||||
ppid = stat[STAT_PPID]
|
||||
tty = stat[STAT_TTY]
|
||||
if tty == self_tty:
|
||||
processes[pid] = Process(
|
||||
args=tuple(cmd), pid=pid, ppid=ppid,
|
||||
)
|
||||
except IOError:
|
||||
# Process has disappeared - just ignore it.
|
||||
continue
|
||||
return processes
|
||||
+62
@@ -0,0 +1,62 @@
|
||||
import os
|
||||
import re
|
||||
|
||||
from ._core import Process
|
||||
|
||||
|
||||
STAT_PPID = 3
|
||||
STAT_TTY = 6
|
||||
|
||||
STAT_PATTERN = re.compile(r'\(.+\)|\S+')
|
||||
|
||||
|
||||
def detect_proc():
|
||||
"""Detect /proc filesystem style.
|
||||
|
||||
This checks the /proc/{pid} directory for possible formats. Returns one of
|
||||
the followings as str:
|
||||
|
||||
* `stat`: Linux-style, i.e. ``/proc/{pid}/stat``.
|
||||
* `status`: BSD-style, i.e. ``/proc/{pid}/status``.
|
||||
"""
|
||||
pid = os.getpid()
|
||||
for name in ('stat', 'status'):
|
||||
if os.path.exists(os.path.join('/proc', str(pid), name)):
|
||||
return name
|
||||
raise ProcFormatError('unsupported proc format')
|
||||
|
||||
|
||||
def _get_stat(pid, name):
|
||||
with open(os.path.join('/proc', str(pid), name)) as f:
|
||||
parts = STAT_PATTERN.findall(f.read())
|
||||
return parts[STAT_TTY], parts[STAT_PPID]
|
||||
|
||||
|
||||
def _get_cmdline(pid):
|
||||
with open(os.path.join('/proc', str(pid), 'cmdline')) as f:
|
||||
return tuple(f.read().split('\0')[:-1])
|
||||
|
||||
|
||||
class ProcFormatError(EnvironmentError):
|
||||
pass
|
||||
|
||||
|
||||
def get_process_mapping():
|
||||
"""Try to look up the process tree via the /proc interface.
|
||||
"""
|
||||
stat_name = detect_proc()
|
||||
self_tty = _get_stat(os.getpid(), stat_name)[0]
|
||||
processes = {}
|
||||
for pid in os.listdir('/proc'):
|
||||
if not pid.isdigit():
|
||||
continue
|
||||
try:
|
||||
tty, ppid = _get_stat(pid, stat_name)
|
||||
if tty != self_tty:
|
||||
continue
|
||||
args = _get_cmdline(pid)
|
||||
processes[pid] = Process(args=args, pid=pid, ppid=ppid)
|
||||
except IOError:
|
||||
# Process has disappeared - just ignore it.
|
||||
continue
|
||||
return processes
|
||||
+36
@@ -0,0 +1,36 @@
|
||||
import errno
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from ._core import Process
|
||||
|
||||
|
||||
class PsNotAvailable(EnvironmentError):
|
||||
pass
|
||||
|
||||
|
||||
def get_process_mapping():
|
||||
"""Try to look up the process tree via the output of `ps`.
|
||||
"""
|
||||
try:
|
||||
output = subprocess.check_output([
|
||||
'ps', '-ww', '-o', 'pid=', '-o', 'ppid=', '-o', 'args=',
|
||||
])
|
||||
except OSError as e: # Python 2-compatible FileNotFoundError.
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
raise PsNotAvailable('ps not found')
|
||||
if not isinstance(output, str):
|
||||
encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
|
||||
output = output.decode(encoding)
|
||||
processes = {}
|
||||
for line in output.split('\n'):
|
||||
try:
|
||||
pid, ppid, args = line.strip().split(None, 2)
|
||||
except ValueError:
|
||||
continue
|
||||
processes[pid] = Process(
|
||||
args=tuple(shlex.split(args)), pid=pid, ppid=ppid,
|
||||
)
|
||||
return processes
|
||||
Vendored
+1
-1
@@ -34,7 +34,7 @@ requirementslib==1.0.9
|
||||
pyparsing==2.2.0
|
||||
pytoml==0.1.16
|
||||
requirements-parser==0.2.0
|
||||
shellingham==1.1.0
|
||||
shellingham==1.2.3
|
||||
six==1.11.0
|
||||
semver==2.8.0
|
||||
shutilwhich==1.1.0
|
||||
|
||||
Reference in New Issue
Block a user