aboutsummaryrefslogtreecommitdiff
path: root/bestflags
diff options
context:
space:
mode:
authorYuheng Long <yuhenglong@google.com>2013-07-22 13:51:17 -0700
committerChromeBot <chrome-bot@google.com>2013-07-25 17:25:37 -0700
commita5712a2c71aa665dcca808963d152228890c8364 (patch)
tree176ebb146015c9275bae9f5f66ecc761a42dacd2 /bestflags
parentfefa5c0174b32eb359eca91acebab772356a4473 (diff)
downloadtoolchain-utils-a5712a2c71aa665dcca808963d152228890c8364.tar.gz
Add the steering stage of the framework.
BUG=None TEST=unit testings for the pipeline stage, pipeline workers, generation and steering. Change-Id: Id92bcf04ee24dfbc918f59ac8d87d30ee69e47b3 Reviewed-on: https://gerrit-int.chromium.org/41454 Reviewed-by: Simon Que <sque@google.com> Reviewed-by: Luis Lozano <llozano@chromium.org> Commit-Queue: Yuheng Long <yuhenglong@google.com> Tested-by: Yuheng Long <yuhenglong@google.com>
Diffstat (limited to 'bestflags')
-rw-r--r--bestflags/generation.py25
-rw-r--r--bestflags/generation_test.py24
-rw-r--r--bestflags/mock_task.py22
-rw-r--r--bestflags/steering.py114
-rw-r--r--bestflags/steering_test.py159
5 files changed, 287 insertions, 57 deletions
diff --git a/bestflags/generation.py b/bestflags/generation.py
index f1a63631..7f0e94a1 100644
--- a/bestflags/generation.py
+++ b/bestflags/generation.py
@@ -85,19 +85,26 @@ class Generation(object):
Args:
task: A task that has its results ready.
+
+ Returns:
+ Whether the input task belongs to this generation.
"""
- # If there is a match.
- if task in self._exe_pool:
- # Remove the place holder task in this generation and store the new input
- # task and its result.
- self._exe_pool.remove(task)
- self._exe_pool.add(task)
+ # If there is a match, the input task belongs to this generation.
+ if task not in self._exe_pool:
+ return False
+
+ # Remove the place holder task in this generation and store the new input
+ # task and its result.
+ self._exe_pool.remove(task)
+ self._exe_pool.add(task)
+
+ # The current generation will have one less task to wait on.
+ self._pending -= 1
- # The current generation will have one less task to wait on.
- self._pending -= 1
+ assert self._pending >= 0
- assert self._pending >= 0
+ return True
def Improve(self):
"""True if this generation has improvement over its parent generation.
diff --git a/bestflags/generation_test.py b/bestflags/generation_test.py
index 748a9886..1249f7dd 100644
--- a/bestflags/generation_test.py
+++ b/bestflags/generation_test.py
@@ -13,7 +13,7 @@ import random
import unittest
from generation import Generation
-from mock_task import MockTask
+from mock_task import IdentifierMockTask
# Pick an integer at random.
@@ -27,24 +27,6 @@ NUMTASKS = 20
STRIDE = 7
-class GenerationMockTask(MockTask):
- """This class defines the mock task to test the Generation class.
-
- The task instances will be inserted into a set. Therefore the hash and the
- equal methods are overridden. The generation class considers the identifier to
- set the cost of the task in a set, thus the identifier is used in the
- overriding methods.
- """
-
- def __hash__(self):
- return self._identifier
-
- def __eq__(self, other):
- if isinstance(other, MockTask):
- return self._identifier == other.GetIdentifier(self._stage)
- return False
-
-
class GenerationTest(unittest.TestCase):
"""This class test the Generation class.
@@ -66,7 +48,7 @@ class GenerationTest(unittest.TestCase):
testing_tasks = range(NUMTASKS)
# The tasks for the generation to be tested.
- generation_tasks = [GenerationMockTask(TESTSTAGE, t) for t in testing_tasks]
+ generation_tasks = [IdentifierMockTask(TESTSTAGE, t) for t in testing_tasks]
gen = Generation(set(generation_tasks), None)
@@ -81,7 +63,7 @@ class GenerationTest(unittest.TestCase):
# Mark a task as done by calling the UpdateTask method of the generation.
# Send the generation the task as well as its results.
- gen.UpdateTask(GenerationMockTask(TESTSTAGE, testing_task))
+ gen.UpdateTask(IdentifierMockTask(TESTSTAGE, testing_task))
# The Done method should return true after all the tasks in the permuted
# list is set.
diff --git a/bestflags/mock_task.py b/bestflags/mock_task.py
index 059536cd..d2c3726f 100644
--- a/bestflags/mock_task.py
+++ b/bestflags/mock_task.py
@@ -2,7 +2,7 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-"""This module defines the common mock task used by varies unit tests.
+"""This module defines the common mock tasks used by various unit tests.
Part of the Chrome build flags optimization.
"""
@@ -66,3 +66,23 @@ class MockTask(object):
assert stage == self._stage
return '_cost' in self.__dict__
+
+ def CacheSteeringCost(self):
+ pass
+
+
+class IdentifierMockTask(MockTask):
+ """This class defines the mock task that does not consider the cost.
+
+ The task instances will be inserted into a set. Therefore the hash and the
+ equal methods are overridden. The unittests that compares identities of the
+ tasks for equality can use this mock task instead of the base mock tack.
+ """
+
+ def __hash__(self):
+ return self._identifier
+
+ def __eq__(self, other):
+ if isinstance(other, MockTask):
+ return self._identifier == other.GetIdentifier(self._stage)
+ return False
diff --git a/bestflags/steering.py b/bestflags/steering.py
index ce718a45..09c78387 100644
--- a/bestflags/steering.py
+++ b/bestflags/steering.py
@@ -2,31 +2,115 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-"""A Genetic Algorithm implementation for selecting good flags.
+"""The framework stage that produces the next generation of tasks to run.
Part of the Chrome build flags optimization.
"""
__author__ = 'yuhenglong@google.com (Yuheng Long)'
+import pipeline_process
-class Steering(object):
- """The steering algorithm that produce the next generation to be run."""
- def __init__(self, steps):
- """Set up the number of steps generations this algorithm should evolve.
+def Steering(cache, generations, input_queue, result_queue):
+ """The core method template that produces the next generation of tasks to run.
- Args:
- steps: number of steps that the feed back loop should perform
- """
+ This method waits for the results of the tasks from the previous generation.
+ Upon the arrival of all these results, the method uses them to generate the
+ next generation of tasks.
- self._steps = steps
+ The main logic of producing the next generation from previous generation is
+ application specific. For example, in the genetic algorithm, a task is
+ produced by combining two parents tasks, while in the hill climbing algorithm,
+ a task is generated by its immediate neighbor. The method 'Next' is overridden
+ in the concrete subclasses of the class Generation to produce the next
+ application-specific generation. The steering method invokes the 'Next'
+ method, produces the next generation and submits the tasks in this generation
+ to the next stage, e.g., the build/compilation stage.
- def Run(self, generation):
- """Generate a set of new generations for the next round of execution.
+ Args:
+ cache: It stores the experiments that have been conducted before. Used to
+ avoid duplicate works.
+ generations: The initial generations of tasks to be run.
+ input_queue: The input results from the last stage of the framework. These
+ results will trigger new iteration of the algorithm.
+ result_queue: The output task queue for this pipeline stage. The new tasks
+ generated by the steering algorithm will be sent to the next stage via
+ this queue.
+ """
- Args:
- generation: the previous generation.
- """
+ # Generations that have pending tasks to be executed. Pending tasks are those
+ # whose results are not ready. The tasks that have their results ready are
+ # referenced to as ready tasks. Once there is no pending generation, the
+ # algorithm terminates.
+ waiting = generations
- pass
+ # Record how many initial tasks there are. If there is no task at all, the
+ # algorithm can terminate right away.
+ num_tasks = 0
+
+ # Submit all the tasks in the initial generations to the next stage of the
+ # framework. The next stage can be the build/compilation stage.
+ for generation in generations:
+ # Only send the task that has not been performed before to the next stage.
+ for task in [task for task in generation.Pool() if task not in cache]:
+ result_queue.put(task)
+ cache.add(task)
+ num_tasks += 1
+
+ # If there is no task to be executed at all, the algorithm returns right away.
+ if not num_tasks:
+ # Inform the next stage that there will be no more task.
+ result_queue.put(pipeline_process.POISONPILL)
+ return
+
+ # The algorithm is done if there is no pending generation. A generation is
+ # pending if it has pending task.
+ while waiting:
+ # Busy-waiting for the next task.
+ if input_queue.empty():
+ continue
+
+ # If there is a task whose result is ready from the last stage of the
+ # feedback loop, there will be one less pending task.
+ task = input_queue.get()
+
+ # Store the result of this ready task. Intermediate results can be used to
+ # generate report for final result or be used to reboot from a crash from
+ # the failure of any module of the framework.
+ task.CacheSteeringCost()
+
+ # Find out which pending generation this ready task belongs to. This pending
+ # generation will have one less pending task. The "next" expression iterates
+ # the generations in waiting until the first generation whose UpdateTask
+ # method returns true.
+ generation = next(gen for gen in waiting if gen.UpdateTask(task))
+
+ # If there is still any pending task, do nothing.
+ if not generation.Done():
+ continue
+
+ # All the tasks in the generation are finished. The generation is ready to
+ # produce the next generation.
+ waiting.remove(generation)
+
+ # Check whether a generation should generate the next generation.
+ # A generation may not generate the next generation, e.g., because a
+ # fixpoint has been reached, there has not been any improvement for a few
+ # generations or a local maxima is reached.
+ if not generation.Improve():
+ continue
+
+ for new_generation in generation.Next(cache):
+ # Make sure that each generation should contain at least one task.
+ assert new_generation.Pool()
+ waiting.append(new_generation)
+
+ # Send the tasks of the new generations to the next stage for execution.
+ for new_task in new_generation.Pool():
+ result_queue.put(new_task)
+ cache.add(new_task)
+
+ # Steering algorithm is finished and it informs the next stage that there will
+ # be no more task.
+ result_queue.put(pipeline_process.POISONPILL)
diff --git a/bestflags/steering_test.py b/bestflags/steering_test.py
index 828d2265..b3ce686d 100644
--- a/bestflags/steering_test.py
+++ b/bestflags/steering_test.py
@@ -9,28 +9,165 @@ Part of the Chrome build flags optimization.
__author__ = 'yuhenglong@google.com (Yuheng Long)'
+import multiprocessing
import unittest
+from generation import Generation
+from mock_task import IdentifierMockTask
+import pipeline_process
import steering
+# Pick an integer at random.
+STEERING_TEST_STAGE = -8
+
+# The number of generations to be used to do the testing.
+NUMBER_OF_GENERATIONS = 20
+
+# The number of tasks to be put in each generation to be tested.
+NUMBER_OF_TASKS = 20
+
+# The stride of permutation used to shuffle the input list of tasks. Should be
+# relatively prime with NUMBER_OF_TASKS.
+STRIDE = 7
+
+
+class MockGeneration(Generation):
+ """This class emulates an actual generation.
+
+ It will output the next_generations when the method Next is called. The
+ next_generations is initiated when the MockGeneration instance is constructed.
+ """
+
+ def __init__(self, tasks, next_generations):
+ """Set up the next generations for this task.
+
+ Args:
+ tasks: A set of tasks to be run.
+ next_generations: A list of generations as the next generation of the
+ current generation.
+ """
+ Generation.__init__(self, tasks, None)
+ self._next_generations = next_generations
+
+ def Next(self, _):
+ return self._next_generations
+
+ def Improve(self):
+ if self._next_generations:
+ return True
+ return False
+
+
class SteeringTest(unittest.TestCase):
- """This class test the Steering class.
+ """This class test the steering method.
- This steering algorithm should stop either it has generated a certain number
- of generations or the generation has no further improvement.
+ The steering algorithm should return if there is no new task in the initial
+ generation. The steering algorithm should send all the tasks to the next stage
+ and should terminate once there is no pending generation. A generation is
+ pending if it contains pending task. A task is pending if its (test) result
+ is not ready.
"""
- def setUp(self):
- pass
+ def testSteering(self):
+ """Test that the steering algorithm processes all the tasks properly.
+
+ Test that the steering algorithm sends all the tasks to the next stage. Test
+ that the steering algorithm terminates once all the tasks have been
+ processed, i.e., the results for the tasks are all ready.
+ """
+
+ # A list of generations used to test the steering stage.
+ generations = []
+
+ task_index = 0
+ previous_generations = None
+
+ # Generate a sequence of generations to be tested. Each generation will
+ # output the next generation in reverse order of the list when the "Next"
+ # method is called.
+ for _ in range(NUMBER_OF_GENERATIONS):
+ # Use a consecutive sequence of numbers as identifiers for the set of
+ # tasks put into a generation.
+ test_ranges = range(task_index, task_index + NUMBER_OF_TASKS)
+ tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges]
+ steering_tasks = set(tasks)
+
+ # Let the previous generation as the offspring generation of the current
+ # generation.
+ current_generation = MockGeneration(steering_tasks, previous_generations)
+ generations.insert(0, current_generation)
+ previous_generations = [current_generation]
+
+ task_index += NUMBER_OF_TASKS
+
+ # If there is no generation at all, the unittest returns right away.
+ if not current_generation:
+ return
+
+ # Set up the input and result queue for the steering method.
+ manager = multiprocessing.Manager()
+ input_queue = manager.Queue()
+ result_queue = manager.Queue()
+
+ steering_process = multiprocessing.Process(target=steering.Steering,
+ args=(set(),
+ [current_generation],
+ input_queue, result_queue))
+ steering_process.start()
+
+ # Test that each generation is processed properly. I.e., the generations are
+ # processed in order.
+ while generations:
+ generation = generations.pop(0)
+ tasks = [task for task in generation.Pool()]
+
+ # Test that all the tasks are processed once and only once.
+ while tasks:
+ task = result_queue.get()
+
+ assert task in tasks
+ tasks.remove(task)
+
+ input_queue.put(task)
+
+ task = result_queue.get()
+
+ # Test that the steering algorithm returns properly after processing all
+ # the generations.
+ assert task == pipeline_process.POISONPILL
+
+ steering_process.join()
+
+ def testCache(self):
+ """The steering algorithm returns immediately if there is no new tasks.
+
+ If all the new tasks have been cached before, the steering algorithm does
+ not have to execute these tasks again and thus can terminate right away.
+ """
+
+ # Put a set of tasks in the cache and add this set to initial generation.
+ test_ranges = range(NUMBER_OF_TASKS)
+ tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges]
+ steering_tasks = set(tasks)
+
+ current_generation = MockGeneration(steering_tasks, None)
+
+ # Set up the input and result queue for the steering method.
+ manager = multiprocessing.Manager()
+ input_queue = manager.Queue()
+ result_queue = manager.Queue()
+
+ steering_process = multiprocessing.Process(target=steering.Steering,
+ args=(steering_tasks,
+ [current_generation],
+ input_queue, result_queue))
- def testGeneration(self):
- """"Test proper termination for a number of generations."""
- pass
+ steering_process.start()
- def testImprove(self):
- """"Test proper termination for no improvement between generations."""
- pass
+ # Test that the steering method returns right away.
+ assert result_queue.get() == pipeline_process.POISONPILL
+ steering_process.join()
if __name__ == '__main__':
unittest.main()