"""
Miscellaneous utility functions and classes.

.. testsetup:: *

    from wizard.util import *
"""

import os.path
import os
import subprocess
import pwd
import sys
import socket
import errno
import itertools

import wizard


class ChangeDirectory(object):
    """
    Context for temporarily changing the working directory.  The
    previous working directory is restored when the context exits.

        >>> with ChangeDirectory("/tmp"):
        ...     print(os.getcwd())
        /tmp
    """

    def __init__(self, dir):
        self.dir = dir
        self.olddir = None

    def __enter__(self):
        # Remember where we were *before* moving, so __exit__ can go back.
        self.olddir = os.getcwd()
        chdir(self.dir)

    def __exit__(self, *args):
        chdir(self.olddir)


class Counter(object):
    """
    Object for counting different values when you don't know what
    they are a priori.  Supports index access and iteration (iteration
    yields the values that have been counted, not the counts).

        >>> counter = Counter()
        >>> counter.count("foo")
        >>> print(counter["foo"])
        1
    """

    def __init__(self):
        self.dict = {}

    def count(self, value):
        """Increments count for ``value``."""
        self.dict[value] = self.dict.get(value, 0) + 1

    def __getitem__(self, key):
        # Raises KeyError for a value that has never been counted.
        return self.dict[key]

    def __iter__(self):
        return iter(self.dict)


class PipeToLess(object):
    """
    Context for printing output to a pager.  Use this if output
    is expected to be long.

    While the context is active ``sys.stdout`` is replaced by the
    standard input of a ``less`` subprocess; on exit the pipe is closed,
    the pager is waited for, and the previous ``sys.stdout`` is restored.
    """

    def __enter__(self):
        self.proc = subprocess.Popen("less", stdin=subprocess.PIPE)
        self.old_stdout = sys.stdout
        sys.stdout = self.proc.stdin

    def __exit__(self, *args):
        try:
            if self.proc:
                self.proc.stdin.close()
                self.proc.wait()
        finally:
            # Always restore stdout: closing the pipe can raise (e.g. a
            # broken pipe when the user quits the pager early), and
            # leaving sys.stdout pointing at a dead pipe would break
            # every later print.
            sys.stdout = self.old_stdout


def chdir(dir):
    """
    Changes a directory, but has special exceptions for certain
    classes of errors.

    Raises :exc:`PermissionsError` when the directory is not accessible
    (``EACCES``) and :exc:`NoSuchDirectoryError` when it does not exist
    (``ENOENT``).  Any other :exc:`OSError` is propagated unchanged.
    """
    try:
        os.chdir(dir)
    except OSError as e:
        if e.errno == errno.EACCES:
            raise PermissionsError()
        elif e.errno == errno.ENOENT:
            raise NoSuchDirectoryError()
        else:
            # Bare raise keeps the original traceback intact.
            raise


def dictmap(f, d):
    """
    A map function for dictionaries.  Only changes values.

        >>> dictmap(lambda x: x + 2, {'a': 1, 'b': 2})
        {'a': 3, 'b': 4}
    """
    return dict((k, f(v)) for k, v in d.items())


def dictkmap(f, d):
    """
    A map function for dictionaries that passes key and value.

        >>> dictkmap(lambda x, y: x + y, {1: 4, 3: 4})
        {1: 5, 3: 7}
    """
    return dict((k, f(k, v)) for k, v in d.items())


def get_exception_name(output):
    """
    Reads the traceback from a Python program and grabs the
    fully qualified exception name.

    ``output`` is the traceback text.  The first line is skipped; after
    that, the last non-indented line that directly follows an indented
    line is taken as the result (with a single trailing ``:`` removed,
    if present).  Returns ``"(unknown)"`` if no such line is found.
    """
    lines = output.split("\n")
    # ``cue`` is set after an indented (traceback body) line, marking the
    # next non-indented line as a candidate for the exception line.
    cue = False
    result = "(unknown)"
    for line in lines[1:]:
        line = line.rstrip()
        if not line:
            continue
        if line[0] == ' ':
            cue = True
            continue
        if cue:
            cue = False
            if line[-1] == ":":
                result = line[:-1]
            else:
                result = line
    return result


def get_dir_uid(dir):
    """Finds the uid of the person who owns this directory."""
    return os.stat(dir).st_uid


def get_dir_owner(dir = "."):
    """
    Finds the name of the locker this directory is in.

    Raises :exc:`KeyError` if the owning uid has no passwd entry.

    .. note::

        This function uses the passwd database and thus
        only works on scripts servers when querying directories
        that live on AFS.
    """
    pwentry = pwd.getpwuid(get_dir_uid(dir))
    # XXX: Error handling!
    return pwentry.pw_name


def get_revision():
    """Returns the commit ID of the current Wizard install."""
    # If you decide to convert this to use wizard.shell, be warned
    # that there is a circular dependency, so this function would
    # probably have to live somewhere else, probably wizard.git
    install_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    wizard_git = os.path.join(install_root, ".git")
    proc = subprocess.Popen(
        ["git", "--git-dir=" + wizard_git, "rev-parse", "HEAD"],
        stdout=subprocess.PIPE)
    return proc.communicate()[0].rstrip()


