summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorPrathmesh Prabhu <pprabhu@chromium.org>2015-03-23 14:51:28 -0700
committerChromeOS Commit Bot <chromeos-commit-bot@chromium.org>2015-03-28 00:58:10 +0000
commitfdd904c9438894ae458760dcb06b3acd8c71ada9 (patch)
tree5732465bf2d4f803c45a89e3f40acc249dd574f9 /lib
parent33ab6f763a2bc5b120f5a4d87f56d54054cfadc0 (diff)
downloadchromite-fdd904c9438894ae458760dcb06b3acd8c71ada9.tar.gz
parallel: Halt tasks on foreground error with halt_on_error set.
Before this CL, halt_on_error would halt remaining background tasks whenever one of the background tasks failed. This CL extends the behaviour to halt on exceptions raised from the foreground process as well. It exposes the new behaviour to clients through BackgroundTaskRunner context function. While there, fix timeout in parallel_unittest so that they work correctly even if softer signals fail to kill background tasks. BUG=chromium:396723 TEST=unittests, including a new test that fails without the halt_on_error flag. Change-Id: Ib5105c6658d78e5f2a5107500de72c3716b04e02 Reviewed-on: https://chromium-review.googlesource.com/261969 Reviewed-by: David James <davidjames@chromium.org> Commit-Queue: Prathmesh Prabhu <pprabhu@chromium.org> Tested-by: Prathmesh Prabhu <pprabhu@chromium.org>
Diffstat (limited to 'lib')
-rw-r--r--lib/parallel.py12
-rw-r--r--lib/parallel_unittest.py30
2 files changed, 38 insertions, 4 deletions
diff --git a/lib/parallel.py b/lib/parallel.py
index dd263c1f8..4db9061ad 100644
--- a/lib/parallel.py
+++ b/lib/parallel.py
@@ -537,9 +537,10 @@ class _BackgroundTask(multiprocessing.Process):
except BaseException:
foreground_except = sys.exc_info()
finally:
- # Wait for each step to complete.
errors = []
- while bg_tasks:
+ skip_bg_wait = halt_on_error and foreground_except is not None
+ # Wait for each step to complete.
+ while not skip_bg_wait and bg_tasks:
task = bg_tasks.popleft()
task_errors = task.Wait()
if task_errors:
@@ -717,11 +718,16 @@ def BackgroundTaskRunner(task, *args, **kwargs):
processes: Number of processes to launch.
onexit: Function to run in each background process after all inputs are
processed.
+ halt_on_error: After the first exception occurs, halt any running steps, and
+ squelch any further output, including any exceptions that might occur.
+ Halts on exceptions in any of the background processes, or in the
+ foreground process using the BackgroundTaskRunner.
"""
queue = kwargs.pop('queue', None)
processes = kwargs.pop('processes', None)
onexit = kwargs.pop('onexit', None)
+ halt_on_error = kwargs.pop('halt_on_error', False)
with cros_build_lib.ContextManagerStack() as stack:
if queue is None:
@@ -735,7 +741,7 @@ def BackgroundTaskRunner(task, *args, **kwargs):
onexit=onexit, task_args=args,
task_kwargs=kwargs)
steps = [child] * processes
- with _BackgroundTask.ParallelTasks(steps):
+ with _BackgroundTask.ParallelTasks(steps, halt_on_error=halt_on_error):
try:
yield queue
finally:
diff --git a/lib/parallel_unittest.py b/lib/parallel_unittest.py
index 2c9997b01..9227dbfda 100644
--- a/lib/parallel_unittest.py
+++ b/lib/parallel_unittest.py
@@ -426,6 +426,10 @@ class TestExceptions(cros_test_lib.MockOutputTestCase):
parallel.RunParallelSteps([self._BadPickler], return_values=True)
+class _TestForegroundException(Exception):
+ """An exception to be raised by the foreground process."""
+
+
class TestHalting(cros_test_lib.MockOutputTestCase, TestBackgroundWrapper):
"""Test that child processes are halted when exceptions occur."""
@@ -433,6 +437,16 @@ class TestHalting(cros_test_lib.MockOutputTestCase, TestBackgroundWrapper):
self.failed = multiprocessing.Event()
self.passed = multiprocessing.Event()
+ def _GetKillChildrenTimeout(self):
+ """Return a timeout that is long enough for _BackgroundTask._KillChildren.
+
+ This unittest is not meant to restrict which signal succeeds in killing the
+ background process, so use a long enough timeout whenever asserting that the
+ background process is killed, keeping buffer for slow builders.
+ """
+ return (parallel._BackgroundTask.SIGTERM_TIMEOUT +
+ parallel._BackgroundTask.SIGKILL_TIMEOUT) + 30
+
def _Pass(self):
self.passed.set()
sys.stdout.write(_GREETING)
@@ -443,9 +457,13 @@ class TestHalting(cros_test_lib.MockOutputTestCase, TestBackgroundWrapper):
sys.exit(1)
def _Fail(self):
- self.failed.wait(60)
+ self.failed.wait(self._GetKillChildrenTimeout())
self.failed.set()
+ def _PassEventually(self):
+ self.passed.wait(self._GetKillChildrenTimeout())
+ self.passed.set()
+
@unittest.skipIf(_SKIP_FLAKY_TESTS, 'Occasionally fails.')
def testExceptionRaising(self):
"""Test that exceptions halt all running steps."""
@@ -463,6 +481,16 @@ class TestHalting(cros_test_lib.MockOutputTestCase, TestBackgroundWrapper):
self.assertEqual(output_str, _GREETING)
self.assertFalse(self.failed.is_set())
+ def testForegroundExceptionRaising(self):
+ """Test that BackgroundTaskRunner halts tasks on a foreground exception."""
+ with self.assertRaises(_TestForegroundException):
+ with parallel.BackgroundTaskRunner(self._PassEventually,
+ processes=1,
+ halt_on_error=True) as queue:
+ queue.put([])
+ raise _TestForegroundException()
+ self.assertFalse(self.passed.is_set())
+
@unittest.skipIf(_SKIP_FLAKY_TESTS, 'Occasionally fails.')
def testTempFileCleanup(self):
"""Test that all temp files are cleaned up."""