diff options
Diffstat (limited to 'bestflags/pipeline_process.py')
-rw-r--r-- | bestflags/pipeline_process.py | 230 |
1 file changed, 126 insertions, 104 deletions
# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Pipeline process that encapsulates the actual content.

Part of the Chrome build flags optimization.

The actual stages include the builder and the executor.
"""

__author__ = "yuhenglong@google.com (Yuheng Long)"

import multiprocessing


# Sentinel pushed through the queues to signal shutdown; picked at random so
# it is unlikely to collide with a real task value.
POISONPILL = 975


class PipelineProcess(multiprocessing.Process):
    """A process that encapsulates the actual content pipeline stage.

    The actual pipeline stage can be the builder or the tester. This process
    continuously pulls tasks from the queue until a poison pill is received.
    Once a job is received, it will hand it to the actual stage for processing.

    Each pipeline stage contains three modules.
    The first module continuously pulls a task from the input queue. It
    searches the cache to check whether the task has been encountered before.
    If so, duplicate computation can be avoided.
    The second module consists of a pool of workers that do the actual work,
    e.g., the worker will compile the source code and get the image in the
    builder pipeline stage.
    The third module is a helper that puts the result cost into the cost field
    of the duplicate tasks. For example, if two tasks are equivalent, only one
    task, say t1, will be executed and the other task, say t2, will not be
    executed. The third module gets the result from t1, when it is available,
    and sets the cost of t2 to be the same as that of t1.
    """

    def __init__(
        self,
        num_processes,
        name,
        cache,
        stage,
        task_queue,
        helper,
        worker,
        result_queue,
    ):
        """Set up input/output queue and the actual method to be called.

        Args:
          num_processes: Number of helpers subprocessors this stage has.
          name: The name of this stage.
          cache: The computed tasks encountered before.
          stage: An int value that specifies the stage for this pipeline
            stage, for example, build stage or test stage. This value will be
            used to retrieve the keys in different stages. I.e., the flags set
            is the key in the build stage and the checksum is the key in the
            test stage. The key is used to detect duplicates.
          task_queue: The input task queue for this pipeline stage.
          helper: The method hosted by the helper module to fill up the cost
            of the duplicate tasks.
          worker: The method hosted by the worker pools to do the actual work,
            e.g., compile the image.
          result_queue: The output task queue for this pipeline stage.
        """

        multiprocessing.Process.__init__(self)

        self._name = name
        self._task_queue = task_queue
        self._result_queue = result_queue

        self._helper = helper
        self._worker = worker

        self._cache = cache
        self._stage = stage
        self._num_processes = num_processes

        # The queues used by the modules for communication. A Manager proxy
        # queue is used so the helper process and pool workers can share them.
        manager = multiprocessing.Manager()
        self._helper_queue = manager.Queue()
        self._work_queue = manager.Queue()

    def run(self):
        """Busy pulling the next task from the queue for execution.

        Once a job is pulled, this stage invokes the actual stage method and
        submits the result to the next pipeline stage.

        The process will terminate on receiving the poison pill from the
        previous stage.
        """

        # The worker pool that performs the actual stage work.
        work_pool = multiprocessing.Pool(self._num_processes)

        # The helper process that resolves costs for duplicate tasks.
        helper_process = multiprocessing.Process(
            target=self._helper,
            args=(
                self._stage,
                self._cache,
                self._helper_queue,
                self._work_queue,
                self._result_queue,
            ),
        )
        helper_process.start()
        # BUGFIX: dict.keys() returns a view object in Python 3, which has no
        # append() method; materialize a list so seen keys can be recorded.
        mycache = list(self._cache.keys())

        while True:
            task = self._task_queue.get()
            if task == POISONPILL:
                # Poison pill means shutdown; forward it downstream and stop.
                self._result_queue.put(POISONPILL)
                break

            task_key = task.GetIdentifier(self._stage)
            if task_key in mycache:
                # The task has been encountered before. It will be sent to
                # the helper module for further processing.
                self._helper_queue.put(task)
            else:
                # Let the workers do the actual work.
                work_pool.apply_async(
                    self._worker,
                    args=(
                        self._stage,
                        task,
                        self._work_queue,
                        self._result_queue,
                    ),
                )
                mycache.append(task_key)

        # Shut down the workers pool and the helper process.
        work_pool.close()
        work_pool.join()

        self._helper_queue.put(POISONPILL)
        helper_process.join()