mirror of
https://github.com/kennethreitz/pipenv.git
synced 2026-06-05 22:50:18 +00:00
Vendor Shellingham
This commit is contained in:
Vendored
+13
@@ -0,0 +1,13 @@
|
||||
Copyright (c) 2018, Tzu-ping Chung <uranusjr@gmail.com>
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
+27
@@ -0,0 +1,27 @@
|
||||
import importlib
|
||||
import os
|
||||
|
||||
|
||||
__version__ = '1.0.0.dev1'
|
||||
|
||||
|
||||
class ShellDetectionFailure(EnvironmentError):
|
||||
pass
|
||||
|
||||
|
||||
def detect_shell(pid=None, max_depth=6):
|
||||
name = os.name
|
||||
try:
|
||||
impl = importlib.import_module('.' + name, __name__)
|
||||
except ImportError:
|
||||
raise RuntimeError(
|
||||
'Shell detection not implemented for {0!r}'.format(name),
|
||||
)
|
||||
try:
|
||||
get_shell = impl.get_shell
|
||||
except AttributeError:
|
||||
raise RuntimeError('get_shell not implemented for {0!r}'.format(name))
|
||||
shell = get_shell(pid, max_depth=max_depth)
|
||||
if shell:
|
||||
return shell
|
||||
raise ShellDetectionFailure()
|
||||
Vendored
+7
@@ -0,0 +1,7 @@
|
||||
SHELL_NAMES = {
|
||||
'sh', 'bash', 'dash', # Bourne.
|
||||
'csh', 'tcsh', # C.
|
||||
'ksh', 'zsh', 'fish', # Common alternatives.
|
||||
'cmd', 'powershell', 'pwsh', # Microsoft.
|
||||
'elvish', 'xonsh', # More exotic.
|
||||
}
|
||||
Vendored
+134
@@ -0,0 +1,134 @@
|
||||
# Code based on the winappdbg project http://winappdbg.sourceforge.net/
|
||||
# (BSD License) - adapted from Celery by Dan Ryan (dan@danryan.co)
|
||||
# https://github.com/celery/celery/blob/2.5-archived/celery/concurrency/processes/_win.py
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from ctypes import (
|
||||
byref, sizeof, windll, Structure, WinError, POINTER,
|
||||
c_size_t, c_char, c_void_p
|
||||
)
|
||||
from ctypes.wintypes import DWORD, LONG
|
||||
|
||||
from ._consts import SHELL_NAMES
|
||||
|
||||
|
||||
ERROR_NO_MORE_FILES = 18
|
||||
INVALID_HANDLE_VALUE = c_void_p(-1).value
|
||||
|
||||
|
||||
if sys.version_info[0] < 3:
|
||||
string_types = (str, unicode) # noqa
|
||||
else:
|
||||
string_types = (str,)
|
||||
|
||||
|
||||
class PROCESSENTRY32(Structure):
|
||||
_fields_ = [
|
||||
('dwSize', DWORD),
|
||||
('cntUsage', DWORD),
|
||||
('th32ProcessID', DWORD),
|
||||
('th32DefaultHeapID', c_size_t),
|
||||
('th32ModuleID', DWORD),
|
||||
('cntThreads', DWORD),
|
||||
('th32ParentProcessID', DWORD),
|
||||
('pcPriClassBase', LONG),
|
||||
('dwFlags', DWORD),
|
||||
('szExeFile', c_char * 260),
|
||||
]
|
||||
|
||||
|
||||
LPPROCESSENTRY32 = POINTER(PROCESSENTRY32)
|
||||
|
||||
|
||||
def CreateToolhelp32Snapshot(dwFlags=2, th32ProcessID=0):
|
||||
hSnapshot = windll.kernel32.CreateToolhelp32Snapshot(
|
||||
dwFlags,
|
||||
th32ProcessID
|
||||
)
|
||||
if hSnapshot == INVALID_HANDLE_VALUE:
|
||||
raise WinError()
|
||||
return hSnapshot
|
||||
|
||||
|
||||
def Process32First(hSnapshot):
|
||||
pe = PROCESSENTRY32()
|
||||
pe.dwSize = sizeof(PROCESSENTRY32)
|
||||
success = windll.kernel32.Process32First(hSnapshot, byref(pe))
|
||||
if not success:
|
||||
if windll.kernel32.GetLastError() == ERROR_NO_MORE_FILES:
|
||||
return
|
||||
raise WinError()
|
||||
return pe
|
||||
|
||||
|
||||
def Process32Next(hSnapshot, pe=None):
|
||||
if pe is None:
|
||||
pe = PROCESSENTRY32()
|
||||
pe.dwSize = sizeof(PROCESSENTRY32)
|
||||
success = windll.kernel32.Process32Next(hSnapshot, byref(pe))
|
||||
if not success:
|
||||
if windll.kernel32.GetLastError() == ERROR_NO_MORE_FILES:
|
||||
return
|
||||
raise WinError()
|
||||
return pe
|
||||
|
||||
|
||||
def get_all_processes():
|
||||
"""Return a dictionary of properties about all processes.
|
||||
>>> get_all_processes()
|
||||
{
|
||||
1509: {
|
||||
'parent_pid': 1201,
|
||||
'executable': 'C:\\Program\\\\ Files\\Python36\\python.exe'
|
||||
}
|
||||
}
|
||||
"""
|
||||
h_process = CreateToolhelp32Snapshot()
|
||||
pids = {}
|
||||
pe = Process32First(h_process)
|
||||
while pe:
|
||||
pids[pe.th32ProcessID] = {
|
||||
'executable': str(pe.szExeFile.decode('utf-8'))
|
||||
}
|
||||
if pe.th32ParentProcessID:
|
||||
pids[pe.th32ProcessID]['parent_pid'] = pe.th32ParentProcessID
|
||||
pe = Process32Next(h_process, pe)
|
||||
|
||||
return pids
|
||||
|
||||
|
||||
def _get_executable(process_dict):
|
||||
try:
|
||||
executable = process_dict.get('executable')
|
||||
except (AttributeError, TypeError):
|
||||
return None
|
||||
if isinstance(executable, string_types):
|
||||
executable = executable.lower().rsplit('.', 1)[0]
|
||||
return executable
|
||||
|
||||
|
||||
def get_shell(pid=None, max_depth=6):
|
||||
"""Get the shell that the supplied pid or os.getpid() is running in.
|
||||
"""
|
||||
if not pid:
|
||||
pid = os.getpid()
|
||||
processes = get_all_processes()
|
||||
|
||||
def check_parent(pid, lvl=0):
|
||||
ppid = processes[pid].get('parent_pid')
|
||||
shell_name = _get_executable(processes.get(ppid))
|
||||
if shell_name in SHELL_NAMES:
|
||||
return (shell_name, processes[ppid]['executable'])
|
||||
if lvl >= max_depth:
|
||||
return None
|
||||
return check_parent(ppid, lvl=lvl + 1)
|
||||
|
||||
shell_name = _get_executable(processes.get(pid))
|
||||
if shell_name in SHELL_NAMES:
|
||||
return (shell_name, processes[pid]['executable'])
|
||||
try:
|
||||
return check_parent(pid)
|
||||
except KeyError:
|
||||
return None
|
||||
Vendored
+56
@@ -0,0 +1,56 @@
|
||||
import collections
|
||||
import os
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from ._consts import SHELL_NAMES
|
||||
|
||||
|
||||
Process = collections.namedtuple('Process', 'args pid ppid')
|
||||
|
||||
|
||||
def _get_process_mapping():
|
||||
"""Try to look up the process tree via the output of `ps`.
|
||||
"""
|
||||
output = subprocess.check_output([
|
||||
'ps', '-ww', '-o', 'pid=', '-o', 'ppid=', '-o', 'args=',
|
||||
])
|
||||
if not isinstance(output, str):
|
||||
output = output.decode(sys.stdout.encoding)
|
||||
processes = {}
|
||||
for line in output.split('\n'):
|
||||
try:
|
||||
pid, ppid, args = line.strip().split(maxsplit=2)
|
||||
except ValueError:
|
||||
continue
|
||||
processes[pid] = Process(
|
||||
args=tuple(shlex.split(args)), pid=pid, ppid=ppid,
|
||||
)
|
||||
return processes
|
||||
|
||||
|
||||
def get_shell(pid=None, max_depth=6):
|
||||
"""Get the shell that the supplied pid or os.getpid() is running in.
|
||||
"""
|
||||
pid = str(pid or os.getpid())
|
||||
mapping = _get_process_mapping()
|
||||
login_shell = os.environ.get('SHELL', '')
|
||||
for _ in range(max_depth):
|
||||
try:
|
||||
proc = mapping[pid]
|
||||
except KeyError:
|
||||
break
|
||||
name = os.path.basename(proc.args[0]).lower()
|
||||
if name in SHELL_NAMES:
|
||||
return (name, proc.args[0])
|
||||
elif proc.args[0].startswith('-'):
|
||||
# This is the login shell. Use the SHELL environ if possible
|
||||
# because it provides better information.
|
||||
if login_shell:
|
||||
name = login_shell.lower()
|
||||
else:
|
||||
name = proc.args[0][1:].lower()
|
||||
return (os.path.basename(name), name)
|
||||
pid = proc.ppid # Go up one level.
|
||||
return None
|
||||
Vendored
+1
@@ -32,6 +32,7 @@ requests==2.18.4
|
||||
certifi==2018.1.18
|
||||
requirements-parser==0.2.0
|
||||
requirementslib
|
||||
shellingham
|
||||
six==1.10.0
|
||||
semver==2.7.8
|
||||
shutilwhich==1.1.0
|
||||
|
||||
Reference in New Issue
Block a user