aboutsummaryrefslogtreecommitdiff
path: root/bestflags/pipeline_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'bestflags/pipeline_worker.py')
-rw-r--r--bestflags/pipeline_worker.py245
1 files changed, 125 insertions, 120 deletions
diff --git a/bestflags/pipeline_worker.py b/bestflags/pipeline_worker.py
index e21ec2c8..f18be66b 100644
--- a/bestflags/pipeline_worker.py
+++ b/bestflags/pipeline_worker.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The pipeline_worker functions of the build and test stage of the framework.
@@ -13,130 +13,135 @@ to be the same as t1 is referred to as resolving the result of t2.
The worker invokes the work method of the tasks that are not duplicate.
"""
-__author__ = 'yuhenglong@google.com (Yuheng Long)'
+__author__ = "yuhenglong@google.com (Yuheng Long)"
import pipeline_process
def Helper(stage, done_dict, helper_queue, completed_queue, result_queue):
- """Helper that filters duplicate tasks.
-
- This method Continuously pulls duplicate tasks from the helper_queue. The
- duplicate tasks need not be compiled/tested. This method also pulls completed
- tasks from the worker queue and let the results of the duplicate tasks be the
- same as their corresponding finished task.
-
- Args:
- stage: The current stage of the pipeline, for example, build stage or test
- stage.
- done_dict: A dictionary of tasks that are done. The key of the dictionary is
- the identifier of the task. The value of the dictionary is the results of
- performing the corresponding task.
- helper_queue: A queue of duplicate tasks whose results need to be resolved.
- This is a communication channel between the pipeline_process and this
- helper process.
- completed_queue: A queue of tasks that have been built/tested. The results
- of these tasks are needed to resolve the results of the duplicate tasks.
- This is the communication channel between the workers and this helper
- process.
- result_queue: After the results of the duplicate tasks have been resolved,
- the duplicate tasks will be sent to the next stage via this queue.
- """
-
- # The list of duplicate tasks, the results of which need to be resolved.
- waiting_list = []
-
- while True:
- # Pull duplicate task from the helper queue.
- if not helper_queue.empty():
- task = helper_queue.get()
-
- if task == pipeline_process.POISONPILL:
- # Poison pill means no more duplicate task from the helper queue.
- break
-
- # The task has not been performed before.
- assert not task.Done(stage)
-
- # The identifier of this task.
- identifier = task.GetIdentifier(stage)
-
- # If a duplicate task comes before the corresponding resolved results from
- # the completed_queue, it will be put in the waiting list. If the result
- # arrives before the duplicate task, the duplicate task will be resolved
- # right away.
- if identifier in done_dict:
- # This task has been encountered before and the result is available. The
- # result can be resolved right away.
- task.SetResult(stage, done_dict[identifier])
- result_queue.put(task)
- else:
- waiting_list.append(task)
-
- # Check and get completed tasks from completed_queue.
- GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
- result_queue)
-
- # Wait to resolve the results of the remaining duplicate tasks.
- while waiting_list:
- GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
- result_queue)
-
-
-def GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
- result_queue):
- """Pull results from the completed queue and resolves duplicate tasks.
-
- Args:
- stage: The current stage of the pipeline, for example, build stage or test
- stage.
- completed_queue: A queue of tasks that have been performed. The results of
- these tasks are needed to resolve the results of the duplicate tasks. This
- is the communication channel between the workers and this method.
- done_dict: A dictionary of tasks that are done. The key of the dictionary is
- the optimization flags of the task. The value of the dictionary is the
- compilation results of the corresponding task.
- waiting_list: The list of duplicate tasks, the results of which need to be
- resolved.
- result_queue: After the results of the duplicate tasks have been resolved,
- the duplicate tasks will be sent to the next stage via this queue.
-
- This helper method tries to pull a completed task from the completed queue.
- If it gets a task from the queue, it resolves the results of all the relevant
- duplicate tasks in the waiting list. Relevant tasks are the tasks that have
- the same flags as the currently received results from the completed_queue.
- """
- # Pull completed task from the worker queue.
- if not completed_queue.empty():
- (identifier, result) = completed_queue.get()
- done_dict[identifier] = result
-
- tasks = [t for t in waiting_list if t.GetIdentifier(stage) == identifier]
- for duplicate_task in tasks:
- duplicate_task.SetResult(stage, result)
- result_queue.put(duplicate_task)
- waiting_list.remove(duplicate_task)
+ """Helper that filters duplicate tasks.
+
+ This method Continuously pulls duplicate tasks from the helper_queue. The
+ duplicate tasks need not be compiled/tested. This method also pulls completed
+ tasks from the worker queue and let the results of the duplicate tasks be the
+ same as their corresponding finished task.
+
+ Args:
+ stage: The current stage of the pipeline, for example, build stage or test
+ stage.
+ done_dict: A dictionary of tasks that are done. The key of the dictionary is
+ the identifier of the task. The value of the dictionary is the results of
+ performing the corresponding task.
+ helper_queue: A queue of duplicate tasks whose results need to be resolved.
+ This is a communication channel between the pipeline_process and this
+ helper process.
+ completed_queue: A queue of tasks that have been built/tested. The results
+ of these tasks are needed to resolve the results of the duplicate tasks.
+ This is the communication channel between the workers and this helper
+ process.
+ result_queue: After the results of the duplicate tasks have been resolved,
+ the duplicate tasks will be sent to the next stage via this queue.
+ """
+
+ # The list of duplicate tasks, the results of which need to be resolved.
+ waiting_list = []
+
+ while True:
+ # Pull duplicate task from the helper queue.
+ if not helper_queue.empty():
+ task = helper_queue.get()
+
+ if task == pipeline_process.POISONPILL:
+ # Poison pill means no more duplicate task from the helper queue.
+ break
+
+ # The task has not been performed before.
+ assert not task.Done(stage)
+
+ # The identifier of this task.
+ identifier = task.GetIdentifier(stage)
+
+ # If a duplicate task comes before the corresponding resolved results from
+ # the completed_queue, it will be put in the waiting list. If the result
+ # arrives before the duplicate task, the duplicate task will be resolved
+ # right away.
+ if identifier in done_dict:
+ # This task has been encountered before and the result is available. The
+ # result can be resolved right away.
+ task.SetResult(stage, done_dict[identifier])
+ result_queue.put(task)
+ else:
+ waiting_list.append(task)
+
+ # Check and get completed tasks from completed_queue.
+ GetResultFromCompletedQueue(
+ stage, completed_queue, done_dict, waiting_list, result_queue
+ )
+
+ # Wait to resolve the results of the remaining duplicate tasks.
+ while waiting_list:
+ GetResultFromCompletedQueue(
+ stage, completed_queue, done_dict, waiting_list, result_queue
+ )
+
+
+def GetResultFromCompletedQueue(
+ stage, completed_queue, done_dict, waiting_list, result_queue
+):
+ """Pull results from the completed queue and resolves duplicate tasks.
+
+ Args:
+ stage: The current stage of the pipeline, for example, build stage or test
+ stage.
+ completed_queue: A queue of tasks that have been performed. The results of
+ these tasks are needed to resolve the results of the duplicate tasks. This
+ is the communication channel between the workers and this method.
+ done_dict: A dictionary of tasks that are done. The key of the dictionary is
+ the optimization flags of the task. The value of the dictionary is the
+ compilation results of the corresponding task.
+ waiting_list: The list of duplicate tasks, the results of which need to be
+ resolved.
+ result_queue: After the results of the duplicate tasks have been resolved,
+ the duplicate tasks will be sent to the next stage via this queue.
+
+ This helper method tries to pull a completed task from the completed queue.
+ If it gets a task from the queue, it resolves the results of all the relevant
+ duplicate tasks in the waiting list. Relevant tasks are the tasks that have
+ the same flags as the currently received results from the completed_queue.
+ """
+ # Pull completed task from the worker queue.
+ if not completed_queue.empty():
+ (identifier, result) = completed_queue.get()
+ done_dict[identifier] = result
+
+ tasks = [
+ t for t in waiting_list if t.GetIdentifier(stage) == identifier
+ ]
+ for duplicate_task in tasks:
+ duplicate_task.SetResult(stage, result)
+ result_queue.put(duplicate_task)
+ waiting_list.remove(duplicate_task)
def Worker(stage, task, helper_queue, result_queue):
- """Worker that performs the task.
-
- This method calls the work method of the input task and distribute the result
- to the helper and the next stage.
-
- Args:
- stage: The current stage of the pipeline, for example, build stage or test
- stage.
- task: Input task that needs to be performed.
- helper_queue: Queue that holds the completed tasks and the results. This is
- the communication channel between the worker and the helper.
- result_queue: Queue that holds the completed tasks and the results. This is
- the communication channel between the worker and the next stage.
- """
-
- # The task has not been completed before.
- assert not task.Done(stage)
-
- task.Work(stage)
- helper_queue.put((task.GetIdentifier(stage), task.GetResult(stage)))
- result_queue.put(task)
+ """Worker that performs the task.
+
+ This method calls the work method of the input task and distribute the result
+ to the helper and the next stage.
+
+ Args:
+ stage: The current stage of the pipeline, for example, build stage or test
+ stage.
+ task: Input task that needs to be performed.
+ helper_queue: Queue that holds the completed tasks and the results. This is
+ the communication channel between the worker and the helper.
+ result_queue: Queue that holds the completed tasks and the results. This is
+ the communication channel between the worker and the next stage.
+ """
+
+ # The task has not been completed before.
+ assert not task.Done(stage)
+
+ task.Work(stage)
+ helper_queue.put((task.GetIdentifier(stage), task.GetResult(stage)))
+ result_queue.put(task)