1 | """ |
---|
2 | Wrappers around subprocess functionality that simulate an actual shell. |
---|
3 | """ |
---|
4 | |
---|
5 | import subprocess |
---|
6 | import logging |
---|
7 | import sys |
---|
8 | import os |
---|
9 | import errno |
---|
10 | |
---|
class Shell(object):
    """
    An advanced shell that performs logging. If ``dry`` is ``True``,
    no commands are actually run.
    """
    def __init__(self, dry = False):
        self.dry = dry
        # Directory passed as ``cwd`` to subprocess.Popen; ``None`` means the
        # child inherits this process's working directory (see setcwd()).
        self.cwd = None

    @staticmethod
    def _to_text(data):
        """
        Normalizes captured pipe output to a native string. ``None`` (the
        stream was not a pipe) becomes ``""``; on Python 3, ``bytes`` are
        decoded as UTF-8 with undecodable bytes replaced. Anything else
        (including all Python 2 ``str`` output) is returned unchanged.
        """
        if data is None:
            return ""
        if isinstance(data, bytes) and not isinstance(data, str):
            return data.decode("utf-8", "replace")
        return data

    @staticmethod
    def _to_bytes(data):
        """
        Prepares ``input`` for a binary pipe: a Python 3 ``str`` is encoded
        as UTF-8; ``bytes``, ``None`` and Python 2 strings pass through.
        """
        if isinstance(data, str) and not isinstance(data, bytes):
            return data.encode("utf-8")
        return data

    def call(self, *args, **kwargs):
        """
        Performs a system call. The actual executable and options should
        be passed as arguments to this function. Several keyword arguments
        are also supported:

        :param input: input to feed the subprocess on standard input.
        :param interactive: whether or not directly hook up all pipes
            to the controlling terminal, to allow interaction with subprocess.
        :param strip: if ``True``, instead of returning a tuple,
            return the string stdout output of the command with trailing newlines
            removed. This emulates the behavior of backticks and ``$()`` in Bash.
            Prefer to use :meth:`eval` instead (you should only need to explicitly
            specify this if you are using another wrapper around this function).
        :param log: if True, we log the call as INFO, if False, we log the call
            as DEBUG, otherwise, we detect based on ``strip``.
        :param python: opaque value that is only forwarded to :meth:`_async`
            (unused by a plain :class:`Shell`).
        :param stdout: where the child's standard output goes; defaults to a
            pipe whose contents are captured and returned. Ignored when
            ``interactive``.
        :param stderr: like ``stdout``, for standard error.
        :param stdin: a file-type object that will be written to or read from as a pipe.
        :returns: a tuple of strings ``(stdout, stderr)``, or a string ``stdout``
            if ``strip`` is specified. In dry mode, ``(None, None)`` or ``''``.
        :raises CallError: if the command exits with a nonzero status.

        >>> sh = Shell()
        >>> sh.call("echo", "Foobar")
        ('Foobar\\n', '')
        >>> sh.call("cat", input='Foobar')
        ('Foobar', '')
        """
        self._wait()
        kwargs.setdefault("interactive", False)
        kwargs.setdefault("strip", False)
        kwargs.setdefault("python", None)
        kwargs.setdefault("log", None)
        kwargs.setdefault("stdout", subprocess.PIPE)
        kwargs.setdefault("stdin", subprocess.PIPE)
        kwargs.setdefault("stderr", subprocess.PIPE)
        msg = "Running `" + ' '.join(args) + "`"
        # log=False forces DEBUG and log=True forces INFO; otherwise ``strip``
        # calls (eval()) are logged at DEBUG and everything else at INFO.
        if kwargs["log"] is False or (kwargs["strip"] and kwargs["log"] is not True):
            logging.debug(msg)
        else:
            logging.info(msg)
        if self.dry:
            if kwargs["strip"]:
                return ''
            return None, None
        kwargs.setdefault("input", None)
        if kwargs["interactive"]:
            # Hook the child straight up to our own terminal.
            stdout, stdin, stderr = sys.stdout, sys.stdin, sys.stderr
        else:
            stdout, stdin, stderr = kwargs["stdout"], kwargs["stdin"], kwargs["stderr"]
        # XXX: There is a possible problem here where we can fill up
        # the kernel buffer if we have 64KB of data. This shouldn't
        # be a problem, and the fix for such case would be to write to
        # temporary files instead of a pipe.
        # Another possible way of fixing this is converting from a
        # waitpid() pump to a select() pump, creating a pipe to
        # ourself, and then setting up a
        # SIGCHILD handler to write a single byte to the pipe to get
        # us out of select() when a subprocess exits.
        proc = subprocess.Popen(args, stdout=stdout, stderr=stderr, stdin=stdin, cwd=self.cwd)
        # Subclasses may take ownership of the running process here instead
        # of having us wait for it synchronously.
        if self._async(proc, args, **kwargs):
            return proc
        stdout, stderr = proc.communicate(self._to_bytes(kwargs["input"]))
        # A stream is None when it was not a pipe (e.g. interactive mode);
        # on Python 3 pipes yield bytes, which are decoded so callers get text.
        stdout = self._to_text(stdout)
        stderr = self._to_text(stderr)
        if not kwargs["interactive"]:
            if kwargs["strip"]:
                self._log(None, stderr)
            else:
                self._log(stdout, stderr)
        if proc.returncode:
            raise CallError(proc.returncode, args, stdout, stderr)
        if kwargs["strip"]:
            return stdout.rstrip("\n")
        return (stdout, stderr)

    def _log(self, stdout, stderr):
        """Logs the standard output and standard error of a command at DEBUG
        level; empty or missing streams are skipped."""
        if stdout:
            logging.debug("STDOUT:\n%s", self._to_text(stdout))
        if stderr:
            logging.debug("STDERR:\n%s", self._to_text(stderr))

    def _wait(self):
        """Hook run before each command is started; a plain Shell never waits."""
        pass

    def _async(self, *args, **kwargs):
        """
        Hook handed each freshly started process by :meth:`call`. Returning a
        true value makes :meth:`call` return the process immediately instead
        of waiting for it; a plain Shell always runs commands synchronously.
        """
        return False

    def callAsUser(self, *args, **kwargs):
        """
        Performs a system call as a different user. This is only possible
        if you are running as root. Keyword arguments
        are the same as :meth:`call` with the following additions:

        :param user: name of the user to run command as.
        :param uid: uid of the user to run command as.

        If neither ``user`` nor ``uid`` is given (or both are falsy), this is
        a plain :meth:`call`. When both are given, ``uid`` wins.

        .. note::

            The resulting system call internally uses :command:`sudo`,
            and as such environment variables will get scrubbed. We
            manually preserve :envvar:`SSH_GSSAPI_NAME`.
        """
        user = kwargs.pop("user", None)
        uid = kwargs.pop("uid", None)
        if not user and not uid:
            return self.call(*args, **kwargs)
        if os.getenv("SSH_GSSAPI_NAME"):
            # This might be generalized as "preserve some environment"
            args = list(args)
            args.insert(0, "SSH_GSSAPI_NAME=" + os.getenv("SSH_GSSAPI_NAME"))
        if uid:
            # sudo's "#<uid>" syntax selects a user by numeric id.
            return self.call("sudo", "-u", "#" + str(uid), *args, **kwargs)
        return self.call("sudo", "-u", user, *args, **kwargs)

    def safeCall(self, *args, **kwargs):
        """
        Checks if the owner of the directory the command will run in (the
        directory set with :meth:`setcwd`, or else the current working
        directory) is the same as the current user, and if it isn't,
        attempts to sudo to be that user. The intended use case is for
        calling Git commands when running as root, but this method should be
        used when interfacing with any moderately complex program that
        depends on working directory context. Keyword arguments are the
        same as :meth:`call`.
        """
        if os.getuid():
            # Not root: callAsUser() could not switch users anyway.
            return self.call(*args, **kwargs)
        # Inspect the directory the child will actually execute in; with
        # setcwd() in effect that is self.cwd, not our own working directory.
        uid = os.stat(self.cwd or os.getcwd()).st_uid
        # consider also checking ruid?
        if uid != os.geteuid():
            kwargs['uid'] = uid
            return self.callAsUser(*args, **kwargs)
        return self.call(*args, **kwargs)

    def eval(self, *args, **kwargs):
        """
        Evaluates a command and returns its output, with trailing newlines
        stripped (like backticks in Bash). This is a convenience method for
        calling :meth:`call` with ``strip``.

        >>> sh = Shell()
        >>> sh.eval("echo", "Foobar")
        'Foobar'
        """
        kwargs["strip"] = True
        return self.call(*args, **kwargs)

    def setcwd(self, cwd):
        """
        Sets the directory processes are executed in. This sets a value
        to be passed as the ``cwd`` argument to ``subprocess.Popen``.
        """
        self.cwd = cwd
