Vendorized applib. Will likely use many of it's helpers.

This commit is contained in:
Kenneth Reitz
2011-03-18 17:34:56 -04:00
parent a54fedd0fb
commit bec67c0457
16 changed files with 3765 additions and 0 deletions
+4
View File
@@ -0,0 +1,4 @@
# Copyright (c) 2010 ActiveState Software Inc. All rights reserved.
__version_info__ = ('1', '2')
__version__ = '.'.join(__version_info__) # + '.dev'
File diff suppressed because it is too large Load Diff
+213
View File
@@ -0,0 +1,213 @@
# Copyright (c) 2010 ActiveState Software Inc. All rights reserved.
import sys
import os
from os import path
import tarfile
import zipfile
from contextlib import closing
from applib import sh
__all__ = ['implementors']
class CompressedFile:
def __init__(self, filename):
self.filename = filename
def extractall_with_single_toplevel(self, f, names):
"""Same as ``extractall`` but ensures a single toplevel directory
Some compressed archives do not stick to the convension of having a
single top-level directory. For eg.,
http://code.google.com/p/grapefruit/issues/detail?id=3
In such cases, a new toplevel directory corresponding to the name of the
compressed file (eg: 'grapefruit-0.1a3' if compressed file is named
'grapefruit-0.1a3.tar.gz') is created and then extraction happens
*inside* that directory.
- f: tarfile/zipefile file object
- names: List of filenames in the archive
Return the absolute path to the toplevel directory.
"""
toplevels = _find_top_level_directories(names, sep='/')
if len(toplevels) == 0:
raise sh.PackError('archive is empty')
elif len(toplevels) > 1:
toplevel = _archive_basename(self.filename)
os.mkdir(toplevel)
with sh.cd(toplevel):
f.extractall()
return path.abspath(toplevel)
else:
f.extractall()
toplevel = path.abspath(toplevels[0])
assert path.exists(toplevel)
if not path.isdir(toplevel):
# eg: http://pypi.python.org/pypi/DeferArgs/0.4
raise SingleFile('archive has a single file: %s', toplevel)
return toplevel
class ZippedFile(CompressedFile):
"""A zip file"""
@staticmethod
def is_valid(filename):
return zipfile.is_zipfile(filename)
def extract(self):
try:
f = zipfile.ZipFile(self.filename, 'r')
try:
return self.extractall_with_single_toplevel(
f, f.namelist())
except OSError as e:
if e.errno == 17:
# http://bugs.python.org/issue6510
raise sh.PackError(e)
# http://bugs.python.org/issue6609
if sys.platform.startswith('win'):
if isinstance(e, WindowsError) and e.winerror == 267:
raise sh.PackError('uses Windows special name (%s)' % e)
raise
except IOError as e:
# http://bugs.python.org/issue10447
if sys.platform == 'win32' and e.errno == 2:
raise sh.PackError('reached max path-length: %s' % e)
raise
finally:
f.close()
except (zipfile.BadZipfile, zipfile.LargeZipFile) as e:
raise sh.PackError(e)
@classmethod
def pack(cls, paths, file):
raise NotImplementedError('pack: zip files not supported yet')
class TarredFile(CompressedFile):
"""A tar.gz/bz2 file"""
@classmethod
def is_valid(cls, filename):
try:
with closing(tarfile.open(filename, cls._get_mode())) as f:
return True
except tarfile.TarError:
return False
def extract(self):
try:
f = tarfile.open(self.filename, self._get_mode())
try:
_ensure_read_write_access(f)
return self.extractall_with_single_toplevel(
f, f.getnames())
finally:
f.close()
except tarfile.TarError as e:
raise sh.PackError(e)
except IOError as e:
# see http://bugs.python.org/issue6584
if 'CRC check failed' in str(e):
raise sh.PackError(e)
# See github issue #10
elif e.errno == 22 and "invalid mode ('wb')" in str(e):
raise sh.PackError(e)
else:
raise
@classmethod
def pack(cls, paths, file):
f = tarfile.open(file, cls._get_mode('w'))
try:
for pth in paths:
assert path.exists(pth), '"%s" does not exist' % path
f.add(pth)
finally:
f.close()
def _get_mode(self):
"""Return the mode for this tarfile"""
raise NotImplementedError()
class GzipTarredFile(TarredFile):
"""A tar.gz2 file"""
@staticmethod
def _get_mode(mode='r'):
assert mode in ['r', 'w']
return mode + ':gz'
class Bzip2TarredFile(TarredFile):
"""A tar.gz2 file"""
@staticmethod
def _get_mode(mode='r'):
assert mode in ['r', 'w']
return mode + ':bz2'
implementors = dict(
zip = ZippedFile,
tgz = GzipTarredFile,
bz2 = Bzip2TarredFile)
class MultipleTopLevels(sh.PackError):
"""Can be extracted, but contains multiple top-level dirs"""
class SingleFile(sh.PackError):
"""Contains nothing but a single file. Compressed archived is expected to
contain one directory
"""
def _ensure_read_write_access(tarfileobj):
"""Ensure that the given tarfile will be readable and writable by the
user (the client program using this API) after extraction.
Some tarballs have u-x set on directories or u-w on files. We reset such
perms here.. so that the extracted files remain accessible for reading
and deletion as per the user's wish.
See also: http://bugs.python.org/issue6196
"""
dir_perm = tarfile.TUREAD | tarfile.TUWRITE | tarfile.TUEXEC
file_perm = tarfile.TUREAD | tarfile.TUWRITE
for tarinfo in tarfileobj.getmembers():
tarinfo.mode |= (dir_perm if tarinfo.isdir() else file_perm)
def _find_top_level_directories(fileslist, sep):
"""Find the distinct first components in the fileslist"""
toplevels = set()
for pth in fileslist:
firstcomponent = pth.split(sep, 1)[0]
toplevels.add(firstcomponent)
return list(toplevels)
def _archive_basename(filename):
"""Return a suitable base directory name for the given archive"""
exts = (
'.tar.gz',
'.tgz',
'.tar.bz2',
'.bz2',
'.zip')
filename = path.basename(filename)
for ext in exts:
if filename.endswith(ext):
return filename[:-len(ext)]
return filename + '.dir'
+155
View File
@@ -0,0 +1,155 @@
# Copyright (c) 2010 ActiveState Software Inc. All rights reserved.
"""Process execution wrappers
"""
from __future__ import unicode_literals
import os
import sys
import time
import subprocess
from tempfile import TemporaryFile
import warnings
from applib.misc import xjoin
from applib.misc import safe_unicode
__all__ = ['run', 'RunError', 'RunNonZeroReturn', 'RunTimedout']
warnings.filterwarnings('ignore', message='.*With\-statements.*',
category=DeprecationWarning)
class RunError(Exception):
def __init__(self, cmd, stdout, stderr, errors):
self.stdout = stdout
self.stderr = stderr
msg = errors[:]
msg.extend([
'command: {0}'.format(safe_unicode(cmd)),
'pwd: {0}'.format(xjoin(os.getcwd()))])
if stderr is None:
msg.append(
'OUTPUT:\n{0}'.format(_limit_str(safe_unicode(stdout))))
else:
msg.extend([
'STDERR:\n{0}'.format(_limit_str(safe_unicode(stderr))),
'STDOUT:\n{0}'.format(_limit_str(safe_unicode(stdout)))])
super(RunError, self).__init__('\n'.join(msg))
class RunNonZeroReturn(RunError):
"""The command returned non-zero exit code"""
def __init__(self, p, cmd, stdout, stderr):
super(RunNonZeroReturn, self).__init__(cmd, stdout, stderr, [
'non-zero returncode: {0}'.format(p.returncode)
])
class RunTimedout(RunError):
"""process is taking too much time"""
def __init__(self, cmd, timeout, stdout, stderr):
super(RunTimedout, self).__init__(cmd, stdout, stderr, [
'timed out; ergo process is terminated',
'seconds elapsed: {0}'.format(timeout),
])
# TODO: support for incremental results (sometimes a process run for a few
# minutes, but we need to send the stdout as soon as it appears.
def run(cmd, merge_streams=False, timeout=None, env=None):
"""Improved replacement for commands.getoutput()
The following features are implemented:
- timeout (in seconds)
- support for merged streams (stdout+stderr together)
`cmd` can be a full command string, or list of prog/args.
Note that returned data is of *undecoded* str/bytes type (not unicode)
Return (stdout, stderr)
"""
if isinstance(cmd, (list, tuple)):
shell = False
else:
shell = True
# Fix for cmd.exe quote issue. See comment #3 and #4 in
# http://firefly.activestate.com/sridharr/pypm/ticket/126#comment:3
if sys.platform.startswith('win') and cmd.startswith('"'):
cmd = '"{0}"'.format(cmd)
# redirect stdout and stderr to temporary *files*
with TemporaryFile() as outf:
with TemporaryFile() as errf:
p = subprocess.Popen(cmd, env=env, shell=shell, stdout=outf,
stderr=outf if merge_streams else errf)
if timeout is None:
p.wait()
else:
# poll for terminated status till timeout is reached
t_nought = time.time()
seconds_passed = 0
while True:
if p.poll() is not None:
break
seconds_passed = time.time() - t_nought
if timeout and seconds_passed > timeout:
p.terminate()
raise RunTimedout(
cmd, timeout,
_read_tmpfd(outf),
None if merge_streams else _read_tmpfd(errf))
time.sleep(0.1)
# the process has exited by now; nothing will to be written to
# outfd/errfd anymore.
stdout = _read_tmpfd(outf)
stderr = _read_tmpfd(errf)
if p.returncode != 0:
raise RunNonZeroReturn(p, cmd, stdout, None if merge_streams else stderr)
else:
return stdout, stderr
def _read_tmpfd(fil):
"""Read from a temporary file object
Call this method only when nothing more will be written to the temporary
file - i.e., all the writing has already been done.
"""
fil.seek(0)
return fil.read()
def _limit_str(s, maxchars=80*15):
if len(s) > maxchars:
return '[...]\n' + s[-maxchars:]
return s
def _disable_windows_error_popup():
"""Set error mode to disable Windows error popup
This setting is effective for current process and all the child processes
"""
# disable nasty critical error pop-ups on Windows
import win32api, win32con
win32api.SetErrorMode(win32con.SEM_FAILCRITICALERRORS |
win32con.SEM_NOOPENFILEERRORBOX)
if sys.platform.startswith('win'):
try:
import win32api
except ImportError:
pass # XXX: this means, you will get annoying popups
else:
_disable_windows_error_popup()
+269
View File
@@ -0,0 +1,269 @@
# Copyright (c) 2010 ActiveState Software Inc. All rights reserved.
"""Simple wrapper around SQLalchemy
This module hides the complexity of SQLAlchemy to provide a simple interface to
store and manipulate Python objects each with a set of properties. Unlike the
default behaviour of sqlalchemy's declaritive_base, inheritance of objects will
not require "join", rather it creates a separate table. This makes it easy to
use objects around from parts of not-so-related applications.
For example, a ``SourcePackage`` table is created by Grail. Then, PyPM will
extend it as ``BinaryPackage`` which gets extended to ``RepoPackage``. The table
for RepoPackage will be concretely inherited, meaning - there will be just be
one table without having to 'join' to another SourcePackage table.
At the moment, PyPM and Grail use this module. It may not be of use to others,
and we may change the api/behaviour. Hence, it makes sense to keep it as an
internal module.
"""
import sys
import os
from os.path import exists, dirname
from contextlib import contextmanager
import json
from sqlalchemy import Table, Column, MetaData
from sqlalchemy import create_engine
from sqlalchemy.types import String, Text, Boolean, PickleType
from sqlalchemy.orm import sessionmaker, scoped_session, mapper
# A PickleType that will work on both Python 2.x and 3.x
# i.e., if you *write* to a DB entry using Python 3.x, we are letting
# Python 3.x apps to read from it as well.
# WARNING: Ideally, if you are starting a new project, please
# use something else like JSON. See
# http://twitter.com/zzzeek/status/9765871731867648
Pickle2Type = PickleType(protocol=2)
def setup(db_class, simple_object_cls, primary_keys):
"""A simple API to configure the metadata"""
table_name = simple_object_cls.__name__
column_names = simple_object_cls.FIELDS
metadata = MetaData()
table = Table(table_name, metadata,
*[Column(cname, _get_best_column_type(cname),
primary_key=cname in primary_keys)
for cname in column_names])
db_class.metadata = metadata
db_class.mapper_class = simple_object_cls
db_class.table = table
mapper(simple_object_cls, table)
def sqlalchemy_escape(val, escape_char, special_chars):
"""Escape a string according for use in LIKE operator
>>> sqlalchemy_escape("text_table", "\\", "%_")
'text\_table'
"""
if sys.version_info[:2] >= (3, 0):
assert isinstance(val, str)
else:
assert isinstance(val, basestring)
result = []
for c in val:
if c in special_chars + escape_char:
result.extend(escape_char + c)
else:
result.extend(c)
return ''.join(result)
class SimpleDatabase(object):
metadata = None # to be set up derived classes
class DoesNotExist(IOError):
def __init__(self, path):
super(IOError, self).__init__(
'database file %s does not exist' % path)
def __init__(self, path, touch=False):
"""
touch - create database, if it does not exist
"""
self.path = path
sqlite_uri = 'sqlite:///%s' % self.path
self.engine = create_engine(sqlite_uri, echo=False)
self.create_session = sessionmaker(
bind=self.engine,
autocommit=False,
# See the comment by Michael Bayer
# http://groups.google.com/group/sqlalchemy/browse_thread/thread/7c1eb642435adde7
# expire_on_commit=False
)
self.create_scoped_session = scoped_session(self.create_session)
if not exists(self.path):
if touch:
assert exists(dirname(self.path)), 'missing: ' + dirname(self.path)
self.metadata.create_all(self.engine)
else:
raise self.DoesNotExist(path)
def reset(self):
"""Reset the database
Drop all tables and recreate them
"""
self.metadata.drop_all(self.engine)
self.metadata.create_all(self.engine)
def close(self):
self.engine.dispose()
@contextmanager
def transaction(self, session=None):
"""Start a new transaction based on the passed session object. If session
is not passed, then create one and make sure of closing it finally.
"""
local_session = None
if session is None:
local_session = session = self.create_scoped_session()
try:
yield session
finally:
# Since ``local_session`` was created locally, close it here itself
if local_session is not None:
# but wait!
# http://groups.google.com/group/sqlalchemy/browse_thread/thread/7c1eb642435adde7
# To workaround this issue with sqlalchemy, we can either:
# 1) pass the session object explicitly
# 2) do not close the session at all (bad idea - could lead to memory leaks)
#
# Till pypm implements atomic transations in client.installer,
# we retain this hack (i.e., we choose (2) for now)
pass # local_session.close()
def __str__(self):
return '{0.__class__.__name__}<{0.path}>'.format(self)
class SimpleObject(object):
"""Object with a collection of fields.
The following features are supported:
1) Automatically initialize the fields in __init__
2) Inherit and extend with additional fields
2) Ability to convert from other object types (with extra/less fields)
3) Interoperate with sqlalchemy.orm (i.e., plain `self.foo=value` works)
"""
# Public fields in this object
FIELDS = []
def __init__(self, **kwargs):
"""Initialize the object with FIELDS whose values are in ``kwargs``"""
self.__assert_field_mapping(kwargs)
for field in self.FIELDS:
setattr(self, field, kwargs[field])
@classmethod
def create_from(cls, another, **kwargs):
"""Create from another object of different type.
Another object must be from a derived class of SimpleObject (which
contains FIELDS)
"""
reused_fields = {}
for field, value in another.get_fields():
if field in cls.FIELDS:
reused_fields[field] = value
reused_fields.update(kwargs)
return cls(**reused_fields)
def get_fields(self):
"""Return fields as a list of (name,value)"""
for field in self.FIELDS:
yield field, getattr(self, field)
def to_dict(self):
return dict(self.get_fields())
def to_json(self):
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_string):
values = json.loads(json_string)
return cls(**_remove_unicode_keys(values))
def __assert_field_mapping(self, mapping):
"""Assert that mapping.keys() == FIELDS.
The programmer is not supposed to pass extra/less number of fields
"""
passed_keys = set(mapping.keys())
class_fields = set(self.FIELDS)
if passed_keys != class_fields:
raise ValueError('\n'.join([
"{0} got different fields from expected".format(
self.__class__),
" got : {0}".format(list(sorted(passed_keys))),
" expected: {0}".format(list(sorted(class_fields)))]))
class _get_best_column_type():
"""Return the best column type for the given name."""
mapping = dict(
name = String,
version = String,
keywords = String,
home_page = String,
license = String,
author = String,
author_email = String,
maintainer = String,
maintainer_email = String,
osarch = String,
pyver = String,
pkg_version = String,
relpath = String,
tags = String,
original_source = String,
patched_source = String,
summary = Text,
description = Text,
python3 = Boolean,
metadata_hash = String,
install_requires = Pickle2Type,
files_list = Pickle2Type,
)
def __call__(self, name):
try:
return self.mapping[name]
except KeyError:
raise KeyError(
'missing key. add type for "{0}" in self.mapping'.format(
name))
_get_best_column_type = _get_best_column_type()
def _remove_unicode_keys(dictobj):
"""Convert keys from 'unicode' to 'str' type.
workaround for <http://bugs.python.org/issue2646>
"""
if sys.version_info[:2] >= (3, 0): return dictobj
assert isinstance(dictobj, dict)
newdict = {}
for key, value in dictobj.items():
if type(key) is unicode:
key = key.encode('utf-8')
newdict[key] = value
return newdict
+70
View File
@@ -0,0 +1,70 @@
# Copyright (c) 2010 ActiveState Software Inc. All rights reserved.
"""Base module"""
import sys
from os.path import abspath, join, expanduser
import logging
from appdirs import AppDirs
from applib import location, log
from applib.log import LogawareCmdln as Cmdln
__all__ = ['Application', 'Cmdln']
class Application(object):
"""Object representing the application
- name: Name of the application
- company: Company developing the application
- compatibility_version: The major version which promises
backward-compatability among all of its minor
versions. Eg: 5.2; 5.2.1, 5.2.2, etc.. should
use the same compatability version (5.2). This
value is used in the settings directory path.
- locations: An object holding a set of OS-specific but
generic location values (eg: APPDATA). See
``Locations`` class for details.
"""
def __init__(self, name, company, compatibility_version=None):
self.name = name
self.company = company
self.compatibility_version = compatibility_version
self.locations = AppDirs2(
name, company, compatibility_version, roaming=False)
def run(self, cmdln_class):
"""Run the application using the given cmdln processor.
This method also ensures configuration of logging handlers for console
"""
assert issubclass(cmdln_class, Cmdln)
l = logging.getLogger('')
log.setup_trace(l, self.locations.log_file_path)
cmdln_class(install_console=True).main()
class AppDirs2(AppDirs):
@property
def log_file_path(self):
if sys.platform in ('win32', 'darwin'):
name = self.appname + '.log'
else:
name = self.appname.lower() + '.log'
return join(self.user_log_dir, name)
if __name__ == '__main__':
# self-test code
app = Application('PyPM', 'ActiveState', '0.1')
print('user_data_dir', app.locations.user_data_dir)
print('site_data_dir', app.locations.site_data_dir)
print('user_cache_dir', app.locations.user_cache_dir)
print('log_file_path', app.locations.log_file_path)
+14
View File
@@ -0,0 +1,14 @@
# Copyright (c) 2010 ActiveState Software Inc. All rights reserved.
# This module is deprecated
from appdirs import *
#---- self test code
if __name__ == "__main__":
print("applib: user data dir: %s" % user_data_dir("Komodo", "ActiveState"))
print("applib: site data dir: %s" % site_data_dir("Komodo", "ActiveState"))
print("applib: user cache dir: %s" % user_cache_dir("Komodo", "ActiveState"))
+336
View File
@@ -0,0 +1,336 @@
# Copyright (c) 2010 ActiveState Software Inc. All rights reserved.
"""Logging utilities and console integration
Don't use print, use ``LOG.info``. This ensures seemless integration with
application logging.
"""
import sys
import os
import stat
from os.path import expanduser, join, exists, isabs, dirname
import logging
from datetime import datetime
from contextlib import contextmanager
from applib import sh, textui, _cmdln as cmdln
if sys.hexversion > 0x03000000:
def unicode_literal(s):
return s # strings are unicode by default on py3
else:
from io import open
def unicode_literal(s):
return s.decode('utf-8')
@cmdln.option('-v', '--verbose', action="count", dest='verbosity_level',
default=None,
help='-v will show tracebacks; -vv also debug messages')
class LogawareCmdln(cmdln.CmdlnWithConfigParser):
"""A Cmdln class that integrates with this modules's functionality
1. Add -v and -vv global options: show tracebacks when sub commands throw
them only if -v or -vv is passed by the user.
2. Wrap all sub command methods and call `initialize` (to be defined by the
derived class) automatically.
"""
def __init__(self, install_console=False, default_verbosity=0, *args, **kwargs):
"""
Arguments:
- install_console: install console handlers in logger
"""
cmdln.CmdlnWithConfigParser.__init__(self, *args, **kwargs)
self.__initialized = False
self.__install_console = install_console
self.__default_verbosity = default_verbosity
def initialize(self):
"""This method is called by ``bootstrapped`` - once and only once."""
raise NotImplementedError('must be defined by the derived class')
@contextmanager
def bootstrapped(self):
"""Run the sub-command after bootstrapping
It is required to wrap the sub-command code in this context, which takes
care of the following:
- Invokes `setup_console` passing `verbosity_level`
- Invokes `self.initialize` automatically but no more than once.
- Intercept unhandled exceptions and display them according to
verbosity_level
"""
l = logging.getLogger('')
if not self.__initialized:
# install console (if required) and call the `initialize` method
# once.
if self.__install_console:
if self.options.verbosity_level is None:
self.options.verbosity_level = self.__default_verbosity
setup_console(l, self.options.verbosity_level)
with self.__run_safely(l):
self.initialize()
self.__initialized = True
with self.__run_safely(l):
yield
@contextmanager
def __run_safely(self, l):
try:
yield
except KeyboardInterrupt:
# user presses Ctrl-C to terminate the program
l.info('') # print a new-line for the shell prompt's sake
sys.exit(5)
except Exception as e:
if self.__install_console:
# setup_console handles all exceptions; let it do so by calling
# log.exception and exitting immediately.
l.exception(e)
sys.exit(1) # exit to shell
else:
# as setup_console is not used, raise exceptions normally.
raise
def setup_console(l, verbosity_level):
"""Setup console output for logging calls"""
l.setLevel(logging.DEBUG) # level-logic is instead in the handler
existing_consoles = [h for h in l.handlers if isinstance(h, ConsoleHandler)]
if existing_consoles:
assert len(existing_consoles) == 1, \
'more than one console installed. not possible.'
# re-use existing console handler
h = existing_consoles[0]
assert h.verbosity_level == verbosity_level, \
'already has console with different verbosity level'
else:
# create a new console handler
h = ConsoleHandler(verbosity_level)
h.setFormatter(ConsoleFormatter())
l.addHandler(h)
def setup_trace(l, tracefile):
"""Trace logging calls to a standard log file
Log file name and location will be determined based on platform.
"""
l.setLevel(logging.DEBUG) # trace file must have >=DEBUG entries
sh.mkdirs(dirname(tracefile))
_rollover_log(tracefile)
_begin_log_section(tracefile)
h = logging.FileHandler(tracefile)
h.setFormatter(
logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
l.addHandler(h)
@contextmanager
def handledby(l, filename, create_dirs=False, level=None, formatter=None):
"""Momentarily handle logger `l` using FileHandler
Within the 'with' context, all logging calls made to the logger `l` will get
written to the file `filename`. When exiting the 'with' context, this file
is closed and the handler will be removed.
"""
assert isabs(filename), 'not an absolute path: {0}'.format(filename)
if create_dirs:
sh.mkdirs(dirname(filename))
h = logging.FileHandler(filename)
if level:
h.setLevel(level)
if formatter:
h.setFormatter(formatter)
l.addHandler(h)
try:
yield
finally:
h.close()
l.removeHandler(h)
@contextmanager
def archivedby(l, logs_directory, entity_name, level=None, formatter=None):
"""Like `handledby` but the log file is stored in archive.
The exact path to the log file is determined as follows:
$logs_directory/2009/03/24/142356_$entity_name.txt
"""
now = datetime.now() # NOTE: this is local time, not UTC time.
filename = join(logs_directory,
now.strftime('%Y'), now.strftime('%m'), now.strftime('%d'),
'{0}_{1}.txt'.format(now.strftime('%H%M%S'), entity_name))
assert not exists(filename), 'already exists: {0}'.format(filename)
with handledby(l, filename, create_dirs=True,
level=level, formatter=formatter):
yield filename
@contextmanager
def wrapped(l):
"""'With' context to intercept and log any exceptions raised"""
try:
yield
except Exception as e:
l.exception(e)
raise
def runonconsole(l):
"""Run on console .. and exit the program appropriately.
If an exception is raised, it is silently logged (so
>>> with log.run(logging.getLogger('pypm')) as retcode:
"""
try:
yield
except Exception as e:
l.exception(e)
sys.exit(1)
sys.exit(0)
try:
from logging import NullHandler
except ImportError:
class NullHandler(logging.Handler):
def emit(self, record):
pass
# -- internal
def _rollover_log(logfile, maxsize=(2<<20)):
"""Move $logfile to $logfile.old if its size exceeds `maxsize`"""
if exists(logfile):
filesize = os.stat(logfile)[stat.ST_SIZE]
if filesize >= maxsize:
sh.mv(logfile, logfile+'.old')
def _begin_log_section(logfile):
"""Begin a new section in the logfile
Also write the current datetime
"""
LINE_BUFFERED=1
with open(logfile, 'a', LINE_BUFFERED, encoding='utf-8') as f:
f.write(unicode_literal('\n')) # sections are separated by newline
f.write(unicode_literal('{0}\n'.format(datetime.now())))
class ConsoleHandler(logging.StreamHandler):
"""Send messages to console
INFO messages are sent to stdout. Other levels to stderr.
By default, INFO/WARN/ERROR messages are sent as-it-is to console .. while
EXCEPTION messages are pruned and shown as error unless verbosity level is
greater than zero. If verbosity level is greater than one, then DEBUG
messages are also shown.
"""
def __init__(self, verbosity_level):
logging.StreamHandler.__init__(self)
self.stream = None # reset it; we are not going to use it anyway
self.verbosity_level = verbosity_level
def emit(self, record):
if record.levelno == logging.INFO:
self.__emit(record, sys.stdout)
elif record.levelno == logging.WARN:
self.__emit(record, sys.stderr)
elif record.levelno == logging.DEBUG:
# show DEBUG messages with verbosity_level >= 2
if self.verbosity_level > 1:
self.__emit(record, sys.stderr)
elif record.levelno >= logging.ERROR:
if record.exc_info and self.verbosity_level < 1:
# supress full traceback with verbosity_level <= 0
with new_record_exc_info(record, None):
self.__emit(record, sys.stderr)
else:
self.__emit(record, sys.stderr)
else:
raise NotImplementedError(
"don't know about level: {0}".format(record.levelno))
def __emit(self, record, strm):
# override handler stream with ours (which could stdout or stderr)
self.stream = strm
with textui.safe_output():
# We *trust* that `logging` module's `emit()` will always terminate the
# message with newlines. This is essential for not breaking the progress
# bar, if any.
logging.StreamHandler.emit(self, record)
def flush(self):
# Workaround a bug in logging module
# See:
# http://bugs.python.org/issue6333
if self.stream and hasattr(self.stream, 'flush') and not self.stream.closed:
try:
logging.StreamHandler.flush(self)
except IOError as e:
if e.errno == 32:
# skip 'broken pipe' errors that likely occur due to
# killing the process on the other end of the pipe
# eg: piping command output to `less` and then pressing
# Q in the middle of it.
pass
else:
raise
def _clear_record_traceback_cache(record):
"""Clear the traceback cache stored in `record` (LogRecord)
Workaround for: http://bugs.python.org/issue6435
"""
record.exc_text = None
@contextmanager
def new_record_exc_info(record, exc_info):
"""Temporarily assign `exc_info` to `record`"""
_clear_record_traceback_cache(record)
old_exc_info = record.exc_info
record.exc_info = exc_info
try:
yield
finally:
record.exc_info = old_exc_info
_clear_record_traceback_cache(record)
class ConsoleFormatter(logging.Formatter):
"""A formatter that attaches 'error:' prefix to error/critical messages"""
def format(self, record):
# attach 'error:' prefix to error/critical messages
# attach 'warning:' prefix accordingly
s = logging.Formatter.format(self, record)
if record.levelno >= logging.ERROR:
return 'error: {0}'.format(s)
elif record.levelno == logging.WARNING:
return 'warning: {0}'.format(s)
else:
return s
+104
View File
@@ -0,0 +1,104 @@
# Copyright (c) 2010 ActiveState Software Inc. All rights reserved.
"""Miscelleneous utility functions
"""
import sys
from os import path
import six
from applib import _cmdln as cmdln
__all__ = ['xjoin', 'existing']
def xjoin(*c):
"""Equivalent to normpath(abspath(join(*c)))"""
return path.normpath(path.abspath(path.join(*c)))
def existing(pth):
"""Return path, but assert its presence first"""
assert isinstance(pth, (str, unicode)), \
'not of string type: %s <%s>' % (pth, type(pth))
assert exists(pth), 'file/directory not found: %s' % pth
return pth
def require_option(options, option_name, details=None):
"""
>>> require_option('foo-bar')
...
CmdlnUserError: required option, --foo-bar, is mising
From http://twitter.com/ActiveState/status/19782350475
'required options' - conforming to unix standard vs being creative with
non-positional arguments. http://bit.ly/d2iiUL #python #optparse ^SR
"""
option_var_name = option_name.replace('-', '_')
if not hasattr(options, option_var_name):
raise ValueError(
"require_option: undefined option '%s'" % option_var_name)
if getattr(options, option_var_name) is None:
msg = 'required option "--{0}" is missing'.format(option_name)
if details:
msg = '%s (%s)' % (msg, details)
raise cmdln.CmdlnUserError(msg)
def safe_unicode(obj):
"""Return the unicode/text representation of `obj` without throwing UnicodeDecodeError
Returned value is only a *representation*, not necessarily identical.
"""
if type(obj) not in (six.text_type, six.binary_type):
obj = six.text_type(obj)
if type(obj) is six.text_type:
return obj
else:
return obj.decode(errors='ignore')
def _hack_unix2win_path_conversion(cmdln_options, option_names):
"""Hack to convert Unix paths in cmdln options (via config file) to
Windows specific netshare location
Config file must define the mapping as config var "unix2win_path_mapping"
"""
require_option(cmdln_options, 'unix2win_path_mapping')
for opt in option_names:
setattr(
cmdln_options,
opt,
_cmdln_canonical_path(
cmdln_options.unix2win_path_mapping,
getattr(cmdln_options, opt)))
def _cmdln_canonical_path(unix2win_path_mapping, unixpath):
"""Given a unix path return the platform-specific path
On Windows, use the given mapping to translate the path. On Unix platforms,
this function essentially returns `unixpath`.
The mapping is simply a buildout.cfg-friendly multiline string that would
get parsed as dictionary which should have path prefixes as keys, and the
translated Windows net share path as the values. See PyPM's
etc/activestate.conf for an example.
The mapping is typically supposed to be defined in the config file under
the cmdln section. This function is used by PyPM and Grail.
"""
unix2win_path_mapping = unix2win_path_mapping or ""
# convert buildout.cfg-style multiline mapping to a dict
m = dict([
[x.strip() for x in line.strip().split(None, 1)]
for line in unix2win_path_mapping.splitlines() if line.strip()])
if sys.platform.startswith('win'):
for prefix, netsharepath in m.items():
if unixpath.startswith(prefix):
return netsharepath + unixpath[len(prefix):]
return unixpath
+203
View File
@@ -0,0 +1,203 @@
# Copyright (c) 2010 ActiveState Software Inc. All rights reserved.
"""Various shell related wrappers
"""
import os
from os import path
import shutil
import tempfile
from fnmatch import fnmatch
from contextlib import contextmanager
from applib._proc import *
#
# Compression routines
#
class PackError(Exception):
"""Error during pack or unpack"""
def unpack_archive(filename, pth='.'):
"""Unpack the archive under ``path``
Return (unpacked directory path, filetype)
"""
from applib import _compression
assert path.isfile(filename), 'not a file: %s' % filename
assert path.isdir(pth)
for filetype, implementor in _compression.implementors.items():
if implementor.is_valid(filename):
with cd(pth):
return (implementor(filename).extract(), filetype)
else:
raise PackError('unknown compression format: ' + filename)
def pack_archive(filename, files, pwd, filetype="tgz"):
"""Pack the given `files` from directory `pwd`
`filetype` must be one of ["tgz", "tbz2", "zip"]
"""
from applib import _compression
assert path.isdir(pwd)
assert filetype in _compression.implementors, 'invalid filetype: %s' % filetype
if path.exists(filename):
rm(filename)
with cd(pwd):
relnames = [path.relpath(file, pwd) for file in files]
_compression.implementors[filetype].pack(relnames, filename)
return filename
#
# Path/file routines
#
def mkdirs(pth):
"""Make all directories along ``pth``"""
if not path.exists(pth):
os.makedirs(pth)
else:
assert path.isdir(pth)
def rm(p):
"""Remove the specified path recursively. Similar to `rm -rf ARG`
Note: if ARG is a symlink, only that symlink will be removed.
"""
if path.lexists(p):
if path.isdir(p) and not path.islink(p):
shutil.rmtree(p)
else:
os.remove(p)
def mv(src, dest, _mkdirs=False):
"""Move `src` to `dest`"""
if _mkdirs:
mkdirs(path.dirname(dest))
shutil.move(src, dest)
def cp(src, dest, _mkdirs=False, ignore=None, copyperms=True):
"""Copy `src` to `dest` recursively"""
assert path.exists(src)
if _mkdirs:
mkdirs(path.dirname(dest))
if path.isdir(src):
_copytree(src, dest, ignore=ignore, copyperms=copyperms)
else:
shutil.copyfile(src, dest)
def find(pth, pattern):
"""Find files or directories matching ``pattern`` under ``pth``"""
matches = []
if path.isfile(pth):
if fnmatch(path.basename(pth), pattern):
matches.append(pth)
else:
for root, dirs, files in os.walk(pth):
matches.extend([
path.join(root, f) for f in files+dirs
if fnmatch(f, pattern)
])
return matches
@contextmanager
def cd(pth):
"""With context to temporarily change directory"""
assert path.isdir(existing(pth)), pth
cwd = os.getcwd()
os.chdir(pth)
try:
yield
finally:
os.chdir(cwd)
@contextmanager
def tmpdir(prefix='tmp-', suffix=''):
"""__with__ context to work in a temporary working directory
Temporary directory will be deleted unless an exception was raised. During
the context, CWD will be changed to the temporary directory.
"""
d = tempfile.mkdtemp(prefix=prefix, suffix=suffix)
with cd(d):
yield d
rm(d)
def existing(pth):
"""Return `pth` after checking it exists"""
if not path.exists(pth):
raise IOError('"{0}" does not exist'.format(pth))
return pth
def _copytree(src, dst, symlinks=False, ignore=None, copyperms=True):
"""Forked shutil.copytree for `copyperms` support"""
names = os.listdir(src)
if ignore is not None:
ignored_names = ignore(src, names)
else:
ignored_names = set()
os.makedirs(dst)
errors = []
for name in names:
if name in ignored_names:
continue
srcname = os.path.join(src, name)
dstname = os.path.join(dst, name)
try:
if symlinks and os.path.islink(srcname):
linkto = os.readlink(srcname)
os.symlink(linkto, dstname)
elif os.path.isdir(srcname):
_copytree(srcname, dstname, symlinks, ignore, copyperms)
else:
shutil.copy(srcname, dstname)
# XXX What about devices, sockets etc.?
except (IOError, os.error) as why:
raise
errors.append((srcname, dstname, str(why)))
# catch the Error from the recursive copytree so that we can
# continue with other files
except shutil.Error:
_, err = sys.exec_info()
errors.extend(err.args[0])
if copyperms:
try:
shutil.copystat(src, dst)
except WindowsError:
# can't copy file access times on Windows
pass
except OSError:
_, why = sys.exec_info()
errors.extend((src, dst, str(why)))
if errors:
raise shutil.Error(errors)
# WindowsError is not available on other platforms
try:
WindowsError
except NameError:
class WindowsError(OSError): pass
+195
View File
@@ -0,0 +1,195 @@
# Copyright (c) 2010 ActiveState Software Inc. All rights reserved.
from __future__ import unicode_literals
import os
from os import path
import tempfile
import sys
import pytest
from applib import sh
from applib import textui
from applib.misc import safe_unicode
import six
fixtures = path.join(path.dirname(__file__), 'fixtures')
def test_import():
import applib
import applib.base
import applib.sh
import applib.textui
import applib.log
import applib.misc
def test_sh_runerror_unicode():
# https://github.com/activestate/applib/issues/12
with pytest.raises(sh.RunError):
try:
sh.run('echo ' + "\u1234" + " & nonexistant")
except sh.RunError as e:
print(safe_unicode(e))
raise
def test_sh_runerror_limit():
with pytest.raises(sh.RunError):
from random import choice
import string
LINE = ''.join([choice(string.ascii_letters) for x in range(80)])
try:
sh.run(r'''python -c "print('\n'.join(['%s']*100)); raise SystemExit('an error');"''' % LINE)
except sh.RunError as e:
c = str(e).count(LINE)
# def _limit_str(s, maxchars=80*15): --- so ~ 15 lines (not 100 lines)
assert c < 20, "original message: %s" % e
assert '[...]' in str(e)
raise
def test_safe_unicode():
from applib.misc import safe_unicode
import six
abc_bytes = b'ab\nc'
abc_text = 'ab\nc'
assert safe_unicode(abc_bytes) == abc_text
assert safe_unicode(abc_text) == abc_text
foo_text = 'abc' # note: \x89 is ignored.
foo_bytes = b'\x89abc'
assert safe_unicode(foo_text) == foo_text
assert safe_unicode(foo_bytes) == foo_text
def test_sh_rm_file():
with sh.tmpdir():
with open('afile', 'w') as f: f.close()
assert path.exists('afile')
sh.rm('afile')
assert not path.exists('afile')
def test_sh_rm_dir():
with sh.tmpdir():
sh.mkdirs('adir')
with sh.cd('adir'):
with open('afile', 'w') as f: f.close()
assert path.exists('afile')
assert path.exists('adir')
sh.rm('adir')
assert not path.exists('adir')
# Workaround a py.test bug:
# Error evaluating 'skipif' expression
# b'sys.platform == "win32"'
# Failed: expression is not a string
def skipif(expr):
if not six.PY3:
expr = expr.encode()
return pytest.mark.skipif(expr)
@skipif('sys.platform == "win32"')
def test_sh_rm_symlink():
with sh.tmpdir():
with open('afile', 'w') as f: f.close()
assert path.exists('afile')
os.symlink('afile', 'alink')
assert path.lexists('alink')
sh.rm('alink')
assert not path.lexists('alink')
@skipif('sys.platform == "win32"')
def test_sh_rm_broken_symlink():
with sh.tmpdir():
os.symlink('afile-notexist', 'alink')
assert not path.exists('alink')
assert path.lexists('alink')
sh.rm('alink')
assert not path.lexists('alink')
@skipif('sys.platform == "win32"')
def test_sh_rm_symlink_dir():
with sh.tmpdir():
sh.mkdirs('adir')
with sh.cd('adir'):
with open('afile', 'w') as f: f.close()
assert path.exists('afile')
assert path.exists('adir')
os.symlink('adir', 'alink')
assert path.lexists('alink')
sh.rm('alink')
assert path.exists('adir')
assert not path.lexists('alink')
def test_console_width_detection():
width = textui.find_console_width()
assert width is None
def test_colprint():
sample_table = [
['python-daemon', '4.5.7.7.3-1', 'blah foo meh yuck'],
['foo', '6.1', ('some very loooooooooong string here .. I '
'suggest we make it even longer .. so longer '
' that normal terminal widths should entail '
'colprint to trim the string')]]
textui.colprint(sample_table)
# try with empty inputs
textui.colprint(None)
textui.colprint([])
def test_compression_ensure_read_access():
"""Test the ensure_read_access() hack in _compression.py"""
def test_pkg(pkgpath):
testdir = tempfile.mkdtemp('-test', 'pypm-')
extracted_dir, _ = sh.unpack_archive(pkgpath, testdir)
# check if we have read access on the directory
for child in os.listdir(extracted_dir):
p = path.join(extracted_dir, child)
if path.isdir(p):
os.listdir(p)
sh.rm(testdir)
yield 'u-x on dirs', test_pkg, path.join(fixtures, 'generator_tools-0.3.5.tar.gz')
yield 'u-w on ._setup.py', test_pkg, path.join(fixtures, 'TracProjectMenu-1.0.tar.gz')
def test_compression_catch_invalid_mode():
"""Error <IOError: [Errno 22] invalid mode ('wb') or filename> from
tarfile.py should be handled"""
def extract():
testdir = tempfile.mkdtemp('-test', 'pypm-')
extracted_dir, _ = sh.unpack_archive(
path.join(fixtures, 'libtele-0.2.tar.gz'), testdir)
if sys.platform == 'win32' and sys.version_info[:2] >= (2, 7):
with pytest.raises(sh.PackError):
extract()
else:
extract()
@pytest.mark.xfail
def test_compression_issue_11():
"""https://github.com/ActiveState/applib/issues/#issue/11
* Windows: IOError: [Errno 13] Permission denied: '.\\airi-0.0.1\\AIRi'
* OSX: IOError: [Errno 21] Is a directory: './airi-0.0.1/AIRi
* OSX Archive Utility (Finder): Unable to unarchive "airi-0.0.1.tar" ...
(Error 1 - Operation not permitted.)
"""
testdir = tempfile.mkdtemp('applib')
d, _ = sh.unpack_archive(path.join(fixtures, 'airi-0.0.1.tar.gz'), testdir)
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+359
View File
@@ -0,0 +1,359 @@
# Copyright (c) 2010 ActiveState Software Inc. All rights reserved.
"""Textual UI: progress bar and colprint"""
import sys
from datetime import datetime, timedelta
from contextlib import contextmanager
import logging
import math
import six
LOG = logging.getLogger(__name__)
__all__ = ['colprint', 'find_console_width', 'ProgressBar',
'clear_progress_bar', 'redraw_progress_bar', 'safe_output']
if not six.PY3:
input = raw_input
class ProgressBar(object):
"""Show percent progress every 'n' seconds"""
def __init__(self, total, delay=0.1, show_size=lambda x: x, note=None):
"""
total - total number of items that are going to be processed
delay - update delay in seconds
show_size - function to return the string to display instead of the number `size`
"""
assert total >= 0, total
assert delay >= 0, delay
assert show_size
_set_current_progress_bar(self)
self.delay = timedelta(seconds=delay)
self.delay_duration = timedelta(seconds=1)
self.start = datetime.now()
self.elapsed = None # time elapsed from start
self.estimated_time_left = None
self.lastprint = None
self.lastprint_duration = None # for updating duration/ETA
self.lastprocessed = 0
self.total = total
self.processed = 0
self.show_size = show_size
self.note = note
self.duration_display = ''
self._length = 0 # current length of the progress display
@classmethod
def iterate(cls, sequence, note=None, post=None):
"""Iterate a sequence and update the progress bar accordingly
The sequence must have a 'len' attribute if it is an arbitrary
generator.
note -- Text to print before the progress bar
post -- Text to print at the end of progress (w/ fmt vars)
"""
p = cls(len(sequence), note=note)
clean_exit = False
try:
for item in sequence:
yield item
p.tick()
clean_exit = True
finally:
p.close()
if post and clean_exit:
sys.stdout.write(post.format(**p.__dict__) + '\n')
def tick(self, items=1):
"""The method that updates the display if necessary.
After creating the ``PercentProgress`` object, this method must be
called for every item processed (or, pass items=ITEMS for every ITEMS
processed).
This method must be called no more than ``self.total`` times (otherwise
you get assertion error .. implying a bug in your code)
Return True if progress bar was redrawn.
"""
self.processed += items
assert self.processed <= self.total, \
'{0} <= {1}'.format(self.processed, self.total)
now = datetime.now()
if (self.lastprint == None or
(now - self.lastprint) > self.delay):
self.lastprint = now
self.redraw()
return True
else:
return False
def clear(self):
"""Erase the entire progress bar and put the cursor at first column"""
# Move cursor to the beginning of current progress line so that further
# messages will overwrite the progress bar. Also overwrite the previous
# progress bar with empty space.
sys.stdout.write('\r' + ' '*self._length + '\r')
sys.stdout.flush()
def close(self):
"""Close (hide) the progress bar
Erase the progress bar and print the closing message in place of the
previous progress bar text.
"""
self.redraw()
self.clear()
_del_current_progress_bar(self)
def redraw(self):
self.clear()
percent = _calculate_percent(self.processed, self.total)
now = datetime.now()
self.elapsed = now - self.start
if self.processed:
self.estimated_time_left = self.elapsed.seconds * (self.total-self.processed)/self.processed
# Update time elapsed/left once a second only (delay_duration = 1s).
if self.elapsed.seconds and (
self.lastprint_duration is None or \
now - self.lastprint_duration > self.delay_duration):
self.lastprint_duration = now
elapsed = _format_duration(self.elapsed.seconds)
if self.estimated_time_left:
self.duration_display = '({0}; {1} left)'.format(
elapsed, _format_duration(self.estimated_time_left))
else:
self.duration_display = '({0})'.format(elapsed)
bar_width = 20
bar_filled = int(round(20.0/100 * percent))
filled = ['='] * bar_filled
if filled:
filled[-1] = '>'
filled = ''.join(filled)
progress_bar = ''.join([
(self.note+': ') if self.note else '',
# header:
'[',
# solid bar
filled,
# empty space
' ' * (bar_width-bar_filled),
# footer
'] {0:-3}% {1}/{2} {3}'.format(
percent,
self.show_size(self.processed),
self.show_size(self.total),
self.duration_display
)
])
self._length = len(progress_bar)
sys.stdout.write('\r' + progress_bar + '\r')
sys.stdout.flush()
def clear_progress_bar():
"""Clear progress bar, if any"""
if _current_progress_bar:
_current_progress_bar.clear()
def redraw_progress_bar():
"""Redraw progress bar, if any"""
if _current_progress_bar:
_current_progress_bar.redraw()
@contextmanager
def safe_output():
"""Wrapper that makes it safe to print to stdout
If a progress bar is currently being shown, this wrapper takes care of
clearing it before .. and then redrawing it after
"""
clear_progress_bar()
yield
redraw_progress_bar()
def askyesno(question, default):
"""Ask (Y/N) type of question to the user"""
assert isinstance(default, bool), '"default" must be a boolean'
s = '{0} ({1}/{2}) '.format(
question,
default and 'Y' or 'y',
default and 'n' or 'N')
while True:
val = input(s).strip().lower()
if val == '':
return default
elif val in ('y', 'yes', 'ok'):
return True
elif val in ('n', 'no'):
return False
# This function was written by Alex Martelli
# http://stackoverflow.com/questions/1396820/
def colprint(table, totwidth=None):
"""Print the table in terminal taking care of wrapping/alignment
- `table`: A table of strings. Elements must not be `None`
- `totwidth`: If None, console width is used
"""
if not table: return
if totwidth is None:
totwidth = find_console_width()
if totwidth is not None:
totwidth -= 1 # for not printing an extra empty line on windows
numcols = max(len(row) for row in table)
# ensure all rows have >= numcols columns, maybe empty
padded = [row+numcols*['',] for row in table]
# compute col widths, including separating space (except for last one)
widths = [ 1 + max(len(x) for x in column) for column in zip(*padded)]
widths[-1] -= 1
# drop or truncate columns from the right in order to fit
if totwidth is not None:
while sum(widths) > totwidth:
mustlose = sum(widths) - totwidth
if widths[-1] <= mustlose:
del widths[-1]
else:
widths[-1] -= mustlose
break
# and finally, the output phase!
for row in padded:
s = ''.join(['%*s' % (-w, i[:w])
for w, i in zip(widths, row)])
LOG.info(s)
def find_console_width():
"""Return the console width
Return ``None`` if stdout is not a terminal (eg: a pipe)
"""
if sys.platform.startswith('win'):
return _find_windows_console_width()
else:
return _find_unix_console_width()
@contextmanager
def longrun(log, finalfn=lambda: None):
"""Decorator for performing a long operation with consideration for the
command line.
1. Catch keyboard interrupts and exit gracefully
2. Print total time elapsed always at the end (successful or not)
3. Call ``finalfn`` always at the end (successful or not)
"""
start_time = datetime.now()
try:
yield
except KeyboardInterrupt:
log.info('*** interrupted by user - Ctrl+c ***')
raise SystemExit(3)
finally:
finalfn()
end_time = datetime.now()
log.info('')
log.info('-----')
log.info('Total time elapsed: %s', end_time-start_time)
def _find_unix_console_width():
import termios, fcntl, struct, sys
# fcntl.ioctl will fail if stdout is not a tty
if not sys.stdout.isatty():
return None
s = struct.pack("HHHH", 0, 0, 0, 0)
fd_stdout = sys.stdout.fileno()
size = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s)
height, width = struct.unpack("HHHH", size)[:2]
return width
def _find_windows_console_width():
# http://code.activestate.com/recipes/440694/
from ctypes import windll, create_string_buffer
STDIN, STDOUT, STDERR = -10, -11, -12
h = windll.kernel32.GetStdHandle(STDERR)
csbi = create_string_buffer(22)
res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
if res:
import struct
(bufx, bufy, curx, cury, wattr,
left, top, right, bottom,
maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
sizex = right - left + 1
sizey = bottom - top + 1
return sizex
def _byteshr(bytes):
"""Human-readable version of bytes count"""
for x in ['bytes','KB','MB','GB','TB']:
if bytes < 1024.0:
return "%3.1f%s" % (bytes, x)
bytes /= 1024.0
raise ValueError('cannot find human-readable version')
def _calculate_percent(numerator, denominator):
assert numerator <= denominator, '%d <= %d' % (numerator, denominator)
if denominator == 0:
if numerator == 0:
return 100
else:
raise ValueError('denominator cannot be zero')
return int(round( numerator / float(denominator) * 100 ))
def _format_duration(seconds):
s = []
if seconds > 60:
s.append('{0}m'.format(int(seconds/60)))
s.append('{0}s'.format(int(seconds % 60)))
return ''.join(s)
# Handle to the current progress bar object. There cannot be more than one
# progress bar for obvious reasons.
_current_progress_bar = None
def _set_current_progress_bar(pbar):
global _current_progress_bar
assert _current_progress_bar is None, 'there is already a pbar'
_current_progress_bar = pbar
def _del_current_progress_bar(pbar):
global _current_progress_bar
assert _current_progress_bar is pbar, 'pbar is something else'
_current_progress_bar = None