diff options
author | Yuheng Long <yuhenglong@google.com> | 2013-07-22 13:51:17 -0700 |
---|---|---|
committer | ChromeBot <chrome-bot@google.com> | 2013-07-25 17:25:37 -0700 |
commit | a5712a2c71aa665dcca808963d152228890c8364 (patch) | |
tree | 176ebb146015c9275bae9f5f66ecc761a42dacd2 /bestflags | |
parent | fefa5c0174b32eb359eca91acebab772356a4473 (diff) | |
download | toolchain-utils-a5712a2c71aa665dcca808963d152228890c8364.tar.gz |
Add the steering stage of the framework.
BUG=None
TEST=unit testings for the pipeline stage, pipeline workers, generation and
steering.
Change-Id: Id92bcf04ee24dfbc918f59ac8d87d30ee69e47b3
Reviewed-on: https://gerrit-int.chromium.org/41454
Reviewed-by: Simon Que <sque@google.com>
Reviewed-by: Luis Lozano <llozano@chromium.org>
Commit-Queue: Yuheng Long <yuhenglong@google.com>
Tested-by: Yuheng Long <yuhenglong@google.com>
Diffstat (limited to 'bestflags')
-rw-r--r-- | bestflags/generation.py | 25 | ||||
-rw-r--r-- | bestflags/generation_test.py | 24 | ||||
-rw-r--r-- | bestflags/mock_task.py | 22 | ||||
-rw-r--r-- | bestflags/steering.py | 114 | ||||
-rw-r--r-- | bestflags/steering_test.py | 159 |
5 files changed, 287 insertions, 57 deletions
diff --git a/bestflags/generation.py b/bestflags/generation.py index f1a63631..7f0e94a1 100644 --- a/bestflags/generation.py +++ b/bestflags/generation.py @@ -85,19 +85,26 @@ class Generation(object): Args: task: A task that has its results ready. + + Returns: + Whether the input task belongs to this generation. """ - # If there is a match. - if task in self._exe_pool: - # Remove the place holder task in this generation and store the new input - # task and its result. - self._exe_pool.remove(task) - self._exe_pool.add(task) + # If there is a match, the input task belongs to this generation. + if task not in self._exe_pool: + return False + + # Remove the place holder task in this generation and store the new input + # task and its result. + self._exe_pool.remove(task) + self._exe_pool.add(task) + + # The current generation will have one less task to wait on. + self._pending -= 1 - # The current generation will have one less task to wait on. - self._pending -= 1 + assert self._pending >= 0 - assert self._pending >= 0 + return True def Improve(self): """True if this generation has improvement over its parent generation. diff --git a/bestflags/generation_test.py b/bestflags/generation_test.py index 748a9886..1249f7dd 100644 --- a/bestflags/generation_test.py +++ b/bestflags/generation_test.py @@ -13,7 +13,7 @@ import random import unittest from generation import Generation -from mock_task import MockTask +from mock_task import IdentifierMockTask # Pick an integer at random. @@ -27,24 +27,6 @@ NUMTASKS = 20 STRIDE = 7 -class GenerationMockTask(MockTask): - """This class defines the mock task to test the Generation class. - - The task instances will be inserted into a set. Therefore the hash and the - equal methods are overridden. The generation class considers the identifier to - set the cost of the task in a set, thus the identifier is used in the - overriding methods. - """ - - def __hash__(self): - return self._identifier - - def __eq__(self, other): - if isinstance(other, MockTask): - return self._identifier == other.GetIdentifier(self._stage) - return False - - class GenerationTest(unittest.TestCase): """This class test the Generation class. @@ -66,7 +48,7 @@ class GenerationTest(unittest.TestCase): testing_tasks = range(NUMTASKS) # The tasks for the generation to be tested. - generation_tasks = [GenerationMockTask(TESTSTAGE, t) for t in testing_tasks] + generation_tasks = [IdentifierMockTask(TESTSTAGE, t) for t in testing_tasks] gen = Generation(set(generation_tasks), None) @@ -81,7 +63,7 @@ class GenerationTest(unittest.TestCase): # Mark a task as done by calling the UpdateTask method of the generation. # Send the generation the task as well as its results. - gen.UpdateTask(GenerationMockTask(TESTSTAGE, testing_task)) + gen.UpdateTask(IdentifierMockTask(TESTSTAGE, testing_task)) # The Done method should return true after all the tasks in the permuted # list is set. diff --git a/bestflags/mock_task.py b/bestflags/mock_task.py index 059536cd..d2c3726f 100644 --- a/bestflags/mock_task.py +++ b/bestflags/mock_task.py @@ -2,7 +2,7 @@ # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -"""This module defines the common mock task used by varies unit tests. +"""This module defines the common mock tasks used by various unit tests. Part of the Chrome build flags optimization. """ @@ -66,3 +66,23 @@ class MockTask(object): assert stage == self._stage return '_cost' in self.__dict__ + + def CacheSteeringCost(self): + pass + + +class IdentifierMockTask(MockTask): + """This class defines the mock task that does not consider the cost. + + The task instances will be inserted into a set. Therefore the hash and the + equal methods are overridden. The unittests that compares identities of the + tasks for equality can use this mock task instead of the base mock tack. + """ + + def __hash__(self): + return self._identifier + + def __eq__(self, other): + if isinstance(other, MockTask): + return self._identifier == other.GetIdentifier(self._stage) + return False diff --git a/bestflags/steering.py b/bestflags/steering.py index ce718a45..09c78387 100644 --- a/bestflags/steering.py +++ b/bestflags/steering.py @@ -2,31 +2,115 @@ # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -"""A Genetic Algorithm implementation for selecting good flags. +"""The framework stage that produces the next generation of tasks to run. Part of the Chrome build flags optimization. """ __author__ = 'yuhenglong@google.com (Yuheng Long)' +import pipeline_process -class Steering(object): - """The steering algorithm that produce the next generation to be run.""" - def __init__(self, steps): - """Set up the number of steps generations this algorithm should evolve. +def Steering(cache, generations, input_queue, result_queue): + """The core method template that produces the next generation of tasks to run. - Args: - steps: number of steps that the feed back loop should perform - """ + This method waits for the results of the tasks from the previous generation. + Upon the arrival of all these results, the method uses them to generate the + next generation of tasks. - self._steps = steps + The main logic of producing the next generation from previous generation is + application specific. For example, in the genetic algorithm, a task is + produced by combining two parents tasks, while in the hill climbing algorithm, + a task is generated by its immediate neighbor. The method 'Next' is overridden + in the concrete subclasses of the class Generation to produce the next + application-specific generation. The steering method invokes the 'Next' + method, produces the next generation and submits the tasks in this generation + to the next stage, e.g., the build/compilation stage. - def Run(self, generation): - """Generate a set of new generations for the next round of execution. + Args: + cache: It stores the experiments that have been conducted before. Used to + avoid duplicate works. + generations: The initial generations of tasks to be run. + input_queue: The input results from the last stage of the framework. These + results will trigger new iteration of the algorithm. + result_queue: The output task queue for this pipeline stage. The new tasks + generated by the steering algorithm will be sent to the next stage via + this queue. + """ - Args: - generation: the previous generation. - """ + # Generations that have pending tasks to be executed. Pending tasks are those + # whose results are not ready. The tasks that have their results ready are + # referenced to as ready tasks. Once there is no pending generation, the + # algorithm terminates. + waiting = generations - pass + # Record how many initial tasks there are. If there is no task at all, the + # algorithm can terminate right away. + num_tasks = 0 + + # Submit all the tasks in the initial generations to the next stage of the + # framework. The next stage can be the build/compilation stage. + for generation in generations: + # Only send the task that has not been performed before to the next stage. + for task in [task for task in generation.Pool() if task not in cache]: + result_queue.put(task) + cache.add(task) + num_tasks += 1 + + # If there is no task to be executed at all, the algorithm returns right away. + if not num_tasks: + # Inform the next stage that there will be no more task. + result_queue.put(pipeline_process.POISONPILL) + return + + # The algorithm is done if there is no pending generation. A generation is + # pending if it has pending task. + while waiting: + # Busy-waiting for the next task. + if input_queue.empty(): + continue + + # If there is a task whose result is ready from the last stage of the + # feedback loop, there will be one less pending task. + task = input_queue.get() + + # Store the result of this ready task. Intermediate results can be used to + # generate report for final result or be used to reboot from a crash from + # the failure of any module of the framework. + task.CacheSteeringCost() + + # Find out which pending generation this ready task belongs to. This pending + # generation will have one less pending task. The "next" expression iterates + # the generations in waiting until the first generation whose UpdateTask + # method returns true. + generation = next(gen for gen in waiting if gen.UpdateTask(task)) + + # If there is still any pending task, do nothing. + if not generation.Done(): + continue + + # All the tasks in the generation are finished. The generation is ready to + # produce the next generation. + waiting.remove(generation) + + # Check whether a generation should generate the next generation. + # A generation may not generate the next generation, e.g., because a + # fixpoint has been reached, there has not been any improvement for a few + # generations or a local maxima is reached. + if not generation.Improve(): + continue + + for new_generation in generation.Next(cache): + # Make sure that each generation should contain at least one task. + assert new_generation.Pool() + waiting.append(new_generation) + + # Send the tasks of the new generations to the next stage for execution. + for new_task in new_generation.Pool(): + result_queue.put(new_task) + cache.add(new_task) + + # Steering algorithm is finished and it informs the next stage that there will + # be no more task. + result_queue.put(pipeline_process.POISONPILL) diff --git a/bestflags/steering_test.py b/bestflags/steering_test.py index 828d2265..b3ce686d 100644 --- a/bestflags/steering_test.py +++ b/bestflags/steering_test.py @@ -9,28 +9,165 @@ Part of the Chrome build flags optimization. __author__ = 'yuhenglong@google.com (Yuheng Long)' +import multiprocessing import unittest +from generation import Generation +from mock_task import IdentifierMockTask +import pipeline_process import steering +# Pick an integer at random. +STEERING_TEST_STAGE = -8 + +# The number of generations to be used to do the testing. +NUMBER_OF_GENERATIONS = 20 + +# The number of tasks to be put in each generation to be tested. +NUMBER_OF_TASKS = 20 + +# The stride of permutation used to shuffle the input list of tasks. Should be +# relatively prime with NUMBER_OF_TASKS. +STRIDE = 7 + + +class MockGeneration(Generation): + """This class emulates an actual generation. + + It will output the next_generations when the method Next is called. The + next_generations is initiated when the MockGeneration instance is constructed. + """ + + def __init__(self, tasks, next_generations): + """Set up the next generations for this task. + + Args: + tasks: A set of tasks to be run. + next_generations: A list of generations as the next generation of the + current generation. + """ + Generation.__init__(self, tasks, None) + self._next_generations = next_generations + + def Next(self, _): + return self._next_generations + + def Improve(self): + if self._next_generations: + return True + return False + + class SteeringTest(unittest.TestCase): - """This class test the Steering class. + """This class test the steering method. - This steering algorithm should stop either it has generated a certain number - of generations or the generation has no further improvement. + The steering algorithm should return if there is no new task in the initial + generation. The steering algorithm should send all the tasks to the next stage + and should terminate once there is no pending generation. A generation is + pending if it contains pending task. A task is pending if its (test) result + is not ready. """ - def setUp(self): - pass + def testSteering(self): + """Test that the steering algorithm processes all the tasks properly. + + Test that the steering algorithm sends all the tasks to the next stage. Test + that the steering algorithm terminates once all the tasks have been + processed, i.e., the results for the tasks are all ready. + """ + + # A list of generations used to test the steering stage. + generations = [] + + task_index = 0 + previous_generations = None + + # Generate a sequence of generations to be tested. Each generation will + # output the next generation in reverse order of the list when the "Next" + # method is called. + for _ in range(NUMBER_OF_GENERATIONS): + # Use a consecutive sequence of numbers as identifiers for the set of + # tasks put into a generation. + test_ranges = range(task_index, task_index + NUMBER_OF_TASKS) + tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges] + steering_tasks = set(tasks) + + # Let the previous generation as the offspring generation of the current + # generation. + current_generation = MockGeneration(steering_tasks, previous_generations) + generations.insert(0, current_generation) + previous_generations = [current_generation] + + task_index += NUMBER_OF_TASKS + + # If there is no generation at all, the unittest returns right away. + if not current_generation: + return + + # Set up the input and result queue for the steering method. + manager = multiprocessing.Manager() + input_queue = manager.Queue() + result_queue = manager.Queue() + + steering_process = multiprocessing.Process(target=steering.Steering, + args=(set(), + [current_generation], + input_queue, result_queue)) + steering_process.start() + + # Test that each generation is processed properly. I.e., the generations are + # processed in order. + while generations: + generation = generations.pop(0) + tasks = [task for task in generation.Pool()] + + # Test that all the tasks are processed once and only once. + while tasks: + task = result_queue.get() + + assert task in tasks + tasks.remove(task) + + input_queue.put(task) + + task = result_queue.get() + + # Test that the steering algorithm returns properly after processing all + # the generations. + assert task == pipeline_process.POISONPILL + + steering_process.join() + + def testCache(self): + """The steering algorithm returns immediately if there is no new tasks. + + If all the new tasks have been cached before, the steering algorithm does + not have to execute these tasks again and thus can terminate right away. + """ + + # Put a set of tasks in the cache and add this set to initial generation. + test_ranges = range(NUMBER_OF_TASKS) + tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges] + steering_tasks = set(tasks) + + current_generation = MockGeneration(steering_tasks, None) + + # Set up the input and result queue for the steering method. + manager = multiprocessing.Manager() + input_queue = manager.Queue() + result_queue = manager.Queue() + + steering_process = multiprocessing.Process(target=steering.Steering, + args=(steering_tasks, + [current_generation], + input_queue, result_queue)) - def testGeneration(self): - """"Test proper termination for a number of generations.""" - pass + steering_process.start() - def testImprove(self): - """"Test proper termination for no improvement between generations.""" - pass + # Test that the steering method returns right away. + assert result_queue.get() == pipeline_process.POISONPILL + steering_process.join() if __name__ == '__main__': unittest.main() |