diff options
Diffstat (limited to 'bestflags/steering_test.py')
-rw-r--r-- | bestflags/steering_test.py | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/bestflags/steering_test.py b/bestflags/steering_test.py new file mode 100644 index 00000000..c96e362f --- /dev/null +++ b/bestflags/steering_test.py @@ -0,0 +1,170 @@ +# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""Steering stage unittest. + +Part of the Chrome build flags optimization. +""" + +__author__ = 'yuhenglong@google.com (Yuheng Long)' + +import multiprocessing +import unittest + +from generation import Generation +from mock_task import IdentifierMockTask +import pipeline_process +import steering + +# Pick an integer at random. +STEERING_TEST_STAGE = -8 + +# The number of generations to be used to do the testing. +NUMBER_OF_GENERATIONS = 20 + +# The number of tasks to be put in each generation to be tested. +NUMBER_OF_TASKS = 20 + +# The stride of permutation used to shuffle the input list of tasks. Should be +# relatively prime with NUMBER_OF_TASKS. +STRIDE = 7 + + +class MockGeneration(Generation): + """This class emulates an actual generation. + + It will output the next_generations when the method Next is called. The + next_generations is initiated when the MockGeneration instance is constructed. + """ + + def __init__(self, tasks, next_generations): + """Set up the next generations for this task. + + Args: + tasks: A set of tasks to be run. + next_generations: A list of generations as the next generation of the + current generation. + """ + Generation.__init__(self, tasks, None) + self._next_generations = next_generations + + def Next(self, _): + return self._next_generations + + def IsImproved(self): + if self._next_generations: + return True + return False + + +class SteeringTest(unittest.TestCase): + """This class test the steering method. + + The steering algorithm should return if there is no new task in the initial + generation. The steering algorithm should send all the tasks to the next stage + and should terminate once there is no pending generation. A generation is + pending if it contains pending task. A task is pending if its (test) result + is not ready. + """ + + def testSteering(self): + """Test that the steering algorithm processes all the tasks properly. + + Test that the steering algorithm sends all the tasks to the next stage. Test + that the steering algorithm terminates once all the tasks have been + processed, i.e., the results for the tasks are all ready. + """ + + # A list of generations used to test the steering stage. + generations = [] + + task_index = 0 + previous_generations = None + + # Generate a sequence of generations to be tested. Each generation will + # output the next generation in reverse order of the list when the "Next" + # method is called. + for _ in range(NUMBER_OF_GENERATIONS): + # Use a consecutive sequence of numbers as identifiers for the set of + # tasks put into a generation. + test_ranges = range(task_index, task_index + NUMBER_OF_TASKS) + tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges] + steering_tasks = set(tasks) + + # Let the previous generation as the offspring generation of the current + # generation. + current_generation = MockGeneration(steering_tasks, previous_generations) + generations.insert(0, current_generation) + previous_generations = [current_generation] + + task_index += NUMBER_OF_TASKS + + # If there is no generation at all, the unittest returns right away. + if not current_generation: + return + + # Set up the input and result queue for the steering method. + manager = multiprocessing.Manager() + input_queue = manager.Queue() + result_queue = manager.Queue() + + steering_process = multiprocessing.Process( + target=steering.Steering, + args=(set(), [current_generation], input_queue, result_queue)) + steering_process.start() + + # Test that each generation is processed properly. I.e., the generations are + # processed in order. + while generations: + generation = generations.pop(0) + tasks = [task for task in generation.Pool()] + + # Test that all the tasks are processed once and only once. + while tasks: + task = result_queue.get() + + assert task in tasks + tasks.remove(task) + + input_queue.put(task) + + task = result_queue.get() + + # Test that the steering algorithm returns properly after processing all + # the generations. + assert task == pipeline_process.POISONPILL + + steering_process.join() + + def testCache(self): + """The steering algorithm returns immediately if there is no new tasks. + + If all the new tasks have been cached before, the steering algorithm does + not have to execute these tasks again and thus can terminate right away. + """ + + # Put a set of tasks in the cache and add this set to initial generation. + test_ranges = range(NUMBER_OF_TASKS) + tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges] + steering_tasks = set(tasks) + + current_generation = MockGeneration(steering_tasks, None) + + # Set up the input and result queue for the steering method. + manager = multiprocessing.Manager() + input_queue = manager.Queue() + result_queue = manager.Queue() + + steering_process = multiprocessing.Process( + target=steering.Steering, + args=(steering_tasks, [current_generation], input_queue, result_queue)) + + steering_process.start() + + # Test that the steering method returns right away. + assert result_queue.get() == pipeline_process.POISONPILL + steering_process.join() + + +if __name__ == '__main__': + unittest.main() |