aboutsummaryrefslogtreecommitdiff
path: root/bestflags/steering_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'bestflags/steering_test.py')
-rw-r--r--bestflags/steering_test.py170
1 files changed, 170 insertions, 0 deletions
diff --git a/bestflags/steering_test.py b/bestflags/steering_test.py
new file mode 100644
index 00000000..c96e362f
--- /dev/null
+++ b/bestflags/steering_test.py
@@ -0,0 +1,170 @@
+# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Steering stage unittest.
+
+Part of the Chrome build flags optimization.
+"""
+
+__author__ = 'yuhenglong@google.com (Yuheng Long)'
+
+import multiprocessing
+import unittest
+
+from generation import Generation
+from mock_task import IdentifierMockTask
+import pipeline_process
+import steering
+
+# Pick an integer at random.
+STEERING_TEST_STAGE = -8
+
+# The number of generations to be used to do the testing.
+NUMBER_OF_GENERATIONS = 20
+
+# The number of tasks to be put in each generation to be tested.
+NUMBER_OF_TASKS = 20
+
+# The stride of permutation used to shuffle the input list of tasks. Should be
+# relatively prime with NUMBER_OF_TASKS.
+STRIDE = 7
+
+
+class MockGeneration(Generation):
+ """This class emulates an actual generation.
+
+ It will output the next_generations when the method Next is called. The
+ next_generations is initiated when the MockGeneration instance is constructed.
+ """
+
+ def __init__(self, tasks, next_generations):
+ """Set up the next generations for this task.
+
+ Args:
+ tasks: A set of tasks to be run.
+ next_generations: A list of generations as the next generation of the
+ current generation.
+ """
+ Generation.__init__(self, tasks, None)
+ self._next_generations = next_generations
+
+ def Next(self, _):
+ return self._next_generations
+
+ def IsImproved(self):
+ if self._next_generations:
+ return True
+ return False
+
+
+class SteeringTest(unittest.TestCase):
+ """This class test the steering method.
+
+ The steering algorithm should return if there is no new task in the initial
+ generation. The steering algorithm should send all the tasks to the next stage
+ and should terminate once there is no pending generation. A generation is
+ pending if it contains pending task. A task is pending if its (test) result
+ is not ready.
+ """
+
+ def testSteering(self):
+ """Test that the steering algorithm processes all the tasks properly.
+
+ Test that the steering algorithm sends all the tasks to the next stage. Test
+ that the steering algorithm terminates once all the tasks have been
+ processed, i.e., the results for the tasks are all ready.
+ """
+
+ # A list of generations used to test the steering stage.
+ generations = []
+
+ task_index = 0
+ previous_generations = None
+
+ # Generate a sequence of generations to be tested. Each generation will
+ # output the next generation in reverse order of the list when the "Next"
+ # method is called.
+ for _ in range(NUMBER_OF_GENERATIONS):
+ # Use a consecutive sequence of numbers as identifiers for the set of
+ # tasks put into a generation.
+ test_ranges = range(task_index, task_index + NUMBER_OF_TASKS)
+ tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges]
+ steering_tasks = set(tasks)
+
+ # Let the previous generation as the offspring generation of the current
+ # generation.
+ current_generation = MockGeneration(steering_tasks, previous_generations)
+ generations.insert(0, current_generation)
+ previous_generations = [current_generation]
+
+ task_index += NUMBER_OF_TASKS
+
+ # If there is no generation at all, the unittest returns right away.
+ if not current_generation:
+ return
+
+ # Set up the input and result queue for the steering method.
+ manager = multiprocessing.Manager()
+ input_queue = manager.Queue()
+ result_queue = manager.Queue()
+
+ steering_process = multiprocessing.Process(
+ target=steering.Steering,
+ args=(set(), [current_generation], input_queue, result_queue))
+ steering_process.start()
+
+ # Test that each generation is processed properly. I.e., the generations are
+ # processed in order.
+ while generations:
+ generation = generations.pop(0)
+ tasks = [task for task in generation.Pool()]
+
+ # Test that all the tasks are processed once and only once.
+ while tasks:
+ task = result_queue.get()
+
+ assert task in tasks
+ tasks.remove(task)
+
+ input_queue.put(task)
+
+ task = result_queue.get()
+
+ # Test that the steering algorithm returns properly after processing all
+ # the generations.
+ assert task == pipeline_process.POISONPILL
+
+ steering_process.join()
+
+ def testCache(self):
+ """The steering algorithm returns immediately if there is no new tasks.
+
+ If all the new tasks have been cached before, the steering algorithm does
+ not have to execute these tasks again and thus can terminate right away.
+ """
+
+ # Put a set of tasks in the cache and add this set to initial generation.
+ test_ranges = range(NUMBER_OF_TASKS)
+ tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges]
+ steering_tasks = set(tasks)
+
+ current_generation = MockGeneration(steering_tasks, None)
+
+ # Set up the input and result queue for the steering method.
+ manager = multiprocessing.Manager()
+ input_queue = manager.Queue()
+ result_queue = manager.Queue()
+
+ steering_process = multiprocessing.Process(
+ target=steering.Steering,
+ args=(steering_tasks, [current_generation], input_queue, result_queue))
+
+ steering_process.start()
+
+ # Test that the steering method returns right away.
+ assert result_queue.get() == pipeline_process.POISONPILL
+ steering_process.join()
+
+
+if __name__ == '__main__':
+ unittest.main()