diff options
author | David James <davidjames@chromium.org> | 2013-11-01 11:45:36 -0700 |
---|---|---|
committer | chrome-internal-fetch <chrome-internal-fetch@google.com> | 2013-11-04 18:00:42 +0000 |
commit | c14bbe19010475b2bd24d405b4894a5274371606 (patch) | |
tree | 8b8604221aa9c7408466ac85366122e63f158718 /lib/parallel.py | |
parent | 3e65992b26596dd8a02f3609458d113e8415d3f4 (diff) | |
download | chromite-c14bbe19010475b2bd24d405b4894a5274371606.tar.gz |
Mark SIGKILL exits as potentially flaky, and propagate them up.
Currently, we don't propagate errors up when a
SIGKILL exit was flaky. Fix this by marking such exits
as potentially flaky and propagating them up.
BUG=chromium:308196
TEST=New unit test.
Change-Id: I49b2f1d0f0e0e0dc9641001f7dfa2bcc079389ac
Reviewed-on: https://chromium-review.googlesource.com/175515
Reviewed-by: Mike Frysinger <vapier@chromium.org>
Commit-Queue: David James <davidjames@chromium.org>
Tested-by: David James <davidjames@chromium.org>
Diffstat (limited to 'lib/parallel.py')
-rw-r--r-- | lib/parallel.py | 32 |
1 file changed, 24 insertions, 8 deletions
diff --git a/lib/parallel.py b/lib/parallel.py index ed13df4fd..a7056427c 100644 --- a/lib/parallel.py +++ b/lib/parallel.py @@ -155,12 +155,14 @@ class _BackgroundTask(multiprocessing.Process): with open(self._output.name, 'r') as output: pos = 0 running, exited_cleanly, msg, error = (True, False, None, None) + possibly_flaky = False while running: # Check whether the process is still alive. running = self.is_alive() try: - error, results = self._queue.get(True, self.PRINT_INTERVAL) + error, results, possibly_flaky = \ + self._queue.get(True, self.PRINT_INTERVAL) running = False exited_cleanly = True except Queue.Empty: @@ -174,8 +176,12 @@ class _BackgroundTask(multiprocessing.Process): msg = '%r hung for %r seconds' % (self, self.EXIT_TIMEOUT) self._KillChildren([self]) elif not exited_cleanly: + # Treat SIGKILL signals as potentially flaky. + if self.exitcode == -signal.SIGKILL: + possibly_flaky = True msg = ('%r exited unexpectedly with code %s' % - (self, self.EXIT_TIMEOUT)) + (self, self.exitcode)) + # Read output from process. output.seek(pos) buf = output.read(_BUFSIZE) @@ -187,6 +193,9 @@ class _BackgroundTask(multiprocessing.Process): (self, self.SILENT_TIMEOUT)) self._KillChildren([self]) + # Timeouts are possibly flaky. + possibly_flaky = True + # Read remaining output from the process. output.seek(pos) buf = output.read(_BUFSIZE) @@ -218,7 +227,7 @@ class _BackgroundTask(multiprocessing.Process): self.Cleanup(silent=True) # If a traceback occurred, return it. 
- return error + return error, possibly_flaky def start(self): """Invoke multiprocessing.Process.start after flushing output/err.""" @@ -238,13 +247,14 @@ class _BackgroundTask(multiprocessing.Process): self._semaphore.acquire() error = 'Unexpected exception in %r' % self + possibly_flaky = False pid = os.getpid() try: - error = self._Run() + error, possibly_flaky = self._Run() finally: if not self._killing.is_set() and os.getpid() == pid: results = results_lib.Results.Get() - self._queue.put((error, results)) + self._queue.put((error, results, possibly_flaky)) if self._semaphore is not None: self._semaphore.release() @@ -260,6 +270,7 @@ class _BackgroundTask(multiprocessing.Process): sys.stdout.flush() sys.stderr.flush() + possibly_flaky = False # Send all output to a named temporary file. with open(self._output.name, 'w', 0) as output: # Back up sys.std{err,out}. These aren't used, but we keep a copy so @@ -287,6 +298,7 @@ class _BackgroundTask(multiprocessing.Process): self._task(*self._task_args, **self._task_kwargs) except results_lib.StepFailure as ex: error = str(ex) + possibly_flaky = ex.possibly_flaky except BaseException as ex: error = traceback.format_exc() if self._killing.is_set(): @@ -295,7 +307,7 @@ class _BackgroundTask(multiprocessing.Process): sys.stdout.flush() sys.stderr.flush() - return error + return error, possibly_flaky @classmethod def _KillChildren(cls, bg_tasks, log_level=logging.WARNING): @@ -361,6 +373,7 @@ class _BackgroundTask(multiprocessing.Process): """ semaphore = None + possibly_flaky = False if max_parallel is not None: semaphore = multiprocessing.Semaphore(max_parallel) @@ -376,10 +389,12 @@ class _BackgroundTask(multiprocessing.Process): finally: # Wait for each step to complete. 
tracebacks = [] + flaky_tasks = [] while bg_tasks: task = bg_tasks.popleft() - error = task.Wait() + error, possibly_flaky = task.Wait() if error is not None: + flaky_tasks.append(possibly_flaky) tracebacks.append(error) if halt_on_error: break @@ -390,7 +405,8 @@ class _BackgroundTask(multiprocessing.Process): # Propagate any exceptions. if tracebacks: - raise BackgroundFailure('\n' + ''.join(tracebacks)) + possibly_flaky = flaky_tasks and all(flaky_tasks) + raise BackgroundFailure('\n' + ''.join(tracebacks), possibly_flaky) @staticmethod def TaskRunner(queue, task, onexit=None, task_args=None, task_kwargs=None): |