aboutsummaryrefslogtreecommitdiff
path: root/catapult/devil/devil/utils/parallelizer.py
blob: 930d01f93aea43f775fbf2096320b3a03a850098 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
""" Wrapper that allows method execution in parallel.

This class wraps a list of objects of the same type, emulates their
interface, and executes any functions called on the objects in parallel
in ReraiserThreads.

This means that, given a list of objects:

  class Foo:
    def __init__(self):
      self.baz = Baz()

    def bar(self, my_param):
      # do something

  list_of_foos = [Foo(), Foo(), Foo()]

we can take a sequential operation on that list of objects:

  for f in list_of_foos:
    f.bar('Hello')

and run it in parallel across all of the objects:

  Parallelizer(list_of_foos).bar('Hello')

It can also handle (non-method) attributes of objects, so that this:

  for f in list_of_foos:
    f.baz.myBazMethod()

can be run in parallel with:

  Parallelizer(list_of_foos).baz.myBazMethod()

Because it emulates the interface of the wrapped objects, a Parallelizer
can be passed to a method or function that takes objects of that type:

  def DoesSomethingWithFoo(the_foo):
    the_foo.bar('Hello')
    the_foo.bar('world')
    the_foo.baz.myBazMethod()

  DoesSomethingWithFoo(Parallelizer(list_of_foos))

Note that this class spins up a thread for each object. Using this class
to parallelize operations that are already fast will incur a net performance
penalty.

"""
# pylint: disable=protected-access

from devil.utils import reraiser_thread
from devil.utils import watchdog_timer

# Module-level defaults. Neither constant is referenced by the code visible in
# this file; presumably they are read by other modules or are leftovers --
# TODO confirm before relying on or removing them.
_DEFAULT_TIMEOUT = 30  # presumably seconds, like pFinish's |timeout| -- verify.
_DEFAULT_RETRIES = 3


class Parallelizer(object):
  """Allows parallel execution of method calls across a group of objects.

  Every instance carries two attributes:
    _orig_objs: the objects the Parallelizer was first constructed with. They
        are handed on to every derived Parallelizer and are used to build the
        names of the worker threads.
    _objs: what is currently being emulated. This is either a sequence of
        values (one per original object) or a
        reraiser_thread.ReraiserThreadGroup that is still producing them.

  Attribute access, indexing and calls are forwarded element-wise to |_objs|
  and each returns a new Parallelizer wrapping the results.
  """

  def __init__(self, objs):
    """Wraps |objs|.

    Args:
      objs: The objects to operate on in parallel.
    """
    self._orig_objs = objs
    self._objs = objs

  def __getattr__(self, name):
    """Emulate getting the |name| attribute of |self|.

    Args:
      name: The name of the attribute to retrieve.
    Returns:
      A Parallelizer emulating the |name| attribute of |self|.
    Raises:
      AttributeError if |name| is one of the Parallelizer's own instance
      attributes and it has not been set yet.
    """
    # __getattr__ only runs when normal lookup fails. For our own instance
    # attributes that means __init__ never ran (e.g. the instance was created
    # with __new__). Falling through would call pGet, which reads self._objs
    # and re-enters this method until the recursion limit is hit.
    if name in ('_objs', '_orig_objs'):
      raise AttributeError(name)

    # Block on any in-flight call so that _objs holds concrete values.
    self.pGet(None)

    r = type(self)(self._orig_objs)
    r._objs = [getattr(o, name) for o in self._objs]
    return r

  def __getitem__(self, index):
    """Emulate getting the value of |self| at |index|.

    Args:
      index: The index or key to look up in each wrapped object.
    Returns:
      A Parallelizer emulating the value of |self| at |index|.
    """
    # Block on any in-flight call so that _objs holds concrete values.
    self.pGet(None)

    r = type(self)(self._orig_objs)
    r._objs = [o[index] for o in self._objs]
    return r

  def __call__(self, *args, **kwargs):
    """Emulate calling |self| with |args| and |kwargs|.

    Note that this call is asynchronous. Call pFinish on the return value to
    block until the call finishes.

    Returns:
      A Parallelizer wrapping the ReraiserThreadGroup running the call in
      parallel.
    Raises:
      AttributeError if the wrapped objects aren't callable.
    """
    # Block on any in-flight call so that _objs holds concrete values.
    self.pGet(None)

    for o in self._objs:
      if not callable(o):
        # Non-callables usually have no __name__; fall back to the type name
        # so that building the message cannot itself fail.
        raise AttributeError(
            "'%s' is not callable" % getattr(o, '__name__', type(o).__name__))

    r = type(self)(self._orig_objs)
    # One thread per wrapped callable, named "<original object>.<callable>".
    # Callables without __name__ (e.g. functools.partial objects) are labelled
    # with their type name instead.
    r._objs = reraiser_thread.ReraiserThreadGroup([
        reraiser_thread.ReraiserThread(
            o,
            args=args,
            kwargs=kwargs,
            name='%s.%s' % (str(d), getattr(o, '__name__', type(o).__name__)))
        for d, o in zip(self._orig_objs, self._objs)
    ])
    r._objs.StartAll()
    return r

  def pFinish(self, timeout):
    """Finish any outstanding asynchronous operations.

    Does nothing if |self| is not currently wrapping a ReraiserThreadGroup.

    Args:
      timeout: The maximum number of seconds to wait for an individual
               result to return, or None to wait forever.
    Returns:
      self, now emulating the return values.
    """
    self._assertNoShadow('pFinish')
    if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
      # NOTE(review): JoinAll() is called without any argument, so the
      # watchdog built from |timeout| only reaches GetAllReturnValues, after
      # the join has already returned. It looks like |timeout| may not bound
      # the wait at all -- confirm against reraiser_thread before changing.
      self._objs.JoinAll()
      self._objs = self._objs.GetAllReturnValues(
          watchdog_timer.WatchdogTimer(timeout))
    return self

  def pGet(self, timeout):
    """Get the current wrapped objects.

    Args:
      timeout: Same as |pFinish|.
    Returns:
      A list of the results, in order of the provided devices.
    Raises:
      Any exception raised by any of the called functions.
    """
    self._assertNoShadow('pGet')
    self.pFinish(timeout)
    return self._objs

  def pMap(self, f, *args, **kwargs):
    """Map a function across the current wrapped objects in parallel.

    This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.

    Note that this call is asynchronous. Call pFinish on the return value to
    block until the call finishes.

    Args:
      f: The function to call.
      args: The positional args to pass to f.
      kwargs: The keyword args to pass to f.
    Returns:
      A Parallelizer wrapping the ReraiserThreadGroup running the map in
      parallel.
    """
    self._assertNoShadow('pMap')
    # Callables without __name__ (e.g. functools.partial objects) are labelled
    # with their type name so that naming the thread cannot fail.
    f_name = getattr(f, '__name__', type(f).__name__)
    r = type(self)(self._orig_objs)
    r._objs = reraiser_thread.ReraiserThreadGroup([
        reraiser_thread.ReraiserThread(
            f,
            # The wrapped object is always the first positional argument.
            args=(o,) + args,
            kwargs=kwargs,
            name='%s(%s)' % (f_name, d))
        for d, o in zip(self._orig_objs, self._objs)
    ])
    r._objs.StartAll()
    return r

  def _assertNoShadow(self, attr_name):
    """Ensures that |attr_name| isn't shadowing part of the wrapped objects.

    If the wrapped objects _do_ have an |attr_name| attribute, it will be
    inaccessible to clients.

    Args:
      attr_name: The attribute to check.
    Raises:
      AssertionError if the wrapped objects have an attribute named 'attr_name'
      or '_assertNoShadow'.
    """
    if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
      # A pending call: check the thread group object itself.
      assert not hasattr(self._objs, '_assertNoShadow')
      assert not hasattr(self._objs, attr_name)
    else:
      # Concrete values: check every wrapped object.
      assert not any(hasattr(o, '_assertNoShadow') for o in self._objs)
      assert not any(hasattr(o, attr_name) for o in self._objs)


