diff options
Diffstat (limited to 'bestflags/pipeline_process_test.py')
-rw-r--r-- | bestflags/pipeline_process_test.py | 104 |
1 files changed, 55 insertions, 49 deletions
diff --git a/bestflags/pipeline_process_test.py b/bestflags/pipeline_process_test.py index b9d84067..04e641ec 100644 --- a/bestflags/pipeline_process_test.py +++ b/bestflags/pipeline_process_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. +# Copyright 2013 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Pipeline Process unittest. @@ -6,7 +6,7 @@ Part of the Chrome build flags optimization. """ -__author__ = 'yuhenglong@google.com (Yuheng Long)' +__author__ = "yuhenglong@google.com (Yuheng Long)" import multiprocessing import unittest @@ -14,6 +14,7 @@ import unittest from mock_task import MockTask import pipeline_process + # Pick an integer at random. ERROR = -334 # Pick an integer at random. @@ -21,69 +22,74 @@ TEST_STAGE = -8 def MockHelper(stage, done_dict, helper_queue, _, result_queue): - """This method echos input to the output.""" + """This method echos input to the output.""" - assert stage == TEST_STAGE - while True: - if not helper_queue.empty(): - task = helper_queue.get() - if task == pipeline_process.POISONPILL: - # Poison pill means shutdown - break + assert stage == TEST_STAGE + while True: + if not helper_queue.empty(): + task = helper_queue.get() + if task == pipeline_process.POISONPILL: + # Poison pill means shutdown + break - if task in done_dict: - # verify that it does not get duplicate "1"s in the test. - result_queue.put(ERROR) - else: - result_queue.put(('helper', task.GetIdentifier(TEST_STAGE))) + if task in done_dict: + # verify that it does not get duplicate "1"s in the test. + result_queue.put(ERROR) + else: + result_queue.put(("helper", task.GetIdentifier(TEST_STAGE))) def MockWorker(stage, task, _, result_queue): - assert stage == TEST_STAGE - result_queue.put(('worker', task.GetIdentifier(TEST_STAGE))) + assert stage == TEST_STAGE + result_queue.put(("worker", task.GetIdentifier(TEST_STAGE))) class PipelineProcessTest(unittest.TestCase): - """This class test the PipelineProcess. + """This class test the PipelineProcess. - All the task inserted into the input queue should be taken out and hand to the - actual pipeline handler, except for the POISON_PILL. All these task should - also be passed to the next pipeline stage via the output queue. - """ + All the task inserted into the input queue should be taken out and hand to the + actual pipeline handler, except for the POISON_PILL. All these task should + also be passed to the next pipeline stage via the output queue. + """ - def testRun(self): - """Test the run method. + def testRun(self): + """Test the run method. - Ensure that all the tasks inserted into the queue are properly handled. - """ + Ensure that all the tasks inserted into the queue are properly handled. + """ - manager = multiprocessing.Manager() - inp = manager.Queue() - output = manager.Queue() + manager = multiprocessing.Manager() + inp = manager.Queue() + output = manager.Queue() - process = pipeline_process.PipelineProcess( - 2, 'testing', {}, TEST_STAGE, inp, MockHelper, MockWorker, output) + process = pipeline_process.PipelineProcess( + 2, "testing", {}, TEST_STAGE, inp, MockHelper, MockWorker, output + ) - process.start() - inp.put(MockTask(TEST_STAGE, 1)) - inp.put(MockTask(TEST_STAGE, 1)) - inp.put(MockTask(TEST_STAGE, 2)) - inp.put(pipeline_process.POISONPILL) - process.join() + process.start() + inp.put(MockTask(TEST_STAGE, 1)) + inp.put(MockTask(TEST_STAGE, 1)) + inp.put(MockTask(TEST_STAGE, 2)) + inp.put(pipeline_process.POISONPILL) + process.join() - # All tasks are processed once and only once. - result = [('worker', 1), ('helper', 1), ('worker', 2), - pipeline_process.POISONPILL] - while result: - task = output.get() + # All tasks are processed once and only once. + result = [ + ("worker", 1), + ("helper", 1), + ("worker", 2), + pipeline_process.POISONPILL, + ] + while result: + task = output.get() - # One "1"s is passed to the worker and one to the helper. - self.assertNotEqual(task, ERROR) + # One "1"s is passed to the worker and one to the helper. + self.assertNotEqual(task, ERROR) - # The messages received should be exactly the same as the result. - self.assertTrue(task in result) - result.remove(task) + # The messages received should be exactly the same as the result. + self.assertTrue(task in result) + result.remove(task) -if __name__ == '__main__': - unittest.main() +if __name__ == "__main__": + unittest.main() |