diff options
author | David James <davidjames@google.com> | 2014-12-09 13:44:40 -0800 |
---|---|---|
committer | chrome-internal-fetch <chrome-internal-fetch@google.com> | 2014-12-10 23:47:49 +0000 |
commit | f6cbe2f6480b63bb07f60b19255cc4cf68557260 (patch) | |
tree | 638003cf68157ce56ebb4984f7ca7a10a9849481 | |
parent | fbb3b5286f5b76cd001cd3c0ca47b9108f02bee7 (diff) | |
download | chromite-f6cbe2f6480b63bb07f60b19255cc4cf68557260.tar.gz |
parallel.RunTasksInProcessPool: Support returning values.
BUG=chromium:440495
TEST=New unit test.
TEST=Test with new feature in followup CL.
Change-Id: Ib9e4bbd2726a02bd7bcf405be3521726ad26f838
Reviewed-on: https://chromium-review.googlesource.com/234240
Reviewed-by: Don Garrett <dgarrett@chromium.org>
Trybot-Ready: David James <davidjames@chromium.org>
Commit-Queue: David James <davidjames@chromium.org>
Reviewed-by: Mike Frysinger <vapier@chromium.org>
Tested-by: David James <davidjames@chromium.org>
-rwxr-xr-x | lib/cgroups_unittest.py | 3 | ||||
-rw-r--r-- | lib/parallel.py | 22 | ||||
-rwxr-xr-x | lib/parallel_unittest.py | 22 |
3 files changed, 33 insertions, 14 deletions
diff --git a/lib/cgroups_unittest.py b/lib/cgroups_unittest.py index 79cbf804e..3957faf24 100755 --- a/lib/cgroups_unittest.py +++ b/lib/cgroups_unittest.py @@ -30,7 +30,8 @@ class TestCreateGroups(cros_test_lib.TestCase): """Run many cros_sdk processes in parallel to test for race conditions.""" with sudo.SudoKeepAlive(): with cgroups.SimpleContainChildren('example', sigterm_timeout=5): - parallel.RunTasksInProcessPool(self._CrosSdk, [[]] * 20, 10) + parallel.RunTasksInProcessPool(self._CrosSdk, [[]] * 20, + processes=10) if __name__ == '__main__': diff --git a/lib/parallel.py b/lib/parallel.py index 1bee91af3..a669523a6 100644 --- a/lib/parallel.py +++ b/lib/parallel.py @@ -765,13 +765,27 @@ def RunTasksInProcessPool(task, inputs, processes=None, onexit=None): processes: Number of processes, at most, to launch. onexit: Function to run in each background process after all inputs are processed. - """ + Returns: + Returns a list containing the return values of the task for each input. + """ if not processes: # - Use >=16 processes by default, in case it's a network-bound operation. # - Try to use all of the CPUs, in case it's a CPU-bound operation. processes = min(max(16, multiprocessing.cpu_count()), len(inputs)) - with BackgroundTaskRunner(task, processes=processes, onexit=onexit) as queue: - for x in inputs: - queue.put(x) + with Manager() as manager: + # Set up output queue. + out_queue = manager.Queue() + fn = lambda idx, task_args: out_queue.put((idx, task(*task_args))) + + # Micro-optimization: Setup the queue so that BackgroundTaskRunner + # doesn't have to set up another Manager process. + queue = manager.Queue() + + with BackgroundTaskRunner(fn, queue=queue, processes=processes, + onexit=onexit) as queue: + for idx, input_args in enumerate(inputs): + queue.put((idx, input_args)) + + return [x[1] for x in sorted(out_queue.get() for _ in range(len(inputs)))] diff --git a/lib/parallel_unittest.py b/lib/parallel_unittest.py index 89ee60d6e..d163f7a0c 100755 --- a/lib/parallel_unittest.py +++ b/lib/parallel_unittest.py @@ -337,6 +337,7 @@ class TestParallelMock(TestBackgroundWrapper): def _Callback(self): self._calls += 1 + return self._calls def testRunParallelSteps(self): """Make sure RunParallelSteps is mocked out.""" @@ -349,11 +350,15 @@ class TestParallelMock(TestBackgroundWrapper): with ParallelMock(): parallel.RunTasksInProcessPool(self._Callback, []) self.assertEqual(0, self._calls) - parallel.RunTasksInProcessPool(self._Callback, [[]]) + result = parallel.RunTasksInProcessPool(self._Callback, [[]]) self.assertEqual(1, self._calls) - parallel.RunTasksInProcessPool(self._Callback, [], processes=9, - onexit=self._Callback) + self.assertEqual([1], result) + result = parallel.RunTasksInProcessPool(self._Callback, [], processes=9, + onexit=self._Callback) self.assertEqual(10, self._calls) + self.assertEqual([], result) + result = parallel.RunTasksInProcessPool(self._Callback, [[]] * 10) + self.assertEqual(range(11, 21), result) class TestExceptions(cros_test_lib.MockOutputTestCase): @@ -377,13 +382,12 @@ class TestExceptions(cros_test_lib.MockOutputTestCase): lambda: parallel.RunParallelSteps([fn])): output_str = ex_str = ex = None with self.OutputCapturer() as capture: - try: + with self.assertRaises(parallel.BackgroundFailure) as ex: task() - except parallel.BackgroundFailure as ex: - output_str = capture.GetStdout() - ex_str = str(ex) + output_str = capture.GetStdout() + ex_str = str(ex.exception) - self.assertTrue(exc_type in [x.type for x in ex.exc_infos]) + self.assertTrue(exc_type in [x.type for x in ex.exception.exc_infos]) self.assertEqual(output_str, _GREETING) self.assertTrue(str(exc_type) in ex_str) @@ -396,7 +400,7 @@ class TestExceptions(cros_test_lib.MockOutputTestCase): def testFailedPickle(self): """PicklingError should be thrown when an argument fails to pickle.""" with self.assertRaises(cPickle.PicklingError): - parallel.RunTasksInProcessPool(self._SystemExit, [self._SystemExit]) + parallel.RunTasksInProcessPool(self._SystemExit, [[self._SystemExit]]) def testFailedPickleOnReturn(self): """PicklingError should be thrown when a return value fails to pickle.""" |