6 import wizard as _wizard
7 from wizard import util
class CallError(_wizard.Error):
    """Raised when a shelled-out command exits with a nonzero status.

    Carries the exit status `code`, the argument vector `args`, and the
    captured `stdout`/`stderr` of the failed command, so callers can
    inspect or report exactly what went wrong.

    NOTE(review): the excerpt elided this __init__'s body; the attribute
    assignments below are reconstructed from the usage at the raise
    sites (4-tuple of returncode/args/stdout/stderr) and from __str__'s
    read of self.code -- confirm against the full source.
    """
    def __init__(self, code, args, stdout, stderr):
        self.code = code
        self.args = args
        self.stdout = stdout
        self.stderr = stderr
    def __str__(self):
        return "CallError [%d]" % self.code
class PythonCallError(CallError):
    """CallError specialized for failed Python subprocesses.

    In addition to the generic CallError fields, extracts the name of
    the uncaught Python exception from the child's stderr (via
    util.get_exception_name) so the error prints something more useful
    than a bare exit code.
    """
    def __init__(self, code, args, stdout, stderr):
        # Parse the traceback out of stderr first, then store the
        # generic fields via the base-class initializer.
        self.name = util.get_exception_name(stderr)
        CallError.__init__(self, code, args, stdout, stderr)
    def __str__(self):
        return "PythonCallError [%s]" % self.name
28 return args[0] == "python" or args[0] == wizard
31 """An advanced shell, with the ability to do dry-run and log commands"""
    def __init__(self, dry = False):
        """ `dry` Don't run any commands, just print them"""
        # NOTE(review): the excerpt elides this method's body (original
        # line 34); presumably the flag is stored (e.g. self.dry = dry)
        # for call() to consult -- confirm against the full source.
    def call(self, *args, **kwargs):
        """Run a command, logging it and capturing its output.

        `args` is the argv-style argument vector. Recognized kwargs:
        `python` -- whether a failure should raise PythonCallError
        instead of CallError; None (the default) autodetects via
        is_python(). Returns a (stdout, stderr) tuple on success.

        NOTE(review): several original lines are elided from this
        excerpt (dry-run handling, an early return for the async path,
        and a returncode guard); the gap comments below flag them --
        confirm against the full source.
        """
        kwargs.setdefault("python", None)
        logging.info("Running `" + ' '.join(args) + "`")
        # NOTE(review): original lines 38-39 are elided here; given
        # __init__'s `dry` flag, presumably a dry-run short-circuit.
        if kwargs["python"] is None and is_python(args):
            kwargs["python"] = True
        # XXX: There is a possible problem here where we can fill up
        # the kernel buffer if we have 64KB of data. This shouldn't
        # be a problem, and the fix for such case would be to write to
        # temporary files instead of a pipe.
        # Another possible way of fixing this is converting from a
        # waitpid() pump to a select() pump, creating a pipe to
        # ourself, and then setting up a
        # SIGCHILD handler to write a single byte to the pipe to get
        # us out of select() when a subprocess exits.
        proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Subclasses (e.g. ParallelShell) define async() to take
        # ownership of the still-running process. NB: `async` is a
        # reserved word in Python >= 3.7, so this is Python 2-era code.
        if hasattr(self, "async"):
            self.async(proc, args, **kwargs)
            # NOTE(review): an early return (original line 54) is
            # elided here; without it the blocking communicate() below
            # would also run on the async path.
        stdout, stderr = proc.communicate()
        self.log(stdout, stderr)
        # NOTE(review): a failure guard (original line 57, presumably
        # `if proc.returncode:`) is elided here; as excerpted the raise
        # below would be unconditional and the return unreachable.
        if kwargs["python"]: eclass = PythonCallError
        else: eclass = CallError
        raise eclass(proc.returncode, args, stdout, stderr)
        return (stdout, stderr)
62 def log(self, stdout, stderr):
64 logging.debug("STDOUT:\n" + stdout)
66 logging.debug("STDERR:\n" + stderr)
67 def callAsUser(self, *args, **kwargs):
68 user = kwargs.pop("user", None)
69 uid = kwargs.pop("uid", None)
70 kwargs.setdefault("python", is_python(args))
71 if not user and not uid: return self.call(*args, **kwargs)
72 if uid: return self.call("sudo", "-u", "#" + uid, *args, **kwargs)
73 if user: return self.call("sudo", "-u", user, *args, **kwargs)
class ParallelShell(Shell):
    """Commands are queued here, and executed in parallel (with
    threading) in accordance with the maximum number of allowed
    subprocesses, and result in callback execution when they finish."""
    def __init__(self, dry = False, max = 10):
        # NOTE(review): original line 81 is elided from this excerpt;
        # async() below reads self.running, so presumably
        # `self.running = {}` was initialized there -- confirm.
        super(ParallelShell, self).__init__(dry=dry)
        self.max = max # maximum of commands to run in parallel
    def async(self, proc, args, python, on_success, on_error):
        # NOTE(review): `async` became a reserved word in Python 3.7;
        # this method (and Shell.call's hasattr dispatch to it) is
        # Python 2-era code as written.
        """Gets handed a subprocess.Proc object from our deferred
        execution: registers it in self.running and, once the pool is
        full, reaps one finished child and fires its on_success or
        on_error callback."""
        self.running[proc.pid] = (proc, args, python, on_success, on_error)
        # bail out immediately on initial ramp up
        if len(self.running) < self.max: return
        # now, wait for open pids.
        # NOTE(review): a try/except wrapper (original lines 91 and 93,
        # presumably `except OSError, e:`) is elided from this excerpt;
        # the `e` two lines below is bound by that except clause.
        pid, status = os.waitpid(-1, 0)
        # ECHILD means there are no children left to wait on.
        if e.errno == errno.ECHILD: return
        # ooh, zombie process. reap it
        proc, args, python, on_success, on_error = self.running.pop(pid)
        # XXX: this is slightly dangerous; should actually use
        # (continuation of this comment elided at original line 99 --
        # likely a warning about reading pipes after the child exited)
        stdout = proc.stdout.read()
        stderr = proc.stderr.read()
        self.log(stdout, stderr)
        # NOTE(review): a failure guard (original line 103) and the
        # matching `else:` (line 107) are elided here; as excerpted,
        # on_error would fire unconditionally before on_success.
        if python: eclass = PythonCallError
        else: eclass = CallError
        on_error(eclass(proc.returncode, args, stdout, stderr))
        on_success(stdout, stderr)
110 """Waits for all of our subprocesses to terminate."""
112 while os.waitpid(-1, 0):
115 if e.errno == errno.ECHILD: return
class DummyParallelShell(ParallelShell):
    """Drop-in stand-in for ParallelShell that does not actually run
    anything concurrently: the subprocess pool size is pinned at one,
    so queued commands are effectively executed serially while callers
    keep using the ParallelShell callback API unchanged."""
    def __init__(self, dry = False):
        """`dry` Don't run any commands, just print them"""
        # Delegate everything to ParallelShell, forcing max=1.
        super(DummyParallelShell, self).__init__(dry=dry, max=1)