aboutsummaryrefslogtreecommitdiff
path: root/bestflags/pipeline_process_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'bestflags/pipeline_process_test.py')
-rw-r--r--bestflags/pipeline_process_test.py104
1 files changed, 55 insertions, 49 deletions
diff --git a/bestflags/pipeline_process_test.py b/bestflags/pipeline_process_test.py
index b9d84067..04e641ec 100644
--- a/bestflags/pipeline_process_test.py
+++ b/bestflags/pipeline_process_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Pipeline Process unittest.
@@ -6,7 +6,7 @@
Part of the Chrome build flags optimization.
"""
-__author__ = 'yuhenglong@google.com (Yuheng Long)'
+__author__ = "yuhenglong@google.com (Yuheng Long)"
import multiprocessing
import unittest
@@ -14,6 +14,7 @@ import unittest
from mock_task import MockTask
import pipeline_process
+
# Pick an integer at random.
ERROR = -334
# Pick an integer at random.
@@ -21,69 +22,74 @@ TEST_STAGE = -8
def MockHelper(stage, done_dict, helper_queue, _, result_queue):
- """This method echos input to the output."""
+ """This method echos input to the output."""
- assert stage == TEST_STAGE
- while True:
- if not helper_queue.empty():
- task = helper_queue.get()
- if task == pipeline_process.POISONPILL:
- # Poison pill means shutdown
- break
+ assert stage == TEST_STAGE
+ while True:
+ if not helper_queue.empty():
+ task = helper_queue.get()
+ if task == pipeline_process.POISONPILL:
+ # Poison pill means shutdown
+ break
- if task in done_dict:
- # verify that it does not get duplicate "1"s in the test.
- result_queue.put(ERROR)
- else:
- result_queue.put(('helper', task.GetIdentifier(TEST_STAGE)))
+ if task in done_dict:
+ # verify that it does not get duplicate "1"s in the test.
+ result_queue.put(ERROR)
+ else:
+ result_queue.put(("helper", task.GetIdentifier(TEST_STAGE)))
def MockWorker(stage, task, _, result_queue):
- assert stage == TEST_STAGE
- result_queue.put(('worker', task.GetIdentifier(TEST_STAGE)))
+ assert stage == TEST_STAGE
+ result_queue.put(("worker", task.GetIdentifier(TEST_STAGE)))
class PipelineProcessTest(unittest.TestCase):
- """This class test the PipelineProcess.
+ """This class test the PipelineProcess.
- All the task inserted into the input queue should be taken out and hand to the
- actual pipeline handler, except for the POISON_PILL. All these task should
- also be passed to the next pipeline stage via the output queue.
- """
+ All the task inserted into the input queue should be taken out and hand to the
+ actual pipeline handler, except for the POISON_PILL. All these task should
+ also be passed to the next pipeline stage via the output queue.
+ """
- def testRun(self):
- """Test the run method.
+ def testRun(self):
+ """Test the run method.
- Ensure that all the tasks inserted into the queue are properly handled.
- """
+ Ensure that all the tasks inserted into the queue are properly handled.
+ """
- manager = multiprocessing.Manager()
- inp = manager.Queue()
- output = manager.Queue()
+ manager = multiprocessing.Manager()
+ inp = manager.Queue()
+ output = manager.Queue()
- process = pipeline_process.PipelineProcess(
- 2, 'testing', {}, TEST_STAGE, inp, MockHelper, MockWorker, output)
+ process = pipeline_process.PipelineProcess(
+ 2, "testing", {}, TEST_STAGE, inp, MockHelper, MockWorker, output
+ )
- process.start()
- inp.put(MockTask(TEST_STAGE, 1))
- inp.put(MockTask(TEST_STAGE, 1))
- inp.put(MockTask(TEST_STAGE, 2))
- inp.put(pipeline_process.POISONPILL)
- process.join()
+ process.start()
+ inp.put(MockTask(TEST_STAGE, 1))
+ inp.put(MockTask(TEST_STAGE, 1))
+ inp.put(MockTask(TEST_STAGE, 2))
+ inp.put(pipeline_process.POISONPILL)
+ process.join()
- # All tasks are processed once and only once.
- result = [('worker', 1), ('helper', 1), ('worker', 2),
- pipeline_process.POISONPILL]
- while result:
- task = output.get()
+ # All tasks are processed once and only once.
+ result = [
+ ("worker", 1),
+ ("helper", 1),
+ ("worker", 2),
+ pipeline_process.POISONPILL,
+ ]
+ while result:
+ task = output.get()
- # One "1"s is passed to the worker and one to the helper.
- self.assertNotEqual(task, ERROR)
+ # One "1"s is passed to the worker and one to the helper.
+ self.assertNotEqual(task, ERROR)
- # The messages received should be exactly the same as the result.
- self.assertTrue(task in result)
- result.remove(task)
+ # The messages received should be exactly the same as the result.
+ self.assertTrue(task in result)
+ result.remove(task)
-if __name__ == '__main__':
- unittest.main()
+if __name__ == "__main__":
+ unittest.main()