aboutsummaryrefslogtreecommitdiff
path: root/bestflags/pipeline_process.py
diff options
context:
space:
mode:
Diffstat (limited to 'bestflags/pipeline_process.py')
-rw-r--r--bestflags/pipeline_process.py51
1 files changed, 51 insertions, 0 deletions
diff --git a/bestflags/pipeline_process.py b/bestflags/pipeline_process.py
new file mode 100644
index 00000000..6b878b30
--- /dev/null
+++ b/bestflags/pipeline_process.py
@@ -0,0 +1,51 @@
+"""Pipeline process that encapsulates the actual content.
+
+The actual stages include the Steering algorithm, the builder and the executor.
+"""
+
+__author__ = 'yuhenglong@google.com (Yuheng Long)'
+
+import multiprocessing
+
+
+class PipelineProcess(multiprocessing.Process):
+ """A process that encapsulates the actual content.
+
+ It continuously pull tasks from the queue until a poison pill is received.
+ Once a job is received, it will hand it to the actual stage for processing.
+ """
+
+ # Poison pill means shutdown
+ POISON_PILL = None
+
+ def __init__(self, method, task_queue, result_queue):
+ """Set up input/output queue and the actual method to be called.
+
+ Args:
+ method: The actual pipeline stage to be invoked.
+ task_queue: The input task queue for this pipeline stage.
+ result_queue: The output task queue for this pipeline stage.
+ """
+
+ multiprocessing.Process.__init__(self)
+ self._method = method
+ self._task_queue = task_queue
+ self._result_queue = result_queue
+
+ def run(self):
+ """Busy pulling the next task from the queue for execution.
+
+ Once a job is pulled, this stage invokes the actual stage method and submits
+ the result to the next pipeline stage.
+
+ The process will terminate on receiving the poison pill from previous stage.
+ """
+
+ while True:
+ next_task = self.task_queue.get()
+ if next_task is None:
+ # Poison pill means shutdown
+ self.result_queue.put(None)
+ break
+ self._method(next_task)
+ self.result_queue.put(next_task)