diff --git a/clint/packages/applib/__init__.py b/clint/packages/applib/__init__.py new file mode 100644 index 0000000..1c0f215 --- /dev/null +++ b/clint/packages/applib/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. + +__version_info__ = ('1', '2') +__version__ = '.'.join(__version_info__) # + '.dev' diff --git a/clint/packages/applib/_cmdln.py b/clint/packages/applib/_cmdln.py new file mode 100644 index 0000000..be17001 --- /dev/null +++ b/clint/packages/applib/_cmdln.py @@ -0,0 +1,1843 @@ +#!/usr/bin/env python +# Copyright (c) 2002-2009 ActiveState Software Inc. +# License: MIT (see LICENSE.txt for license details) +# Author: Trent Mick + +"""An improvement on Python's standard cmd.py module. + +As with cmd.py, this module provides "a simple framework for writing +line-oriented command intepreters." This module provides a 'RawCmdln' +class that fixes some design flaws in cmd.Cmd, making it more scalable +and nicer to use for good 'cvs'- or 'svn'-style command line interfaces +or simple shells. And it provides a 'Cmdln' class that add +optparse-based option processing. Basically you use it like this: + + import cmdln + + class MySVN(cmdln.Cmdln): + name = "svn" + + @cmdln.alias('stat', 'st') + @cmdln.option('-v', '--verbose', action='store_true' + help='print verbose information') + def do_status(self, subcmd, opts, *paths): + print "handle 'svn status' command" + + #... + + if __name__ == "__main__": + shell = MySVN() + retval = shell.main() + sys.exit(retval) + +See the README.txt or for more +details. +""" + +__version_info__ = (1, 2, 0) +__version__ = '.'.join(map(str, __version_info__)) + +import os +from os import path +import sys +import re +import types +import cmd +import optparse +from pprint import pprint +try: + import ConfigParser +except ImportError: + import configparser as ConfigParser # python3 +import datetime + + +if sys.hexversion > 0x03000000: + ClassType = type +else: + ClassType = types.ClassType + + +#---- globals + +LOOP_ALWAYS, LOOP_NEVER, LOOP_IF_EMPTY = range(3) + +# An unspecified optional argument when None is a meaningful value. +_NOT_SPECIFIED = ("Not", "Specified") + +# Pattern to match a TypeError message from a call that +# failed because of incorrect number of arguments (see +# Python/getargs.c). +_INCORRECT_NUM_ARGS_RE = re.compile( + r"(takes [\w ]+ )(\d+)[\w ]*( arguments? \()(\d+)( given\))") + + + +#---- exceptions + +class CmdlnError(Exception): + """A cmdln.py usage error.""" + def __init__(self, msg): + self.msg = msg + def __str__(self): + return self.msg + +class CmdlnUserError(Exception): + """An error by a user of a cmdln-based tool/shell.""" + pass + + + +#---- public methods and classes + +def alias(*aliases): + """Decorator to add aliases for Cmdln.do_* command handlers. + + Example: + class MyShell(cmdln.Cmdln): + @cmdln.alias("!", "sh") + def do_shell(self, argv): + #...implement 'shell' command + """ + def decorate(f): + if not hasattr(f, "aliases"): + f.aliases = [] + f.aliases += aliases + return f + return decorate + + +class RawCmdln(cmd.Cmd): + """An improved (on cmd.Cmd) framework for building multi-subcommand + scripts (think "svn" & "cvs") and simple shells (think "pdb" and + "gdb"). + + A simple example: + + import cmdln + + class MySVN(cmdln.RawCmdln): + name = "svn" + + @cmdln.aliases('stat', 'st') + def do_status(self, argv): + print "handle 'svn status' command" + + if __name__ == "__main__": + shell = MySVN() + retval = shell.main() + sys.exit(retval) + """ + name = None # if unset, defaults basename(sys.argv[0]) + prompt = None # if unset, defaults to self.name+"> " + version = None # if set, default top-level options include --version + + # Default messages for some 'help' command error cases. + # They are interpolated with one arg: the command. + nohelp = "no help on '%s'" + unknowncmd = "unknown command: '%s'" + + helpindent = '' # string with which to indent help output + + def __init__(self, completekey='tab', + stdin=None, stdout=None, stderr=None): + """Cmdln(completekey='tab', stdin=None, stdout=None, stderr=None) + + The optional argument 'completekey' is the readline name of a + completion key; it defaults to the Tab key. If completekey is + not None and the readline module is available, command completion + is done automatically. + + The optional arguments 'stdin', 'stdout' and 'stderr' specify + alternate input, output and error output file objects; if not + specified, sys.* are used. + + If 'stdout' but not 'stderr' is specified, stdout is used for + error output. This is to provide least surprise for users used + to only the 'stdin' and 'stdout' options with cmd.Cmd. + """ + if self.name is None: + self.name = os.path.basename(sys.argv[0]) + if self.prompt is None: + self.prompt = self.name+"> " + self._name_str = self._str(self.name) + self._prompt_str = self._str(self.prompt) + if stdin is not None: + self.stdin = stdin + else: + self.stdin = sys.stdin + if stdout is not None: + self.stdout = stdout + else: + self.stdout = sys.stdout + if stderr is not None: + self.stderr = stderr + elif stdout is not None: + self.stderr = stdout + else: + self.stderr = sys.stderr + self.cmdqueue = [] + self.completekey = completekey + self.cmdlooping = False + + def get_option_defaults(self, cmdname): + """Return default values for command options + + For all options registered for the given command (`cmdname`), return + the default values as a dictionary (option name as keys, default value + as values) + + If `cmdname` is None, return default for top-level options + """ + return {} + + def get_optparser(self): + """Hook for subclasses to set the option parser for the + top-level command/shell. + + NOTE: you may not override this method anymore; cmdln.option decorator + can now be used on the class itself to create toplevel options. + + This option parser is retrieved and used by `.main()' to handle + top-level options. + + The default implements a single '-h|--help' option. Sub-classes + can return None to have no options at the top-level. Typically + an instance of CmdlnOptionParser should be returned. + """ + return self._create_toplevel_optparser() + + def _create_toplevel_optparser(self): + version = (self.version is not None + and "%s %s" % (self._name_str, self.version) + or None) + parser = CmdlnOptionParser(self, version=version) + + # if ``useconfig`` is used, add the -c option to specify extra config + # file + # if hasattr(self, 'defaultsconfig'): + # parser.add_option('-c', '--configfile', + # dest='configfile', + # help="specify the config file location", + # default=None) + + # add toplevel options + if hasattr(self, 'toplevel_optparser_options'): + for args, kwargs in self.toplevel_optparser_options: + parser.add_option(*args, **kwargs) + + return parser + + def postoptparse(self): + """Hook method executed just after `.main()' parses top-level + options. + + When called `self.options' holds the results of the option parse. + """ + + def main(self, argv=None, loop=LOOP_NEVER): + """A possible mainline handler for a script, like so: + + import cmdln + class MyCmd(cmdln.Cmdln): + name = "mycmd" + ... + + if __name__ == "__main__": + MyCmd().main() + + By default this will use sys.argv to issue a single command to + 'MyCmd', then exit. The 'loop' argument can be use to control + interactive shell behaviour. + + Arguments: + "argv" (optional, default sys.argv) is the command to run. + It must be a sequence, where the first element is the + command name and subsequent elements the args for that + command. + "loop" (optional, default LOOP_NEVER) is a constant + indicating if a command loop should be started (i.e. an + interactive shell). Valid values (constants on this module): + LOOP_ALWAYS start loop and run "argv", if any + LOOP_NEVER run "argv" (or .emptyline()) and exit + LOOP_IF_EMPTY run "argv", if given, and exit; + otherwise, start loop + """ + if argv is None: + argv = sys.argv + else: + argv = argv[:] # don't modify caller's list + + try: + self.optparser = self.get_optparser() + if self.optparser: # i.e. optparser=None means don't process for opts + try: + self.options, args = self.optparser.parse_args(argv[1:]) + except StopOptionProcessing: + return 0 + else: + # Set default options *after* parsing command line options + # This is an requirement for CmdlnWithConfigParser which + # relies on the -c option which is only parsed in the above + # `try' block + self.optparser.set_defaults(**self.get_option_defaults(None)) + self.options, args = self.optparser.parse_args(argv[1:]) + else: + self.options, args = None, argv[1:] + self.postoptparse() + except CmdlnUserError: + _, ex, _ = sys.exc_info() + msg = "%s: %s\nTry '%s help' for info.\n"\ + % (self.name, ex, self.name) + self.stderr.write(self._str(msg)) + self.stderr.flush() + return 1 + + if loop == LOOP_ALWAYS: + if args: + self.cmdqueue.append(args) + return self.cmdloop() + elif loop == LOOP_NEVER: + if args: + return self.cmd(args) + else: + return self.emptyline() + elif loop == LOOP_IF_EMPTY: + if args: + return self.cmd(args) + else: + return self.cmdloop() + + def cmd(self, argv): + """Run one command and exit. + + "argv" is the arglist for the command to run. argv[0] is the + command to run. If argv is an empty list then the + 'emptyline' handler is run. + + Returns the return value from the command handler. + """ + assert isinstance(argv, (list, tuple)), \ + "'argv' is not a sequence: %r" % argv + retval = None + try: + argv = self.precmd(argv) + retval = self.onecmd(argv) + self.postcmd(argv) + except: + if not self.cmdexc(argv): + raise + retval = 1 + return retval + + def _str(self, s): + """Safely convert the given str/unicode to a string for printing.""" + try: + return str(s) + except UnicodeError: + #XXX What is the proper encoding to use here? 'utf-8' seems + # to work better than "getdefaultencoding" (usually + # 'ascii'), on OS X at least. + #return s.encode(sys.getdefaultencoding(), "replace") + return s.encode("utf-8", "replace") + + def cmdloop(self, intro=None): + """Repeatedly issue a prompt, accept input, parse into an argv, and + dispatch (via .precmd(), .onecmd() and .postcmd()), passing them + the argv. In other words, start a shell. + + "intro" (optional) is a introductory message to print when + starting the command loop. This overrides the class + "intro" attribute, if any. + """ + self.cmdlooping = True + self.preloop() + if self.use_rawinput and self.completekey: + try: + import readline + self.old_completer = readline.get_completer() + readline.set_completer(self.complete) + readline.parse_and_bind(self.completekey+": complete") + except ImportError: + pass + try: + if intro is None: + intro = self.intro + if intro: + intro_str = self._str(intro) + self.stdout.write(intro_str+'\n') + self.stop = False + retval = None + while not self.stop: + if self.cmdqueue: + argv = self.cmdqueue.pop(0) + assert isinstance(argv, (list, tuple)), \ + "item on 'cmdqueue' is not a sequence: %r" % argv + else: + if self.use_rawinput: + try: + line = raw_input(self._prompt_str) + except EOFError: + line = 'EOF' + except KeyboardInterrupt: + line = 'KeyboardInterrupt' + else: + self.stdout.write(self._prompt_str) + self.stdout.flush() + line = self.stdin.readline() + if not len(line): + line = 'EOF' + else: + line = line[:-1] # chop '\n' + argv = line2argv(line) + try: + argv = self.precmd(argv) + retval = self.onecmd(argv) + self.postcmd(argv) + except: + if not self.cmdexc(argv): + raise + retval = 1 + self.lastretval = retval + self.postloop() + finally: + if self.use_rawinput and self.completekey: + try: + import readline + readline.set_completer(self.old_completer) + except ImportError: + pass + self.cmdlooping = False + return retval + + def precmd(self, argv): + """Hook method executed just before the command argv is + interpreted, but after the input prompt is generated and issued. + + "argv" is the cmd to run. + + Returns an argv to run (i.e. this method can modify the command + to run). + """ + return argv + + def postcmd(self, argv): + """Hook method executed just after a command dispatch is finished. + + "argv" is the command that was run. + """ + pass + + def cmdexc(self, argv): + """Called if an exception is raised in any of precmd(), onecmd(), + or postcmd(). If True is returned, the exception is deemed to have + been dealt with. Otherwise, the exception is re-raised. + + The default implementation handles CmdlnUserError's, which + typically correspond to user error in calling commands (as + opposed to programmer error in the design of the script using + cmdln.py). + """ + type, exc, traceback = sys.exc_info() + if isinstance(exc, CmdlnUserError): + msg = "%s %s: %s\nTry '%s help %s' for info.\n"\ + % (self.name, argv[0], exc, self.name, argv[0]) + self.stderr.write(self._str(msg)) + self.stderr.flush() + return True + + def onecmd(self, argv): + if not argv: + return self.emptyline() + self.lastcmd = argv + cmdname = self._get_canonical_cmd_name(argv[0]) + if cmdname: + handler = self._get_cmd_handler(cmdname) + if handler: + try: + return self._dispatch_cmd(handler, argv) + except KeyboardInterrupt: + return self.onecmd(["KeyboardInterrupt"]) + return self.default(argv) + + def _dispatch_cmd(self, handler, argv): + return handler(argv) + + def default(self, argv): + """Hook called to handle a command for which there is no handler. + + "argv" is the command and arguments to run. + + The default implementation writes an error message to stderr + and returns an error exit status. + + Returns a numeric command exit status. + """ + errmsg = self._str(self.unknowncmd % (argv[0],)) + if self.cmdlooping: + self.stderr.write(errmsg+"\n") + else: + self.stderr.write("%s: %s\nTry '%s help' for info.\n" + % (self._name_str, errmsg, self._name_str)) + self.stderr.flush() + return 1 + + def parseline(self, line): + # This is used by Cmd.complete (readline completer function) to + # massage the current line buffer before completion processing. + # We override to drop special '!' handling. + line = line.strip() + if not line: + return None, None, line + elif line[0] == '?': + line = 'help ' + line[1:] + i, n = 0, len(line) + while i < n and line[i] in self.identchars: i = i+1 + cmd, arg = line[:i], line[i:].strip() + return cmd, arg, line + + def helpdefault(self, cmd, known): + """Hook called to handle help on a command for which there is no + help handler. + + "cmd" is the command name on which help was requested. + "known" is a boolean indicating if this command is known + (i.e. if there is a handler for it). + + Returns a return code. + """ + if known: + msg = self._str(self.nohelp % (cmd,)) + if self.cmdlooping: + self.stderr.write(msg + '\n') + else: + self.stderr.write("%s: %s\n" % (self.name, msg)) + else: + msg = self.unknowncmd % (cmd,) + if self.cmdlooping: + self.stderr.write(msg + '\n') + else: + self.stderr.write("%s: %s\n" + "Try '%s help' for info.\n" + % (self.name, msg, self.name)) + self.stderr.flush() + return 1 + + def do_help(self, argv): + """${cmd_name}: give detailed help on a specific sub-command + + Usage: + ${name} help [COMMAND] + """ + if len(argv) > 1: # asking for help on a particular command + doc = None + cmdname = self._get_canonical_cmd_name(argv[1]) or argv[1] + if not cmdname: + return self.helpdefault(argv[1], False) + else: + helpfunc = getattr(self, "help_"+cmdname, None) + if helpfunc: + doc = helpfunc() + else: + handler = self._get_cmd_handler(cmdname) + if handler: + doc = handler.__doc__ + if doc is None: + return self.helpdefault(argv[1], handler != None) + else: # bare "help" command + doc = self.__class__.__doc__ # try class docstring + if doc is None: + # Try to provide some reasonable useful default help. + if self.cmdlooping: prefix = "" + else: prefix = self.name+' ' + doc = """Usage: + %sCOMMAND [ARGS...] + %shelp [COMMAND] + + ${option_list} + ${command_list} + ${help_list} + """ % (prefix, prefix) + cmdname = None + + if doc: # *do* have help content, massage and print that + doc = self._help_reindent(doc) + doc = self._help_preprocess(doc, cmdname) + doc = doc.rstrip() + '\n' # trim down trailing space + self.stdout.write(self._str(doc)) + self.stdout.flush() + do_help.aliases = ["?"] + + def _help_reindent(self, help, indent=None): + """Hook to re-indent help strings before writing to stdout. + + "help" is the help content to re-indent + "indent" is a string with which to indent each line of the + help content after normalizing. If unspecified or None + then the default is use: the 'self.helpindent' class + attribute. By default this is the empty string, i.e. + no indentation. + + By default, all common leading whitespace is removed and then + the lot is indented by 'self.helpindent'. When calculating the + common leading whitespace the first line is ignored -- hence + help content for Conan can be written as follows and have the + expected indentation: + + def do_crush(self, ...): + '''${cmd_name}: crush your enemies, see them driven before you... + + c.f. Conan the Barbarian''' + """ + if indent is None: + indent = self.helpindent + lines = help.splitlines(0) + _dedentlines(lines, skip_first_line=True) + lines = [(indent+line).rstrip() for line in lines] + return '\n'.join(lines) + + def _help_preprocess(self, help, cmdname): + """Hook to preprocess a help string before writing to stdout. + + "help" is the help string to process. + "cmdname" is the canonical sub-command name for which help + is being given, or None if the help is not specific to a + command. + + By default the following template variables are interpolated in + help content. (Note: these are similar to Python 2.4's + string.Template interpolation but not quite.) + + ${name} + The tool's/shell's name, i.e. 'self.name'. + ${option_list} + A formatted table of options for this shell/tool. + ${command_list} + A formatted table of available sub-commands. + ${help_list} + A formatted table of additional help topics (i.e. 'help_*' + methods with no matching 'do_*' method). + ${cmd_name} + The name (and aliases) for this sub-command formatted as: + "NAME (ALIAS1, ALIAS2, ...)". + ${cmd_usage} + A formatted usage block inferred from the command function + signature. + ${cmd_option_list} + A formatted table of options for this sub-command. (This is + only available for commands using the optparse integration, + i.e. using @cmdln.option decorators or manually setting the + 'optparser' attribute on the 'do_*' method.) + + Returns the processed help. + """ + preprocessors = { + "${name}": self._help_preprocess_name, + "${option_list}": self._help_preprocess_option_list, + "${command_list}": self._help_preprocess_command_list, + "${help_list}": self._help_preprocess_help_list, + "${cmd_name}": self._help_preprocess_cmd_name, + "${cmd_usage}": self._help_preprocess_cmd_usage, + "${cmd_option_list}": self._help_preprocess_cmd_option_list, + } + + for marker, preprocessor in preprocessors.items(): + if marker in help: + help = preprocessor(help, cmdname) + return help + + def _help_preprocess_name(self, help, cmdname=None): + return help.replace("${name}", self.name) + + def _help_preprocess_option_list(self, help, cmdname=None): + marker = "${option_list}" + indent, indent_width = _get_indent(marker, help) + suffix = _get_trailing_whitespace(marker, help) + + if self.optparser: + # Setup formatting options and format. + # - Indentation of 4 is better than optparse default of 2. + # C.f. Damian Conway's discussion of this in Perl Best + # Practices. + self.optparser.formatter.indent_increment = 4 + self.optparser.formatter.current_indent = indent_width + block = self.optparser.format_option_help() + '\n' + else: + block = "" + + help = help.replace(indent+marker+suffix, block, 1) + return help + + def _get_cmds_data(self): + # Find any aliases for commands. + token2canonical = self._get_canonical_map() + aliases = {} + for token, cmdname in token2canonical.items(): + if token == cmdname: continue + aliases.setdefault(cmdname, []).append(token) + + # Get the list of (non-hidden) commands and their + # documentation, if any. + cmdnames = {} # use a dict to strip duplicates + for attr in self.get_names(): + if attr.startswith("do_"): + cmdnames[attr[3:]] = True + cmdnames = list(sorted(cmdnames.keys())) + linedata = [] + for cmdname in cmdnames: + if aliases.get(cmdname): + a = aliases[cmdname] + a.sort() + cmdstr = "%s (%s)" % (cmdname, ", ".join(a)) + else: + cmdstr = cmdname + doc = None + try: + helpfunc = getattr(self, 'help_'+cmdname) + except AttributeError: + handler = self._get_cmd_handler(cmdname) + if handler: + doc = handler.__doc__ + else: + doc = helpfunc() + + # Strip "${cmd_name}: " from the start of a command's doc. Best + # practice dictates that command help strings begin with this, but + # it isn't at all wanted for the command list. + to_strip = "${cmd_name}:" + if doc and doc.startswith(to_strip): + #log.debug("stripping %r from start of %s's help string", + # to_strip, cmdname) + doc = doc[len(to_strip):].lstrip() + linedata.append( (cmdstr, doc) ) + + return linedata + + def _help_preprocess_command_list(self, help, cmdname=None): + marker = "${command_list}" + indent, indent_width = _get_indent(marker, help) + suffix = _get_trailing_whitespace(marker, help) + + linedata = self._get_cmds_data() + if linedata: + subindent = indent + ' '*4 + lines = _format_linedata(linedata, subindent, indent_width+4) + block = indent + "Commands:\n" \ + + '\n'.join(lines) + "\n\n" + help = help.replace(indent+marker+suffix, block, 1) + return help + + def _gen_names_and_attrs(self): + # Inheritance says we have to look in class and + # base classes; order is not important. + names = [] + classes = [self.__class__] + while classes: + aclass = classes.pop(0) + if aclass.__bases__: + classes = classes + list(aclass.__bases__) + for name in dir(aclass): + yield (name, getattr(aclass, name)) + + def _get_help_names(self): + """Return a mapping of help topic name to `.help_*()` method.""" + # Determine the additional help topics, if any. + help_names = {} + token2cmdname = self._get_canonical_map() + for attrname, attr in self._gen_names_and_attrs(): + if not attrname.startswith("help_"): continue + help_name = attrname[5:] + if help_name not in token2cmdname: + help_names[help_name] = attr + return help_names + + def _help_preprocess_help_list(self, help, cmdname=None): + marker = "${help_list}" + indent, indent_width = _get_indent(marker, help) + suffix = _get_trailing_whitespace(marker, help) + + help_names = self._get_help_names() + if help_names: + linedata = [(n, a.__doc__ or "") for n, a in help_names.items()] + linedata.sort() + + subindent = indent + ' '*4 + lines = _format_linedata(linedata, subindent, indent_width+4) + block = (indent + + "Additional help topics (run `%s help TOPIC'):\n" % self.name + + '\n'.join(lines) + + "\n\n") + else: + block = '' + help = help.replace(indent+marker+suffix, block, 1) + return help + + def _help_preprocess_cmd_name(self, help, cmdname=None): + marker = "${cmd_name}" + handler = self._get_cmd_handler(cmdname) + if not handler: + raise CmdlnError("cannot preprocess '%s' into help string: " + "could not find command handler for %r" + % (marker, cmdname)) + s = cmdname + if hasattr(handler, "aliases"): + s += " (%s)" % (", ".join(handler.aliases)) + help = help.replace(marker, s) + return help + + #TODO: this only makes sense as part of the Cmdln class. + # Add hooks to add help preprocessing template vars and put + # this one on that class. + def _help_preprocess_cmd_usage(self, help, cmdname=None): + marker = "${cmd_usage}" + handler = self._get_cmd_handler(cmdname) + if not handler: + raise CmdlnError("cannot preprocess '%s' into help string: " + "could not find command handler for %r" + % (marker, cmdname)) + indent, indent_width = _get_indent(marker, help) + suffix = _get_trailing_whitespace(marker, help) + + # Extract the introspection bits we need. + func = handler.__func__ + if func.__defaults__: + func_defaults = list(func.__defaults__) + else: + func_defaults = [] + co_argcount = func.__code__.co_argcount + co_varnames = func.__code__.co_varnames + co_flags = func.__code__.co_flags + CO_FLAGS_ARGS = 4 + CO_FLAGS_KWARGS = 8 + + # Adjust argcount for possible *args and **kwargs arguments. + argcount = co_argcount + if co_flags & CO_FLAGS_ARGS: argcount += 1 + if co_flags & CO_FLAGS_KWARGS: argcount += 1 + + # Determine the usage string. + usage = "%s %s" % (self.name, cmdname) + if argcount <= 2: # handler ::= do_FOO(self, argv) + usage += " [ARGS...]" + elif argcount >= 3: # handler ::= do_FOO(self, subcmd, opts, ...) + argnames = list(co_varnames[3:argcount]) + tail = "" + if co_flags & CO_FLAGS_KWARGS: + name = argnames.pop(-1) + import warnings + # There is no generally accepted mechanism for passing + # keyword arguments from the command line. Could + # *perhaps* consider: arg=value arg2=value2 ... + warnings.warn("argument '**%s' on '%s.%s' command " + "handler will never get values" + % (name, self.__class__.__name__, + func.__name__)) + if co_flags & CO_FLAGS_ARGS: + name = argnames.pop(-1) + tail = "[%s...]" % name.upper() + while func_defaults: + func_defaults.pop(-1) + name = argnames.pop(-1) + tail = "[%s%s%s]" % (name.upper(), (tail and ' ' or ''), tail) + while argnames: + name = argnames.pop(-1) + tail = "%s %s" % (name.upper(), tail) + usage += ' ' + tail + + block_lines = [ + self.helpindent + "Usage:", + self.helpindent + ' '*4 + usage + ] + block = '\n'.join(block_lines) + '\n\n' + + help = help.replace(indent+marker+suffix, block, 1) + return help + + #TODO: this only makes sense as part of the Cmdln class. + # Add hooks to add help preprocessing template vars and put + # this one on that class. + def _help_preprocess_cmd_option_list(self, help, cmdname=None): + marker = "${cmd_option_list}" + handler = self._get_cmd_handler(cmdname) + if not handler: + raise CmdlnError("cannot preprocess '%s' into help string: " + "could not find command handler for %r" + % (marker, cmdname)) + indent, indent_width = _get_indent(marker, help) + suffix = _get_trailing_whitespace(marker, help) + if hasattr(handler, "optparser"): + # Setup formatting options and format. + # - Indentation of 4 is better than optparse default of 2. + # C.f. Damian Conway's discussion of this in Perl Best + # Practices. + handler.optparser.formatter.indent_increment = 4 + handler.optparser.formatter.current_indent = indent_width + block = handler.optparser.format_option_help() + '\n' + else: + block = "" + + help = help.replace(indent+marker+suffix, block, 1) + return help + + def _get_canonical_cmd_name(self, token): + map = self._get_canonical_map() + return map.get(token, None) + + def _get_canonical_map(self): + """Return a mapping of available command names and aliases to + their canonical command name. + """ + cacheattr = "_token2canonical" + if not hasattr(self, cacheattr): + # Get the list of commands and their aliases, if any. + token2canonical = {} + cmd2funcname = {} # use a dict to strip duplicates + for attr in self.get_names(): + if attr.startswith("do_"): cmdname = attr[3:] + elif attr.startswith("_do_"): cmdname = attr[4:] + else: + continue + cmd2funcname[cmdname] = attr + token2canonical[cmdname] = cmdname + for cmdname, funcname in cmd2funcname.items(): # add aliases + func = getattr(self, funcname) + aliases = getattr(func, "aliases", []) + for alias in aliases: + if alias in cmd2funcname: + import warnings + warnings.warn("'%s' alias for '%s' command conflicts " + "with '%s' handler" + % (alias, cmdname, cmd2funcname[alias])) + continue + token2canonical[alias] = cmdname + setattr(self, cacheattr, token2canonical) + return getattr(self, cacheattr) + + def _get_cmd_handler(self, cmdname): + handler = None + try: + handler = getattr(self, 'do_' + cmdname) + except AttributeError: + try: + # Private command handlers begin with "_do_". + handler = getattr(self, '_do_' + cmdname) + except AttributeError: + pass + return handler + + def _do_EOF(self, argv): + # Default EOF handler + # TODO: A mechanism so "EOF" and "KeyboardInterrupt" work as handlers + # but are *not* real available commands. + self.stdout.write('\n') + self.stdout.flush() + self.stop = True + + def _do_KeyboardInterrupt(self, argv): + # Default keyboard interrupt (i.e. ) handler. + # TODO: A mechanism so "EOF" and "KeyboardInterrupt" work as handlers + # but are *not* real available commands. + self.stdout.write('\n') + self.stdout.flush() + + def emptyline(self): + # Different from cmd.Cmd: don't repeat the last command for an + # emptyline. + if self.cmdlooping: + pass + else: + return self.do_help(["help"]) + + +#---- optparse.py extension to fix (IMO) some deficiencies +# +# See the class _OptionParserEx docstring for details. +# + +class StopOptionProcessing(Exception): + """Indicate that option *and argument* processing should stop + cleanly. This is not an error condition. It is similar in spirit to + StopIteration. This is raised by _OptionParserEx's default "help" + and "version" option actions and can be raised by custom option + callbacks too. + + Hence the typical CmdlnOptionParser (a subclass of _OptionParserEx) + usage is: + + parser = CmdlnOptionParser(mycmd) + parser.add_option("-f", "--force", dest="force") + ... + try: + opts, args = parser.parse_args() + except StopOptionProcessing: + # normal termination, "--help" was probably given + sys.exit(0) + """ + +class _OptionParserEx(optparse.OptionParser): + """An optparse.OptionParser that uses exceptions instead of sys.exit. + + This class is an extension of optparse.OptionParser that differs + as follows: + - Correct (IMO) the default OptionParser error handling to never + sys.exit(). Instead OptParseError exceptions are passed through. + - Add the StopOptionProcessing exception (a la StopIteration) to + indicate normal termination of option processing. + See StopOptionProcessing's docstring for details. + + I'd also like to see the following in the core optparse.py, perhaps + as a RawOptionParser which would serve as a base class for the more + generally used OptionParser (that works as current): + - Remove the implicit addition of the -h|--help and --version + options. They can get in the way (e.g. if want '-?' and '-V' for + these as well) and it is not hard to do: + optparser.add_option("-h", "--help", action="help") + optparser.add_option("--version", action="version") + These are good practices, just not valid defaults if they can + get in the way. + """ + def error(self, msg): + raise optparse.OptParseError(msg) + + def exit(self, status=0, msg=None): + if status == 0: + raise StopOptionProcessing(msg) + else: + #TODO: don't lose status info here + raise optparse.OptParseError(msg) + + + +#---- optparse.py-based option processing support + +class CmdlnOptionParser(_OptionParserEx): + """An optparse.OptionParser class more appropriate for top-level + Cmdln options. For parsing of sub-command options, see + SubCmdOptionParser. + + Changes: + - disable_interspersed_args() by default, because a Cmdln instance + has sub-commands which may themselves have options. + - Redirect print_help() to the Cmdln.do_help() which is better + equiped to handle the "help" action. + - error() will raise a CmdlnUserError: OptionParse.error() is meant + to be called for user errors. Raising a well-known error here can + make error handling clearer. + - Also see the changes in _OptionParserEx. + """ + def __init__(self, cmdln, **kwargs): + self.cmdln = cmdln + kwargs["prog"] = self.cmdln.name + _OptionParserEx.__init__(self, **kwargs) + self.disable_interspersed_args() + + def print_help(self, file=None): + self.cmdln.onecmd(["help"]) + + def error(self, msg): + raise CmdlnUserError(msg) + + +class SubCmdOptionParser(_OptionParserEx): + def set_cmdln_info(self, cmdln, subcmd): + """Called by Cmdln to pass relevant info about itself needed + for print_help(). + """ + self.cmdln = cmdln + self.subcmd = subcmd + + def print_help(self, file=None): + self.cmdln.onecmd(["help", self.subcmd]) + + def error(self, msg): + raise CmdlnUserError(msg) + + +def option(*args, **kwargs): + """Decorator to add an option to the optparser argument of a Cmdln + subcommand + + To add a toplevel option, apply the decorator on the class itself. (see + p4.py for an example) + + Example: + @cmdln.option("-E", dest="environment_path") + class MyShell(cmdln.Cmdln): + @cmdln.option("-f", "--force", help="force removal") + def do_remove(self, subcmd, opts, *args): + #... + """ + def decorate_sub_command(method): + """create and add sub-command options""" + if not hasattr(method, "optparser"): + method.optparser = SubCmdOptionParser() + method.optparser.add_option(*args, **kwargs) + return method + def decorate_class(klass): + """store toplevel options""" + assert _forgiving_issubclass(klass, Cmdln) + _inherit_attr(klass, "toplevel_optparser_options", [], cp=lambda l: l[:]) + klass.toplevel_optparser_options.append( (args, kwargs) ) + return klass + + #XXX Is there a possible optimization for many options to not have a + # large stack depth here? + def decorate(obj): + if _forgiving_issubclass(obj, Cmdln): + return decorate_class(obj) + else: + return decorate_sub_command(obj) + return decorate + + +class Cmdln(RawCmdln): + """An improved (on cmd.Cmd) framework for building multi-subcommand + scripts (think "svn" & "cvs") and simple shells (think "pdb" and + "gdb"). + + A simple example: + + import cmdln + + class MySVN(cmdln.Cmdln): + name = "svn" + + @cmdln.aliases('stat', 'st') + @cmdln.option('-v', '--verbose', action='store_true' + help='print verbose information') + def do_status(self, subcmd, opts, *paths): + print "handle 'svn status' command" + + #... + + if __name__ == "__main__": + shell = MySVN() + retval = shell.main() + sys.exit(retval) + + 'Cmdln' extends 'RawCmdln' by providing optparse option processing + integration. See this class' _dispatch_cmd() docstring and general + cmdln document for more information. + """ + + def _dispatch_cmd(self, handler, argv): + """Introspect sub-command handler signature to determine how to + dispatch the command. The raw handler provided by the base + 'RawCmdln' class is still supported: + + def do_foo(self, argv): + # 'argv' is the vector of command line args, argv[0] is + # the command name itself (i.e. "foo" or an alias) + pass + + In addition, if the handler has more than 2 arguments option + processing is automatically done (using optparse): + + @cmdln.option('-v', '--verbose', action='store_true') + def do_bar(self, subcmd, opts, *args): + # subcmd = <"bar" or an alias> + # opts = + if opts.verbose: + print "lots of debugging output..." + # args = + for arg in args: + bar(arg) + + TODO: explain that "*args" can be other signatures as well. + + The `cmdln.option` decorator corresponds to an `add_option()` + method call on an `optparse.OptionParser` instance. + + You can declare a specific number of arguments: + + @cmdln.option('-v', '--verbose', action='store_true') + def do_bar2(self, subcmd, opts, bar_one, bar_two): + #... + + and an appropriate error message will be raised/printed if the + command is called with a different number of args. + """ + co_argcount = handler.__func__.__code__.co_argcount + if co_argcount == 2: # handler ::= do_foo(self, argv) + return handler(argv) + elif co_argcount >= 3: # handler ::= do_foo(self, subcmd, opts, ...) + try: + optparser = handler.optparser + except AttributeError: + optparser = handler.__func__.optparser = SubCmdOptionParser() + assert isinstance(optparser, SubCmdOptionParser) + + # apply subcommand options' defaults from config files, if any. + subcmd = handler.__name__.split('do_', 1)[1] + optparser.set_defaults(**self.get_option_defaults(subcmd)) + + optparser.set_cmdln_info(self, argv[0]) + try: + opts, args = optparser.parse_args(argv[1:]) + except StopOptionProcessing: + #TODO: this doesn't really fly for a replacement of + # optparse.py behaviour, does it? + return 0 # Normal command termination + + try: + return handler(argv[0], opts, *args) + except TypeError: + _, ex, _ = sys.exc_info() + # Some TypeError's are user errors: + # do_foo() takes at least 4 arguments (3 given) + # do_foo() takes at most 5 arguments (6 given) + # do_foo() takes exactly 5 arguments (6 given) + # do_foo() takes exactly 5 positional arguments (6 given) + # Raise CmdlnUserError for these with a suitably + # massaged error message. + tb = sys.exc_info()[2] # the traceback object + if tb.tb_next is not None: + # If the traceback is more than one level deep, then the + # TypeError do *not* happen on the "handler(...)" call + # above. In that we don't want to handle it specially + # here: it would falsely mask deeper code errors. + raise + msg = ex.args[0] + match = _INCORRECT_NUM_ARGS_RE.search(msg) + if match: + msg = list(match.groups()) + msg[1] = int(msg[1]) - 3 + if msg[1] == 1: + msg[2] = msg[2].replace("arguments", "argument") + msg[3] = int(msg[3]) - 3 + msg = ''.join(map(str, msg)) + raise CmdlnUserError(msg) + else: + raise + else: + raise CmdlnError("incorrect argcount for %s(): takes %d, must " + "take 2 for 'argv' signature or 3+ for 'opts' " + "signature" % (handler.__name__, co_argcount)) + + + +#---- support for generating `man` page output from a Cmdln class + +def man_sections_from_cmdln(inst, summary=None, description=None, author=None): + """Return man page sections appropriate for the given Cmdln instance. + Join these sections for man page content. + + The man page sections generated are: + NAME + SYNOPSIS + DESCRIPTION (if `description` is given) + OPTIONS + COMMANDS + HELP TOPICS (if any) + + @param inst {Cmdln} Instance of Cmdln subclass for which to generate + man page content. + @param summary {str} A one-liner summary of the command. + @param description {str} A description of the command. If given, + it will be used for a "DESCRIPTION" section. + @param author {str} The author name and email for the AUTHOR secion + of the man page. + @raises {ValueError} if man page content cannot be generated for the + given class. + """ + if not inst.__class__.name: + raise ValueError("cannot generate man page content: `name` is not " + "set on class %r" % inst.__class__) + data = { + "name": inst.name, + "ucname": inst.name.upper(), + "date": datetime.date.today().strftime("%b %Y"), + "cmdln_version": __version__, + "version_str": inst.version and " %s" % inst.version or "", + "summary_str": summary and r" \- %s" % summary or "", + } + + sections = [] + sections.append('.\\" Automatically generated by cmdln %(cmdln_version)s\n' + '.TH %(ucname)s "1" "%(date)s" "%(name)s%(version_str)s" "User Commands"\n' + % data) + sections.append(".SH NAME\n%(name)s%(summary_str)s\n" % data) + sections.append(_dedent(r""" + .SH SYNOPSIS + .B %(name)s + [\fIGLOBALOPTS\fR] \fISUBCOMMAND \fR[\fIOPTS\fR] [\fIARGS\fR...] + .br + .B %(name)s + \fIhelp SUBCOMMAND\fR + """) % data) + if description: + sections.append(".SH DESCRIPTION\n%s\n" % description) + + section = ".SH OPTIONS\n" + if not hasattr(inst, "optparser") is None: + #HACK: In case `.main()` hasn't been run. + inst.optparser = inst.get_optparser() + lines = inst._help_preprocess("${option_list}", None).splitlines(False) + for line in lines[1:]: + line = line.lstrip() + if not line: + continue + section += ".TP\n" + opts, desc = line.split(' ', 1) + section += ".B %s\n" % opts + section += "%s\n" % _dedent(desc.lstrip(), skip_first_line=True) + sections.append(section) + + section = ".SH COMMANDS\n" + cmds = inst._get_cmds_data() + for cmdstr, doc in cmds: + cmdname = cmdstr.split(' ')[0] # e.g. "commit (ci)" -> "commit" + doc = inst._help_reindent(doc, indent="") + doc = inst._help_preprocess(doc, cmdname) + doc = doc.rstrip() + "\n" # trim down trailing space + section += '.PP\n.SS %s\n%s\n' % (cmdstr, doc) + sections.append(section) + + help_names = inst._get_help_names() + if help_names: + section = ".SH HELP TOPICS\n" + for help_name, help_meth in sorted(help_names.items()): + help = help_meth(inst) + help = inst._help_reindent(help, indent="") + section += '.PP\n.SS %s\n%s\n' % (help_name, help) + sections.append(section) + + if author: + sections.append(".SH AUTHOR\n%s\n" % author) + + return sections + + + +#---- internal support functions + +def _inherit_attr(klass, attr, default, cp): + """Inherit the attribute from the base class + + Copy `attr` from base class (otherwise use `default`). Copying is done using + the passed `cp` function. + + The motivation behind writing this function is to allow inheritance among + Cmdln classes where base classes set 'common' options using the + `@cmdln.option` decorator. To ensure this, we must not write to the base + class's options when handling the derived class. + """ + if attr not in klass.__dict__: + if hasattr(klass, attr): + value = cp(getattr(klass, attr)) + else: + value = default + setattr(klass, attr, value) + +def _forgiving_issubclass(derived_class, base_class): + """Forgiving version of ``issubclass`` + + Does not throw any exception when arguments are not of class type + """ + return (type(derived_class) is ClassType and \ + type(base_class) is ClassType and \ + issubclass(derived_class, base_class)) + +def _format_linedata(linedata, indent, indent_width): + """Format specific linedata into a pleasant layout. + + "linedata" is a list of 2-tuples of the form: + (, ) + "indent" is a string to use for one level of indentation + "indent_width" is a number of columns by which the + formatted data will be indented when printed. + + The column is held to 30 columns. + """ + lines = [] + WIDTH = 78 - indent_width + SPACING = 2 + NAME_WIDTH_LOWER_BOUND = 13 + NAME_WIDTH_UPPER_BOUND = 30 + NAME_WIDTH = max([len(s) for s,d in linedata]) + if NAME_WIDTH < NAME_WIDTH_LOWER_BOUND: + NAME_WIDTH = NAME_WIDTH_LOWER_BOUND + elif NAME_WIDTH > NAME_WIDTH_UPPER_BOUND: + NAME_WIDTH = NAME_WIDTH_UPPER_BOUND + + DOC_WIDTH = WIDTH - NAME_WIDTH - SPACING + for namestr, doc in linedata: + line = indent + namestr + if len(namestr) <= NAME_WIDTH: + line += ' ' * (NAME_WIDTH + SPACING - len(namestr)) + else: + lines.append(line) + line = indent + ' ' * (NAME_WIDTH + SPACING) + line += _summarize_doc(doc, DOC_WIDTH) + lines.append(line.rstrip()) + return lines + +def _summarize_doc(doc, length=60): + r"""Parse out a short one line summary from the given doclines. + + "doc" is the doc string to summarize. + "length" is the max length for the summary + + >>> _summarize_doc("this function does this") + 'this function does this' + >>> _summarize_doc("this function does this", 10) + 'this fu...' + >>> _summarize_doc("this function does this\nand that") + 'this function does this and that' + >>> _summarize_doc("this function does this\n\nand that") + 'this function does this' + """ + import re + if doc is None: + return "" + assert length > 3, "length <= 3 is absurdly short for a doc summary" + doclines = doc.strip().splitlines(0) + if not doclines: + return "" + + summlines = [] + for i, line in enumerate(doclines): + stripped = line.strip() + if not stripped: + break + summlines.append(stripped) + if len(''.join(summlines)) >= length: + break + + summary = ' '.join(summlines) + if len(summary) > length: + summary = summary[:length-3] + "..." + return summary + + +def line2argv(line): + r"""Parse the given line into an argument vector. + + "line" is the line of input to parse. + + This may get niggly when dealing with quoting and escaping. The + current state of this parsing may not be completely thorough/correct + in this respect. + + >>> from cmdln import line2argv + >>> line2argv("foo") + ['foo'] + >>> line2argv("foo bar") + ['foo', 'bar'] + >>> line2argv("foo bar ") + ['foo', 'bar'] + >>> line2argv(" foo bar") + ['foo', 'bar'] + + Quote handling: + + >>> line2argv("'foo bar'") + ['foo bar'] + >>> line2argv('"foo bar"') + ['foo bar'] + >>> line2argv(r'"foo\"bar"') + ['foo"bar'] + >>> line2argv("'foo bar' spam") + ['foo bar', 'spam'] + >>> line2argv("'foo 'bar spam") + ['foo bar', 'spam'] + + >>> line2argv('some\tsimple\ttests') + ['some', 'simple', 'tests'] + >>> line2argv('a "more complex" test') + ['a', 'more complex', 'test'] + >>> line2argv('a more="complex test of " quotes') + ['a', 'more=complex test of ', 'quotes'] + >>> line2argv('a more" complex test of " quotes') + ['a', 'more complex test of ', 'quotes'] + >>> line2argv('an "embedded \\"quote\\""') + ['an', 'embedded "quote"'] + + # Komodo bug 48027 + >>> line2argv('foo bar C:\\') + ['foo', 'bar', 'C:\\'] + + # Komodo change 127581 + >>> line2argv(r'"\test\slash" "foo bar" "foo\"bar"') + ['\\test\\slash', 'foo bar', 'foo"bar'] + + # Komodo change 127629 + >>> if sys.platform == "win32": + ... line2argv(r'\foo\bar') == ['\\foo\\bar'] + ... line2argv(r'\\foo\\bar') == ['\\\\foo\\\\bar'] + ... line2argv('"foo') == ['foo'] + ... else: + ... line2argv(r'\foo\bar') == ['foobar'] + ... line2argv(r'\\foo\\bar') == ['\\foo\\bar'] + ... try: + ... line2argv('"foo') + ... except ValueError, ex: + ... "not terminated" in str(ex) + True + True + True + """ + line = line.strip() + argv = [] + state = "default" + arg = None # the current argument being parsed + i = -1 + WHITESPACE = '\t\n\x0b\x0c\r ' # don't use string.whitespace (bug 81316) + while 1: + i += 1 + if i >= len(line): break + ch = line[i] + + if ch == "\\" and i+1 < len(line): + # escaped char always added to arg, regardless of state + if arg is None: arg = "" + if (sys.platform == "win32" + or state in ("double-quoted", "single-quoted") + ) and line[i+1] not in tuple('"\''): + arg += ch + i += 1 + arg += line[i] + continue + + if state == "single-quoted": + if ch == "'": + state = "default" + else: + arg += ch + elif state == "double-quoted": + if ch == '"': + state = "default" + else: + arg += ch + elif state == "default": + if ch == '"': + if arg is None: arg = "" + state = "double-quoted" + elif ch == "'": + if arg is None: arg = "" + state = "single-quoted" + elif ch in WHITESPACE: + if arg is not None: + argv.append(arg) + arg = None + else: + if arg is None: arg = "" + arg += ch + if arg is not None: + argv.append(arg) + if not sys.platform == "win32" and state != "default": + raise ValueError("command line is not terminated: unfinished %s " + "segment" % state) + return argv + + +def argv2line(argv): + r"""Put together the given argument vector into a command line. + + "argv" is the argument vector to process. + + >>> from cmdln import argv2line + >>> argv2line(['foo']) + 'foo' + >>> argv2line(['foo', 'bar']) + 'foo bar' + >>> argv2line(['foo', 'bar baz']) + 'foo "bar baz"' + >>> argv2line(['foo"bar']) + 'foo"bar' + >>> print argv2line(['foo" bar']) + 'foo" bar' + >>> print argv2line(["foo' bar"]) + "foo' bar" + >>> argv2line(["foo'bar"]) + "foo'bar" + """ + escapedArgs = [] + for arg in argv: + if ' ' in arg and '"' not in arg: + arg = '"'+arg+'"' + elif ' ' in arg and "'" not in arg: + arg = "'"+arg+"'" + elif ' ' in arg: + arg = arg.replace('"', r'\"') + arg = '"'+arg+'"' + escapedArgs.append(arg) + return ' '.join(escapedArgs) + + +# Recipe: dedent (0.1) in /Users/trentm/tm/recipes/cookbook +def _dedentlines(lines, tabsize=8, skip_first_line=False): + """_dedentlines(lines, tabsize=8, skip_first_line=False) -> dedented lines + + "lines" is a list of lines to dedent. + "tabsize" is the tab width to use for indent width calculations. + "skip_first_line" is a boolean indicating if the first line should + be skipped for calculating the indent width and for dedenting. + This is sometimes useful for docstrings and similar. + + Same as dedent() except operates on a sequence of lines. Note: the + lines list is modified **in-place**. + """ + DEBUG = False + if DEBUG: + print("dedent: dedent(..., tabsize=%d, skip_first_line=%r)"\ + % (tabsize, skip_first_line)) + indents = [] + margin = None + for i, line in enumerate(lines): + if i == 0 and skip_first_line: continue + indent = 0 + for ch in line: + if ch == ' ': + indent += 1 + elif ch == '\t': + indent += tabsize - (indent % tabsize) + elif ch in '\r\n': + continue # skip all-whitespace lines + else: + break + else: + continue # skip all-whitespace lines + if DEBUG: print("dedent: indent=%d: %r" % (indent, line)) + if margin is None: + margin = indent + else: + margin = min(margin, indent) + if DEBUG: print("dedent: margin=%r" % margin) + + if margin is not None and margin > 0: + for i, line in enumerate(lines): + if i == 0 and skip_first_line: continue + removed = 0 + for j, ch in enumerate(line): + if ch == ' ': + removed += 1 + elif ch == '\t': + removed += tabsize - (removed % tabsize) + elif ch in '\r\n': + if DEBUG: print("dedent: %r: EOL -> strip up to EOL" % line) + lines[i] = lines[i][j:] + break + else: + raise ValueError("unexpected non-whitespace char %r in " + "line %r while removing %d-space margin" + % (ch, line, margin)) + if DEBUG: + print("dedent: %r: %r -> removed %d/%d"\ + % (line, ch, removed, margin)) + if removed == margin: + lines[i] = lines[i][j+1:] + break + elif removed > margin: + lines[i] = ' '*(removed-margin) + lines[i][j+1:] + break + return lines + +def _dedent(text, tabsize=8, skip_first_line=False): + """_dedent(text, tabsize=8, skip_first_line=False) -> dedented text + + "text" is the text to dedent. + "tabsize" is the tab width to use for indent width calculations. + "skip_first_line" is a boolean indicating if the first line should + be skipped for calculating the indent width and for dedenting. + This is sometimes useful for docstrings and similar. + + textwrap.dedent(s), but don't expand tabs to spaces + """ + lines = text.splitlines(1) + _dedentlines(lines, tabsize=tabsize, skip_first_line=skip_first_line) + return ''.join(lines) + +def _get_indent(marker, s, tab_width=8): + """_get_indent(marker, s, tab_width=8) -> + (, )""" + # Figure out how much the marker is indented. + INDENT_CHARS = tuple(' \t') + start = s.index(marker) + i = start + while i > 0: + if s[i-1] not in INDENT_CHARS: + break + i -= 1 + indent = s[i:start] + indent_width = 0 + for ch in indent: + if ch == ' ': + indent_width += 1 + elif ch == '\t': + indent_width += tab_width - (indent_width % tab_width) + return indent, indent_width + +def _get_trailing_whitespace(marker, s): + """Return the whitespace content trailing the given 'marker' in string 's', + up to and including a newline. + """ + suffix = '' + start = s.index(marker) + len(marker) + i = start + while i < len(s): + if s[i] in ' \t': + suffix += s[i] + elif s[i] in '\r\n': + suffix += s[i] + if s[i] == '\r' and i+1 < len(s) and s[i+1] == '\n': + suffix += s[i+1] + break + else: + break + i += 1 + return suffix + + + +#---- bash completion support +# Note: This is still experimental. I expect to change this +# significantly. +# +# To get Bash completion for a cmdln.Cmdln class, run the following +# bash command: +# $ complete -C 'python -m cmdln /path/to/script.py CmdlnClass' cmdname +# For example: +# $ complete -C 'python -m cmdln ~/bin/svn.py SVN' svn +# +#TODO: Simplify the above so don't have to given path to script (try to +# find it on PATH, if possible). Could also make class name +# optional if there is only one in the module (common case). + +if __name__ == "__main__" and len(sys.argv) == 6: + def _log(s): + return # no-op, comment out for debugging + from os.path import expanduser + fout = open(expanduser("~/tmp/bashcpln.log"), 'a') + fout.write(str(s) + '\n') + fout.close() + + # Recipe: module_from_path (1.0.1+) + def _module_from_path(path): + import imp, os, sys + path = os.path.expanduser(path) + dir = os.path.dirname(path) or os.curdir + name = os.path.splitext(os.path.basename(path))[0] + sys.path.insert(0, dir) + try: + iinfo = imp.find_module(name, [dir]) + return imp.load_module(name, *iinfo) + finally: + sys.path.remove(dir) + + def _get_bash_cplns(script_path, class_name, cmd_name, + token, preceding_token): + _log('--') + _log('get_cplns(%r, %r, %r, %r, %r)' + % (script_path, class_name, cmd_name, token, preceding_token)) + comp_line = os.environ["COMP_LINE"] + comp_point = int(os.environ["COMP_POINT"]) + _log("COMP_LINE: %r" % comp_line) + _log("COMP_POINT: %r" % comp_point) + + try: + script = _module_from_path(script_path) + except ImportError: + _, ex, _ = sys.exc_info() + _log("error importing `%s': %s" % (script_path, ex)) + return [] + shell = getattr(script, class_name)() + cmd_map = shell._get_canonical_map() + del cmd_map["EOF"] + del cmd_map["KeyboardInterrupt"] + + # Determine if completing the sub-command name. + parts = comp_line[:comp_point].split(None, 1) + _log(parts) + if len(parts) == 1 or not (' ' in parts[1] or '\t' in parts[1]): + #TODO: if parts[1].startswith('-'): handle top-level opts + _log("complete sub-command names") + matches = {} + for name, canon_name in cmd_map.items(): + if name.startswith(token): + matches[name] = canon_name + if not matches: + return [] + elif len(matches) == 1: + return matches.keys() + elif len(set(matches.values())) == 1: + return [matches.values()[0]] + else: + return matches.keys() + + # Otherwise, complete options for the given sub-command. + #TODO: refine this so it does the right thing with option args + if token.startswith('-'): + cmd_name = comp_line.split(None, 2)[1] + try: + cmd_canon_name = cmd_map[cmd_name] + except KeyError: + return [] + handler = shell._get_cmd_handler(cmd_canon_name) + optparser = getattr(handler, "optparser", None) + if optparser is None: + optparser = SubCmdOptionParser() + opt_strs = [] + for option in optparser.option_list: + for opt_str in option._short_opts + option._long_opts: + if opt_str.startswith(token): + opt_strs.append(opt_str) + return opt_strs + + return [] + + for cpln in _get_bash_cplns(*sys.argv[1:]): + print(cpln) + + + +## -- contrib -- + +@option("-c", "--configfile", dest="configfile", default=None, + metavar='FILENAME', + help='Configuration file to read options from') +class CmdlnWithConfigParser(Cmdln): + """Cmdln with configparser support + + Add a new option -c --configfile for reading config file; and set + default values for both toplevel and command-specific options. + + See examples/cfgexample.py + """ + + class NoConfigFile(Exception): pass + + def __init__(self, default_configfile=None, *args, **kwargs): + Cmdln.__init__(self, *args, **kwargs) + self._cfgparser = None + self._default_configfile = default_configfile + + def get_optparser(self): + parser = Cmdln.get_optparser(self) + parser.set_default('configfile', self._default_configfile) + return parser + + def _load_config(self): + if not self._cfgparser: + if self.options.configfile: + self._cfgparser = ConfigParser.SafeConfigParser() + if not path.exists(self.options.configfile): + raise CmdlnUserError( + 'config file "%s" does not exist' % \ + self.options.configfile) + self._cfgparser.read(self.options.configfile) + else: + raise self.NoConfigFile + + def get_option_defaults(self, cmd): + try: + self._load_config() + except self.NoConfigFile: + return {} + else: + section = cmd or 'cmdln' + try: + return dict(self._cfgparser.items(section)) + except ConfigParser.NoSectionError: + return {} + diff --git a/clint/packages/applib/_compression.py b/clint/packages/applib/_compression.py new file mode 100644 index 0000000..d60029c --- /dev/null +++ b/clint/packages/applib/_compression.py @@ -0,0 +1,213 @@ +# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. + +import sys +import os +from os import path +import tarfile +import zipfile +from contextlib import closing + +from applib import sh + +__all__ = ['implementors'] + + +class CompressedFile: + + def __init__(self, filename): + self.filename = filename + + def extractall_with_single_toplevel(self, f, names): + """Same as ``extractall`` but ensures a single toplevel directory + + Some compressed archives do not stick to the convension of having a + single top-level directory. For eg., + http://code.google.com/p/grapefruit/issues/detail?id=3 + + In such cases, a new toplevel directory corresponding to the name of the + compressed file (eg: 'grapefruit-0.1a3' if compressed file is named + 'grapefruit-0.1a3.tar.gz') is created and then extraction happens + *inside* that directory. + + - f: tarfile/zipefile file object + - names: List of filenames in the archive + + Return the absolute path to the toplevel directory. + """ + toplevels = _find_top_level_directories(names, sep='/') + + if len(toplevels) == 0: + raise sh.PackError('archive is empty') + elif len(toplevels) > 1: + toplevel = _archive_basename(self.filename) + os.mkdir(toplevel) + with sh.cd(toplevel): + f.extractall() + return path.abspath(toplevel) + else: + f.extractall() + toplevel = path.abspath(toplevels[0]) + assert path.exists(toplevel) + if not path.isdir(toplevel): + # eg: http://pypi.python.org/pypi/DeferArgs/0.4 + raise SingleFile('archive has a single file: %s', toplevel) + return toplevel + + + +class ZippedFile(CompressedFile): + """A zip file""" + + @staticmethod + def is_valid(filename): + return zipfile.is_zipfile(filename) + + def extract(self): + try: + f = zipfile.ZipFile(self.filename, 'r') + try: + return self.extractall_with_single_toplevel( + f, f.namelist()) + except OSError as e: + if e.errno == 17: + # http://bugs.python.org/issue6510 + raise sh.PackError(e) + # http://bugs.python.org/issue6609 + if sys.platform.startswith('win'): + if isinstance(e, WindowsError) and e.winerror == 267: + raise sh.PackError('uses Windows special name (%s)' % e) + raise + except IOError as e: + # http://bugs.python.org/issue10447 + if sys.platform == 'win32' and e.errno == 2: + raise sh.PackError('reached max path-length: %s' % e) + raise + finally: + f.close() + except (zipfile.BadZipfile, zipfile.LargeZipFile) as e: + raise sh.PackError(e) + + @classmethod + def pack(cls, paths, file): + raise NotImplementedError('pack: zip files not supported yet') + + +class TarredFile(CompressedFile): + """A tar.gz/bz2 file""" + + @classmethod + def is_valid(cls, filename): + try: + with closing(tarfile.open(filename, cls._get_mode())) as f: + return True + except tarfile.TarError: + return False + + def extract(self): + try: + f = tarfile.open(self.filename, self._get_mode()) + try: + _ensure_read_write_access(f) + return self.extractall_with_single_toplevel( + f, f.getnames()) + finally: + f.close() + except tarfile.TarError as e: + raise sh.PackError(e) + except IOError as e: + # see http://bugs.python.org/issue6584 + if 'CRC check failed' in str(e): + raise sh.PackError(e) + # See github issue #10 + elif e.errno == 22 and "invalid mode ('wb')" in str(e): + raise sh.PackError(e) + else: + raise + + @classmethod + def pack(cls, paths, file): + f = tarfile.open(file, cls._get_mode('w')) + try: + for pth in paths: + assert path.exists(pth), '"%s" does not exist' % path + f.add(pth) + finally: + f.close() + + def _get_mode(self): + """Return the mode for this tarfile""" + raise NotImplementedError() + + +class GzipTarredFile(TarredFile): + """A tar.gz2 file""" + + @staticmethod + def _get_mode(mode='r'): + assert mode in ['r', 'w'] + return mode + ':gz' + + +class Bzip2TarredFile(TarredFile): + """A tar.gz2 file""" + + @staticmethod + def _get_mode(mode='r'): + assert mode in ['r', 'w'] + return mode + ':bz2' + + +implementors = dict( + zip = ZippedFile, + tgz = GzipTarredFile, + bz2 = Bzip2TarredFile) + + +class MultipleTopLevels(sh.PackError): + """Can be extracted, but contains multiple top-level dirs""" +class SingleFile(sh.PackError): + """Contains nothing but a single file. Compressed archived is expected to + contain one directory + """ + +def _ensure_read_write_access(tarfileobj): + """Ensure that the given tarfile will be readable and writable by the + user (the client program using this API) after extraction. + + Some tarballs have u-x set on directories or u-w on files. We reset such + perms here.. so that the extracted files remain accessible for reading + and deletion as per the user's wish. + + See also: http://bugs.python.org/issue6196 + """ + dir_perm = tarfile.TUREAD | tarfile.TUWRITE | tarfile.TUEXEC + file_perm = tarfile.TUREAD | tarfile.TUWRITE + + for tarinfo in tarfileobj.getmembers(): + tarinfo.mode |= (dir_perm if tarinfo.isdir() else file_perm) + + +def _find_top_level_directories(fileslist, sep): + """Find the distinct first components in the fileslist""" + toplevels = set() + for pth in fileslist: + firstcomponent = pth.split(sep, 1)[0] + toplevels.add(firstcomponent) + return list(toplevels) + + +def _archive_basename(filename): + """Return a suitable base directory name for the given archive""" + exts = ( + '.tar.gz', + '.tgz', + '.tar.bz2', + '.bz2', + '.zip') + + filename = path.basename(filename) + + for ext in exts: + if filename.endswith(ext): + return filename[:-len(ext)] + return filename + '.dir' diff --git a/clint/packages/applib/_proc.py b/clint/packages/applib/_proc.py new file mode 100644 index 0000000..171caaa --- /dev/null +++ b/clint/packages/applib/_proc.py @@ -0,0 +1,155 @@ +# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. + +"""Process execution wrappers +""" + +from __future__ import unicode_literals +import os +import sys +import time +import subprocess +from tempfile import TemporaryFile +import warnings + +from applib.misc import xjoin +from applib.misc import safe_unicode + +__all__ = ['run', 'RunError', 'RunNonZeroReturn', 'RunTimedout'] + +warnings.filterwarnings('ignore', message='.*With\-statements.*', + category=DeprecationWarning) + + +class RunError(Exception): + + def __init__(self, cmd, stdout, stderr, errors): + self.stdout = stdout + self.stderr = stderr + + msg = errors[:] + msg.extend([ + 'command: {0}'.format(safe_unicode(cmd)), + 'pwd: {0}'.format(xjoin(os.getcwd()))]) + + if stderr is None: + msg.append( + 'OUTPUT:\n{0}'.format(_limit_str(safe_unicode(stdout)))) + else: + msg.extend([ + 'STDERR:\n{0}'.format(_limit_str(safe_unicode(stderr))), + 'STDOUT:\n{0}'.format(_limit_str(safe_unicode(stdout)))]) + + super(RunError, self).__init__('\n'.join(msg)) + + +class RunNonZeroReturn(RunError): + """The command returned non-zero exit code""" + + def __init__(self, p, cmd, stdout, stderr): + super(RunNonZeroReturn, self).__init__(cmd, stdout, stderr, [ + 'non-zero returncode: {0}'.format(p.returncode) + ]) + + +class RunTimedout(RunError): + """process is taking too much time""" + + def __init__(self, cmd, timeout, stdout, stderr): + super(RunTimedout, self).__init__(cmd, stdout, stderr, [ + 'timed out; ergo process is terminated', + 'seconds elapsed: {0}'.format(timeout), + ]) + + +# TODO: support for incremental results (sometimes a process run for a few +# minutes, but we need to send the stdout as soon as it appears. +def run(cmd, merge_streams=False, timeout=None, env=None): + """Improved replacement for commands.getoutput() + + The following features are implemented: + + - timeout (in seconds) + - support for merged streams (stdout+stderr together) + + `cmd` can be a full command string, or list of prog/args. + + Note that returned data is of *undecoded* str/bytes type (not unicode) + + Return (stdout, stderr) + """ + if isinstance(cmd, (list, tuple)): + shell = False + else: + shell = True + # Fix for cmd.exe quote issue. See comment #3 and #4 in + # http://firefly.activestate.com/sridharr/pypm/ticket/126#comment:3 + if sys.platform.startswith('win') and cmd.startswith('"'): + cmd = '"{0}"'.format(cmd) + + # redirect stdout and stderr to temporary *files* + with TemporaryFile() as outf: + with TemporaryFile() as errf: + p = subprocess.Popen(cmd, env=env, shell=shell, stdout=outf, + stderr=outf if merge_streams else errf) + + if timeout is None: + p.wait() + else: + # poll for terminated status till timeout is reached + t_nought = time.time() + seconds_passed = 0 + while True: + if p.poll() is not None: + break + seconds_passed = time.time() - t_nought + if timeout and seconds_passed > timeout: + p.terminate() + raise RunTimedout( + cmd, timeout, + _read_tmpfd(outf), + None if merge_streams else _read_tmpfd(errf)) + time.sleep(0.1) + + # the process has exited by now; nothing will to be written to + # outfd/errfd anymore. + stdout = _read_tmpfd(outf) + stderr = _read_tmpfd(errf) + + if p.returncode != 0: + raise RunNonZeroReturn(p, cmd, stdout, None if merge_streams else stderr) + else: + return stdout, stderr + + +def _read_tmpfd(fil): + """Read from a temporary file object + + Call this method only when nothing more will be written to the temporary + file - i.e., all the writing has already been done. + """ + fil.seek(0) + return fil.read() + + +def _limit_str(s, maxchars=80*15): + if len(s) > maxchars: + return '[...]\n' + s[-maxchars:] + return s + + +def _disable_windows_error_popup(): + """Set error mode to disable Windows error popup + + This setting is effective for current process and all the child processes + """ + # disable nasty critical error pop-ups on Windows + import win32api, win32con + win32api.SetErrorMode(win32con.SEM_FAILCRITICALERRORS | + win32con.SEM_NOOPENFILEERRORBOX) +if sys.platform.startswith('win'): + try: + import win32api + except ImportError: + pass # XXX: this means, you will get annoying popups + else: + _disable_windows_error_popup() diff --git a/clint/packages/applib/_simpledb.py b/clint/packages/applib/_simpledb.py new file mode 100644 index 0000000..e3b1720 --- /dev/null +++ b/clint/packages/applib/_simpledb.py @@ -0,0 +1,269 @@ +# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. + +"""Simple wrapper around SQLalchemy + +This module hides the complexity of SQLAlchemy to provide a simple interface to +store and manipulate Python objects each with a set of properties. Unlike the +default behaviour of sqlalchemy's declaritive_base, inheritance of objects will +not require "join", rather it creates a separate table. This makes it easy to +use objects around from parts of not-so-related applications. + +For example, a ``SourcePackage`` table is created by Grail. Then, PyPM will +extend it as ``BinaryPackage`` which gets extended to ``RepoPackage``. The table +for RepoPackage will be concretely inherited, meaning - there will be just be +one table without having to 'join' to another SourcePackage table. + +At the moment, PyPM and Grail use this module. It may not be of use to others, +and we may change the api/behaviour. Hence, it makes sense to keep it as an +internal module. +""" + +import sys +import os +from os.path import exists, dirname +from contextlib import contextmanager +import json + +from sqlalchemy import Table, Column, MetaData +from sqlalchemy import create_engine +from sqlalchemy.types import String, Text, Boolean, PickleType +from sqlalchemy.orm import sessionmaker, scoped_session, mapper + + +# A PickleType that will work on both Python 2.x and 3.x +# i.e., if you *write* to a DB entry using Python 3.x, we are letting +# Python 3.x apps to read from it as well. +# WARNING: Ideally, if you are starting a new project, please +# use something else like JSON. See +# http://twitter.com/zzzeek/status/9765871731867648 +Pickle2Type = PickleType(protocol=2) + + +def setup(db_class, simple_object_cls, primary_keys): + """A simple API to configure the metadata""" + table_name = simple_object_cls.__name__ + column_names = simple_object_cls.FIELDS + + metadata = MetaData() + table = Table(table_name, metadata, + *[Column(cname, _get_best_column_type(cname), + primary_key=cname in primary_keys) + for cname in column_names]) + + db_class.metadata = metadata + db_class.mapper_class = simple_object_cls + db_class.table = table + + mapper(simple_object_cls, table) + + +def sqlalchemy_escape(val, escape_char, special_chars): + """Escape a string according for use in LIKE operator + + >>> sqlalchemy_escape("text_table", "\\", "%_") + 'text\_table' + """ + if sys.version_info[:2] >= (3, 0): + assert isinstance(val, str) + else: + assert isinstance(val, basestring) + result = [] + for c in val: + if c in special_chars + escape_char: + result.extend(escape_char + c) + else: + result.extend(c) + return ''.join(result) + + +class SimpleDatabase(object): + metadata = None # to be set up derived classes + + class DoesNotExist(IOError): + def __init__(self, path): + super(IOError, self).__init__( + 'database file %s does not exist' % path) + + def __init__(self, path, touch=False): + """ + touch - create database, if it does not exist + """ + self.path = path + sqlite_uri = 'sqlite:///%s' % self.path + self.engine = create_engine(sqlite_uri, echo=False) + self.create_session = sessionmaker( + bind=self.engine, + autocommit=False, + + # See the comment by Michael Bayer + # http://groups.google.com/group/sqlalchemy/browse_thread/thread/7c1eb642435adde7 + # expire_on_commit=False + ) + self.create_scoped_session = scoped_session(self.create_session) + + if not exists(self.path): + if touch: + assert exists(dirname(self.path)), 'missing: ' + dirname(self.path) + self.metadata.create_all(self.engine) + else: + raise self.DoesNotExist(path) + + def reset(self): + """Reset the database + + Drop all tables and recreate them + """ + self.metadata.drop_all(self.engine) + self.metadata.create_all(self.engine) + + def close(self): + self.engine.dispose() + + @contextmanager + def transaction(self, session=None): + """Start a new transaction based on the passed session object. If session + is not passed, then create one and make sure of closing it finally. + """ + local_session = None + if session is None: + local_session = session = self.create_scoped_session() + try: + yield session + finally: + # Since ``local_session`` was created locally, close it here itself + if local_session is not None: + # but wait! + # http://groups.google.com/group/sqlalchemy/browse_thread/thread/7c1eb642435adde7 + # To workaround this issue with sqlalchemy, we can either: + # 1) pass the session object explicitly + # 2) do not close the session at all (bad idea - could lead to memory leaks) + # + # Till pypm implements atomic transations in client.installer, + # we retain this hack (i.e., we choose (2) for now) + pass # local_session.close() + + def __str__(self): + return '{0.__class__.__name__}<{0.path}>'.format(self) + + +class SimpleObject(object): + """Object with a collection of fields. + + The following features are supported: + + 1) Automatically initialize the fields in __init__ + 2) Inherit and extend with additional fields + 2) Ability to convert from other object types (with extra/less fields) + 3) Interoperate with sqlalchemy.orm (i.e., plain `self.foo=value` works) + """ + + # Public fields in this object + FIELDS = [] + + def __init__(self, **kwargs): + """Initialize the object with FIELDS whose values are in ``kwargs``""" + self.__assert_field_mapping(kwargs) + for field in self.FIELDS: + setattr(self, field, kwargs[field]) + + @classmethod + def create_from(cls, another, **kwargs): + """Create from another object of different type. + + Another object must be from a derived class of SimpleObject (which + contains FIELDS) + """ + reused_fields = {} + for field, value in another.get_fields(): + if field in cls.FIELDS: + reused_fields[field] = value + reused_fields.update(kwargs) + return cls(**reused_fields) + + def get_fields(self): + """Return fields as a list of (name,value)""" + for field in self.FIELDS: + yield field, getattr(self, field) + + def to_dict(self): + return dict(self.get_fields()) + + def to_json(self): + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_string): + values = json.loads(json_string) + return cls(**_remove_unicode_keys(values)) + + def __assert_field_mapping(self, mapping): + """Assert that mapping.keys() == FIELDS. + + The programmer is not supposed to pass extra/less number of fields + """ + passed_keys = set(mapping.keys()) + class_fields = set(self.FIELDS) + + if passed_keys != class_fields: + raise ValueError('\n'.join([ + "{0} got different fields from expected".format( + self.__class__), + " got : {0}".format(list(sorted(passed_keys))), + " expected: {0}".format(list(sorted(class_fields)))])) + + +class _get_best_column_type(): + """Return the best column type for the given name.""" + mapping = dict( + name = String, + version = String, + keywords = String, + home_page = String, + license = String, + author = String, + author_email = String, + maintainer = String, + maintainer_email = String, + osarch = String, + pyver = String, + pkg_version = String, + relpath = String, + tags = String, + original_source = String, + patched_source = String, + + summary = Text, + description = Text, + + python3 = Boolean, + metadata_hash = String, + + install_requires = Pickle2Type, + files_list = Pickle2Type, + ) + + def __call__(self, name): + try: + return self.mapping[name] + except KeyError: + raise KeyError( + 'missing key. add type for "{0}" in self.mapping'.format( + name)) +_get_best_column_type = _get_best_column_type() + + +def _remove_unicode_keys(dictobj): + """Convert keys from 'unicode' to 'str' type. + + workaround for + """ + if sys.version_info[:2] >= (3, 0): return dictobj + + assert isinstance(dictobj, dict) + + newdict = {} + for key, value in dictobj.items(): + if type(key) is unicode: + key = key.encode('utf-8') + newdict[key] = value + return newdict diff --git a/clint/packages/applib/base.py b/clint/packages/applib/base.py new file mode 100644 index 0000000..1814e07 --- /dev/null +++ b/clint/packages/applib/base.py @@ -0,0 +1,70 @@ +# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. + +"""Base module""" + +import sys +from os.path import abspath, join, expanduser +import logging + +from appdirs import AppDirs + +from applib import location, log +from applib.log import LogawareCmdln as Cmdln + +__all__ = ['Application', 'Cmdln'] + + +class Application(object): + """Object representing the application + + - name: Name of the application + + - company: Company developing the application + + - compatibility_version: The major version which promises + backward-compatability among all of its minor + versions. Eg: 5.2; 5.2.1, 5.2.2, etc.. should + use the same compatability version (5.2). This + value is used in the settings directory path. + + - locations: An object holding a set of OS-specific but + generic location values (eg: APPDATA). See + ``Locations`` class for details. + """ + + def __init__(self, name, company, compatibility_version=None): + self.name = name + self.company = company + self.compatibility_version = compatibility_version + self.locations = AppDirs2( + name, company, compatibility_version, roaming=False) + + def run(self, cmdln_class): + """Run the application using the given cmdln processor. + + This method also ensures configuration of logging handlers for console + """ + assert issubclass(cmdln_class, Cmdln) + l = logging.getLogger('') + log.setup_trace(l, self.locations.log_file_path) + cmdln_class(install_console=True).main() + + +class AppDirs2(AppDirs): + @property + def log_file_path(self): + if sys.platform in ('win32', 'darwin'): + name = self.appname + '.log' + else: + name = self.appname.lower() + '.log' + return join(self.user_log_dir, name) + + +if __name__ == '__main__': + # self-test code + app = Application('PyPM', 'ActiveState', '0.1') + print('user_data_dir', app.locations.user_data_dir) + print('site_data_dir', app.locations.site_data_dir) + print('user_cache_dir', app.locations.user_cache_dir) + print('log_file_path', app.locations.log_file_path) + diff --git a/clint/packages/applib/location.py b/clint/packages/applib/location.py new file mode 100644 index 0000000..dc6412e --- /dev/null +++ b/clint/packages/applib/location.py @@ -0,0 +1,14 @@ +# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. + +# This module is deprecated + +from appdirs import * + + +#---- self test code + +if __name__ == "__main__": + print("applib: user data dir: %s" % user_data_dir("Komodo", "ActiveState")) + print("applib: site data dir: %s" % site_data_dir("Komodo", "ActiveState")) + print("applib: user cache dir: %s" % user_cache_dir("Komodo", "ActiveState")) + diff --git a/clint/packages/applib/log.py b/clint/packages/applib/log.py new file mode 100644 index 0000000..a46792d --- /dev/null +++ b/clint/packages/applib/log.py @@ -0,0 +1,336 @@ +# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. + +"""Logging utilities and console integration + +Don't use print, use ``LOG.info``. This ensures seemless integration with +application logging. +""" + +import sys +import os +import stat +from os.path import expanduser, join, exists, isabs, dirname +import logging +from datetime import datetime +from contextlib import contextmanager + +from applib import sh, textui, _cmdln as cmdln + + +if sys.hexversion > 0x03000000: + def unicode_literal(s): + return s # strings are unicode by default on py3 +else: + from io import open + def unicode_literal(s): + return s.decode('utf-8') + + +@cmdln.option('-v', '--verbose', action="count", dest='verbosity_level', + default=None, + help='-v will show tracebacks; -vv also debug messages') +class LogawareCmdln(cmdln.CmdlnWithConfigParser): + """A Cmdln class that integrates with this modules's functionality + + 1. Add -v and -vv global options: show tracebacks when sub commands throw + them only if -v or -vv is passed by the user. + + 2. Wrap all sub command methods and call `initialize` (to be defined by the + derived class) automatically. + """ + + def __init__(self, install_console=False, default_verbosity=0, *args, **kwargs): + """ + Arguments: + - install_console: install console handlers in logger + """ + cmdln.CmdlnWithConfigParser.__init__(self, *args, **kwargs) + self.__initialized = False + self.__install_console = install_console + self.__default_verbosity = default_verbosity + + def initialize(self): + """This method is called by ``bootstrapped`` - once and only once.""" + raise NotImplementedError('must be defined by the derived class') + + @contextmanager + def bootstrapped(self): + """Run the sub-command after bootstrapping + + It is required to wrap the sub-command code in this context, which takes + care of the following: + + - Invokes `setup_console` passing `verbosity_level` + - Invokes `self.initialize` automatically but no more than once. + - Intercept unhandled exceptions and display them according to + verbosity_level + """ + l = logging.getLogger('') + + if not self.__initialized: + # install console (if required) and call the `initialize` method + # once. + if self.__install_console: + if self.options.verbosity_level is None: + self.options.verbosity_level = self.__default_verbosity + setup_console(l, self.options.verbosity_level) + with self.__run_safely(l): + self.initialize() + self.__initialized = True + + with self.__run_safely(l): + yield + + @contextmanager + def __run_safely(self, l): + try: + yield + except KeyboardInterrupt: + # user presses Ctrl-C to terminate the program + l.info('') # print a new-line for the shell prompt's sake + sys.exit(5) + except Exception as e: + if self.__install_console: + # setup_console handles all exceptions; let it do so by calling + # log.exception and exitting immediately. + l.exception(e) + sys.exit(1) # exit to shell + else: + # as setup_console is not used, raise exceptions normally. + raise + + +def setup_console(l, verbosity_level): + """Setup console output for logging calls""" + l.setLevel(logging.DEBUG) # level-logic is instead in the handler + + existing_consoles = [h for h in l.handlers if isinstance(h, ConsoleHandler)] + if existing_consoles: + assert len(existing_consoles) == 1, \ + 'more than one console installed. not possible.' + # re-use existing console handler + h = existing_consoles[0] + assert h.verbosity_level == verbosity_level, \ + 'already has console with different verbosity level' + else: + # create a new console handler + h = ConsoleHandler(verbosity_level) + h.setFormatter(ConsoleFormatter()) + l.addHandler(h) + + +def setup_trace(l, tracefile): + """Trace logging calls to a standard log file + + Log file name and location will be determined based on platform. + """ + l.setLevel(logging.DEBUG) # trace file must have >=DEBUG entries + sh.mkdirs(dirname(tracefile)) + _rollover_log(tracefile) + _begin_log_section(tracefile) + h = logging.FileHandler(tracefile) + h.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s")) + l.addHandler(h) + + +@contextmanager +def handledby(l, filename, create_dirs=False, level=None, formatter=None): + """Momentarily handle logger `l` using FileHandler + + Within the 'with' context, all logging calls made to the logger `l` will get + written to the file `filename`. When exiting the 'with' context, this file + is closed and the handler will be removed. + """ + assert isabs(filename), 'not an absolute path: {0}'.format(filename) + + if create_dirs: + sh.mkdirs(dirname(filename)) + + h = logging.FileHandler(filename) + if level: + h.setLevel(level) + if formatter: + h.setFormatter(formatter) + l.addHandler(h) + + try: + yield + finally: + h.close() + l.removeHandler(h) + + +@contextmanager +def archivedby(l, logs_directory, entity_name, level=None, formatter=None): + """Like `handledby` but the log file is stored in archive. + + The exact path to the log file is determined as follows: + + $logs_directory/2009/03/24/142356_$entity_name.txt + """ + now = datetime.now() # NOTE: this is local time, not UTC time. + filename = join(logs_directory, + now.strftime('%Y'), now.strftime('%m'), now.strftime('%d'), + '{0}_{1}.txt'.format(now.strftime('%H%M%S'), entity_name)) + assert not exists(filename), 'already exists: {0}'.format(filename) + with handledby(l, filename, create_dirs=True, + level=level, formatter=formatter): + yield filename + + +@contextmanager +def wrapped(l): + """'With' context to intercept and log any exceptions raised""" + try: + yield + except Exception as e: + l.exception(e) + raise + + +def runonconsole(l): + """Run on console .. and exit the program appropriately. + + If an exception is raised, it is silently logged (so + + + >>> with log.run(logging.getLogger('pypm')) as retcode: + """ + try: + yield + except Exception as e: + l.exception(e) + sys.exit(1) + sys.exit(0) + + +try: + from logging import NullHandler +except ImportError: + class NullHandler(logging.Handler): + def emit(self, record): + pass + + +# -- internal + +def _rollover_log(logfile, maxsize=(2<<20)): + """Move $logfile to $logfile.old if its size exceeds `maxsize`""" + if exists(logfile): + filesize = os.stat(logfile)[stat.ST_SIZE] + if filesize >= maxsize: + sh.mv(logfile, logfile+'.old') + + +def _begin_log_section(logfile): + """Begin a new section in the logfile + + Also write the current datetime + """ + LINE_BUFFERED=1 + with open(logfile, 'a', LINE_BUFFERED, encoding='utf-8') as f: + f.write(unicode_literal('\n')) # sections are separated by newline + f.write(unicode_literal('{0}\n'.format(datetime.now()))) + + +class ConsoleHandler(logging.StreamHandler): + """Send messages to console + + INFO messages are sent to stdout. Other levels to stderr. + + By default, INFO/WARN/ERROR messages are sent as-it-is to console .. while + EXCEPTION messages are pruned and shown as error unless verbosity level is + greater than zero. If verbosity level is greater than one, then DEBUG + messages are also shown. + """ + + def __init__(self, verbosity_level): + logging.StreamHandler.__init__(self) + self.stream = None # reset it; we are not going to use it anyway + self.verbosity_level = verbosity_level + + def emit(self, record): + if record.levelno == logging.INFO: + self.__emit(record, sys.stdout) + elif record.levelno == logging.WARN: + self.__emit(record, sys.stderr) + elif record.levelno == logging.DEBUG: + # show DEBUG messages with verbosity_level >= 2 + if self.verbosity_level > 1: + self.__emit(record, sys.stderr) + elif record.levelno >= logging.ERROR: + if record.exc_info and self.verbosity_level < 1: + # supress full traceback with verbosity_level <= 0 + with new_record_exc_info(record, None): + self.__emit(record, sys.stderr) + else: + self.__emit(record, sys.stderr) + else: + raise NotImplementedError( + "don't know about level: {0}".format(record.levelno)) + + def __emit(self, record, strm): + # override handler stream with ours (which could stdout or stderr) + self.stream = strm + + with textui.safe_output(): + # We *trust* that `logging` module's `emit()` will always terminate the + # message with newlines. This is essential for not breaking the progress + # bar, if any. + logging.StreamHandler.emit(self, record) + + def flush(self): + # Workaround a bug in logging module + # See: + # http://bugs.python.org/issue6333 + if self.stream and hasattr(self.stream, 'flush') and not self.stream.closed: + try: + logging.StreamHandler.flush(self) + except IOError as e: + if e.errno == 32: + # skip 'broken pipe' errors that likely occur due to + # killing the process on the other end of the pipe + # eg: piping command output to `less` and then pressing + # Q in the middle of it. + pass + else: + raise + + +def _clear_record_traceback_cache(record): + """Clear the traceback cache stored in `record` (LogRecord) + + Workaround for: http://bugs.python.org/issue6435 + """ + record.exc_text = None + + +@contextmanager +def new_record_exc_info(record, exc_info): + """Temporarily assign `exc_info` to `record`""" + _clear_record_traceback_cache(record) + old_exc_info = record.exc_info + record.exc_info = exc_info + try: + yield + finally: + record.exc_info = old_exc_info + _clear_record_traceback_cache(record) + + +class ConsoleFormatter(logging.Formatter): + """A formatter that attaches 'error:' prefix to error/critical messages""" + + def format(self, record): + # attach 'error:' prefix to error/critical messages + # attach 'warning:' prefix accordingly + s = logging.Formatter.format(self, record) + if record.levelno >= logging.ERROR: + return 'error: {0}'.format(s) + elif record.levelno == logging.WARNING: + return 'warning: {0}'.format(s) + else: + return s + diff --git a/clint/packages/applib/misc.py b/clint/packages/applib/misc.py new file mode 100644 index 0000000..80b1bb0 --- /dev/null +++ b/clint/packages/applib/misc.py @@ -0,0 +1,104 @@ +# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. + +"""Miscelleneous utility functions +""" + +import sys +from os import path +import six + +from applib import _cmdln as cmdln + +__all__ = ['xjoin', 'existing'] + + +def xjoin(*c): + """Equivalent to normpath(abspath(join(*c)))""" + return path.normpath(path.abspath(path.join(*c))) + + +def existing(pth): + """Return path, but assert its presence first""" + assert isinstance(pth, (str, unicode)), \ + 'not of string type: %s <%s>' % (pth, type(pth)) + assert exists(pth), 'file/directory not found: %s' % pth + return pth + + +def require_option(options, option_name, details=None): + """ + >>> require_option('foo-bar') + ... + CmdlnUserError: required option, --foo-bar, is mising + + From http://twitter.com/ActiveState/status/19782350475 + 'required options' - conforming to unix standard vs being creative with + non-positional arguments. http://bit.ly/d2iiUL #python #optparse ^SR + """ + option_var_name = option_name.replace('-', '_') + if not hasattr(options, option_var_name): + raise ValueError( + "require_option: undefined option '%s'" % option_var_name) + if getattr(options, option_var_name) is None: + msg = 'required option "--{0}" is missing'.format(option_name) + if details: + msg = '%s (%s)' % (msg, details) + raise cmdln.CmdlnUserError(msg) + + +def safe_unicode(obj): + """Return the unicode/text representation of `obj` without throwing UnicodeDecodeError + + Returned value is only a *representation*, not necessarily identical. + """ + if type(obj) not in (six.text_type, six.binary_type): + obj = six.text_type(obj) + if type(obj) is six.text_type: + return obj + else: + return obj.decode(errors='ignore') + + +def _hack_unix2win_path_conversion(cmdln_options, option_names): + """Hack to convert Unix paths in cmdln options (via config file) to + Windows specific netshare location + + Config file must define the mapping as config var "unix2win_path_mapping" + """ + require_option(cmdln_options, 'unix2win_path_mapping') + + for opt in option_names: + setattr( + cmdln_options, + opt, + _cmdln_canonical_path( + cmdln_options.unix2win_path_mapping, + getattr(cmdln_options, opt))) + + +def _cmdln_canonical_path(unix2win_path_mapping, unixpath): + """Given a unix path return the platform-specific path + + On Windows, use the given mapping to translate the path. On Unix platforms, + this function essentially returns `unixpath`. + + The mapping is simply a buildout.cfg-friendly multiline string that would + get parsed as dictionary which should have path prefixes as keys, and the + translated Windows net share path as the values. See PyPM's + etc/activestate.conf for an example. + + The mapping is typically supposed to be defined in the config file under + the cmdln section. This function is used by PyPM and Grail. + """ + unix2win_path_mapping = unix2win_path_mapping or "" + + # convert buildout.cfg-style multiline mapping to a dict + m = dict([ + [x.strip() for x in line.strip().split(None, 1)] + for line in unix2win_path_mapping.splitlines() if line.strip()]) + + if sys.platform.startswith('win'): + for prefix, netsharepath in m.items(): + if unixpath.startswith(prefix): + return netsharepath + unixpath[len(prefix):] + return unixpath diff --git a/clint/packages/applib/sh.py b/clint/packages/applib/sh.py new file mode 100644 index 0000000..9e37187 --- /dev/null +++ b/clint/packages/applib/sh.py @@ -0,0 +1,203 @@ +# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. + +"""Various shell related wrappers +""" + +import os +from os import path +import shutil +import tempfile +from fnmatch import fnmatch +from contextlib import contextmanager + +from applib._proc import * + + +# +# Compression routines +# + +class PackError(Exception): + """Error during pack or unpack""" + + +def unpack_archive(filename, pth='.'): + """Unpack the archive under ``path`` + + Return (unpacked directory path, filetype) + """ + from applib import _compression + + assert path.isfile(filename), 'not a file: %s' % filename + assert path.isdir(pth) + + for filetype, implementor in _compression.implementors.items(): + if implementor.is_valid(filename): + with cd(pth): + return (implementor(filename).extract(), filetype) + else: + raise PackError('unknown compression format: ' + filename) + + +def pack_archive(filename, files, pwd, filetype="tgz"): + """Pack the given `files` from directory `pwd` + + `filetype` must be one of ["tgz", "tbz2", "zip"] + """ + from applib import _compression + + assert path.isdir(pwd) + assert filetype in _compression.implementors, 'invalid filetype: %s' % filetype + + if path.exists(filename): + rm(filename) + + with cd(pwd): + relnames = [path.relpath(file, pwd) for file in files] + _compression.implementors[filetype].pack(relnames, filename) + + return filename + + +# +# Path/file routines +# + +def mkdirs(pth): + """Make all directories along ``pth``""" + if not path.exists(pth): + os.makedirs(pth) + else: + assert path.isdir(pth) + + +def rm(p): + """Remove the specified path recursively. Similar to `rm -rf ARG` + + Note: if ARG is a symlink, only that symlink will be removed. + """ + if path.lexists(p): + if path.isdir(p) and not path.islink(p): + shutil.rmtree(p) + else: + os.remove(p) + + +def mv(src, dest, _mkdirs=False): + """Move `src` to `dest`""" + if _mkdirs: + mkdirs(path.dirname(dest)) + shutil.move(src, dest) + + +def cp(src, dest, _mkdirs=False, ignore=None, copyperms=True): + """Copy `src` to `dest` recursively""" + assert path.exists(src) + + if _mkdirs: + mkdirs(path.dirname(dest)) + + if path.isdir(src): + _copytree(src, dest, ignore=ignore, copyperms=copyperms) + else: + shutil.copyfile(src, dest) + + +def find(pth, pattern): + """Find files or directories matching ``pattern`` under ``pth``""" + matches = [] + if path.isfile(pth): + if fnmatch(path.basename(pth), pattern): + matches.append(pth) + else: + for root, dirs, files in os.walk(pth): + matches.extend([ + path.join(root, f) for f in files+dirs + if fnmatch(f, pattern) + ]) + return matches + + +@contextmanager +def cd(pth): + """With context to temporarily change directory""" + assert path.isdir(existing(pth)), pth + + cwd = os.getcwd() + os.chdir(pth) + try: + yield + finally: + os.chdir(cwd) + + +@contextmanager +def tmpdir(prefix='tmp-', suffix=''): + """__with__ context to work in a temporary working directory + + Temporary directory will be deleted unless an exception was raised. During + the context, CWD will be changed to the temporary directory. + """ + d = tempfile.mkdtemp(prefix=prefix, suffix=suffix) + with cd(d): + yield d + rm(d) + + +def existing(pth): + """Return `pth` after checking it exists""" + if not path.exists(pth): + raise IOError('"{0}" does not exist'.format(pth)) + return pth + + +def _copytree(src, dst, symlinks=False, ignore=None, copyperms=True): + """Forked shutil.copytree for `copyperms` support""" + names = os.listdir(src) + if ignore is not None: + ignored_names = ignore(src, names) + else: + ignored_names = set() + + os.makedirs(dst) + errors = [] + for name in names: + if name in ignored_names: + continue + srcname = os.path.join(src, name) + dstname = os.path.join(dst, name) + try: + if symlinks and os.path.islink(srcname): + linkto = os.readlink(srcname) + os.symlink(linkto, dstname) + elif os.path.isdir(srcname): + _copytree(srcname, dstname, symlinks, ignore, copyperms) + else: + shutil.copy(srcname, dstname) + # XXX What about devices, sockets etc.? + except (IOError, os.error) as why: + raise + errors.append((srcname, dstname, str(why))) + # catch the Error from the recursive copytree so that we can + # continue with other files + except shutil.Error: + _, err = sys.exec_info() + errors.extend(err.args[0]) + if copyperms: + try: + shutil.copystat(src, dst) + except WindowsError: + # can't copy file access times on Windows + pass + except OSError: + _, why = sys.exec_info() + errors.extend((src, dst, str(why))) + if errors: + raise shutil.Error(errors) + + +# WindowsError is not available on other platforms +try: + WindowsError +except NameError: + class WindowsError(OSError): pass diff --git a/clint/packages/applib/test/all.py b/clint/packages/applib/test/all.py new file mode 100644 index 0000000..02ca448 --- /dev/null +++ b/clint/packages/applib/test/all.py @@ -0,0 +1,195 @@ +# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. + +from __future__ import unicode_literals +import os +from os import path +import tempfile +import sys + +import pytest + +from applib import sh +from applib import textui +from applib.misc import safe_unicode +import six + + +fixtures = path.join(path.dirname(__file__), 'fixtures') + +def test_import(): + import applib + import applib.base + import applib.sh + import applib.textui + import applib.log + import applib.misc + + +def test_sh_runerror_unicode(): + # https://github.com/activestate/applib/issues/12 + with pytest.raises(sh.RunError): + try: + sh.run('echo ' + "\u1234" + " & nonexistant") + except sh.RunError as e: + print(safe_unicode(e)) + raise + + +def test_sh_runerror_limit(): + with pytest.raises(sh.RunError): + from random import choice + import string + LINE = ''.join([choice(string.ascii_letters) for x in range(80)]) + try: + sh.run(r'''python -c "print('\n'.join(['%s']*100)); raise SystemExit('an error');"''' % LINE) + except sh.RunError as e: + c = str(e).count(LINE) + # def _limit_str(s, maxchars=80*15): --- so ~ 15 lines (not 100 lines) + assert c < 20, "original message: %s" % e + assert '[...]' in str(e) + raise + + +def test_safe_unicode(): + from applib.misc import safe_unicode + import six + + abc_bytes = b'ab\nc' + abc_text = 'ab\nc' + + assert safe_unicode(abc_bytes) == abc_text + assert safe_unicode(abc_text) == abc_text + + foo_text = 'abc' # note: \x89 is ignored. + foo_bytes = b'\x89abc' + + assert safe_unicode(foo_text) == foo_text + assert safe_unicode(foo_bytes) == foo_text + + +def test_sh_rm_file(): + with sh.tmpdir(): + with open('afile', 'w') as f: f.close() + assert path.exists('afile') + sh.rm('afile') + assert not path.exists('afile') + + +def test_sh_rm_dir(): + with sh.tmpdir(): + sh.mkdirs('adir') + with sh.cd('adir'): + with open('afile', 'w') as f: f.close() + assert path.exists('afile') + assert path.exists('adir') + sh.rm('adir') + assert not path.exists('adir') + + +# Workaround a py.test bug: +# Error evaluating 'skipif' expression +# b'sys.platform == "win32"' +# Failed: expression is not a string +def skipif(expr): + if not six.PY3: + expr = expr.encode() + return pytest.mark.skipif(expr) + + +@skipif('sys.platform == "win32"') +def test_sh_rm_symlink(): + with sh.tmpdir(): + with open('afile', 'w') as f: f.close() + assert path.exists('afile') + os.symlink('afile', 'alink') + assert path.lexists('alink') + sh.rm('alink') + assert not path.lexists('alink') + + +@skipif('sys.platform == "win32"') +def test_sh_rm_broken_symlink(): + with sh.tmpdir(): + os.symlink('afile-notexist', 'alink') + assert not path.exists('alink') + assert path.lexists('alink') + sh.rm('alink') + assert not path.lexists('alink') + + +@skipif('sys.platform == "win32"') +def test_sh_rm_symlink_dir(): + with sh.tmpdir(): + sh.mkdirs('adir') + with sh.cd('adir'): + with open('afile', 'w') as f: f.close() + assert path.exists('afile') + assert path.exists('adir') + os.symlink('adir', 'alink') + assert path.lexists('alink') + sh.rm('alink') + assert path.exists('adir') + assert not path.lexists('alink') + + +def test_console_width_detection(): + width = textui.find_console_width() + assert width is None + + +def test_colprint(): + sample_table = [ + ['python-daemon', '4.5.7.7.3-1', 'blah foo meh yuck'], + ['foo', '6.1', ('some very loooooooooong string here .. I ' + 'suggest we make it even longer .. so longer ' + ' that normal terminal widths should entail ' + 'colprint to trim the string')]] + textui.colprint(sample_table) + + # try with empty inputs + textui.colprint(None) + textui.colprint([]) + + +def test_compression_ensure_read_access(): + """Test the ensure_read_access() hack in _compression.py""" + def test_pkg(pkgpath): + testdir = tempfile.mkdtemp('-test', 'pypm-') + extracted_dir, _ = sh.unpack_archive(pkgpath, testdir) + # check if we have read access on the directory + for child in os.listdir(extracted_dir): + p = path.join(extracted_dir, child) + if path.isdir(p): + os.listdir(p) + sh.rm(testdir) + + yield 'u-x on dirs', test_pkg, path.join(fixtures, 'generator_tools-0.3.5.tar.gz') + yield 'u-w on ._setup.py', test_pkg, path.join(fixtures, 'TracProjectMenu-1.0.tar.gz') + + +def test_compression_catch_invalid_mode(): + """Error from + tarfile.py should be handled""" + def extract(): + testdir = tempfile.mkdtemp('-test', 'pypm-') + extracted_dir, _ = sh.unpack_archive( + path.join(fixtures, 'libtele-0.2.tar.gz'), testdir) + if sys.platform == 'win32' and sys.version_info[:2] >= (2, 7): + with pytest.raises(sh.PackError): + extract() + else: + extract() + + +@pytest.mark.xfail +def test_compression_issue_11(): + """https://github.com/ActiveState/applib/issues/#issue/11 + + * Windows: IOError: [Errno 13] Permission denied: '.\\airi-0.0.1\\AIRi' + * OSX: IOError: [Errno 21] Is a directory: './airi-0.0.1/AIRi + * OSX Archive Utility (Finder): Unable to unarchive "airi-0.0.1.tar" ... + (Error 1 - Operation not permitted.) + """ + testdir = tempfile.mkdtemp('applib') + d, _ = sh.unpack_archive(path.join(fixtures, 'airi-0.0.1.tar.gz'), testdir) + diff --git a/clint/packages/applib/test/fixtures/TracProjectMenu-1.0.tar.gz b/clint/packages/applib/test/fixtures/TracProjectMenu-1.0.tar.gz new file mode 100644 index 0000000..741c65b Binary files /dev/null and b/clint/packages/applib/test/fixtures/TracProjectMenu-1.0.tar.gz differ diff --git a/clint/packages/applib/test/fixtures/airi-0.0.1.tar.gz b/clint/packages/applib/test/fixtures/airi-0.0.1.tar.gz new file mode 100644 index 0000000..19f8c5a Binary files /dev/null and b/clint/packages/applib/test/fixtures/airi-0.0.1.tar.gz differ diff --git a/clint/packages/applib/test/fixtures/generator_tools-0.3.5.tar.gz b/clint/packages/applib/test/fixtures/generator_tools-0.3.5.tar.gz new file mode 100644 index 0000000..592f4cd Binary files /dev/null and b/clint/packages/applib/test/fixtures/generator_tools-0.3.5.tar.gz differ diff --git a/clint/packages/applib/test/fixtures/libtele-0.2.tar.gz b/clint/packages/applib/test/fixtures/libtele-0.2.tar.gz new file mode 100644 index 0000000..e42a7ab Binary files /dev/null and b/clint/packages/applib/test/fixtures/libtele-0.2.tar.gz differ diff --git a/clint/packages/applib/textui.py b/clint/packages/applib/textui.py new file mode 100644 index 0000000..632db70 --- /dev/null +++ b/clint/packages/applib/textui.py @@ -0,0 +1,359 @@ +# Copyright (c) 2010 ActiveState Software Inc. All rights reserved. + +"""Textual UI: progress bar and colprint""" + +import sys +from datetime import datetime, timedelta +from contextlib import contextmanager +import logging +import math + +import six + +LOG = logging.getLogger(__name__) + +__all__ = ['colprint', 'find_console_width', 'ProgressBar', + 'clear_progress_bar', 'redraw_progress_bar', 'safe_output'] + +if not six.PY3: + input = raw_input + + +class ProgressBar(object): + """Show percent progress every 'n' seconds""" + + def __init__(self, total, delay=0.1, show_size=lambda x: x, note=None): + """ + total - total number of items that are going to be processed + delay - update delay in seconds + show_size - function to return the string to display instead of the number `size` + """ + assert total >= 0, total + assert delay >= 0, delay + assert show_size + + _set_current_progress_bar(self) + self.delay = timedelta(seconds=delay) + self.delay_duration = timedelta(seconds=1) + self.start = datetime.now() + self.elapsed = None # time elapsed from start + self.estimated_time_left = None + self.lastprint = None + self.lastprint_duration = None # for updating duration/ETA + self.lastprocessed = 0 + self.total = total + self.processed = 0 + self.show_size = show_size + self.note = note + self.duration_display = '' + self._length = 0 # current length of the progress display + + @classmethod + def iterate(cls, sequence, note=None, post=None): + """Iterate a sequence and update the progress bar accordingly + + The sequence must have a 'len' attribute if it is an arbitrary + generator. + + note -- Text to print before the progress bar + post -- Text to print at the end of progress (w/ fmt vars) + """ + p = cls(len(sequence), note=note) + clean_exit = False + try: + for item in sequence: + yield item + p.tick() + clean_exit = True + finally: + p.close() + if post and clean_exit: + sys.stdout.write(post.format(**p.__dict__) + '\n') + + def tick(self, items=1): + """The method that updates the display if necessary. + + After creating the ``PercentProgress`` object, this method must be + called for every item processed (or, pass items=ITEMS for every ITEMS + processed). + + This method must be called no more than ``self.total`` times (otherwise + you get assertion error .. implying a bug in your code) + + Return True if progress bar was redrawn. + """ + self.processed += items + assert self.processed <= self.total, \ + '{0} <= {1}'.format(self.processed, self.total) + + now = datetime.now() + if (self.lastprint == None or + (now - self.lastprint) > self.delay): + self.lastprint = now + self.redraw() + return True + else: + return False + + def clear(self): + """Erase the entire progress bar and put the cursor at first column""" + # Move cursor to the beginning of current progress line so that further + # messages will overwrite the progress bar. Also overwrite the previous + # progress bar with empty space. + sys.stdout.write('\r' + ' '*self._length + '\r') + sys.stdout.flush() + + def close(self): + """Close (hide) the progress bar + + Erase the progress bar and print the closing message in place of the + previous progress bar text. + """ + self.redraw() + self.clear() + _del_current_progress_bar(self) + + def redraw(self): + self.clear() + percent = _calculate_percent(self.processed, self.total) + now = datetime.now() + self.elapsed = now - self.start + if self.processed: + self.estimated_time_left = self.elapsed.seconds * (self.total-self.processed)/self.processed + + # Update time elapsed/left once a second only (delay_duration = 1s). + if self.elapsed.seconds and ( + self.lastprint_duration is None or \ + now - self.lastprint_duration > self.delay_duration): + + self.lastprint_duration = now + elapsed = _format_duration(self.elapsed.seconds) + if self.estimated_time_left: + self.duration_display = '({0}; {1} left)'.format( + elapsed, _format_duration(self.estimated_time_left)) + else: + self.duration_display = '({0})'.format(elapsed) + + bar_width = 20 + bar_filled = int(round(20.0/100 * percent)) + filled = ['='] * bar_filled + if filled: + filled[-1] = '>' + filled = ''.join(filled) + + progress_bar = ''.join([ + (self.note+': ') if self.note else '', + # header: + '[', + + # solid bar + filled, + + # empty space + ' ' * (bar_width-bar_filled), + + # footer + '] {0:-3}% {1}/{2} {3}'.format( + percent, + self.show_size(self.processed), + self.show_size(self.total), + self.duration_display + ) + ]) + + self._length = len(progress_bar) + sys.stdout.write('\r' + progress_bar + '\r') + sys.stdout.flush() + + +def clear_progress_bar(): + """Clear progress bar, if any""" + if _current_progress_bar: + _current_progress_bar.clear() + + +def redraw_progress_bar(): + """Redraw progress bar, if any""" + if _current_progress_bar: + _current_progress_bar.redraw() + + +@contextmanager +def safe_output(): + """Wrapper that makes it safe to print to stdout + + If a progress bar is currently being shown, this wrapper takes care of + clearing it before .. and then redrawing it after + """ + clear_progress_bar() + yield + redraw_progress_bar() + + +def askyesno(question, default): + """Ask (Y/N) type of question to the user""" + assert isinstance(default, bool), '"default" must be a boolean' + + s = '{0} ({1}/{2}) '.format( + question, + default and 'Y' or 'y', + default and 'n' or 'N') + + while True: + val = input(s).strip().lower() + + if val == '': + return default + elif val in ('y', 'yes', 'ok'): + return True + elif val in ('n', 'no'): + return False + + +# This function was written by Alex Martelli +# http://stackoverflow.com/questions/1396820/ +def colprint(table, totwidth=None): + """Print the table in terminal taking care of wrapping/alignment + + - `table`: A table of strings. Elements must not be `None` + - `totwidth`: If None, console width is used + """ + if not table: return + if totwidth is None: + totwidth = find_console_width() + if totwidth is not None: + totwidth -= 1 # for not printing an extra empty line on windows + numcols = max(len(row) for row in table) + # ensure all rows have >= numcols columns, maybe empty + padded = [row+numcols*['',] for row in table] + # compute col widths, including separating space (except for last one) + widths = [ 1 + max(len(x) for x in column) for column in zip(*padded)] + widths[-1] -= 1 + # drop or truncate columns from the right in order to fit + if totwidth is not None: + while sum(widths) > totwidth: + mustlose = sum(widths) - totwidth + if widths[-1] <= mustlose: + del widths[-1] + else: + widths[-1] -= mustlose + break + # and finally, the output phase! + for row in padded: + s = ''.join(['%*s' % (-w, i[:w]) + for w, i in zip(widths, row)]) + LOG.info(s) + + +def find_console_width(): + """Return the console width + + Return ``None`` if stdout is not a terminal (eg: a pipe) + """ + if sys.platform.startswith('win'): + return _find_windows_console_width() + else: + return _find_unix_console_width() + + +@contextmanager +def longrun(log, finalfn=lambda: None): + """Decorator for performing a long operation with consideration for the + command line. + + 1. Catch keyboard interrupts and exit gracefully + + 2. Print total time elapsed always at the end (successful or not) + + 3. Call ``finalfn`` always at the end (successful or not) + """ + start_time = datetime.now() + + try: + yield + except KeyboardInterrupt: + log.info('*** interrupted by user - Ctrl+c ***') + raise SystemExit(3) + finally: + finalfn() + end_time = datetime.now() + + log.info('') + log.info('-----') + log.info('Total time elapsed: %s', end_time-start_time) + + +def _find_unix_console_width(): + import termios, fcntl, struct, sys + + # fcntl.ioctl will fail if stdout is not a tty + if not sys.stdout.isatty(): + return None + + s = struct.pack("HHHH", 0, 0, 0, 0) + fd_stdout = sys.stdout.fileno() + size = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s) + height, width = struct.unpack("HHHH", size)[:2] + return width + + +def _find_windows_console_width(): + # http://code.activestate.com/recipes/440694/ + from ctypes import windll, create_string_buffer + STDIN, STDOUT, STDERR = -10, -11, -12 + + h = windll.kernel32.GetStdHandle(STDERR) + csbi = create_string_buffer(22) + res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) + + if res: + import struct + (bufx, bufy, curx, cury, wattr, + left, top, right, bottom, + maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) + sizex = right - left + 1 + sizey = bottom - top + 1 + return sizex + + + +def _byteshr(bytes): + """Human-readable version of bytes count""" + for x in ['bytes','KB','MB','GB','TB']: + if bytes < 1024.0: + return "%3.1f%s" % (bytes, x) + bytes /= 1024.0 + raise ValueError('cannot find human-readable version') + + +def _calculate_percent(numerator, denominator): + assert numerator <= denominator, '%d <= %d' % (numerator, denominator) + if denominator == 0: + if numerator == 0: + return 100 + else: + raise ValueError('denominator cannot be zero') + + return int(round( numerator / float(denominator) * 100 )) + + +def _format_duration(seconds): + s = [] + if seconds > 60: + s.append('{0}m'.format(int(seconds/60))) + s.append('{0}s'.format(int(seconds % 60))) + return ''.join(s) + + +# Handle to the current progress bar object. There cannot be more than one +# progress bar for obvious reasons. +_current_progress_bar = None +def _set_current_progress_bar(pbar): + global _current_progress_bar + assert _current_progress_bar is None, 'there is already a pbar' + _current_progress_bar = pbar +def _del_current_progress_bar(pbar): + global _current_progress_bar + assert _current_progress_bar is pbar, 'pbar is something else' + _current_progress_bar = None +