From c14bbe19010475b2bd24d405b4894a5274371606 Mon Sep 17 00:00:00 2001
From: David James
Date: Fri, 1 Nov 2013 11:45:36 -0700
Subject: Mark SIGKILL exits as potentially flaky, and propagate them up.

Currently, we don't propagate up errors in the case where a SIGKILL
exit was flaky. Do this.

BUG=chromium:308196
TEST=New unit test.

Change-Id: I49b2f1d0f0e0e0dc9641001f7dfa2bcc079389ac
Reviewed-on: https://chromium-review.googlesource.com/175515
Reviewed-by: Mike Frysinger
Commit-Queue: David James
Tested-by: David James
---
 lib/parallel.py | 32 ++++++++++++++++++++++++--------
 1 file changed, 24 insertions(+), 8 deletions(-)

(limited to 'lib/parallel.py')

diff --git a/lib/parallel.py b/lib/parallel.py
index ed13df4fd..a7056427c 100644
--- a/lib/parallel.py
+++ b/lib/parallel.py
@@ -155,12 +155,14 @@ class _BackgroundTask(multiprocessing.Process):
       with open(self._output.name, 'r') as output:
         pos = 0
         running, exited_cleanly, msg, error = (True, False, None, None)
+        possibly_flaky = False
         while running:
           # Check whether the process is still alive.
           running = self.is_alive()
 
           try:
-            error, results = self._queue.get(True, self.PRINT_INTERVAL)
+            error, results, possibly_flaky = \
+                self._queue.get(True, self.PRINT_INTERVAL)
             running = False
             exited_cleanly = True
           except Queue.Empty:
@@ -174,8 +176,12 @@ class _BackgroundTask(multiprocessing.Process):
               msg = '%r hung for %r seconds' % (self, self.EXIT_TIMEOUT)
               self._KillChildren([self])
             elif not exited_cleanly:
+              # Treat SIGKILL signals as potentially flaky.
+              if self.exitcode == -signal.SIGKILL:
+                possibly_flaky = True
               msg = ('%r exited unexpectedly with code %s' %
-                     (self, self.EXIT_TIMEOUT))
+                     (self, self.exitcode))
+
           # Read output from process.
           output.seek(pos)
           buf = output.read(_BUFSIZE)
@@ -187,6 +193,9 @@ class _BackgroundTask(multiprocessing.Process):
                    (self, self.SILENT_TIMEOUT))
             self._KillChildren([self])
 
+            # Timeouts are possibly flaky.
+            possibly_flaky = True
+
           # Read remaining output from the process.
           output.seek(pos)
           buf = output.read(_BUFSIZE)
@@ -218,7 +227,7 @@ class _BackgroundTask(multiprocessing.Process):
         self.Cleanup(silent=True)
 
     # If a traceback occurred, return it.
-    return error
+    return error, possibly_flaky
 
   def start(self):
     """Invoke multiprocessing.Process.start after flushing output/err."""
@@ -238,13 +247,14 @@ class _BackgroundTask(multiprocessing.Process):
       self._semaphore.acquire()
 
     error = 'Unexpected exception in %r' % self
+    possibly_flaky = False
     pid = os.getpid()
     try:
-      error = self._Run()
+      error, possibly_flaky = self._Run()
    finally:
       if not self._killing.is_set() and os.getpid() == pid:
         results = results_lib.Results.Get()
-        self._queue.put((error, results))
+        self._queue.put((error, results, possibly_flaky))
 
       if self._semaphore is not None:
         self._semaphore.release()
@@ -260,6 +270,7 @@ class _BackgroundTask(multiprocessing.Process):
     sys.stdout.flush()
     sys.stderr.flush()
 
+    possibly_flaky = False
     # Send all output to a named temporary file.
     with open(self._output.name, 'w', 0) as output:
       # Back up sys.std{err,out}. These aren't used, but we keep a copy so
@@ -287,6 +298,7 @@ class _BackgroundTask(multiprocessing.Process):
         self._task(*self._task_args, **self._task_kwargs)
       except results_lib.StepFailure as ex:
         error = str(ex)
+        possibly_flaky = ex.possibly_flaky
       except BaseException as ex:
         error = traceback.format_exc()
         if self._killing.is_set():
@@ -295,7 +307,7 @@ class _BackgroundTask(multiprocessing.Process):
         sys.stdout.flush()
         sys.stderr.flush()
 
-    return error
+    return error, possibly_flaky
 
   @classmethod
   def _KillChildren(cls, bg_tasks, log_level=logging.WARNING):
@@ -361,6 +373,7 @@ class _BackgroundTask(multiprocessing.Process):
     """
 
     semaphore = None
+    possibly_flaky = False
     if max_parallel is not None:
       semaphore = multiprocessing.Semaphore(max_parallel)
 
@@ -376,10 +389,12 @@ class _BackgroundTask(multiprocessing.Process):
     finally:
       # Wait for each step to complete.
       tracebacks = []
+      flaky_tasks = []
       while bg_tasks:
         task = bg_tasks.popleft()
-        error = task.Wait()
+        error, possibly_flaky = task.Wait()
         if error is not None:
+          flaky_tasks.append(possibly_flaky)
           tracebacks.append(error)
           if halt_on_error:
             break
@@ -390,7 +405,8 @@ class _BackgroundTask(multiprocessing.Process):
 
     # Propagate any exceptions.
     if tracebacks:
-      raise BackgroundFailure('\n' + ''.join(tracebacks))
+      possibly_flaky = flaky_tasks and all(flaky_tasks)
+      raise BackgroundFailure('\n' + ''.join(tracebacks), possibly_flaky)
 
   @staticmethod
   def TaskRunner(queue, task, onexit=None, task_args=None, task_kwargs=None):
--
cgit v1.2.3
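
For context, here is a minimal, self-contained sketch of the same bookkeeping outside of chromite. It is not the chromite code; the names FlakyAwareFailure and run_tasks are hypothetical. It shows the pattern the patch implements: a child killed by SIGKILL is marked potentially flaky, and the aggregate failure is only flagged flaky when every failed task was flaky.

  # Illustrative sketch only; names below are hypothetical, not chromite APIs.
  import multiprocessing
  import signal


  class FlakyAwareFailure(Exception):
    """Aggregate failure that records whether it might be flaky."""

    def __init__(self, message, possibly_flaky=False):
      super(FlakyAwareFailure, self).__init__(message)
      self.possibly_flaky = possibly_flaky


  def run_tasks(tasks):
    """Run each task in its own process and propagate flakiness upward."""
    procs = [multiprocessing.Process(target=task) for task in tasks]
    for proc in procs:
      proc.start()

    tracebacks = []
    flaky_tasks = []
    for proc in procs:
      proc.join()
      if proc.exitcode == 0:
        continue
      # A SIGKILL exit (e.g. from the OOM killer) is treated as potentially
      # flaky, mirroring the `self.exitcode == -signal.SIGKILL` check above.
      flaky_tasks.append(proc.exitcode == -signal.SIGKILL)
      tracebacks.append('%r exited unexpectedly with code %s'
                        % (proc, proc.exitcode))

    if tracebacks:
      # The combined failure is only possibly flaky if *every* failed task
      # was, matching `possibly_flaky = flaky_tasks and all(flaky_tasks)`.
      raise FlakyAwareFailure('\n'.join(tracebacks),
                              flaky_tasks and all(flaky_tasks))

A retry wrapper could then catch FlakyAwareFailure and retry only when possibly_flaky is set, which is the behaviour this patch enables for BackgroundFailure.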