author     Yuheng Long <yuhenglong@google.com>  2013-06-11 16:16:34 -0700
committer  ChromeBot <chrome-bot@google.com>    2013-06-18 16:12:04 -0700
commit     e610c1904b8fbdb4c14c67dede25aafc02167259 (patch)
tree       e566e185d2e0d1d243e8ea83b120445d9208e7bf /bestflags
parent     f20cffac082e3d920818f230ffc80ae6976267c0 (diff)
download   toolchain-utils-e610c1904b8fbdb4c14c67dede25aafc02167259.tar.gz
Have the pipeline process working. Added the unit test for this class.
BUG=None
TEST=None

Change-Id: I7fe9fd5b1610959399000b1dfc9b6db55c5c28fb
Reviewed-on: https://gerrit-int.chromium.org/39473
Reviewed-by: Luis Lozano <llozano@chromium.org>
Tested-by: Yuheng Long <yuhenglong@google.com>
Commit-Queue: Yuheng Long <yuhenglong@google.com>
Diffstat (limited to 'bestflags')
-rw-r--r--  bestflags/pipeline_process.py       95
-rw-r--r--  bestflags/pipeline_process_test.py  63
2 files changed, 142 insertions(+), 16 deletions(-)
diff --git a/bestflags/pipeline_process.py b/bestflags/pipeline_process.py
index 6b878b30..21d3c584 100644
--- a/bestflags/pipeline_process.py
+++ b/bestflags/pipeline_process.py
@@ -1,37 +1,76 @@
"""Pipeline process that encapsulates the actual content.
-The actual stages include the Steering algorithm, the builder and the executor.
+The actual stages include the builder and the executor.
"""
__author__ = 'yuhenglong@google.com (Yuheng Long)'
import multiprocessing
+# Pick an integer at random.
+POISONPILL = 975
+
class PipelineProcess(multiprocessing.Process):
- """A process that encapsulates the actual content.
+ """A process that encapsulates the actual content pipeline stage.
- It continuously pull tasks from the queue until a poison pill is received.
+ The actual pipeline stage can be the builder or the tester. This process
+ continuously pulls tasks from the queue until a poison pill is received.
Once a job is received, it will hand it to the actual stage for processing.
- """
- # Poison pill means shutdown
- POISON_PILL = None
+ Each pipeline stage contains three modules.
+ The first module continuously pulls tasks from the input queue. It searches
+ the cache to check whether the task has been encountered before. If so,
+ duplicate computation can be avoided.
+ The second module consists of a pool of workers that do the actual work,
+ e.g., the worker will compile the source code and produce the image in the
+ builder pipeline stage.
+ The third module is a helper that puts the result cost into the cost field
+ of the duplicate tasks. For example, if two tasks are equivalent, only one
+ task, say t1, will be executed and the other task, say t2, will not be
+ executed. The third module gets the result from t1 when it is available and
+ sets the cost of t2 to be the same as that of t1.
+ """
- def __init__(self, method, task_queue, result_queue):
+ def __init__(self, num_processes, name, cache, stage, task_queue, helper,
+ worker, result_queue):
"""Set up input/output queue and the actual method to be called.
Args:
- method: The actual pipeline stage to be invoked.
+ num_processes: Number of worker subprocesses this stage has.
+ name: The name of this stage.
+ cache: The computed tasks encountered before.
+ stage: An int value that specifies which stage this pipeline process is,
+ for example, the build stage or the test stage. The value is used to
+ retrieve the key of a task in the given stage, i.e., the flag set is the
+ key in the build stage and the checksum is the key in the test stage.
+ The key is used to detect duplicate tasks.
task_queue: The input task queue for this pipeline stage.
+ helper: The method hosted by the helper module to fill in the cost of the
+ duplicate tasks.
+ worker: The method hosted by the worker pool to do the actual work, e.g.,
+ compile the image.
result_queue: The output task queue for this pipeline stage.
"""
multiprocessing.Process.__init__(self)
- self._method = method
+
+ self._name = name
self._task_queue = task_queue
self._result_queue = result_queue
+ self._helper = helper
+ self._worker = worker
+
+ self._cache = cache
+ self._stage = stage
+ self._num_processes = num_processes
+
+ # The queues used by the modules for communication.
+ manager = multiprocessing.Manager()
+ self._helper_queue = manager.Queue()
+ self._work_queue = manager.Queue()
+
def run(self):
"""Busy pulling the next task from the queue for execution.
@@ -41,11 +80,39 @@ class PipelineProcess(multiprocessing.Process):
The process will terminate on receiving the poison pill from previous stage.
"""
+ # The worker pool.
+ self._pool = multiprocessing.Pool(self._num_processes)
+
+ # The helper process.
+ helper_process = multiprocessing.Process(target=self._helper,
+ args=(self._cache,
+ self._helper_queue,
+ self._work_queue,
+ self._result_queue))
+ helper_process.start()
+ mycache = self._cache.keys()
+
while True:
- next_task = self.task_queue.get()
- if next_task is None:
+ task = self._task_queue.get()
+ if task == POISONPILL:
# Poison pill means shutdown
- self.result_queue.put(None)
+ self._result_queue.put(POISONPILL)
break
- self._method(next_task)
- self.result_queue.put(next_task)
+
+ task_key = task.get_key(self._stage)
+ if task_key in mycache:
+ # The task has been encountered before. It will be sent to the helper
+ # module for further processing.
+ self._helper_queue.put(task)
+ else:
+ # Let the workers do the actual work.
+ self._pool.apply_async(self._worker, args=(task, self._work_queue,
+ self._result_queue))
+ mycache.append(task_key)
+
+ # Shut down the worker pool and the helper process.
+ self._pool.close()
+ self._pool.join()
+
+ self._helper_queue.put(POISONPILL)
+ helper_process.join()
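
The class above wires its helper and worker through queues and shuts everything down with the POISONPILL sentinel that is forwarded to the next stage. For context, a minimal sketch of how two such stages might be chained together, with the output queue of one stage acting as the input queue of the next. The Task, Worker, and Helper definitions below are placeholders invented for illustration, with the signatures documented in __init__; they are not part of this change.

import multiprocessing

import pipeline_process


class Task(object):
  """A toy task; a real task would carry a flag set, an image and costs."""

  def __init__(self, key):
    self._key = key

  def get_key(self, stage):
    return self._key


def Worker(task, work_queue, result_queue):
  # Placeholder for the real work, e.g., compiling an image for the task.
  work_queue.put(task)
  result_queue.put(task)


def Helper(cache, helper_queue, work_queue, result_queue):
  # Placeholder helper: forward duplicate tasks without recomputation.
  while True:
    task = helper_queue.get()
    if task == pipeline_process.POISONPILL:
      break
    result_queue.put(task)


if __name__ == '__main__':
  manager = multiprocessing.Manager()
  flag_queue = manager.Queue()    # Input of the build stage.
  image_queue = manager.Queue()   # Build stage output, test stage input.
  cost_queue = manager.Queue()    # Output of the test stage.

  build_stage = pipeline_process.PipelineProcess(
      4, 'build', {}, 0, flag_queue, Helper, Worker, image_queue)
  test_stage = pipeline_process.PipelineProcess(
      4, 'test', {}, 1, image_queue, Helper, Worker, cost_queue)

  build_stage.start()
  test_stage.start()

  for key in range(10):
    flag_queue.put(Task(key))
  flag_queue.put(pipeline_process.POISONPILL)

  # The poison pill is forwarded from stage to stage, so joining the stages
  # in order drains the whole pipeline.
  build_stage.join()
  test_stage.join()
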
diff --git a/bestflags/pipeline_process_test.py b/bestflags/pipeline_process_test.py
index 8df23278..e40b5a21 100644
--- a/bestflags/pipeline_process_test.py
+++ b/bestflags/pipeline_process_test.py
@@ -2,10 +2,42 @@
__author__ = 'yuhenglong@google.com (Yuheng Long)'
+import multiprocessing
import unittest
import pipeline_process
+# Pick an integer at random.
+ERROR = -334
+
+
+def MockHelper(done_dict, helper_queue, work_queue, result_queue):
+ """This method echos input to the output."""
+ while True:
+ if not helper_queue.empty():
+ task = helper_queue.get()
+ if task == pipeline_process.POISONPILL:
+ # Poison pill means shutdown
+ break
+
+ if task in done_dict:
+ # Verify that it does not get duplicate "1"s in the test.
+ result_queue.put(ERROR)
+ else:
+ result_queue.put(('helper', task.get_key(0)))
+
+
+def MockWorker(task, buffer_queue, result_queue):
+ result_queue.put(('worker', task.get_key(0)))
+
+
+class MockTask(object):
+ def __init__(self, key):
+ self._key = key
+
+ def get_key(self, stage):
+ return self._key
+
class PipelineProcessTest(unittest.TestCase):
"""This class test the PipelineProcess.
@@ -19,11 +51,38 @@ class PipelineProcessTest(unittest.TestCase):
pass
def testRun(self):
- """"Test the run method.
+ """Test the run method.
Ensure that all the tasks inserted into the queue are properly handled.
"""
- pass
+
+ manager = multiprocessing.Manager()
+ inp = manager.Queue()
+ output = manager.Queue()
+
+ process = pipeline_process.PipelineProcess(2, 'testing', {}, 'test', inp,
+ MockHelper, MockWorker, output)
+
+ process.start()
+ inp.put(MockTask(1))
+ inp.put(MockTask(1))
+ inp.put(MockTask(2))
+ inp.put(pipeline_process.POISONPILL)
+ process.join()
+
+ # All tasks are processed once and only once.
+ result = [('worker', 1), ('helper', 1), ('worker', 2),
+ pipeline_process.POISONPILL]
+ while result:
+ task = output.get()
+
+ # One "1" is passed to the worker and one to the helper.
+ self.assertNotEqual(task, ERROR)
+
+ # The messages received should be exactly the same as the result.
+ self.assertTrue(task in result)
+ result.remove(task)
+
if __name__ == '__main__':
unittest.main()
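
The while loop in testRun removes each received message from the expected list, which both checks the multiset of results and guards against extras such as ERROR. An equivalent, slightly more compact check, shown here only as a sketch and not part of this change, drains the queue and compares multisets with collections.Counter:

import collections


def AssertResultsMatch(test_case, output_queue, expected):
  """Drain len(expected) messages and compare them to expected as multisets."""
  received = [output_queue.get() for _ in expected]
  test_case.assertEqual(collections.Counter(expected),
                        collections.Counter(received))

Inside testRun this would replace the while loop with a single call:
AssertResultsMatch(self, output, result).
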