- def __init__(self, logger = False, dry = False, max_threads = 10):
- self.logger = logger
- self.dry = dry
- self.threads = []
- # queue of call tuples (method name, args and kwargs) to
- # be threaded
- self.queue = Queue.Queue()
- # build our threads
- for n in range(max_threads):
- self.threads.append(ShellThread(self.queue, logger=logger, dry=dry))
- def __getattr__(self, name):
- # override call and callAsUser (and possibly others)
- def thunk(*args, **kwargs):
- on_success = kwargs.pop("on_success")
- on_error = kwargs.pop("on_error")
- self.queue.put((name, args, kwargs, on_success, on_error))
- return thunk
- def start(self):
- for thread in self.threads:
- thread.start()
+ def __init__(self, logger = False, dry = False, max = 10):
+ super(ParallelShell, self).__init__(logger=logger,dry=dry)
+ self.running = {}
+ self.max = max # maximum number of commands to run in parallel
+ def async(self, proc, args, python, on_success, on_error):
+ """Gets handed a subprocess.Popen object from our deferred
+ execution so wait() can reap it later"""
+ self.running[proc.pid] = (proc, args, python, on_success, on_error)
+ def wait(self):
+ # bail out immediately on initial ramp up
+ if len(self.running) < self.max: return
+ # now, wait for open pids.
+ try:
+ pid, status = os.waitpid(-1, 0)
+ except OSError as e:
+ return
+ # ooh, zombie process. reap it
+ proc, args, python, on_success, on_error = self.running.pop(pid)
+ # XXX: this is slightly dangerous; should actually use
+ # temporary files
+ stdout = proc.stdout.read()
+ stderr = proc.stderr.read()
+ self.log(stdout, stderr)
+ if status:
+ if python: eclass = PythonCallError
+ else: eclass = CallError
+ on_error(eclass(proc.returncode, args, stdout, stderr))
+ return
+ on_success(stdout, stderr)