diff options
author | David James <davidjames@google.com> | 2014-12-09 13:44:40 -0800 |
---|---|---|
committer | chrome-internal-fetch <chrome-internal-fetch@google.com> | 2014-12-10 23:47:49 +0000 |
commit | f6cbe2f6480b63bb07f60b19255cc4cf68557260 (patch) | |
tree | 638003cf68157ce56ebb4984f7ca7a10a9849481 /lib/parallel.py | |
parent | fbb3b5286f5b76cd001cd3c0ca47b9108f02bee7 (diff) | |
download | chromite-f6cbe2f6480b63bb07f60b19255cc4cf68557260.tar.gz |
parallel.RunTasksInProcessPool: Support returning values.
BUG=chromium:440495
TEST=New unit test.
TEST=Test with new feature in followup CL.
Change-Id: Ib9e4bbd2726a02bd7bcf405be3521726ad26f838
Reviewed-on: https://chromium-review.googlesource.com/234240
Reviewed-by: Don Garrett <dgarrett@chromium.org>
Trybot-Ready: David James <davidjames@chromium.org>
Commit-Queue: David James <davidjames@chromium.org>
Reviewed-by: Mike Frysinger <vapier@chromium.org>
Tested-by: David James <davidjames@chromium.org>
Diffstat (limited to 'lib/parallel.py')
-rw-r--r-- | lib/parallel.py | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/lib/parallel.py b/lib/parallel.py index 1bee91af3..a669523a6 100644 --- a/lib/parallel.py +++ b/lib/parallel.py @@ -765,13 +765,27 @@ def RunTasksInProcessPool(task, inputs, processes=None, onexit=None): processes: Number of processes, at most, to launch. onexit: Function to run in each background process after all inputs are processed. - """ + Returns: + Returns a list containing the return values of the task for each input. + """ if not processes: # - Use >=16 processes by default, in case it's a network-bound operation. # - Try to use all of the CPUs, in case it's a CPU-bound operation. processes = min(max(16, multiprocessing.cpu_count()), len(inputs)) - with BackgroundTaskRunner(task, processes=processes, onexit=onexit) as queue: - for x in inputs: - queue.put(x) + with Manager() as manager: + # Set up output queue. + out_queue = manager.Queue() + fn = lambda idx, task_args: out_queue.put((idx, task(*task_args))) + + # Micro-optimization: Setup the queue so that BackgroundTaskRunner + # doesn't have to set up another Manager process. + queue = manager.Queue() + + with BackgroundTaskRunner(fn, queue=queue, processes=processes, + onexit=onexit) as queue: + for idx, input_args in enumerate(inputs): + queue.put((idx, input_args)) + + return [x[1] for x in sorted(out_queue.get() for _ in range(len(inputs)))] |