diff options
Diffstat (limited to 'bestflags/pipeline_worker_test.py')
-rw-r--r-- | bestflags/pipeline_worker_test.py | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/bestflags/pipeline_worker_test.py b/bestflags/pipeline_worker_test.py new file mode 100644 index 00000000..340b3011 --- /dev/null +++ b/bestflags/pipeline_worker_test.py @@ -0,0 +1,173 @@ +"""Unittest for the pipeline_worker functions in the build/test stage. + +This module tests the helper method and the worker method. +""" + +__author__ = 'yuhenglong@google.com (Yuheng Long)' + +import multiprocessing +import random +import sys +import unittest + +import pipeline_process +import pipeline_worker + + +TESTSTAGE = 0 + + +def MockTaskCostGenerator(): + """Calls a random number generator and returns a negative number.""" + return random.randint(-sys.maxint - 1, -1) + + +class MockTask(object): + """This class emulates an actual task. + + It does not do the actual work, but simply returns the result as given when + this task is constructed. + """ + + def __init__(self, identifier, cost): + """Set up the results for this task. + + Args: + identifier: the identifier of this task. + cost: the mock cost of this task. + + The _pre_cost field stores the cost. Once this task is performed, i.e., by + calling the work method , the _cost field will have this cost. + """ + + self._identifier = identifier + self._pre_cost = cost + + def get_identifier(self, stage): + assert stage == TESTSTAGE + return self._identifier + + def __eq__(self, other): + if isinstance(other, MockTask): + return self._identifier == other._identifier and self._cost == other._cost + return False + + def set_result(self, stage, cost): + assert stage == TESTSTAGE + self._cost = cost + + def work(self, stage): + assert stage == TESTSTAGE + self._cost = self._pre_cost + + def get_result(self, stage): + assert stage == TESTSTAGE + return self._cost + + def done(self, stage): + """Indicates whether the task has been performed.""" + + assert stage == TESTSTAGE + return '_cost' in self.__dict__ + + +class AuxiliaryTest(unittest.TestCase): + """This class tests the pipeline_worker functions. + + Given the same identifier, the cost should result the same from the + pipeline_worker functions. + """ + + def testHelper(self): + """"Test the helper. + + Call the helper method twice, and test the results. The results should be + the same, i.e., the cost should be the same. + """ + + # Set up the input, helper and output queue for the helper method. + manager = multiprocessing.Manager() + helper_queue = manager.Queue() + result_queue = manager.Queue() + completed_queue = manager.Queue() + + # Set up the helper process that holds the helper method. + helper_process = multiprocessing.Process(target=pipeline_worker.helper, + args=(TESTSTAGE, {}, helper_queue, + completed_queue, + result_queue)) + helper_process.start() + + # A dictionary defines the mock result to the helper. + mock_result = {1: 1995, 2: 59, 9: 1027} + + # Test if there is a task that is done before, whether the duplicate task + # will have the same result. Here, two different scenarios are tested. That + # is the mock results are added to the completed_queue before and after the + # corresponding mock tasks being added to the input queue. + completed_queue.put((9, mock_result[9])) + + # The output of the helper should contain all the following tasks. + results = [1, 1, 2, 9] + + # Testing the correctness of having tasks having the same identifier, here + # 1. + for result in results: + helper_queue.put(MockTask(result, MockTaskCostGenerator())) + + completed_queue.put((2, mock_result[2])) + completed_queue.put((1, mock_result[1])) + + # Signal there is no more duplicate task. + helper_queue.put(pipeline_process.POISONPILL) + helper_process.join() + + while results: + task = result_queue.get() + identifier = task._identifier + cost = task._cost + self.assertTrue(identifier in results) + if identifier in mock_result: + self.assertTrue(cost, mock_result[identifier]) + results.remove(task._identifier) + + def testWorker(self): + """"Test the worker method. + + The worker should process all the input tasks and output the tasks to the + helper and result queue. + """ + + manager = multiprocessing.Manager() + result_queue = manager.Queue() + completed_queue = manager.Queue() + + # A dictionary defines the mock tasks and their corresponding results. + mock_work_tasks = {1: 86, 2: 788} + + mock_tasks = [] + + for flag, cost in mock_work_tasks.iteritems(): + mock_tasks.append(MockTask(flag, cost)) + + # Submit the mock tasks to the worker. + for mock_task in mock_tasks: + pipeline_worker.worker(TESTSTAGE, mock_task, completed_queue, + result_queue) + + # The tasks, from the output queue, should be the same as the input and + # should be performed. + for task in mock_tasks: + output = result_queue.get() + self.assertEqual(output, task) + self.assertTrue(output.done(TESTSTAGE)) + + # The tasks, from the completed_queue, should be defined in the + # mock_work_tasks dictionary. + for flag, cost in mock_work_tasks.iteritems(): + helper_input = completed_queue.get() + self.assertEqual(helper_input, (flag, cost)) + + +if __name__ == '__main__': + unittest.main() |