summaryrefslogtreecommitdiff
path: root/lib/parallel.py
diff options
context:
space:
mode:
authorDavid James <davidjames@chromium.org>2013-11-01 11:45:36 -0700
committerchrome-internal-fetch <chrome-internal-fetch@google.com>2013-11-04 18:00:42 +0000
commitc14bbe19010475b2bd24d405b4894a5274371606 (patch)
tree8b8604221aa9c7408466ac85366122e63f158718 /lib/parallel.py
parent3e65992b26596dd8a02f3609458d113e8415d3f4 (diff)
downloadchromite-c14bbe19010475b2bd24d405b4894a5274371606.tar.gz
Mark SIGKILL exits as potentially flaky, and propagate them up.
Currently, we don't propagate up errors in the case where a SIGKILL exit was flaky. Do this. BUG=chromium:308196 TEST=New unit test. Change-Id: I49b2f1d0f0e0e0dc9641001f7dfa2bcc079389ac Reviewed-on: https://chromium-review.googlesource.com/175515 Reviewed-by: Mike Frysinger <vapier@chromium.org> Commit-Queue: David James <davidjames@chromium.org> Tested-by: David James <davidjames@chromium.org>
Diffstat (limited to 'lib/parallel.py')
-rw-r--r--lib/parallel.py32
1 file changed, 24 insertions, 8 deletions
diff --git a/lib/parallel.py b/lib/parallel.py
index ed13df4fd..a7056427c 100644
--- a/lib/parallel.py
+++ b/lib/parallel.py
@@ -155,12 +155,14 @@ class _BackgroundTask(multiprocessing.Process):
with open(self._output.name, 'r') as output:
pos = 0
running, exited_cleanly, msg, error = (True, False, None, None)
+ possibly_flaky = False
while running:
# Check whether the process is still alive.
running = self.is_alive()
try:
- error, results = self._queue.get(True, self.PRINT_INTERVAL)
+ error, results, possibly_flaky = \
+ self._queue.get(True, self.PRINT_INTERVAL)
running = False
exited_cleanly = True
except Queue.Empty:
@@ -174,8 +176,12 @@ class _BackgroundTask(multiprocessing.Process):
msg = '%r hung for %r seconds' % (self, self.EXIT_TIMEOUT)
self._KillChildren([self])
elif not exited_cleanly:
+ # Treat SIGKILL signals as potentially flaky.
+ if self.exitcode == -signal.SIGKILL:
+ possibly_flaky = True
msg = ('%r exited unexpectedly with code %s' %
- (self, self.EXIT_TIMEOUT))
+ (self, self.exitcode))
+
# Read output from process.
output.seek(pos)
buf = output.read(_BUFSIZE)
@@ -187,6 +193,9 @@ class _BackgroundTask(multiprocessing.Process):
(self, self.SILENT_TIMEOUT))
self._KillChildren([self])
+ # Timeouts are possibly flaky.
+ possibly_flaky = True
+
# Read remaining output from the process.
output.seek(pos)
buf = output.read(_BUFSIZE)
@@ -218,7 +227,7 @@ class _BackgroundTask(multiprocessing.Process):
self.Cleanup(silent=True)
# If a traceback occurred, return it.
- return error
+ return error, possibly_flaky
def start(self):
"""Invoke multiprocessing.Process.start after flushing output/err."""
@@ -238,13 +247,14 @@ class _BackgroundTask(multiprocessing.Process):
self._semaphore.acquire()
error = 'Unexpected exception in %r' % self
+ possibly_flaky = False
pid = os.getpid()
try:
- error = self._Run()
+ error, possibly_flaky = self._Run()
finally:
if not self._killing.is_set() and os.getpid() == pid:
results = results_lib.Results.Get()
- self._queue.put((error, results))
+ self._queue.put((error, results, possibly_flaky))
if self._semaphore is not None:
self._semaphore.release()
@@ -260,6 +270,7 @@ class _BackgroundTask(multiprocessing.Process):
sys.stdout.flush()
sys.stderr.flush()
+ possibly_flaky = False
# Send all output to a named temporary file.
with open(self._output.name, 'w', 0) as output:
# Back up sys.std{err,out}. These aren't used, but we keep a copy so
@@ -287,6 +298,7 @@ class _BackgroundTask(multiprocessing.Process):
self._task(*self._task_args, **self._task_kwargs)
except results_lib.StepFailure as ex:
error = str(ex)
+ possibly_flaky = ex.possibly_flaky
except BaseException as ex:
error = traceback.format_exc()
if self._killing.is_set():
@@ -295,7 +307,7 @@ class _BackgroundTask(multiprocessing.Process):
sys.stdout.flush()
sys.stderr.flush()
- return error
+ return error, possibly_flaky
@classmethod
def _KillChildren(cls, bg_tasks, log_level=logging.WARNING):
@@ -361,6 +373,7 @@ class _BackgroundTask(multiprocessing.Process):
"""
semaphore = None
+ possibly_flaky = False
if max_parallel is not None:
semaphore = multiprocessing.Semaphore(max_parallel)
@@ -376,10 +389,12 @@ class _BackgroundTask(multiprocessing.Process):
finally:
# Wait for each step to complete.
tracebacks = []
+ flaky_tasks = []
while bg_tasks:
task = bg_tasks.popleft()
- error = task.Wait()
+ error, possibly_flaky = task.Wait()
if error is not None:
+ flaky_tasks.append(possibly_flaky)
tracebacks.append(error)
if halt_on_error:
break
@@ -390,7 +405,8 @@ class _BackgroundTask(multiprocessing.Process):
# Propagate any exceptions.
if tracebacks:
- raise BackgroundFailure('\n' + ''.join(tracebacks))
+ possibly_flaky = flaky_tasks and all(flaky_tasks)
+ raise BackgroundFailure('\n' + ''.join(tracebacks), possibly_flaky)
@staticmethod
def TaskRunner(queue, task, onexit=None, task_args=None, task_kwargs=None):