summaryrefslogtreecommitdiff
path: root/systrace/catapult/devil/devil/utils/parallelizer_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'systrace/catapult/devil/devil/utils/parallelizer_test.py')
-rw-r--r--systrace/catapult/devil/devil/utils/parallelizer_test.py162
1 files changed, 162 insertions, 0 deletions
diff --git a/systrace/catapult/devil/devil/utils/parallelizer_test.py b/systrace/catapult/devil/devil/utils/parallelizer_test.py
new file mode 100644
index 0000000..32ff7ec
--- /dev/null
+++ b/systrace/catapult/devil/devil/utils/parallelizer_test.py
@@ -0,0 +1,162 @@
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Unit tests for the contents of parallelizer.py."""
+
+# pylint: disable=W0212
+# pylint: disable=W0613
+
+import os
+import tempfile
+import time
+import unittest
+
+from devil.utils import parallelizer
+
+
+class ParallelizerTestObject(object):
+ """Class used to test parallelizer.Parallelizer."""
+
+ parallel = parallelizer.Parallelizer
+
+ def __init__(self, thing, completion_file_name=None):
+ self._thing = thing
+ self._completion_file_name = completion_file_name
+ self.helper = ParallelizerTestObjectHelper(thing)
+
+ @staticmethod
+ def doReturn(what):
+ return what
+
+ @classmethod
+ def doRaise(cls, what):
+ raise what
+
+ def doSetTheThing(self, new_thing):
+ self._thing = new_thing
+
+ def doReturnTheThing(self):
+ return self._thing
+
+ def doRaiseTheThing(self):
+ raise self._thing
+
+ def doRaiseIfExceptionElseSleepFor(self, sleep_duration):
+ if isinstance(self._thing, Exception):
+ raise self._thing
+ time.sleep(sleep_duration)
+ self._write_completion_file()
+ return self._thing
+
+ def _write_completion_file(self):
+ if self._completion_file_name and len(self._completion_file_name):
+ with open(self._completion_file_name, 'w+b') as completion_file:
+ completion_file.write('complete')
+
+ def __getitem__(self, index):
+ return self._thing[index]
+
+ def __str__(self):
+ return type(self).__name__
+
+
+class ParallelizerTestObjectHelper(object):
+
+ def __init__(self, thing):
+ self._thing = thing
+
+ def doReturnStringThing(self):
+ return str(self._thing)
+
+
+class ParallelizerTest(unittest.TestCase):
+
+ def testInitEmptyList(self):
+ r = parallelizer.Parallelizer([]).replace('a', 'b').pGet(0.1)
+ self.assertEquals([], r)
+
+ def testMethodCall(self):
+ test_data = ['abc_foo', 'def_foo', 'ghi_foo']
+ expected = ['abc_bar', 'def_bar', 'ghi_bar']
+ r = parallelizer.Parallelizer(test_data).replace('_foo', '_bar').pGet(0.1)
+ self.assertEquals(expected, r)
+
+ def testMutate(self):
+ devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
+ self.assertTrue(all(d.doReturnTheThing() for d in devices))
+ ParallelizerTestObject.parallel(devices).doSetTheThing(False).pFinish(1)
+ self.assertTrue(not any(d.doReturnTheThing() for d in devices))
+
+ def testAllReturn(self):
+ devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
+ results = ParallelizerTestObject.parallel(
+ devices).doReturnTheThing().pGet(1)
+ self.assertTrue(isinstance(results, list))
+ self.assertEquals(10, len(results))
+ self.assertTrue(all(results))
+
+ def testAllRaise(self):
+ devices = [ParallelizerTestObject(Exception('thing %d' % i))
+ for i in xrange(0, 10)]
+ p = ParallelizerTestObject.parallel(devices).doRaiseTheThing()
+ with self.assertRaises(Exception):
+ p.pGet(1)
+
+ def testOneFailOthersComplete(self):
+ parallel_device_count = 10
+ exception_index = 7
+ exception_msg = 'thing %d' % exception_index
+
+ try:
+ completion_files = [tempfile.NamedTemporaryFile(delete=False)
+ for _ in xrange(0, parallel_device_count)]
+ devices = [
+ ParallelizerTestObject(
+ i if i != exception_index else Exception(exception_msg),
+ completion_files[i].name)
+ for i in xrange(0, parallel_device_count)]
+ for f in completion_files:
+ f.close()
+ p = ParallelizerTestObject.parallel(devices)
+ with self.assertRaises(Exception) as e:
+ p.doRaiseIfExceptionElseSleepFor(2).pGet(3)
+ self.assertTrue(exception_msg in str(e.exception))
+ for i in xrange(0, parallel_device_count):
+ with open(completion_files[i].name) as f:
+ if i == exception_index:
+ self.assertEquals('', f.read())
+ else:
+ self.assertEquals('complete', f.read())
+ finally:
+ for f in completion_files:
+ os.remove(f.name)
+
+ def testReusable(self):
+ devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
+ p = ParallelizerTestObject.parallel(devices)
+ results = p.doReturn(True).pGet(1)
+ self.assertTrue(all(results))
+ results = p.doReturn(True).pGet(1)
+ self.assertTrue(all(results))
+ with self.assertRaises(Exception):
+ results = p.doRaise(Exception('reusableTest')).pGet(1)
+
+ def testContained(self):
+ devices = [ParallelizerTestObject(i) for i in xrange(0, 10)]
+ results = (ParallelizerTestObject.parallel(devices).helper
+ .doReturnStringThing().pGet(1))
+ self.assertTrue(isinstance(results, list))
+ self.assertEquals(10, len(results))
+ for i in xrange(0, 10):
+ self.assertEquals(str(i), results[i])
+
+ def testGetItem(self):
+ devices = [ParallelizerTestObject(range(i, i + 10)) for i in xrange(0, 10)]
+ results = ParallelizerTestObject.parallel(devices)[9].pGet(1)
+ self.assertEquals(range(9, 19), results)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
+