---|
173 | |
---|
class ParallelShell(Shell):
    """
    Modifies the semantics of :class:`Shell` so that
    commands are queued here, and executed in parallel using waitpid
    with ``max`` subprocesses, and result in callback execution
    when they finish.

    .. method:: call(*args, **kwargs)

        Enqueues a system call for parallel processing. If there are
        no openings in the queue, this will block. Keyword arguments
        are the same as :meth:`Shell.call` with the following additions:

        :param on_success: Callback function for success (zero exit status).
            The callback function should accept two arguments,
            ``stdout`` and ``stderr``.
        :param on_error: Callback function for failure (nonzero exit status,
            or killed by a signal). The callback function should accept one
            argument, the exception that would have been thrown by the
            synchronous version.
        :return: The :class:`subprocess.Popen` object that was opened.

    .. method:: callAsUser(*args, **kwargs)

        Enqueues a system call under a different user for parallel
        processing. Keyword arguments are the same as
        :meth:`Shell.callAsUser` with the additions of keyword
        arguments from :meth:`call`.

    .. method:: safeCall(*args, **kwargs)

        Enqueues a "safe" call for parallel processing. Keyword
        arguments are the same as :meth:`Shell.safeCall` with the
        additions of keyword arguments from :meth:`call`.

    .. method:: eval(*args, **kwargs)

        No difference from :meth:`call`. Consider having a
        non-parallel shell if the program you are shelling out
        to is fast.

    """
    def __init__(self, dry = False, max = 10):
        super(ParallelShell, self).__init__(dry=dry)
        # pid -> (proc, args, python, on_success, on_error) for every
        # subprocess that has been started but not yet reaped.
        self.running = {}
        self.max = max # maximum of commands to run in parallel

    @staticmethod
    def make(no_parallelize, max):
        """Convenience method oriented towards command modules."""
        if no_parallelize:
            return DummyParallelShell()
        else:
            return ParallelShell(max=max)

    @staticmethod
    def _drain(pipe):
        """Reads a child's output pipe to EOF, closes it, and returns the
        contents as a native string ("" when the stream was not a pipe)."""
        if pipe is None:
            return ""
        try:
            data = pipe.read()
        finally:
            pipe.close()
        if isinstance(data, bytes) and not isinstance(data, str):
            # Python 3 pipes yield bytes; callbacks and logging expect text.
            data = data.decode("utf-8", "replace")
        return data

    @staticmethod
    def _close_stdin(proc):
        """Closes ``proc.stdin`` if it is a still-open pipe, ignoring EPIPE
        (the child may already have exited without reading it)."""
        stdin = proc.stdin
        if stdin is None or stdin.closed:
            return
        try:
            stdin.close()
        except (IOError, OSError) as e:
            if e.errno != errno.EPIPE:
                raise

    def _async(self, proc, args, python, on_success, on_error, **kwargs):
        """
        Gets handed a :class:`subprocess.Popen` object from our deferred
        execution and records it so :meth:`reap` can later dispatch its
        callbacks. See :meth:`Shell.call` source code for details.

        Mirroring what ``communicate()`` does in the synchronous path, any
        ``input`` is written to the child's standard input pipe, which is
        then closed so a child reading stdin sees EOF instead of hanging.
        """
        self.running[proc.pid] = (proc, args, python, on_success, on_error)
        if proc.stdin is not None:
            data = kwargs.get("input")
            if isinstance(data, str) and not isinstance(data, bytes):
                data = data.encode("utf-8")
            try:
                if data:
                    proc.stdin.write(data)
            except (IOError, OSError) as e:
                # Like communicate(), tolerate a child that exits early.
                if e.errno != errno.EPIPE:
                    raise
            self._close_stdin(proc)
        return True # so that the parent function returns

    def _wait(self):
        """
        Blocking call that waits for an open subprocess slot. This is
        automatically called by :meth:`Shell.call`.
        """
        # XXX: This API sucks; the actual call/callAsUser call should
        # probably block automatically (unless I have a good reason not to)
        # Returns immediately on initial ramp up. Otherwise keep reaping:
        # waitpid(-1) may return a child we did not start, which frees no slot.
        while len(self.running) >= self.max:
            try:
                pid, status = os.waitpid(-1, 0)
            except OSError as e:
                if e.errno == errno.ECHILD:
                    return
                raise
            self.reap(pid, status)

    def join(self):
        """Waits for all of our subprocesses to terminate."""
        while True:
            try:
                pid, status = os.waitpid(-1, 0)
            except OSError as e:
                # ECHILD: no children left to wait for.
                if e.errno == errno.ECHILD:
                    return
                raise
            self.reap(pid, status)

    def reap(self, pid, status):
        """
        Reaps a process: collects its output, logs it, and invokes its
        ``on_error`` (nonzero exit status or killed by a signal) or
        ``on_success`` callback. ``pid`` and ``status`` are as returned by
        :func:`os.waitpid`; pids this shell did not start are ignored.
        """
        # ooh, zombie process. reap it
        entry = self.running.pop(pid, None)
        if entry is None:
            # waitpid(-1, ...) can hand back a child that we did not start.
            return
        proc, args, python, on_success, on_error = entry
        # os.waitpid() collected the status behind Popen's back, so
        # proc.returncode is still None; record it using Popen's convention
        # (negative signal number when the child was killed by a signal).
        if os.WIFSIGNALED(status):
            proc.returncode = -os.WTERMSIG(status)
        elif os.WIFEXITED(status):
            proc.returncode = os.WEXITSTATUS(status)
        else:
            proc.returncode = status
        # XXX: this is slightly dangerous; should actually use
        # temporary files (a child that fills the pipe buffer blocks and
        # never exits, so we would never get here).
        stdout = self._drain(proc.stdout)
        stderr = self._drain(proc.stderr)
        self._close_stdin(proc)
        self._log(stdout, stderr)
        if proc.returncode:
            on_error(CallError(proc.returncode, args, stdout, stderr))
            return
        on_success(stdout, stderr)