def get_operator_info():
    """
    Returns tuple of ``(realname, email)`` about the person running
    the script.  If run from a scripts server, get info from Hesiod.
    Otherwise, use the passwd database (email generated probably won't
    actually accept mail).  Useful when generating commit messages.

    Raises :exc:`NoOperatorInfo` when running as root and
    :envvar:`SUDO_USER` is not set.
    """
    username = get_operator_name_from_gssapi()
    if username:
        # scripts approach
        hesinfo = subprocess.Popen(["hesinfo", username, "passwd"],
                                   stdout=subprocess.PIPE).communicate()[0]
        # Everything before the first comma, then the last
        # colon-separated field of that, is taken as the real name.
        fields = hesinfo.partition(",")[0]
        realname = fields.rpartition(":")[2]
        return realname, username + "@mit.edu"
    else:
        # more traditional approach, but the email probably doesn't work
        uid = os.getuid()
        if not uid:
            # since root isn't actually a useful designation, but maybe
            # SUDO_USER contains something helpful
            sudo_user = os.getenv("SUDO_USER")
            if not sudo_user:
                raise NoOperatorInfo
            pwdentry = pwd.getpwnam(sudo_user)
        else:
            pwdentry = pwd.getpwuid(uid)
        # XXX: error checking might be nice
        # We follow the Ubuntu convention of gecos being a comma split field
        # with the person's realname being the first entry.
        realname = pwdentry.pw_gecos.split(",")[0]
        email = pwdentry.pw_name + "@" + socket.gethostname()
        return realname, email


def get_operator_git():
    """
    Returns ``Real Name <email@example.com>`` suitable for use in
    Git ``Something-by:`` string.
    """
    return "%s <%s>" % get_operator_info()


def get_operator_name_from_gssapi():
    """
    Returns username of the person operating this script based
    off of the :envvar:`SSH_GSSAPI_NAME` environment variable.
    Returns ``None`` if the variable is unset or empty.

    .. note::

        :envvar:`SSH_GSSAPI_NAME` is not set by a vanilla OpenSSH
        distributions.  Scripts servers are patched to support this
        environment variable.
    """
    principal = os.getenv("SSH_GSSAPI_NAME")
    if not principal:
        return None
    # Drop the realm: "user/root@REALM" -> "user/root".
    instance, _, _ = principal.partition("@")
    if instance.endswith("/root"):
        # A root instance still identifies the underlying user.
        username, _, _ = principal.partition("/")
    else:
        username = instance
    return username


def set_operator_env():
    """
    Sets :envvar:`GIT_COMMITTER_NAME` and :envvar:`GIT_COMMITTER_EMAIL`
    environment variables if applicable.  Does nothing if
    :func:`get_operator_info` throws :exc:`NoOperatorInfo`.
    """
    try:
        op_realname, op_email = get_operator_info()
    except NoOperatorInfo:
        return
    # Assign through os.environ rather than os.putenv: the assignment
    # still calls putenv() for child processes, but also keeps
    # os.environ itself in sync.
    os.environ["GIT_COMMITTER_NAME"] = op_realname
    os.environ["GIT_COMMITTER_EMAIL"] = op_email


def set_author_env():
    """
    Sets :envvar:`GIT_AUTHOR_NAME` and :envvar:`GIT_AUTHOR_EMAIL`
    environment variables if applicable. Does nothing if
    :func:`get_dir_owner` fails.
    """
    try:
        # XXX: should check if the directory is in AFS, and if not, use
        # a more traditional metric
        lockername = get_dir_owner()
    except KeyError:
        # XXX: This doesn't actually make sense
        return
    # See set_operator_env for why os.environ is used instead of putenv.
    os.environ["GIT_AUTHOR_NAME"] = "%s locker" % lockername
    os.environ["GIT_AUTHOR_EMAIL"] = "%s@scripts.mit.edu" % lockername


def set_git_env():
    """Sets all appropriate environment variables for Git commits."""
    set_operator_env()
    set_author_env()


def get_git_footer():
    """Returns strings for placing in Git log info about Wizard."""
    return "\n".join([
        "Wizard-revision: %s" % get_revision(),
        "Wizard-args: %s" % " ".join(sys.argv),
    ])


def safe_unlink(file):
    """
    Moves a file/dir to a backup location.

    The backup name is ``<file>.bak.<n>`` for the smallest ``n >= 0``
    that is not already taken.  Returns the backup name.
    """
    prefix = "%s.bak" % file
    for i in itertools.count():
        name = "%s.%d" % (prefix, i)
        # lexists (not exists) so that a dangling symlink occupying a
        # backup name counts as taken instead of being overwritten by
        # the rename below.
        if not os.path.lexists(name):
            break
    os.rename(file, name)
    return name


class NoOperatorInfo(wizard.Error):
    """No information could be found about the operator from Kerberos."""
    pass


class PermissionsError(IOError):
    """Raised by :func:`chdir` when the directory is not accessible."""
    errno = errno.EACCES


class NoSuchDirectoryError(IOError):
    """Raised by :func:`chdir` when the directory does not exist."""
    errno = errno.ENOENT