""" Miscellaneous utility functions and classes. .. testsetup:: * from wizard.util import * """ import os.path import os import subprocess import pwd import sys import socket import wizard class ChangeDirectory(object): """ Context for temporarily changing the working directory. >>> with ChangeDirectory("/tmp"): ... print os.getcwd() /tmp """ def __init__(self, dir): self.dir = dir self.olddir = None def __enter__(self): self.olddir = os.getcwd() os.chdir(self.dir) def __exit__(self, *args): os.chdir(self.olddir) class Counter(object): """ Object for counting different values when you don't know what they are a priori. Supports index access and iteration. >>> counter = Counter() >>> counter.count("foo") >>> print counter["foo"] 1 """ def __init__(self): self.dict = {} def count(self, value): """Increments count for ``value``.""" self.dict.setdefault(value, 0) self.dict[value] += 1 def __getitem__(self, key): return self.dict[key] def __iter__(self): return self.dict.__iter__() class PipeToLess(object): """ Context for printing output to a pager. Use this if output is expected to be long. """ def __enter__(self): self.proc = subprocess.Popen("less", stdin=subprocess.PIPE) self.old_stdout = sys.stdout sys.stdout = self.proc.stdin def __exit__(self, *args): if self.proc: self.proc.stdin.close() self.proc.wait() sys.stdout = self.old_stdout def dictmap(f, d): """ A map function for dictionaries. Only changes values. >>> dictmap(lambda x: x + 2, {'a': 1, 'b': 2}) {'a': 3, 'b': 4} """ return dict((k,f(v)) for k,v in d.items()) def dictkmap(f, d): """ A map function for dictionaries that passes key and value. >>> dictkmap(lambda x, y: x + y, {1: 4, 3: 4}) {1: 5, 3: 6} """ return dict((k,f(k,v)) for k,v in d.items()) def get_exception_name(output): """ Reads the traceback from a Python program and grabs the fully qualified exception name. """ lines = output.split("\n") for line in lines[1:]: # skip the "traceback" line line = line.rstrip() if line[0] == ' ': continue if line[-1] == ":": return line[:-1] else: return line def get_dir_uid(dir): """Finds the uid of the person who owns this directory.""" return os.stat(dir).st_uid def get_dir_owner(dir = "."): """ Finds the name of the locker this directory is in. .. note:: This function uses the passwd database and thus only works on scripts servers when querying directories that live on AFS. """ pwentry = pwd.getpwuid(get_dir_uid(dir)) # XXX: Error handling! return pwentry.pw_name def get_revision(): """Returns the commit ID of the current Wizard install.""" # If you decide to convert this to use wizard.shell, be warned # that there is a circular dependency, so this function would # probably have to live somewhere else, probably wizard.git wizard_git = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), ".git") return subprocess.Popen(["git", "--git-dir=" + wizard_git, "rev-parse", "HEAD"], stdout=subprocess.PIPE).communicate()[0].rstrip() def get_operator_info(): """ Returns tuple of ``(realname, email)`` about the person running the script. If run from a scripts server, get info from Hesiod. Otherwise, use the passwd database (email generated probably won't actually accept mail). Useful when generating commit messages. """ username = get_operator_name_from_gssapi() if username: # scripts approach hesinfo = subprocess.Popen(["hesinfo", username, "passwd"],stdout=subprocess.PIPE).communicate()[0] fields = hesinfo.partition(",")[0] realname = fields.rpartition(":")[2] return realname, username + "@mit.edu" else: # more traditional approach, but the email probably doesn't work uid = os.getuid() if not uid: # since root isn't actually a useful designation, but maybe # SUDO_USER contains something helpful sudo_user = os.getenv("SUDO_USER") if not sudo_user: raise NoOperatorInfo pwdentry = pwd.getpwnam(sudo_user) else: pwdentry = pwd.getpwuid(uid) # XXX: error checking might be nice # We follow the Ubuntu convention of gecos being a comma split field # with the person's realname being the first entry. return pwdentry.pw_gecos.split(",")[0], pwdentry.pw_name + "@" + socket.gethostname() def get_operator_git(): """ Returns ``Real Name `` suitable for use in Git ``Something-by:`` string. """ return "%s <%s>" % get_operator_info() def get_operator_name_from_gssapi(): """ Returns username of the person operating this script based off of the :envvar:`SSH_GSSAPI_NAME` environment variable. .. note:: :envvar:`SSH_GSSAPI_NAME` is not set by a vanilla OpenSSH distributions. Scripts servers are patched to support this environment variable. """ principal = os.getenv("SSH_GSSAPI_NAME") if not principal: return None instance, _, _ = principal.partition("@") if instance.endswith("/root"): username, _, _ = principal.partition("/") else: username = instance return username def set_operator_env(): """ Sets :envvar:`GIT_COMMITTER_NAME` and :envvar:`GIT_COMMITTER_EMAIL` environment variables if applicable. Does nothing if :func:`get_operator_info` throws :exc:`NoOperatorInfo`. """ try: op_realname, op_email = get_operator_info() os.putenv("GIT_COMMITTER_NAME", op_realname) os.putenv("GIT_COMMITTER_EMAIL", op_email) except NoOperatorInfo: pass def set_author_env(): """ Sets :envvar:`GIT_AUTHOR_NAME` and :envvar:`GIT_AUTHOR_EMAIL` environment variables if applicable. Does nothing if :func:`get_dir_owner` fails. """ try: # XXX: should check if the directory is in AFS, and if not, use # a more traditional metric lockername = get_dir_owner() os.putenv("GIT_AUTHOR_NAME", "%s locker" % lockername) os.putenv("GIT_AUTHOR_EMAIL", "%s@scripts.mit.edu" % lockername) except KeyError: # XXX: This doesn't actually make sense pass def set_git_env(): """Sets all appropriate environment variables for Git commits.""" set_operator_env() set_author_env() def get_git_footer(): """Returns strings for placing in Git log info about Wizard.""" return "\n".join(["Wizard-revision: %s" % get_revision() ,"Wizard-args: %s" % " ".join(sys.argv) ]) class NoOperatorInfo(wizard.Error): """No information could be found about the operator from Kerberos.""" pass