---|
270 | |
---|
# Module-level convenience: one default Shell whose bound methods are
# re-exported as plain functions. Note that ``eval`` here shadows the
# builtin of the same name within this module's namespace.
shell = Shell()
call, callAsUser, safeCall, eval = (
    shell.call,
    shell.callAsUser,
    shell.safeCall,
    shell.eval,
)
---|
277 | |
---|
class DummyParallelShell(ParallelShell):
    """
    Same API as :class:`ParallelShell`, but with a single slot: each new
    call first waits for the previously started command to be reaped, so
    commands effectively run one at a time (the last one is collected by
    :meth:`ParallelShell.join`).
    """
    def __init__(self, dry = False):
        # A one-slot queue turns parallel execution into serial execution.
        super(DummyParallelShell, self).__init__(max=1, dry=dry)
---|
283 | |
---|
class CallError(Exception):
    """Indicates that a subprocess call returned a nonzero exit status."""
    #: The exit code of the failed subprocess.
    code = None
    #: List of the program and arguments that failed.
    args = None
    #: The stdout of the program.
    stdout = None
    #: The stderr of the program.
    stderr = None
    def __init__(self, code, args, stdout, stderr):
        # Passing every field to Exception keeps the exception picklable.
        super(CallError, self).__init__(code, args, stdout, stderr)
        self.code = code
        # The plain class attribute ``args`` above shadows BaseException.args,
        # so this stores the command line on the instance (as before) rather
        # than replacing the exception's constructor arguments.
        self.args = args
        self.stdout = stdout
        self.stderr = stderr
    def __str__(self):
        """Last non-blank stderr line and exit code, followed by the full stderr."""
        stderr = self.stderr
        if stderr is None:
            stderr = ""
        elif isinstance(stderr, bytes) and not isinstance(stderr, str):
            # Python 3 pipes may hand us bytes.
            stderr = stderr.decode("utf-8", "replace")
        compact = stderr.rstrip().split("\n")[-1]
        # %s rather than %d: the code may be None if it was never collected.
        return "%s (exited with %s)\n%s" % (compact, self.code, stderr)
---|