summaryrefslogtreecommitdiff
path: root/lib/parallel.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/parallel.py')
-rw-r--r--lib/parallel.py22
1 files changed, 18 insertions, 4 deletions
diff --git a/lib/parallel.py b/lib/parallel.py
index 1bee91af3..a669523a6 100644
--- a/lib/parallel.py
+++ b/lib/parallel.py
@@ -765,13 +765,27 @@ def RunTasksInProcessPool(task, inputs, processes=None, onexit=None):
processes: Number of processes, at most, to launch.
onexit: Function to run in each background process after all inputs are
processed.
- """
+ Returns:
+ Returns a list containing the return values of the task for each input.
+ """
if not processes:
# - Use >=16 processes by default, in case it's a network-bound operation.
# - Try to use all of the CPUs, in case it's a CPU-bound operation.
processes = min(max(16, multiprocessing.cpu_count()), len(inputs))
- with BackgroundTaskRunner(task, processes=processes, onexit=onexit) as queue:
- for x in inputs:
- queue.put(x)
+ with Manager() as manager:
+ # Set up output queue.
+ out_queue = manager.Queue()
+ fn = lambda idx, task_args: out_queue.put((idx, task(*task_args)))
+
+ # Micro-optimization: Setup the queue so that BackgroundTaskRunner
+ # doesn't have to set up another Manager process.
+ queue = manager.Queue()
+
+ with BackgroundTaskRunner(fn, queue=queue, processes=processes,
+ onexit=onexit) as queue:
+ for idx, input_args in enumerate(inputs):
+ queue.put((idx, input_args))
+
+ return [x[1] for x in sorted(out_queue.get() for _ in range(len(inputs)))]