summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid James <davidjames@google.com>2014-12-09 13:44:40 -0800
committerchrome-internal-fetch <chrome-internal-fetch@google.com>2014-12-10 23:47:49 +0000
commitf6cbe2f6480b63bb07f60b19255cc4cf68557260 (patch)
tree638003cf68157ce56ebb4984f7ca7a10a9849481
parentfbb3b5286f5b76cd001cd3c0ca47b9108f02bee7 (diff)
downloadchromite-f6cbe2f6480b63bb07f60b19255cc4cf68557260.tar.gz
parallel.RunTasksInProcessPool: Support returning values.
BUG=chromium:440495 TEST=New unit test. TEST=Test with new feature in followup CL. Change-Id: Ib9e4bbd2726a02bd7bcf405be3521726ad26f838 Reviewed-on: https://chromium-review.googlesource.com/234240 Reviewed-by: Don Garrett <dgarrett@chromium.org> Trybot-Ready: David James <davidjames@chromium.org> Commit-Queue: David James <davidjames@chromium.org> Reviewed-by: Mike Frysinger <vapier@chromium.org> Tested-by: David James <davidjames@chromium.org>
-rwxr-xr-xlib/cgroups_unittest.py3
-rw-r--r--lib/parallel.py22
-rwxr-xr-xlib/parallel_unittest.py22
3 files changed, 33 insertions, 14 deletions
diff --git a/lib/cgroups_unittest.py b/lib/cgroups_unittest.py
index 79cbf804e..3957faf24 100755
--- a/lib/cgroups_unittest.py
+++ b/lib/cgroups_unittest.py
@@ -30,7 +30,8 @@ class TestCreateGroups(cros_test_lib.TestCase):
"""Run many cros_sdk processes in parallel to test for race conditions."""
with sudo.SudoKeepAlive():
with cgroups.SimpleContainChildren('example', sigterm_timeout=5):
- parallel.RunTasksInProcessPool(self._CrosSdk, [[]] * 20, 10)
+ parallel.RunTasksInProcessPool(self._CrosSdk, [[]] * 20,
+ processes=10)
if __name__ == '__main__':
diff --git a/lib/parallel.py b/lib/parallel.py
index 1bee91af3..a669523a6 100644
--- a/lib/parallel.py
+++ b/lib/parallel.py
@@ -765,13 +765,27 @@ def RunTasksInProcessPool(task, inputs, processes=None, onexit=None):
processes: Number of processes, at most, to launch.
onexit: Function to run in each background process after all inputs are
processed.
- """
+ Returns:
+ Returns a list containing the return values of the task for each input.
+ """
if not processes:
# - Use >=16 processes by default, in case it's a network-bound operation.
# - Try to use all of the CPUs, in case it's a CPU-bound operation.
processes = min(max(16, multiprocessing.cpu_count()), len(inputs))
- with BackgroundTaskRunner(task, processes=processes, onexit=onexit) as queue:
- for x in inputs:
- queue.put(x)
+ with Manager() as manager:
+ # Set up output queue.
+ out_queue = manager.Queue()
+ fn = lambda idx, task_args: out_queue.put((idx, task(*task_args)))
+
+ # Micro-optimization: Setup the queue so that BackgroundTaskRunner
+ # doesn't have to set up another Manager process.
+ queue = manager.Queue()
+
+ with BackgroundTaskRunner(fn, queue=queue, processes=processes,
+ onexit=onexit) as queue:
+ for idx, input_args in enumerate(inputs):
+ queue.put((idx, input_args))
+
+ return [x[1] for x in sorted(out_queue.get() for _ in range(len(inputs)))]
diff --git a/lib/parallel_unittest.py b/lib/parallel_unittest.py
index 89ee60d6e..d163f7a0c 100755
--- a/lib/parallel_unittest.py
+++ b/lib/parallel_unittest.py
@@ -337,6 +337,7 @@ class TestParallelMock(TestBackgroundWrapper):
def _Callback(self):
self._calls += 1
+ return self._calls
def testRunParallelSteps(self):
"""Make sure RunParallelSteps is mocked out."""
@@ -349,11 +350,15 @@ class TestParallelMock(TestBackgroundWrapper):
with ParallelMock():
parallel.RunTasksInProcessPool(self._Callback, [])
self.assertEqual(0, self._calls)
- parallel.RunTasksInProcessPool(self._Callback, [[]])
+ result = parallel.RunTasksInProcessPool(self._Callback, [[]])
self.assertEqual(1, self._calls)
- parallel.RunTasksInProcessPool(self._Callback, [], processes=9,
- onexit=self._Callback)
+ self.assertEqual([1], result)
+ result = parallel.RunTasksInProcessPool(self._Callback, [], processes=9,
+ onexit=self._Callback)
self.assertEqual(10, self._calls)
+ self.assertEqual([], result)
+ result = parallel.RunTasksInProcessPool(self._Callback, [[]] * 10)
+ self.assertEqual(range(11, 21), result)
class TestExceptions(cros_test_lib.MockOutputTestCase):
@@ -377,13 +382,12 @@ class TestExceptions(cros_test_lib.MockOutputTestCase):
lambda: parallel.RunParallelSteps([fn])):
output_str = ex_str = ex = None
with self.OutputCapturer() as capture:
- try:
+ with self.assertRaises(parallel.BackgroundFailure) as ex:
task()
- except parallel.BackgroundFailure as ex:
- output_str = capture.GetStdout()
- ex_str = str(ex)
+ output_str = capture.GetStdout()
+ ex_str = str(ex.exception)
- self.assertTrue(exc_type in [x.type for x in ex.exc_infos])
+ self.assertTrue(exc_type in [x.type for x in ex.exception.exc_infos])
self.assertEqual(output_str, _GREETING)
self.assertTrue(str(exc_type) in ex_str)
@@ -396,7 +400,7 @@ class TestExceptions(cros_test_lib.MockOutputTestCase):
def testFailedPickle(self):
"""PicklingError should be thrown when an argument fails to pickle."""
with self.assertRaises(cPickle.PicklingError):
- parallel.RunTasksInProcessPool(self._SystemExit, [self._SystemExit])
+ parallel.RunTasksInProcessPool(self._SystemExit, [[self._SystemExit]])
def testFailedPickleOnReturn(self):
"""PicklingError should be thrown when a return value fails to pickle."""