aboutsummaryrefslogtreecommitdiff
path: root/bestflags/steering_test.py
blob: 28a2f1082b4047bef57ddad93511e20ad9f6eb31 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Steering stage unittest.

Part of the Chrome build flags optimization.
"""

__author__ = "yuhenglong@google.com (Yuheng Long)"

import multiprocessing
import unittest

from generation import Generation
from mock_task import IdentifierMockTask
import pipeline_process
import steering


# Pick an integer at random.
STEERING_TEST_STAGE = -8

# The number of generations to be used to do the testing.
NUMBER_OF_GENERATIONS = 20

# The number of tasks to be put in each generation to be tested.
NUMBER_OF_TASKS = 20

# The stride of permutation used to shuffle the input list of tasks. Should be
# relatively prime with NUMBER_OF_TASKS.
STRIDE = 7


class MockGeneration(Generation):
    """This class emulates an actual generation.

    It will output the next_generations when the method Next is called. The
    next_generations is initiated when the MockGeneration instance is constructed.
    """

    def __init__(self, tasks, next_generations):
        """Set up the next generations for this task.

        Args:
          tasks: A set of tasks to be run.
          next_generations: A list of generations as the next generation of the
            current generation.
        """
        Generation.__init__(self, tasks, None)
        self._next_generations = next_generations

    def Next(self, _):
        return self._next_generations

    def IsImproved(self):
        if self._next_generations:
            return True
        return False


class SteeringTest(unittest.TestCase):
    """This class test the steering method.

    The steering algorithm should return if there is no new task in the initial
    generation. The steering algorithm should send all the tasks to the next stage
    and should terminate once there is no pending generation. A generation is
    pending if it contains pending task. A task is pending if its (test) result
    is not ready.
    """

    def testSteering(self):
        """Test that the steering algorithm processes all the tasks properly.

        Test that the steering algorithm sends all the tasks to the next stage. Test
        that the steering algorithm terminates once all the tasks have been
        processed, i.e., the results for the tasks are all ready.
        """

        # A list of generations used to test the steering stage.
        generations = []

        task_index = 0
        previous_generations = None

        # Generate a sequence of generations to be tested. Each generation will
        # output the next generation in reverse order of the list when the "Next"
        # method is called.
        for _ in range(NUMBER_OF_GENERATIONS):
            # Use a consecutive sequence of numbers as identifiers for the set of
            # tasks put into a generation.
            test_ranges = range(task_index, task_index + NUMBER_OF_TASKS)
            tasks = [
                IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges
            ]
            steering_tasks = set(tasks)

            # Let the previous generation as the offspring generation of the current
            # generation.
            current_generation = MockGeneration(
                steering_tasks, previous_generations
            )
            generations.insert(0, current_generation)
            previous_generations = [current_generation]

            task_index += NUMBER_OF_TASKS

        # If there is no generation at all, the unittest returns right away.
        if not current_generation:
            return

        # Set up the input and result queue for the steering method.
        manager = multiprocessing.Manager()
        input_queue = manager.Queue()
        result_queue = manager.Queue()

        steering_process = multiprocessing.Process(
            target=steering.Steering,
            args=(set(), [current_generation], input_queue, result_queue),
        )
        steering_process.start()

        # Test that each generation is processed properly. I.e., the generations are
        # processed in order.
        while generations:
            generation = generations.pop(0)
            tasks = [task for task in generation.Pool()]

            # Test that all the tasks are processed once and only once.
            while tasks:
                task = result_queue.get()

                assert task in tasks
                tasks.remove(task)

                input_queue.put(task)

        task = result_queue.get()

        # Test that the steering algorithm returns properly after processing all
        # the generations.
        assert task == pipeline_process.POISONPILL

        steering_process.join()

    def testCache(self):
        """The steering algorithm returns immediately if there is no new tasks.

        If all the new tasks have been cached before, the steering algorithm does
        not have to execute these tasks again and thus can terminate right away.
        """

        # Put a set of tasks in the cache and add this set to initial generation.
        test_ranges = range(NUMBER_OF_TASKS)
        tasks = [
            IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges
        ]
        steering_tasks = set(tasks)

        current_generation = MockGeneration(steering_tasks, None)

        # Set up the input and result queue for the steering method.
        manager = multiprocessing.Manager()
        input_queue = manager.Queue()
        result_queue = manager.Queue()

        steering_process = multiprocessing.Process(
            target=steering.Steering,
            args=(
                steering_tasks,
                [current_generation],
                input_queue,
                result_queue,
            ),
        )

        steering_process.start()

        # Test that the steering method returns right away.
        assert result_queue.get() == pipeline_process.POISONPILL
        steering_process.join()


if __name__ == "__main__":
    unittest.main()