]> scripts.mit.edu Git - wizard.git/blob - wizard/shell.py
Refactor to get rid of _package.py using __import__ magic.
[wizard.git] / wizard / shell.py
1 import subprocess
2 import logging
3 import sys
4 import os
5
6 import wizard
7 from wizard import util
8
9 """This is the path to the wizard executable as specified
10 by the caller; it lets us recursively invoke wizard"""
11 wizard = sys.argv[0]
12
13 def is_python(args):
14     return args[0] == "python" or args[0] == wizard
15
16 class Shell(object):
17     """An advanced shell, with the ability to do dry-run and log commands"""
18     def __init__(self, dry = False):
19         """ `dry`       Don't run any commands, just print them"""
20         self.dry = dry
21     def call(self, *args, **kwargs):
22         kwargs.setdefault("python", None)
23         logging.info("Running `" + ' '.join(args) + "`")
24         if self.dry:
25             return
26         if kwargs["python"] is None and is_python(args):
27             kwargs["python"] = True
28         # XXX: There is a possible problem here where we can fill up
29         # the kernel buffer if we have 64KB of data.  This shouldn't
30         # be a problem, and the fix for such case would be to write to
31         # temporary files instead of a pipe.
32         # Another possible way of fixing this is converting from a
33         # waitpid() pump to a select() pump, creating a pipe to
34         # ourself, and then setting up a
35         # SIGCHILD handler to write a single byte to the pipe to get
36         # us out of select() when a subprocess exits.
37         proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
38         if hasattr(self, "async"):
39             self.async(proc, args, **kwargs)
40             return proc
41         stdout, stderr = proc.communicate()
42         self.log(stdout, stderr)
43         if proc.returncode:
44             if kwargs["python"]: eclass = PythonCallError
45             else: eclass = CallError
46             raise eclass(proc.returncode, args, stdout, stderr)
47         return (stdout, stderr)
48     def log(self, stdout, stderr):
49         if stdout:
50             logging.debug("STDOUT:\n" + stdout)
51         if stderr:
52             logging.debug("STDERR:\n" + stderr)
53     def callAsUser(self, *args, **kwargs):
54         user = kwargs.pop("user", None)
55         uid = kwargs.pop("uid", None)
56         kwargs.setdefault("python", is_python(args))
57         if not user and not uid: return self.call(*args, **kwargs)
58         if util.get_operator_name():
59             # This might be generalized as "preserve some environment"
60             args.insert(0, "SSH_GSSAPI_NAME=" + util.get_operator_name())
61         if uid: return self.call("sudo", "-u", "#" + str(uid), *args, **kwargs)
62         if user: return self.call("sudo", "-u", user, *args, **kwargs)
63
64 class ParallelShell(Shell):
65     """Commands are queued here, and executed in parallel (with
66     threading) in accordance with the maximum number of allowed
67     subprocesses, and result in callback execution when they finish."""
68     def __init__(self, dry = False, max = 10):
69         super(ParallelShell, self).__init__(dry=dry)
70         self.running = {}
71         self.max = max # maximum of commands to run in parallel
72     def async(self, proc, args, python, on_success, on_error):
73         """Gets handed a subprocess.Proc object from our deferred
74         execution"""
75         self.running[proc.pid] = (proc, args, python, on_success, on_error)
76     def wait(self):
77         # bail out immediately on initial ramp up
78         if len(self.running) < self.max: return
79         # now, wait for open pids.
80         try:
81             pid, status = os.waitpid(-1, 0)
82         except OSError as e:
83             if e.errno == errno.ECHILD: return
84             raise e
85         # ooh, zombie process. reap it
86         proc, args, python, on_success, on_error = self.running.pop(pid)
87         # XXX: this is slightly dangerous; should actually use
88         # temporary files
89         stdout = proc.stdout.read()
90         stderr = proc.stderr.read()
91         self.log(stdout, stderr)
92         if status:
93             if python: eclass = PythonCallError
94             else: eclass = CallError
95             on_error(eclass(proc.returncode, args, stdout, stderr))
96             return
97         on_success(stdout, stderr)
98     def join(self):
99         """Waits for all of our subprocesses to terminate."""
100         try:
101             while os.waitpid(-1, 0):
102                 pass
103         except OSError as e:
104             if e.errno == errno.ECHILD: return
105             raise e
106
107 class DummyParallelShell(ParallelShell):
108     """Same API as ParallelShell, but doesn't actually parallelize (by
109     using only one thread)"""
110     def __init__(self, dry = False):
111         super(DummyParallelShell, self).__init__(dry=dry, max=1)
112
113 class CallError(wizard.Error):
114     def __init__(self, code, args, stdout, stderr):
115         self.code = code
116         self.args = args
117         self.stdout = stdout
118         self.stderr = stderr
119     def __str__(self):
120         return "CallError [%d]" % self.code
121
122 class PythonCallError(CallError):
123     def __init__(self, code, args, stdout, stderr):
124         self.name = util.get_exception_name(stderr)
125         CallError.__init__(self, code, args, stdout, stderr)
126     def __str__(self):
127         return "PythonCallError [%s]" % self.name
128