"""Unittest for the pipeline_worker functions in the build/test stage.

Part of the Chrome build flags optimization.

This module tests the helper method and the worker method.
"""

__author__ = 'yuhenglong@google.com (Yuheng Long)'

import multiprocessing
import random
import sys
import unittest

import pipeline_process
import pipeline_worker

# The only pipeline stage exercised by these tests. Every MockTask method
# asserts that it is called with this stage.
TESTSTAGE = 0


def MockTaskCostGenerator():
    """Calls a random number generator and returns a negative number.

    Returns:
      A random integer in the inclusive range [-sys.maxsize - 1, -1].
    """
    # sys.maxsize exists on both Python 2.6+ and Python 3, whereas sys.maxint
    # is Python 2 only.
    return random.randint(-sys.maxsize - 1, -1)


class MockTask(object):
    """This class emulates an actual task.

    It does not do the actual work, but simply returns the result as given
    when this task is constructed.
    """

    def __init__(self, identifier, cost):
        """Set up the results for this task.

        Args:
          identifier: the identifier of this task.
          cost: the mock cost of this task.

        The _pre_cost field stores the cost. Once this task is performed,
        i.e., by calling the work method, the _cost field will have this
        cost.
        """
        self._identifier = identifier
        self._pre_cost = cost

    def get_identifier(self, stage):
        """Returns the identifier given at construction time."""
        assert stage == TESTSTAGE
        return self._identifier

    def __eq__(self, other):
        """Two mock tasks are equal if their identifier and cost match.

        Both tasks must already carry a result (set by work or set_result),
        because the comparison reads the _cost field.
        """
        if isinstance(other, MockTask):
            return (self._identifier == other._identifier and
                    self._cost == other._cost)
        return False

    def set_result(self, stage, cost):
        """Stores the given cost as the result of this task."""
        assert stage == TESTSTAGE
        self._cost = cost

    def work(self, stage):
        """Performs the task: the pre-defined cost becomes the result."""
        assert stage == TESTSTAGE
        self._cost = self._pre_cost

    def get_result(self, stage):
        """Returns the result (cost) of this task."""
        assert stage == TESTSTAGE
        return self._cost

    def done(self, stage):
        """Indicates whether the task has been performed."""
        assert stage == TESTSTAGE
        # The _cost attribute only exists after work or set_result was called.
        return '_cost' in self.__dict__


class AuxiliaryTest(unittest.TestCase):
    """This class tests the pipeline_worker functions.

    Given the same identifier, the cost should result the same from the
    pipeline_worker functions.
    """

    def testHelper(self):
        """Test the helper.

        Call the helper method twice, and test the results. The results
        should be the same, i.e., the cost should be the same.
        """
        # Set up the input, helper and output queue for the helper method.
        manager = multiprocessing.Manager()
        helper_queue = manager.Queue()
        result_queue = manager.Queue()
        completed_queue = manager.Queue()

        # Set up the helper process that holds the helper method.
        helper_process = multiprocessing.Process(
            target=pipeline_worker.helper,
            args=(TESTSTAGE, {}, helper_queue, completed_queue, result_queue))
        helper_process.start()

        # A dictionary defines the mock result to the helper.
        mock_result = {1: 1995, 2: 59, 9: 1027}

        # Test if there is a task that is done before, whether the duplicate
        # task will have the same result. Here, two different scenarios are
        # tested. That is the mock results are added to the completed_queue
        # before and after the corresponding mock tasks being added to the
        # input queue.
        completed_queue.put((9, mock_result[9]))

        # The output of the helper should contain all the following tasks.
        results = [1, 1, 2, 9]

        # Testing the correctness of having tasks having the same identifier,
        # here 1.
        for result in results:
            helper_queue.put(MockTask(result, MockTaskCostGenerator()))

        completed_queue.put((2, mock_result[2]))
        completed_queue.put((1, mock_result[1]))

        # Signal there is no more duplicate task.
        helper_queue.put(pipeline_process.POISONPILL)
        helper_process.join()

        while results:
            task = result_queue.get()
            identifier = task._identifier
            cost = task._cost
            self.assertIn(identifier, results)
            if identifier in mock_result:
                # The cost reported by the helper must be the mock result
                # registered for this identifier, not the random cost the
                # task was constructed with. (assertTrue would treat the
                # second argument as a failure message and compare nothing.)
                self.assertEqual(cost, mock_result[identifier])
            results.remove(identifier)

    def testWorker(self):
        """Test the worker method.

        The worker should process all the input tasks and output the tasks
        to the helper and result queue.
        """
        manager = multiprocessing.Manager()
        result_queue = manager.Queue()
        completed_queue = manager.Queue()

        # A dictionary defines the mock tasks and their corresponding results.
        mock_work_tasks = {1: 86, 2: 788}

        # items() works on both Python 2 and Python 3; the dictionary is not
        # modified below, so both iterations over it yield the same order.
        mock_tasks = [MockTask(flag, cost)
                      for flag, cost in mock_work_tasks.items()]

        # Submit the mock tasks to the worker.
        for mock_task in mock_tasks:
            pipeline_worker.worker(TESTSTAGE, mock_task, completed_queue,
                                   result_queue)

        # The tasks, from the output queue, should be the same as the input
        # and should be performed.
        for task in mock_tasks:
            output = result_queue.get()
            self.assertEqual(output, task)
            self.assertTrue(output.done(TESTSTAGE))

        # The tasks, from the completed_queue, should be defined in the
        # mock_work_tasks dictionary.
        for flag, cost in mock_work_tasks.items():
            helper_input = completed_queue.get()
            self.assertEqual(helper_input, (flag, cost))


if __name__ == '__main__':
    unittest.main()