diff options
-rw-r--r-- | bestflags/builder.py | 132 | ||||
-rw-r--r-- | bestflags/builder_test.py | 164 | ||||
-rw-r--r-- | bestflags/executor.py | 64 | ||||
-rw-r--r-- | bestflags/executor_test.py | 42 | ||||
-rw-r--r-- | bestflags/pipeline_worker.py | 137 | ||||
-rw-r--r-- | bestflags/pipeline_worker_test.py | 173 |
6 files changed, 310 insertions, 402 deletions
diff --git a/bestflags/builder.py b/bestflags/builder.py deleted file mode 100644 index 535c1a14..00000000 --- a/bestflags/builder.py +++ /dev/null @@ -1,132 +0,0 @@ -"""The Build stage of the framework. - -This module defines the builder helper and the actual build worker. If there are -duplicate tasks, for example t1 and t2, needs to be built, one of them, for -example t1, will be built and the helper waits for the result of t1 and set the -results of the other task, t2 here, to be the same as that of t1. Setting the -result of t2 to be the same as t1 is referred to as resolving the result of t2. -The build worker invokes the compile method of the tasks that are not duplicate. -""" - -__author__ = 'yuhenglong@google.com (Yuheng Long)' - -import pipeline_process - - -def build_helper(done_dict, helper_queue, built_queue, result_queue): - """Build helper. - - This method Continuously pulls duplicate tasks from the helper_queue. The - duplicate tasks need not be compiled. This method also pulls completed tasks - from the worker queue and let the results of the duplicate tasks be the - same as their corresponding finished task. - - Args: - done_dict: A dictionary of tasks that are done. The key of the dictionary is - the optimization flags of the task. The value of the dictionary is the - compilation results of the corresponding task. - helper_queue: A queue of duplicate tasks whose results need to be resolved. - This is a communication channel between the pipeline_process and this - helper process. - built_queue: A queue of tasks that have been built. The results of these - tasks are needed to resolve the results of the duplicate tasks. This is - the communication channel between the actual build workers and this helper - process. - result_queue: After the results of the duplicate tasks have been resolved, - the duplicate tasks will be sent to the next stage via this queue. - """ - - # The list of duplicate tasks, the results of which need to be resolved. - waiting_list = [] - - while True: - # Pull duplicate task from the helper queue. - if not helper_queue.empty(): - task = helper_queue.get() - - if task == pipeline_process.POISONPILL: - # Poison pill means no more duplicate task from the helper queue. - break - - # The task has not been compiled before. - assert not task.compiled() - - # The optimization flags of this task. - flags = task.get_flags() - - # If a duplicate task come before the corresponding resolved results from - # the built_queue, it will be put in the waiting list. If the result - # arrives before the duplicate task, the duplicate task will be resolved - # right away. - if flags in done_dict: - # This task has been encountered before and the result is available. The - # result can be resolved right away. - task.set_build_result(done_dict[flags]) - result_queue.put(task) - else: - waiting_list.append(task) - - # Check and get compiled tasks from compiled_queue. - get_result_from_built_queue(built_queue, done_dict, waiting_list, - result_queue) - - # Wait to resolve the results of the remaining duplicate tasks. - while waiting_list: - get_result_from_built_queue(built_queue, done_dict, waiting_list, - result_queue) - - -def get_result_from_built_queue(built_queue, done_dict, waiting_list, - result_queue): - """Pull results from the compiled queue and resolves duplicate tasks. - - Args: - built_queue: A queue of tasks that have been built. The results of these - tasks are needed to resolve the results of the duplicate tasks. This is - the communication channel between the actual build workers and this helper - process. - done_dict: A dictionary of tasks that are done. The key of the dictionary is - the optimization flags of the task. The value of the dictionary is the - compilation results of the corresponding task. - waiting_list: The list of duplicate tasks, the results of which need to be - resolved. - result_queue: After the results of the duplicate tasks have been resolved, - the duplicate tasks will be sent to the next stage via this queue. - - This helper method tries to pull a compiled task from the compiled queue. - If it gets a task from the queue, it resolves the results of all the relevant - duplicate tasks in the waiting list. Relevant tasks are the tasks that have - the same flags as the currently received results from the built_queue. - """ - # Pull completed task from the worker queue. - if not built_queue.empty(): - (flags, build_result) = built_queue.get() - done_dict[flags] = build_result - - task_list = [t for t in waiting_list if t.get_flags() == flags] - for duplicate_task in task_list: - duplicate_task.set_build_result(build_result) - result_queue.put(duplicate_task) - waiting_list.remove(duplicate_task) - - -def build_worker(task, helper_queue, result_queue): - """Build worker. - - This method calls the compile method of the input task and distribute the - result to the helper and the next stage. - - Args: - task: Input task that needs to be built. - helper_queue: Queue that holds the completed tasks and the build results. - This is a communication channel between the worker and the helper. - result_queue: Queue that holds the completed tasks and the build results. - This is a communication channel between the worker and the next stage. - """ - - # The task has not been compiled before. - assert not task.compiled() - - task.compile() - helper_queue.put((task.get_flags(), task.get_build_result())) - result_queue.put(task) diff --git a/bestflags/builder_test.py b/bestflags/builder_test.py deleted file mode 100644 index a705ee1a..00000000 --- a/bestflags/builder_test.py +++ /dev/null @@ -1,164 +0,0 @@ -"""Builder unittest. - -This module tests the build helper method and the worker method. -""" - -__author__ = 'yuhenglong@google.com (Yuheng Long)' - -import multiprocessing -import random -import sys -import unittest - -import builder -import pipeline_process - - -def MockTaskCostGenerator(): - """Calls a random number generator and returns a negative number.""" - return random.randint(-sys.maxint - 1, -1) - - -class MockTask(object): - """This class emulates an actual task. - - It does not do the actual compile, but simply returns the build result as when - this task is constructed. - """ - - def __init__(self, flags, cost): - """Set up the compile results for this task. - - Args: - flags: the optimization flags of this task. - cost: the mork build cost of this task. - - The _pre_cost field stored the 'compiled' cost. Once this task is - compiled, i.e., by calling the compile method , the _cost field will have - this 'compiled' cost. - """ - - self._flags = flags - self._pre_cost = cost - - def get_flags(self): - return self._flags - - def __eq__(self, other): - if isinstance(other, MockTask): - return self._flags == other._flags and self._cost == other._cost - return False - - def set_build_result(self, cost): - self._cost = cost - - def compile(self): - self._cost = self._pre_cost - - def get_build_result(self): - return self._cost - - def compiled(self): - """Indicates whether the task has been compiled.""" - - return '_cost' in self.__dict__ - - -class BuilderTest(unittest.TestCase): - """This class tests the Builder. - - Given the same flags set, the image and the cost should result the same from - the builder. - """ - - def testHelper(self): - """"Test the build helper. - - Call the build method twice, and test the results. The results should be the - same, i.e., the cost should be the same. - """ - - # Set up the input, helper and output queue for the worker method. - manager = multiprocessing.Manager() - helper_queue = manager.Queue() - output_queue = manager.Queue() - built_queue = manager.Queue() - - # Set up the helper process that holds the helper method. - helper_process = multiprocessing.Process(target=builder.build_helper, - args=({}, helper_queue, - built_queue, output_queue)) - helper_process.start() - - # A dictionary defines the mock compile result to the build_helper. - mock_compile_result = {1: 1995, 2: 59, 9: 1027} - - # Test if there is a task that is done before, whether the duplicate task - # will have the same result. Here, two different scenarios are tested. That - # is the mock results are added to the built_queue before and after the - # corresponding mock tasks being ;added to the input queue. - built_queue.put((9, mock_compile_result[9])) - - # The output of the helper should contain all the following tasks. - results = [1, 1, 2, 9] - - # Testing the correctness of having tasks having the same flags key, here 1. - for result in results: - helper_queue.put(MockTask(result, MockTaskCostGenerator())) - - built_queue.put((2, mock_compile_result[2])) - built_queue.put((1, mock_compile_result[1])) - - # Signal there is no more duplicate task. - helper_queue.put(pipeline_process.POISONPILL) - helper_process.join() - - while results: - task = output_queue.get() - flags = task._flags - cost = task._cost - self.assertTrue(flags in results) - if flags in mock_compile_result: - self.assertTrue(cost, mock_compile_result[flags]) - results.remove(task._flags) - - def testWorker(self): - """"Test the actual build worker method. - - The worker should process all the input tasks and output the tasks to the - helper and result queue. - """ - - manager = multiprocessing.Manager() - output_queue = manager.Queue() - built_queue = manager.Queue() - - # A dictionary defines the mock tasks and their corresponding compile - # results. - mock_compile_tasks = {1: 86, 2: 788} - - mock_tasks = [] - - for flag, cost in mock_compile_tasks.iteritems(): - mock_tasks.append(MockTask(flag, cost)) - - # Submit the mock tasks to the build worker. - for mock_task in mock_tasks: - builder.build_worker(mock_task, built_queue, output_queue) - - # The tasks, from the output queue, should be the same as the input and - # should be compiled. - for task in mock_tasks: - output = output_queue.get() - self.assertEqual(output, task) - self.assertTrue(output.compiled()) - - # The tasks, from the built queue, should be defined in the - # mock_compile_tasks dictionary. - for flag, cost in mock_compile_tasks.iteritems(): - helper_input = built_queue.get() - self.assertEqual(helper_input, (flag, cost)) - - -if __name__ == '__main__': - unittest.main() diff --git a/bestflags/executor.py b/bestflags/executor.py deleted file mode 100644 index 91dd9288..00000000 --- a/bestflags/executor.py +++ /dev/null @@ -1,64 +0,0 @@ -"""The Execution stage of the framework. - -Execute the image against a set of benchmarks. This stage sets up a number of -processes, calls the actual execute method and caches the results. -""" - -__author__ = 'yuhenglong@google.com (Yuheng Long)' - -import multiprocessing - - -class Tester(object): - """Execute the generated images against a set of benchmark applications.""" - - def __init__(self, numProcess, costs): - """Set up the process pool and the results cached. - - Args: - numProcess: Maximum number of execution to run in parallel - costs: Executions that have been benchmarked before - """ - - self._pool = multiprocessing.Pool(numProcess) - self._costs = costs - - def _set_cost(self, image, cost): - """Record the execution result for the current image. - - Args: - image: The input image for the execution - cost: the time it takes to execute the image - """ - - pass - - def _execute(self, task): - """Execute the benchmarks on task. - - The concrete subclass should implement the actual execution. - - Args: - task: The input task for the execution - """ - # raise Exception('Must be implemented in child class') - pass - - def _execute_task(self, task): - """Execute the input task and record the cost. - - Args: - task: The task to be compiled - """ - pass - - def execute(self, generation): - """Execute the image for all entities in a generation. - - Call them in parallel in processes. - - Args: - generation: A new generation to be executed. - """ - - self._pool.map(self._execute_task, generation.task, 1) diff --git a/bestflags/executor_test.py b/bestflags/executor_test.py deleted file mode 100644 index 0f27fb9a..00000000 --- a/bestflags/executor_test.py +++ /dev/null @@ -1,42 +0,0 @@ -"""Tester unittest.""" - -__author__ = 'yuhenglong@google.com (Yuheng Long)' - -import unittest - -import executor - - -class TesterTest(unittest.TestCase): - """This class test the Executor. - - Given the same flags set and/or checksum, the image and the cost should be the - same from the Executor. - """ - - def setUp(self): - """Create the Executor to be tested.""" - - self.tester = executor.Tester(1, None) - - def testExecute(self): - """"Test the execute method. - - Call the execute method twice, and test the results. The results should be - the same, i.e., the cost should be the same. - Either the execute method or the set_execution_result of the input - Generation for the Tester should be called, but not both. - """ - self.tester.execute(self) - - def testInit(self): - """"Test the init method. - - If a certain checksum has been encountered before, the Tester should not - reexecute the images with the same checksum. - """ - - pass - -if __name__ == '__main__': - unittest.main() diff --git a/bestflags/pipeline_worker.py b/bestflags/pipeline_worker.py new file mode 100644 index 00000000..6ab965e2 --- /dev/null +++ b/bestflags/pipeline_worker.py @@ -0,0 +1,137 @@ +"""The pipeline_worker functions of the build and test stage of the framework. + +This module defines the helper and the worker. If there are duplicate tasks, for +example t1 and t2, needs to be built/tested, one of them, for example t1, will +be built/tested and the helper waits for the result of t1 and set the results of +the other task, t2 here, to be the same as that of t1. Setting the result of t2 +to be the same as t1 is referred to as resolving the result of t2. +The worker invokes the work method of the tasks that are not duplicate. +""" + +__author__ = 'yuhenglong@google.com (Yuheng Long)' + +import pipeline_process + + +def helper(stage, done_dict, helper_queue, completed_queue, result_queue): + """Helper that filters duplicate tasks. + + This method Continuously pulls duplicate tasks from the helper_queue. The + duplicate tasks need not be compiled/tested. This method also pulls completed + tasks from the worker queue and let the results of the duplicate tasks be the + same as their corresponding finished task. + + Args: + stage: The current stage of the pipeline, for example, build stage or test + stage. + done_dict: A dictionary of tasks that are done. The key of the dictionary is + the identifier of the task. The value of the dictionary is the results of + performing the corresponding task. + helper_queue: A queue of duplicate tasks whose results need to be resolved. + This is a communication channel between the pipeline_process and this + helper process. + completed_queue: A queue of tasks that have been built/tested. The results + of these tasks are needed to resolve the results of the duplicate tasks. + This is the communication channel between the workers and this helper + process. + result_queue: After the results of the duplicate tasks have been resolved, + the duplicate tasks will be sent to the next stage via this queue. + """ + + # The list of duplicate tasks, the results of which need to be resolved. + waiting_list = [] + + while True: + # Pull duplicate task from the helper queue. + if not helper_queue.empty(): + task = helper_queue.get() + + if task == pipeline_process.POISONPILL: + # Poison pill means no more duplicate task from the helper queue. + break + + # The task has not been performed before. + assert not task.done(stage) + + # The identifier of this task. + identifier = task.get_identifier(stage) + + # If a duplicate task comes before the corresponding resolved results from + # the completed_queue, it will be put in the waiting list. If the result + # arrives before the duplicate task, the duplicate task will be resolved + # right away. + if identifier in done_dict: + # This task has been encountered before and the result is available. The + # result can be resolved right away. + task.set_result(stage, done_dict[identifier]) + result_queue.put(task) + else: + waiting_list.append(task) + + # Check and get completed tasks from completed_queue. + get_result_from_completed_queue(stage, completed_queue, done_dict, + waiting_list, result_queue) + + # Wait to resolve the results of the remaining duplicate tasks. + while waiting_list: + get_result_from_completed_queue(stage, completed_queue, done_dict, + waiting_list, result_queue) + + +def get_result_from_completed_queue(stage, completed_queue, done_dict, + waiting_list, result_queue): + """Pull results from the completed queue and resolves duplicate tasks. + + Args: + stage: The current stage of the pipeline, for example, build stage or test + stage. + completed_queue: A queue of tasks that have been performed. The results of + these tasks are needed to resolve the results of the duplicate tasks. This + is the communication channel between the workers and this method. + done_dict: A dictionary of tasks that are done. The key of the dictionary is + the optimization flags of the task. The value of the dictionary is the + compilation results of the corresponding task. + waiting_list: The list of duplicate tasks, the results of which need to be + resolved. + result_queue: After the results of the duplicate tasks have been resolved, + the duplicate tasks will be sent to the next stage via this queue. + + This helper method tries to pull a completed task from the completed queue. + If it gets a task from the queue, it resolves the results of all the relevant + duplicate tasks in the waiting list. Relevant tasks are the tasks that have + the same flags as the currently received results from the completed_queue. + """ + # Pull completed task from the worker queue. + if not completed_queue.empty(): + (identifier, result) = completed_queue.get() + done_dict[identifier] = result + + tasks = [t for t in waiting_list if t.get_identifier(stage) == identifier] + for duplicate_task in tasks: + duplicate_task.set_result(stage, result) + result_queue.put(duplicate_task) + waiting_list.remove(duplicate_task) + + +def worker(stage, task, helper_queue, result_queue): + """Worker that performs the task. + + This method calls the work method of the input task and distribute the result + to the helper and the next stage. + + Args: + stage: The current stage of the pipeline, for example, build stage or test + stage. + task: Input task that needs to be performed. + helper_queue: Queue that holds the completed tasks and the results. This is + the communication channel between the worker and the helper. + result_queue: Queue that holds the completed tasks and the results. This is + the communication channel between the worker and the next stage. + """ + + # The task has not been completed before. + assert not task.done(stage) + + task.work(stage) + helper_queue.put((task.get_identifier(stage), task.get_result(stage))) + result_queue.put(task) diff --git a/bestflags/pipeline_worker_test.py b/bestflags/pipeline_worker_test.py new file mode 100644 index 00000000..340b3011 --- /dev/null +++ b/bestflags/pipeline_worker_test.py @@ -0,0 +1,173 @@ +"""Unittest for the pipeline_worker functions in the build/test stage. + +This module tests the helper method and the worker method. +""" + +__author__ = 'yuhenglong@google.com (Yuheng Long)' + +import multiprocessing +import random +import sys +import unittest + +import pipeline_process +import pipeline_worker + + +TESTSTAGE = 0 + + +def MockTaskCostGenerator(): + """Calls a random number generator and returns a negative number.""" + return random.randint(-sys.maxint - 1, -1) + + +class MockTask(object): + """This class emulates an actual task. + + It does not do the actual work, but simply returns the result as given when + this task is constructed. + """ + + def __init__(self, identifier, cost): + """Set up the results for this task. + + Args: + identifier: the identifier of this task. + cost: the mock cost of this task. + + The _pre_cost field stores the cost. Once this task is performed, i.e., by + calling the work method , the _cost field will have this cost. + """ + + self._identifier = identifier + self._pre_cost = cost + + def get_identifier(self, stage): + assert stage == TESTSTAGE + return self._identifier + + def __eq__(self, other): + if isinstance(other, MockTask): + return self._identifier == other._identifier and self._cost == other._cost + return False + + def set_result(self, stage, cost): + assert stage == TESTSTAGE + self._cost = cost + + def work(self, stage): + assert stage == TESTSTAGE + self._cost = self._pre_cost + + def get_result(self, stage): + assert stage == TESTSTAGE + return self._cost + + def done(self, stage): + """Indicates whether the task has been performed.""" + + assert stage == TESTSTAGE + return '_cost' in self.__dict__ + + +class AuxiliaryTest(unittest.TestCase): + """This class tests the pipeline_worker functions. + + Given the same identifier, the cost should result the same from the + pipeline_worker functions. + """ + + def testHelper(self): + """"Test the helper. + + Call the helper method twice, and test the results. The results should be + the same, i.e., the cost should be the same. + """ + + # Set up the input, helper and output queue for the helper method. + manager = multiprocessing.Manager() + helper_queue = manager.Queue() + result_queue = manager.Queue() + completed_queue = manager.Queue() + + # Set up the helper process that holds the helper method. + helper_process = multiprocessing.Process(target=pipeline_worker.helper, + args=(TESTSTAGE, {}, helper_queue, + completed_queue, + result_queue)) + helper_process.start() + + # A dictionary defines the mock result to the helper. + mock_result = {1: 1995, 2: 59, 9: 1027} + + # Test if there is a task that is done before, whether the duplicate task + # will have the same result. Here, two different scenarios are tested. That + # is the mock results are added to the completed_queue before and after the + # corresponding mock tasks being added to the input queue. + completed_queue.put((9, mock_result[9])) + + # The output of the helper should contain all the following tasks. + results = [1, 1, 2, 9] + + # Testing the correctness of having tasks having the same identifier, here + # 1. + for result in results: + helper_queue.put(MockTask(result, MockTaskCostGenerator())) + + completed_queue.put((2, mock_result[2])) + completed_queue.put((1, mock_result[1])) + + # Signal there is no more duplicate task. + helper_queue.put(pipeline_process.POISONPILL) + helper_process.join() + + while results: + task = result_queue.get() + identifier = task._identifier + cost = task._cost + self.assertTrue(identifier in results) + if identifier in mock_result: + self.assertTrue(cost, mock_result[identifier]) + results.remove(task._identifier) + + def testWorker(self): + """"Test the worker method. + + The worker should process all the input tasks and output the tasks to the + helper and result queue. + """ + + manager = multiprocessing.Manager() + result_queue = manager.Queue() + completed_queue = manager.Queue() + + # A dictionary defines the mock tasks and their corresponding results. + mock_work_tasks = {1: 86, 2: 788} + + mock_tasks = [] + + for flag, cost in mock_work_tasks.iteritems(): + mock_tasks.append(MockTask(flag, cost)) + + # Submit the mock tasks to the worker. + for mock_task in mock_tasks: + pipeline_worker.worker(TESTSTAGE, mock_task, completed_queue, + result_queue) + + # The tasks, from the output queue, should be the same as the input and + # should be performed. + for task in mock_tasks: + output = result_queue.get() + self.assertEqual(output, task) + self.assertTrue(output.done(TESTSTAGE)) + + # The tasks, from the completed_queue, should be defined in the + # mock_work_tasks dictionary. + for flag, cost in mock_work_tasks.iteritems(): + helper_input = completed_queue.get() + self.assertEqual(helper_input, (flag, cost)) + + +if __name__ == '__main__': + unittest.main() |