]> scripts.mit.edu Git - wizard.git/blob - wizard/shell.py
Massive refactoring for readability (thanks geofft!)
[wizard.git] / wizard / shell.py
1 import subprocess
2 import logging
3 import sys
4 import os
5
6 import wizard as _wizard
7 from wizard import util
8
9 wizard = sys.argv[0]
10
11 class CallError(_wizard.Error):
12     def __init__(self, code, args, stdout, stderr):
13         self.code = code
14         self.args = args
15         self.stdout = stdout
16         self.stderr = stderr
17     def __str__(self):
18         return "CallError [%d]" % self.code
19
20 class PythonCallError(CallError):
21     def __init__(self, code, args, stdout, stderr):
22         self.name = util.get_exception_name(stderr)
23         CallError.__init__(self, code, args, stdout, stderr)
24     def __str__(self):
25         return "PythonCallError [%s]" % self.name
26
27 def is_python(args):
28     return args[0] == "python" or args[0] == wizard
29
30 class Shell(object):
31     """An advanced shell, with the ability to do dry-run and log commands"""
32     def __init__(self, dry = False):
33         """ `dry`       Don't run any commands, just print them"""
34         self.dry = dry
35     def call(self, *args, **kwargs):
36         kwargs.setdefault("python", None)
37         logging.info("Running `" + ' '.join(args) + "`")
38         if self.dry:
39             return
40         if kwargs["python"] is None and is_python(args):
41             kwargs["python"] = True
42         # XXX: There is a possible problem here where we can fill up
43         # the kernel buffer if we have 64KB of data.  This shouldn't
44         # be a problem, and the fix for such case would be to write to
45         # temporary files instead of a pipe.
46         # Another possible way of fixing this is converting from a
47         # waitpid() pump to a select() pump, creating a pipe to
48         # ourself, and then setting up a
49         # SIGCHILD handler to write a single byte to the pipe to get
50         # us out of select() when a subprocess exits.
51         proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
52         if hasattr(self, "async"):
53             self.async(proc, args, **kwargs)
54             return proc
55         stdout, stderr = proc.communicate()
56         self.log(stdout, stderr)
57         if proc.returncode:
58             if kwargs["python"]: eclass = PythonCallError
59             else: eclass = CallError
60             raise eclass(proc.returncode, args, stdout, stderr)
61         return (stdout, stderr)
62     def log(self, stdout, stderr):
63         if stdout:
64             logging.debug("STDOUT:\n" + stdout)
65         if stderr:
66             logging.debug("STDERR:\n" + stderr)
67     def callAsUser(self, *args, **kwargs):
68         user = kwargs.pop("user", None)
69         uid = kwargs.pop("uid", None)
70         kwargs.setdefault("python", is_python(args))
71         if not user and not uid: return self.call(*args, **kwargs)
72         if uid: return self.call("sudo", "-u", "#" + uid, *args, **kwargs)
73         if user: return self.call("sudo", "-u", user, *args, **kwargs)
74
75 class ParallelShell(Shell):
76     """Commands are queued here, and executed in parallel (with
77     threading) in accordance with the maximum number of allowed
78     subprocesses, and result in callback execution when they finish."""
79     def __init__(self, dry = False, max = 10):
80         super(ParallelShell, self).__init__(dry=dry)
81         self.running = {}
82         self.max = max # maximum of commands to run in parallel
83     def async(self, proc, args, python, on_success, on_error):
84         """Gets handed a subprocess.Proc object from our deferred
85         execution"""
86         self.running[proc.pid] = (proc, args, python, on_success, on_error)
87     def wait(self):
88         # bail out immediately on initial ramp up
89         if len(self.running) < self.max: return
90         # now, wait for open pids.
91         try:
92             pid, status = os.waitpid(-1, 0)
93         except OSError as e:
94             if e.errno == errno.ECHILD: return
95             raise e
96         # ooh, zombie process. reap it
97         proc, args, python, on_success, on_error = self.running.pop(pid)
98         # XXX: this is slightly dangerous; should actually use
99         # temporary files
100         stdout = proc.stdout.read()
101         stderr = proc.stderr.read()
102         self.log(stdout, stderr)
103         if status:
104             if python: eclass = PythonCallError
105             else: eclass = CallError
106             on_error(eclass(proc.returncode, args, stdout, stderr))
107             return
108         on_success(stdout, stderr)
109     def join(self):
110         """Waits for all of our subprocesses to terminate."""
111         try:
112             while os.waitpid(-1, 0):
113                 pass
114         except OSError as e:
115             if e.errno == errno.ECHILD: return
116             raise e
117
118 class DummyParallelShell(ParallelShell):
119     """Same API as ParallelShell, but doesn't actually parallelize (by
120     using only one thread)"""
121     def __init__(self, dry = False):
122         super(DummyParallelShell, self).__init__(dry=dry, max=1)
123