aboutsummaryrefslogtreecommitdiff
path: root/bestflags/pipeline_process_test.py
blob: 04e641ecc4a931ca017a7b8d47f57c27db1bced5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Pipeline Process unittest.

Part of the Chrome build flags optimization.
"""

__author__ = "yuhenglong@google.com (Yuheng Long)"

import multiprocessing
import unittest

from mock_task import MockTask
import pipeline_process


# Sentinel that MockHelper puts on the result queue when it receives a task
# that is already in done_dict; the test asserts that it never shows up in the
# output. The value itself is an arbitrary integer.
ERROR = -334
# Stage identifier handed to PipelineProcess and checked by the mock helper
# and the mock worker. The value itself is an arbitrary integer.
TEST_STAGE = -8


def MockHelper(stage, done_dict, helper_queue, _, result_queue):
    """Echo every task read from the helper queue to the result queue.

    Runs until pipeline_process.POISONPILL is read from helper_queue.

    Args:
        stage: The stage this helper is invoked for; must equal TEST_STAGE.
        done_dict: Container of tasks that count as already done. A task found
            in it is reported as ERROR instead of being echoed.
        helper_queue: Queue the tasks are read from.
        _: Unused.
        result_queue: Queue that receives ("helper", identifier) for each
            echoed task, or ERROR for a task that is already in done_dict.
    """

    assert stage == TEST_STAGE
    while True:
        # Block until a task arrives instead of polling empty() in a tight
        # loop, which would keep a CPU core busy while the queue is idle.
        task = helper_queue.get()
        if task == pipeline_process.POISONPILL:
            # Poison pill means shutdown.
            break

        if task in done_dict:
            # Signal the test that a task which was already done came back.
            result_queue.put(ERROR)
        else:
            result_queue.put(("helper", task.GetIdentifier(TEST_STAGE)))


def MockWorker(stage, task, _, result_queue):
    """Report the identifier of the task on the result queue.

    Args:
        stage: The stage this worker is invoked for; must equal TEST_STAGE.
        task: The task to handle; only its GetIdentifier method is used.
        _: Unused.
        result_queue: Queue that receives ("worker", identifier).
    """
    assert stage == TEST_STAGE
    identifier = task.GetIdentifier(TEST_STAGE)
    message = ("worker", identifier)
    result_queue.put(message)


class PipelineProcessTest(unittest.TestCase):
    """This class tests the PipelineProcess.

    All the tasks inserted into the input queue should be taken out and handed
    to the actual pipeline handler, except for the POISONPILL.  All these tasks
    should also be passed to the next pipeline stage via the output queue.
    """

    # Upper bound, in seconds, on the wait for a single message from the
    # output queue. The pipeline process has already been joined when the
    # queue is read, so this only matters when a message is missing.
    _RESULT_TIMEOUT_SECS = 10

    def testRun(self):
        """Test the run method.

        Ensure that all the tasks inserted into the queue are properly handled:
        each expected message shows up on the output queue exactly once, the
        ERROR sentinel never shows up, and nothing else is left in the queue.
        """

        manager = multiprocessing.Manager()
        inp = manager.Queue()
        output = manager.Queue()

        process = pipeline_process.PipelineProcess(
            2, "testing", {}, TEST_STAGE, inp, MockHelper, MockWorker, output
        )

        process.start()
        # The task with identifier 1 is submitted twice on purpose.
        inp.put(MockTask(TEST_STAGE, 1))
        inp.put(MockTask(TEST_STAGE, 1))
        inp.put(MockTask(TEST_STAGE, 2))
        inp.put(pipeline_process.POISONPILL)
        process.join()

        # All tasks are processed once and only once. The messages may arrive
        # in any order, so each received message is matched against, and then
        # removed from, the list of messages still expected.
        expected = [
            ("worker", 1),
            ("helper", 1),
            ("worker", 2),
            pipeline_process.POISONPILL,
        ]
        while expected:
            # The timeout turns a missing message into a test error instead of
            # blocking forever.
            task = output.get(timeout=self._RESULT_TIMEOUT_SECS)

            # One of the "1"s is passed to the worker and one to the helper;
            # the helper must never report ERROR.
            self.assertNotEqual(task, ERROR)

            # The messages received should be exactly the expected ones.
            self.assertIn(task, expected)
            expected.remove(task)

        # Nothing beyond the expected messages may have been produced.
        self.assertTrue(output.empty())


# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()