# Copyright 2014 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """ Wrapper that allows method execution in parallel. This class wraps a list of objects of the same type, emulates their interface, and executes any functions called on the objects in parallel in ReraiserThreads. This means that, given a list of objects: class Foo: def __init__(self): self.baz = Baz() def bar(self, my_param): // do something list_of_foos = [Foo(1), Foo(2), Foo(3)] we can take a sequential operation on that list of objects: for f in list_of_foos: f.bar('Hello') and run it in parallel across all of the objects: Parallelizer(list_of_foos).bar('Hello') It can also handle (non-method) attributes of objects, so that this: for f in list_of_foos: f.baz.myBazMethod() can be run in parallel with: Parallelizer(list_of_foos).baz.myBazMethod() Because it emulates the interface of the wrapped objects, a Parallelizer can be passed to a method or function that takes objects of that type: def DoesSomethingWithFoo(the_foo): the_foo.bar('Hello') the_foo.bar('world') the_foo.baz.myBazMethod DoesSomethingWithFoo(Parallelizer(list_of_foos)) Note that this class spins up a thread for each object. Using this class to parallelize operations that are already fast will incur a net performance penalty. """ # pylint: disable=protected-access from devil.utils import reraiser_thread from devil.utils import watchdog_timer _DEFAULT_TIMEOUT = 30 _DEFAULT_RETRIES = 3 class Parallelizer(object): """Allows parallel execution of method calls across a group of objects.""" def __init__(self, objs): self._orig_objs = objs self._objs = objs def __getattr__(self, name): """Emulate getting the |name| attribute of |self|. Args: name: The name of the attribute to retrieve. Returns: A Parallelizer emulating the |name| attribute of |self|. """ self.pGet(None) r = type(self)(self._orig_objs) r._objs = [getattr(o, name) for o in self._objs] return r def __getitem__(self, index): """Emulate getting the value of |self| at |index|. Returns: A Parallelizer emulating the value of |self| at |index|. """ self.pGet(None) r = type(self)(self._orig_objs) r._objs = [o[index] for o in self._objs] return r def __call__(self, *args, **kwargs): """Emulate calling |self| with |args| and |kwargs|. Note that this call is asynchronous. Call pFinish on the return value to block until the call finishes. Returns: A Parallelizer wrapping the ReraiserThreadGroup running the call in parallel. Raises: AttributeError if the wrapped objects aren't callable. """ self.pGet(None) for o in self._objs: if not callable(o): raise AttributeError("'%s' is not callable" % o.__name__) r = type(self)(self._orig_objs) r._objs = reraiser_thread.ReraiserThreadGroup( [reraiser_thread.ReraiserThread( o, args=args, kwargs=kwargs, name='%s.%s' % (str(d), o.__name__)) for d, o in zip(self._orig_objs, self._objs)]) r._objs.StartAll() return r def pFinish(self, timeout): """Finish any outstanding asynchronous operations. Args: timeout: The maximum number of seconds to wait for an individual result to return, or None to wait forever. Returns: self, now emulating the return values. """ self._assertNoShadow('pFinish') if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup): self._objs.JoinAll() self._objs = self._objs.GetAllReturnValues( watchdog_timer.WatchdogTimer(timeout)) return self def pGet(self, timeout): """Get the current wrapped objects. Args: timeout: Same as |pFinish|. Returns: A list of the results, in order of the provided devices. Raises: Any exception raised by any of the called functions. """ self._assertNoShadow('pGet') self.pFinish(timeout) return self._objs def pMap(self, f, *args, **kwargs): """Map a function across the current wrapped objects in parallel. This calls f(o, *args, **kwargs) for each o in the set of wrapped objects. Note that this call is asynchronous. Call pFinish on the return value to block until the call finishes. Args: f: The function to call. args: The positional args to pass to f. kwargs: The keyword args to pass to f. Returns: A Parallelizer wrapping the ReraiserThreadGroup running the map in parallel. """ self._assertNoShadow('pMap') r = type(self)(self._orig_objs) r._objs = reraiser_thread.ReraiserThreadGroup( [reraiser_thread.ReraiserThread( f, args=tuple([o] + list(args)), kwargs=kwargs, name='%s(%s)' % (f.__name__, d)) for d, o in zip(self._orig_objs, self._objs)]) r._objs.StartAll() return r def _assertNoShadow(self, attr_name): """Ensures that |attr_name| isn't shadowing part of the wrapped obejcts. If the wrapped objects _do_ have an |attr_name| attribute, it will be inaccessible to clients. Args: attr_name: The attribute to check. Raises: AssertionError if the wrapped objects have an attribute named 'attr_name' or '_assertNoShadow'. """ if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup): assert not hasattr(self._objs, '_assertNoShadow') assert not hasattr(self._objs, attr_name) else: assert not any(hasattr(o, '_assertNoShadow') for o in self._objs) assert not any(hasattr(o, attr_name) for o in self._objs) class SyncParallelizer(Parallelizer): """A Parallelizer that blocks on function calls.""" def __enter__(self): """Emulate entering the context of |self|. Note that this call is synchronous. Returns: A Parallelizer emulating the value returned from entering into the context of |self|. """ r = type(self)(self._orig_objs) r._objs = [o.__enter__ for o in r._objs] return r.__call__() def __exit__(self, exc_type, exc_val, exc_tb): """Emulate exiting the context of |self|. Note that this call is synchronous. Args: exc_type: the exception type. exc_val: the exception value. exc_tb: the exception traceback. """ r = type(self)(self._orig_objs) r._objs = [o.__exit__ for o in r._objs] r.__call__(exc_type, exc_val, exc_tb) # override def __call__(self, *args, **kwargs): """Emulate calling |self| with |args| and |kwargs|. Note that this call is synchronous. Returns: A Parallelizer emulating the value returned from calling |self| with |args| and |kwargs|. Raises: AttributeError if the wrapped objects aren't callable. """ r = super(SyncParallelizer, self).__call__(*args, **kwargs) r.pFinish(None) return r # override def pMap(self, f, *args, **kwargs): """Map a function across the current wrapped objects in parallel. This calls f(o, *args, **kwargs) for each o in the set of wrapped objects. Note that this call is synchronous. Args: f: The function to call. args: The positional args to pass to f. kwargs: The keyword args to pass to f. Returns: A Parallelizer wrapping the ReraiserThreadGroup running the map in parallel. """ r = super(SyncParallelizer, self).pMap(f, *args, **kwargs) r.pFinish(None) return r