diff options
Diffstat (limited to 'bestflags/pipeline_process.py')
-rw-r--r-- | bestflags/pipeline_process.py | 51 |
1 files changed, 51 insertions, 0 deletions
diff --git a/bestflags/pipeline_process.py b/bestflags/pipeline_process.py new file mode 100644 index 00000000..6b878b30 --- /dev/null +++ b/bestflags/pipeline_process.py @@ -0,0 +1,51 @@ +"""Pipeline process that encapsulates the actual content. + +The actual stages include the Steering algorithm, the builder and the executor. +""" + +__author__ = 'yuhenglong@google.com (Yuheng Long)' + +import multiprocessing + + +class PipelineProcess(multiprocessing.Process): + """A process that encapsulates the actual content. + + It continuously pull tasks from the queue until a poison pill is received. + Once a job is received, it will hand it to the actual stage for processing. + """ + + # Poison pill means shutdown + POISON_PILL = None + + def __init__(self, method, task_queue, result_queue): + """Set up input/output queue and the actual method to be called. + + Args: + method: The actual pipeline stage to be invoked. + task_queue: The input task queue for this pipeline stage. + result_queue: The output task queue for this pipeline stage. + """ + + multiprocessing.Process.__init__(self) + self._method = method + self._task_queue = task_queue + self._result_queue = result_queue + + def run(self): + """Busy pulling the next task from the queue for execution. + + Once a job is pulled, this stage invokes the actual stage method and submits + the result to the next pipeline stage. + + The process will terminate on receiving the poison pill from previous stage. + """ + + while True: + next_task = self.task_queue.get() + if next_task is None: + # Poison pill means shutdown + self.result_queue.put(None) + break + self._method(next_task) + self.result_queue.put(next_task) |