summaryrefslogtreecommitdiff
path: root/lib/parallel.py
diff options
context:
space:
mode:
authorDavid James <davidjames@google.com>2014-12-09 13:44:40 -0800
committerchrome-internal-fetch <chrome-internal-fetch@google.com>2014-12-10 23:47:49 +0000
commitf6cbe2f6480b63bb07f60b19255cc4cf68557260 (patch)
tree638003cf68157ce56ebb4984f7ca7a10a9849481 /lib/parallel.py
parentfbb3b5286f5b76cd001cd3c0ca47b9108f02bee7 (diff)
downloadchromite-f6cbe2f6480b63bb07f60b19255cc4cf68557260.tar.gz
parallel.RunTasksInProcessPool: Support returning values.
BUG=chromium:440495 TEST=New unit test. TEST=Test with new feature in followup CL. Change-Id: Ib9e4bbd2726a02bd7bcf405be3521726ad26f838 Reviewed-on: https://chromium-review.googlesource.com/234240 Reviewed-by: Don Garrett <dgarrett@chromium.org> Trybot-Ready: David James <davidjames@chromium.org> Commit-Queue: David James <davidjames@chromium.org> Reviewed-by: Mike Frysinger <vapier@chromium.org> Tested-by: David James <davidjames@chromium.org>
Diffstat (limited to 'lib/parallel.py')
-rw-r--r--lib/parallel.py22
1 files changed, 18 insertions, 4 deletions
diff --git a/lib/parallel.py b/lib/parallel.py
index 1bee91af3..a669523a6 100644
--- a/lib/parallel.py
+++ b/lib/parallel.py
@@ -765,13 +765,27 @@ def RunTasksInProcessPool(task, inputs, processes=None, onexit=None):
processes: Number of processes, at most, to launch.
onexit: Function to run in each background process after all inputs are
processed.
- """
+ Returns:
+ Returns a list containing the return values of the task for each input.
+ """
if not processes:
# - Use >=16 processes by default, in case it's a network-bound operation.
# - Try to use all of the CPUs, in case it's a CPU-bound operation.
processes = min(max(16, multiprocessing.cpu_count()), len(inputs))
- with BackgroundTaskRunner(task, processes=processes, onexit=onexit) as queue:
- for x in inputs:
- queue.put(x)
+ with Manager() as manager:
+ # Set up output queue.
+ out_queue = manager.Queue()
+ fn = lambda idx, task_args: out_queue.put((idx, task(*task_args)))
+
+ # Micro-optimization: Setup the queue so that BackgroundTaskRunner
+ # doesn't have to set up another Manager process.
+ queue = manager.Queue()
+
+ with BackgroundTaskRunner(fn, queue=queue, processes=processes,
+ onexit=onexit) as queue:
+ for idx, input_args in enumerate(inputs):
+ queue.put((idx, input_args))
+
+ return [x[1] for x in sorted(out_queue.get() for _ in range(len(inputs)))]