diff options
author | Yuheng Long <yuhenglong@google.com> | 2013-06-19 09:44:24 -0700 |
---|---|---|
committer | ChromeBot <chrome-bot@google.com> | 2013-06-27 19:45:56 -0700 |
commit | 1dd70cd5565f1800b5a2133922d84022cf619ddd (patch) | |
tree | a99ac9832f6421221f5a50d7f82870b6b8d31805 /bestflags | |
parent | 5fe6dc81886f49eb2089c883c6862f2e896155cf (diff) | |
download | toolchain-utils-1dd70cd5565f1800b5a2133922d84022cf619ddd.tar.gz |
Added the logic and the test for the builder of the framework.
BUG=None
TEST=None
Change-Id: Ibf65d94c042b39f340ccaae7a9aaa16d390c8d50
Reviewed-on: https://gerrit-int.chromium.org/39864
Reviewed-by: Simon Que <sque@google.com>
Reviewed-by: Luis Lozano <llozano@chromium.org>
Commit-Queue: Yuheng Long <yuhenglong@google.com>
Tested-by: Yuheng Long <yuhenglong@google.com>
Diffstat (limited to 'bestflags')
-rw-r--r-- | bestflags/builder.py | 179 | ||||
-rw-r--r-- | bestflags/builder_test.py | 158 | ||||
-rw-r--r-- | bestflags/pipeline_process.py | 2 |
3 files changed, 267 insertions, 72 deletions
diff --git a/bestflags/builder.py b/bestflags/builder.py index a75bd425..535c1a14 100644 --- a/bestflags/builder.py +++ b/bestflags/builder.py @@ -1,59 +1,132 @@ """The Build stage of the framework. -Build the image according to the flag set. This stage sets up a number of -processes, calls the actual build method and caches the results. +This module defines the builder helper and the actual build worker. If there are +duplicate tasks, for example t1 and t2, needs to be built, one of them, for +example t1, will be built and the helper waits for the result of t1 and set the +results of the other task, t2 here, to be the same as that of t1. Setting the +result of t2 to be the same as t1 is referred to as resolving the result of t2. +The build worker invokes the compile method of the tasks that are not duplicate. """ __author__ = 'yuhenglong@google.com (Yuheng Long)' -import multiprocessing - - -class Builder(object): - """Compiling the source code to generate images using multiple processes.""" - - def __init__(self, numProcess, images): - """Set up the process pool and the images cached. - - Args: - numProcess: Maximum number of builds to run in parallel - images: Images that have been generated before - """ - if numProcess <= 0: - numProcess = 1 - self._pool = multiprocessing.Pool(numProcess) - self._images = images - - def _set_cost(self, flag_set, image, cost): - """Record the build result for the current flag_set. - - Args: - flag_set: The optimization combination - image: The result image for the build - cost: the time it takes to build the image - """ - - pass - - def _build_task(self, task): - """Compile the task and generate output. - - This stage includes compiling the input task, generating an image for the - task and computing the checksum for the image. - - Args: - task: The task to be compiled - """ - - pass - - def build(self, generation): - """Build the images for all entities in a generation. - - Call them in parallel in processes. - - Args: - generation: A new generation to be built. - """ - - self._pool.map(self._build_task, generation.task, 1) +import pipeline_process + + +def build_helper(done_dict, helper_queue, built_queue, result_queue): + """Build helper. + + This method Continuously pulls duplicate tasks from the helper_queue. The + duplicate tasks need not be compiled. This method also pulls completed tasks + from the worker queue and let the results of the duplicate tasks be the + same as their corresponding finished task. + + Args: + done_dict: A dictionary of tasks that are done. The key of the dictionary is + the optimization flags of the task. The value of the dictionary is the + compilation results of the corresponding task. + helper_queue: A queue of duplicate tasks whose results need to be resolved. + This is a communication channel between the pipeline_process and this + helper process. + built_queue: A queue of tasks that have been built. The results of these + tasks are needed to resolve the results of the duplicate tasks. This is + the communication channel between the actual build workers and this helper + process. + result_queue: After the results of the duplicate tasks have been resolved, + the duplicate tasks will be sent to the next stage via this queue. + """ + + # The list of duplicate tasks, the results of which need to be resolved. + waiting_list = [] + + while True: + # Pull duplicate task from the helper queue. + if not helper_queue.empty(): + task = helper_queue.get() + + if task == pipeline_process.POISONPILL: + # Poison pill means no more duplicate task from the helper queue. + break + + # The task has not been compiled before. + assert not task.compiled() + + # The optimization flags of this task. + flags = task.get_flags() + + # If a duplicate task come before the corresponding resolved results from + # the built_queue, it will be put in the waiting list. If the result + # arrives before the duplicate task, the duplicate task will be resolved + # right away. + if flags in done_dict: + # This task has been encountered before and the result is available. The + # result can be resolved right away. + task.set_build_result(done_dict[flags]) + result_queue.put(task) + else: + waiting_list.append(task) + + # Check and get compiled tasks from compiled_queue. + get_result_from_built_queue(built_queue, done_dict, waiting_list, + result_queue) + + # Wait to resolve the results of the remaining duplicate tasks. + while waiting_list: + get_result_from_built_queue(built_queue, done_dict, waiting_list, + result_queue) + + +def get_result_from_built_queue(built_queue, done_dict, waiting_list, + result_queue): + """Pull results from the compiled queue and resolves duplicate tasks. + + Args: + built_queue: A queue of tasks that have been built. The results of these + tasks are needed to resolve the results of the duplicate tasks. This is + the communication channel between the actual build workers and this helper + process. + done_dict: A dictionary of tasks that are done. The key of the dictionary is + the optimization flags of the task. The value of the dictionary is the + compilation results of the corresponding task. + waiting_list: The list of duplicate tasks, the results of which need to be + resolved. + result_queue: After the results of the duplicate tasks have been resolved, + the duplicate tasks will be sent to the next stage via this queue. + + This helper method tries to pull a compiled task from the compiled queue. + If it gets a task from the queue, it resolves the results of all the relevant + duplicate tasks in the waiting list. Relevant tasks are the tasks that have + the same flags as the currently received results from the built_queue. + """ + # Pull completed task from the worker queue. + if not built_queue.empty(): + (flags, build_result) = built_queue.get() + done_dict[flags] = build_result + + task_list = [t for t in waiting_list if t.get_flags() == flags] + for duplicate_task in task_list: + duplicate_task.set_build_result(build_result) + result_queue.put(duplicate_task) + waiting_list.remove(duplicate_task) + + +def build_worker(task, helper_queue, result_queue): + """Build worker. + + This method calls the compile method of the input task and distribute the + result to the helper and the next stage. + + Args: + task: Input task that needs to be built. + helper_queue: Queue that holds the completed tasks and the build results. + This is a communication channel between the worker and the helper. + result_queue: Queue that holds the completed tasks and the build results. + This is a communication channel between the worker and the next stage. + """ + + # The task has not been compiled before. + assert not task.compiled() + + task.compile() + helper_queue.put((task.get_flags(), task.get_build_result())) + result_queue.put(task) diff --git a/bestflags/builder_test.py b/bestflags/builder_test.py index 9a636ff3..a705ee1a 100644 --- a/bestflags/builder_test.py +++ b/bestflags/builder_test.py @@ -1,42 +1,164 @@ -"""Builder unittest.""" +"""Builder unittest. + +This module tests the build helper method and the worker method. +""" __author__ = 'yuhenglong@google.com (Yuheng Long)' +import multiprocessing +import random +import sys import unittest import builder +import pipeline_process + + +def MockTaskCostGenerator(): + """Calls a random number generator and returns a negative number.""" + return random.randint(-sys.maxint - 1, -1) + + +class MockTask(object): + """This class emulates an actual task. + + It does not do the actual compile, but simply returns the build result as when + this task is constructed. + """ + + def __init__(self, flags, cost): + """Set up the compile results for this task. + + Args: + flags: the optimization flags of this task. + cost: the mork build cost of this task. + + The _pre_cost field stored the 'compiled' cost. Once this task is + compiled, i.e., by calling the compile method , the _cost field will have + this 'compiled' cost. + """ + + self._flags = flags + self._pre_cost = cost + + def get_flags(self): + return self._flags + + def __eq__(self, other): + if isinstance(other, MockTask): + return self._flags == other._flags and self._cost == other._cost + return False + + def set_build_result(self, cost): + self._cost = cost + + def compile(self): + self._cost = self._pre_cost + + def get_build_result(self): + return self._cost + + def compiled(self): + """Indicates whether the task has been compiled.""" + + return '_cost' in self.__dict__ class BuilderTest(unittest.TestCase): - """This class test the Builder. + """This class tests the Builder. Given the same flags set, the image and the cost should result the same from the builder. """ - def setUp(self): - """Create the Builder to be tested.""" - - self.builder = builder.Builder(1, None) - - def testCompile(self): - """"Test the build method. + def testHelper(self): + """"Test the build helper. Call the build method twice, and test the results. The results should be the - same, i.e., the image, the cost and the checksum should be the same. - Either the compile method or the set_compile_result of the input Generation - for the Builder should be called, but not both. + same, i.e., the cost should be the same. """ - self.builder.build(self) - def testInit(self): - """"Test the init method. + # Set up the input, helper and output queue for the worker method. + manager = multiprocessing.Manager() + helper_queue = manager.Queue() + output_queue = manager.Queue() + built_queue = manager.Queue() + + # Set up the helper process that holds the helper method. + helper_process = multiprocessing.Process(target=builder.build_helper, + args=({}, helper_queue, + built_queue, output_queue)) + helper_process.start() + + # A dictionary defines the mock compile result to the build_helper. + mock_compile_result = {1: 1995, 2: 59, 9: 1027} + + # Test if there is a task that is done before, whether the duplicate task + # will have the same result. Here, two different scenarios are tested. That + # is the mock results are added to the built_queue before and after the + # corresponding mock tasks being ;added to the input queue. + built_queue.put((9, mock_compile_result[9])) + + # The output of the helper should contain all the following tasks. + results = [1, 1, 2, 9] + + # Testing the correctness of having tasks having the same flags key, here 1. + for result in results: + helper_queue.put(MockTask(result, MockTaskCostGenerator())) + + built_queue.put((2, mock_compile_result[2])) + built_queue.put((1, mock_compile_result[1])) + + # Signal there is no more duplicate task. + helper_queue.put(pipeline_process.POISONPILL) + helper_process.join() - If a certain flag set has been encountered before, the builder should not - recompile the image with the same optimization flag set. + while results: + task = output_queue.get() + flags = task._flags + cost = task._cost + self.assertTrue(flags in results) + if flags in mock_compile_result: + self.assertTrue(cost, mock_compile_result[flags]) + results.remove(task._flags) + + def testWorker(self): + """"Test the actual build worker method. + + The worker should process all the input tasks and output the tasks to the + helper and result queue. """ - pass + manager = multiprocessing.Manager() + output_queue = manager.Queue() + built_queue = manager.Queue() + + # A dictionary defines the mock tasks and their corresponding compile + # results. + mock_compile_tasks = {1: 86, 2: 788} + + mock_tasks = [] + + for flag, cost in mock_compile_tasks.iteritems(): + mock_tasks.append(MockTask(flag, cost)) + + # Submit the mock tasks to the build worker. + for mock_task in mock_tasks: + builder.build_worker(mock_task, built_queue, output_queue) + + # The tasks, from the output queue, should be the same as the input and + # should be compiled. + for task in mock_tasks: + output = output_queue.get() + self.assertEqual(output, task) + self.assertTrue(output.compiled()) + + # The tasks, from the built queue, should be defined in the + # mock_compile_tasks dictionary. + for flag, cost in mock_compile_tasks.iteritems(): + helper_input = built_queue.get() + self.assertEqual(helper_input, (flag, cost)) + if __name__ == '__main__': unittest.main() diff --git a/bestflags/pipeline_process.py b/bestflags/pipeline_process.py index 37f4a994..29d88f52 100644 --- a/bestflags/pipeline_process.py +++ b/bestflags/pipeline_process.py @@ -107,7 +107,7 @@ class PipelineProcess(multiprocessing.Process): else: # Let the workers do the actual work. work_pool.apply_async(self._worker, args=(task, self._work_queue, - self._result_queue)) + self._result_queue)) mycache.append(task_key) # Shutdown the workers pool and the helper process. |