7 import wizard as _wizard
8 from wizard import util
class CallError(_wizard.Error):
    """Raised when a shell command exits with a nonzero return code.

    Stores the exit code, the argv list, and the captured output
    streams so callers can inspect what went wrong.
    """
    def __init__(self, code, args, stdout, stderr):
        # NOTE(review): the attribute assignments were missing from the
        # excerpt under review; reconstructed from the constructor
        # arguments and from __str__'s use of self.code -- confirm
        # against the original file.
        self.code = code
        self.args = args
        self.stdout = stdout
        self.stderr = stderr
    def __str__(self):
        return "CallError [%d]" % self.code
class PythonCallError(CallError):
    """CallError specialization for failed Python subprocesses.

    Extracts the exception name from the subprocess's stderr (via
    util.get_exception_name) so the failure can be reported compactly.
    """
    def __init__(self, code, args, stdout, stderr):
        # e.g. "ValueError", parsed out of the traceback on stderr
        self.name = util.get_exception_name(stderr)
        CallError.__init__(self, code, args, stdout, stderr)
    # NOTE(review): the `def __str__` header was missing from the
    # excerpt before the orphan return line; reconstructed to match
    # CallError.__str__.
    def __str__(self):
        return "PythonCallError [%s]" % self.name
29 return args[0] == "python" or args[0] == wizard
class Shell(object):
    """An advanced shell, with the ability to do dry-run and log commands"""
    def __init__(self, logger=False, dry=False):
        """`logger` The logger
        `dry` Don't run any commands, just print them"""
        # NOTE(review): these assignments were missing from the excerpt;
        # reconstructed from the reads of self.logger / self.dry in
        # call() and log().
        self.logger = logger  # falsy value disables all logging
        self.dry = dry        # when true, commands are logged but never run
38 def call(self, *args, **kwargs):
39 kwargs.setdefault("python", None)
40 if self.dry or self.logger:
41 self.logger.info("Running `" + ' '.join(args) + "`")
44 if kwargs["python"] is None and is_python(args):
45 kwargs["python"] = True
46 # XXX: There is a possible problem here where we can fill up
47 # the kernel buffer if we have 64KB of data. This shouldn't
48 # be a problem, and the fix for such case would be to write to
49 # temporary files instead of a pipe.
50 # Another possible way of fixing this is converting from a
51 # waitpid() pump to a select() pump, creating a pipe to
52 # ourself, and then setting up a
53 # SIGCHILD handler to write a single byte to the pipe to get
54 # us out of select() when a subprocess exits.
55 proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
56 if hasattr(self, "async"):
57 self.async(proc, args, **kwargs)
59 stdout, stderr = proc.communicate()
60 self.log(stdout, stderr)
62 if kwargs["python"]: eclass = PythonCallError
63 else: eclass = CallError
64 raise eclass(proc.returncode, args, stdout, stderr)
65 return (stdout, stderr)
def log(self, stdout, stderr):
    """Record a finished command's output streams via the logger.

    Does nothing when no logger is configured; empty streams are
    skipped rather than logged as blank entries.
    """
    if not self.logger:
        return
    if stdout:
        self.logger.debug("STDOUT:\n" + stdout)
    if stderr:
        self.logger.debug("STDERR:\n" + stderr)
def callAsUser(self, *args, **kwargs):
    """Run a command, optionally as another user via sudo.

    Keyword `uid` (numeric id, takes precedence) or `user` (name)
    selects the target account; with neither, the command runs
    directly.  The `python` flag defaults to auto-detection so the
    sudo wrapper does not hide a Python subprocess from error
    classification.
    """
    target_user = kwargs.pop("user", None)
    target_uid = kwargs.pop("uid", None)
    kwargs.setdefault("python", is_python(args))
    if target_uid:
        return self.call("sudo", "-u", "#" + target_uid, *args, **kwargs)
    if target_user:
        return self.call("sudo", "-u", target_user, *args, **kwargs)
    return self.call(*args, **kwargs)
class ParallelShell(Shell):
    """Commands are queued here, and executed in parallel (with
    threading) in accordance with the maximum number of allowed
    subprocesses, and result in callback execution when they finish."""
    def __init__(self, logger=False, dry=False, max=10):
        super(ParallelShell, self).__init__(logger=logger, dry=dry)
        # pid -> (proc, args, python, on_success, on_error), consumed by
        # async().  NOTE(review): this initialization was missing from
        # the excerpt; reconstructed from its uses in async().
        self.running = {}
        self.max = max # maximum of commands to run in parallel
87 def async(self, proc, args, python, on_success, on_error):
88 """Gets handed a subprocess.Proc object from our deferred
90 self.running[proc.pid] = (proc, args, python, on_success, on_error)
92 # bail out immediately on initial ramp up
93 if len(self.running) < self.max: return
94 # now, wait for open pids.
96 pid, status = os.waitpid(-1, 0)
98 if e.errno == errno.ECHILD: return
100 # ooh, zombie process. reap it
101 proc, args, python, on_success, on_error = self.running.pop(pid)
102 # XXX: this is slightly dangerous; should actually use
104 stdout = proc.stdout.read()
105 stderr = proc.stderr.read()
106 self.log(stdout, stderr)
108 if python: eclass = PythonCallError
109 else: eclass = CallError
110 on_error(eclass(proc.returncode, args, stdout, stderr))
112 on_success(stdout, stderr)
def join(self):
    """Waits for all of our subprocesses to terminate."""
    # NOTE(review): the method header and the try/except were missing
    # from the excerpt; the name `join` is reconstructed from the
    # docstring and the handler from the `e.errno` check -- confirm
    # against the original file.
    try:
        # waitpid(-1, 0) blocks until any child exits; loop until the
        # kernel reports there are no children left.
        while os.waitpid(-1, 0):
            pass
    except OSError as e:
        if e.errno == errno.ECHILD: return
        raise
class DummyParallelShell(ParallelShell):
    """Same API as ParallelShell, but doesn't actually parallelize (by
    using only one thread)"""
    def __init__(self, logger=False, dry=False):
        # A pool size of one serializes every command while preserving
        # the callback-based ParallelShell interface.
        super(DummyParallelShell, self).__init__(logger=logger, dry=dry, max=1)