aboutsummaryrefslogtreecommitdiff
path: root/bestflags/pipeline_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'bestflags/pipeline_worker.py')
-rw-r--r--bestflags/pipeline_worker.py32
1 files changed, 16 insertions, 16 deletions
diff --git a/bestflags/pipeline_worker.py b/bestflags/pipeline_worker.py
index 4a1722a8..6ec28fe6 100644
--- a/bestflags/pipeline_worker.py
+++ b/bestflags/pipeline_worker.py
@@ -15,7 +15,7 @@ __author__ = 'yuhenglong@google.com (Yuheng Long)'
import pipeline_process
-def helper(stage, done_dict, helper_queue, completed_queue, result_queue):
+def Helper(stage, done_dict, helper_queue, completed_queue, result_queue):
"""Helper that filters duplicate tasks.
This method Continuously pulls duplicate tasks from the helper_queue. The
@@ -53,10 +53,10 @@ def helper(stage, done_dict, helper_queue, completed_queue, result_queue):
break
# The task has not been performed before.
- assert not task.done(stage)
+ assert not task.Done(stage)
# The identifier of this task.
- identifier = task.get_identifier(stage)
+ identifier = task.GetIdentifier(stage)
# If a duplicate task comes before the corresponding resolved results from
# the completed_queue, it will be put in the waiting list. If the result
@@ -65,23 +65,23 @@ def helper(stage, done_dict, helper_queue, completed_queue, result_queue):
if identifier in done_dict:
# This task has been encountered before and the result is available. The
# result can be resolved right away.
- task.set_result(stage, done_dict[identifier])
+ task.SetResult(stage, done_dict[identifier])
result_queue.put(task)
else:
waiting_list.append(task)
# Check and get completed tasks from completed_queue.
- get_result_from_completed_queue(stage, completed_queue, done_dict,
- waiting_list, result_queue)
+ GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
+ result_queue)
# Wait to resolve the results of the remaining duplicate tasks.
while waiting_list:
- get_result_from_completed_queue(stage, completed_queue, done_dict,
- waiting_list, result_queue)
+ GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
+ result_queue)
-def get_result_from_completed_queue(stage, completed_queue, done_dict,
- waiting_list, result_queue):
+def GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
+ result_queue):
"""Pull results from the completed queue and resolves duplicate tasks.
Args:
@@ -108,14 +108,14 @@ def get_result_from_completed_queue(stage, completed_queue, done_dict,
(identifier, result) = completed_queue.get()
done_dict[identifier] = result
- tasks = [t for t in waiting_list if t.get_identifier(stage) == identifier]
+ tasks = [t for t in waiting_list if t.GetIdentifier(stage) == identifier]
for duplicate_task in tasks:
- duplicate_task.set_result(stage, result)
+ duplicate_task.SetResult(stage, result)
result_queue.put(duplicate_task)
waiting_list.remove(duplicate_task)
-def worker(stage, task, helper_queue, result_queue):
+def Worker(stage, task, helper_queue, result_queue):
"""Worker that performs the task.
This method calls the work method of the input task and distribute the result
@@ -132,8 +132,8 @@ def worker(stage, task, helper_queue, result_queue):
"""
# The task has not been completed before.
- assert not task.done(stage)
+ assert not task.Done(stage)
- task.work(stage)
- helper_queue.put((task.get_identifier(stage), task.get_result(stage)))
+ task.Work(stage)
+ helper_queue.put((task.GetIdentifier(stage), task.GetResult(stage)))
result_queue.put(task)