path: root/lib/parallel.py
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module for running cbuildbot stages in the background."""

import collections
import contextlib
import errno
import functools
import multiprocessing
import os
import Queue
import signal
import sys
import tempfile
import traceback

from chromite.buildbot import cbuildbot_results as results_lib

_PRINT_INTERVAL = 1  # Seconds between polls for step completion.
_BUFSIZE = 1024  # Chunk size for reading step output.


class BackgroundFailure(results_lib.StepFailure):
  pass


class _BackgroundSteps(multiprocessing.Process):
  """Run a list of functions in sequence in the background.

  These functions may be the 'Run' functions from buildbot stages or just plain
  functions. They will be run in the background. Output from these functions
  is saved to a temporary file and is printed when the 'WaitForStep' function
  is called.
  """

  def __init__(self, semaphore=None):
    """Create a new _BackgroundSteps object.

    If semaphore is supplied, it will be acquired for the duration of the
    steps that are run in the background. This can be used to limit the
    number of simultaneous parallel tasks.
    """
    multiprocessing.Process.__init__(self)
    self._steps = collections.deque()
    self._queue = multiprocessing.Queue()
    self._semaphore = semaphore
    self._started = multiprocessing.Event()

  def AddStep(self, step):
    """Add a step to the list of steps to run in the background."""
    output = tempfile.NamedTemporaryFile(delete=False, bufsize=0)
    self._steps.append((step, output))

  def Kill(self):
    """Kill a running task."""
    self._started.wait()
    # Kill the children nicely with a KeyboardInterrupt.
    try:
      os.kill(self.pid, signal.SIGINT)
    except OSError as ex:
      if ex.errno != errno.ESRCH:
        raise

  def WaitForStep(self):
    """Wait for the next step to complete.

    Output from the step is printed as the step runs.

    If an exception occurred in the step, return a string describing the
    failure: the message for an expected StepFailure, or a full traceback
    for anything else.
    """
    assert not self.Empty()
    _step, output = self._steps.popleft()

    # Flush stdout and stderr to be sure no output is interleaved.
    sys.stdout.flush()
    sys.stderr.flush()

    # File position pointers are shared across processes, so we must open
    # our own file descriptor to ensure output is not lost.
    output_name = output.name
    with open(output_name, 'r') as output:
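      # Unlink the temp file now; our open file object keeps its contents
      # readable until it is closed.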
      os.unlink(output_name)
      pos = 0
      more_output = True
      while more_output:
        # Check whether the process is finished.
        try:
          error, results = self._queue.get(True, _PRINT_INTERVAL)
          more_output = False
        except Queue.Empty:
          more_output = True

        # Print output so far.
        output.seek(pos)
        buf = output.read(_BUFSIZE)
        while len(buf) > 0:
          sys.stdout.write(buf)
          pos += len(buf)
          if len(buf) < _BUFSIZE:
            break
          buf = output.read(_BUFSIZE)
        sys.stdout.flush()

    # Propagate any results.
    for result in results:
      results_lib.Results.Record(*result)

    # If a traceback occurred, return it.
    return error

  def Empty(self):
    """Return True if there are any steps left to run."""
    return len(self._steps) == 0

  def start(self):
    """Invoke multiprocessing.Process.start after flushing output/err."""
    sys.stdout.flush()
    sys.stderr.flush()
    return multiprocessing.Process.start(self)

  def run(self):
    """Run the list of steps."""
    if self._semaphore is not None:
      self._semaphore.acquire()
    try:
      self._RunSteps()
    finally:
      if self._semaphore is not None:
        self._semaphore.release()

  def _RunSteps(self):
    """Internal method for running the list of steps."""

    # The default SIGINT handler sometimes fails to actually raise
    # KeyboardInterrupt (we can reproduce this in unit tests), so install a
    # custom handler that always does.
    def kill_us(_sig_num, _frame):
      raise KeyboardInterrupt('SIGINT received')
    signal.signal(signal.SIGINT, kill_us)

    sys.stdout.flush()
    sys.stderr.flush()
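    # Save the original stdout/stderr objects and duplicate their file
    # descriptors so both can be restored after each step.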
    orig_stdout, orig_stderr = sys.stdout, sys.stderr
    stdout_fileno = sys.__stdout__.fileno()
    stderr_fileno = sys.__stderr__.fileno()
    orig_stdout_fd, orig_stderr_fd = map(os.dup,
                                         [stdout_fileno, stderr_fileno])
    cancel = False
    while self._steps:
      step, output = self._steps.popleft()
      # Send all output to a named temporary file.
      os.dup2(output.fileno(), stdout_fileno)
      os.dup2(output.fileno(), stderr_fileno)
      # Replace std[out|err] with unbuffered file objects
      sys.stdout = os.fdopen(sys.__stdout__.fileno(), 'w', 0)
      sys.stderr = os.fdopen(sys.__stderr__.fileno(), 'w', 0)
      error = None
      try:
        results_lib.Results.Clear()
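        # Let Kill() know this process is running and can handle SIGINT.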
        self._started.set()
        if not cancel:
          step()
      except results_lib.StepFailure as ex:
        error = str(ex)
      except BaseException as ex:
        error = traceback.format_exc()
        # If it's a fatal exception, don't run any more steps.
        if isinstance(ex, (SystemExit, KeyboardInterrupt)):
          cancel = True

      sys.stdout.flush()
      sys.stderr.flush()
      output.close()
      sys.stdout, sys.stderr = orig_stdout, orig_stderr
      os.dup2(orig_stdout_fd, stdout_fileno)
      os.dup2(orig_stderr_fd, stderr_fileno)
      map(os.close, [orig_stdout_fd, orig_stderr_fd])
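      # Report this step's error (if any) and recorded results to the parent.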
      results = results_lib.Results.Get()
      self._queue.put((error, results))
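
# A minimal sketch of driving _BackgroundSteps by hand; callers normally use
# the helpers below instead. |SomeStage| is hypothetical:
#
#   bg = _BackgroundSteps()
#   bg.AddStep(SomeStage().Run)
#   bg.start()
#   error = bg.WaitForStep()  # Blocks, printing the step's output.
#   bg.join()
#   if error is not None:
#     raise BackgroundFailure(error)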


@contextlib.contextmanager
def _ParallelSteps(steps, max_parallel=None, halt_on_error=False):
  """Run a list of functions in parallel.

  This function launches the provided functions in the background, yields,
  and then waits for the functions to exit.

  The output from the functions is saved to a temporary file and printed as if
  they were run in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Args:
    steps: A list of functions to run.
    max_parallel: The maximum number of simultaneous tasks to run in parallel.
      By default, run all tasks in parallel.
    halt_on_error: After the first exception occurs, halt any running steps,
      and squelch any further output, including any exceptions that might occur.
  """

  semaphore = None
  if max_parallel is not None:
    semaphore = multiprocessing.Semaphore(max_parallel)

  # First, start all the steps.
  bg_steps = []
  for step in steps:
    bg = _BackgroundSteps(semaphore)
    bg.AddStep(step)
    bg.start()
    bg_steps.append(bg)

  try:
    yield
  finally:
    # Wait for each step to complete.
    tracebacks = []
    for bg in bg_steps:
      while not bg.Empty():
        if tracebacks and halt_on_error:
          bg.Kill()
          break
        else:
          error = bg.WaitForStep()
          if error is not None:
            tracebacks.append(error)
      bg.join()

    # Propagate any exceptions.
    if tracebacks:
      raise BackgroundFailure('\n' + ''.join(tracebacks))


def RunParallelSteps(steps, max_parallel=None, halt_on_error=False):
  """Run a list of functions in parallel.

  This function blocks until all steps are completed.

  The output from the functions is saved to a temporary file and printed as if
  they were run in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Args:
    steps: A list of functions to run.
    max_parallel: The maximum number of simultaneous tasks to run in parallel.
      By default, run all tasks in parallel.
    halt_on_error: After the first exception occurs, halt any running steps,
      and squelch any further output, including any exceptions that might occur.

  Example:
    # This snippet will execute in parallel:
    #   somefunc()
    #   anotherfunc()
    #   funcfunc()
    steps = [somefunc, anotherfunc, funcfunc]
    RunParallelSteps(steps)
    # Blocks until all calls have completed.
  """
  with _ParallelSteps(steps, max_parallel=max_parallel,
                      halt_on_error=halt_on_error):
    pass
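
# A minimal sketch combining the keyword arguments above; |BuildPackages|,
# |BuildImage|, and |RunUnitTests| are hypothetical step functions:
#
#   RunParallelSteps([BuildPackages, BuildImage, RunUnitTests],
#                    max_parallel=2, halt_on_error=True)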


class _AllTasksComplete(object):
  """Sentinel object to indicate that all tasks are complete."""


def _TaskRunner(queue, task, onexit=None):
  """Run task(*input) for each input in the queue.

  Returns when it encounters an _AllTasksComplete object on the queue.
  If exceptions occur, save them off and re-raise them as a
  BackgroundFailure once we've finished processing the items in the queue.

  Args:
    queue: A queue of inputs to process. Add argument lists to this queue,
      and task(*input) will be run for each one.
    task: Function to run on each queued input.
    onexit: Function to run after all inputs are processed.
  """
  tracebacks = []
  while True:
    # Wait for a new item to show up on the queue. This is a blocking wait,
    # so if there's nothing to do, we just sit here.
    x = queue.get()
    if isinstance(x, _AllTasksComplete):
      # All tasks are complete, so we should exit.
      break

    # If no task has failed yet, run this one. Otherwise, just drain the
    # queue without running anything further.
    if not tracebacks:
      try:
        task(*x)
      except BaseException:
        tracebacks.append(traceback.format_exc())

  # Run exit handlers.
  if onexit:
    onexit()

  # Propagate any exceptions.
  if tracebacks:
    raise BackgroundFailure('\n' + ''.join(tracebacks))


@contextlib.contextmanager
def BackgroundTaskRunner(task, queue=None, processes=None, onexit=None):
  """Run the specified task on each queued input in a pool of processes.

  This context manager starts a set of workers in the background, who each
  wait for input on the specified queue. These workers run task(*input) for
  each input on the queue.

  The output from these tasks is saved to a temporary file. When the 'with'
  block exits, the background output is printed in order, as if the tasks
  were run in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Example:
    # This will run somefunc('small', 'cow') in the background
    # while "more random stuff" is being executed.

    def somefunc(arg1, arg2):
      ...
    ...
    with BackgroundTaskRunner(somefunc) as queue:
      ... do random stuff ...
      queue.put(['small', 'cow'])
      ... do more random stuff ...
    # Exiting the with statement will block until all calls have completed.

  Args:
    task: Function to run on each queued input.
    queue: A queue of inputs to process. Add argument lists to this queue,
      and they will be passed to task in the background. If None, one will
      be created on the fly.
    processes: Number of processes to launch.
    onexit: Function to run in each background process after all inputs are
      processed.
  """

  if queue is None:
    queue = multiprocessing.Queue()

  if not processes:
    processes = multiprocessing.cpu_count()

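  # Launch |processes| identical workers; each drains argument lists from the
  # shared queue until it sees an _AllTasksComplete sentinel.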
  steps = [functools.partial(_TaskRunner, queue, task, onexit)] * processes
  with _ParallelSteps(steps):
    try:
      yield queue
    finally:
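      # Wake every worker with a sentinel so each one exits its loop.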
      for _ in xrange(processes):
        queue.put(_AllTasksComplete())
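
# A sketch of supplying an explicit queue so inputs can be enqueued from
# anywhere that holds a reference to it; |ProcessLogFile| and |log_files| are
# hypothetical:
#
#   q = multiprocessing.Queue()
#   with BackgroundTaskRunner(ProcessLogFile, queue=q, processes=4):
#     for path in log_files:
#       q.put([path])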


def RunTasksInProcessPool(task, inputs, processes=None, onexit=None):
  """Run the specified function with each supplied input in a pool of processes.

  This function runs task(*x) for x in inputs in a pool of processes. This
  function blocks until all tasks are completed.

  The output from these tasks is saved to a temporary file and printed in
  order, as if the tasks were run in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Example:
    # This snippet will execute in parallel:
    #   somefunc('hi', 'fat', 'code')
    #   somefunc('foo', 'bar', 'cow')

    def somefunc(arg1, arg2, arg3):
      ...
    ...
    inputs = [
      ['hi', 'fat', 'code'],
      ['foo', 'bar', 'cow'],
    ]
    RunTasksInProcessPool(somefunc, inputs)
    # Blocks until all calls have completed.

  Args:
    task: Function to run on each input.
    inputs: List of inputs.
    processes: Maximum number of processes to launch.
    onexit: Function to run in each background process after all inputs are
      processed.
  """

  if not processes:
    processes = min(multiprocessing.cpu_count(), len(inputs))

  with BackgroundTaskRunner(task, processes=processes, onexit=onexit) as queue:
    for x in inputs:
      queue.put(x)