summaryrefslogtreecommitdiff
path: root/lib/parallel.py
diff options
context:
space:
mode:
authorMike Frysinger <vapier@chromium.org>2013-05-30 19:57:58 -0400
committerChromeBot <chrome-bot@google.com>2013-06-04 10:08:21 -0700
commit167d83eb8e61346b88c70361acc9cdbf37362493 (patch)
treeeea2d667fb512e508ea8b437391f72b159c06042 /lib/parallel.py
parent1692d5e5ccc2bc2fb649fd7ab99d6acd22370454 (diff)
downloadchromite-167d83eb8e61346b88c70361acc9cdbf37362493.tar.gz
parallel: BackgroundTaskRunner: pass down args/kwargs to tasks
In order to use fun multiprocessing objects like Value and Queue, you have to serialize the objects sooner rather than later. That means you can't do something perfectly reasonable like: results = multiprocessing.Queue() with parallel.BackgroundTaskRunner(myfunc) as queue: for x in (1, 2, 3): queue.put((results, x)) Python will crap itself like so: Traceback (most recent call last): File "/usr/lib64/python2.7/multiprocessing/queues.py", line 266, in _feed send(obj) File "/usr/lib64/python2.7/multiprocessing/queues.py", line 77, in __getstate__ assert_spawning(self) File "/usr/lib64/python2.7/multiprocessing/forking.py", line 51, in assert_spawning ' through inheritance' % type(self).__name__ RuntimeError: Queue objects should only be shared between processes through inheritance In order to pass that results queue down to myfunc, you have to include it in the multiprocess.Process() initialization. Unfortunately, our current framework does not support passing args/kwargs at that phase. So let's throw in the plumbing to support arbitrary passing of args and kwargs down to the function at task creation time. Not only does this solve the above problem, but it makes implementations cleaner when you have a set of args/kwargs that are common to all calls as you no longer need to manually add them to the queue. Thus the aforementioned snippet now becomes: results = multiprocessing.Queue() with parallel.BackgroundTaskRunner(myfunc, results) as queue: for x in (1, 2, 3): queue.put((x,)) BUG=chromium:209442 BUG=chromium:213204 TEST=`lib/parallel_unittest.py` passes TEST=my new code that needs this functionality works Change-Id: I83ac1aa73f98f72c6a8d3a909f4b649059faf98e Reviewed-on: https://gerrit.chromium.org/gerrit/57182 Reviewed-by: David James <davidjames@chromium.org> Tested-by: Mike Frysinger <vapier@chromium.org> Commit-Queue: Mike Frysinger <vapier@chromium.org>
Diffstat (limited to 'lib/parallel.py')
-rw-r--r--lib/parallel.py52
1 file changed, 40 insertions, 12 deletions
diff --git a/lib/parallel.py b/lib/parallel.py
index 786821e4b..bf2906b87 100644
--- a/lib/parallel.py
+++ b/lib/parallel.py
@@ -61,12 +61,18 @@ class _BackgroundTask(multiprocessing.Process):
# Interval we check for updates from print statements.
PRINT_INTERVAL = 1
- def __init__(self, task, semaphore=None):
+ def __init__(self, task, semaphore=None, task_args=None, task_kwargs=None):
"""Create a new _BackgroundTask object.
If semaphore is supplied, it will be acquired for the duration of the
steps that are run in the background. This can be used to limit the
number of simultaneous parallel tasks.
+
+ Args:
+ task: The task (a functor) to run in the background.
+ semaphore: The lock to hold while |task| runs.
+ task_args: A list of args to pass to the |task|.
+ task_kwargs: A dict of optional args to pass to the |task|.
"""
multiprocessing.Process.__init__(self)
self._task = task
@@ -76,6 +82,8 @@ class _BackgroundTask(multiprocessing.Process):
self._killing = multiprocessing.Event()
self._output = None
self._parent_pid = None
+ self._task_args = task_args if task_args else ()
+ self._task_kwargs = task_kwargs if task_kwargs else {}
def _WaitForStartup(self):
# TODO(davidjames): Use python-2.7 syntax to simplify this.
@@ -270,7 +278,7 @@ class _BackgroundTask(multiprocessing.Process):
cls.SILENT_TIMEOUT -= cls.SILENT_TIMEOUT_STEP
# Actually launch the task.
- self._task()
+ self._task(*self._task_args, **self._task_kwargs)
except results_lib.StepFailure as ex:
error = str(ex)
except BaseException as ex:
@@ -353,7 +361,7 @@ class _BackgroundTask(multiprocessing.Process):
# First, start all the steps.
bg_tasks = collections.deque()
for step in steps:
- task = cls(step, semaphore)
+ task = cls(step, semaphore=semaphore)
task.start()
bg_tasks.append(task)
@@ -379,7 +387,7 @@ class _BackgroundTask(multiprocessing.Process):
raise BackgroundFailure('\n' + ''.join(tracebacks))
@staticmethod
- def TaskRunner(queue, task, onexit=None):
+ def TaskRunner(queue, task, onexit=None, task_args=None, task_kwargs=None):
"""Run task(*input) for each input in the queue.
Returns when it encounters an _AllTasksComplete object on the queue.
@@ -391,7 +399,16 @@ class _BackgroundTask(multiprocessing.Process):
be run.
task: Function to run on each queued input.
onexit: Function to run after all inputs are processed.
+ task_args: A list of args to pass to the |task|.
+ task_kwargs: A dict of optional args to pass to the |task|.
"""
+ if task_args is None:
+ task_args = []
+ elif not isinstance(task_args, list):
+ task_args = list(task_args)
+ if task_kwargs is None:
+ task_kwargs = {}
+
tracebacks = []
while True:
# Wait for a new item to show up on the queue. This is a blocking wait,
@@ -400,11 +417,15 @@ class _BackgroundTask(multiprocessing.Process):
if isinstance(x, _AllTasksComplete):
# All tasks are complete, so we should exit.
break
+ elif not isinstance(x, list):
+ x = task_args + list(x)
+ else:
+ x = task_args + x
# If no tasks failed yet, process the remaining tasks.
if not tracebacks:
try:
- task(*x)
+ task(*x, **task_kwargs)
except BaseException:
tracebacks.append(traceback.format_exc())
@@ -488,12 +509,13 @@ class _AllTasksComplete(object):
@contextlib.contextmanager
-def BackgroundTaskRunner(task, queue=None, processes=None, onexit=None):
+def BackgroundTaskRunner(task, *args, **kwargs):
"""Run the specified task on each queued input in a pool of processes.
This context manager starts a set of workers in the background, who each
- wait for input on the specified queue. These workers run task(*input) for
- each input on the queue.
+ wait for input on the specified queue. For each input on the queue, these
workers run task(*(args + input), **kwargs). Note that certain kwargs will
+ not pass through to the task (see Args below for the list).
The output from these tasks is saved to a temporary file. When control
returns to the context manager, the background output is printed in order,
@@ -504,13 +526,13 @@ def BackgroundTaskRunner(task, queue=None, processes=None, onexit=None):
BackgroundFailure is raised with full stack traces of all exceptions.
Example:
- # This will run somefunc('small', 'cow') in the background
- # This will run somefunc(1, 'small', 'cow', foo='bar') in the background
# while "more random stuff" is being executed.
- def somefunc(arg1, arg2):
+ def somefunc(arg1, arg2, arg3, foo=None):
...
...
- with BackgroundTaskRunner(somefunc) as queue:
+ with BackgroundTaskRunner(somefunc, 1, foo='bar') as queue:
... do random stuff ...
queue.put(['small', 'cow'])
... do more random stuff ...
@@ -525,13 +547,19 @@ def BackgroundTaskRunner(task, queue=None, processes=None, onexit=None):
processed.
"""
+ queue = kwargs.pop('queue', None)
+ processes = kwargs.pop('processes', None)
+ onexit = kwargs.pop('onexit', None)
+
if queue is None:
queue = multiprocessing.Queue()
if not processes:
processes = multiprocessing.cpu_count()
- child = functools.partial(_BackgroundTask.TaskRunner, queue, task, onexit)
+ child = functools.partial(_BackgroundTask.TaskRunner, queue, task,
+ onexit=onexit, task_args=args,
+ task_kwargs=kwargs)
steps = [child] * processes
with _BackgroundTask.ParallelTasks(steps):
try: