]> scripts.mit.edu Git - wizard.git/blob - lib/wizard/shell.py
Rewrite ParallelShell to use waitpid() instead of threads.
[wizard.git] / lib / wizard / shell.py
1 import subprocess
2 import sys
3 import os
4 import Queue
5 import threading
6
7 import wizard as _wizard
8 from wizard import util
9
10 wizard = sys.argv[0]
11
12 class CallError(_wizard.Error):
13     def __init__(self, code, args, stdout, stderr):
14         self.code = code
15         self.args = args
16         self.stdout = stdout
17         self.stderr = stderr
18     def __str__(self):
19         return "CallError [%d]" % self.code
20
21 class PythonCallError(CallError):
22     def __init__(self, code, args, stdout, stderr):
23         self.name = util.get_exception_name(stderr)
24         CallError.__init__(self, code, args, stdout, stderr)
25     def __str__(self):
26         return "PythonCallError [%s]" % self.name
27
28 def is_python(args):
29     return args[0] == "python" or args[0] == wizard
30
31 class Shell(object):
32     """An advanced shell, with the ability to do dry-run and log commands"""
33     def __init__(self, logger = False, dry = False):
34         """ `logger`    The logger
35             `dry`       Don't run any commands, just print them"""
36         self.logger = logger
37         self.dry = dry
38     def call(self, *args, **kwargs):
39         kwargs.setdefault("python", None)
40         if self.dry or self.logger:
41             self.logger.info("$ " + ' '.join(args))
42         if self.dry:
43             return
44         if kwargs["python"] is None and is_python(args):
45             kwargs["python"] = True
46         proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
47         if hasattr(self, "async"):
48             self.async(proc, args, **kwargs)
49             return proc
50         stdout, stderr = proc.communicate()
51         self.log(stdout, stderr)
52         if proc.returncode:
53             if kwargs["python"]: eclass = PythonCallError
54             else: eclass = CallError
55             raise eclass(proc.returncode, args, stdout, stderr)
56         return (stdout, stderr)
57     def log(self, stdout, stderr):
58         if self.logger and stdout: self.logger.info(stdout)
59         if self.logger and stderr: self.logger.info("STDERR: " + stderr)
60     def callAsUser(self, *args, **kwargs):
61         user = kwargs.pop("user", None)
62         kwargs.setdefault("python", is_python(args))
63         if not user: return self.call(*args, **kwargs)
64         return self.call("sudo", "-u", user, *args, **kwargs)
65
66 class ParallelShell(Shell):
67     """Commands are queued here, and executed in parallel (with
68     threading) in accordance with the maximum number of allowed
69     subprocesses, and result in callback execution when they finish."""
70     def __init__(self, logger = False, dry = False, max = 10):
71         super(ParallelShell, self).__init__(logger=logger,dry=dry)
72         self.running = {}
73         self.max = max # maximum of commands to run in parallel
74     def async(self, proc, args, python, on_success, on_error):
75         """Gets handed a subprocess.Proc object from our deferred
76         execution"""
77         self.running[proc.pid] = (proc, args, python, on_success, on_error)
78     def wait(self):
79         # bail out immediately on initial ramp up
80         if len(self.running) < self.max: return
81         # now, wait for open pids.
82         try:
83             pid, status = os.waitpid(-1, 0)
84         except OSError as e:
85             return
86         # ooh, zombie process. reap it
87         proc, args, python, on_success, on_error = self.running.pop(pid)
88         # XXX: this is slightly dangerous; should actually use
89         # temporary files
90         stdout = proc.stdout.read()
91         stderr = proc.stderr.read()
92         self.log(stdout, stderr)
93         if status:
94             if python: eclass = PythonCallError
95             else: eclass = CallError
96             on_error(eclass(proc.returncode, args, stdout, stderr))
97             return
98         on_success(stdout, stderr)
99     def join(self):
100         """Waits for all of our subprocesses to terminate."""
101         try:
102             while os.waitpid(-1, 0):
103                 pass
104         except OSError as e:
105             return
106
107 class DummyParallelShell(ParallelShell):
108     """Same API as ParallelShell, but doesn't actually parallelize (by
109     using only one thread)"""
110     def __init__(self, logger = False, dry = False):
111         super(DummyParallelShell, self).__init__(logger, dry, max=1)
112