diff --git a/clint/packages/applib/__init__.py b/clint/packages/applib/__init__.py deleted file mode 100644 index 1c0f215..0000000 --- a/clint/packages/applib/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. - -__version_info__ = ('1', '2') -__version__ = '.'.join(__version_info__) # + '.dev' diff --git a/clint/packages/applib/_cmdln.py b/clint/packages/applib/_cmdln.py deleted file mode 100644 index be17001..0000000 --- a/clint/packages/applib/_cmdln.py +++ /dev/null @@ -1,1843 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2002-2009 ActiveState Software Inc. -# License: MIT (see LICENSE.txt for license details) -# Author: Trent Mick - -"""An improvement on Python's standard cmd.py module. - -As with cmd.py, this module provides "a simple framework for writing -line-oriented command intepreters." This module provides a 'RawCmdln' -class that fixes some design flaws in cmd.Cmd, making it more scalable -and nicer to use for good 'cvs'- or 'svn'-style command line interfaces -or simple shells. And it provides a 'Cmdln' class that add -optparse-based option processing. Basically you use it like this: - - import cmdln - - class MySVN(cmdln.Cmdln): - name = "svn" - - @cmdln.alias('stat', 'st') - @cmdln.option('-v', '--verbose', action='store_true' - help='print verbose information') - def do_status(self, subcmd, opts, *paths): - print "handle 'svn status' command" - - #... - - if __name__ == "__main__": - shell = MySVN() - retval = shell.main() - sys.exit(retval) - -See the README.txt or for more -details. -""" - -__version_info__ = (1, 2, 0) -__version__ = '.'.join(map(str, __version_info__)) - -import os -from os import path -import sys -import re -import types -import cmd -import optparse -from pprint import pprint -try: - import ConfigParser -except ImportError: - import configparser as ConfigParser # python3 -import datetime - - -if sys.hexversion > 0x03000000: - ClassType = type -else: - ClassType = types.ClassType - - -#---- globals - -LOOP_ALWAYS, LOOP_NEVER, LOOP_IF_EMPTY = range(3) - -# An unspecified optional argument when None is a meaningful value. -_NOT_SPECIFIED = ("Not", "Specified") - -# Pattern to match a TypeError message from a call that -# failed because of incorrect number of arguments (see -# Python/getargs.c). -_INCORRECT_NUM_ARGS_RE = re.compile( - r"(takes [\w ]+ )(\d+)[\w ]*( arguments? \()(\d+)( given\))") - - - -#---- exceptions - -class CmdlnError(Exception): - """A cmdln.py usage error.""" - def __init__(self, msg): - self.msg = msg - def __str__(self): - return self.msg - -class CmdlnUserError(Exception): - """An error by a user of a cmdln-based tool/shell.""" - pass - - - -#---- public methods and classes - -def alias(*aliases): - """Decorator to add aliases for Cmdln.do_* command handlers. - - Example: - class MyShell(cmdln.Cmdln): - @cmdln.alias("!", "sh") - def do_shell(self, argv): - #...implement 'shell' command - """ - def decorate(f): - if not hasattr(f, "aliases"): - f.aliases = [] - f.aliases += aliases - return f - return decorate - - -class RawCmdln(cmd.Cmd): - """An improved (on cmd.Cmd) framework for building multi-subcommand - scripts (think "svn" & "cvs") and simple shells (think "pdb" and - "gdb"). - - A simple example: - - import cmdln - - class MySVN(cmdln.RawCmdln): - name = "svn" - - @cmdln.aliases('stat', 'st') - def do_status(self, argv): - print "handle 'svn status' command" - - if __name__ == "__main__": - shell = MySVN() - retval = shell.main() - sys.exit(retval) - """ - name = None # if unset, defaults basename(sys.argv[0]) - prompt = None # if unset, defaults to self.name+"> " - version = None # if set, default top-level options include --version - - # Default messages for some 'help' command error cases. - # They are interpolated with one arg: the command. - nohelp = "no help on '%s'" - unknowncmd = "unknown command: '%s'" - - helpindent = '' # string with which to indent help output - - def __init__(self, completekey='tab', - stdin=None, stdout=None, stderr=None): - """Cmdln(completekey='tab', stdin=None, stdout=None, stderr=None) - - The optional argument 'completekey' is the readline name of a - completion key; it defaults to the Tab key. If completekey is - not None and the readline module is available, command completion - is done automatically. - - The optional arguments 'stdin', 'stdout' and 'stderr' specify - alternate input, output and error output file objects; if not - specified, sys.* are used. - - If 'stdout' but not 'stderr' is specified, stdout is used for - error output. This is to provide least surprise for users used - to only the 'stdin' and 'stdout' options with cmd.Cmd. - """ - if self.name is None: - self.name = os.path.basename(sys.argv[0]) - if self.prompt is None: - self.prompt = self.name+"> " - self._name_str = self._str(self.name) - self._prompt_str = self._str(self.prompt) - if stdin is not None: - self.stdin = stdin - else: - self.stdin = sys.stdin - if stdout is not None: - self.stdout = stdout - else: - self.stdout = sys.stdout - if stderr is not None: - self.stderr = stderr - elif stdout is not None: - self.stderr = stdout - else: - self.stderr = sys.stderr - self.cmdqueue = [] - self.completekey = completekey - self.cmdlooping = False - - def get_option_defaults(self, cmdname): - """Return default values for command options - - For all options registered for the given command (`cmdname`), return - the default values as a dictionary (option name as keys, default value - as values) - - If `cmdname` is None, return default for top-level options - """ - return {} - - def get_optparser(self): - """Hook for subclasses to set the option parser for the - top-level command/shell. - - NOTE: you may not override this method anymore; cmdln.option decorator - can now be used on the class itself to create toplevel options. - - This option parser is retrieved and used by `.main()' to handle - top-level options. - - The default implements a single '-h|--help' option. Sub-classes - can return None to have no options at the top-level. Typically - an instance of CmdlnOptionParser should be returned. - """ - return self._create_toplevel_optparser() - - def _create_toplevel_optparser(self): - version = (self.version is not None - and "%s %s" % (self._name_str, self.version) - or None) - parser = CmdlnOptionParser(self, version=version) - - # if ``useconfig`` is used, add the -c option to specify extra config - # file - # if hasattr(self, 'defaultsconfig'): - # parser.add_option('-c', '--configfile', - # dest='configfile', - # help="specify the config file location", - # default=None) - - # add toplevel options - if hasattr(self, 'toplevel_optparser_options'): - for args, kwargs in self.toplevel_optparser_options: - parser.add_option(*args, **kwargs) - - return parser - - def postoptparse(self): - """Hook method executed just after `.main()' parses top-level - options. - - When called `self.options' holds the results of the option parse. - """ - - def main(self, argv=None, loop=LOOP_NEVER): - """A possible mainline handler for a script, like so: - - import cmdln - class MyCmd(cmdln.Cmdln): - name = "mycmd" - ... - - if __name__ == "__main__": - MyCmd().main() - - By default this will use sys.argv to issue a single command to - 'MyCmd', then exit. The 'loop' argument can be use to control - interactive shell behaviour. - - Arguments: - "argv" (optional, default sys.argv) is the command to run. - It must be a sequence, where the first element is the - command name and subsequent elements the args for that - command. - "loop" (optional, default LOOP_NEVER) is a constant - indicating if a command loop should be started (i.e. an - interactive shell). Valid values (constants on this module): - LOOP_ALWAYS start loop and run "argv", if any - LOOP_NEVER run "argv" (or .emptyline()) and exit - LOOP_IF_EMPTY run "argv", if given, and exit; - otherwise, start loop - """ - if argv is None: - argv = sys.argv - else: - argv = argv[:] # don't modify caller's list - - try: - self.optparser = self.get_optparser() - if self.optparser: # i.e. optparser=None means don't process for opts - try: - self.options, args = self.optparser.parse_args(argv[1:]) - except StopOptionProcessing: - return 0 - else: - # Set default options *after* parsing command line options - # This is an requirement for CmdlnWithConfigParser which - # relies on the -c option which is only parsed in the above - # `try' block - self.optparser.set_defaults(**self.get_option_defaults(None)) - self.options, args = self.optparser.parse_args(argv[1:]) - else: - self.options, args = None, argv[1:] - self.postoptparse() - except CmdlnUserError: - _, ex, _ = sys.exc_info() - msg = "%s: %s\nTry '%s help' for info.\n"\ - % (self.name, ex, self.name) - self.stderr.write(self._str(msg)) - self.stderr.flush() - return 1 - - if loop == LOOP_ALWAYS: - if args: - self.cmdqueue.append(args) - return self.cmdloop() - elif loop == LOOP_NEVER: - if args: - return self.cmd(args) - else: - return self.emptyline() - elif loop == LOOP_IF_EMPTY: - if args: - return self.cmd(args) - else: - return self.cmdloop() - - def cmd(self, argv): - """Run one command and exit. - - "argv" is the arglist for the command to run. argv[0] is the - command to run. If argv is an empty list then the - 'emptyline' handler is run. - - Returns the return value from the command handler. - """ - assert isinstance(argv, (list, tuple)), \ - "'argv' is not a sequence: %r" % argv - retval = None - try: - argv = self.precmd(argv) - retval = self.onecmd(argv) - self.postcmd(argv) - except: - if not self.cmdexc(argv): - raise - retval = 1 - return retval - - def _str(self, s): - """Safely convert the given str/unicode to a string for printing.""" - try: - return str(s) - except UnicodeError: - #XXX What is the proper encoding to use here? 'utf-8' seems - # to work better than "getdefaultencoding" (usually - # 'ascii'), on OS X at least. - #return s.encode(sys.getdefaultencoding(), "replace") - return s.encode("utf-8", "replace") - - def cmdloop(self, intro=None): - """Repeatedly issue a prompt, accept input, parse into an argv, and - dispatch (via .precmd(), .onecmd() and .postcmd()), passing them - the argv. In other words, start a shell. - - "intro" (optional) is a introductory message to print when - starting the command loop. This overrides the class - "intro" attribute, if any. - """ - self.cmdlooping = True - self.preloop() - if self.use_rawinput and self.completekey: - try: - import readline - self.old_completer = readline.get_completer() - readline.set_completer(self.complete) - readline.parse_and_bind(self.completekey+": complete") - except ImportError: - pass - try: - if intro is None: - intro = self.intro - if intro: - intro_str = self._str(intro) - self.stdout.write(intro_str+'\n') - self.stop = False - retval = None - while not self.stop: - if self.cmdqueue: - argv = self.cmdqueue.pop(0) - assert isinstance(argv, (list, tuple)), \ - "item on 'cmdqueue' is not a sequence: %r" % argv - else: - if self.use_rawinput: - try: - line = raw_input(self._prompt_str) - except EOFError: - line = 'EOF' - except KeyboardInterrupt: - line = 'KeyboardInterrupt' - else: - self.stdout.write(self._prompt_str) - self.stdout.flush() - line = self.stdin.readline() - if not len(line): - line = 'EOF' - else: - line = line[:-1] # chop '\n' - argv = line2argv(line) - try: - argv = self.precmd(argv) - retval = self.onecmd(argv) - self.postcmd(argv) - except: - if not self.cmdexc(argv): - raise - retval = 1 - self.lastretval = retval - self.postloop() - finally: - if self.use_rawinput and self.completekey: - try: - import readline - readline.set_completer(self.old_completer) - except ImportError: - pass - self.cmdlooping = False - return retval - - def precmd(self, argv): - """Hook method executed just before the command argv is - interpreted, but after the input prompt is generated and issued. - - "argv" is the cmd to run. - - Returns an argv to run (i.e. this method can modify the command - to run). - """ - return argv - - def postcmd(self, argv): - """Hook method executed just after a command dispatch is finished. - - "argv" is the command that was run. - """ - pass - - def cmdexc(self, argv): - """Called if an exception is raised in any of precmd(), onecmd(), - or postcmd(). If True is returned, the exception is deemed to have - been dealt with. Otherwise, the exception is re-raised. - - The default implementation handles CmdlnUserError's, which - typically correspond to user error in calling commands (as - opposed to programmer error in the design of the script using - cmdln.py). - """ - type, exc, traceback = sys.exc_info() - if isinstance(exc, CmdlnUserError): - msg = "%s %s: %s\nTry '%s help %s' for info.\n"\ - % (self.name, argv[0], exc, self.name, argv[0]) - self.stderr.write(self._str(msg)) - self.stderr.flush() - return True - - def onecmd(self, argv): - if not argv: - return self.emptyline() - self.lastcmd = argv - cmdname = self._get_canonical_cmd_name(argv[0]) - if cmdname: - handler = self._get_cmd_handler(cmdname) - if handler: - try: - return self._dispatch_cmd(handler, argv) - except KeyboardInterrupt: - return self.onecmd(["KeyboardInterrupt"]) - return self.default(argv) - - def _dispatch_cmd(self, handler, argv): - return handler(argv) - - def default(self, argv): - """Hook called to handle a command for which there is no handler. - - "argv" is the command and arguments to run. - - The default implementation writes an error message to stderr - and returns an error exit status. - - Returns a numeric command exit status. - """ - errmsg = self._str(self.unknowncmd % (argv[0],)) - if self.cmdlooping: - self.stderr.write(errmsg+"\n") - else: - self.stderr.write("%s: %s\nTry '%s help' for info.\n" - % (self._name_str, errmsg, self._name_str)) - self.stderr.flush() - return 1 - - def parseline(self, line): - # This is used by Cmd.complete (readline completer function) to - # massage the current line buffer before completion processing. - # We override to drop special '!' handling. - line = line.strip() - if not line: - return None, None, line - elif line[0] == '?': - line = 'help ' + line[1:] - i, n = 0, len(line) - while i < n and line[i] in self.identchars: i = i+1 - cmd, arg = line[:i], line[i:].strip() - return cmd, arg, line - - def helpdefault(self, cmd, known): - """Hook called to handle help on a command for which there is no - help handler. - - "cmd" is the command name on which help was requested. - "known" is a boolean indicating if this command is known - (i.e. if there is a handler for it). - - Returns a return code. - """ - if known: - msg = self._str(self.nohelp % (cmd,)) - if self.cmdlooping: - self.stderr.write(msg + '\n') - else: - self.stderr.write("%s: %s\n" % (self.name, msg)) - else: - msg = self.unknowncmd % (cmd,) - if self.cmdlooping: - self.stderr.write(msg + '\n') - else: - self.stderr.write("%s: %s\n" - "Try '%s help' for info.\n" - % (self.name, msg, self.name)) - self.stderr.flush() - return 1 - - def do_help(self, argv): - """${cmd_name}: give detailed help on a specific sub-command - - Usage: - ${name} help [COMMAND] - """ - if len(argv) > 1: # asking for help on a particular command - doc = None - cmdname = self._get_canonical_cmd_name(argv[1]) or argv[1] - if not cmdname: - return self.helpdefault(argv[1], False) - else: - helpfunc = getattr(self, "help_"+cmdname, None) - if helpfunc: - doc = helpfunc() - else: - handler = self._get_cmd_handler(cmdname) - if handler: - doc = handler.__doc__ - if doc is None: - return self.helpdefault(argv[1], handler != None) - else: # bare "help" command - doc = self.__class__.__doc__ # try class docstring - if doc is None: - # Try to provide some reasonable useful default help. - if self.cmdlooping: prefix = "" - else: prefix = self.name+' ' - doc = """Usage: - %sCOMMAND [ARGS...] - %shelp [COMMAND] - - ${option_list} - ${command_list} - ${help_list} - """ % (prefix, prefix) - cmdname = None - - if doc: # *do* have help content, massage and print that - doc = self._help_reindent(doc) - doc = self._help_preprocess(doc, cmdname) - doc = doc.rstrip() + '\n' # trim down trailing space - self.stdout.write(self._str(doc)) - self.stdout.flush() - do_help.aliases = ["?"] - - def _help_reindent(self, help, indent=None): - """Hook to re-indent help strings before writing to stdout. - - "help" is the help content to re-indent - "indent" is a string with which to indent each line of the - help content after normalizing. If unspecified or None - then the default is use: the 'self.helpindent' class - attribute. By default this is the empty string, i.e. - no indentation. - - By default, all common leading whitespace is removed and then - the lot is indented by 'self.helpindent'. When calculating the - common leading whitespace the first line is ignored -- hence - help content for Conan can be written as follows and have the - expected indentation: - - def do_crush(self, ...): - '''${cmd_name}: crush your enemies, see them driven before you... - - c.f. Conan the Barbarian''' - """ - if indent is None: - indent = self.helpindent - lines = help.splitlines(0) - _dedentlines(lines, skip_first_line=True) - lines = [(indent+line).rstrip() for line in lines] - return '\n'.join(lines) - - def _help_preprocess(self, help, cmdname): - """Hook to preprocess a help string before writing to stdout. - - "help" is the help string to process. - "cmdname" is the canonical sub-command name for which help - is being given, or None if the help is not specific to a - command. - - By default the following template variables are interpolated in - help content. (Note: these are similar to Python 2.4's - string.Template interpolation but not quite.) - - ${name} - The tool's/shell's name, i.e. 'self.name'. - ${option_list} - A formatted table of options for this shell/tool. - ${command_list} - A formatted table of available sub-commands. - ${help_list} - A formatted table of additional help topics (i.e. 'help_*' - methods with no matching 'do_*' method). - ${cmd_name} - The name (and aliases) for this sub-command formatted as: - "NAME (ALIAS1, ALIAS2, ...)". - ${cmd_usage} - A formatted usage block inferred from the command function - signature. - ${cmd_option_list} - A formatted table of options for this sub-command. (This is - only available for commands using the optparse integration, - i.e. using @cmdln.option decorators or manually setting the - 'optparser' attribute on the 'do_*' method.) - - Returns the processed help. - """ - preprocessors = { - "${name}": self._help_preprocess_name, - "${option_list}": self._help_preprocess_option_list, - "${command_list}": self._help_preprocess_command_list, - "${help_list}": self._help_preprocess_help_list, - "${cmd_name}": self._help_preprocess_cmd_name, - "${cmd_usage}": self._help_preprocess_cmd_usage, - "${cmd_option_list}": self._help_preprocess_cmd_option_list, - } - - for marker, preprocessor in preprocessors.items(): - if marker in help: - help = preprocessor(help, cmdname) - return help - - def _help_preprocess_name(self, help, cmdname=None): - return help.replace("${name}", self.name) - - def _help_preprocess_option_list(self, help, cmdname=None): - marker = "${option_list}" - indent, indent_width = _get_indent(marker, help) - suffix = _get_trailing_whitespace(marker, help) - - if self.optparser: - # Setup formatting options and format. - # - Indentation of 4 is better than optparse default of 2. - # C.f. Damian Conway's discussion of this in Perl Best - # Practices. - self.optparser.formatter.indent_increment = 4 - self.optparser.formatter.current_indent = indent_width - block = self.optparser.format_option_help() + '\n' - else: - block = "" - - help = help.replace(indent+marker+suffix, block, 1) - return help - - def _get_cmds_data(self): - # Find any aliases for commands. - token2canonical = self._get_canonical_map() - aliases = {} - for token, cmdname in token2canonical.items(): - if token == cmdname: continue - aliases.setdefault(cmdname, []).append(token) - - # Get the list of (non-hidden) commands and their - # documentation, if any. - cmdnames = {} # use a dict to strip duplicates - for attr in self.get_names(): - if attr.startswith("do_"): - cmdnames[attr[3:]] = True - cmdnames = list(sorted(cmdnames.keys())) - linedata = [] - for cmdname in cmdnames: - if aliases.get(cmdname): - a = aliases[cmdname] - a.sort() - cmdstr = "%s (%s)" % (cmdname, ", ".join(a)) - else: - cmdstr = cmdname - doc = None - try: - helpfunc = getattr(self, 'help_'+cmdname) - except AttributeError: - handler = self._get_cmd_handler(cmdname) - if handler: - doc = handler.__doc__ - else: - doc = helpfunc() - - # Strip "${cmd_name}: " from the start of a command's doc. Best - # practice dictates that command help strings begin with this, but - # it isn't at all wanted for the command list. - to_strip = "${cmd_name}:" - if doc and doc.startswith(to_strip): - #log.debug("stripping %r from start of %s's help string", - # to_strip, cmdname) - doc = doc[len(to_strip):].lstrip() - linedata.append( (cmdstr, doc) ) - - return linedata - - def _help_preprocess_command_list(self, help, cmdname=None): - marker = "${command_list}" - indent, indent_width = _get_indent(marker, help) - suffix = _get_trailing_whitespace(marker, help) - - linedata = self._get_cmds_data() - if linedata: - subindent = indent + ' '*4 - lines = _format_linedata(linedata, subindent, indent_width+4) - block = indent + "Commands:\n" \ - + '\n'.join(lines) + "\n\n" - help = help.replace(indent+marker+suffix, block, 1) - return help - - def _gen_names_and_attrs(self): - # Inheritance says we have to look in class and - # base classes; order is not important. - names = [] - classes = [self.__class__] - while classes: - aclass = classes.pop(0) - if aclass.__bases__: - classes = classes + list(aclass.__bases__) - for name in dir(aclass): - yield (name, getattr(aclass, name)) - - def _get_help_names(self): - """Return a mapping of help topic name to `.help_*()` method.""" - # Determine the additional help topics, if any. - help_names = {} - token2cmdname = self._get_canonical_map() - for attrname, attr in self._gen_names_and_attrs(): - if not attrname.startswith("help_"): continue - help_name = attrname[5:] - if help_name not in token2cmdname: - help_names[help_name] = attr - return help_names - - def _help_preprocess_help_list(self, help, cmdname=None): - marker = "${help_list}" - indent, indent_width = _get_indent(marker, help) - suffix = _get_trailing_whitespace(marker, help) - - help_names = self._get_help_names() - if help_names: - linedata = [(n, a.__doc__ or "") for n, a in help_names.items()] - linedata.sort() - - subindent = indent + ' '*4 - lines = _format_linedata(linedata, subindent, indent_width+4) - block = (indent - + "Additional help topics (run `%s help TOPIC'):\n" % self.name - + '\n'.join(lines) - + "\n\n") - else: - block = '' - help = help.replace(indent+marker+suffix, block, 1) - return help - - def _help_preprocess_cmd_name(self, help, cmdname=None): - marker = "${cmd_name}" - handler = self._get_cmd_handler(cmdname) - if not handler: - raise CmdlnError("cannot preprocess '%s' into help string: " - "could not find command handler for %r" - % (marker, cmdname)) - s = cmdname - if hasattr(handler, "aliases"): - s += " (%s)" % (", ".join(handler.aliases)) - help = help.replace(marker, s) - return help - - #TODO: this only makes sense as part of the Cmdln class. - # Add hooks to add help preprocessing template vars and put - # this one on that class. - def _help_preprocess_cmd_usage(self, help, cmdname=None): - marker = "${cmd_usage}" - handler = self._get_cmd_handler(cmdname) - if not handler: - raise CmdlnError("cannot preprocess '%s' into help string: " - "could not find command handler for %r" - % (marker, cmdname)) - indent, indent_width = _get_indent(marker, help) - suffix = _get_trailing_whitespace(marker, help) - - # Extract the introspection bits we need. - func = handler.__func__ - if func.__defaults__: - func_defaults = list(func.__defaults__) - else: - func_defaults = [] - co_argcount = func.__code__.co_argcount - co_varnames = func.__code__.co_varnames - co_flags = func.__code__.co_flags - CO_FLAGS_ARGS = 4 - CO_FLAGS_KWARGS = 8 - - # Adjust argcount for possible *args and **kwargs arguments. - argcount = co_argcount - if co_flags & CO_FLAGS_ARGS: argcount += 1 - if co_flags & CO_FLAGS_KWARGS: argcount += 1 - - # Determine the usage string. - usage = "%s %s" % (self.name, cmdname) - if argcount <= 2: # handler ::= do_FOO(self, argv) - usage += " [ARGS...]" - elif argcount >= 3: # handler ::= do_FOO(self, subcmd, opts, ...) - argnames = list(co_varnames[3:argcount]) - tail = "" - if co_flags & CO_FLAGS_KWARGS: - name = argnames.pop(-1) - import warnings - # There is no generally accepted mechanism for passing - # keyword arguments from the command line. Could - # *perhaps* consider: arg=value arg2=value2 ... - warnings.warn("argument '**%s' on '%s.%s' command " - "handler will never get values" - % (name, self.__class__.__name__, - func.__name__)) - if co_flags & CO_FLAGS_ARGS: - name = argnames.pop(-1) - tail = "[%s...]" % name.upper() - while func_defaults: - func_defaults.pop(-1) - name = argnames.pop(-1) - tail = "[%s%s%s]" % (name.upper(), (tail and ' ' or ''), tail) - while argnames: - name = argnames.pop(-1) - tail = "%s %s" % (name.upper(), tail) - usage += ' ' + tail - - block_lines = [ - self.helpindent + "Usage:", - self.helpindent + ' '*4 + usage - ] - block = '\n'.join(block_lines) + '\n\n' - - help = help.replace(indent+marker+suffix, block, 1) - return help - - #TODO: this only makes sense as part of the Cmdln class. - # Add hooks to add help preprocessing template vars and put - # this one on that class. - def _help_preprocess_cmd_option_list(self, help, cmdname=None): - marker = "${cmd_option_list}" - handler = self._get_cmd_handler(cmdname) - if not handler: - raise CmdlnError("cannot preprocess '%s' into help string: " - "could not find command handler for %r" - % (marker, cmdname)) - indent, indent_width = _get_indent(marker, help) - suffix = _get_trailing_whitespace(marker, help) - if hasattr(handler, "optparser"): - # Setup formatting options and format. - # - Indentation of 4 is better than optparse default of 2. - # C.f. Damian Conway's discussion of this in Perl Best - # Practices. - handler.optparser.formatter.indent_increment = 4 - handler.optparser.formatter.current_indent = indent_width - block = handler.optparser.format_option_help() + '\n' - else: - block = "" - - help = help.replace(indent+marker+suffix, block, 1) - return help - - def _get_canonical_cmd_name(self, token): - map = self._get_canonical_map() - return map.get(token, None) - - def _get_canonical_map(self): - """Return a mapping of available command names and aliases to - their canonical command name. - """ - cacheattr = "_token2canonical" - if not hasattr(self, cacheattr): - # Get the list of commands and their aliases, if any. - token2canonical = {} - cmd2funcname = {} # use a dict to strip duplicates - for attr in self.get_names(): - if attr.startswith("do_"): cmdname = attr[3:] - elif attr.startswith("_do_"): cmdname = attr[4:] - else: - continue - cmd2funcname[cmdname] = attr - token2canonical[cmdname] = cmdname - for cmdname, funcname in cmd2funcname.items(): # add aliases - func = getattr(self, funcname) - aliases = getattr(func, "aliases", []) - for alias in aliases: - if alias in cmd2funcname: - import warnings - warnings.warn("'%s' alias for '%s' command conflicts " - "with '%s' handler" - % (alias, cmdname, cmd2funcname[alias])) - continue - token2canonical[alias] = cmdname - setattr(self, cacheattr, token2canonical) - return getattr(self, cacheattr) - - def _get_cmd_handler(self, cmdname): - handler = None - try: - handler = getattr(self, 'do_' + cmdname) - except AttributeError: - try: - # Private command handlers begin with "_do_". - handler = getattr(self, '_do_' + cmdname) - except AttributeError: - pass - return handler - - def _do_EOF(self, argv): - # Default EOF handler - # TODO: A mechanism so "EOF" and "KeyboardInterrupt" work as handlers - # but are *not* real available commands. - self.stdout.write('\n') - self.stdout.flush() - self.stop = True - - def _do_KeyboardInterrupt(self, argv): - # Default keyboard interrupt (i.e. ) handler. - # TODO: A mechanism so "EOF" and "KeyboardInterrupt" work as handlers - # but are *not* real available commands. - self.stdout.write('\n') - self.stdout.flush() - - def emptyline(self): - # Different from cmd.Cmd: don't repeat the last command for an - # emptyline. - if self.cmdlooping: - pass - else: - return self.do_help(["help"]) - - -#---- optparse.py extension to fix (IMO) some deficiencies -# -# See the class _OptionParserEx docstring for details. -# - -class StopOptionProcessing(Exception): - """Indicate that option *and argument* processing should stop - cleanly. This is not an error condition. It is similar in spirit to - StopIteration. This is raised by _OptionParserEx's default "help" - and "version" option actions and can be raised by custom option - callbacks too. - - Hence the typical CmdlnOptionParser (a subclass of _OptionParserEx) - usage is: - - parser = CmdlnOptionParser(mycmd) - parser.add_option("-f", "--force", dest="force") - ... - try: - opts, args = parser.parse_args() - except StopOptionProcessing: - # normal termination, "--help" was probably given - sys.exit(0) - """ - -class _OptionParserEx(optparse.OptionParser): - """An optparse.OptionParser that uses exceptions instead of sys.exit. - - This class is an extension of optparse.OptionParser that differs - as follows: - - Correct (IMO) the default OptionParser error handling to never - sys.exit(). Instead OptParseError exceptions are passed through. - - Add the StopOptionProcessing exception (a la StopIteration) to - indicate normal termination of option processing. - See StopOptionProcessing's docstring for details. - - I'd also like to see the following in the core optparse.py, perhaps - as a RawOptionParser which would serve as a base class for the more - generally used OptionParser (that works as current): - - Remove the implicit addition of the -h|--help and --version - options. They can get in the way (e.g. if want '-?' and '-V' for - these as well) and it is not hard to do: - optparser.add_option("-h", "--help", action="help") - optparser.add_option("--version", action="version") - These are good practices, just not valid defaults if they can - get in the way. - """ - def error(self, msg): - raise optparse.OptParseError(msg) - - def exit(self, status=0, msg=None): - if status == 0: - raise StopOptionProcessing(msg) - else: - #TODO: don't lose status info here - raise optparse.OptParseError(msg) - - - -#---- optparse.py-based option processing support - -class CmdlnOptionParser(_OptionParserEx): - """An optparse.OptionParser class more appropriate for top-level - Cmdln options. For parsing of sub-command options, see - SubCmdOptionParser. - - Changes: - - disable_interspersed_args() by default, because a Cmdln instance - has sub-commands which may themselves have options. - - Redirect print_help() to the Cmdln.do_help() which is better - equiped to handle the "help" action. - - error() will raise a CmdlnUserError: OptionParse.error() is meant - to be called for user errors. Raising a well-known error here can - make error handling clearer. - - Also see the changes in _OptionParserEx. - """ - def __init__(self, cmdln, **kwargs): - self.cmdln = cmdln - kwargs["prog"] = self.cmdln.name - _OptionParserEx.__init__(self, **kwargs) - self.disable_interspersed_args() - - def print_help(self, file=None): - self.cmdln.onecmd(["help"]) - - def error(self, msg): - raise CmdlnUserError(msg) - - -class SubCmdOptionParser(_OptionParserEx): - def set_cmdln_info(self, cmdln, subcmd): - """Called by Cmdln to pass relevant info about itself needed - for print_help(). - """ - self.cmdln = cmdln - self.subcmd = subcmd - - def print_help(self, file=None): - self.cmdln.onecmd(["help", self.subcmd]) - - def error(self, msg): - raise CmdlnUserError(msg) - - -def option(*args, **kwargs): - """Decorator to add an option to the optparser argument of a Cmdln - subcommand - - To add a toplevel option, apply the decorator on the class itself. (see - p4.py for an example) - - Example: - @cmdln.option("-E", dest="environment_path") - class MyShell(cmdln.Cmdln): - @cmdln.option("-f", "--force", help="force removal") - def do_remove(self, subcmd, opts, *args): - #... - """ - def decorate_sub_command(method): - """create and add sub-command options""" - if not hasattr(method, "optparser"): - method.optparser = SubCmdOptionParser() - method.optparser.add_option(*args, **kwargs) - return method - def decorate_class(klass): - """store toplevel options""" - assert _forgiving_issubclass(klass, Cmdln) - _inherit_attr(klass, "toplevel_optparser_options", [], cp=lambda l: l[:]) - klass.toplevel_optparser_options.append( (args, kwargs) ) - return klass - - #XXX Is there a possible optimization for many options to not have a - # large stack depth here? - def decorate(obj): - if _forgiving_issubclass(obj, Cmdln): - return decorate_class(obj) - else: - return decorate_sub_command(obj) - return decorate - - -class Cmdln(RawCmdln): - """An improved (on cmd.Cmd) framework for building multi-subcommand - scripts (think "svn" & "cvs") and simple shells (think "pdb" and - "gdb"). - - A simple example: - - import cmdln - - class MySVN(cmdln.Cmdln): - name = "svn" - - @cmdln.aliases('stat', 'st') - @cmdln.option('-v', '--verbose', action='store_true' - help='print verbose information') - def do_status(self, subcmd, opts, *paths): - print "handle 'svn status' command" - - #... - - if __name__ == "__main__": - shell = MySVN() - retval = shell.main() - sys.exit(retval) - - 'Cmdln' extends 'RawCmdln' by providing optparse option processing - integration. See this class' _dispatch_cmd() docstring and general - cmdln document for more information. - """ - - def _dispatch_cmd(self, handler, argv): - """Introspect sub-command handler signature to determine how to - dispatch the command. The raw handler provided by the base - 'RawCmdln' class is still supported: - - def do_foo(self, argv): - # 'argv' is the vector of command line args, argv[0] is - # the command name itself (i.e. "foo" or an alias) - pass - - In addition, if the handler has more than 2 arguments option - processing is automatically done (using optparse): - - @cmdln.option('-v', '--verbose', action='store_true') - def do_bar(self, subcmd, opts, *args): - # subcmd = <"bar" or an alias> - # opts = - if opts.verbose: - print "lots of debugging output..." - # args = - for arg in args: - bar(arg) - - TODO: explain that "*args" can be other signatures as well. - - The `cmdln.option` decorator corresponds to an `add_option()` - method call on an `optparse.OptionParser` instance. - - You can declare a specific number of arguments: - - @cmdln.option('-v', '--verbose', action='store_true') - def do_bar2(self, subcmd, opts, bar_one, bar_two): - #... - - and an appropriate error message will be raised/printed if the - command is called with a different number of args. - """ - co_argcount = handler.__func__.__code__.co_argcount - if co_argcount == 2: # handler ::= do_foo(self, argv) - return handler(argv) - elif co_argcount >= 3: # handler ::= do_foo(self, subcmd, opts, ...) - try: - optparser = handler.optparser - except AttributeError: - optparser = handler.__func__.optparser = SubCmdOptionParser() - assert isinstance(optparser, SubCmdOptionParser) - - # apply subcommand options' defaults from config files, if any. - subcmd = handler.__name__.split('do_', 1)[1] - optparser.set_defaults(**self.get_option_defaults(subcmd)) - - optparser.set_cmdln_info(self, argv[0]) - try: - opts, args = optparser.parse_args(argv[1:]) - except StopOptionProcessing: - #TODO: this doesn't really fly for a replacement of - # optparse.py behaviour, does it? - return 0 # Normal command termination - - try: - return handler(argv[0], opts, *args) - except TypeError: - _, ex, _ = sys.exc_info() - # Some TypeError's are user errors: - # do_foo() takes at least 4 arguments (3 given) - # do_foo() takes at most 5 arguments (6 given) - # do_foo() takes exactly 5 arguments (6 given) - # do_foo() takes exactly 5 positional arguments (6 given) - # Raise CmdlnUserError for these with a suitably - # massaged error message. - tb = sys.exc_info()[2] # the traceback object - if tb.tb_next is not None: - # If the traceback is more than one level deep, then the - # TypeError do *not* happen on the "handler(...)" call - # above. In that we don't want to handle it specially - # here: it would falsely mask deeper code errors. - raise - msg = ex.args[0] - match = _INCORRECT_NUM_ARGS_RE.search(msg) - if match: - msg = list(match.groups()) - msg[1] = int(msg[1]) - 3 - if msg[1] == 1: - msg[2] = msg[2].replace("arguments", "argument") - msg[3] = int(msg[3]) - 3 - msg = ''.join(map(str, msg)) - raise CmdlnUserError(msg) - else: - raise - else: - raise CmdlnError("incorrect argcount for %s(): takes %d, must " - "take 2 for 'argv' signature or 3+ for 'opts' " - "signature" % (handler.__name__, co_argcount)) - - - -#---- support for generating `man` page output from a Cmdln class - -def man_sections_from_cmdln(inst, summary=None, description=None, author=None): - """Return man page sections appropriate for the given Cmdln instance. - Join these sections for man page content. - - The man page sections generated are: - NAME - SYNOPSIS - DESCRIPTION (if `description` is given) - OPTIONS - COMMANDS - HELP TOPICS (if any) - - @param inst {Cmdln} Instance of Cmdln subclass for which to generate - man page content. - @param summary {str} A one-liner summary of the command. - @param description {str} A description of the command. If given, - it will be used for a "DESCRIPTION" section. - @param author {str} The author name and email for the AUTHOR secion - of the man page. - @raises {ValueError} if man page content cannot be generated for the - given class. - """ - if not inst.__class__.name: - raise ValueError("cannot generate man page content: `name` is not " - "set on class %r" % inst.__class__) - data = { - "name": inst.name, - "ucname": inst.name.upper(), - "date": datetime.date.today().strftime("%b %Y"), - "cmdln_version": __version__, - "version_str": inst.version and " %s" % inst.version or "", - "summary_str": summary and r" \- %s" % summary or "", - } - - sections = [] - sections.append('.\\" Automatically generated by cmdln %(cmdln_version)s\n' - '.TH %(ucname)s "1" "%(date)s" "%(name)s%(version_str)s" "User Commands"\n' - % data) - sections.append(".SH NAME\n%(name)s%(summary_str)s\n" % data) - sections.append(_dedent(r""" - .SH SYNOPSIS - .B %(name)s - [\fIGLOBALOPTS\fR] \fISUBCOMMAND \fR[\fIOPTS\fR] [\fIARGS\fR...] - .br - .B %(name)s - \fIhelp SUBCOMMAND\fR - """) % data) - if description: - sections.append(".SH DESCRIPTION\n%s\n" % description) - - section = ".SH OPTIONS\n" - if not hasattr(inst, "optparser") is None: - #HACK: In case `.main()` hasn't been run. - inst.optparser = inst.get_optparser() - lines = inst._help_preprocess("${option_list}", None).splitlines(False) - for line in lines[1:]: - line = line.lstrip() - if not line: - continue - section += ".TP\n" - opts, desc = line.split(' ', 1) - section += ".B %s\n" % opts - section += "%s\n" % _dedent(desc.lstrip(), skip_first_line=True) - sections.append(section) - - section = ".SH COMMANDS\n" - cmds = inst._get_cmds_data() - for cmdstr, doc in cmds: - cmdname = cmdstr.split(' ')[0] # e.g. "commit (ci)" -> "commit" - doc = inst._help_reindent(doc, indent="") - doc = inst._help_preprocess(doc, cmdname) - doc = doc.rstrip() + "\n" # trim down trailing space - section += '.PP\n.SS %s\n%s\n' % (cmdstr, doc) - sections.append(section) - - help_names = inst._get_help_names() - if help_names: - section = ".SH HELP TOPICS\n" - for help_name, help_meth in sorted(help_names.items()): - help = help_meth(inst) - help = inst._help_reindent(help, indent="") - section += '.PP\n.SS %s\n%s\n' % (help_name, help) - sections.append(section) - - if author: - sections.append(".SH AUTHOR\n%s\n" % author) - - return sections - - - -#---- internal support functions - -def _inherit_attr(klass, attr, default, cp): - """Inherit the attribute from the base class - - Copy `attr` from base class (otherwise use `default`). Copying is done using - the passed `cp` function. - - The motivation behind writing this function is to allow inheritance among - Cmdln classes where base classes set 'common' options using the - `@cmdln.option` decorator. To ensure this, we must not write to the base - class's options when handling the derived class. - """ - if attr not in klass.__dict__: - if hasattr(klass, attr): - value = cp(getattr(klass, attr)) - else: - value = default - setattr(klass, attr, value) - -def _forgiving_issubclass(derived_class, base_class): - """Forgiving version of ``issubclass`` - - Does not throw any exception when arguments are not of class type - """ - return (type(derived_class) is ClassType and \ - type(base_class) is ClassType and \ - issubclass(derived_class, base_class)) - -def _format_linedata(linedata, indent, indent_width): - """Format specific linedata into a pleasant layout. - - "linedata" is a list of 2-tuples of the form: - (, ) - "indent" is a string to use for one level of indentation - "indent_width" is a number of columns by which the - formatted data will be indented when printed. - - The column is held to 30 columns. - """ - lines = [] - WIDTH = 78 - indent_width - SPACING = 2 - NAME_WIDTH_LOWER_BOUND = 13 - NAME_WIDTH_UPPER_BOUND = 30 - NAME_WIDTH = max([len(s) for s,d in linedata]) - if NAME_WIDTH < NAME_WIDTH_LOWER_BOUND: - NAME_WIDTH = NAME_WIDTH_LOWER_BOUND - elif NAME_WIDTH > NAME_WIDTH_UPPER_BOUND: - NAME_WIDTH = NAME_WIDTH_UPPER_BOUND - - DOC_WIDTH = WIDTH - NAME_WIDTH - SPACING - for namestr, doc in linedata: - line = indent + namestr - if len(namestr) <= NAME_WIDTH: - line += ' ' * (NAME_WIDTH + SPACING - len(namestr)) - else: - lines.append(line) - line = indent + ' ' * (NAME_WIDTH + SPACING) - line += _summarize_doc(doc, DOC_WIDTH) - lines.append(line.rstrip()) - return lines - -def _summarize_doc(doc, length=60): - r"""Parse out a short one line summary from the given doclines. - - "doc" is the doc string to summarize. - "length" is the max length for the summary - - >>> _summarize_doc("this function does this") - 'this function does this' - >>> _summarize_doc("this function does this", 10) - 'this fu...' - >>> _summarize_doc("this function does this\nand that") - 'this function does this and that' - >>> _summarize_doc("this function does this\n\nand that") - 'this function does this' - """ - import re - if doc is None: - return "" - assert length > 3, "length <= 3 is absurdly short for a doc summary" - doclines = doc.strip().splitlines(0) - if not doclines: - return "" - - summlines = [] - for i, line in enumerate(doclines): - stripped = line.strip() - if not stripped: - break - summlines.append(stripped) - if len(''.join(summlines)) >= length: - break - - summary = ' '.join(summlines) - if len(summary) > length: - summary = summary[:length-3] + "..." - return summary - - -def line2argv(line): - r"""Parse the given line into an argument vector. - - "line" is the line of input to parse. - - This may get niggly when dealing with quoting and escaping. The - current state of this parsing may not be completely thorough/correct - in this respect. - - >>> from cmdln import line2argv - >>> line2argv("foo") - ['foo'] - >>> line2argv("foo bar") - ['foo', 'bar'] - >>> line2argv("foo bar ") - ['foo', 'bar'] - >>> line2argv(" foo bar") - ['foo', 'bar'] - - Quote handling: - - >>> line2argv("'foo bar'") - ['foo bar'] - >>> line2argv('"foo bar"') - ['foo bar'] - >>> line2argv(r'"foo\"bar"') - ['foo"bar'] - >>> line2argv("'foo bar' spam") - ['foo bar', 'spam'] - >>> line2argv("'foo 'bar spam") - ['foo bar', 'spam'] - - >>> line2argv('some\tsimple\ttests') - ['some', 'simple', 'tests'] - >>> line2argv('a "more complex" test') - ['a', 'more complex', 'test'] - >>> line2argv('a more="complex test of " quotes') - ['a', 'more=complex test of ', 'quotes'] - >>> line2argv('a more" complex test of " quotes') - ['a', 'more complex test of ', 'quotes'] - >>> line2argv('an "embedded \\"quote\\""') - ['an', 'embedded "quote"'] - - # Komodo bug 48027 - >>> line2argv('foo bar C:\\') - ['foo', 'bar', 'C:\\'] - - # Komodo change 127581 - >>> line2argv(r'"\test\slash" "foo bar" "foo\"bar"') - ['\\test\\slash', 'foo bar', 'foo"bar'] - - # Komodo change 127629 - >>> if sys.platform == "win32": - ... line2argv(r'\foo\bar') == ['\\foo\\bar'] - ... line2argv(r'\\foo\\bar') == ['\\\\foo\\\\bar'] - ... line2argv('"foo') == ['foo'] - ... else: - ... line2argv(r'\foo\bar') == ['foobar'] - ... line2argv(r'\\foo\\bar') == ['\\foo\\bar'] - ... try: - ... line2argv('"foo') - ... except ValueError, ex: - ... "not terminated" in str(ex) - True - True - True - """ - line = line.strip() - argv = [] - state = "default" - arg = None # the current argument being parsed - i = -1 - WHITESPACE = '\t\n\x0b\x0c\r ' # don't use string.whitespace (bug 81316) - while 1: - i += 1 - if i >= len(line): break - ch = line[i] - - if ch == "\\" and i+1 < len(line): - # escaped char always added to arg, regardless of state - if arg is None: arg = "" - if (sys.platform == "win32" - or state in ("double-quoted", "single-quoted") - ) and line[i+1] not in tuple('"\''): - arg += ch - i += 1 - arg += line[i] - continue - - if state == "single-quoted": - if ch == "'": - state = "default" - else: - arg += ch - elif state == "double-quoted": - if ch == '"': - state = "default" - else: - arg += ch - elif state == "default": - if ch == '"': - if arg is None: arg = "" - state = "double-quoted" - elif ch == "'": - if arg is None: arg = "" - state = "single-quoted" - elif ch in WHITESPACE: - if arg is not None: - argv.append(arg) - arg = None - else: - if arg is None: arg = "" - arg += ch - if arg is not None: - argv.append(arg) - if not sys.platform == "win32" and state != "default": - raise ValueError("command line is not terminated: unfinished %s " - "segment" % state) - return argv - - -def argv2line(argv): - r"""Put together the given argument vector into a command line. - - "argv" is the argument vector to process. - - >>> from cmdln import argv2line - >>> argv2line(['foo']) - 'foo' - >>> argv2line(['foo', 'bar']) - 'foo bar' - >>> argv2line(['foo', 'bar baz']) - 'foo "bar baz"' - >>> argv2line(['foo"bar']) - 'foo"bar' - >>> print argv2line(['foo" bar']) - 'foo" bar' - >>> print argv2line(["foo' bar"]) - "foo' bar" - >>> argv2line(["foo'bar"]) - "foo'bar" - """ - escapedArgs = [] - for arg in argv: - if ' ' in arg and '"' not in arg: - arg = '"'+arg+'"' - elif ' ' in arg and "'" not in arg: - arg = "'"+arg+"'" - elif ' ' in arg: - arg = arg.replace('"', r'\"') - arg = '"'+arg+'"' - escapedArgs.append(arg) - return ' '.join(escapedArgs) - - -# Recipe: dedent (0.1) in /Users/trentm/tm/recipes/cookbook -def _dedentlines(lines, tabsize=8, skip_first_line=False): - """_dedentlines(lines, tabsize=8, skip_first_line=False) -> dedented lines - - "lines" is a list of lines to dedent. - "tabsize" is the tab width to use for indent width calculations. - "skip_first_line" is a boolean indicating if the first line should - be skipped for calculating the indent width and for dedenting. - This is sometimes useful for docstrings and similar. - - Same as dedent() except operates on a sequence of lines. Note: the - lines list is modified **in-place**. - """ - DEBUG = False - if DEBUG: - print("dedent: dedent(..., tabsize=%d, skip_first_line=%r)"\ - % (tabsize, skip_first_line)) - indents = [] - margin = None - for i, line in enumerate(lines): - if i == 0 and skip_first_line: continue - indent = 0 - for ch in line: - if ch == ' ': - indent += 1 - elif ch == '\t': - indent += tabsize - (indent % tabsize) - elif ch in '\r\n': - continue # skip all-whitespace lines - else: - break - else: - continue # skip all-whitespace lines - if DEBUG: print("dedent: indent=%d: %r" % (indent, line)) - if margin is None: - margin = indent - else: - margin = min(margin, indent) - if DEBUG: print("dedent: margin=%r" % margin) - - if margin is not None and margin > 0: - for i, line in enumerate(lines): - if i == 0 and skip_first_line: continue - removed = 0 - for j, ch in enumerate(line): - if ch == ' ': - removed += 1 - elif ch == '\t': - removed += tabsize - (removed % tabsize) - elif ch in '\r\n': - if DEBUG: print("dedent: %r: EOL -> strip up to EOL" % line) - lines[i] = lines[i][j:] - break - else: - raise ValueError("unexpected non-whitespace char %r in " - "line %r while removing %d-space margin" - % (ch, line, margin)) - if DEBUG: - print("dedent: %r: %r -> removed %d/%d"\ - % (line, ch, removed, margin)) - if removed == margin: - lines[i] = lines[i][j+1:] - break - elif removed > margin: - lines[i] = ' '*(removed-margin) + lines[i][j+1:] - break - return lines - -def _dedent(text, tabsize=8, skip_first_line=False): - """_dedent(text, tabsize=8, skip_first_line=False) -> dedented text - - "text" is the text to dedent. - "tabsize" is the tab width to use for indent width calculations. - "skip_first_line" is a boolean indicating if the first line should - be skipped for calculating the indent width and for dedenting. - This is sometimes useful for docstrings and similar. - - textwrap.dedent(s), but don't expand tabs to spaces - """ - lines = text.splitlines(1) - _dedentlines(lines, tabsize=tabsize, skip_first_line=skip_first_line) - return ''.join(lines) - -def _get_indent(marker, s, tab_width=8): - """_get_indent(marker, s, tab_width=8) -> - (, )""" - # Figure out how much the marker is indented. - INDENT_CHARS = tuple(' \t') - start = s.index(marker) - i = start - while i > 0: - if s[i-1] not in INDENT_CHARS: - break - i -= 1 - indent = s[i:start] - indent_width = 0 - for ch in indent: - if ch == ' ': - indent_width += 1 - elif ch == '\t': - indent_width += tab_width - (indent_width % tab_width) - return indent, indent_width - -def _get_trailing_whitespace(marker, s): - """Return the whitespace content trailing the given 'marker' in string 's', - up to and including a newline. - """ - suffix = '' - start = s.index(marker) + len(marker) - i = start - while i < len(s): - if s[i] in ' \t': - suffix += s[i] - elif s[i] in '\r\n': - suffix += s[i] - if s[i] == '\r' and i+1 < len(s) and s[i+1] == '\n': - suffix += s[i+1] - break - else: - break - i += 1 - return suffix - - - -#---- bash completion support -# Note: This is still experimental. I expect to change this -# significantly. -# -# To get Bash completion for a cmdln.Cmdln class, run the following -# bash command: -# $ complete -C 'python -m cmdln /path/to/script.py CmdlnClass' cmdname -# For example: -# $ complete -C 'python -m cmdln ~/bin/svn.py SVN' svn -# -#TODO: Simplify the above so don't have to given path to script (try to -# find it on PATH, if possible). Could also make class name -# optional if there is only one in the module (common case). - -if __name__ == "__main__" and len(sys.argv) == 6: - def _log(s): - return # no-op, comment out for debugging - from os.path import expanduser - fout = open(expanduser("~/tmp/bashcpln.log"), 'a') - fout.write(str(s) + '\n') - fout.close() - - # Recipe: module_from_path (1.0.1+) - def _module_from_path(path): - import imp, os, sys - path = os.path.expanduser(path) - dir = os.path.dirname(path) or os.curdir - name = os.path.splitext(os.path.basename(path))[0] - sys.path.insert(0, dir) - try: - iinfo = imp.find_module(name, [dir]) - return imp.load_module(name, *iinfo) - finally: - sys.path.remove(dir) - - def _get_bash_cplns(script_path, class_name, cmd_name, - token, preceding_token): - _log('--') - _log('get_cplns(%r, %r, %r, %r, %r)' - % (script_path, class_name, cmd_name, token, preceding_token)) - comp_line = os.environ["COMP_LINE"] - comp_point = int(os.environ["COMP_POINT"]) - _log("COMP_LINE: %r" % comp_line) - _log("COMP_POINT: %r" % comp_point) - - try: - script = _module_from_path(script_path) - except ImportError: - _, ex, _ = sys.exc_info() - _log("error importing `%s': %s" % (script_path, ex)) - return [] - shell = getattr(script, class_name)() - cmd_map = shell._get_canonical_map() - del cmd_map["EOF"] - del cmd_map["KeyboardInterrupt"] - - # Determine if completing the sub-command name. - parts = comp_line[:comp_point].split(None, 1) - _log(parts) - if len(parts) == 1 or not (' ' in parts[1] or '\t' in parts[1]): - #TODO: if parts[1].startswith('-'): handle top-level opts - _log("complete sub-command names") - matches = {} - for name, canon_name in cmd_map.items(): - if name.startswith(token): - matches[name] = canon_name - if not matches: - return [] - elif len(matches) == 1: - return matches.keys() - elif len(set(matches.values())) == 1: - return [matches.values()[0]] - else: - return matches.keys() - - # Otherwise, complete options for the given sub-command. - #TODO: refine this so it does the right thing with option args - if token.startswith('-'): - cmd_name = comp_line.split(None, 2)[1] - try: - cmd_canon_name = cmd_map[cmd_name] - except KeyError: - return [] - handler = shell._get_cmd_handler(cmd_canon_name) - optparser = getattr(handler, "optparser", None) - if optparser is None: - optparser = SubCmdOptionParser() - opt_strs = [] - for option in optparser.option_list: - for opt_str in option._short_opts + option._long_opts: - if opt_str.startswith(token): - opt_strs.append(opt_str) - return opt_strs - - return [] - - for cpln in _get_bash_cplns(*sys.argv[1:]): - print(cpln) - - - -## -- contrib -- - -@option("-c", "--configfile", dest="configfile", default=None, - metavar='FILENAME', - help='Configuration file to read options from') -class CmdlnWithConfigParser(Cmdln): - """Cmdln with configparser support - - Add a new option -c --configfile for reading config file; and set - default values for both toplevel and command-specific options. - - See examples/cfgexample.py - """ - - class NoConfigFile(Exception): pass - - def __init__(self, default_configfile=None, *args, **kwargs): - Cmdln.__init__(self, *args, **kwargs) - self._cfgparser = None - self._default_configfile = default_configfile - - def get_optparser(self): - parser = Cmdln.get_optparser(self) - parser.set_default('configfile', self._default_configfile) - return parser - - def _load_config(self): - if not self._cfgparser: - if self.options.configfile: - self._cfgparser = ConfigParser.SafeConfigParser() - if not path.exists(self.options.configfile): - raise CmdlnUserError( - 'config file "%s" does not exist' % \ - self.options.configfile) - self._cfgparser.read(self.options.configfile) - else: - raise self.NoConfigFile - - def get_option_defaults(self, cmd): - try: - self._load_config() - except self.NoConfigFile: - return {} - else: - section = cmd or 'cmdln' - try: - return dict(self._cfgparser.items(section)) - except ConfigParser.NoSectionError: - return {} - diff --git a/clint/packages/applib/_compression.py b/clint/packages/applib/_compression.py deleted file mode 100644 index d60029c..0000000 --- a/clint/packages/applib/_compression.py +++ /dev/null @@ -1,213 +0,0 @@ -# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. - -import sys -import os -from os import path -import tarfile -import zipfile -from contextlib import closing - -from applib import sh - -__all__ = ['implementors'] - - -class CompressedFile: - - def __init__(self, filename): - self.filename = filename - - def extractall_with_single_toplevel(self, f, names): - """Same as ``extractall`` but ensures a single toplevel directory - - Some compressed archives do not stick to the convension of having a - single top-level directory. For eg., - http://code.google.com/p/grapefruit/issues/detail?id=3 - - In such cases, a new toplevel directory corresponding to the name of the - compressed file (eg: 'grapefruit-0.1a3' if compressed file is named - 'grapefruit-0.1a3.tar.gz') is created and then extraction happens - *inside* that directory. - - - f: tarfile/zipefile file object - - names: List of filenames in the archive - - Return the absolute path to the toplevel directory. - """ - toplevels = _find_top_level_directories(names, sep='/') - - if len(toplevels) == 0: - raise sh.PackError('archive is empty') - elif len(toplevels) > 1: - toplevel = _archive_basename(self.filename) - os.mkdir(toplevel) - with sh.cd(toplevel): - f.extractall() - return path.abspath(toplevel) - else: - f.extractall() - toplevel = path.abspath(toplevels[0]) - assert path.exists(toplevel) - if not path.isdir(toplevel): - # eg: http://pypi.python.org/pypi/DeferArgs/0.4 - raise SingleFile('archive has a single file: %s', toplevel) - return toplevel - - - -class ZippedFile(CompressedFile): - """A zip file""" - - @staticmethod - def is_valid(filename): - return zipfile.is_zipfile(filename) - - def extract(self): - try: - f = zipfile.ZipFile(self.filename, 'r') - try: - return self.extractall_with_single_toplevel( - f, f.namelist()) - except OSError as e: - if e.errno == 17: - # http://bugs.python.org/issue6510 - raise sh.PackError(e) - # http://bugs.python.org/issue6609 - if sys.platform.startswith('win'): - if isinstance(e, WindowsError) and e.winerror == 267: - raise sh.PackError('uses Windows special name (%s)' % e) - raise - except IOError as e: - # http://bugs.python.org/issue10447 - if sys.platform == 'win32' and e.errno == 2: - raise sh.PackError('reached max path-length: %s' % e) - raise - finally: - f.close() - except (zipfile.BadZipfile, zipfile.LargeZipFile) as e: - raise sh.PackError(e) - - @classmethod - def pack(cls, paths, file): - raise NotImplementedError('pack: zip files not supported yet') - - -class TarredFile(CompressedFile): - """A tar.gz/bz2 file""" - - @classmethod - def is_valid(cls, filename): - try: - with closing(tarfile.open(filename, cls._get_mode())) as f: - return True - except tarfile.TarError: - return False - - def extract(self): - try: - f = tarfile.open(self.filename, self._get_mode()) - try: - _ensure_read_write_access(f) - return self.extractall_with_single_toplevel( - f, f.getnames()) - finally: - f.close() - except tarfile.TarError as e: - raise sh.PackError(e) - except IOError as e: - # see http://bugs.python.org/issue6584 - if 'CRC check failed' in str(e): - raise sh.PackError(e) - # See github issue #10 - elif e.errno == 22 and "invalid mode ('wb')" in str(e): - raise sh.PackError(e) - else: - raise - - @classmethod - def pack(cls, paths, file): - f = tarfile.open(file, cls._get_mode('w')) - try: - for pth in paths: - assert path.exists(pth), '"%s" does not exist' % path - f.add(pth) - finally: - f.close() - - def _get_mode(self): - """Return the mode for this tarfile""" - raise NotImplementedError() - - -class GzipTarredFile(TarredFile): - """A tar.gz2 file""" - - @staticmethod - def _get_mode(mode='r'): - assert mode in ['r', 'w'] - return mode + ':gz' - - -class Bzip2TarredFile(TarredFile): - """A tar.gz2 file""" - - @staticmethod - def _get_mode(mode='r'): - assert mode in ['r', 'w'] - return mode + ':bz2' - - -implementors = dict( - zip = ZippedFile, - tgz = GzipTarredFile, - bz2 = Bzip2TarredFile) - - -class MultipleTopLevels(sh.PackError): - """Can be extracted, but contains multiple top-level dirs""" -class SingleFile(sh.PackError): - """Contains nothing but a single file. Compressed archived is expected to - contain one directory - """ - -def _ensure_read_write_access(tarfileobj): - """Ensure that the given tarfile will be readable and writable by the - user (the client program using this API) after extraction. - - Some tarballs have u-x set on directories or u-w on files. We reset such - perms here.. so that the extracted files remain accessible for reading - and deletion as per the user's wish. - - See also: http://bugs.python.org/issue6196 - """ - dir_perm = tarfile.TUREAD | tarfile.TUWRITE | tarfile.TUEXEC - file_perm = tarfile.TUREAD | tarfile.TUWRITE - - for tarinfo in tarfileobj.getmembers(): - tarinfo.mode |= (dir_perm if tarinfo.isdir() else file_perm) - - -def _find_top_level_directories(fileslist, sep): - """Find the distinct first components in the fileslist""" - toplevels = set() - for pth in fileslist: - firstcomponent = pth.split(sep, 1)[0] - toplevels.add(firstcomponent) - return list(toplevels) - - -def _archive_basename(filename): - """Return a suitable base directory name for the given archive""" - exts = ( - '.tar.gz', - '.tgz', - '.tar.bz2', - '.bz2', - '.zip') - - filename = path.basename(filename) - - for ext in exts: - if filename.endswith(ext): - return filename[:-len(ext)] - return filename + '.dir' diff --git a/clint/packages/applib/_proc.py b/clint/packages/applib/_proc.py deleted file mode 100644 index 171caaa..0000000 --- a/clint/packages/applib/_proc.py +++ /dev/null @@ -1,155 +0,0 @@ -# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. - -"""Process execution wrappers -""" - -from __future__ import unicode_literals -import os -import sys -import time -import subprocess -from tempfile import TemporaryFile -import warnings - -from applib.misc import xjoin -from applib.misc import safe_unicode - -__all__ = ['run', 'RunError', 'RunNonZeroReturn', 'RunTimedout'] - -warnings.filterwarnings('ignore', message='.*With\-statements.*', - category=DeprecationWarning) - - -class RunError(Exception): - - def __init__(self, cmd, stdout, stderr, errors): - self.stdout = stdout - self.stderr = stderr - - msg = errors[:] - msg.extend([ - 'command: {0}'.format(safe_unicode(cmd)), - 'pwd: {0}'.format(xjoin(os.getcwd()))]) - - if stderr is None: - msg.append( - 'OUTPUT:\n{0}'.format(_limit_str(safe_unicode(stdout)))) - else: - msg.extend([ - 'STDERR:\n{0}'.format(_limit_str(safe_unicode(stderr))), - 'STDOUT:\n{0}'.format(_limit_str(safe_unicode(stdout)))]) - - super(RunError, self).__init__('\n'.join(msg)) - - -class RunNonZeroReturn(RunError): - """The command returned non-zero exit code""" - - def __init__(self, p, cmd, stdout, stderr): - super(RunNonZeroReturn, self).__init__(cmd, stdout, stderr, [ - 'non-zero returncode: {0}'.format(p.returncode) - ]) - - -class RunTimedout(RunError): - """process is taking too much time""" - - def __init__(self, cmd, timeout, stdout, stderr): - super(RunTimedout, self).__init__(cmd, stdout, stderr, [ - 'timed out; ergo process is terminated', - 'seconds elapsed: {0}'.format(timeout), - ]) - - -# TODO: support for incremental results (sometimes a process run for a few -# minutes, but we need to send the stdout as soon as it appears. -def run(cmd, merge_streams=False, timeout=None, env=None): - """Improved replacement for commands.getoutput() - - The following features are implemented: - - - timeout (in seconds) - - support for merged streams (stdout+stderr together) - - `cmd` can be a full command string, or list of prog/args. - - Note that returned data is of *undecoded* str/bytes type (not unicode) - - Return (stdout, stderr) - """ - if isinstance(cmd, (list, tuple)): - shell = False - else: - shell = True - # Fix for cmd.exe quote issue. See comment #3 and #4 in - # http://firefly.activestate.com/sridharr/pypm/ticket/126#comment:3 - if sys.platform.startswith('win') and cmd.startswith('"'): - cmd = '"{0}"'.format(cmd) - - # redirect stdout and stderr to temporary *files* - with TemporaryFile() as outf: - with TemporaryFile() as errf: - p = subprocess.Popen(cmd, env=env, shell=shell, stdout=outf, - stderr=outf if merge_streams else errf) - - if timeout is None: - p.wait() - else: - # poll for terminated status till timeout is reached - t_nought = time.time() - seconds_passed = 0 - while True: - if p.poll() is not None: - break - seconds_passed = time.time() - t_nought - if timeout and seconds_passed > timeout: - p.terminate() - raise RunTimedout( - cmd, timeout, - _read_tmpfd(outf), - None if merge_streams else _read_tmpfd(errf)) - time.sleep(0.1) - - # the process has exited by now; nothing will to be written to - # outfd/errfd anymore. - stdout = _read_tmpfd(outf) - stderr = _read_tmpfd(errf) - - if p.returncode != 0: - raise RunNonZeroReturn(p, cmd, stdout, None if merge_streams else stderr) - else: - return stdout, stderr - - -def _read_tmpfd(fil): - """Read from a temporary file object - - Call this method only when nothing more will be written to the temporary - file - i.e., all the writing has already been done. - """ - fil.seek(0) - return fil.read() - - -def _limit_str(s, maxchars=80*15): - if len(s) > maxchars: - return '[...]\n' + s[-maxchars:] - return s - - -def _disable_windows_error_popup(): - """Set error mode to disable Windows error popup - - This setting is effective for current process and all the child processes - """ - # disable nasty critical error pop-ups on Windows - import win32api, win32con - win32api.SetErrorMode(win32con.SEM_FAILCRITICALERRORS | - win32con.SEM_NOOPENFILEERRORBOX) -if sys.platform.startswith('win'): - try: - import win32api - except ImportError: - pass # XXX: this means, you will get annoying popups - else: - _disable_windows_error_popup() diff --git a/clint/packages/applib/_simpledb.py b/clint/packages/applib/_simpledb.py deleted file mode 100644 index e3b1720..0000000 --- a/clint/packages/applib/_simpledb.py +++ /dev/null @@ -1,269 +0,0 @@ -# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. - -"""Simple wrapper around SQLalchemy - -This module hides the complexity of SQLAlchemy to provide a simple interface to -store and manipulate Python objects each with a set of properties. Unlike the -default behaviour of sqlalchemy's declaritive_base, inheritance of objects will -not require "join", rather it creates a separate table. This makes it easy to -use objects around from parts of not-so-related applications. - -For example, a ``SourcePackage`` table is created by Grail. Then, PyPM will -extend it as ``BinaryPackage`` which gets extended to ``RepoPackage``. The table -for RepoPackage will be concretely inherited, meaning - there will be just be -one table without having to 'join' to another SourcePackage table. - -At the moment, PyPM and Grail use this module. It may not be of use to others, -and we may change the api/behaviour. Hence, it makes sense to keep it as an -internal module. -""" - -import sys -import os -from os.path import exists, dirname -from contextlib import contextmanager -import json - -from sqlalchemy import Table, Column, MetaData -from sqlalchemy import create_engine -from sqlalchemy.types import String, Text, Boolean, PickleType -from sqlalchemy.orm import sessionmaker, scoped_session, mapper - - -# A PickleType that will work on both Python 2.x and 3.x -# i.e., if you *write* to a DB entry using Python 3.x, we are letting -# Python 3.x apps to read from it as well. -# WARNING: Ideally, if you are starting a new project, please -# use something else like JSON. See -# http://twitter.com/zzzeek/status/9765871731867648 -Pickle2Type = PickleType(protocol=2) - - -def setup(db_class, simple_object_cls, primary_keys): - """A simple API to configure the metadata""" - table_name = simple_object_cls.__name__ - column_names = simple_object_cls.FIELDS - - metadata = MetaData() - table = Table(table_name, metadata, - *[Column(cname, _get_best_column_type(cname), - primary_key=cname in primary_keys) - for cname in column_names]) - - db_class.metadata = metadata - db_class.mapper_class = simple_object_cls - db_class.table = table - - mapper(simple_object_cls, table) - - -def sqlalchemy_escape(val, escape_char, special_chars): - """Escape a string according for use in LIKE operator - - >>> sqlalchemy_escape("text_table", "\\", "%_") - 'text\_table' - """ - if sys.version_info[:2] >= (3, 0): - assert isinstance(val, str) - else: - assert isinstance(val, basestring) - result = [] - for c in val: - if c in special_chars + escape_char: - result.extend(escape_char + c) - else: - result.extend(c) - return ''.join(result) - - -class SimpleDatabase(object): - metadata = None # to be set up derived classes - - class DoesNotExist(IOError): - def __init__(self, path): - super(IOError, self).__init__( - 'database file %s does not exist' % path) - - def __init__(self, path, touch=False): - """ - touch - create database, if it does not exist - """ - self.path = path - sqlite_uri = 'sqlite:///%s' % self.path - self.engine = create_engine(sqlite_uri, echo=False) - self.create_session = sessionmaker( - bind=self.engine, - autocommit=False, - - # See the comment by Michael Bayer - # http://groups.google.com/group/sqlalchemy/browse_thread/thread/7c1eb642435adde7 - # expire_on_commit=False - ) - self.create_scoped_session = scoped_session(self.create_session) - - if not exists(self.path): - if touch: - assert exists(dirname(self.path)), 'missing: ' + dirname(self.path) - self.metadata.create_all(self.engine) - else: - raise self.DoesNotExist(path) - - def reset(self): - """Reset the database - - Drop all tables and recreate them - """ - self.metadata.drop_all(self.engine) - self.metadata.create_all(self.engine) - - def close(self): - self.engine.dispose() - - @contextmanager - def transaction(self, session=None): - """Start a new transaction based on the passed session object. If session - is not passed, then create one and make sure of closing it finally. - """ - local_session = None - if session is None: - local_session = session = self.create_scoped_session() - try: - yield session - finally: - # Since ``local_session`` was created locally, close it here itself - if local_session is not None: - # but wait! - # http://groups.google.com/group/sqlalchemy/browse_thread/thread/7c1eb642435adde7 - # To workaround this issue with sqlalchemy, we can either: - # 1) pass the session object explicitly - # 2) do not close the session at all (bad idea - could lead to memory leaks) - # - # Till pypm implements atomic transations in client.installer, - # we retain this hack (i.e., we choose (2) for now) - pass # local_session.close() - - def __str__(self): - return '{0.__class__.__name__}<{0.path}>'.format(self) - - -class SimpleObject(object): - """Object with a collection of fields. - - The following features are supported: - - 1) Automatically initialize the fields in __init__ - 2) Inherit and extend with additional fields - 2) Ability to convert from other object types (with extra/less fields) - 3) Interoperate with sqlalchemy.orm (i.e., plain `self.foo=value` works) - """ - - # Public fields in this object - FIELDS = [] - - def __init__(self, **kwargs): - """Initialize the object with FIELDS whose values are in ``kwargs``""" - self.__assert_field_mapping(kwargs) - for field in self.FIELDS: - setattr(self, field, kwargs[field]) - - @classmethod - def create_from(cls, another, **kwargs): - """Create from another object of different type. - - Another object must be from a derived class of SimpleObject (which - contains FIELDS) - """ - reused_fields = {} - for field, value in another.get_fields(): - if field in cls.FIELDS: - reused_fields[field] = value - reused_fields.update(kwargs) - return cls(**reused_fields) - - def get_fields(self): - """Return fields as a list of (name,value)""" - for field in self.FIELDS: - yield field, getattr(self, field) - - def to_dict(self): - return dict(self.get_fields()) - - def to_json(self): - return json.dumps(self.to_dict()) - - @classmethod - def from_json(cls, json_string): - values = json.loads(json_string) - return cls(**_remove_unicode_keys(values)) - - def __assert_field_mapping(self, mapping): - """Assert that mapping.keys() == FIELDS. - - The programmer is not supposed to pass extra/less number of fields - """ - passed_keys = set(mapping.keys()) - class_fields = set(self.FIELDS) - - if passed_keys != class_fields: - raise ValueError('\n'.join([ - "{0} got different fields from expected".format( - self.__class__), - " got : {0}".format(list(sorted(passed_keys))), - " expected: {0}".format(list(sorted(class_fields)))])) - - -class _get_best_column_type(): - """Return the best column type for the given name.""" - mapping = dict( - name = String, - version = String, - keywords = String, - home_page = String, - license = String, - author = String, - author_email = String, - maintainer = String, - maintainer_email = String, - osarch = String, - pyver = String, - pkg_version = String, - relpath = String, - tags = String, - original_source = String, - patched_source = String, - - summary = Text, - description = Text, - - python3 = Boolean, - metadata_hash = String, - - install_requires = Pickle2Type, - files_list = Pickle2Type, - ) - - def __call__(self, name): - try: - return self.mapping[name] - except KeyError: - raise KeyError( - 'missing key. add type for "{0}" in self.mapping'.format( - name)) -_get_best_column_type = _get_best_column_type() - - -def _remove_unicode_keys(dictobj): - """Convert keys from 'unicode' to 'str' type. - - workaround for - """ - if sys.version_info[:2] >= (3, 0): return dictobj - - assert isinstance(dictobj, dict) - - newdict = {} - for key, value in dictobj.items(): - if type(key) is unicode: - key = key.encode('utf-8') - newdict[key] = value - return newdict diff --git a/clint/packages/applib/base.py b/clint/packages/applib/base.py deleted file mode 100644 index 1814e07..0000000 --- a/clint/packages/applib/base.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. - -"""Base module""" - -import sys -from os.path import abspath, join, expanduser -import logging - -from appdirs import AppDirs - -from applib import location, log -from applib.log import LogawareCmdln as Cmdln - -__all__ = ['Application', 'Cmdln'] - - -class Application(object): - """Object representing the application - - - name: Name of the application - - - company: Company developing the application - - - compatibility_version: The major version which promises - backward-compatability among all of its minor - versions. Eg: 5.2; 5.2.1, 5.2.2, etc.. should - use the same compatability version (5.2). This - value is used in the settings directory path. - - - locations: An object holding a set of OS-specific but - generic location values (eg: APPDATA). See - ``Locations`` class for details. - """ - - def __init__(self, name, company, compatibility_version=None): - self.name = name - self.company = company - self.compatibility_version = compatibility_version - self.locations = AppDirs2( - name, company, compatibility_version, roaming=False) - - def run(self, cmdln_class): - """Run the application using the given cmdln processor. - - This method also ensures configuration of logging handlers for console - """ - assert issubclass(cmdln_class, Cmdln) - l = logging.getLogger('') - log.setup_trace(l, self.locations.log_file_path) - cmdln_class(install_console=True).main() - - -class AppDirs2(AppDirs): - @property - def log_file_path(self): - if sys.platform in ('win32', 'darwin'): - name = self.appname + '.log' - else: - name = self.appname.lower() + '.log' - return join(self.user_log_dir, name) - - -if __name__ == '__main__': - # self-test code - app = Application('PyPM', 'ActiveState', '0.1') - print('user_data_dir', app.locations.user_data_dir) - print('site_data_dir', app.locations.site_data_dir) - print('user_cache_dir', app.locations.user_cache_dir) - print('log_file_path', app.locations.log_file_path) - diff --git a/clint/packages/applib/location.py b/clint/packages/applib/location.py deleted file mode 100644 index dc6412e..0000000 --- a/clint/packages/applib/location.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. - -# This module is deprecated - -from appdirs import * - - -#---- self test code - -if __name__ == "__main__": - print("applib: user data dir: %s" % user_data_dir("Komodo", "ActiveState")) - print("applib: site data dir: %s" % site_data_dir("Komodo", "ActiveState")) - print("applib: user cache dir: %s" % user_cache_dir("Komodo", "ActiveState")) - diff --git a/clint/packages/applib/log.py b/clint/packages/applib/log.py deleted file mode 100644 index a46792d..0000000 --- a/clint/packages/applib/log.py +++ /dev/null @@ -1,336 +0,0 @@ -# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. - -"""Logging utilities and console integration - -Don't use print, use ``LOG.info``. This ensures seemless integration with -application logging. -""" - -import sys -import os -import stat -from os.path import expanduser, join, exists, isabs, dirname -import logging -from datetime import datetime -from contextlib import contextmanager - -from applib import sh, textui, _cmdln as cmdln - - -if sys.hexversion > 0x03000000: - def unicode_literal(s): - return s # strings are unicode by default on py3 -else: - from io import open - def unicode_literal(s): - return s.decode('utf-8') - - -@cmdln.option('-v', '--verbose', action="count", dest='verbosity_level', - default=None, - help='-v will show tracebacks; -vv also debug messages') -class LogawareCmdln(cmdln.CmdlnWithConfigParser): - """A Cmdln class that integrates with this modules's functionality - - 1. Add -v and -vv global options: show tracebacks when sub commands throw - them only if -v or -vv is passed by the user. - - 2. Wrap all sub command methods and call `initialize` (to be defined by the - derived class) automatically. - """ - - def __init__(self, install_console=False, default_verbosity=0, *args, **kwargs): - """ - Arguments: - - install_console: install console handlers in logger - """ - cmdln.CmdlnWithConfigParser.__init__(self, *args, **kwargs) - self.__initialized = False - self.__install_console = install_console - self.__default_verbosity = default_verbosity - - def initialize(self): - """This method is called by ``bootstrapped`` - once and only once.""" - raise NotImplementedError('must be defined by the derived class') - - @contextmanager - def bootstrapped(self): - """Run the sub-command after bootstrapping - - It is required to wrap the sub-command code in this context, which takes - care of the following: - - - Invokes `setup_console` passing `verbosity_level` - - Invokes `self.initialize` automatically but no more than once. - - Intercept unhandled exceptions and display them according to - verbosity_level - """ - l = logging.getLogger('') - - if not self.__initialized: - # install console (if required) and call the `initialize` method - # once. - if self.__install_console: - if self.options.verbosity_level is None: - self.options.verbosity_level = self.__default_verbosity - setup_console(l, self.options.verbosity_level) - with self.__run_safely(l): - self.initialize() - self.__initialized = True - - with self.__run_safely(l): - yield - - @contextmanager - def __run_safely(self, l): - try: - yield - except KeyboardInterrupt: - # user presses Ctrl-C to terminate the program - l.info('') # print a new-line for the shell prompt's sake - sys.exit(5) - except Exception as e: - if self.__install_console: - # setup_console handles all exceptions; let it do so by calling - # log.exception and exitting immediately. - l.exception(e) - sys.exit(1) # exit to shell - else: - # as setup_console is not used, raise exceptions normally. - raise - - -def setup_console(l, verbosity_level): - """Setup console output for logging calls""" - l.setLevel(logging.DEBUG) # level-logic is instead in the handler - - existing_consoles = [h for h in l.handlers if isinstance(h, ConsoleHandler)] - if existing_consoles: - assert len(existing_consoles) == 1, \ - 'more than one console installed. not possible.' - # re-use existing console handler - h = existing_consoles[0] - assert h.verbosity_level == verbosity_level, \ - 'already has console with different verbosity level' - else: - # create a new console handler - h = ConsoleHandler(verbosity_level) - h.setFormatter(ConsoleFormatter()) - l.addHandler(h) - - -def setup_trace(l, tracefile): - """Trace logging calls to a standard log file - - Log file name and location will be determined based on platform. - """ - l.setLevel(logging.DEBUG) # trace file must have >=DEBUG entries - sh.mkdirs(dirname(tracefile)) - _rollover_log(tracefile) - _begin_log_section(tracefile) - h = logging.FileHandler(tracefile) - h.setFormatter( - logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s")) - l.addHandler(h) - - -@contextmanager -def handledby(l, filename, create_dirs=False, level=None, formatter=None): - """Momentarily handle logger `l` using FileHandler - - Within the 'with' context, all logging calls made to the logger `l` will get - written to the file `filename`. When exiting the 'with' context, this file - is closed and the handler will be removed. - """ - assert isabs(filename), 'not an absolute path: {0}'.format(filename) - - if create_dirs: - sh.mkdirs(dirname(filename)) - - h = logging.FileHandler(filename) - if level: - h.setLevel(level) - if formatter: - h.setFormatter(formatter) - l.addHandler(h) - - try: - yield - finally: - h.close() - l.removeHandler(h) - - -@contextmanager -def archivedby(l, logs_directory, entity_name, level=None, formatter=None): - """Like `handledby` but the log file is stored in archive. - - The exact path to the log file is determined as follows: - - $logs_directory/2009/03/24/142356_$entity_name.txt - """ - now = datetime.now() # NOTE: this is local time, not UTC time. - filename = join(logs_directory, - now.strftime('%Y'), now.strftime('%m'), now.strftime('%d'), - '{0}_{1}.txt'.format(now.strftime('%H%M%S'), entity_name)) - assert not exists(filename), 'already exists: {0}'.format(filename) - with handledby(l, filename, create_dirs=True, - level=level, formatter=formatter): - yield filename - - -@contextmanager -def wrapped(l): - """'With' context to intercept and log any exceptions raised""" - try: - yield - except Exception as e: - l.exception(e) - raise - - -def runonconsole(l): - """Run on console .. and exit the program appropriately. - - If an exception is raised, it is silently logged (so - - - >>> with log.run(logging.getLogger('pypm')) as retcode: - """ - try: - yield - except Exception as e: - l.exception(e) - sys.exit(1) - sys.exit(0) - - -try: - from logging import NullHandler -except ImportError: - class NullHandler(logging.Handler): - def emit(self, record): - pass - - -# -- internal - -def _rollover_log(logfile, maxsize=(2<<20)): - """Move $logfile to $logfile.old if its size exceeds `maxsize`""" - if exists(logfile): - filesize = os.stat(logfile)[stat.ST_SIZE] - if filesize >= maxsize: - sh.mv(logfile, logfile+'.old') - - -def _begin_log_section(logfile): - """Begin a new section in the logfile - - Also write the current datetime - """ - LINE_BUFFERED=1 - with open(logfile, 'a', LINE_BUFFERED, encoding='utf-8') as f: - f.write(unicode_literal('\n')) # sections are separated by newline - f.write(unicode_literal('{0}\n'.format(datetime.now()))) - - -class ConsoleHandler(logging.StreamHandler): - """Send messages to console - - INFO messages are sent to stdout. Other levels to stderr. - - By default, INFO/WARN/ERROR messages are sent as-it-is to console .. while - EXCEPTION messages are pruned and shown as error unless verbosity level is - greater than zero. If verbosity level is greater than one, then DEBUG - messages are also shown. - """ - - def __init__(self, verbosity_level): - logging.StreamHandler.__init__(self) - self.stream = None # reset it; we are not going to use it anyway - self.verbosity_level = verbosity_level - - def emit(self, record): - if record.levelno == logging.INFO: - self.__emit(record, sys.stdout) - elif record.levelno == logging.WARN: - self.__emit(record, sys.stderr) - elif record.levelno == logging.DEBUG: - # show DEBUG messages with verbosity_level >= 2 - if self.verbosity_level > 1: - self.__emit(record, sys.stderr) - elif record.levelno >= logging.ERROR: - if record.exc_info and self.verbosity_level < 1: - # supress full traceback with verbosity_level <= 0 - with new_record_exc_info(record, None): - self.__emit(record, sys.stderr) - else: - self.__emit(record, sys.stderr) - else: - raise NotImplementedError( - "don't know about level: {0}".format(record.levelno)) - - def __emit(self, record, strm): - # override handler stream with ours (which could stdout or stderr) - self.stream = strm - - with textui.safe_output(): - # We *trust* that `logging` module's `emit()` will always terminate the - # message with newlines. This is essential for not breaking the progress - # bar, if any. - logging.StreamHandler.emit(self, record) - - def flush(self): - # Workaround a bug in logging module - # See: - # http://bugs.python.org/issue6333 - if self.stream and hasattr(self.stream, 'flush') and not self.stream.closed: - try: - logging.StreamHandler.flush(self) - except IOError as e: - if e.errno == 32: - # skip 'broken pipe' errors that likely occur due to - # killing the process on the other end of the pipe - # eg: piping command output to `less` and then pressing - # Q in the middle of it. - pass - else: - raise - - -def _clear_record_traceback_cache(record): - """Clear the traceback cache stored in `record` (LogRecord) - - Workaround for: http://bugs.python.org/issue6435 - """ - record.exc_text = None - - -@contextmanager -def new_record_exc_info(record, exc_info): - """Temporarily assign `exc_info` to `record`""" - _clear_record_traceback_cache(record) - old_exc_info = record.exc_info - record.exc_info = exc_info - try: - yield - finally: - record.exc_info = old_exc_info - _clear_record_traceback_cache(record) - - -class ConsoleFormatter(logging.Formatter): - """A formatter that attaches 'error:' prefix to error/critical messages""" - - def format(self, record): - # attach 'error:' prefix to error/critical messages - # attach 'warning:' prefix accordingly - s = logging.Formatter.format(self, record) - if record.levelno >= logging.ERROR: - return 'error: {0}'.format(s) - elif record.levelno == logging.WARNING: - return 'warning: {0}'.format(s) - else: - return s - diff --git a/clint/packages/applib/misc.py b/clint/packages/applib/misc.py deleted file mode 100644 index 80b1bb0..0000000 --- a/clint/packages/applib/misc.py +++ /dev/null @@ -1,104 +0,0 @@ -# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. - -"""Miscelleneous utility functions -""" - -import sys -from os import path -import six - -from applib import _cmdln as cmdln - -__all__ = ['xjoin', 'existing'] - - -def xjoin(*c): - """Equivalent to normpath(abspath(join(*c)))""" - return path.normpath(path.abspath(path.join(*c))) - - -def existing(pth): - """Return path, but assert its presence first""" - assert isinstance(pth, (str, unicode)), \ - 'not of string type: %s <%s>' % (pth, type(pth)) - assert exists(pth), 'file/directory not found: %s' % pth - return pth - - -def require_option(options, option_name, details=None): - """ - >>> require_option('foo-bar') - ... - CmdlnUserError: required option, --foo-bar, is mising - - From http://twitter.com/ActiveState/status/19782350475 - 'required options' - conforming to unix standard vs being creative with - non-positional arguments. http://bit.ly/d2iiUL #python #optparse ^SR - """ - option_var_name = option_name.replace('-', '_') - if not hasattr(options, option_var_name): - raise ValueError( - "require_option: undefined option '%s'" % option_var_name) - if getattr(options, option_var_name) is None: - msg = 'required option "--{0}" is missing'.format(option_name) - if details: - msg = '%s (%s)' % (msg, details) - raise cmdln.CmdlnUserError(msg) - - -def safe_unicode(obj): - """Return the unicode/text representation of `obj` without throwing UnicodeDecodeError - - Returned value is only a *representation*, not necessarily identical. - """ - if type(obj) not in (six.text_type, six.binary_type): - obj = six.text_type(obj) - if type(obj) is six.text_type: - return obj - else: - return obj.decode(errors='ignore') - - -def _hack_unix2win_path_conversion(cmdln_options, option_names): - """Hack to convert Unix paths in cmdln options (via config file) to - Windows specific netshare location - - Config file must define the mapping as config var "unix2win_path_mapping" - """ - require_option(cmdln_options, 'unix2win_path_mapping') - - for opt in option_names: - setattr( - cmdln_options, - opt, - _cmdln_canonical_path( - cmdln_options.unix2win_path_mapping, - getattr(cmdln_options, opt))) - - -def _cmdln_canonical_path(unix2win_path_mapping, unixpath): - """Given a unix path return the platform-specific path - - On Windows, use the given mapping to translate the path. On Unix platforms, - this function essentially returns `unixpath`. - - The mapping is simply a buildout.cfg-friendly multiline string that would - get parsed as dictionary which should have path prefixes as keys, and the - translated Windows net share path as the values. See PyPM's - etc/activestate.conf for an example. - - The mapping is typically supposed to be defined in the config file under - the cmdln section. This function is used by PyPM and Grail. - """ - unix2win_path_mapping = unix2win_path_mapping or "" - - # convert buildout.cfg-style multiline mapping to a dict - m = dict([ - [x.strip() for x in line.strip().split(None, 1)] - for line in unix2win_path_mapping.splitlines() if line.strip()]) - - if sys.platform.startswith('win'): - for prefix, netsharepath in m.items(): - if unixpath.startswith(prefix): - return netsharepath + unixpath[len(prefix):] - return unixpath diff --git a/clint/packages/applib/sh.py b/clint/packages/applib/sh.py deleted file mode 100644 index 9e37187..0000000 --- a/clint/packages/applib/sh.py +++ /dev/null @@ -1,203 +0,0 @@ -# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. - -"""Various shell related wrappers -""" - -import os -from os import path -import shutil -import tempfile -from fnmatch import fnmatch -from contextlib import contextmanager - -from applib._proc import * - - -# -# Compression routines -# - -class PackError(Exception): - """Error during pack or unpack""" - - -def unpack_archive(filename, pth='.'): - """Unpack the archive under ``path`` - - Return (unpacked directory path, filetype) - """ - from applib import _compression - - assert path.isfile(filename), 'not a file: %s' % filename - assert path.isdir(pth) - - for filetype, implementor in _compression.implementors.items(): - if implementor.is_valid(filename): - with cd(pth): - return (implementor(filename).extract(), filetype) - else: - raise PackError('unknown compression format: ' + filename) - - -def pack_archive(filename, files, pwd, filetype="tgz"): - """Pack the given `files` from directory `pwd` - - `filetype` must be one of ["tgz", "tbz2", "zip"] - """ - from applib import _compression - - assert path.isdir(pwd) - assert filetype in _compression.implementors, 'invalid filetype: %s' % filetype - - if path.exists(filename): - rm(filename) - - with cd(pwd): - relnames = [path.relpath(file, pwd) for file in files] - _compression.implementors[filetype].pack(relnames, filename) - - return filename - - -# -# Path/file routines -# - -def mkdirs(pth): - """Make all directories along ``pth``""" - if not path.exists(pth): - os.makedirs(pth) - else: - assert path.isdir(pth) - - -def rm(p): - """Remove the specified path recursively. Similar to `rm -rf ARG` - - Note: if ARG is a symlink, only that symlink will be removed. - """ - if path.lexists(p): - if path.isdir(p) and not path.islink(p): - shutil.rmtree(p) - else: - os.remove(p) - - -def mv(src, dest, _mkdirs=False): - """Move `src` to `dest`""" - if _mkdirs: - mkdirs(path.dirname(dest)) - shutil.move(src, dest) - - -def cp(src, dest, _mkdirs=False, ignore=None, copyperms=True): - """Copy `src` to `dest` recursively""" - assert path.exists(src) - - if _mkdirs: - mkdirs(path.dirname(dest)) - - if path.isdir(src): - _copytree(src, dest, ignore=ignore, copyperms=copyperms) - else: - shutil.copyfile(src, dest) - - -def find(pth, pattern): - """Find files or directories matching ``pattern`` under ``pth``""" - matches = [] - if path.isfile(pth): - if fnmatch(path.basename(pth), pattern): - matches.append(pth) - else: - for root, dirs, files in os.walk(pth): - matches.extend([ - path.join(root, f) for f in files+dirs - if fnmatch(f, pattern) - ]) - return matches - - -@contextmanager -def cd(pth): - """With context to temporarily change directory""" - assert path.isdir(existing(pth)), pth - - cwd = os.getcwd() - os.chdir(pth) - try: - yield - finally: - os.chdir(cwd) - - -@contextmanager -def tmpdir(prefix='tmp-', suffix=''): - """__with__ context to work in a temporary working directory - - Temporary directory will be deleted unless an exception was raised. During - the context, CWD will be changed to the temporary directory. - """ - d = tempfile.mkdtemp(prefix=prefix, suffix=suffix) - with cd(d): - yield d - rm(d) - - -def existing(pth): - """Return `pth` after checking it exists""" - if not path.exists(pth): - raise IOError('"{0}" does not exist'.format(pth)) - return pth - - -def _copytree(src, dst, symlinks=False, ignore=None, copyperms=True): - """Forked shutil.copytree for `copyperms` support""" - names = os.listdir(src) - if ignore is not None: - ignored_names = ignore(src, names) - else: - ignored_names = set() - - os.makedirs(dst) - errors = [] - for name in names: - if name in ignored_names: - continue - srcname = os.path.join(src, name) - dstname = os.path.join(dst, name) - try: - if symlinks and os.path.islink(srcname): - linkto = os.readlink(srcname) - os.symlink(linkto, dstname) - elif os.path.isdir(srcname): - _copytree(srcname, dstname, symlinks, ignore, copyperms) - else: - shutil.copy(srcname, dstname) - # XXX What about devices, sockets etc.? - except (IOError, os.error) as why: - raise - errors.append((srcname, dstname, str(why))) - # catch the Error from the recursive copytree so that we can - # continue with other files - except shutil.Error: - _, err = sys.exec_info() - errors.extend(err.args[0]) - if copyperms: - try: - shutil.copystat(src, dst) - except WindowsError: - # can't copy file access times on Windows - pass - except OSError: - _, why = sys.exec_info() - errors.extend((src, dst, str(why))) - if errors: - raise shutil.Error(errors) - - -# WindowsError is not available on other platforms -try: - WindowsError -except NameError: - class WindowsError(OSError): pass diff --git a/clint/packages/applib/test/all.py b/clint/packages/applib/test/all.py deleted file mode 100644 index 02ca448..0000000 --- a/clint/packages/applib/test/all.py +++ /dev/null @@ -1,195 +0,0 @@ -# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. - -from __future__ import unicode_literals -import os -from os import path -import tempfile -import sys - -import pytest - -from applib import sh -from applib import textui -from applib.misc import safe_unicode -import six - - -fixtures = path.join(path.dirname(__file__), 'fixtures') - -def test_import(): - import applib - import applib.base - import applib.sh - import applib.textui - import applib.log - import applib.misc - - -def test_sh_runerror_unicode(): - # https://github.com/activestate/applib/issues/12 - with pytest.raises(sh.RunError): - try: - sh.run('echo ' + "\u1234" + " & nonexistant") - except sh.RunError as e: - print(safe_unicode(e)) - raise - - -def test_sh_runerror_limit(): - with pytest.raises(sh.RunError): - from random import choice - import string - LINE = ''.join([choice(string.ascii_letters) for x in range(80)]) - try: - sh.run(r'''python -c "print('\n'.join(['%s']*100)); raise SystemExit('an error');"''' % LINE) - except sh.RunError as e: - c = str(e).count(LINE) - # def _limit_str(s, maxchars=80*15): --- so ~ 15 lines (not 100 lines) - assert c < 20, "original message: %s" % e - assert '[...]' in str(e) - raise - - -def test_safe_unicode(): - from applib.misc import safe_unicode - import six - - abc_bytes = b'ab\nc' - abc_text = 'ab\nc' - - assert safe_unicode(abc_bytes) == abc_text - assert safe_unicode(abc_text) == abc_text - - foo_text = 'abc' # note: \x89 is ignored. - foo_bytes = b'\x89abc' - - assert safe_unicode(foo_text) == foo_text - assert safe_unicode(foo_bytes) == foo_text - - -def test_sh_rm_file(): - with sh.tmpdir(): - with open('afile', 'w') as f: f.close() - assert path.exists('afile') - sh.rm('afile') - assert not path.exists('afile') - - -def test_sh_rm_dir(): - with sh.tmpdir(): - sh.mkdirs('adir') - with sh.cd('adir'): - with open('afile', 'w') as f: f.close() - assert path.exists('afile') - assert path.exists('adir') - sh.rm('adir') - assert not path.exists('adir') - - -# Workaround a py.test bug: -# Error evaluating 'skipif' expression -# b'sys.platform == "win32"' -# Failed: expression is not a string -def skipif(expr): - if not six.PY3: - expr = expr.encode() - return pytest.mark.skipif(expr) - - -@skipif('sys.platform == "win32"') -def test_sh_rm_symlink(): - with sh.tmpdir(): - with open('afile', 'w') as f: f.close() - assert path.exists('afile') - os.symlink('afile', 'alink') - assert path.lexists('alink') - sh.rm('alink') - assert not path.lexists('alink') - - -@skipif('sys.platform == "win32"') -def test_sh_rm_broken_symlink(): - with sh.tmpdir(): - os.symlink('afile-notexist', 'alink') - assert not path.exists('alink') - assert path.lexists('alink') - sh.rm('alink') - assert not path.lexists('alink') - - -@skipif('sys.platform == "win32"') -def test_sh_rm_symlink_dir(): - with sh.tmpdir(): - sh.mkdirs('adir') - with sh.cd('adir'): - with open('afile', 'w') as f: f.close() - assert path.exists('afile') - assert path.exists('adir') - os.symlink('adir', 'alink') - assert path.lexists('alink') - sh.rm('alink') - assert path.exists('adir') - assert not path.lexists('alink') - - -def test_console_width_detection(): - width = textui.find_console_width() - assert width is None - - -def test_colprint(): - sample_table = [ - ['python-daemon', '4.5.7.7.3-1', 'blah foo meh yuck'], - ['foo', '6.1', ('some very loooooooooong string here .. I ' - 'suggest we make it even longer .. so longer ' - ' that normal terminal widths should entail ' - 'colprint to trim the string')]] - textui.colprint(sample_table) - - # try with empty inputs - textui.colprint(None) - textui.colprint([]) - - -def test_compression_ensure_read_access(): - """Test the ensure_read_access() hack in _compression.py""" - def test_pkg(pkgpath): - testdir = tempfile.mkdtemp('-test', 'pypm-') - extracted_dir, _ = sh.unpack_archive(pkgpath, testdir) - # check if we have read access on the directory - for child in os.listdir(extracted_dir): - p = path.join(extracted_dir, child) - if path.isdir(p): - os.listdir(p) - sh.rm(testdir) - - yield 'u-x on dirs', test_pkg, path.join(fixtures, 'generator_tools-0.3.5.tar.gz') - yield 'u-w on ._setup.py', test_pkg, path.join(fixtures, 'TracProjectMenu-1.0.tar.gz') - - -def test_compression_catch_invalid_mode(): - """Error from - tarfile.py should be handled""" - def extract(): - testdir = tempfile.mkdtemp('-test', 'pypm-') - extracted_dir, _ = sh.unpack_archive( - path.join(fixtures, 'libtele-0.2.tar.gz'), testdir) - if sys.platform == 'win32' and sys.version_info[:2] >= (2, 7): - with pytest.raises(sh.PackError): - extract() - else: - extract() - - -@pytest.mark.xfail -def test_compression_issue_11(): - """https://github.com/ActiveState/applib/issues/#issue/11 - - * Windows: IOError: [Errno 13] Permission denied: '.\\airi-0.0.1\\AIRi' - * OSX: IOError: [Errno 21] Is a directory: './airi-0.0.1/AIRi - * OSX Archive Utility (Finder): Unable to unarchive "airi-0.0.1.tar" ... - (Error 1 - Operation not permitted.) - """ - testdir = tempfile.mkdtemp('applib') - d, _ = sh.unpack_archive(path.join(fixtures, 'airi-0.0.1.tar.gz'), testdir) - diff --git a/clint/packages/applib/test/fixtures/TracProjectMenu-1.0.tar.gz b/clint/packages/applib/test/fixtures/TracProjectMenu-1.0.tar.gz deleted file mode 100644 index 741c65b..0000000 Binary files a/clint/packages/applib/test/fixtures/TracProjectMenu-1.0.tar.gz and /dev/null differ diff --git a/clint/packages/applib/test/fixtures/airi-0.0.1.tar.gz b/clint/packages/applib/test/fixtures/airi-0.0.1.tar.gz deleted file mode 100644 index 19f8c5a..0000000 Binary files a/clint/packages/applib/test/fixtures/airi-0.0.1.tar.gz and /dev/null differ diff --git a/clint/packages/applib/test/fixtures/generator_tools-0.3.5.tar.gz b/clint/packages/applib/test/fixtures/generator_tools-0.3.5.tar.gz deleted file mode 100644 index 592f4cd..0000000 Binary files a/clint/packages/applib/test/fixtures/generator_tools-0.3.5.tar.gz and /dev/null differ diff --git a/clint/packages/applib/test/fixtures/libtele-0.2.tar.gz b/clint/packages/applib/test/fixtures/libtele-0.2.tar.gz deleted file mode 100644 index e42a7ab..0000000 Binary files a/clint/packages/applib/test/fixtures/libtele-0.2.tar.gz and /dev/null differ diff --git a/clint/packages/applib/textui.py b/clint/packages/applib/textui.py deleted file mode 100644 index 632db70..0000000 --- a/clint/packages/applib/textui.py +++ /dev/null @@ -1,359 +0,0 @@ -# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. - -"""Textual UI: progress bar and colprint""" - -import sys -from datetime import datetime, timedelta -from contextlib import contextmanager -import logging -import math - -import six - -LOG = logging.getLogger(__name__) - -__all__ = ['colprint', 'find_console_width', 'ProgressBar', - 'clear_progress_bar', 'redraw_progress_bar', 'safe_output'] - -if not six.PY3: - input = raw_input - - -class ProgressBar(object): - """Show percent progress every 'n' seconds""" - - def __init__(self, total, delay=0.1, show_size=lambda x: x, note=None): - """ - total - total number of items that are going to be processed - delay - update delay in seconds - show_size - function to return the string to display instead of the number `size` - """ - assert total >= 0, total - assert delay >= 0, delay - assert show_size - - _set_current_progress_bar(self) - self.delay = timedelta(seconds=delay) - self.delay_duration = timedelta(seconds=1) - self.start = datetime.now() - self.elapsed = None # time elapsed from start - self.estimated_time_left = None - self.lastprint = None - self.lastprint_duration = None # for updating duration/ETA - self.lastprocessed = 0 - self.total = total - self.processed = 0 - self.show_size = show_size - self.note = note - self.duration_display = '' - self._length = 0 # current length of the progress display - - @classmethod - def iterate(cls, sequence, note=None, post=None): - """Iterate a sequence and update the progress bar accordingly - - The sequence must have a 'len' attribute if it is an arbitrary - generator. - - note -- Text to print before the progress bar - post -- Text to print at the end of progress (w/ fmt vars) - """ - p = cls(len(sequence), note=note) - clean_exit = False - try: - for item in sequence: - yield item - p.tick() - clean_exit = True - finally: - p.close() - if post and clean_exit: - sys.stdout.write(post.format(**p.__dict__) + '\n') - - def tick(self, items=1): - """The method that updates the display if necessary. - - After creating the ``PercentProgress`` object, this method must be - called for every item processed (or, pass items=ITEMS for every ITEMS - processed). - - This method must be called no more than ``self.total`` times (otherwise - you get assertion error .. implying a bug in your code) - - Return True if progress bar was redrawn. - """ - self.processed += items - assert self.processed <= self.total, \ - '{0} <= {1}'.format(self.processed, self.total) - - now = datetime.now() - if (self.lastprint == None or - (now - self.lastprint) > self.delay): - self.lastprint = now - self.redraw() - return True - else: - return False - - def clear(self): - """Erase the entire progress bar and put the cursor at first column""" - # Move cursor to the beginning of current progress line so that further - # messages will overwrite the progress bar. Also overwrite the previous - # progress bar with empty space. - sys.stdout.write('\r' + ' '*self._length + '\r') - sys.stdout.flush() - - def close(self): - """Close (hide) the progress bar - - Erase the progress bar and print the closing message in place of the - previous progress bar text. - """ - self.redraw() - self.clear() - _del_current_progress_bar(self) - - def redraw(self): - self.clear() - percent = _calculate_percent(self.processed, self.total) - now = datetime.now() - self.elapsed = now - self.start - if self.processed: - self.estimated_time_left = self.elapsed.seconds * (self.total-self.processed)/self.processed - - # Update time elapsed/left once a second only (delay_duration = 1s). - if self.elapsed.seconds and ( - self.lastprint_duration is None or \ - now - self.lastprint_duration > self.delay_duration): - - self.lastprint_duration = now - elapsed = _format_duration(self.elapsed.seconds) - if self.estimated_time_left: - self.duration_display = '({0}; {1} left)'.format( - elapsed, _format_duration(self.estimated_time_left)) - else: - self.duration_display = '({0})'.format(elapsed) - - bar_width = 20 - bar_filled = int(round(20.0/100 * percent)) - filled = ['='] * bar_filled - if filled: - filled[-1] = '>' - filled = ''.join(filled) - - progress_bar = ''.join([ - (self.note+': ') if self.note else '', - # header: - '[', - - # solid bar - filled, - - # empty space - ' ' * (bar_width-bar_filled), - - # footer - '] {0:-3}% {1}/{2} {3}'.format( - percent, - self.show_size(self.processed), - self.show_size(self.total), - self.duration_display - ) - ]) - - self._length = len(progress_bar) - sys.stdout.write('\r' + progress_bar + '\r') - sys.stdout.flush() - - -def clear_progress_bar(): - """Clear progress bar, if any""" - if _current_progress_bar: - _current_progress_bar.clear() - - -def redraw_progress_bar(): - """Redraw progress bar, if any""" - if _current_progress_bar: - _current_progress_bar.redraw() - - -@contextmanager -def safe_output(): - """Wrapper that makes it safe to print to stdout - - If a progress bar is currently being shown, this wrapper takes care of - clearing it before .. and then redrawing it after - """ - clear_progress_bar() - yield - redraw_progress_bar() - - -def askyesno(question, default): - """Ask (Y/N) type of question to the user""" - assert isinstance(default, bool), '"default" must be a boolean' - - s = '{0} ({1}/{2}) '.format( - question, - default and 'Y' or 'y', - default and 'n' or 'N') - - while True: - val = input(s).strip().lower() - - if val == '': - return default - elif val in ('y', 'yes', 'ok'): - return True - elif val in ('n', 'no'): - return False - - -# This function was written by Alex Martelli -# http://stackoverflow.com/questions/1396820/ -def colprint(table, totwidth=None): - """Print the table in terminal taking care of wrapping/alignment - - - `table`: A table of strings. Elements must not be `None` - - `totwidth`: If None, console width is used - """ - if not table: return - if totwidth is None: - totwidth = find_console_width() - if totwidth is not None: - totwidth -= 1 # for not printing an extra empty line on windows - numcols = max(len(row) for row in table) - # ensure all rows have >= numcols columns, maybe empty - padded = [row+numcols*['',] for row in table] - # compute col widths, including separating space (except for last one) - widths = [ 1 + max(len(x) for x in column) for column in zip(*padded)] - widths[-1] -= 1 - # drop or truncate columns from the right in order to fit - if totwidth is not None: - while sum(widths) > totwidth: - mustlose = sum(widths) - totwidth - if widths[-1] <= mustlose: - del widths[-1] - else: - widths[-1] -= mustlose - break - # and finally, the output phase! - for row in padded: - s = ''.join(['%*s' % (-w, i[:w]) - for w, i in zip(widths, row)]) - LOG.info(s) - - -def find_console_width(): - """Return the console width - - Return ``None`` if stdout is not a terminal (eg: a pipe) - """ - if sys.platform.startswith('win'): - return _find_windows_console_width() - else: - return _find_unix_console_width() - - -@contextmanager -def longrun(log, finalfn=lambda: None): - """Decorator for performing a long operation with consideration for the - command line. - - 1. Catch keyboard interrupts and exit gracefully - - 2. Print total time elapsed always at the end (successful or not) - - 3. Call ``finalfn`` always at the end (successful or not) - """ - start_time = datetime.now() - - try: - yield - except KeyboardInterrupt: - log.info('*** interrupted by user - Ctrl+c ***') - raise SystemExit(3) - finally: - finalfn() - end_time = datetime.now() - - log.info('') - log.info('-----') - log.info('Total time elapsed: %s', end_time-start_time) - - -def _find_unix_console_width(): - import termios, fcntl, struct, sys - - # fcntl.ioctl will fail if stdout is not a tty - if not sys.stdout.isatty(): - return None - - s = struct.pack("HHHH", 0, 0, 0, 0) - fd_stdout = sys.stdout.fileno() - size = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s) - height, width = struct.unpack("HHHH", size)[:2] - return width - - -def _find_windows_console_width(): - # http://code.activestate.com/recipes/440694/ - from ctypes import windll, create_string_buffer - STDIN, STDOUT, STDERR = -10, -11, -12 - - h = windll.kernel32.GetStdHandle(STDERR) - csbi = create_string_buffer(22) - res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) - - if res: - import struct - (bufx, bufy, curx, cury, wattr, - left, top, right, bottom, - maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) - sizex = right - left + 1 - sizey = bottom - top + 1 - return sizex - - - -def _byteshr(bytes): - """Human-readable version of bytes count""" - for x in ['bytes','KB','MB','GB','TB']: - if bytes < 1024.0: - return "%3.1f%s" % (bytes, x) - bytes /= 1024.0 - raise ValueError('cannot find human-readable version') - - -def _calculate_percent(numerator, denominator): - assert numerator <= denominator, '%d <= %d' % (numerator, denominator) - if denominator == 0: - if numerator == 0: - return 100 - else: - raise ValueError('denominator cannot be zero') - - return int(round( numerator / float(denominator) * 100 )) - - -def _format_duration(seconds): - s = [] - if seconds > 60: - s.append('{0}m'.format(int(seconds/60))) - s.append('{0}s'.format(int(seconds % 60))) - return ''.join(s) - - -# Handle to the current progress bar object. There cannot be more than one -# progress bar for obvious reasons. -_current_progress_bar = None -def _set_current_progress_bar(pbar): - global _current_progress_bar - assert _current_progress_bar is None, 'there is already a pbar' - _current_progress_bar = pbar -def _del_current_progress_bar(pbar): - global _current_progress_bar - assert _current_progress_bar is pbar, 'pbar is something else' - _current_progress_bar = None -