diff options
Diffstat (limited to 'lib/parallel.py')
-rw-r--r-- | lib/parallel.py | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/lib/parallel.py b/lib/parallel.py index 1bee91af3..a669523a6 100644 --- a/lib/parallel.py +++ b/lib/parallel.py @@ -765,13 +765,27 @@ def RunTasksInProcessPool(task, inputs, processes=None, onexit=None): processes: Number of processes, at most, to launch. onexit: Function to run in each background process after all inputs are processed. - """ + Returns: + Returns a list containing the return values of the task for each input. + """ if not processes: # - Use >=16 processes by default, in case it's a network-bound operation. # - Try to use all of the CPUs, in case it's a CPU-bound operation. processes = min(max(16, multiprocessing.cpu_count()), len(inputs)) - with BackgroundTaskRunner(task, processes=processes, onexit=onexit) as queue: - for x in inputs: - queue.put(x) + with Manager() as manager: + # Set up output queue. + out_queue = manager.Queue() + fn = lambda idx, task_args: out_queue.put((idx, task(*task_args))) + + # Micro-optimization: Setup the queue so that BackgroundTaskRunner + # doesn't have to set up another Manager process. + queue = manager.Queue() + + with BackgroundTaskRunner(fn, queue=queue, processes=processes, + onexit=onexit) as queue: + for idx, input_args in enumerate(inputs): + queue.put((idx, input_args)) + + return [x[1] for x in sorted(out_queue.get() for _ in range(len(inputs)))] |