diff options
Diffstat (limited to 'systrace/catapult/devil/devil/utils/parallelizer_test.py')
-rw-r--r-- | systrace/catapult/devil/devil/utils/parallelizer_test.py | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/systrace/catapult/devil/devil/utils/parallelizer_test.py b/systrace/catapult/devil/devil/utils/parallelizer_test.py new file mode 100644 index 0000000..32ff7ec --- /dev/null +++ b/systrace/catapult/devil/devil/utils/parallelizer_test.py @@ -0,0 +1,162 @@ +# Copyright 2014 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Unit tests for the contents of parallelizer.py.""" + +# pylint: disable=W0212 +# pylint: disable=W0613 + +import os +import tempfile +import time +import unittest + +from devil.utils import parallelizer + + +class ParallelizerTestObject(object): + """Class used to test parallelizer.Parallelizer.""" + + parallel = parallelizer.Parallelizer + + def __init__(self, thing, completion_file_name=None): + self._thing = thing + self._completion_file_name = completion_file_name + self.helper = ParallelizerTestObjectHelper(thing) + + @staticmethod + def doReturn(what): + return what + + @classmethod + def doRaise(cls, what): + raise what + + def doSetTheThing(self, new_thing): + self._thing = new_thing + + def doReturnTheThing(self): + return self._thing + + def doRaiseTheThing(self): + raise self._thing + + def doRaiseIfExceptionElseSleepFor(self, sleep_duration): + if isinstance(self._thing, Exception): + raise self._thing + time.sleep(sleep_duration) + self._write_completion_file() + return self._thing + + def _write_completion_file(self): + if self._completion_file_name and len(self._completion_file_name): + with open(self._completion_file_name, 'w+b') as completion_file: + completion_file.write('complete') + + def __getitem__(self, index): + return self._thing[index] + + def __str__(self): + return type(self).__name__ + + +class ParallelizerTestObjectHelper(object): + + def __init__(self, thing): + self._thing = thing + + def doReturnStringThing(self): + return str(self._thing) + + +class ParallelizerTest(unittest.TestCase): + + def testInitEmptyList(self): + r = parallelizer.Parallelizer([]).replace('a', 'b').pGet(0.1) + self.assertEquals([], r) + + def testMethodCall(self): + test_data = ['abc_foo', 'def_foo', 'ghi_foo'] + expected = ['abc_bar', 'def_bar', 'ghi_bar'] + r = parallelizer.Parallelizer(test_data).replace('_foo', '_bar').pGet(0.1) + self.assertEquals(expected, r) + + def testMutate(self): + devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)] + self.assertTrue(all(d.doReturnTheThing() for d in devices)) + ParallelizerTestObject.parallel(devices).doSetTheThing(False).pFinish(1) + self.assertTrue(not any(d.doReturnTheThing() for d in devices)) + + def testAllReturn(self): + devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)] + results = ParallelizerTestObject.parallel( + devices).doReturnTheThing().pGet(1) + self.assertTrue(isinstance(results, list)) + self.assertEquals(10, len(results)) + self.assertTrue(all(results)) + + def testAllRaise(self): + devices = [ParallelizerTestObject(Exception('thing %d' % i)) + for i in xrange(0, 10)] + p = ParallelizerTestObject.parallel(devices).doRaiseTheThing() + with self.assertRaises(Exception): + p.pGet(1) + + def testOneFailOthersComplete(self): + parallel_device_count = 10 + exception_index = 7 + exception_msg = 'thing %d' % exception_index + + try: + completion_files = [tempfile.NamedTemporaryFile(delete=False) + for _ in xrange(0, parallel_device_count)] + devices = [ + ParallelizerTestObject( + i if i != exception_index else Exception(exception_msg), + completion_files[i].name) + for i in xrange(0, parallel_device_count)] + for f in completion_files: + f.close() + p = ParallelizerTestObject.parallel(devices) + with self.assertRaises(Exception) as e: + p.doRaiseIfExceptionElseSleepFor(2).pGet(3) + self.assertTrue(exception_msg in str(e.exception)) + for i in xrange(0, parallel_device_count): + with open(completion_files[i].name) as f: + if i == exception_index: + self.assertEquals('', f.read()) + else: + self.assertEquals('complete', f.read()) + finally: + for f in completion_files: + os.remove(f.name) + + def testReusable(self): + devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)] + p = ParallelizerTestObject.parallel(devices) + results = p.doReturn(True).pGet(1) + self.assertTrue(all(results)) + results = p.doReturn(True).pGet(1) + self.assertTrue(all(results)) + with self.assertRaises(Exception): + results = p.doRaise(Exception('reusableTest')).pGet(1) + + def testContained(self): + devices = [ParallelizerTestObject(i) for i in xrange(0, 10)] + results = (ParallelizerTestObject.parallel(devices).helper + .doReturnStringThing().pGet(1)) + self.assertTrue(isinstance(results, list)) + self.assertEquals(10, len(results)) + for i in xrange(0, 10): + self.assertEquals(str(i), results[i]) + + def testGetItem(self): + devices = [ParallelizerTestObject(range(i, i + 10)) for i in xrange(0, 10)] + results = ParallelizerTestObject.parallel(devices)[9].pGet(1) + self.assertEquals(range(9, 19), results) + + +if __name__ == '__main__': + unittest.main(verbosity=2) + |