diff options
Diffstat (limited to 'bestflags/pipeline_worker.py')
-rw-r--r-- | bestflags/pipeline_worker.py | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/bestflags/pipeline_worker.py b/bestflags/pipeline_worker.py new file mode 100644 index 00000000..e21ec2c8 --- /dev/null +++ b/bestflags/pipeline_worker.py @@ -0,0 +1,142 @@ +# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""The pipeline_worker functions of the build and test stage of the framework. + +Part of the Chrome build flags optimization. + +This module defines the helper and the worker. If there are duplicate tasks, for +example t1 and t2, needs to be built/tested, one of them, for example t1, will +be built/tested and the helper waits for the result of t1 and set the results of +the other task, t2 here, to be the same as that of t1. Setting the result of t2 +to be the same as t1 is referred to as resolving the result of t2. +The worker invokes the work method of the tasks that are not duplicate. +""" + +__author__ = 'yuhenglong@google.com (Yuheng Long)' + +import pipeline_process + + +def Helper(stage, done_dict, helper_queue, completed_queue, result_queue): + """Helper that filters duplicate tasks. + + This method Continuously pulls duplicate tasks from the helper_queue. The + duplicate tasks need not be compiled/tested. This method also pulls completed + tasks from the worker queue and let the results of the duplicate tasks be the + same as their corresponding finished task. + + Args: + stage: The current stage of the pipeline, for example, build stage or test + stage. + done_dict: A dictionary of tasks that are done. The key of the dictionary is + the identifier of the task. The value of the dictionary is the results of + performing the corresponding task. + helper_queue: A queue of duplicate tasks whose results need to be resolved. + This is a communication channel between the pipeline_process and this + helper process. + completed_queue: A queue of tasks that have been built/tested. The results + of these tasks are needed to resolve the results of the duplicate tasks. + This is the communication channel between the workers and this helper + process. + result_queue: After the results of the duplicate tasks have been resolved, + the duplicate tasks will be sent to the next stage via this queue. + """ + + # The list of duplicate tasks, the results of which need to be resolved. + waiting_list = [] + + while True: + # Pull duplicate task from the helper queue. + if not helper_queue.empty(): + task = helper_queue.get() + + if task == pipeline_process.POISONPILL: + # Poison pill means no more duplicate task from the helper queue. + break + + # The task has not been performed before. + assert not task.Done(stage) + + # The identifier of this task. + identifier = task.GetIdentifier(stage) + + # If a duplicate task comes before the corresponding resolved results from + # the completed_queue, it will be put in the waiting list. If the result + # arrives before the duplicate task, the duplicate task will be resolved + # right away. + if identifier in done_dict: + # This task has been encountered before and the result is available. The + # result can be resolved right away. + task.SetResult(stage, done_dict[identifier]) + result_queue.put(task) + else: + waiting_list.append(task) + + # Check and get completed tasks from completed_queue. + GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list, + result_queue) + + # Wait to resolve the results of the remaining duplicate tasks. + while waiting_list: + GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list, + result_queue) + + +def GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list, + result_queue): + """Pull results from the completed queue and resolves duplicate tasks. + + Args: + stage: The current stage of the pipeline, for example, build stage or test + stage. + completed_queue: A queue of tasks that have been performed. The results of + these tasks are needed to resolve the results of the duplicate tasks. This + is the communication channel between the workers and this method. + done_dict: A dictionary of tasks that are done. The key of the dictionary is + the optimization flags of the task. The value of the dictionary is the + compilation results of the corresponding task. + waiting_list: The list of duplicate tasks, the results of which need to be + resolved. + result_queue: After the results of the duplicate tasks have been resolved, + the duplicate tasks will be sent to the next stage via this queue. + + This helper method tries to pull a completed task from the completed queue. + If it gets a task from the queue, it resolves the results of all the relevant + duplicate tasks in the waiting list. Relevant tasks are the tasks that have + the same flags as the currently received results from the completed_queue. + """ + # Pull completed task from the worker queue. + if not completed_queue.empty(): + (identifier, result) = completed_queue.get() + done_dict[identifier] = result + + tasks = [t for t in waiting_list if t.GetIdentifier(stage) == identifier] + for duplicate_task in tasks: + duplicate_task.SetResult(stage, result) + result_queue.put(duplicate_task) + waiting_list.remove(duplicate_task) + + +def Worker(stage, task, helper_queue, result_queue): + """Worker that performs the task. + + This method calls the work method of the input task and distribute the result + to the helper and the next stage. + + Args: + stage: The current stage of the pipeline, for example, build stage or test + stage. + task: Input task that needs to be performed. + helper_queue: Queue that holds the completed tasks and the results. This is + the communication channel between the worker and the helper. + result_queue: Queue that holds the completed tasks and the results. This is + the communication channel between the worker and the next stage. + """ + + # The task has not been completed before. + assert not task.Done(stage) + + task.Work(stage) + helper_queue.put((task.GetIdentifier(stage), task.GetResult(stage))) + result_queue.put(task) |