aboutsummaryrefslogtreecommitdiff
path: root/bestflags/pipeline_process_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'bestflags/pipeline_process_test.py')
-rw-r--r--bestflags/pipeline_process_test.py63
1 files changed, 61 insertions, 2 deletions
diff --git a/bestflags/pipeline_process_test.py b/bestflags/pipeline_process_test.py
index 8df23278..e40b5a21 100644
--- a/bestflags/pipeline_process_test.py
+++ b/bestflags/pipeline_process_test.py
@@ -2,10 +2,42 @@
__author__ = 'yuhenglong@google.com (Yuheng Long)'
+import multiprocessing
import unittest
import pipeline_process
+# Pick an integer at random.
+ERROR = -334
+
+
+def MockHelper(done_dict, helper_queue, work_queue, result_queue):
+ """This method echos input to the output."""
+ while True:
+ if not helper_queue.empty():
+ task = helper_queue.get()
+ if task == pipeline_process.POISONPILL:
+ # Poison pill means shutdown
+ break
+
+ if task in done_dict:
+ # verify that it does not get duplicate "1"s in the test.
+ result_queue.put(ERROR)
+ else:
+ result_queue.put(('helper', task.get_key(0)))
+
+
+def MockWorker(task, buffer_queue, result_queue):
+ result_queue.put(('worker', task.get_key(0)))
+
+
+class MockTask(object):
+ def __init__(self, key):
+ self._key = key
+
+ def get_key(self, stage):
+ return self._key
+
class PipelineProcessTest(unittest.TestCase):
"""This class test the PipelineProcess.
@@ -19,11 +51,38 @@ class PipelineProcessTest(unittest.TestCase):
pass
def testRun(self):
- """"Test the run method.
+ """Test the run method.
Ensure that all the tasks inserted into the queue are properly handled.
"""
- pass
+
+ manager = multiprocessing.Manager()
+ inp = manager.Queue()
+ output = manager.Queue()
+
+ process = pipeline_process.PipelineProcess(2, 'testing', {}, 'test', inp,
+ MockHelper, MockWorker, output)
+
+ process.start()
+ inp.put(MockTask(1))
+ inp.put(MockTask(1))
+ inp.put(MockTask(2))
+ inp.put(pipeline_process.POISONPILL)
+ process.join()
+
+ # All tasks are processed once and only once.
+ result = [('worker', 1), ('helper', 1), ('worker', 2),
+ pipeline_process.POISONPILL]
+ while result:
+ task = output.get()
+
+ # One "1"s is passed to the worker and one to the helper.
+ self.assertNotEqual(task, ERROR)
+
+ # The messages received should be exactly the same as the result.
+ self.assertTrue(task in result)
+ result.remove(task)
+
if __name__ == '__main__':
unittest.main()