class SyncParallelizer(Parallelizer):
  """A Parallelizer that blocks on function calls.

  Calls and pMap wait (with no timeout) for every thread to finish before
  returning, so the returned Parallelizer already emulates the results.
  """

  def __enter__(self):
    """Emulate entering the context of |self|.

    Note that this call is synchronous.

    Returns:
      A Parallelizer emulating the value returned from entering into the
      context of |self|.
    """
    r = type(self)(self._orig_objs)
    # Enter the objects currently being emulated (self._objs). These differ
    # from _orig_objs when |self| was derived through attribute or item
    # access.
    r._objs = [o.__enter__ for o in self._objs]
    return r()

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Emulate exiting the context of |self|.

    Note that this call is synchronous. Nothing is returned, so an exception
    raised inside the with-block is never suppressed by this method.

    Args:
      exc_type: the exception type.
      exc_val: the exception value.
      exc_tb: the exception traceback.
    """
    r = type(self)(self._orig_objs)
    # Exit the same objects that __enter__ entered: the ones currently being
    # emulated, not necessarily the original ones.
    r._objs = [o.__exit__ for o in self._objs]
    r(exc_type, exc_val, exc_tb)

  # override
  def __call__(self, *args, **kwargs):
    """Emulate calling |self| with |args| and |kwargs|.

    Note that this call is synchronous.

    Returns:
      A Parallelizer emulating the value returned from calling |self| with
      |args| and |kwargs|.
    Raises:
      AttributeError if the wrapped objects aren't callable.
    """
    r = super(SyncParallelizer, self).__call__(*args, **kwargs)
    # Block, with no timeout, until every thread has produced its result.
    r.pFinish(None)
    return r

  # override
  def pMap(self, f, *args, **kwargs):
    """Map a function across the current wrapped objects in parallel.

    This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.

    Note that this call is synchronous.

    Args:
      f: The function to call.
      args: The positional args to pass to f.
      kwargs: The keyword args to pass to f.
    Returns:
      A Parallelizer emulating the values returned by f, one per wrapped
      object.
    """
    r = super(SyncParallelizer, self).pMap(f, *args, **kwargs)
    # Block, with no timeout, until every thread has produced its result.
    r.pFinish(None)
    return r