diff options
Diffstat (limited to 'bestflags/pipeline_worker.py')
-rw-r--r-- | bestflags/pipeline_worker.py | 32 |
1 files changed, 16 insertions, 16 deletions
diff --git a/bestflags/pipeline_worker.py b/bestflags/pipeline_worker.py index 4a1722a8..6ec28fe6 100644 --- a/bestflags/pipeline_worker.py +++ b/bestflags/pipeline_worker.py @@ -15,7 +15,7 @@ __author__ = 'yuhenglong@google.com (Yuheng Long)' import pipeline_process -def helper(stage, done_dict, helper_queue, completed_queue, result_queue): +def Helper(stage, done_dict, helper_queue, completed_queue, result_queue): """Helper that filters duplicate tasks. This method Continuously pulls duplicate tasks from the helper_queue. The @@ -53,10 +53,10 @@ def helper(stage, done_dict, helper_queue, completed_queue, result_queue): break # The task has not been performed before. - assert not task.done(stage) + assert not task.Done(stage) # The identifier of this task. - identifier = task.get_identifier(stage) + identifier = task.GetIdentifier(stage) # If a duplicate task comes before the corresponding resolved results from # the completed_queue, it will be put in the waiting list. If the result @@ -65,23 +65,23 @@ def helper(stage, done_dict, helper_queue, completed_queue, result_queue): if identifier in done_dict: # This task has been encountered before and the result is available. The # result can be resolved right away. - task.set_result(stage, done_dict[identifier]) + task.SetResult(stage, done_dict[identifier]) result_queue.put(task) else: waiting_list.append(task) # Check and get completed tasks from completed_queue. - get_result_from_completed_queue(stage, completed_queue, done_dict, - waiting_list, result_queue) + GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list, + result_queue) # Wait to resolve the results of the remaining duplicate tasks. while waiting_list: - get_result_from_completed_queue(stage, completed_queue, done_dict, - waiting_list, result_queue) + GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list, + result_queue) -def get_result_from_completed_queue(stage, completed_queue, done_dict, - waiting_list, result_queue): +def GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list, + result_queue): """Pull results from the completed queue and resolves duplicate tasks. Args: @@ -108,14 +108,14 @@ def get_result_from_completed_queue(stage, completed_queue, done_dict, (identifier, result) = completed_queue.get() done_dict[identifier] = result - tasks = [t for t in waiting_list if t.get_identifier(stage) == identifier] + tasks = [t for t in waiting_list if t.GetIdentifier(stage) == identifier] for duplicate_task in tasks: - duplicate_task.set_result(stage, result) + duplicate_task.SetResult(stage, result) result_queue.put(duplicate_task) waiting_list.remove(duplicate_task) -def worker(stage, task, helper_queue, result_queue): +def Worker(stage, task, helper_queue, result_queue): """Worker that performs the task. This method calls the work method of the input task and distribute the result @@ -132,8 +132,8 @@ def worker(stage, task, helper_queue, result_queue): """ # The task has not been completed before. - assert not task.done(stage) + assert not task.Done(stage) - task.work(stage) - helper_queue.put((task.get_identifier(stage), task.get_result(stage))) + task.Work(stage) + helper_queue.put((task.GetIdentifier(stage), task.GetResult(stage))) result_queue.put(task) |