remove tests from psutil

This commit is contained in:
Nate Prewitt
2017-04-05 20:48:50 -06:00
parent 570d151727
commit 225ba9f0c7
13 changed files with 0 additions and 8334 deletions
-19
View File
@@ -1,19 +0,0 @@
Instructions for running tests
==============================
- The recommended way to run tests (also on Windows) is to cd into psutil root
directory and run ``make test``.
- Depending on the Python version, dependencies for running tests include
``ipaddress``, ``mock`` and ``unittest2`` modules.
On Windows also ``pywin32`` and ``wmi`` modules are recommended
(although optional).
Run ``make setup-dev-env`` to install all deps (also on Windows).
- To run tests on all supported Python versions install tox
(``pip install tox``) then run ``tox`` from psutil root directory.
- Every time a commit is pushed tests are automatically run on Travis
(Linux, OSX) and appveyor (Windows):
- https://travis-ci.org/giampaolo/psutil/
- https://ci.appveyor.com/project/giampaolo/psutil
-736
View File
@@ -1,736 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Test utilities.
"""
from __future__ import print_function
import atexit
import contextlib
import errno
import functools
import ipaddress # python >= 3.3 / requires "pip install ipaddress"
import os
import re
import shutil
import socket
import stat
import subprocess
import sys
import tempfile
import textwrap
import threading
import time
import warnings
from socket import AF_INET
from socket import SOCK_DGRAM
from socket import SOCK_STREAM
try:
from unittest import mock # py3
except ImportError:
import mock # NOQA - requires "pip install mock"
import psutil
from psutil import LINUX
from psutil import POSIX
from psutil import WINDOWS
from psutil._compat import PY3
from psutil._compat import unicode
from psutil._compat import which
if sys.version_info < (2, 7):
import unittest2 as unittest # requires "pip install unittest2"
else:
import unittest
if sys.version_info >= (3, 4):
import enum
else:
enum = None
if PY3:
import importlib
# python <=3.3
if not hasattr(importlib, 'reload'):
import imp as importlib
else:
import imp as importlib
__all__ = [
# constants
'APPVEYOR', 'DEVNULL', 'GLOBAL_TIMEOUT', 'MEMORY_TOLERANCE', 'NO_RETRIES',
'PYPY', 'PYTHON', 'RLIMIT_SUPPORT', 'ROOT_DIR', 'SCRIPTS_DIR',
'TESTFILE_PREFIX', 'TESTFN', 'TESTFN_UNICODE', 'TOX', 'TRAVIS',
'VALID_PROC_STATUSES', 'VERBOSITY',
# classes
'ThreadTask'
# test utils
'check_connection_ntuple', 'check_net_address', 'unittest', 'cleanup',
'skip_on_access_denied', 'skip_on_not_implemented', 'retry_before_failing',
'run_test_module_by_name',
# fs utils
'chdir', 'safe_rmpath', 'create_exe',
# subprocesses
'pyrun', 'reap_children', 'get_test_subprocess',
# os
'get_winver', 'get_kernel_version',
# sync primitives
'call_until', 'wait_for_pid', 'wait_for_file',
# others
'warn', 'decode_path', 'encode_path',
]
# ===================================================================
# --- constants
# ===================================================================
# conf for retry_before_failing() decorator
NO_RETRIES = 10
# bytes tolerance for OS memory related tests
MEMORY_TOLERANCE = 500 * 1024 # 500KB
# the timeout used in functions which have to wait
GLOBAL_TIMEOUT = 3
AF_INET6 = getattr(socket, "AF_INET6")
AF_UNIX = getattr(socket, "AF_UNIX", None)
PYTHON = os.path.realpath(sys.executable)
DEVNULL = open(os.devnull, 'r+')
TESTFILE_PREFIX = '$testfn'
TESTFN = os.path.join(os.path.realpath(os.getcwd()), TESTFILE_PREFIX)
_TESTFN = TESTFN + '-internal'
TESTFN_UNICODE = TESTFN + "-ƒőő"
if not PY3:
try:
TESTFN_UNICODE = unicode(TESTFN, sys.getfilesystemencoding())
except UnicodeDecodeError:
TESTFN_UNICODE = TESTFN + "-???"
TOX = os.getenv('TOX') or '' in ('1', 'true')
PYPY = '__pypy__' in sys.builtin_module_names
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
'..', '..'))
SCRIPTS_DIR = os.path.join(ROOT_DIR, 'scripts')
WIN_VISTA = (6, 0, 0) if WINDOWS else None
VALID_PROC_STATUSES = [getattr(psutil, x) for x in dir(psutil)
if x.startswith('STATUS_')]
# whether we're running this test suite on Travis (https://travis-ci.org/)
TRAVIS = bool(os.environ.get('TRAVIS'))
# whether we're running this test suite on Appveyor for Windows
# (http://www.appveyor.com/)
APPVEYOR = bool(os.environ.get('APPVEYOR'))
if TRAVIS or APPVEYOR:
GLOBAL_TIMEOUT = GLOBAL_TIMEOUT * 4
VERBOSITY = 1 if os.getenv('SILENT') or TOX else 2
# assertRaisesRegexp renamed to assertRaisesRegex in 3.3; add support
# for the new name
if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
unittest.TestCase.assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
# ===================================================================
# --- classes
# ===================================================================
class ThreadTask(threading.Thread):
"""A thread object used for running process thread tests."""
def __init__(self):
threading.Thread.__init__(self)
self._running = False
self._interval = None
self._flag = threading.Event()
def __repr__(self):
name = self.__class__.__name__
return '<%s running=%s at %#x>' % (name, self._running, id(self))
def start(self, interval=0.001):
"""Start thread and keep it running until an explicit
stop() request. Polls for shutdown every 'timeout' seconds.
"""
if self._running:
raise ValueError("already started")
self._interval = interval
threading.Thread.start(self)
self._flag.wait()
def run(self):
self._running = True
self._flag.set()
while self._running:
time.sleep(self._interval)
def stop(self):
"""Stop thread execution and and waits until it is stopped."""
if not self._running:
raise ValueError("already stopped")
self._running = False
self.join()
# ===================================================================
# --- subprocesses
# ===================================================================
_subprocesses_started = set()
def get_test_subprocess(cmd=None, **kwds):
"""Return a subprocess.Popen object to use in tests.
By default stdout and stderr are redirected to /dev/null and the
python interpreter is used as test process.
It also attemps to make sure the process is in a reasonably
initialized state.
"""
kwds.setdefault("stdin", DEVNULL)
kwds.setdefault("stdout", DEVNULL)
if cmd is None:
safe_rmpath(_TESTFN)
pyline = "from time import sleep;"
pyline += "open(r'%s', 'w').close();" % _TESTFN
pyline += "sleep(60)"
cmd = [PYTHON, "-c", pyline]
sproc = subprocess.Popen(cmd, **kwds)
wait_for_file(_TESTFN, delete_file=True, empty=True)
else:
sproc = subprocess.Popen(cmd, **kwds)
wait_for_pid(sproc.pid)
_subprocesses_started.add(sproc)
return sproc
_testfiles = []
def pyrun(src):
"""Run python 'src' code in a separate interpreter.
Return interpreter subprocess.
"""
if PY3:
src = bytes(src, 'ascii')
with tempfile.NamedTemporaryFile(
prefix=TESTFILE_PREFIX, delete=False) as f:
_testfiles.append(f.name)
f.write(src)
f.flush()
subp = get_test_subprocess([PYTHON, f.name], stdout=None,
stderr=None)
wait_for_pid(subp.pid)
return subp
def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
"""run cmd in a subprocess and return its output.
raises RuntimeError on error.
"""
p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr)
stdout, stderr = p.communicate()
if p.returncode != 0:
raise RuntimeError(stderr)
if stderr:
if PY3:
stderr = str(stderr, sys.stderr.encoding or
sys.getfilesystemencoding())
warn(stderr)
if PY3:
stdout = str(stdout, sys.stdout.encoding or
sys.getfilesystemencoding())
return stdout.strip()
def reap_children(recursive=False):
"""Terminate and wait() any subprocess started by this test suite
and ensure that no zombies stick around to hog resources and
create problems when looking for refleaks.
If resursive is True it also tries to terminate and wait()
all grandchildren started by this process.
"""
# Get the children here, before terminating the children sub
# processes as we don't want to lose the intermediate reference
# in case of grandchildren.
if recursive:
children = psutil.Process().children(recursive=True)
else:
children = []
# Terminate subprocess.Popen instances "cleanly" by closing their
# fds and wiat()ing for them in order to avoid zombies.
subprocs = _subprocesses_started.copy()
_subprocesses_started.clear()
for subp in subprocs:
try:
subp.terminate()
except OSError as err:
if err.errno != errno.ESRCH:
raise
if subp.stdout:
subp.stdout.close()
if subp.stderr:
subp.stderr.close()
try:
# Flushing a BufferedWriter may raise an error.
if subp.stdin:
subp.stdin.close()
finally:
# Wait for the process to terminate, to avoid zombies.
try:
subp.wait()
except OSError as err:
if err.errno != errno.ECHILD:
raise
# Terminates grandchildren.
if children:
for p in children:
try:
p.terminate()
except psutil.NoSuchProcess:
pass
gone, alive = psutil.wait_procs(children, timeout=GLOBAL_TIMEOUT)
for p in alive:
warn("couldn't terminate process %r; attempting kill()" % p)
try:
p.kill()
except psutil.NoSuchProcess:
pass
_, alive = psutil.wait_procs(alive, timeout=GLOBAL_TIMEOUT)
if alive:
for p in alive:
warn("process %r survived kill()" % p)
# ===================================================================
# --- OS
# ===================================================================
if not POSIX:
def get_kernel_version():
return ()
else:
def get_kernel_version():
"""Return a tuple such as (2, 6, 36)."""
s = ""
uname = os.uname()[2]
for c in uname:
if c.isdigit() or c == '.':
s += c
else:
break
if not s:
raise ValueError("can't parse %r" % uname)
minor = 0
micro = 0
nums = s.split('.')
major = int(nums[0])
if len(nums) >= 2:
minor = int(nums[1])
if len(nums) >= 3:
micro = int(nums[2])
return (major, minor, micro)
if LINUX:
RLIMIT_SUPPORT = get_kernel_version() >= (2, 6, 36)
else:
RLIMIT_SUPPORT = False
if not WINDOWS:
def get_winver():
raise NotImplementedError("not a Windows OS")
else:
def get_winver():
wv = sys.getwindowsversion()
if hasattr(wv, 'service_pack_major'): # python >= 2.7
sp = wv.service_pack_major or 0
else:
r = re.search("\s\d$", wv[4])
if r:
sp = int(r.group(0))
else:
sp = 0
return (wv[0], wv[1], sp)
# ===================================================================
# --- sync primitives
# ===================================================================
class retry(object):
"""A retry decorator."""
def __init__(self,
exception=Exception,
timeout=None,
retries=None,
interval=0.001,
logfun=lambda s: print(s, file=sys.stderr),
):
if timeout and retries:
raise ValueError("timeout and retries args are mutually exclusive")
self.exception = exception
self.timeout = timeout
self.retries = retries
self.interval = interval
self.logfun = logfun
def __iter__(self):
if self.timeout:
stop_at = time.time() + self.timeout
while time.time() < stop_at:
yield
elif self.retries:
for _ in range(self.retries):
yield
else:
while True:
yield
def sleep(self):
if self.interval is not None:
time.sleep(self.interval)
def __call__(self, fun):
@functools.wraps(fun)
def wrapper(*args, **kwargs):
exc = None
for _ in self:
try:
return fun(*args, **kwargs)
except self.exception as _:
exc = _
if self.logfun is not None:
self.logfun(exc)
self.sleep()
else:
if PY3:
raise exc
else:
raise
# This way the user of the decorated function can change config
# parameters.
wrapper.decorator = self
return wrapper
@retry(exception=psutil.NoSuchProcess, logfun=None, timeout=GLOBAL_TIMEOUT,
interval=0.001)
def wait_for_pid(pid):
"""Wait for pid to show up in the process list then return.
Used in the test suite to give time the sub process to initialize.
"""
psutil.Process(pid)
if WINDOWS:
# give it some more time to allow better initialization
time.sleep(0.01)
@retry(exception=(EnvironmentError, AssertionError), logfun=None,
timeout=GLOBAL_TIMEOUT, interval=0.001)
def wait_for_file(fname, delete_file=True, empty=False):
"""Wait for a file to be written on disk with some content."""
with open(fname, "rb") as f:
data = f.read()
if not empty:
assert data
if delete_file:
os.remove(fname)
return data
@retry(exception=AssertionError, logfun=None, timeout=GLOBAL_TIMEOUT,
interval=0.001)
def call_until(fun, expr):
"""Keep calling function for timeout secs and exit if eval()
expression is True.
"""
ret = fun()
assert eval(expr)
return ret
# ===================================================================
# --- fs
# ===================================================================
def safe_rmpath(path):
"Convenience function for removing temporary test files or dirs"
try:
st = os.stat(path)
if stat.S_ISDIR(st.st_mode):
os.rmdir(path)
else:
os.remove(path)
except OSError as err:
if err.errno != errno.ENOENT:
raise
def safe_mkdir(dir):
"Convenience function for creating a directory"
try:
os.mkdir(dir)
except OSError as err:
if err.errno != errno.EEXIST:
raise
@contextlib.contextmanager
def chdir(dirname):
"Context manager which temporarily changes the current directory."
curdir = os.getcwd()
try:
os.chdir(dirname)
yield
finally:
os.chdir(curdir)
def create_exe(outpath, c_code=None):
"""Creates an executable file in the given location."""
assert not os.path.exists(outpath), outpath
if which("gcc"):
if c_code is None:
c_code = textwrap.dedent(
"""
#include <unistd.h>
int main() {
pause();
return 1;
}
""")
with tempfile.NamedTemporaryFile(
suffix='.c', delete=False, mode='wt') as f:
f.write(c_code)
try:
subprocess.check_call(["gcc", f.name, "-o", outpath])
finally:
safe_rmpath(f.name)
else:
# fallback - use python's executable
if c_code is not None:
raise ValueError(
"can't specify c_code arg as gcc is not installed")
shutil.copyfile(sys.executable, outpath)
if POSIX:
st = os.stat(outpath)
os.chmod(outpath, st.st_mode | stat.S_IEXEC)
# ===================================================================
# --- testing
# ===================================================================
class TestCase(unittest.TestCase):
def __str__(self):
return "%s.%s.%s" % (
self.__class__.__module__, self.__class__.__name__,
self._testMethodName)
# Hack that overrides default unittest.TestCase in order to print
# a full path representation of the single unit tests being run.
unittest.TestCase = TestCase
def retry_before_failing(retries=NO_RETRIES):
"""Decorator which runs a test function and retries N times before
actually failing.
"""
return retry(exception=AssertionError, timeout=None, retries=retries)
def run_test_module_by_name(name):
# testmodules = [os.path.splitext(x)[0] for x in os.listdir(HERE)
# if x.endswith('.py') and x.startswith('test_')]
name = os.path.splitext(os.path.basename(name))[0]
suite = unittest.TestSuite()
suite.addTest(unittest.defaultTestLoader.loadTestsFromName(name))
result = unittest.TextTestRunner(verbosity=VERBOSITY).run(suite)
success = result.wasSuccessful()
sys.exit(0 if success else 1)
def skip_on_access_denied(only_if=None):
"""Decorator to Ignore AccessDenied exceptions."""
def decorator(fun):
@functools.wraps(fun)
def wrapper(*args, **kwargs):
try:
return fun(*args, **kwargs)
except psutil.AccessDenied:
if only_if is not None:
if not only_if:
raise
msg = "%r was skipped because it raised AccessDenied" \
% fun.__name__
raise unittest.SkipTest(msg)
return wrapper
return decorator
def skip_on_not_implemented(only_if=None):
"""Decorator to Ignore NotImplementedError exceptions."""
def decorator(fun):
@functools.wraps(fun)
def wrapper(*args, **kwargs):
try:
return fun(*args, **kwargs)
except NotImplementedError:
if only_if is not None:
if not only_if:
raise
msg = "%r was skipped because it raised NotImplementedError" \
% fun.__name__
raise unittest.SkipTest(msg)
return wrapper
return decorator
def check_net_address(addr, family):
"""Check a net address validity. Supported families are IPv4,
IPv6 and MAC addresses.
"""
if enum and PY3:
assert isinstance(family, enum.IntEnum), family
if family == AF_INET:
octs = [int(x) for x in addr.split('.')]
assert len(octs) == 4, addr
for num in octs:
assert 0 <= num <= 255, addr
if not PY3:
addr = unicode(addr)
ipaddress.IPv4Address(addr)
elif family == AF_INET6:
assert isinstance(addr, str), addr
if not PY3:
addr = unicode(addr)
ipaddress.IPv6Address(addr)
elif family == psutil.AF_LINK:
assert re.match('([a-fA-F0-9]{2}[:|\-]?){6}', addr) is not None, addr
else:
raise ValueError("unknown family %r", family)
def check_connection_ntuple(conn):
"""Check validity of a connection namedtuple."""
valid_conn_states = [getattr(psutil, x) for x in dir(psutil) if
x.startswith('CONN_')]
assert conn[0] == conn.fd
assert conn[1] == conn.family
assert conn[2] == conn.type
assert conn[3] == conn.laddr
assert conn[4] == conn.raddr
assert conn[5] == conn.status
assert conn.type in (SOCK_STREAM, SOCK_DGRAM), repr(conn.type)
assert conn.family in (AF_INET, AF_INET6, AF_UNIX), repr(conn.family)
assert conn.status in valid_conn_states, conn.status
# check IP address and port sanity
for addr in (conn.laddr, conn.raddr):
if not addr:
continue
if conn.family in (AF_INET, AF_INET6):
assert isinstance(addr, tuple), addr
ip, port = addr
assert isinstance(port, int), port
assert 0 <= port <= 65535, port
check_net_address(ip, conn.family)
elif conn.family == AF_UNIX:
assert isinstance(addr, (str, None)), addr
else:
raise ValueError("unknown family %r", conn.family)
if conn.family in (AF_INET, AF_INET6):
# actually try to bind the local socket; ignore IPv6
# sockets as their address might be represented as
# an IPv4-mapped-address (e.g. "::127.0.0.1")
# and that's rejected by bind()
if conn.family == AF_INET:
s = socket.socket(conn.family, conn.type)
with contextlib.closing(s):
try:
s.bind((conn.laddr[0], 0))
except socket.error as err:
if err.errno != errno.EADDRNOTAVAIL:
raise
elif conn.family == AF_UNIX:
assert not conn.raddr, repr(conn.raddr)
assert conn.status == psutil.CONN_NONE, conn.status
if getattr(conn, 'fd', -1) != -1:
assert conn.fd > 0, conn
if hasattr(socket, 'fromfd') and not WINDOWS:
try:
dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
except (socket.error, OSError) as err:
if err.args[0] != errno.EBADF:
raise
else:
with contextlib.closing(dupsock):
assert dupsock.family == conn.family
assert dupsock.type == conn.type
def cleanup():
for name in os.listdir('.'):
if name.startswith(TESTFILE_PREFIX):
try:
safe_rmpath(name)
except UnicodeEncodeError as exc:
warn(exc)
for path in _testfiles:
safe_rmpath(path)
atexit.register(cleanup)
atexit.register(lambda: DEVNULL.close())
# ===================================================================
# --- others
# ===================================================================
def warn(msg):
"""Raise a warning msg."""
warnings.warn(msg, UserWarning)
# In Python 3 paths are unicode objects by default. Surrogate escapes
# are used to handle non-character data.
def encode_path(path):
if PY3:
return path.encode(sys.getfilesystemencoding(),
errors="surrogateescape")
else:
return path
def decode_path(path):
if PY3:
return path.decode(sys.getfilesystemencoding(),
errors="surrogateescape")
else:
return path
-37
View File
@@ -1,37 +0,0 @@
#!/usr/bin/env python
# Copyright (C) 2007-2016 Giampaolo Rodola' <g.rodola@gmail.com>.
# Use of this source code is governed by MIT license that can be
# found in the LICENSE file.
"""Script for running all test files (except memory leaks tests)."""
import os
import sys
from psutil.tests import unittest
from psutil.tests import VERBOSITY
def get_suite():
HERE = os.path.abspath(os.path.dirname(__file__))
testmodules = [os.path.splitext(x)[0] for x in os.listdir(HERE)
if x.endswith('.py') and x.startswith('test_') and not
x.startswith('test_memory_leaks')]
suite = unittest.TestSuite()
for tm in testmodules:
# ...so that "make test" will print the full test paths
tm = "psutil.tests.%s" % tm
suite.addTest(unittest.defaultTestLoader.loadTestsFromName(tm))
return suite
def main():
# run tests
result = unittest.TextTestRunner(verbosity=VERBOSITY).run(get_suite())
success = result.wasSuccessful()
sys.exit(0 if success else 1)
if __name__ == '__main__':
main()
-482
View File
@@ -1,482 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# TODO: (FreeBSD) add test for comparing connections with 'sockstat' cmd.
"""Tests specific to all BSD platforms."""
import datetime
import os
import re
import subprocess
import sys
import time
import psutil
from psutil import BSD
from psutil import FREEBSD
from psutil import NETBSD
from psutil import OPENBSD
from psutil._compat import PY3
from psutil.tests import get_test_subprocess
from psutil.tests import MEMORY_TOLERANCE
from psutil.tests import reap_children
from psutil.tests import retry_before_failing
from psutil.tests import run_test_module_by_name
from psutil.tests import sh
from psutil.tests import unittest
from psutil.tests import which
if BSD:
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
if os.getuid() == 0: # muse requires root privileges
MUSE_AVAILABLE = which('muse')
else:
MUSE_AVAILABLE = False
else:
MUSE_AVAILABLE = False
def sysctl(cmdline):
"""Expects a sysctl command with an argument and parse the result
returning only the value of interest.
"""
result = sh("sysctl " + cmdline)
if FREEBSD:
result = result[result.find(": ") + 2:]
elif OPENBSD or NETBSD:
result = result[result.find("=") + 1:]
try:
return int(result)
except ValueError:
return result
def muse(field):
"""Thin wrapper around 'muse' cmdline utility."""
out = sh('muse')
for line in out.split('\n'):
if line.startswith(field):
break
else:
raise ValueError("line not found")
return int(line.split()[1])
# =====================================================================
# --- All BSD*
# =====================================================================
@unittest.skipUnless(BSD, "BSD only")
class BSDSpecificTestCase(unittest.TestCase):
"""Generic tests common to all BSD variants."""
@classmethod
def setUpClass(cls):
cls.pid = get_test_subprocess().pid
@classmethod
def tearDownClass(cls):
reap_children()
def test_process_create_time(self):
cmdline = "ps -o lstart -p %s" % self.pid
p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
output = p.communicate()[0]
if PY3:
output = str(output, sys.stdout.encoding)
start_ps = output.replace('STARTED', '').strip()
start_psutil = psutil.Process(self.pid).create_time()
start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
time.localtime(start_psutil))
self.assertEqual(start_ps, start_psutil)
def test_disks(self):
# test psutil.disk_usage() and psutil.disk_partitions()
# against "df -a"
def df(path):
out = sh('df -k "%s"' % path).strip()
lines = out.split('\n')
lines.pop(0)
line = lines.pop(0)
dev, total, used, free = line.split()[:4]
if dev == 'none':
dev = ''
total = int(total) * 1024
used = int(used) * 1024
free = int(free) * 1024
return dev, total, used, free
for part in psutil.disk_partitions(all=False):
usage = psutil.disk_usage(part.mountpoint)
dev, total, used, free = df(part.mountpoint)
self.assertEqual(part.device, dev)
self.assertEqual(usage.total, total)
# 10 MB tollerance
if abs(usage.free - free) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % (usage.free, free))
if abs(usage.used - used) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % (usage.used, used))
@unittest.skipIf(not which('sysctl'), "sysctl cmd not available")
def test_cpu_count_logical(self):
syst = sysctl("hw.ncpu")
self.assertEqual(psutil.cpu_count(logical=True), syst)
@unittest.skipIf(not which('sysctl'), "sysctl cmd not available")
def test_virtual_memory_total(self):
num = sysctl('hw.physmem')
self.assertEqual(num, psutil.virtual_memory().total)
def test_net_if_stats(self):
for name, stats in psutil.net_if_stats().items():
try:
out = sh("ifconfig %s" % name)
except RuntimeError:
pass
else:
self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
self.assertEqual(stats.mtu,
int(re.findall('mtu (\d+)', out)[0]))
# =====================================================================
# --- FreeBSD
# =====================================================================
@unittest.skipUnless(FREEBSD, "FREEBSD only")
class FreeBSDSpecificTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.pid = get_test_subprocess().pid
@classmethod
def tearDownClass(cls):
reap_children()
@retry_before_failing()
def test_proc_memory_maps(self):
out = sh('procstat -v %s' % self.pid)
maps = psutil.Process(self.pid).memory_maps(grouped=False)
lines = out.split('\n')[1:]
while lines:
line = lines.pop()
fields = line.split()
_, start, stop, perms, res = fields[:5]
map = maps.pop()
self.assertEqual("%s-%s" % (start, stop), map.addr)
self.assertEqual(int(res), map.rss)
if not map.path.startswith('['):
self.assertEqual(fields[10], map.path)
def test_proc_exe(self):
out = sh('procstat -b %s' % self.pid)
self.assertEqual(psutil.Process(self.pid).exe(),
out.split('\n')[1].split()[-1])
def test_proc_cmdline(self):
out = sh('procstat -c %s' % self.pid)
self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()),
' '.join(out.split('\n')[1].split()[2:]))
def test_proc_uids_gids(self):
out = sh('procstat -s %s' % self.pid)
euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8]
p = psutil.Process(self.pid)
uids = p.uids()
gids = p.gids()
self.assertEqual(uids.real, int(ruid))
self.assertEqual(uids.effective, int(euid))
self.assertEqual(uids.saved, int(suid))
self.assertEqual(gids.real, int(rgid))
self.assertEqual(gids.effective, int(egid))
self.assertEqual(gids.saved, int(sgid))
@retry_before_failing()
def test_proc_ctx_switches(self):
tested = []
out = sh('procstat -r %s' % self.pid)
p = psutil.Process(self.pid)
for line in out.split('\n'):
line = line.lower().strip()
if ' voluntary context' in line:
pstat_value = int(line.split()[-1])
psutil_value = p.num_ctx_switches().voluntary
self.assertEqual(pstat_value, psutil_value)
tested.append(None)
elif ' involuntary context' in line:
pstat_value = int(line.split()[-1])
psutil_value = p.num_ctx_switches().involuntary
self.assertEqual(pstat_value, psutil_value)
tested.append(None)
if len(tested) != 2:
raise RuntimeError("couldn't find lines match in procstat out")
@retry_before_failing()
def test_proc_cpu_times(self):
tested = []
out = sh('procstat -r %s' % self.pid)
p = psutil.Process(self.pid)
for line in out.split('\n'):
line = line.lower().strip()
if 'user time' in line:
pstat_value = float('0.' + line.split()[-1].split('.')[-1])
psutil_value = p.cpu_times().user
self.assertEqual(pstat_value, psutil_value)
tested.append(None)
elif 'system time' in line:
pstat_value = float('0.' + line.split()[-1].split('.')[-1])
psutil_value = p.cpu_times().system
self.assertEqual(pstat_value, psutil_value)
tested.append(None)
if len(tested) != 2:
raise RuntimeError("couldn't find lines match in procstat out")
# --- virtual_memory(); tests against sysctl
@retry_before_failing()
def test_vmem_active(self):
syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE
self.assertAlmostEqual(psutil.virtual_memory().active, syst,
delta=MEMORY_TOLERANCE)
@retry_before_failing()
def test_vmem_inactive(self):
syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE
self.assertAlmostEqual(psutil.virtual_memory().inactive, syst,
delta=MEMORY_TOLERANCE)
@retry_before_failing()
def test_vmem_wired(self):
syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE
self.assertAlmostEqual(psutil.virtual_memory().wired, syst,
delta=MEMORY_TOLERANCE)
@retry_before_failing()
def test_vmem_cached(self):
syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE
self.assertAlmostEqual(psutil.virtual_memory().cached, syst,
delta=MEMORY_TOLERANCE)
@retry_before_failing()
def test_vmem_free(self):
syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE
self.assertAlmostEqual(psutil.virtual_memory().free, syst,
delta=MEMORY_TOLERANCE)
@retry_before_failing()
def test_vmem_buffers(self):
syst = sysctl("vfs.bufspace")
self.assertAlmostEqual(psutil.virtual_memory().buffers, syst,
delta=MEMORY_TOLERANCE)
# --- virtual_memory(); tests against muse
@unittest.skipUnless(MUSE_AVAILABLE, "muse not installed")
def test_muse_vmem_total(self):
num = muse('Total')
self.assertEqual(psutil.virtual_memory().total, num)
@unittest.skipUnless(MUSE_AVAILABLE, "muse not installed")
@retry_before_failing()
def test_muse_vmem_active(self):
num = muse('Active')
self.assertAlmostEqual(psutil.virtual_memory().active, num,
delta=MEMORY_TOLERANCE)
@unittest.skipUnless(MUSE_AVAILABLE, "muse not installed")
@retry_before_failing()
def test_muse_vmem_inactive(self):
num = muse('Inactive')
self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
delta=MEMORY_TOLERANCE)
@unittest.skipUnless(MUSE_AVAILABLE, "muse not installed")
@retry_before_failing()
def test_muse_vmem_wired(self):
num = muse('Wired')
self.assertAlmostEqual(psutil.virtual_memory().wired, num,
delta=MEMORY_TOLERANCE)
@unittest.skipUnless(MUSE_AVAILABLE, "muse not installed")
@retry_before_failing()
def test_muse_vmem_cached(self):
num = muse('Cache')
self.assertAlmostEqual(psutil.virtual_memory().cached, num,
delta=MEMORY_TOLERANCE)
@unittest.skipUnless(MUSE_AVAILABLE, "muse not installed")
@retry_before_failing()
def test_muse_vmem_free(self):
num = muse('Free')
self.assertAlmostEqual(psutil.virtual_memory().free, num,
delta=MEMORY_TOLERANCE)
@unittest.skipUnless(MUSE_AVAILABLE, "muse not installed")
@retry_before_failing()
def test_muse_vmem_buffers(self):
num = muse('Buffer')
self.assertAlmostEqual(psutil.virtual_memory().buffers, num,
delta=MEMORY_TOLERANCE)
def test_cpu_stats_ctx_switches(self):
self.assertAlmostEqual(psutil.cpu_stats().ctx_switches,
sysctl('vm.stats.sys.v_swtch'), delta=1000)
def test_cpu_stats_interrupts(self):
self.assertAlmostEqual(psutil.cpu_stats().interrupts,
sysctl('vm.stats.sys.v_intr'), delta=1000)
def test_cpu_stats_soft_interrupts(self):
self.assertAlmostEqual(psutil.cpu_stats().soft_interrupts,
sysctl('vm.stats.sys.v_soft'), delta=1000)
def test_cpu_stats_syscalls(self):
self.assertAlmostEqual(psutil.cpu_stats().syscalls,
sysctl('vm.stats.sys.v_syscall'), delta=1000)
# def test_cpu_stats_traps(self):
# self.assertAlmostEqual(psutil.cpu_stats().traps,
# sysctl('vm.stats.sys.v_trap'), delta=1000)
# --- others
def test_boot_time(self):
s = sysctl('sysctl kern.boottime')
s = s[s.find(" sec = ") + 7:]
s = s[:s.find(',')]
btime = int(s)
self.assertEqual(btime, psutil.boot_time())
# --- sensors_battery
@unittest.skipUnless(
hasattr(psutil, "sensors_battery") and psutil.sensors_battery(),
"no battery")
def test_sensors_battery(self):
def secs2hours(secs):
m, s = divmod(secs, 60)
h, m = divmod(m, 60)
return "%d:%02d" % (h, m)
out = sh("acpiconf -i 0")
fields = dict([(x.split('\t')[0], x.split('\t')[-1])
for x in out.split("\n")])
metrics = psutil.sensors_battery()
percent = int(fields['Remaining capacity:'].replace('%', ''))
remaining_time = fields['Remaining time:']
self.assertEqual(metrics.percent, percent)
if remaining_time == 'unknown':
self.assertEqual(metrics.secsleft, psutil.POWER_TIME_UNLIMITED)
else:
self.assertEqual(secs2hours(metrics.secsleft), remaining_time)
def test_sensors_battery_against_sysctl(self):
self.assertEqual(psutil.sensors_battery().percent,
sysctl("hw.acpi.battery.life"))
self.assertEqual(psutil.sensors_battery().power_plugged,
sysctl("hw.acpi.acline") == 1)
secsleft = psutil.sensors_battery().secsleft
if secsleft < 0:
self.assertEqual(sysctl("hw.acpi.battery.time"), -1)
else:
self.assertEqual(secsleft, sysctl("hw.acpi.battery.time") * 60)
# =====================================================================
# --- OpenBSD
# =====================================================================
@unittest.skipUnless(OPENBSD, "OPENBSD only")
class OpenBSDSpecificTestCase(unittest.TestCase):
def test_boot_time(self):
s = sysctl('kern.boottime')
sys_bt = datetime.datetime.strptime(s, "%a %b %d %H:%M:%S %Y")
psutil_bt = datetime.datetime.fromtimestamp(psutil.boot_time())
self.assertEqual(sys_bt, psutil_bt)
# =====================================================================
# --- NetBSD
# =====================================================================
@unittest.skipUnless(NETBSD, "NETBSD only")
class NetBSDSpecificTestCase(unittest.TestCase):
def parse_meminfo(self, look_for):
with open('/proc/meminfo', 'rb') as f:
for line in f:
if line.startswith(look_for):
return int(line.split()[1]) * 1024
raise ValueError("can't find %s" % look_for)
def test_vmem_total(self):
self.assertEqual(
psutil.virtual_memory().total, self.parse_meminfo("MemTotal:"))
def test_vmem_free(self):
self.assertAlmostEqual(
psutil.virtual_memory().free, self.parse_meminfo("MemFree:"),
delta=MEMORY_TOLERANCE)
def test_vmem_buffers(self):
self.assertAlmostEqual(
psutil.virtual_memory().buffers, self.parse_meminfo("Buffers:"),
delta=MEMORY_TOLERANCE)
def test_vmem_shared(self):
self.assertAlmostEqual(
psutil.virtual_memory().shared, self.parse_meminfo("MemShared:"),
delta=MEMORY_TOLERANCE)
def test_swapmem_total(self):
self.assertAlmostEqual(
psutil.swap_memory().total, self.parse_meminfo("SwapTotal:"),
delta=MEMORY_TOLERANCE)
def test_swapmem_free(self):
self.assertAlmostEqual(
psutil.swap_memory().free, self.parse_meminfo("SwapFree:"),
delta=MEMORY_TOLERANCE)
def test_swapmem_used(self):
smem = psutil.swap_memory()
self.assertEqual(smem.used, smem.total - smem.free)
def test_cpu_stats_interrupts(self):
with open('/proc/stat', 'rb') as f:
for line in f:
if line.startswith(b'intr'):
interrupts = int(line.split()[1])
break
else:
raise ValueError("couldn't find line")
self.assertAlmostEqual(
psutil.cpu_stats().interrupts, interrupts, delta=1000)
def test_cpu_stats_ctx_switches(self):
with open('/proc/stat', 'rb') as f:
for line in f:
if line.startswith(b'ctxt'):
ctx_switches = int(line.split()[1])
break
else:
raise ValueError("couldn't find line")
self.assertAlmostEqual(
psutil.cpu_stats().ctx_switches, ctx_switches, delta=1000)
if __name__ == '__main__':
run_test_module_by_name(__file__)
File diff suppressed because it is too large Load Diff
-611
View File
@@ -1,611 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Tests for detecting function memory leaks (typically the ones
implemented in C). It does so by calling a function many times and
checking whether process memory usage keeps increasing between
calls or over time.
Note that this may produce false positives (especially on Windows
for some reason).
"""
import errno
import functools
import gc
import os
import socket
import threading
import time
import psutil
import psutil._common
from psutil import FREEBSD
from psutil import LINUX
from psutil import OPENBSD
from psutil import OSX
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
from psutil._common import supports_ipv6
from psutil._compat import xrange
from psutil.tests import get_test_subprocess
from psutil.tests import reap_children
from psutil.tests import RLIMIT_SUPPORT
from psutil.tests import run_test_module_by_name
from psutil.tests import safe_rmpath
from psutil.tests import TESTFN
from psutil.tests import TRAVIS
from psutil.tests import unittest
LOOPS = 1000
MEMORY_TOLERANCE = 4096
RETRY_FOR = 3
SKIP_PYTHON_IMPL = True if TRAVIS else False
cext = psutil._psplatform.cext
thisproc = psutil.Process()
# ===================================================================
# utils
# ===================================================================
def skip_if_linux():
return unittest.skipIf(LINUX and SKIP_PYTHON_IMPL,
"worthless on LINUX (pure python)")
def bytes2human(n):
"""
http://code.activestate.com/recipes/578019
>>> bytes2human(10000)
'9.8K'
>>> bytes2human(100001221)
'95.4M'
"""
symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
prefix = {}
for i, s in enumerate(symbols):
prefix[s] = 1 << (i + 1) * 10
for s in reversed(symbols):
if n >= prefix[s]:
value = float(n) / prefix[s]
return '%.2f%s' % (value, s)
return "%sB" % n
class TestMemLeak(unittest.TestCase):
"""Base framework class which calls a function many times and
produces a failure if process memory usage keeps increasing
between calls or over time.
"""
tolerance = MEMORY_TOLERANCE
loops = LOOPS
retry_for = RETRY_FOR
def setUp(self):
gc.collect()
def execute(self, fun, *args, **kwargs):
"""Test a callable."""
def call_many_times():
for x in xrange(loops):
self._call(fun, *args, **kwargs)
del x
gc.collect()
tolerance = kwargs.pop('tolerance_', None) or self.tolerance
loops = kwargs.pop('loops_', None) or self.loops
retry_for = kwargs.pop('retry_for_', None) or self.retry_for
self._call(fun, *args, **kwargs)
self.assertEqual(gc.garbage, [])
self.assertEqual(threading.active_count(), 1)
# Get 2 distinct memory samples, before and after having
# called fun repeadetly.
# step 1
call_many_times()
mem1 = self._get_mem()
# step 2
call_many_times()
mem2 = self._get_mem()
diff1 = mem2 - mem1
if diff1 > tolerance:
# This doesn't necessarily mean we have a leak yet.
# At this point we assume that after having called the
# function so many times the memory usage is stabilized
# and if there are no leaks it should not increase
# anymore.
# Let's keep calling fun for 3 more seconds and fail if
# we notice any difference.
ncalls = 0
stop_at = time.time() + retry_for
while time.time() <= stop_at:
self._call(fun, *args, **kwargs)
ncalls += 1
del stop_at
gc.collect()
mem3 = self._get_mem()
diff2 = mem3 - mem2
if mem3 > mem2:
# failure
self.fail("+%s after %s calls, +%s after another %s calls" % (
bytes2human(diff1),
loops,
bytes2human(diff2),
ncalls
))
def execute_w_exc(self, exc, fun, *args, **kwargs):
"""Convenience function which tests a callable raising
an exception.
"""
def call():
self.assertRaises(exc, fun, *args, **kwargs)
self.execute(call)
@staticmethod
def _get_mem():
# By using USS memory it seems it's less likely to bump
# into false positives.
if LINUX or WINDOWS or OSX:
return thisproc.memory_full_info().uss
else:
return thisproc.memory_info().rss
@staticmethod
def _call(fun, *args, **kwargs):
fun(*args, **kwargs)
# ===================================================================
# Process class
# ===================================================================
class TestProcessObjectLeaks(TestMemLeak):
"""Test leaks of Process class methods."""
proc = thisproc
def test_coverage(self):
skip = set((
"pid", "as_dict", "children", "cpu_affinity", "cpu_percent",
"ionice", "is_running", "kill", "memory_info_ex", "memory_percent",
"nice", "oneshot", "parent", "rlimit", "send_signal", "suspend",
"terminate", "wait"))
for name in dir(psutil.Process):
if name.startswith('_'):
continue
if name in skip:
continue
self.assertTrue(hasattr(self, "test_" + name), msg=name)
@skip_if_linux()
def test_name(self):
self.execute(self.proc.name)
@skip_if_linux()
def test_cmdline(self):
self.execute(self.proc.cmdline)
@skip_if_linux()
def test_exe(self):
self.execute(self.proc.exe)
@skip_if_linux()
def test_ppid(self):
self.execute(self.proc.ppid)
@unittest.skipUnless(POSIX, "POSIX only")
@skip_if_linux()
def test_uids(self):
self.execute(self.proc.uids)
@unittest.skipUnless(POSIX, "POSIX only")
@skip_if_linux()
def test_gids(self):
self.execute(self.proc.gids)
@skip_if_linux()
def test_status(self):
self.execute(self.proc.status)
def test_nice_get(self):
self.execute(self.proc.nice)
def test_nice_set(self):
niceness = thisproc.nice()
self.execute(self.proc.nice, niceness)
@unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
"platform not supported")
def test_ionice_get(self):
self.execute(self.proc.ionice)
@unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
"platform not supported")
def test_ionice_set(self):
if WINDOWS:
value = thisproc.ionice()
self.execute(self.proc.ionice, value)
else:
self.execute(self.proc.ionice, psutil.IOPRIO_CLASS_NONE)
fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0)
self.execute_w_exc(OSError, fun)
@unittest.skipIf(OSX or SUNOS, "platform not supported")
@skip_if_linux()
def test_io_counters(self):
self.execute(self.proc.io_counters)
@unittest.skipIf(POSIX, "worthless on POSIX")
def test_username(self):
self.execute(self.proc.username)
@skip_if_linux()
def test_create_time(self):
self.execute(self.proc.create_time)
@skip_if_linux()
def test_num_threads(self):
self.execute(self.proc.num_threads)
@unittest.skipUnless(WINDOWS, "WINDOWS only")
def test_num_handles(self):
self.execute(self.proc.num_handles)
@unittest.skipUnless(POSIX, "POSIX only")
@skip_if_linux()
def test_num_fds(self):
self.execute(self.proc.num_fds)
@skip_if_linux()
def test_num_ctx_switches(self):
self.execute(self.proc.num_ctx_switches)
@skip_if_linux()
def test_threads(self):
self.execute(self.proc.threads)
@skip_if_linux()
def test_cpu_times(self):
self.execute(self.proc.cpu_times)
@skip_if_linux()
@unittest.skipUnless(hasattr(psutil.Process, "cpu_num"),
"platform not supported")
def test_cpu_num(self):
self.execute(self.proc.cpu_num)
@skip_if_linux()
def test_memory_info(self):
self.execute(self.proc.memory_info)
# also available on Linux but it's pure python
@unittest.skipUnless(OSX or WINDOWS,
"platform not supported")
def test_memory_full_info(self):
self.execute(self.proc.memory_full_info)
@unittest.skipUnless(POSIX, "POSIX only")
@skip_if_linux()
def test_terminal(self):
self.execute(self.proc.terminal)
@unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
"worthless on POSIX (pure python)")
def test_resume(self):
self.execute(self.proc.resume)
@skip_if_linux()
def test_cwd(self):
self.execute(self.proc.cwd)
@unittest.skipUnless(WINDOWS or LINUX or FREEBSD,
"platform not supported")
def test_cpu_affinity_get(self):
self.execute(self.proc.cpu_affinity)
@unittest.skipUnless(WINDOWS or LINUX or FREEBSD,
"platform not supported")
def test_cpu_affinity_set(self):
affinity = thisproc.cpu_affinity()
self.execute(self.proc.cpu_affinity, affinity)
if not TRAVIS:
self.execute_w_exc(ValueError, self.proc.cpu_affinity, [-1])
@skip_if_linux()
def test_open_files(self):
safe_rmpath(TESTFN) # needed after UNIX socket test has run
with open(TESTFN, 'w'):
self.execute(self.proc.open_files)
# OSX implementation is unbelievably slow
@unittest.skipIf(OSX, "too slow on OSX")
@unittest.skipIf(OPENBSD, "platform not supported")
@skip_if_linux()
def test_memory_maps(self):
self.execute(self.proc.memory_maps)
@unittest.skipUnless(LINUX, "LINUX only")
@unittest.skipUnless(LINUX and RLIMIT_SUPPORT, "LINUX >= 2.6.36 only")
def test_rlimit_get(self):
self.execute(self.proc.rlimit, psutil.RLIMIT_NOFILE)
@unittest.skipUnless(LINUX, "LINUX only")
@unittest.skipUnless(LINUX and RLIMIT_SUPPORT, "LINUX >= 2.6.36 only")
def test_rlimit_set(self):
limit = thisproc.rlimit(psutil.RLIMIT_NOFILE)
self.execute(self.proc.rlimit, psutil.RLIMIT_NOFILE, limit)
self.execute_w_exc(OSError, self.proc.rlimit, -1)
@skip_if_linux()
# Windows implementation is based on a single system-wide
# function (tested later).
@unittest.skipIf(WINDOWS, "worthless on WINDOWS")
def test_connections(self):
def create_socket(family, type):
sock = socket.socket(family, type)
sock.bind(('', 0))
if type == socket.SOCK_STREAM:
sock.listen(1)
return sock
socks = []
socks.append(create_socket(socket.AF_INET, socket.SOCK_STREAM))
socks.append(create_socket(socket.AF_INET, socket.SOCK_DGRAM))
if supports_ipv6():
socks.append(create_socket(socket.AF_INET6, socket.SOCK_STREAM))
socks.append(create_socket(socket.AF_INET6, socket.SOCK_DGRAM))
if hasattr(socket, 'AF_UNIX'):
safe_rmpath(TESTFN)
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.bind(TESTFN)
s.listen(1)
socks.append(s)
kind = 'all'
# TODO: UNIX sockets are temporarily implemented by parsing
# 'pfiles' cmd output; we don't want that part of the code to
# be executed.
if SUNOS:
kind = 'inet'
try:
self.execute(self.proc.connections, kind)
finally:
for s in socks:
s.close()
@unittest.skipUnless(hasattr(psutil.Process, 'environ'),
"platform not supported")
def test_environ(self):
self.execute(self.proc.environ)
@unittest.skipUnless(WINDOWS, "WINDOWS only")
def test_proc_info(self):
self.execute(cext.proc_info, os.getpid())
class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
"""Repeat the tests above looking for leaks occurring when dealing
with terminated processes raising NoSuchProcess exception.
The C functions are still invoked but will follow different code
paths. We'll check those code paths.
"""
@classmethod
def setUpClass(cls):
super(TestTerminatedProcessLeaks, cls).setUpClass()
p = get_test_subprocess()
cls.proc = psutil.Process(p.pid)
cls.proc.kill()
cls.proc.wait()
@classmethod
def tearDownClass(cls):
super(TestTerminatedProcessLeaks, cls).tearDownClass()
reap_children()
def _call(self, fun, *args, **kwargs):
try:
fun(*args, **kwargs)
except psutil.NoSuchProcess:
pass
if WINDOWS:
def test_kill(self):
self.execute(self.proc.kill)
def test_terminate(self):
self.execute(self.proc.terminate)
def test_suspend(self):
self.execute(self.proc.suspend)
def test_resume(self):
self.execute(self.proc.resume)
def test_wait(self):
self.execute(self.proc.wait)
def test_proc_info(self):
# test dual implementation
def call():
try:
return cext.proc_info(self.proc.pid)
except OSError as err:
if err.errno != errno.ESRCH:
raise
self.execute(call)
# ===================================================================
# system APIs
# ===================================================================
class TestModuleFunctionsLeaks(TestMemLeak):
"""Test leaks of psutil module functions."""
def test_coverage(self):
skip = set((
"version_info", "__version__", "process_iter", "wait_procs",
"cpu_percent", "cpu_times_percent", "cpu_count"))
for name in psutil.__all__:
if not name.islower():
continue
if name in skip:
continue
self.assertTrue(hasattr(self, "test_" + name), msg=name)
# --- cpu
@skip_if_linux()
def test_cpu_count_logical(self):
self.execute(psutil.cpu_count, logical=True)
@skip_if_linux()
def test_cpu_count_physical(self):
self.execute(psutil.cpu_count, logical=False)
@skip_if_linux()
def test_cpu_times(self):
self.execute(psutil.cpu_times)
@skip_if_linux()
def test_per_cpu_times(self):
self.execute(psutil.cpu_times, percpu=True)
def test_cpu_stats(self):
self.execute(psutil.cpu_stats)
@skip_if_linux()
@unittest.skipUnless(hasattr(psutil, "cpu_freq"), "platform not supported")
def test_cpu_freq(self):
self.execute(psutil.cpu_freq)
# --- mem
def test_virtual_memory(self):
self.execute(psutil.virtual_memory)
# TODO: remove this skip when this gets fixed
@unittest.skipIf(SUNOS,
"worthless on SUNOS (uses a subprocess)")
def test_swap_memory(self):
self.execute(psutil.swap_memory)
@unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
"worthless on POSIX (pure python)")
def test_pid_exists(self):
self.execute(psutil.pid_exists, os.getpid())
# --- disk
@unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
"worthless on POSIX (pure python)")
def test_disk_usage(self):
self.execute(psutil.disk_usage, '.')
def test_disk_partitions(self):
self.execute(psutil.disk_partitions)
@unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
'/proc/diskstats not available on this Linux version')
@skip_if_linux()
def test_disk_io_counters(self):
self.execute(psutil.disk_io_counters)
# --- proc
@skip_if_linux()
def test_pids(self):
self.execute(psutil.pids)
# --- net
@skip_if_linux()
def test_net_io_counters(self):
self.execute(psutil.net_io_counters)
@unittest.skipIf(LINUX,
"worthless on Linux (pure python)")
@unittest.skipIf(OSX and os.getuid() != 0, "need root access")
def test_net_connections(self):
self.execute(psutil.net_connections)
def test_net_if_addrs(self):
# Note: verified that on Windows this was a false positive.
self.execute(psutil.net_if_addrs,
tolerance_=80 * 1024 if WINDOWS else None)
@unittest.skipIf(TRAVIS, "EPERM on travis")
def test_net_if_stats(self):
self.execute(psutil.net_if_stats)
# --- sensors
@unittest.skipUnless(hasattr(psutil, "sensors_battery"),
"platform not supported")
@skip_if_linux()
def test_sensors_battery(self):
self.execute(psutil.sensors_battery)
@skip_if_linux()
@unittest.skipUnless(hasattr(psutil, "sensors_temperatures"),
"platform not supported")
def test_sensors_temperatures(self):
self.execute(psutil.sensors_temperatures)
@unittest.skipUnless(hasattr(psutil, "sensors_fans"),
"platform not supported")
@skip_if_linux()
def test_sensors_fans(self):
self.execute(psutil.sensors_fans)
# --- others
@skip_if_linux()
def test_boot_time(self):
self.execute(psutil.boot_time)
# XXX - on Windows this produces a false positive
@unittest.skipIf(WINDOWS, "XXX produces a false positive on Windows")
def test_users(self):
self.execute(psutil.users)
if WINDOWS:
# --- win services
def test_win_service_iter(self):
self.execute(cext.winservice_enumerate)
def test_win_service_get_config(self):
name = next(psutil.win_service_iter()).name()
self.execute(cext.winservice_query_config, name)
def test_win_service_get_status(self):
name = next(psutil.win_service_iter()).name()
self.execute(cext.winservice_query_status, name)
def test_win_service_get_description(self):
name = next(psutil.win_service_iter()).name()
self.execute(cext.winservice_query_descr, name)
if __name__ == '__main__':
run_test_module_by_name(__file__)
-649
View File
@@ -1,649 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Miscellaneous tests.
"""
import ast
import errno
import imp
import json
import os
import pickle
import psutil
import socket
import stat
import sys
from psutil import LINUX
from psutil import NETBSD
from psutil import OPENBSD
from psutil import OSX
from psutil import POSIX
from psutil import WINDOWS
from psutil._common import memoize
from psutil._common import memoize_when_activated
from psutil._common import supports_ipv6
from psutil.tests import APPVEYOR
from psutil.tests import chdir
from psutil.tests import get_test_subprocess
from psutil.tests import importlib
from psutil.tests import mock
from psutil.tests import reap_children
from psutil.tests import retry
from psutil.tests import ROOT_DIR
from psutil.tests import run_test_module_by_name
from psutil.tests import safe_rmpath
from psutil.tests import SCRIPTS_DIR
from psutil.tests import sh
from psutil.tests import TESTFN
from psutil.tests import TOX
from psutil.tests import TRAVIS
from psutil.tests import unittest
from psutil.tests import wait_for_file
from psutil.tests import wait_for_pid
class TestMisc(unittest.TestCase):
"""Misc / generic tests."""
def test_process__repr__(self, func=repr):
p = psutil.Process()
r = func(p)
self.assertIn("psutil.Process", r)
self.assertIn("pid=%s" % p.pid, r)
self.assertIn("name=", r)
self.assertIn(p.name(), r)
with mock.patch.object(psutil.Process, "name",
side_effect=psutil.ZombieProcess(os.getpid())):
p = psutil.Process()
r = func(p)
self.assertIn("pid=%s" % p.pid, r)
self.assertIn("zombie", r)
self.assertNotIn("name=", r)
with mock.patch.object(psutil.Process, "name",
side_effect=psutil.NoSuchProcess(os.getpid())):
p = psutil.Process()
r = func(p)
self.assertIn("pid=%s" % p.pid, r)
self.assertIn("terminated", r)
self.assertNotIn("name=", r)
with mock.patch.object(psutil.Process, "name",
side_effect=psutil.AccessDenied(os.getpid())):
p = psutil.Process()
r = func(p)
self.assertIn("pid=%s" % p.pid, r)
self.assertNotIn("name=", r)
def test_process__str__(self):
self.test_process__repr__(func=str)
def test_no_such_process__repr__(self, func=repr):
self.assertEqual(
repr(psutil.NoSuchProcess(321)),
"psutil.NoSuchProcess process no longer exists (pid=321)")
self.assertEqual(
repr(psutil.NoSuchProcess(321, name='foo')),
"psutil.NoSuchProcess process no longer exists (pid=321, "
"name='foo')")
self.assertEqual(
repr(psutil.NoSuchProcess(321, msg='foo')),
"psutil.NoSuchProcess foo")
def test_zombie_process__repr__(self, func=repr):
self.assertEqual(
repr(psutil.ZombieProcess(321)),
"psutil.ZombieProcess process still exists but it's a zombie "
"(pid=321)")
self.assertEqual(
repr(psutil.ZombieProcess(321, name='foo')),
"psutil.ZombieProcess process still exists but it's a zombie "
"(pid=321, name='foo')")
self.assertEqual(
repr(psutil.ZombieProcess(321, name='foo', ppid=1)),
"psutil.ZombieProcess process still exists but it's a zombie "
"(pid=321, name='foo', ppid=1)")
self.assertEqual(
repr(psutil.ZombieProcess(321, msg='foo')),
"psutil.ZombieProcess foo")
def test_access_denied__repr__(self, func=repr):
self.assertEqual(
repr(psutil.AccessDenied(321)),
"psutil.AccessDenied (pid=321)")
self.assertEqual(
repr(psutil.AccessDenied(321, name='foo')),
"psutil.AccessDenied (pid=321, name='foo')")
self.assertEqual(
repr(psutil.AccessDenied(321, msg='foo')),
"psutil.AccessDenied foo")
def test_timeout_expired__repr__(self, func=repr):
self.assertEqual(
repr(psutil.TimeoutExpired(321)),
"psutil.TimeoutExpired timeout after 321 seconds")
self.assertEqual(
repr(psutil.TimeoutExpired(321, pid=111)),
"psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
self.assertEqual(
repr(psutil.TimeoutExpired(321, pid=111, name='foo')),
"psutil.TimeoutExpired timeout after 321 seconds "
"(pid=111, name='foo')")
def test_process__eq__(self):
p1 = psutil.Process()
p2 = psutil.Process()
self.assertEqual(p1, p2)
p2._ident = (0, 0)
self.assertNotEqual(p1, p2)
self.assertNotEqual(p1, 'foo')
def test_process__hash__(self):
s = set([psutil.Process(), psutil.Process()])
self.assertEqual(len(s), 1)
def test__all__(self):
dir_psutil = dir(psutil)
for name in dir_psutil:
if name in ('callable', 'error', 'namedtuple', 'tests',
'long', 'test', 'NUM_CPUS', 'BOOT_TIME',
'TOTAL_PHYMEM'):
continue
if not name.startswith('_'):
try:
__import__(name)
except ImportError:
if name not in psutil.__all__:
fun = getattr(psutil, name)
if fun is None:
continue
if (fun.__doc__ is not None and
'deprecated' not in fun.__doc__.lower()):
self.fail('%r not in psutil.__all__' % name)
# Import 'star' will break if __all__ is inconsistent, see:
# https://github.com/giampaolo/psutil/issues/656
# Can't do `from psutil import *` as it won't work on python 3
# so we simply iterate over __all__.
for name in psutil.__all__:
self.assertIn(name, dir_psutil)
def test_version(self):
self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
psutil.__version__)
def test_process_as_dict_no_new_names(self):
# See https://github.com/giampaolo/psutil/issues/813
p = psutil.Process()
p.foo = '1'
self.assertNotIn('foo', p.as_dict())
def test_memoize(self):
@memoize
def foo(*args, **kwargs):
"foo docstring"
calls.append(None)
return (args, kwargs)
calls = []
# no args
for x in range(2):
ret = foo()
expected = ((), {})
self.assertEqual(ret, expected)
self.assertEqual(len(calls), 1)
# with args
for x in range(2):
ret = foo(1)
expected = ((1, ), {})
self.assertEqual(ret, expected)
self.assertEqual(len(calls), 2)
# with args + kwargs
for x in range(2):
ret = foo(1, bar=2)
expected = ((1, ), {'bar': 2})
self.assertEqual(ret, expected)
self.assertEqual(len(calls), 3)
# clear cache
foo.cache_clear()
ret = foo()
expected = ((), {})
self.assertEqual(ret, expected)
self.assertEqual(len(calls), 4)
# docstring
self.assertEqual(foo.__doc__, "foo docstring")
def test_memoize_when_activated(self):
class Foo:
@memoize_when_activated
def foo(self):
calls.append(None)
f = Foo()
calls = []
f.foo()
f.foo()
self.assertEqual(len(calls), 2)
# activate
calls = []
f.foo.cache_activate()
f.foo()
f.foo()
self.assertEqual(len(calls), 1)
# deactivate
calls = []
f.foo.cache_deactivate()
f.foo()
f.foo()
self.assertEqual(len(calls), 2)
def test_parse_environ_block(self):
from psutil._common import parse_environ_block
def k(s):
return s.upper() if WINDOWS else s
self.assertEqual(parse_environ_block("a=1\0"),
{k("a"): "1"})
self.assertEqual(parse_environ_block("a=1\0b=2\0\0"),
{k("a"): "1", k("b"): "2"})
self.assertEqual(parse_environ_block("a=1\0b=\0\0"),
{k("a"): "1", k("b"): ""})
# ignore everything after \0\0
self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"),
{k("a"): "1", k("b"): "2"})
# ignore everything that is not an assignment
self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"})
self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"})
# do not fail if the block is incomplete
self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"})
def test_supports_ipv6(self):
if supports_ipv6():
with mock.patch('psutil._common.socket') as s:
s.has_ipv6 = False
assert not supports_ipv6()
with mock.patch('psutil._common.socket.socket',
side_effect=socket.error) as s:
assert not supports_ipv6()
assert s.called
with mock.patch('psutil._common.socket.socket',
side_effect=socket.gaierror) as s:
assert not supports_ipv6()
assert s.called
with mock.patch('psutil._common.socket.socket.bind',
side_effect=socket.gaierror) as s:
assert not supports_ipv6()
assert s.called
else:
if hasattr(socket, 'AF_INET6'):
with self.assertRaises(Exception):
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.bind(("::1", 0))
def test_isfile_strict(self):
from psutil._common import isfile_strict
this_file = os.path.abspath(__file__)
assert isfile_strict(this_file)
assert not isfile_strict(os.path.dirname(this_file))
with mock.patch('psutil._common.os.stat',
side_effect=OSError(errno.EPERM, "foo")):
self.assertRaises(OSError, isfile_strict, this_file)
with mock.patch('psutil._common.os.stat',
side_effect=OSError(errno.EACCES, "foo")):
self.assertRaises(OSError, isfile_strict, this_file)
with mock.patch('psutil._common.os.stat',
side_effect=OSError(errno.EINVAL, "foo")):
assert not isfile_strict(this_file)
with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
assert not isfile_strict(this_file)
def test_serialization(self):
def check(ret):
if json is not None:
json.loads(json.dumps(ret))
a = pickle.dumps(ret)
b = pickle.loads(a)
self.assertEqual(ret, b)
check(psutil.Process().as_dict())
check(psutil.virtual_memory())
check(psutil.swap_memory())
check(psutil.cpu_times())
check(psutil.cpu_times_percent(interval=0))
check(psutil.net_io_counters())
if LINUX and not os.path.exists('/proc/diskstats'):
pass
else:
if not APPVEYOR:
check(psutil.disk_io_counters())
check(psutil.disk_partitions())
check(psutil.disk_usage(os.getcwd()))
check(psutil.users())
def test_setup_script(self):
setup_py = os.path.join(ROOT_DIR, 'setup.py')
module = imp.load_source('setup', setup_py)
self.assertRaises(SystemExit, module.setup)
self.assertEqual(module.get_version(), psutil.__version__)
def test_ad_on_process_creation(self):
# We are supposed to be able to instantiate Process also in case
# of zombie processes or access denied.
with mock.patch.object(psutil.Process, 'create_time',
side_effect=psutil.AccessDenied) as meth:
psutil.Process()
assert meth.called
with mock.patch.object(psutil.Process, 'create_time',
side_effect=psutil.ZombieProcess(1)) as meth:
psutil.Process()
assert meth.called
with mock.patch.object(psutil.Process, 'create_time',
side_effect=ValueError) as meth:
with self.assertRaises(ValueError):
psutil.Process()
assert meth.called
def test_sanity_version_check(self):
# see: https://github.com/giampaolo/psutil/issues/564
with mock.patch(
"psutil._psplatform.cext.version", return_value="0.0.0"):
with self.assertRaises(ImportError) as cm:
importlib.reload(psutil)
self.assertIn("version conflict", str(cm.exception).lower())
# ===================================================================
# --- Example script tests
# ===================================================================
@unittest.skipIf(TOX, "can't test on TOX")
class TestScripts(unittest.TestCase):
"""Tests for scripts in the "scripts" directory."""
def assert_stdout(self, exe, args=None):
exe = '"%s"' % os.path.join(SCRIPTS_DIR, exe)
if args:
exe = exe + ' ' + args
try:
out = sh(sys.executable + ' ' + exe).strip()
except RuntimeError as err:
if 'AccessDenied' in str(err):
return str(err)
else:
raise
assert out, out
return out
def assert_syntax(self, exe, args=None):
exe = os.path.join(SCRIPTS_DIR, exe)
with open(exe, 'r') as f:
src = f.read()
ast.parse(src)
def test_coverage(self):
# make sure all example scripts have a test method defined
meths = dir(self)
for name in os.listdir(SCRIPTS_DIR):
if name.endswith('.py'):
if 'test_' + os.path.splitext(name)[0] not in meths:
# self.assert_stdout(name)
self.fail('no test defined for %r script'
% os.path.join(SCRIPTS_DIR, name))
@unittest.skipUnless(POSIX, "POSIX only")
def test_executable(self):
for name in os.listdir(SCRIPTS_DIR):
if name.endswith('.py'):
path = os.path.join(SCRIPTS_DIR, name)
if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]:
self.fail('%r is not executable' % path)
def test_disk_usage(self):
self.assert_stdout('disk_usage.py')
def test_free(self):
self.assert_stdout('free.py')
def test_meminfo(self):
self.assert_stdout('meminfo.py')
def test_procinfo(self):
self.assert_stdout('procinfo.py', args=str(os.getpid()))
# can't find users on APPVEYOR or TRAVIS
@unittest.skipIf(APPVEYOR or TRAVIS and not psutil.users(),
"unreliable on APPVEYOR or TRAVIS")
def test_who(self):
self.assert_stdout('who.py')
def test_ps(self):
self.assert_stdout('ps.py')
def test_pstree(self):
self.assert_stdout('pstree.py')
def test_netstat(self):
self.assert_stdout('netstat.py')
# permission denied on travis
@unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
def test_ifconfig(self):
self.assert_stdout('ifconfig.py')
@unittest.skipIf(OPENBSD or NETBSD, "platform not supported")
def test_pmap(self):
self.assert_stdout('pmap.py', args=str(os.getpid()))
@unittest.skipUnless(OSX or WINDOWS or LINUX, "platform not supported")
def test_procsmem(self):
self.assert_stdout('procsmem.py')
def test_killall(self):
self.assert_syntax('killall.py')
def test_nettop(self):
self.assert_syntax('nettop.py')
def test_top(self):
self.assert_syntax('top.py')
def test_iotop(self):
self.assert_syntax('iotop.py')
def test_pidof(self):
output = self.assert_stdout('pidof.py', args=psutil.Process().name())
self.assertIn(str(os.getpid()), output)
@unittest.skipUnless(WINDOWS, "WINDOWS only")
def test_winservices(self):
self.assert_stdout('winservices.py')
def test_cpu_distribution(self):
self.assert_syntax('cpu_distribution.py')
@unittest.skipIf(TRAVIS, "unreliable on travis")
def test_temperatures(self):
if hasattr(psutil, "sensors_temperatures") and \
psutil.sensors_temperatures():
self.assert_stdout('temperatures.py')
else:
self.assert_syntax('temperatures.py')
@unittest.skipIf(TRAVIS, "unreliable on travis")
def test_fans(self):
if hasattr(psutil, "sensors_fans") and psutil.sensors_fans():
self.assert_stdout('fans.py')
else:
self.assert_syntax('fans.py')
def test_battery(self):
if hasattr(psutil, "sensors_battery") and \
psutil.sensors_battery() is not None:
self.assert_stdout('battery.py')
else:
self.assert_syntax('battery.py')
@unittest.skipIf(APPVEYOR or TRAVIS, "unreliable on CI")
def test_sensors(self):
self.assert_stdout('sensors.py')
# ===================================================================
# --- Unit tests for test utilities.
# ===================================================================
class TestRetryDecorator(unittest.TestCase):
@mock.patch('time.sleep')
def test_retry_success(self, sleep):
# Fail 3 times out of 5; make sure the decorated fun returns.
@retry(retries=5, interval=1, logfun=None)
def foo():
while queue:
queue.pop()
1 / 0
return 1
queue = list(range(3))
self.assertEqual(foo(), 1)
self.assertEqual(sleep.call_count, 3)
@mock.patch('time.sleep')
def test_retry_failure(self, sleep):
# Fail 6 times out of 5; th function is supposed to raise exc.
@retry(retries=5, interval=1, logfun=None)
def foo():
while queue:
queue.pop()
1 / 0
return 1
queue = list(range(6))
self.assertRaises(ZeroDivisionError, foo)
self.assertEqual(sleep.call_count, 5)
@mock.patch('time.sleep')
def test_exception_arg(self, sleep):
@retry(exception=ValueError, interval=1)
def foo():
raise TypeError
self.assertRaises(TypeError, foo)
self.assertEqual(sleep.call_count, 0)
@mock.patch('time.sleep')
def test_no_interval_arg(self, sleep):
# if interval is not specified sleep is not supposed to be called
@retry(retries=5, interval=None, logfun=None)
def foo():
1 / 0
self.assertRaises(ZeroDivisionError, foo)
self.assertEqual(sleep.call_count, 0)
@mock.patch('time.sleep')
def test_retries_arg(self, sleep):
@retry(retries=5, interval=1, logfun=None)
def foo():
1 / 0
self.assertRaises(ZeroDivisionError, foo)
self.assertEqual(sleep.call_count, 5)
@mock.patch('time.sleep')
def test_retries_and_timeout_args(self, sleep):
self.assertRaises(ValueError, retry, retries=5, timeout=1)
class TestSyncTestUtils(unittest.TestCase):
def tearDown(self):
safe_rmpath(TESTFN)
def test_wait_for_pid(self):
wait_for_pid(os.getpid())
nopid = max(psutil.pids()) + 99999
with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid)
def test_wait_for_file(self):
with open(TESTFN, 'w') as f:
f.write('foo')
wait_for_file(TESTFN)
assert not os.path.exists(TESTFN)
def test_wait_for_file_empty(self):
with open(TESTFN, 'w'):
pass
wait_for_file(TESTFN, empty=True)
assert not os.path.exists(TESTFN)
def test_wait_for_file_no_file(self):
with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
self.assertRaises(IOError, wait_for_file, TESTFN)
def test_wait_for_file_no_delete(self):
with open(TESTFN, 'w') as f:
f.write('foo')
wait_for_file(TESTFN, delete_file=False)
assert os.path.exists(TESTFN)
class TestFSTestUtils(unittest.TestCase):
def setUp(self):
safe_rmpath(TESTFN)
tearDown = setUp
def test_safe_rmpath(self):
# test file is removed
open(TESTFN, 'w').close()
safe_rmpath(TESTFN)
assert not os.path.exists(TESTFN)
# test no exception if path does not exist
safe_rmpath(TESTFN)
# test dir is removed
os.mkdir(TESTFN)
safe_rmpath(TESTFN)
assert not os.path.exists(TESTFN)
# test other exceptions are raised
with mock.patch('psutil.tests.os.stat',
side_effect=OSError(errno.EINVAL, "")) as m:
with self.assertRaises(OSError):
safe_rmpath(TESTFN)
assert m.called
def test_chdir(self):
base = os.getcwd()
os.mkdir(TESTFN)
with chdir(TESTFN):
self.assertEqual(os.getcwd(), os.path.join(base, TESTFN))
self.assertEqual(os.getcwd(), base)
class TestTestUtils(unittest.TestCase):
def test_reap_children(self):
subp = get_test_subprocess()
p = psutil.Process(subp.pid)
assert p.is_running()
reap_children()
assert not p.is_running()
if __name__ == '__main__':
run_test_module_by_name(__file__)
-237
View File
@@ -1,237 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""OSX specific tests."""
import os
import re
import subprocess
import sys
import time
import psutil
from psutil import OSX
from psutil._compat import PY3
from psutil.tests import get_test_subprocess
from psutil.tests import MEMORY_TOLERANCE
from psutil.tests import reap_children
from psutil.tests import retry_before_failing
from psutil.tests import run_test_module_by_name
from psutil.tests import sh
from psutil.tests import unittest
PAGESIZE = os.sysconf("SC_PAGE_SIZE") if OSX else None
def sysctl(cmdline):
"""Expects a sysctl command with an argument and parse the result
returning only the value of interest.
"""
p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
result = p.communicate()[0].strip().split()[1]
if PY3:
result = str(result, sys.stdout.encoding)
try:
return int(result)
except ValueError:
return result
def vm_stat(field):
"""Wrapper around 'vm_stat' cmdline utility."""
out = sh('vm_stat')
for line in out.split('\n'):
if field in line:
break
else:
raise ValueError("line not found")
return int(re.search('\d+', line).group(0)) * PAGESIZE
# http://code.activestate.com/recipes/578019/
def human2bytes(s):
SYMBOLS = {
'customary': ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'),
}
init = s
num = ""
while s and s[0:1].isdigit() or s[0:1] == '.':
num += s[0]
s = s[1:]
num = float(num)
letter = s.strip()
for name, sset in SYMBOLS.items():
if letter in sset:
break
else:
if letter == 'k':
sset = SYMBOLS['customary']
letter = letter.upper()
else:
raise ValueError("can't interpret %r" % init)
prefix = {sset[0]: 1}
for i, s in enumerate(sset[1:]):
prefix[s] = 1 << (i + 1) * 10
return int(num * prefix[letter])
@unittest.skipUnless(OSX, "OSX only")
class TestProcess(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.pid = get_test_subprocess().pid
@classmethod
def tearDownClass(cls):
reap_children()
def test_process_create_time(self):
cmdline = "ps -o lstart -p %s" % self.pid
p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
output = p.communicate()[0]
if PY3:
output = str(output, sys.stdout.encoding)
start_ps = output.replace('STARTED', '').strip()
hhmmss = start_ps.split(' ')[-2]
year = start_ps.split(' ')[-1]
start_psutil = psutil.Process(self.pid).create_time()
self.assertEqual(
hhmmss,
time.strftime("%H:%M:%S", time.localtime(start_psutil)))
self.assertEqual(
year,
time.strftime("%Y", time.localtime(start_psutil)))
@unittest.skipUnless(OSX, "OSX only")
class TestSystemAPIs(unittest.TestCase):
# --- disk
def test_disks(self):
# test psutil.disk_usage() and psutil.disk_partitions()
# against "df -a"
def df(path):
out = sh('df -k "%s"' % path).strip()
lines = out.split('\n')
lines.pop(0)
line = lines.pop(0)
dev, total, used, free = line.split()[:4]
if dev == 'none':
dev = ''
total = int(total) * 1024
used = int(used) * 1024
free = int(free) * 1024
return dev, total, used, free
for part in psutil.disk_partitions(all=False):
usage = psutil.disk_usage(part.mountpoint)
dev, total, used, free = df(part.mountpoint)
self.assertEqual(part.device, dev)
self.assertEqual(usage.total, total)
# 10 MB tollerance
if abs(usage.free - free) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % usage.free, free)
if abs(usage.used - used) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % usage.used, used)
# --- cpu
def test_cpu_count_logical(self):
num = sysctl("sysctl hw.logicalcpu")
self.assertEqual(num, psutil.cpu_count(logical=True))
def test_cpu_count_physical(self):
num = sysctl("sysctl hw.physicalcpu")
self.assertEqual(num, psutil.cpu_count(logical=False))
def test_cpu_freq(self):
freq = psutil.cpu_freq()
self.assertEqual(
freq.current * 1000 * 1000, sysctl("sysctl hw.cpufrequency"))
self.assertEqual(
freq.min * 1000 * 1000, sysctl("sysctl hw.cpufrequency_min"))
self.assertEqual(
freq.max * 1000 * 1000, sysctl("sysctl hw.cpufrequency_max"))
# --- virtual mem
def test_vmem_total(self):
sysctl_hwphymem = sysctl('sysctl hw.memsize')
self.assertEqual(sysctl_hwphymem, psutil.virtual_memory().total)
@retry_before_failing()
def test_vmem_free(self):
vmstat_val = vm_stat("free")
psutil_val = psutil.virtual_memory().free
self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)
@retry_before_failing()
def test_vmem_available(self):
vmstat_val = vm_stat("inactive") + vm_stat("free")
psutil_val = psutil.virtual_memory().available
self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)
@retry_before_failing()
def test_vmem_active(self):
vmstat_val = vm_stat("active")
psutil_val = psutil.virtual_memory().active
self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)
@retry_before_failing()
def test_vmem_inactive(self):
vmstat_val = vm_stat("inactive")
psutil_val = psutil.virtual_memory().inactive
self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)
@retry_before_failing()
def test_vmem_wired(self):
vmstat_val = vm_stat("wired")
psutil_val = psutil.virtual_memory().wired
self.assertAlmostEqual(psutil_val, vmstat_val, delta=MEMORY_TOLERANCE)
# --- swap mem
@retry_before_failing()
def test_swapmem_sin(self):
vmstat_val = vm_stat("Pageins")
psutil_val = psutil.swap_memory().sin
self.assertEqual(psutil_val, vmstat_val)
@retry_before_failing()
def test_swapmem_sout(self):
vmstat_val = vm_stat("Pageout")
psutil_val = psutil.swap_memory().sout
self.assertEqual(psutil_val, vmstat_val)
# Not very reliable.
# def test_swapmem_total(self):
# out = sh('sysctl vm.swapusage')
# out = out.replace('vm.swapusage: ', '')
# total, used, free = re.findall('\d+.\d+\w', out)
# psutil_smem = psutil.swap_memory()
# self.assertEqual(psutil_smem.total, human2bytes(total))
# self.assertEqual(psutil_smem.used, human2bytes(used))
# self.assertEqual(psutil_smem.free, human2bytes(free))
# --- network
def test_net_if_stats(self):
for name, stats in psutil.net_if_stats().items():
try:
out = sh("ifconfig %s" % name)
except RuntimeError:
pass
else:
self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
self.assertEqual(stats.mtu,
int(re.findall('mtu (\d+)', out)[0]))
if __name__ == '__main__':
run_test_module_by_name(__file__)
-353
View File
@@ -1,353 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""POSIX specific tests."""
import datetime
import errno
import os
import subprocess
import sys
import time
import psutil
from psutil import BSD
from psutil import LINUX
from psutil import OSX
from psutil import POSIX
from psutil import SUNOS
from psutil._compat import callable
from psutil._compat import PY3
from psutil.tests import APPVEYOR
from psutil.tests import get_kernel_version
from psutil.tests import get_test_subprocess
from psutil.tests import mock
from psutil.tests import PYTHON
from psutil.tests import reap_children
from psutil.tests import retry_before_failing
from psutil.tests import run_test_module_by_name
from psutil.tests import sh
from psutil.tests import skip_on_access_denied
from psutil.tests import TRAVIS
from psutil.tests import unittest
from psutil.tests import wait_for_pid
def ps(cmd):
"""Expects a ps command with a -o argument and parse the result
returning only the value of interest.
"""
if not LINUX:
cmd = cmd.replace(" --no-headers ", " ")
if SUNOS:
cmd = cmd.replace("-o command", "-o comm")
cmd = cmd.replace("-o start", "-o stime")
p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
output = p.communicate()[0].strip()
if PY3:
output = str(output, sys.stdout.encoding)
if not LINUX:
output = output.split('\n')[1].strip()
try:
return int(output)
except ValueError:
return output
@unittest.skipUnless(POSIX, "POSIX only")
class TestProcess(unittest.TestCase):
"""Compare psutil results against 'ps' command line utility (mainly)."""
@classmethod
def setUpClass(cls):
cls.pid = get_test_subprocess([PYTHON, "-E", "-O"],
stdin=subprocess.PIPE).pid
wait_for_pid(cls.pid)
@classmethod
def tearDownClass(cls):
reap_children()
# for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
def test_ppid(self):
ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
ppid_psutil = psutil.Process(self.pid).ppid()
self.assertEqual(ppid_ps, ppid_psutil)
def test_uid(self):
uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
uid_psutil = psutil.Process(self.pid).uids().real
self.assertEqual(uid_ps, uid_psutil)
def test_gid(self):
gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
gid_psutil = psutil.Process(self.pid).gids().real
self.assertEqual(gid_ps, gid_psutil)
def test_username(self):
username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
username_psutil = psutil.Process(self.pid).username()
self.assertEqual(username_ps, username_psutil)
@skip_on_access_denied()
@retry_before_failing()
def test_rss_memory(self):
# give python interpreter some time to properly initialize
# so that the results are the same
time.sleep(0.1)
rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
self.assertEqual(rss_ps, rss_psutil)
@skip_on_access_denied()
@retry_before_failing()
def test_vsz_memory(self):
# give python interpreter some time to properly initialize
# so that the results are the same
time.sleep(0.1)
vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
self.assertEqual(vsz_ps, vsz_psutil)
def test_name(self):
# use command + arg since "comm" keyword not supported on all platforms
name_ps = ps("ps --no-headers -o command -p %s" % (
self.pid)).split(' ')[0]
# remove path if there is any, from the command
name_ps = os.path.basename(name_ps).lower()
name_psutil = psutil.Process(self.pid).name().lower()
self.assertEqual(name_ps, name_psutil)
@unittest.skipIf(OSX or BSD, 'ps -o start not available')
def test_create_time(self):
time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
time_psutil = psutil.Process(self.pid).create_time()
time_psutil_tstamp = datetime.datetime.fromtimestamp(
time_psutil).strftime("%H:%M:%S")
# sometimes ps shows the time rounded up instead of down, so we check
# for both possible values
round_time_psutil = round(time_psutil)
round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
round_time_psutil).strftime("%H:%M:%S")
self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])
def test_exe(self):
ps_pathname = ps("ps --no-headers -o command -p %s" %
self.pid).split(' ')[0]
psutil_pathname = psutil.Process(self.pid).exe()
try:
self.assertEqual(ps_pathname, psutil_pathname)
except AssertionError:
# certain platforms such as BSD are more accurate returning:
# "/usr/local/bin/python2.7"
# ...instead of:
# "/usr/local/bin/python"
# We do not want to consider this difference in accuracy
# an error.
adjusted_ps_pathname = ps_pathname[:len(ps_pathname)]
self.assertEqual(ps_pathname, adjusted_ps_pathname)
def test_cmdline(self):
ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
if SUNOS:
# ps on Solaris only shows the first part of the cmdline
psutil_cmdline = psutil_cmdline.split(" ")[0]
self.assertEqual(ps_cmdline, psutil_cmdline)
def test_nice(self):
ps_nice = ps("ps --no-headers -o nice -p %s" % self.pid)
psutil_nice = psutil.Process().nice()
self.assertEqual(ps_nice, psutil_nice)
def test_num_fds(self):
# Note: this fails from time to time; I'm keen on thinking
# it doesn't mean something is broken
def call(p, attr):
args = ()
attr = getattr(p, name, None)
if attr is not None and callable(attr):
if name == 'rlimit':
args = (psutil.RLIMIT_NOFILE,)
attr(*args)
else:
attr
p = psutil.Process(os.getpid())
failures = []
ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
'send_signal', 'wait', 'children', 'as_dict']
if LINUX and get_kernel_version() < (2, 6, 36):
ignored_names.append('rlimit')
if LINUX and get_kernel_version() < (2, 6, 23):
ignored_names.append('num_ctx_switches')
for name in dir(psutil.Process):
if (name.startswith('_') or name in ignored_names):
continue
else:
try:
num1 = p.num_fds()
for x in range(2):
call(p, name)
num2 = p.num_fds()
except psutil.AccessDenied:
pass
else:
if abs(num2 - num1) > 1:
fail = "failure while processing Process.%s method " \
"(before=%s, after=%s)" % (name, num1, num2)
failures.append(fail)
if failures:
self.fail('\n' + '\n'.join(failures))
@unittest.skipUnless(os.path.islink("/proc/%s/cwd" % os.getpid()),
"/proc fs not available")
def test_cwd(self):
self.assertEqual(os.readlink("/proc/%s/cwd" % os.getpid()),
psutil.Process().cwd())
@unittest.skipUnless(POSIX, "POSIX only")
class TestSystemAPIs(unittest.TestCase):
"""Test some system APIs."""
@retry_before_failing()
def test_pids(self):
# Note: this test might fail if the OS is starting/killing
# other processes in the meantime
if SUNOS:
cmd = ["ps", "-A", "-o", "pid"]
else:
cmd = ["ps", "ax", "-o", "pid"]
p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
output = p.communicate()[0].strip()
assert p.poll() == 0
if PY3:
output = str(output, sys.stdout.encoding)
pids_ps = []
for line in output.split('\n')[1:]:
if line:
pid = int(line.split()[0].strip())
pids_ps.append(pid)
# remove ps subprocess pid which is supposed to be dead in meantime
pids_ps.remove(p.pid)
pids_psutil = psutil.pids()
pids_ps.sort()
pids_psutil.sort()
# on OSX ps doesn't show pid 0
if OSX and 0 not in pids_ps:
pids_ps.insert(0, 0)
if pids_ps != pids_psutil:
difference = [x for x in pids_psutil if x not in pids_ps] + \
[x for x in pids_ps if x not in pids_psutil]
self.fail("difference: " + str(difference))
# for some reason ifconfig -a does not report all interfaces
# returned by psutil
@unittest.skipIf(SUNOS, "unreliable on SUNOS")
@unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
def test_nic_names(self):
p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE)
output = p.communicate()[0].strip()
if p.returncode != 0:
raise unittest.SkipTest('ifconfig returned no output')
if PY3:
output = str(output, sys.stdout.encoding)
for nic in psutil.net_io_counters(pernic=True).keys():
for line in output.split():
if line.startswith(nic):
break
else:
self.fail(
"couldn't find %s nic in 'ifconfig -a' output\n%s" % (
nic, output))
# can't find users on APPVEYOR or TRAVIS
@unittest.skipIf(APPVEYOR or TRAVIS and not psutil.users(),
"unreliable on APPVEYOR or TRAVIS")
@retry_before_failing()
def test_users(self):
out = sh("who")
lines = out.split('\n')
users = [x.split()[0] for x in lines]
self.assertEqual(len(users), len(psutil.users()))
terminals = [x.split()[1] for x in lines]
for u in psutil.users():
self.assertTrue(u.name in users, u.name)
self.assertTrue(u.terminal in terminals, u.terminal)
def test_pid_exists_let_raise(self):
# According to "man 2 kill" possible error values for kill
# are (EINVAL, EPERM, ESRCH). Test that any other errno
# results in an exception.
with mock.patch("psutil._psposix.os.kill",
side_effect=OSError(errno.EBADF, "")) as m:
self.assertRaises(OSError, psutil._psposix.pid_exists, os.getpid())
assert m.called
def test_os_waitpid_let_raise(self):
# os.waitpid() is supposed to catch EINTR and ECHILD only.
# Test that any other errno results in an exception.
with mock.patch("psutil._psposix.os.waitpid",
side_effect=OSError(errno.EBADF, "")) as m:
self.assertRaises(OSError, psutil._psposix.wait_pid, os.getpid())
assert m.called
def test_os_waitpid_eintr(self):
# os.waitpid() is supposed to "retry" on EINTR.
with mock.patch("psutil._psposix.os.waitpid",
side_effect=OSError(errno.EINTR, "")) as m:
self.assertRaises(
psutil._psposix.TimeoutExpired,
psutil._psposix.wait_pid, os.getpid(), timeout=0.01)
assert m.called
def test_os_waitpid_bad_ret_status(self):
# Simulate os.waitpid() returning a bad status.
with mock.patch("psutil._psposix.os.waitpid",
return_value=(1, -1)) as m:
self.assertRaises(ValueError,
psutil._psposix.wait_pid, os.getpid())
assert m.called
def test_disk_usage(self):
def df(device):
out = sh("df -k %s" % device).strip()
line = out.split('\n')[1]
fields = line.split()
total = int(fields[1]) * 1024
used = int(fields[2]) * 1024
free = int(fields[3]) * 1024
percent = float(fields[4].replace('%', ''))
return (total, used, free, percent)
tolerance = 4 * 1024 * 1024 # 4MB
for part in psutil.disk_partitions(all=False):
usage = psutil.disk_usage(part.mountpoint)
try:
total, used, free, percent = df(part.device)
except RuntimeError as err:
# see:
# https://travis-ci.org/giampaolo/psutil/jobs/138338464
# https://travis-ci.org/giampaolo/psutil/jobs/138343361
if "no such file or directory" in str(err).lower() or \
"raw devices not supported" in str(err).lower():
continue
else:
raise
else:
self.assertAlmostEqual(usage.total, total, delta=tolerance)
self.assertAlmostEqual(usage.used, used, delta=tolerance)
self.assertAlmostEqual(usage.free, free, delta=tolerance)
self.assertAlmostEqual(usage.percent, percent, delta=1)
if __name__ == '__main__':
run_test_module_by_name(__file__)
File diff suppressed because it is too large Load Diff
-45
View File
@@ -1,45 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Sun OS specific tests."""
import os
import psutil
from psutil import SUNOS
from psutil.tests import run_test_module_by_name
from psutil.tests import sh
from psutil.tests import unittest
@unittest.skipUnless(SUNOS, "SUNOS only")
class SunOSSpecificTestCase(unittest.TestCase):
def test_swap_memory(self):
out = sh('env PATH=/usr/sbin:/sbin:%s swap -l' % os.environ['PATH'])
lines = out.strip().split('\n')[1:]
if not lines:
raise ValueError('no swap device(s) configured')
total = free = 0
for line in lines:
line = line.split()
t, f = line[-2:]
total += int(int(t) * 512)
free += int(int(f) * 512)
used = total - free
psutil_swap = psutil.swap_memory()
self.assertEqual(psutil_swap.total, total)
self.assertEqual(psutil_swap.used, used)
self.assertEqual(psutil_swap.free, free)
def test_cpu_count(self):
out = sh("/usr/sbin/psrinfo")
self.assertEqual(psutil.cpu_count(), len(out.split('\n')))
if __name__ == '__main__':
run_test_module_by_name(__file__)
-813
View File
@@ -1,813 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests for system APIS."""
import contextlib
import datetime
import errno
import os
import pprint
import shutil
import signal
import socket
import sys
import tempfile
import time
import psutil
from psutil import BSD
from psutil import FREEBSD
from psutil import LINUX
from psutil import NETBSD
from psutil import OPENBSD
from psutil import OSX
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
from psutil._compat import long
from psutil._compat import unicode
from psutil.tests import AF_INET6
from psutil.tests import APPVEYOR
from psutil.tests import check_net_address
from psutil.tests import DEVNULL
from psutil.tests import enum
from psutil.tests import get_test_subprocess
from psutil.tests import mock
from psutil.tests import reap_children
from psutil.tests import retry_before_failing
from psutil.tests import run_test_module_by_name
from psutil.tests import safe_rmpath
from psutil.tests import skip_on_access_denied
from psutil.tests import TESTFN
from psutil.tests import TESTFN_UNICODE
from psutil.tests import TRAVIS
from psutil.tests import unittest
# ===================================================================
# --- System-related API tests
# ===================================================================
class TestSystemAPIs(unittest.TestCase):
"""Tests for system-related APIs."""
def setUp(self):
safe_rmpath(TESTFN)
def tearDown(self):
reap_children()
def test_process_iter(self):
self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
sproc = get_test_subprocess()
self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
p = psutil.Process(sproc.pid)
p.kill()
p.wait()
self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()])
with mock.patch('psutil.Process',
side_effect=psutil.NoSuchProcess(os.getpid())):
self.assertEqual(list(psutil.process_iter()), [])
with mock.patch('psutil.Process',
side_effect=psutil.AccessDenied(os.getpid())):
with self.assertRaises(psutil.AccessDenied):
list(psutil.process_iter())
def test_wait_procs(self):
def callback(p):
pids.append(p.pid)
pids = []
sproc1 = get_test_subprocess()
sproc2 = get_test_subprocess()
sproc3 = get_test_subprocess()
procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
t = time.time()
gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)
self.assertLess(time.time() - t, 0.5)
self.assertEqual(gone, [])
self.assertEqual(len(alive), 3)
self.assertEqual(pids, [])
for p in alive:
self.assertFalse(hasattr(p, 'returncode'))
@retry_before_failing(30)
def test(procs, callback):
gone, alive = psutil.wait_procs(procs, timeout=0.03,
callback=callback)
self.assertEqual(len(gone), 1)
self.assertEqual(len(alive), 2)
return gone, alive
sproc3.terminate()
gone, alive = test(procs, callback)
self.assertIn(sproc3.pid, [x.pid for x in gone])
if POSIX:
self.assertEqual(gone.pop().returncode, -signal.SIGTERM)
else:
self.assertEqual(gone.pop().returncode, 1)
self.assertEqual(pids, [sproc3.pid])
for p in alive:
self.assertFalse(hasattr(p, 'returncode'))
@retry_before_failing(30)
def test(procs, callback):
gone, alive = psutil.wait_procs(procs, timeout=0.03,
callback=callback)
self.assertEqual(len(gone), 3)
self.assertEqual(len(alive), 0)
return gone, alive
sproc1.terminate()
sproc2.terminate()
gone, alive = test(procs, callback)
self.assertEqual(set(pids), set([sproc1.pid, sproc2.pid, sproc3.pid]))
for p in gone:
self.assertTrue(hasattr(p, 'returncode'))
def test_wait_procs_no_timeout(self):
sproc1 = get_test_subprocess()
sproc2 = get_test_subprocess()
sproc3 = get_test_subprocess()
procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
for p in procs:
p.terminate()
gone, alive = psutil.wait_procs(procs)
def test_boot_time(self):
bt = psutil.boot_time()
self.assertIsInstance(bt, float)
self.assertGreater(bt, 0)
self.assertLess(bt, time.time())
@unittest.skipUnless(POSIX, 'POSIX only')
def test_PAGESIZE(self):
# pagesize is used internally to perform different calculations
# and it's determined by using SC_PAGE_SIZE; make sure
# getpagesize() returns the same value.
import resource
self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize())
def test_virtual_memory(self):
mem = psutil.virtual_memory()
assert mem.total > 0, mem
assert mem.available > 0, mem
assert 0 <= mem.percent <= 100, mem
assert mem.used > 0, mem
assert mem.free >= 0, mem
for name in mem._fields:
value = getattr(mem, name)
if name != 'percent':
self.assertIsInstance(value, (int, long))
if name != 'total':
if not value >= 0:
self.fail("%r < 0 (%s)" % (name, value))
if value > mem.total:
self.fail("%r > total (total=%s, %s=%s)"
% (name, mem.total, name, value))
def test_swap_memory(self):
mem = psutil.swap_memory()
assert mem.total >= 0, mem
assert mem.used >= 0, mem
if mem.total > 0:
# likely a system with no swap partition
assert mem.free > 0, mem
else:
assert mem.free == 0, mem
assert 0 <= mem.percent <= 100, mem
assert mem.sin >= 0, mem
assert mem.sout >= 0, mem
def test_pid_exists(self):
sproc = get_test_subprocess()
self.assertTrue(psutil.pid_exists(sproc.pid))
p = psutil.Process(sproc.pid)
p.kill()
p.wait()
self.assertFalse(psutil.pid_exists(sproc.pid))
self.assertFalse(psutil.pid_exists(-1))
self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
# pid 0
psutil.pid_exists(0) == 0 in psutil.pids()
def test_pid_exists_2(self):
reap_children()
pids = psutil.pids()
for pid in pids:
try:
assert psutil.pid_exists(pid)
except AssertionError:
# in case the process disappeared in meantime fail only
# if it is no longer in psutil.pids()
time.sleep(.1)
if pid in psutil.pids():
self.fail(pid)
pids = range(max(pids) + 5000, max(pids) + 6000)
for pid in pids:
self.assertFalse(psutil.pid_exists(pid), msg=pid)
def test_pids(self):
plist = [x.pid for x in psutil.process_iter()]
pidlist = psutil.pids()
self.assertEqual(plist.sort(), pidlist.sort())
# make sure every pid is unique
self.assertEqual(len(pidlist), len(set(pidlist)))
def test_test(self):
# test for psutil.test() function
stdout = sys.stdout
sys.stdout = DEVNULL
try:
psutil.test()
finally:
sys.stdout = stdout
def test_cpu_count(self):
logical = psutil.cpu_count()
self.assertEqual(logical, len(psutil.cpu_times(percpu=True)))
self.assertGreaterEqual(logical, 1)
#
if os.path.exists("/proc/cpuinfo"):
with open("/proc/cpuinfo") as fd:
cpuinfo_data = fd.read()
if "physical id" not in cpuinfo_data:
raise unittest.SkipTest("cpuinfo doesn't include physical id")
physical = psutil.cpu_count(logical=False)
self.assertGreaterEqual(physical, 1)
self.assertGreaterEqual(logical, physical)
def test_cpu_times(self):
# Check type, value >= 0, str().
total = 0
times = psutil.cpu_times()
sum(times)
for cp_time in times:
self.assertIsInstance(cp_time, float)
self.assertGreaterEqual(cp_time, 0.0)
total += cp_time
self.assertEqual(total, sum(times))
str(times)
# CPU times are always supposed to increase over time
# or at least remain the same and that's because time
# cannot go backwards.
# Surprisingly sometimes this might not be the case (at
# least on Windows and Linux), see:
# https://github.com/giampaolo/psutil/issues/392
# https://github.com/giampaolo/psutil/issues/645
# if not WINDOWS:
# last = psutil.cpu_times()
# for x in range(100):
# new = psutil.cpu_times()
# for field in new._fields:
# new_t = getattr(new, field)
# last_t = getattr(last, field)
# self.assertGreaterEqual(new_t, last_t,
# msg="%s %s" % (new_t, last_t))
# last = new
def test_cpu_times_time_increases(self):
# Make sure time increases between calls.
t1 = sum(psutil.cpu_times())
time.sleep(0.1)
t2 = sum(psutil.cpu_times())
difference = t2 - t1
if not difference >= 0.05:
self.fail("difference %s" % difference)
def test_per_cpu_times(self):
# Check type, value >= 0, str().
for times in psutil.cpu_times(percpu=True):
total = 0
sum(times)
for cp_time in times:
self.assertIsInstance(cp_time, float)
self.assertGreaterEqual(cp_time, 0.0)
total += cp_time
self.assertEqual(total, sum(times))
str(times)
self.assertEqual(len(psutil.cpu_times(percpu=True)[0]),
len(psutil.cpu_times(percpu=False)))
# Note: in theory CPU times are always supposed to increase over
# time or remain the same but never go backwards. In practice
# sometimes this is not the case.
# This issue seemd to be afflict Windows:
# https://github.com/giampaolo/psutil/issues/392
# ...but it turns out also Linux (rarely) behaves the same.
# last = psutil.cpu_times(percpu=True)
# for x in range(100):
# new = psutil.cpu_times(percpu=True)
# for index in range(len(new)):
# newcpu = new[index]
# lastcpu = last[index]
# for field in newcpu._fields:
# new_t = getattr(newcpu, field)
# last_t = getattr(lastcpu, field)
# self.assertGreaterEqual(
# new_t, last_t, msg="%s %s" % (lastcpu, newcpu))
# last = new
def test_per_cpu_times_2(self):
# Simulate some work load then make sure time have increased
# between calls.
tot1 = psutil.cpu_times(percpu=True)
stop_at = time.time() + 0.1
while True:
if time.time() >= stop_at:
break
tot2 = psutil.cpu_times(percpu=True)
for t1, t2 in zip(tot1, tot2):
t1, t2 = sum(t1), sum(t2)
difference = t2 - t1
if difference >= 0.05:
return
self.fail()
def test_cpu_times_comparison(self):
# Make sure the sum of all per cpu times is almost equal to
# base "one cpu" times.
base = psutil.cpu_times()
per_cpu = psutil.cpu_times(percpu=True)
summed_values = base._make([sum(num) for num in zip(*per_cpu)])
for field in base._fields:
self.assertAlmostEqual(
getattr(base, field), getattr(summed_values, field), delta=1)
def _test_cpu_percent(self, percent, last_ret, new_ret):
try:
self.assertIsInstance(percent, float)
self.assertGreaterEqual(percent, 0.0)
self.assertIsNot(percent, -0.0)
self.assertLessEqual(percent, 100.0 * psutil.cpu_count())
except AssertionError as err:
raise AssertionError("\n%s\nlast=%s\nnew=%s" % (
err, pprint.pformat(last_ret), pprint.pformat(new_ret)))
def test_cpu_percent(self):
last = psutil.cpu_percent(interval=0.001)
for x in range(100):
new = psutil.cpu_percent(interval=None)
self._test_cpu_percent(new, last, new)
last = new
with self.assertRaises(ValueError):
psutil.cpu_percent(interval=-1)
def test_per_cpu_percent(self):
last = psutil.cpu_percent(interval=0.001, percpu=True)
self.assertEqual(len(last), psutil.cpu_count())
for x in range(100):
new = psutil.cpu_percent(interval=None, percpu=True)
for percent in new:
self._test_cpu_percent(percent, last, new)
last = new
with self.assertRaises(ValueError):
psutil.cpu_percent(interval=-1, percpu=True)
def test_cpu_times_percent(self):
last = psutil.cpu_times_percent(interval=0.001)
for x in range(100):
new = psutil.cpu_times_percent(interval=None)
for percent in new:
self._test_cpu_percent(percent, last, new)
self._test_cpu_percent(sum(new), last, new)
last = new
def test_per_cpu_times_percent(self):
last = psutil.cpu_times_percent(interval=0.001, percpu=True)
self.assertEqual(len(last), psutil.cpu_count())
for x in range(100):
new = psutil.cpu_times_percent(interval=None, percpu=True)
for cpu in new:
for percent in cpu:
self._test_cpu_percent(percent, last, new)
self._test_cpu_percent(sum(cpu), last, new)
last = new
def test_per_cpu_times_percent_negative(self):
# see: https://github.com/giampaolo/psutil/issues/645
psutil.cpu_times_percent(percpu=True)
zero_times = [x._make([0 for x in range(len(x._fields))])
for x in psutil.cpu_times(percpu=True)]
with mock.patch('psutil.cpu_times', return_value=zero_times):
for cpu in psutil.cpu_times_percent(percpu=True):
for percent in cpu:
self._test_cpu_percent(percent, None, None)
@unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
"os.statvfs() not available")
def test_disk_usage(self):
usage = psutil.disk_usage(os.getcwd())
assert usage.total > 0, usage
assert usage.used > 0, usage
assert usage.free > 0, usage
assert usage.total > usage.used, usage
assert usage.total > usage.free, usage
assert 0 <= usage.percent <= 100, usage.percent
if hasattr(shutil, 'disk_usage'):
# py >= 3.3, see: http://bugs.python.org/issue12442
shutil_usage = shutil.disk_usage(os.getcwd())
tolerance = 5 * 1024 * 1024 # 5MB
self.assertEqual(usage.total, shutil_usage.total)
self.assertAlmostEqual(usage.free, shutil_usage.free,
delta=tolerance)
self.assertAlmostEqual(usage.used, shutil_usage.used,
delta=tolerance)
# if path does not exist OSError ENOENT is expected across
# all platforms
fname = tempfile.mktemp()
try:
psutil.disk_usage(fname)
except OSError as err:
if err.args[0] != errno.ENOENT:
raise
else:
self.fail("OSError not raised")
@unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
"os.statvfs() not available")
def test_disk_usage_unicode(self):
# see: https://github.com/giampaolo/psutil/issues/416
safe_rmpath(TESTFN_UNICODE)
self.addCleanup(safe_rmpath, TESTFN_UNICODE)
os.mkdir(TESTFN_UNICODE)
psutil.disk_usage(TESTFN_UNICODE)
@unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
"os.statvfs() not available")
@unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis")
def test_disk_partitions(self):
# all = False
ls = psutil.disk_partitions(all=False)
# on travis we get:
# self.assertEqual(p.cpu_affinity(), [n])
# AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
self.assertTrue(ls, msg=ls)
for disk in ls:
self.assertIsInstance(disk.device, (str, unicode))
self.assertIsInstance(disk.mountpoint, (str, unicode))
self.assertIsInstance(disk.fstype, (str, unicode))
self.assertIsInstance(disk.opts, (str, unicode))
if WINDOWS and 'cdrom' in disk.opts:
continue
if not POSIX:
assert os.path.exists(disk.device), disk
else:
# we cannot make any assumption about this, see:
# http://goo.gl/p9c43
disk.device
if SUNOS:
# on solaris apparently mount points can also be files
assert os.path.exists(disk.mountpoint), disk
else:
assert os.path.isdir(disk.mountpoint), disk
assert disk.fstype, disk
# all = True
ls = psutil.disk_partitions(all=True)
self.assertTrue(ls, msg=ls)
for disk in psutil.disk_partitions(all=True):
if not WINDOWS:
try:
os.stat(disk.mountpoint)
except OSError as err:
if TRAVIS and OSX and err.errno == errno.EIO:
continue
# http://mail.python.org/pipermail/python-dev/
# 2012-June/120787.html
if err.errno not in (errno.EPERM, errno.EACCES):
raise
else:
if SUNOS:
# on solaris apparently mount points can also be files
assert os.path.exists(disk.mountpoint), disk
else:
assert os.path.isdir(disk.mountpoint), disk
self.assertIsInstance(disk.fstype, str)
self.assertIsInstance(disk.opts, str)
def find_mount_point(path):
path = os.path.abspath(path)
while not os.path.ismount(path):
path = os.path.dirname(path)
return path.lower()
mount = find_mount_point(__file__)
mounts = [x.mountpoint.lower() for x in
psutil.disk_partitions(all=True)]
self.assertIn(mount, mounts)
psutil.disk_usage(mount)
@skip_on_access_denied()
def test_net_connections(self):
def check(cons, families, types_):
for conn in cons:
self.assertIn(conn.family, families, msg=conn)
if conn.family != getattr(socket, 'AF_UNIX', object()):
self.assertIn(conn.type, types_, msg=conn)
self.assertIsInstance(conn.status, (str, unicode))
from psutil._common import conn_tmap
for kind, groups in conn_tmap.items():
if SUNOS and kind == 'unix':
continue
families, types_ = groups
cons = psutil.net_connections(kind)
self.assertEqual(len(cons), len(set(cons)))
check(cons, families, types_)
def test_net_io_counters(self):
def check_ntuple(nt):
self.assertEqual(nt[0], nt.bytes_sent)
self.assertEqual(nt[1], nt.bytes_recv)
self.assertEqual(nt[2], nt.packets_sent)
self.assertEqual(nt[3], nt.packets_recv)
self.assertEqual(nt[4], nt.errin)
self.assertEqual(nt[5], nt.errout)
self.assertEqual(nt[6], nt.dropin)
self.assertEqual(nt[7], nt.dropout)
assert nt.bytes_sent >= 0, nt
assert nt.bytes_recv >= 0, nt
assert nt.packets_sent >= 0, nt
assert nt.packets_recv >= 0, nt
assert nt.errin >= 0, nt
assert nt.errout >= 0, nt
assert nt.dropin >= 0, nt
assert nt.dropout >= 0, nt
ret = psutil.net_io_counters(pernic=False)
check_ntuple(ret)
ret = psutil.net_io_counters(pernic=True)
self.assertNotEqual(ret, [])
for key in ret:
self.assertTrue(key)
self.assertIsInstance(key, (str, unicode))
check_ntuple(ret[key])
def test_net_if_addrs(self):
nics = psutil.net_if_addrs()
assert nics, nics
nic_stats = psutil.net_if_stats()
# Not reliable on all platforms (net_if_addrs() reports more
# interfaces).
# self.assertEqual(sorted(nics.keys()),
# sorted(psutil.net_io_counters(pernic=True).keys()))
families = set([socket.AF_INET, AF_INET6, psutil.AF_LINK])
for nic, addrs in nics.items():
self.assertIsInstance(nic, (str, unicode))
self.assertEqual(len(set(addrs)), len(addrs))
for addr in addrs:
self.assertIsInstance(addr.family, int)
self.assertIsInstance(addr.address, str)
self.assertIsInstance(addr.netmask, (str, type(None)))
self.assertIsInstance(addr.broadcast, (str, type(None)))
self.assertIn(addr.family, families)
if sys.version_info >= (3, 4):
self.assertIsInstance(addr.family, enum.IntEnum)
if nic_stats[nic].isup:
# Do not test binding to addresses of interfaces
# that are down
if addr.family == socket.AF_INET:
s = socket.socket(addr.family)
with contextlib.closing(s):
s.bind((addr.address, 0))
elif addr.family == socket.AF_INET6:
info = socket.getaddrinfo(
addr.address, 0, socket.AF_INET6,
socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
af, socktype, proto, canonname, sa = info
s = socket.socket(af, socktype, proto)
with contextlib.closing(s):
s.bind(sa)
for ip in (addr.address, addr.netmask, addr.broadcast,
addr.ptp):
if ip is not None:
# TODO: skip AF_INET6 for now because I get:
# AddressValueError: Only hex digits permitted in
# u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
if addr.family != AF_INET6:
check_net_address(ip, addr.family)
# broadcast and ptp addresses are mutually exclusive
if addr.broadcast:
self.assertIsNone(addr.ptp)
elif addr.ptp:
self.assertIsNone(addr.broadcast)
if BSD or OSX or SUNOS:
if hasattr(socket, "AF_LINK"):
self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
elif LINUX:
self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
elif WINDOWS:
self.assertEqual(psutil.AF_LINK, -1)
def test_net_if_addrs_mac_null_bytes(self):
# Simulate that the underlying C function returns an incomplete
# MAC address. psutil is supposed to fill it with null bytes.
# https://github.com/giampaolo/psutil/issues/786
if POSIX:
ret = [('em1', psutil.AF_LINK, '06:3d:29', None, None, None)]
else:
ret = [('em1', -1, '06-3d-29', None, None, None)]
with mock.patch('psutil._psplatform.net_if_addrs',
return_value=ret) as m:
addr = psutil.net_if_addrs()['em1'][0]
assert m.called
if POSIX:
self.assertEqual(addr.address, '06:3d:29:00:00:00')
else:
self.assertEqual(addr.address, '06-3d-29-00-00-00')
@unittest.skipIf(TRAVIS, "unreliable on TRAVIS") # raises EPERM
def test_net_if_stats(self):
nics = psutil.net_if_stats()
assert nics, nics
all_duplexes = (psutil.NIC_DUPLEX_FULL,
psutil.NIC_DUPLEX_HALF,
psutil.NIC_DUPLEX_UNKNOWN)
for nic, stats in nics.items():
isup, duplex, speed, mtu = stats
self.assertIsInstance(isup, bool)
self.assertIn(duplex, all_duplexes)
self.assertIn(duplex, all_duplexes)
self.assertGreaterEqual(speed, 0)
self.assertGreaterEqual(mtu, 0)
@unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
'/proc/diskstats not available on this linux version')
@unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR") # no visible disks
def test_disk_io_counters(self):
def check_ntuple(nt):
self.assertEqual(nt[0], nt.read_count)
self.assertEqual(nt[1], nt.write_count)
self.assertEqual(nt[2], nt.read_bytes)
self.assertEqual(nt[3], nt.write_bytes)
if not (OPENBSD or NETBSD):
self.assertEqual(nt[4], nt.read_time)
self.assertEqual(nt[5], nt.write_time)
if LINUX:
self.assertEqual(nt[6], nt.read_merged_count)
self.assertEqual(nt[7], nt.write_merged_count)
self.assertEqual(nt[8], nt.busy_time)
elif FREEBSD:
self.assertEqual(nt[6], nt.busy_time)
for name in nt._fields:
assert getattr(nt, name) >= 0, nt
ret = psutil.disk_io_counters(perdisk=False)
check_ntuple(ret)
ret = psutil.disk_io_counters(perdisk=True)
# make sure there are no duplicates
self.assertEqual(len(ret), len(set(ret)))
for key in ret:
assert key, key
check_ntuple(ret[key])
if LINUX and key[-1].isdigit():
# if 'sda1' is listed 'sda' shouldn't, see:
# https://github.com/giampaolo/psutil/issues/338
while key[-1].isdigit():
key = key[:-1]
self.assertNotIn(key, ret.keys())
# can't find users on APPVEYOR or TRAVIS
@unittest.skipIf(APPVEYOR or TRAVIS and not psutil.users(),
"unreliable on APPVEYOR or TRAVIS")
def test_users(self):
users = psutil.users()
self.assertNotEqual(users, [])
for user in users:
assert user.name, user
self.assertIsInstance(user.name, (str, unicode))
self.assertIsInstance(user.terminal, (str, unicode, None))
if user.host is not None:
self.assertIsInstance(user.host, (str, unicode, None))
user.terminal
user.host
assert user.started > 0.0, user
datetime.datetime.fromtimestamp(user.started)
def test_cpu_stats(self):
# Tested more extensively in per-platform test modules.
infos = psutil.cpu_stats()
for name in infos._fields:
value = getattr(infos, name)
self.assertGreaterEqual(value, 0)
if name in ('ctx_switches', 'interrupts'):
self.assertGreater(value, 0)
@unittest.skipUnless(hasattr(psutil, "cpu_freq"),
"platform not suported")
def test_cpu_freq(self):
def check_ls(ls):
for nt in ls:
self.assertLessEqual(nt.current, nt.max)
for name in nt._fields:
value = getattr(nt, name)
self.assertIsInstance(value, (int, long, float))
self.assertGreaterEqual(value, 0)
ls = psutil.cpu_freq(percpu=True)
if TRAVIS and not ls:
return
assert ls, ls
check_ls([psutil.cpu_freq(percpu=False)])
if LINUX:
self.assertEqual(len(ls), psutil.cpu_count())
def test_os_constants(self):
names = ["POSIX", "WINDOWS", "LINUX", "OSX", "FREEBSD", "OPENBSD",
"NETBSD", "BSD", "SUNOS"]
for name in names:
self.assertIsInstance(getattr(psutil, name), bool, msg=name)
if os.name == 'posix':
assert psutil.POSIX
assert not psutil.WINDOWS
names.remove("POSIX")
if "linux" in sys.platform.lower():
assert psutil.LINUX
names.remove("LINUX")
elif "bsd" in sys.platform.lower():
assert psutil.BSD
self.assertEqual([psutil.FREEBSD, psutil.OPENBSD,
psutil.NETBSD].count(True), 1)
names.remove("BSD")
names.remove("FREEBSD")
names.remove("OPENBSD")
names.remove("NETBSD")
elif "sunos" in sys.platform.lower() or \
"solaris" in sys.platform.lower():
assert psutil.SUNOS
names.remove("SUNOS")
elif "darwin" in sys.platform.lower():
assert psutil.OSX
names.remove("OSX")
else:
assert psutil.WINDOWS
assert not psutil.POSIX
names.remove("WINDOWS")
# assert all other constants are set to False
for name in names:
self.assertIs(getattr(psutil, name), False, msg=name)
@unittest.skipUnless(hasattr(psutil, "sensors_temperatures"),
"platform not supported")
def test_sensors_temperatures(self):
temps = psutil.sensors_temperatures()
for name, entries in temps.items():
self.assertIsInstance(name, (str, unicode))
for entry in entries:
self.assertIsInstance(entry.label, (str, unicode))
if entry.current is not None:
self.assertGreaterEqual(entry.current, 0)
if entry.high is not None:
self.assertGreaterEqual(entry.high, 0)
if entry.critical is not None:
self.assertGreaterEqual(entry.critical, 0)
@unittest.skipUnless(hasattr(psutil, "sensors_battery"),
"platform not supported")
def test_sensors_battery(self):
ret = psutil.sensors_battery()
if ret is None:
return # no battery
self.assertGreaterEqual(ret.percent, 0)
self.assertLessEqual(ret.percent, 100)
if ret.secsleft not in (psutil.POWER_TIME_UNKNOWN,
psutil.POWER_TIME_UNLIMITED):
self.assertGreaterEqual(ret.secsleft, 0)
else:
if ret.secsleft == psutil.POWER_TIME_UNLIMITED:
self.assertTrue(ret.power_plugged)
self.assertIsInstance(ret.power_plugged, bool)
@unittest.skipUnless(hasattr(psutil, "sensors_fans"),
"platform not supported")
def test_sensors_fans(self):
fans = psutil.sensors_fans()
for name, entries in fans.items():
self.assertIsInstance(name, (str, unicode))
for entry in entries:
self.assertIsInstance(entry.label, (str, unicode))
self.assertIsInstance(entry.current, (int, long))
self.assertGreaterEqual(entry.current, 0)
if __name__ == '__main__':
run_test_module_by_name(__file__)
-815
View File
@@ -1,815 +0,0 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Windows specific tests."""
import errno
import glob
import os
import platform
import re
import signal
import subprocess
import sys
import time
try:
import win32api # requires "pip install pypiwin32" / "make setup-dev-env"
import win32con
import win32process
import wmi # requires "pip install wmi" / "make setup-dev-env"
except ImportError:
if os.name == 'nt':
raise
import psutil
from psutil import WINDOWS
from psutil._compat import basestring
from psutil._compat import callable
from psutil._compat import PY3
from psutil.tests import APPVEYOR
from psutil.tests import get_test_subprocess
from psutil.tests import mock
from psutil.tests import reap_children
from psutil.tests import retry_before_failing
from psutil.tests import run_test_module_by_name
from psutil.tests import unittest
cext = psutil._psplatform.cext
# are we a 64 bit process
IS_64_BIT = sys.maxsize > 2**32
def wrap_exceptions(fun):
def wrapper(self, *args, **kwargs):
try:
return fun(self, *args, **kwargs)
except OSError as err:
from psutil._pswindows import ACCESS_DENIED_SET
if err.errno in ACCESS_DENIED_SET:
raise psutil.AccessDenied(None, None)
if err.errno == errno.ESRCH:
raise psutil.NoSuchProcess(None, None)
raise
return wrapper
# ===================================================================
# System APIs
# ===================================================================
@unittest.skipUnless(WINDOWS, "WINDOWS only")
class TestSystemAPIs(unittest.TestCase):
def test_nic_names(self):
p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
out = p.communicate()[0]
if PY3:
out = str(out, sys.stdout.encoding or sys.getfilesystemencoding())
nics = psutil.net_io_counters(pernic=True).keys()
for nic in nics:
if "pseudo-interface" in nic.replace(' ', '-').lower():
continue
if nic not in out:
self.fail(
"%r nic wasn't found in 'ipconfig /all' output" % nic)
@unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ,
'NUMBER_OF_PROCESSORS env var is not available')
def test_cpu_count(self):
num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
self.assertEqual(num_cpus, psutil.cpu_count())
def test_cpu_count_2(self):
sys_value = win32api.GetSystemInfo()[5]
psutil_value = psutil.cpu_count()
self.assertEqual(sys_value, psutil_value)
def test_cpu_freq(self):
w = wmi.WMI()
proc = w.Win32_Processor()[0]
self.assertEqual(proc.CurrentClockSpeed, psutil.cpu_freq().current)
self.assertEqual(proc.MaxClockSpeed, psutil.cpu_freq().max)
def test_total_phymem(self):
w = wmi.WMI().Win32_ComputerSystem()[0]
self.assertEqual(int(w.TotalPhysicalMemory),
psutil.virtual_memory().total)
# @unittest.skipIf(wmi is None, "wmi module is not installed")
# def test__UPTIME(self):
# # _UPTIME constant is not public but it is used internally
# # as value to return for pid 0 creation time.
# # WMI behaves the same.
# w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
# p = psutil.Process(0)
# wmic_create = str(w.CreationDate.split('.')[0])
# psutil_create = time.strftime("%Y%m%d%H%M%S",
# time.localtime(p.create_time()))
# Note: this test is not very reliable
@unittest.skipIf(APPVEYOR, "test not relieable on appveyor")
@retry_before_failing()
def test_pids(self):
# Note: this test might fail if the OS is starting/killing
# other processes in the meantime
w = wmi.WMI().Win32_Process()
wmi_pids = set([x.ProcessId for x in w])
psutil_pids = set(psutil.pids())
self.assertEqual(wmi_pids, psutil_pids)
@retry_before_failing()
def test_disks(self):
ps_parts = psutil.disk_partitions(all=True)
wmi_parts = wmi.WMI().Win32_LogicalDisk()
for ps_part in ps_parts:
for wmi_part in wmi_parts:
if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
if not ps_part.mountpoint:
# this is usually a CD-ROM with no disk inserted
break
try:
usage = psutil.disk_usage(ps_part.mountpoint)
except OSError as err:
if err.errno == errno.ENOENT:
# usually this is the floppy
break
else:
raise
self.assertEqual(usage.total, int(wmi_part.Size))
wmi_free = int(wmi_part.FreeSpace)
self.assertEqual(usage.free, wmi_free)
# 10 MB tollerance
if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
self.fail("psutil=%s, wmi=%s" % (
usage.free, wmi_free))
break
else:
self.fail("can't find partition %s" % repr(ps_part))
def test_disk_usage(self):
for disk in psutil.disk_partitions():
sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint)
psutil_value = psutil.disk_usage(disk.mountpoint)
self.assertAlmostEqual(sys_value[0], psutil_value.free,
delta=1024 * 1024)
self.assertAlmostEqual(sys_value[1], psutil_value.total,
delta=1024 * 1024)
self.assertEqual(psutil_value.used,
psutil_value.total - psutil_value.free)
def test_disk_partitions(self):
sys_value = [
x + '\\' for x in win32api.GetLogicalDriveStrings().split("\\\x00")
if x and not x.startswith('A:')]
psutil_value = [x.mountpoint for x in psutil.disk_partitions(all=True)]
self.assertEqual(sys_value, psutil_value)
def test_net_if_stats(self):
ps_names = set(cext.net_if_stats())
wmi_adapters = wmi.WMI().Win32_NetworkAdapter()
wmi_names = set()
for wmi_adapter in wmi_adapters:
wmi_names.add(wmi_adapter.Name)
wmi_names.add(wmi_adapter.NetConnectionID)
self.assertTrue(ps_names & wmi_names,
"no common entries in %s, %s" % (ps_names, wmi_names))
# ===================================================================
# sensors_battery()
# ===================================================================
@unittest.skipUnless(WINDOWS, "WINDOWS only")
class TestSensorsBattery(unittest.TestCase):
def test_percent(self):
w = wmi.WMI()
battery_psutil = psutil.sensors_battery()
if battery_psutil is None:
with self.assertRaises(IndexError):
w.query('select * from Win32_Battery')[0]
else:
battery_wmi = w.query('select * from Win32_Battery')[0]
if battery_psutil is None:
self.assertNot(battery_wmi.EstimatedChargeRemaining)
return
self.assertAlmostEqual(
battery_psutil.percent, battery_wmi.EstimatedChargeRemaining,
delta=1)
self.assertEqual(
battery_psutil.power_plugged, battery_wmi.BatteryStatus == 1)
def test_battery_present(self):
if win32api.GetPwrCapabilities()['SystemBatteriesPresent']:
self.assertIsNotNone(psutil.sensors_battery())
else:
self.assertIsNone(psutil.sensors_battery())
def test_emulate_no_battery(self):
with mock.patch("psutil._pswindows.cext.sensors_battery",
return_value=(0, 128, 0, 0)) as m:
self.assertIsNone(psutil.sensors_battery())
assert m.called
def test_emulate_power_connected(self):
with mock.patch("psutil._pswindows.cext.sensors_battery",
return_value=(1, 0, 0, 0)) as m:
self.assertEqual(psutil.sensors_battery().secsleft,
psutil.POWER_TIME_UNLIMITED)
assert m.called
def test_emulate_power_charging(self):
with mock.patch("psutil._pswindows.cext.sensors_battery",
return_value=(0, 8, 0, 0)) as m:
self.assertEqual(psutil.sensors_battery().secsleft,
psutil.POWER_TIME_UNLIMITED)
assert m.called
def test_emulate_secs_left_unknown(self):
with mock.patch("psutil._pswindows.cext.sensors_battery",
return_value=(0, 0, 0, -1)) as m:
self.assertEqual(psutil.sensors_battery().secsleft,
psutil.POWER_TIME_UNKNOWN)
assert m.called
# ===================================================================
# Process APIs
# ===================================================================
@unittest.skipUnless(WINDOWS, "WINDOWS only")
class TestProcess(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.pid = get_test_subprocess().pid
@classmethod
def tearDownClass(cls):
reap_children()
def test_issue_24(self):
p = psutil.Process(0)
self.assertRaises(psutil.AccessDenied, p.kill)
def test_special_pid(self):
p = psutil.Process(4)
self.assertEqual(p.name(), 'System')
# use __str__ to access all common Process properties to check
# that nothing strange happens
str(p)
p.username()
self.assertTrue(p.create_time() >= 0.0)
try:
rss, vms = p.memory_info()[:2]
except psutil.AccessDenied:
# expected on Windows Vista and Windows 7
if not platform.uname()[1] in ('vista', 'win-7', 'win7'):
raise
else:
self.assertTrue(rss > 0)
def test_send_signal(self):
p = psutil.Process(self.pid)
self.assertRaises(ValueError, p.send_signal, signal.SIGINT)
def test_exe(self):
for p in psutil.process_iter():
try:
self.assertEqual(os.path.basename(p.exe()), p.name())
except psutil.Error:
pass
def test_num_handles_increment(self):
p = psutil.Process(os.getpid())
before = p.num_handles()
handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
win32con.FALSE, os.getpid())
after = p.num_handles()
self.assertEqual(after, before + 1)
win32api.CloseHandle(handle)
self.assertEqual(p.num_handles(), before)
def test_handles_leak(self):
# Call all Process methods and make sure no handles are left
# open. This is here mainly to make sure functions using
# OpenProcess() always call CloseHandle().
def call(p, attr):
attr = getattr(p, name, None)
if attr is not None and callable(attr):
attr()
else:
attr
p = psutil.Process(self.pid)
failures = []
for name in dir(psutil.Process):
if name.startswith('_') \
or name in ('terminate', 'kill', 'suspend', 'resume',
'nice', 'send_signal', 'wait', 'children',
'as_dict'):
continue
else:
try:
call(p, name)
num1 = p.num_handles()
call(p, name)
num2 = p.num_handles()
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
else:
if num2 > num1:
fail = \
"failure while processing Process.%s method " \
"(before=%s, after=%s)" % (name, num1, num2)
failures.append(fail)
if failures:
self.fail('\n' + '\n'.join(failures))
def test_name_always_available(self):
# On Windows name() is never supposed to raise AccessDenied,
# see https://github.com/giampaolo/psutil/issues/627
for p in psutil.process_iter():
try:
p.name()
except psutil.NoSuchProcess:
pass
@unittest.skipUnless(sys.version_info >= (2, 7),
"CTRL_* signals not supported")
def test_ctrl_signals(self):
p = psutil.Process(get_test_subprocess().pid)
p.send_signal(signal.CTRL_C_EVENT)
p.send_signal(signal.CTRL_BREAK_EVENT)
p.kill()
p.wait()
self.assertRaises(psutil.NoSuchProcess,
p.send_signal, signal.CTRL_C_EVENT)
self.assertRaises(psutil.NoSuchProcess,
p.send_signal, signal.CTRL_BREAK_EVENT)
def test_compare_name_exe(self):
for p in psutil.process_iter():
try:
a = os.path.basename(p.exe())
b = p.name()
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
else:
self.assertEqual(a, b)
def test_username(self):
sys_value = win32api.GetUserName()
psutil_value = psutil.Process().username()
self.assertEqual(sys_value, psutil_value.split('\\')[1])
def test_cmdline(self):
sys_value = re.sub(' +', ' ', win32api.GetCommandLine()).strip()
psutil_value = ' '.join(psutil.Process().cmdline())
self.assertEqual(sys_value, psutil_value)
# XXX - occasional failures
# def test_cpu_times(self):
# handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
# win32con.FALSE, os.getpid())
# self.addCleanup(win32api.CloseHandle, handle)
# sys_value = win32process.GetProcessTimes(handle)
# psutil_value = psutil.Process().cpu_times()
# self.assertAlmostEqual(
# psutil_value.user, sys_value['UserTime'] / 10000000.0,
# delta=0.2)
# self.assertAlmostEqual(
# psutil_value.user, sys_value['KernelTime'] / 10000000.0,
# delta=0.2)
def test_nice(self):
handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
win32con.FALSE, os.getpid())
self.addCleanup(win32api.CloseHandle, handle)
sys_value = win32process.GetPriorityClass(handle)
psutil_value = psutil.Process().nice()
self.assertEqual(psutil_value, sys_value)
def test_memory_info(self):
handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
win32con.FALSE, self.pid)
self.addCleanup(win32api.CloseHandle, handle)
sys_value = win32process.GetProcessMemoryInfo(handle)
psutil_value = psutil.Process(self.pid).memory_info()
self.assertEqual(
sys_value['PeakWorkingSetSize'], psutil_value.peak_wset)
self.assertEqual(
sys_value['WorkingSetSize'], psutil_value.wset)
self.assertEqual(
sys_value['QuotaPeakPagedPoolUsage'], psutil_value.peak_paged_pool)
self.assertEqual(
sys_value['QuotaPagedPoolUsage'], psutil_value.paged_pool)
self.assertEqual(
sys_value['QuotaPeakNonPagedPoolUsage'],
psutil_value.peak_nonpaged_pool)
self.assertEqual(
sys_value['QuotaNonPagedPoolUsage'], psutil_value.nonpaged_pool)
self.assertEqual(
sys_value['PagefileUsage'], psutil_value.pagefile)
self.assertEqual(
sys_value['PeakPagefileUsage'], psutil_value.peak_pagefile)
self.assertEqual(psutil_value.rss, psutil_value.wset)
self.assertEqual(psutil_value.vms, psutil_value.pagefile)
def test_wait(self):
handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
win32con.FALSE, self.pid)
self.addCleanup(win32api.CloseHandle, handle)
p = psutil.Process(self.pid)
p.terminate()
psutil_value = p.wait()
sys_value = win32process.GetExitCodeProcess(handle)
self.assertEqual(psutil_value, sys_value)
def test_cpu_affinity(self):
def from_bitmask(x):
return [i for i in range(64) if (1 << i) & x]
handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
win32con.FALSE, self.pid)
self.addCleanup(win32api.CloseHandle, handle)
sys_value = from_bitmask(
win32process.GetProcessAffinityMask(handle)[0])
psutil_value = psutil.Process(self.pid).cpu_affinity()
self.assertEqual(psutil_value, sys_value)
def test_io_counters(self):
handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
win32con.FALSE, os.getpid())
self.addCleanup(win32api.CloseHandle, handle)
sys_value = win32process.GetProcessIoCounters(handle)
psutil_value = psutil.Process().io_counters()
self.assertEqual(
psutil_value.read_count, sys_value['ReadOperationCount'])
self.assertEqual(
psutil_value.write_count, sys_value['WriteOperationCount'])
self.assertEqual(
psutil_value.read_bytes, sys_value['ReadTransferCount'])
self.assertEqual(
psutil_value.write_bytes, sys_value['WriteTransferCount'])
self.assertEqual(
psutil_value.other_count, sys_value['OtherOperationCount'])
self.assertEqual(
psutil_value.other_bytes, sys_value['OtherTransferCount'])
def test_num_handles(self):
import ctypes
import ctypes.wintypes
PROCESS_QUERY_INFORMATION = 0x400
handle = ctypes.windll.kernel32.OpenProcess(
PROCESS_QUERY_INFORMATION, 0, os.getpid())
self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle)
hndcnt = ctypes.wintypes.DWORD()
ctypes.windll.kernel32.GetProcessHandleCount(
handle, ctypes.byref(hndcnt))
sys_value = hndcnt.value
psutil_value = psutil.Process().num_handles()
ctypes.windll.kernel32.CloseHandle(handle)
self.assertEqual(psutil_value, sys_value + 1)
@unittest.skipUnless(WINDOWS, "WINDOWS only")
class TestProcessWMI(unittest.TestCase):
"""Compare Process API results with WMI."""
@classmethod
def setUpClass(cls):
cls.pid = get_test_subprocess().pid
@classmethod
def tearDownClass(cls):
reap_children()
def test_name(self):
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
p = psutil.Process(self.pid)
self.assertEqual(p.name(), w.Caption)
def test_exe(self):
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
p = psutil.Process(self.pid)
# Note: wmi reports the exe as a lower case string.
# Being Windows paths case-insensitive we ignore that.
self.assertEqual(p.exe().lower(), w.ExecutablePath.lower())
def test_cmdline(self):
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
p = psutil.Process(self.pid)
self.assertEqual(' '.join(p.cmdline()),
w.CommandLine.replace('"', ''))
def test_username(self):
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
p = psutil.Process(self.pid)
domain, _, username = w.GetOwner()
username = "%s\\%s" % (domain, username)
self.assertEqual(p.username(), username)
def test_memory_rss(self):
time.sleep(0.1)
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
p = psutil.Process(self.pid)
rss = p.memory_info().rss
self.assertEqual(rss, int(w.WorkingSetSize))
def test_memory_vms(self):
time.sleep(0.1)
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
p = psutil.Process(self.pid)
vms = p.memory_info().vms
# http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
# ...claims that PageFileUsage is represented in Kilo
# bytes but funnily enough on certain platforms bytes are
# returned instead.
wmi_usage = int(w.PageFileUsage)
if (vms != wmi_usage) and (vms != wmi_usage * 1024):
self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
def test_create_time(self):
w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
p = psutil.Process(self.pid)
wmic_create = str(w.CreationDate.split('.')[0])
psutil_create = time.strftime("%Y%m%d%H%M%S",
time.localtime(p.create_time()))
self.assertEqual(wmic_create, psutil_create)
@unittest.skipUnless(WINDOWS, "WINDOWS only")
class TestDualProcessImplementation(unittest.TestCase):
"""
Certain APIs on Windows have 2 internal implementations, one
based on documented Windows APIs, another one based
NtQuerySystemInformation() which gets called as fallback in
case the first fails because of limited permission error.
Here we test that the two methods return the exact same value,
see:
https://github.com/giampaolo/psutil/issues/304
"""
@classmethod
def setUpClass(cls):
cls.pid = get_test_subprocess().pid
@classmethod
def tearDownClass(cls):
reap_children()
# ---
# same tests as above but mimicks the AccessDenied failure of
# the first (fast) method failing with AD.
def test_name(self):
name = psutil.Process(self.pid).name()
with mock.patch("psutil._psplatform.cext.proc_exe",
side_effect=psutil.AccessDenied(os.getpid())) as fun:
self.assertEqual(psutil.Process(self.pid).name(), name)
assert fun.called
def test_memory_info(self):
mem_1 = psutil.Process(self.pid).memory_info()
with mock.patch("psutil._psplatform.cext.proc_memory_info",
side_effect=OSError(errno.EPERM, "msg")) as fun:
mem_2 = psutil.Process(self.pid).memory_info()
self.assertEqual(len(mem_1), len(mem_2))
for i in range(len(mem_1)):
self.assertGreaterEqual(mem_1[i], 0)
self.assertGreaterEqual(mem_2[i], 0)
self.assertAlmostEqual(mem_1[i], mem_2[i], delta=512)
assert fun.called
def test_create_time(self):
ctime = psutil.Process(self.pid).create_time()
with mock.patch("psutil._psplatform.cext.proc_create_time",
side_effect=OSError(errno.EPERM, "msg")) as fun:
self.assertEqual(psutil.Process(self.pid).create_time(), ctime)
assert fun.called
def test_cpu_times(self):
cpu_times_1 = psutil.Process(self.pid).cpu_times()
with mock.patch("psutil._psplatform.cext.proc_cpu_times",
side_effect=OSError(errno.EPERM, "msg")) as fun:
cpu_times_2 = psutil.Process(self.pid).cpu_times()
assert fun.called
self.assertAlmostEqual(
cpu_times_1.user, cpu_times_2.user, delta=0.01)
self.assertAlmostEqual(
cpu_times_1.system, cpu_times_2.system, delta=0.01)
def test_io_counters(self):
io_counters_1 = psutil.Process(self.pid).io_counters()
with mock.patch("psutil._psplatform.cext.proc_io_counters",
side_effect=OSError(errno.EPERM, "msg")) as fun:
io_counters_2 = psutil.Process(self.pid).io_counters()
for i in range(len(io_counters_1)):
self.assertAlmostEqual(
io_counters_1[i], io_counters_2[i], delta=5)
assert fun.called
def test_num_handles(self):
num_handles = psutil.Process(self.pid).num_handles()
with mock.patch("psutil._psplatform.cext.proc_num_handles",
side_effect=OSError(errno.EPERM, "msg")) as fun:
psutil.Process(self.pid).num_handles() == num_handles
assert fun.called
@unittest.skipUnless(WINDOWS, "WINDOWS only")
class RemoteProcessTestCase(unittest.TestCase):
"""Certain functions require calling ReadProcessMemory.
This trivially works when called on the current process.
Check that this works on other processes, especially when they
have a different bitness.
"""
@staticmethod
def find_other_interpreter():
# find a python interpreter that is of the opposite bitness from us
code = "import sys; sys.stdout.write(str(sys.maxsize > 2**32))"
# XXX: a different and probably more stable approach might be to access
# the registry but accessing 64 bit paths from a 32 bit process
for filename in glob.glob(r"C:\Python*\python.exe"):
proc = subprocess.Popen(args=[filename, "-c", code],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output, _ = proc.communicate()
if output == str(not IS_64_BIT):
return filename
@classmethod
def setUpClass(cls):
other_python = cls.find_other_interpreter()
if other_python is None:
raise unittest.SkipTest(
"could not find interpreter with opposite bitness")
if IS_64_BIT:
cls.python64 = sys.executable
cls.python32 = other_python
else:
cls.python64 = other_python
cls.python32 = sys.executable
test_args = ["-c", "import sys; sys.stdin.read()"]
def setUp(self):
env = os.environ.copy()
env["THINK_OF_A_NUMBER"] = str(os.getpid())
self.proc32 = get_test_subprocess([self.python32] + self.test_args,
env=env,
stdin=subprocess.PIPE)
self.proc64 = get_test_subprocess([self.python64] + self.test_args,
env=env,
stdin=subprocess.PIPE)
def tearDown(self):
self.proc32.communicate()
self.proc64.communicate()
reap_children()
@classmethod
def tearDownClass(cls):
reap_children()
def test_cmdline_32(self):
p = psutil.Process(self.proc32.pid)
self.assertEqual(len(p.cmdline()), 3)
self.assertEqual(p.cmdline()[1:], self.test_args)
def test_cmdline_64(self):
p = psutil.Process(self.proc64.pid)
self.assertEqual(len(p.cmdline()), 3)
self.assertEqual(p.cmdline()[1:], self.test_args)
def test_cwd_32(self):
p = psutil.Process(self.proc32.pid)
self.assertEqual(p.cwd(), os.getcwd())
def test_cwd_64(self):
p = psutil.Process(self.proc64.pid)
self.assertEqual(p.cwd(), os.getcwd())
def test_environ_32(self):
p = psutil.Process(self.proc32.pid)
e = p.environ()
self.assertIn("THINK_OF_A_NUMBER", e)
self.assertEquals(e["THINK_OF_A_NUMBER"], str(os.getpid()))
def test_environ_64(self):
p = psutil.Process(self.proc64.pid)
e = p.environ()
self.assertIn("THINK_OF_A_NUMBER", e)
self.assertEquals(e["THINK_OF_A_NUMBER"], str(os.getpid()))
# ===================================================================
# Windows services
# ===================================================================
@unittest.skipUnless(WINDOWS, "WINDOWS only")
class TestServices(unittest.TestCase):
def test_win_service_iter(self):
valid_statuses = set([
"running",
"paused",
"start",
"pause",
"continue",
"stop",
"stopped",
])
valid_start_types = set([
"automatic",
"manual",
"disabled",
])
valid_statuses = set([
"running",
"paused",
"start_pending",
"pause_pending",
"continue_pending",
"stop_pending",
"stopped"
])
for serv in psutil.win_service_iter():
data = serv.as_dict()
self.assertIsInstance(data['name'], basestring)
self.assertNotEqual(data['name'].strip(), "")
self.assertIsInstance(data['display_name'], basestring)
self.assertIsInstance(data['username'], basestring)
self.assertIn(data['status'], valid_statuses)
if data['pid'] is not None:
psutil.Process(data['pid'])
self.assertIsInstance(data['binpath'], basestring)
self.assertIsInstance(data['username'], basestring)
self.assertIsInstance(data['start_type'], basestring)
self.assertIn(data['start_type'], valid_start_types)
self.assertIn(data['status'], valid_statuses)
self.assertIsInstance(data['description'], basestring)
pid = serv.pid()
if pid is not None:
p = psutil.Process(pid)
self.assertTrue(p.is_running())
# win_service_get
s = psutil.win_service_get(serv.name())
# test __eq__
self.assertEqual(serv, s)
def test_win_service_get(self):
name = next(psutil.win_service_iter()).name()
with self.assertRaises(psutil.NoSuchProcess) as cm:
psutil.win_service_get(name + '???')
self.assertEqual(cm.exception.name, name + '???')
# test NoSuchProcess
service = psutil.win_service_get(name)
exc = WindowsError(
psutil._psplatform.cext.ERROR_SERVICE_DOES_NOT_EXIST, "")
with mock.patch("psutil._psplatform.cext.winservice_query_status",
side_effect=exc):
self.assertRaises(psutil.NoSuchProcess, service.status)
with mock.patch("psutil._psplatform.cext.winservice_query_config",
side_effect=exc):
self.assertRaises(psutil.NoSuchProcess, service.username)
# test AccessDenied
exc = WindowsError(
psutil._psplatform.cext.ERROR_ACCESS_DENIED, "")
with mock.patch("psutil._psplatform.cext.winservice_query_status",
side_effect=exc):
self.assertRaises(psutil.AccessDenied, service.status)
with mock.patch("psutil._psplatform.cext.winservice_query_config",
side_effect=exc):
self.assertRaises(psutil.AccessDenied, service.username)
# test __str__ and __repr__
self.assertIn(service.name(), str(service))
self.assertIn(service.display_name(), str(service))
self.assertIn(service.name(), repr(service))
self.assertIn(service.display_name(), repr(service))
if __name__ == '__main__':
run_test_module_by_name(__file__)