From e610c1904b8fbdb4c14c67dede25aafc02167259 Mon Sep 17 00:00:00 2001 From: Yuheng Long Date: Tue, 11 Jun 2013 16:16:34 -0700 Subject: Have the pipeline process working. Added the unit test for this class. BUG=None TEST=None Change-Id: I7fe9fd5b1610959399000b1dfc9b6db55c5c28fb Reviewed-on: https://gerrit-int.chromium.org/39473 Reviewed-by: Luis Lozano Tested-by: Yuheng Long Commit-Queue: Yuheng Long --- bestflags/pipeline_process.py | 95 ++++++++++++++++++++++++++++++++------ bestflags/pipeline_process_test.py | 63 ++++++++++++++++++++++++- 2 files changed, 142 insertions(+), 16 deletions(-) (limited to 'bestflags') diff --git a/bestflags/pipeline_process.py b/bestflags/pipeline_process.py index 6b878b30..21d3c584 100644 --- a/bestflags/pipeline_process.py +++ b/bestflags/pipeline_process.py @@ -1,37 +1,76 @@ """Pipeline process that encapsulates the actual content. -The actual stages include the Steering algorithm, the builder and the executor. +The actual stages include the builder and the executor. """ __author__ = 'yuhenglong@google.com (Yuheng Long)' import multiprocessing +# Pick an integer at random. +POISONPILL = 975 + class PipelineProcess(multiprocessing.Process): - """A process that encapsulates the actual content. + """A process that encapsulates the actual content pipeline stage. - It continuously pull tasks from the queue until a poison pill is received. + The actual pipeline stage can be the builder or the tester. This process + continuously pull tasks from the queue until a poison pill is received. Once a job is received, it will hand it to the actual stage for processing. - """ - # Poison pill means shutdown - POISON_PILL = None + Each pipeline stage contains three modules. + The first module continuously pulls task from the input queue. It searches the + cache to check whether the task has encountered before. If so, duplicate + computation can be avoided. + The second module consists of a pool of workers that do the actual work, e.g., + the worker will compile the source code and get the image in the builder + pipeline stage. + The third module is a helper that put the result cost to the cost field of the + duplicate tasks. For example, if two tasks are equivalent, only one task, say + t1 will be executed and the other task, say t2 will not be executed. The third + mode gets the result from t1, when it is available and set the cost of t2 to + be the same as that of t1. + """ - def __init__(self, method, task_queue, result_queue): + def __init__(self, num_processes, name, cache, stage, task_queue, helper, + worker, result_queue): """Set up input/output queue and the actual method to be called. Args: - method: The actual pipeline stage to be invoked. + num_processes: Number of helpers subprocessors this stage has. + name: The name of this stage. + cache: The computed tasks encountered before. + stage: An int value that specifies the stage for this pipeline stage, for + example, build stage or test stage. This value will be used to retrieve + the keys in different stage. I.e., the flags set is the key in build + stage and the checksum is the key in the test stage. The key is used to + detect duplicates. task_queue: The input task queue for this pipeline stage. + helper: The method hosted by the helper module to fill up the cost of the + duplicate tasks. + worker: The method hosted by the worker pools to do the actual work, e.g., + compile the image. result_queue: The output task queue for this pipeline stage. """ multiprocessing.Process.__init__(self) - self._method = method + + self._name = name self._task_queue = task_queue self._result_queue = result_queue + self._helper = helper + self._worker = worker + + self._cache = cache + self._stage = stage + self._num_processes = num_processes + + # the queues used by the modules for communication + manager = multiprocessing.Manager() + self._helper_queue = manager.Queue() + self._work_queue = manager.Queue() + def run(self): """Busy pulling the next task from the queue for execution. @@ -41,11 +80,39 @@ class PipelineProcess(multiprocessing.Process): The process will terminate on receiving the poison pill from previous stage. """ + # the worker pool + self._pool = multiprocessing.Pool(self._num_processes) + + # the helper process + helper_process = multiprocessing.Process(target=self._helper, + args=(self._cache, + self._helper_queue, + self._work_queue, + self._result_queue)) + helper_process.start() + mycache = self._cache.keys() + while True: - next_task = self.task_queue.get() - if next_task is None: + task = self._task_queue.get() + if task == POISONPILL: # Poison pill means shutdown - self.result_queue.put(None) + self._result_queue.put(POISONPILL) break - self._method(next_task) - self.result_queue.put(next_task) + + task_key = task.get_key(self._stage) + if task_key in mycache: + # The task has been encountered before. It will be sent to the helper + # module for further processing. + self._helper_queue.put(task) + else: + # Let the workers do the actual work. + self._pool.apply_async(self._worker, args=(task, self._work_queue, + self._result_queue)) + mycache.append(task_key) + + # Shutdown the workers pool and the helper process. + self._pool.close() + self._pool.join() + + self._helper_queue.put(POISONPILL) + helper_process.join() diff --git a/bestflags/pipeline_process_test.py b/bestflags/pipeline_process_test.py index 8df23278..e40b5a21 100644 --- a/bestflags/pipeline_process_test.py +++ b/bestflags/pipeline_process_test.py @@ -2,10 +2,42 @@ __author__ = 'yuhenglong@google.com (Yuheng Long)' +import multiprocessing import unittest import pipeline_process +# Pick an integer at random. +ERROR = -334 + + +def MockHelper(done_dict, helper_queue, work_queue, result_queue): + """This method echos input to the output.""" + while True: + if not helper_queue.empty(): + task = helper_queue.get() + if task == pipeline_process.POISONPILL: + # Poison pill means shutdown + break + + if task in done_dict: + # verify that it does not get duplicate "1"s in the test. + result_queue.put(ERROR) + else: + result_queue.put(('helper', task.get_key(0))) + + +def MockWorker(task, buffer_queue, result_queue): + result_queue.put(('worker', task.get_key(0))) + + +class MockTask(object): + def __init__(self, key): + self._key = key + + def get_key(self, stage): + return self._key + class PipelineProcessTest(unittest.TestCase): """This class test the PipelineProcess. @@ -19,11 +51,38 @@ class PipelineProcessTest(unittest.TestCase): pass def testRun(self): - """"Test the run method. + """Test the run method. Ensure that all the tasks inserted into the queue are properly handled. """ - pass + + manager = multiprocessing.Manager() + inp = manager.Queue() + output = manager.Queue() + + process = pipeline_process.PipelineProcess(2, 'testing', {}, 'test', inp, + MockHelper, MockWorker, output) + + process.start() + inp.put(MockTask(1)) + inp.put(MockTask(1)) + inp.put(MockTask(2)) + inp.put(pipeline_process.POISONPILL) + process.join() + + # All tasks are processed once and only once. + result = [('worker', 1), ('helper', 1), ('worker', 2), + pipeline_process.POISONPILL] + while result: + task = output.get() + + # One "1"s is passed to the worker and one to the helper. + self.assertNotEqual(task, ERROR) + + # The messages received should be exactly the same as the result. + self.assertTrue(task in result) + result.remove(task) + if __name__ == '__main__': unittest.main() -- cgit v1.2.3