about | summary | refs | log | tree | commit | diff
path: root/bestflags/pipeline_process.py
diff options
context:
space:
mode:
Diffstat (limited to 'bestflags/pipeline_process.py')
-rw-r--r-- bestflags/pipeline_process.py 230
1 files changed, 126 insertions, 104 deletions
diff --git a/bestflags/pipeline_process.py b/bestflags/pipeline_process.py
index 31f5f21f..3aab96fe 100644
--- a/bestflags/pipeline_process.py
+++ b/bestflags/pipeline_process.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Pipeline process that encapsulates the actual content.
@@ -8,116 +8,138 @@ Part of the Chrome build flags optimization.
The actual stages include the builder and the executor.
"""
-__author__ = 'yuhenglong@google.com (Yuheng Long)'
+__author__ = "yuhenglong@google.com (Yuheng Long)"
import multiprocessing
+
# Pick an integer at random.
POISONPILL = 975
class PipelineProcess(multiprocessing.Process):
- """A process that encapsulates the actual content pipeline stage.
-
- The actual pipeline stage can be the builder or the tester. This process
- continuously pull tasks from the queue until a poison pill is received.
- Once a job is received, it will hand it to the actual stage for processing.
-
- Each pipeline stage contains three modules.
- The first module continuously pulls task from the input queue. It searches the
- cache to check whether the task has encountered before. If so, duplicate
- computation can be avoided.
- The second module consists of a pool of workers that do the actual work, e.g.,
- the worker will compile the source code and get the image in the builder
- pipeline stage.
- The third module is a helper that put the result cost to the cost field of the
- duplicate tasks. For example, if two tasks are equivalent, only one task, say
- t1 will be executed and the other task, say t2 will not be executed. The third
- mode gets the result from t1, when it is available and set the cost of t2 to
- be the same as that of t1.
- """
-
- def __init__(self, num_processes, name, cache, stage, task_queue, helper,
- worker, result_queue):
- """Set up input/output queue and the actual method to be called.
-
- Args:
- num_processes: Number of helpers subprocessors this stage has.
- name: The name of this stage.
- cache: The computed tasks encountered before.
- stage: An int value that specifies the stage for this pipeline stage, for
- example, build stage or test stage. This value will be used to retrieve
- the keys in different stage. I.e., the flags set is the key in build
- stage and the checksum is the key in the test stage. The key is used to
- detect duplicates.
- task_queue: The input task queue for this pipeline stage.
- helper: The method hosted by the helper module to fill up the cost of the
- duplicate tasks.
- worker: The method hosted by the worker pools to do the actual work, e.g.,
- compile the image.
- result_queue: The output task queue for this pipeline stage.
- """
-
- multiprocessing.Process.__init__(self)
-
- self._name = name
- self._task_queue = task_queue
- self._result_queue = result_queue
-
- self._helper = helper
- self._worker = worker
-
- self._cache = cache
- self._stage = stage
- self._num_processes = num_processes
-
- # the queues used by the modules for communication
- manager = multiprocessing.Manager()
- self._helper_queue = manager.Queue()
- self._work_queue = manager.Queue()
-
- def run(self):
- """Busy pulling the next task from the queue for execution.
-
- Once a job is pulled, this stage invokes the actual stage method and submits
- the result to the next pipeline stage.
-
- The process will terminate on receiving the poison pill from previous stage.
+ """A process that encapsulates the actual content pipeline stage.
+
+ The actual pipeline stage can be the builder or the tester. This process
+ continuously pull tasks from the queue until a poison pill is received.
+ Once a job is received, it will hand it to the actual stage for processing.
+
+ Each pipeline stage contains three modules.
+ The first module continuously pulls task from the input queue. It searches the
+ cache to check whether the task has encountered before. If so, duplicate
+ computation can be avoided.
+ The second module consists of a pool of workers that do the actual work, e.g.,
+ the worker will compile the source code and get the image in the builder
+ pipeline stage.
+ The third module is a helper that put the result cost to the cost field of the
+ duplicate tasks. For example, if two tasks are equivalent, only one task, say
+ t1 will be executed and the other task, say t2 will not be executed. The third
+ mode gets the result from t1, when it is available and set the cost of t2 to
+ be the same as that of t1.
"""
- # the worker pool
- work_pool = multiprocessing.Pool(self._num_processes)
-
- # the helper process
- helper_process = multiprocessing.Process(
- target=self._helper,
- args=(self._stage, self._cache, self._helper_queue, self._work_queue,
- self._result_queue))
- helper_process.start()
- mycache = self._cache.keys()
-
- while True:
- task = self._task_queue.get()
- if task == POISONPILL:
- # Poison pill means shutdown
- self._result_queue.put(POISONPILL)
- break
-
- task_key = task.GetIdentifier(self._stage)
- if task_key in mycache:
- # The task has been encountered before. It will be sent to the helper
- # module for further processing.
- self._helper_queue.put(task)
- else:
- # Let the workers do the actual work.
- work_pool.apply_async(
- self._worker,
- args=(self._stage, task, self._work_queue, self._result_queue))
- mycache.append(task_key)
-
- # Shutdown the workers pool and the helper process.
- work_pool.close()
- work_pool.join()
-
- self._helper_queue.put(POISONPILL)
- helper_process.join()
+ def __init__(
+ self,
+ num_processes,
+ name,
+ cache,
+ stage,
+ task_queue,
+ helper,
+ worker,
+ result_queue,
+ ):
+ """Set up input/output queue and the actual method to be called.
+
+ Args:
+ num_processes: Number of helpers subprocessors this stage has.
+ name: The name of this stage.
+ cache: The computed tasks encountered before.
+ stage: An int value that specifies the stage for this pipeline stage, for
+ example, build stage or test stage. This value will be used to retrieve
+ the keys in different stage. I.e., the flags set is the key in build
+ stage and the checksum is the key in the test stage. The key is used to
+ detect duplicates.
+ task_queue: The input task queue for this pipeline stage.
+ helper: The method hosted by the helper module to fill up the cost of the
+ duplicate tasks.
+ worker: The method hosted by the worker pools to do the actual work, e.g.,
+ compile the image.
+ result_queue: The output task queue for this pipeline stage.
+ """
+
+ multiprocessing.Process.__init__(self)
+
+ self._name = name
+ self._task_queue = task_queue
+ self._result_queue = result_queue
+
+ self._helper = helper
+ self._worker = worker
+
+ self._cache = cache
+ self._stage = stage
+ self._num_processes = num_processes
+
+ # the queues used by the modules for communication
+ manager = multiprocessing.Manager()
+ self._helper_queue = manager.Queue()
+ self._work_queue = manager.Queue()
+
+ def run(self):
+ """Busy pulling the next task from the queue for execution.
+
+ Once a job is pulled, this stage invokes the actual stage method and submits
+ the result to the next pipeline stage.
+
+ The process will terminate on receiving the poison pill from previous stage.
+ """
+
+ # the worker pool
+ work_pool = multiprocessing.Pool(self._num_processes)
+
+ # the helper process
+ helper_process = multiprocessing.Process(
+ target=self._helper,
+ args=(
+ self._stage,
+ self._cache,
+ self._helper_queue,
+ self._work_queue,
+ self._result_queue,
+ ),
+ )
+ helper_process.start()
+ mycache = self._cache.keys()
+
+ while True:
+ task = self._task_queue.get()
+ if task == POISONPILL:
+ # Poison pill means shutdown
+ self._result_queue.put(POISONPILL)
+ break
+
+ task_key = task.GetIdentifier(self._stage)
+ if task_key in mycache:
+ # The task has been encountered before. It will be sent to the helper
+ # module for further processing.
+ self._helper_queue.put(task)
+ else:
+ # Let the workers do the actual work.
+ work_pool.apply_async(
+ self._worker,
+ args=(
+ self._stage,
+ task,
+ self._work_queue,
+ self._result_queue,
+ ),
+ )
+ mycache.append(task_key)
+
+ # Shutdown the workers pool and the helper process.
+ work_pool.close()
+ work_pool.join()
+
+ self._helper_queue.put(POISONPILL)
+ helper_process.join()