aboutsummaryrefslogtreecommitdiff
path: root/bestflags/steering_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'bestflags/steering_test.py')
-rw-r--r--bestflags/steering_test.py244
1 files changed, 129 insertions, 115 deletions
diff --git a/bestflags/steering_test.py b/bestflags/steering_test.py
index c96e362f..28a2f108 100644
--- a/bestflags/steering_test.py
+++ b/bestflags/steering_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Steering stage unittest.
@@ -6,7 +6,7 @@
Part of the Chrome build flags optimization.
"""
-__author__ = 'yuhenglong@google.com (Yuheng Long)'
+__author__ = "yuhenglong@google.com (Yuheng Long)"
import multiprocessing
import unittest
@@ -16,6 +16,7 @@ from mock_task import IdentifierMockTask
import pipeline_process
import steering
+
# Pick an integer at random.
STEERING_TEST_STAGE = -8
@@ -31,140 +32,153 @@ STRIDE = 7
class MockGeneration(Generation):
- """This class emulates an actual generation.
-
- It will output the next_generations when the method Next is called. The
- next_generations is initiated when the MockGeneration instance is constructed.
- """
-
- def __init__(self, tasks, next_generations):
- """Set up the next generations for this task.
+ """This class emulates an actual generation.
- Args:
- tasks: A set of tasks to be run.
- next_generations: A list of generations as the next generation of the
- current generation.
+ It will output the next_generations when the method Next is called. The
+ next_generations is initiated when the MockGeneration instance is constructed.
"""
- Generation.__init__(self, tasks, None)
- self._next_generations = next_generations
- def Next(self, _):
- return self._next_generations
+ def __init__(self, tasks, next_generations):
+ """Set up the next generations for this task.
- def IsImproved(self):
- if self._next_generations:
- return True
- return False
+ Args:
+ tasks: A set of tasks to be run.
+ next_generations: A list of generations as the next generation of the
+ current generation.
+ """
+ Generation.__init__(self, tasks, None)
+ self._next_generations = next_generations
+ def Next(self, _):
+ return self._next_generations
-class SteeringTest(unittest.TestCase):
- """This class test the steering method.
+ def IsImproved(self):
+ if self._next_generations:
+ return True
+ return False
- The steering algorithm should return if there is no new task in the initial
- generation. The steering algorithm should send all the tasks to the next stage
- and should terminate once there is no pending generation. A generation is
- pending if it contains pending task. A task is pending if its (test) result
- is not ready.
- """
- def testSteering(self):
- """Test that the steering algorithm processes all the tasks properly.
+class SteeringTest(unittest.TestCase):
+ """This class test the steering method.
- Test that the steering algorithm sends all the tasks to the next stage. Test
- that the steering algorithm terminates once all the tasks have been
- processed, i.e., the results for the tasks are all ready.
+ The steering algorithm should return if there is no new task in the initial
+ generation. The steering algorithm should send all the tasks to the next stage
+ and should terminate once there is no pending generation. A generation is
+ pending if it contains pending task. A task is pending if its (test) result
+ is not ready.
"""
- # A list of generations used to test the steering stage.
- generations = []
-
- task_index = 0
- previous_generations = None
-
- # Generate a sequence of generations to be tested. Each generation will
- # output the next generation in reverse order of the list when the "Next"
- # method is called.
- for _ in range(NUMBER_OF_GENERATIONS):
- # Use a consecutive sequence of numbers as identifiers for the set of
- # tasks put into a generation.
- test_ranges = range(task_index, task_index + NUMBER_OF_TASKS)
- tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges]
- steering_tasks = set(tasks)
-
- # Let the previous generation as the offspring generation of the current
- # generation.
- current_generation = MockGeneration(steering_tasks, previous_generations)
- generations.insert(0, current_generation)
- previous_generations = [current_generation]
-
- task_index += NUMBER_OF_TASKS
-
- # If there is no generation at all, the unittest returns right away.
- if not current_generation:
- return
-
- # Set up the input and result queue for the steering method.
- manager = multiprocessing.Manager()
- input_queue = manager.Queue()
- result_queue = manager.Queue()
-
- steering_process = multiprocessing.Process(
- target=steering.Steering,
- args=(set(), [current_generation], input_queue, result_queue))
- steering_process.start()
-
- # Test that each generation is processed properly. I.e., the generations are
- # processed in order.
- while generations:
- generation = generations.pop(0)
- tasks = [task for task in generation.Pool()]
-
- # Test that all the tasks are processed once and only once.
- while tasks:
- task = result_queue.get()
-
- assert task in tasks
- tasks.remove(task)
-
- input_queue.put(task)
+ def testSteering(self):
+ """Test that the steering algorithm processes all the tasks properly.
+
+ Test that the steering algorithm sends all the tasks to the next stage. Test
+ that the steering algorithm terminates once all the tasks have been
+ processed, i.e., the results for the tasks are all ready.
+ """
+
+ # A list of generations used to test the steering stage.
+ generations = []
+
+ task_index = 0
+ previous_generations = None
+
+ # Generate a sequence of generations to be tested. Each generation will
+ # output the next generation in reverse order of the list when the "Next"
+ # method is called.
+ for _ in range(NUMBER_OF_GENERATIONS):
+ # Use a consecutive sequence of numbers as identifiers for the set of
+ # tasks put into a generation.
+ test_ranges = range(task_index, task_index + NUMBER_OF_TASKS)
+ tasks = [
+ IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges
+ ]
+ steering_tasks = set(tasks)
+
+ # Let the previous generation as the offspring generation of the current
+ # generation.
+ current_generation = MockGeneration(
+ steering_tasks, previous_generations
+ )
+ generations.insert(0, current_generation)
+ previous_generations = [current_generation]
+
+ task_index += NUMBER_OF_TASKS
+
+ # If there is no generation at all, the unittest returns right away.
+ if not current_generation:
+ return
+
+ # Set up the input and result queue for the steering method.
+ manager = multiprocessing.Manager()
+ input_queue = manager.Queue()
+ result_queue = manager.Queue()
+
+ steering_process = multiprocessing.Process(
+ target=steering.Steering,
+ args=(set(), [current_generation], input_queue, result_queue),
+ )
+ steering_process.start()
+
+ # Test that each generation is processed properly. I.e., the generations are
+ # processed in order.
+ while generations:
+ generation = generations.pop(0)
+ tasks = [task for task in generation.Pool()]
+
+ # Test that all the tasks are processed once and only once.
+ while tasks:
+ task = result_queue.get()
+
+ assert task in tasks
+ tasks.remove(task)
+
+ input_queue.put(task)
- task = result_queue.get()
+ task = result_queue.get()
- # Test that the steering algorithm returns properly after processing all
- # the generations.
- assert task == pipeline_process.POISONPILL
+ # Test that the steering algorithm returns properly after processing all
+ # the generations.
+ assert task == pipeline_process.POISONPILL
- steering_process.join()
+ steering_process.join()
- def testCache(self):
- """The steering algorithm returns immediately if there is no new tasks.
+ def testCache(self):
+ """The steering algorithm returns immediately if there is no new tasks.
- If all the new tasks have been cached before, the steering algorithm does
- not have to execute these tasks again and thus can terminate right away.
- """
+ If all the new tasks have been cached before, the steering algorithm does
+ not have to execute these tasks again and thus can terminate right away.
+ """
- # Put a set of tasks in the cache and add this set to initial generation.
- test_ranges = range(NUMBER_OF_TASKS)
- tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges]
- steering_tasks = set(tasks)
+ # Put a set of tasks in the cache and add this set to initial generation.
+ test_ranges = range(NUMBER_OF_TASKS)
+ tasks = [
+ IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges
+ ]
+ steering_tasks = set(tasks)
- current_generation = MockGeneration(steering_tasks, None)
+ current_generation = MockGeneration(steering_tasks, None)
- # Set up the input and result queue for the steering method.
- manager = multiprocessing.Manager()
- input_queue = manager.Queue()
- result_queue = manager.Queue()
+ # Set up the input and result queue for the steering method.
+ manager = multiprocessing.Manager()
+ input_queue = manager.Queue()
+ result_queue = manager.Queue()
- steering_process = multiprocessing.Process(
- target=steering.Steering,
- args=(steering_tasks, [current_generation], input_queue, result_queue))
+ steering_process = multiprocessing.Process(
+ target=steering.Steering,
+ args=(
+ steering_tasks,
+ [current_generation],
+ input_queue,
+ result_queue,
+ ),
+ )
- steering_process.start()
+ steering_process.start()
- # Test that the steering method returns right away.
- assert result_queue.get() == pipeline_process.POISONPILL
- steering_process.join()
+ # Test that the steering method returns right away.
+ assert result_queue.get() == pipeline_process.POISONPILL
+ steering_process.join()
-if __name__ == '__main__':
- unittest.main()
+if __name__ == "__main__":
+ unittest.main()