path: root/bestflags
author    Yuheng Long <yuhenglong@google.com>  2013-06-28 09:32:43 -0700
committer ChromeBot <chrome-bot@google.com>    2013-07-10 14:24:00 -0700
commit    761748dc64a0d2a6a90714097445fcf782a8934e (patch)
tree      8fe0d8c56eaeaf9bdda204d0a0f5594183f75aca /bestflags
parent    2dd2e9e80ad235793e1591d21a16d23b2b5243d7 (diff)
download  toolchain-utils-761748dc64a0d2a6a90714097445fcf782a8934e.tar.gz
Merge the test stage and the build stage.
Use the pipeline_worker functions to represent both stages. The two stages
differ only in how they obtain the key of a task and how they set the result
of the stage. The key uniquely distinguishes one task from another. For
example, the key for the build stage is the optimization flags, while the key
for the test stage is the checksum of the built image.

BUG=None
TEST=None

Change-Id: Id057c17026806acd406fe78cc70ed996273ca9aa
Reviewed-on: https://gerrit-int.chromium.org/40376
Reviewed-by: Simon Que <sque@google.com>
Reviewed-by: Luis Lozano <llozano@chromium.org>
Commit-Queue: Yuheng Long <yuhenglong@google.com>
Tested-by: Yuheng Long <yuhenglong@google.com>
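Illustrative sketch (not part of this commit): the merge relies on tasks
exposing a small stage-parameterized protocol, so a single helper/worker pair
can serve both the build and the test stage. The constants BUILD_STAGE and
TEST_STAGE and the class FlagTask below are hypothetical names used only to
show that protocol; the real task class lives elsewhere in bestflags.

BUILD_STAGE = 0
TEST_STAGE = 1


class FlagTask(object):
  """Hypothetical task keyed by its flags when building and by the checksum
  of the built image when testing."""

  def __init__(self, flags):
    self._flags = flags
    self._checksum = None
    self._results = {}

  def get_identifier(self, stage):
    # The key that distinguishes one task from another differs per stage.
    if stage == BUILD_STAGE:
      return self._flags
    return self._checksum

  def work(self, stage):
    # Stand-in for the real compile/benchmark step.
    if stage == BUILD_STAGE:
      self._checksum = hash(self._flags)
      self._results[stage] = self._checksum
    else:
      self._results[stage] = 42  # a mock benchmark cost

  def set_result(self, stage, result):
    # Used by the helper to resolve a duplicate task without redoing the work.
    self._results[stage] = result

  def get_result(self, stage):
    return self._results[stage]

  def done(self, stage):
    return stage in self._results

With a protocol like this, pipeline_worker.helper and pipeline_worker.worker
(added below) never need to know which stage they are running in; they only
see identifiers and results.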
Diffstat (limited to 'bestflags')
-rw-r--r--  bestflags/builder.py               | 132
-rw-r--r--  bestflags/builder_test.py          | 164
-rw-r--r--  bestflags/executor.py              |  64
-rw-r--r--  bestflags/executor_test.py         |  42
-rw-r--r--  bestflags/pipeline_worker.py       | 137
-rw-r--r--  bestflags/pipeline_worker_test.py  | 173
6 files changed, 310 insertions, 402 deletions
diff --git a/bestflags/builder.py b/bestflags/builder.py
deleted file mode 100644
index 535c1a14..00000000
--- a/bestflags/builder.py
+++ /dev/null
@@ -1,132 +0,0 @@
-"""The Build stage of the framework.
-
-This module defines the build helper and the actual build worker. If duplicate
-tasks, for example t1 and t2, need to be built, only one of them, say t1, is
-built; the helper waits for the result of t1 and sets the result of the other
-task, t2 here, to be the same as that of t1. Setting the result of t2 to be the
-same as t1 is referred to as resolving the result of t2. The build worker
-invokes the compile method of the tasks that are not duplicates.
-"""
-
-__author__ = 'yuhenglong@google.com (Yuheng Long)'
-
-import pipeline_process
-
-
-def build_helper(done_dict, helper_queue, built_queue, result_queue):
- """Build helper.
-
- This method continuously pulls duplicate tasks from the helper_queue. The
- duplicate tasks need not be compiled. This method also pulls completed tasks
- from the worker queue and lets the results of the duplicate tasks be the
- same as those of their corresponding finished tasks.
-
- Args:
- done_dict: A dictionary of tasks that are done. The key of the dictionary is
- the optimization flags of the task. The value of the dictionary is the
- compilation results of the corresponding task.
- helper_queue: A queue of duplicate tasks whose results need to be resolved.
- This is a communication channel between the pipeline_process and this
- helper process.
- built_queue: A queue of tasks that have been built. The results of these
- tasks are needed to resolve the results of the duplicate tasks. This is
- the communication channel between the actual build workers and this helper
- process.
- result_queue: After the results of the duplicate tasks have been resolved,
- the duplicate tasks will be sent to the next stage via this queue.
- """
-
- # The list of duplicate tasks, the results of which need to be resolved.
- waiting_list = []
-
- while True:
- # Pull duplicate task from the helper queue.
- if not helper_queue.empty():
- task = helper_queue.get()
-
- if task == pipeline_process.POISONPILL:
- # Poison pill means no more duplicate task from the helper queue.
- break
-
- # The task has not been compiled before.
- assert not task.compiled()
-
- # The optimization flags of this task.
- flags = task.get_flags()
-
- # If a duplicate task comes before the corresponding resolved results from
- # the built_queue, it will be put in the waiting list. If the result
- # arrives before the duplicate task, the duplicate task will be resolved
- # right away.
- if flags in done_dict:
- # This task has been encountered before and the result is available. The
- # result can be resolved right away.
- task.set_build_result(done_dict[flags])
- result_queue.put(task)
- else:
- waiting_list.append(task)
-
- # Check and get compiled tasks from compiled_queue.
- get_result_from_built_queue(built_queue, done_dict, waiting_list,
- result_queue)
-
- # Wait to resolve the results of the remaining duplicate tasks.
- while waiting_list:
- get_result_from_built_queue(built_queue, done_dict, waiting_list,
- result_queue)
-
-
-def get_result_from_built_queue(built_queue, done_dict, waiting_list,
- result_queue):
- """Pull results from the compiled queue and resolves duplicate tasks.
-
- Args:
- built_queue: A queue of tasks that have been built. The results of these
- tasks are needed to resolve the results of the duplicate tasks. This is
- the communication channel between the actual build workers and this helper
- process.
- done_dict: A dictionary of tasks that are done. The key of the dictionary is
- the optimization flags of the task. The value of the dictionary is the
- compilation results of the corresponding task.
- waiting_list: The list of duplicate tasks, the results of which need to be
- resolved.
- result_queue: After the results of the duplicate tasks have been resolved,
- the duplicate tasks will be sent to the next stage via this queue.
-
- This helper method tries to pull a compiled task from the compiled queue.
- If it gets a task from the queue, it resolves the results of all the relevant
- duplicate tasks in the waiting list. Relevant tasks are the tasks that have
- the same flags as the currently received results from the built_queue.
- """
- # Pull completed task from the worker queue.
- if not built_queue.empty():
- (flags, build_result) = built_queue.get()
- done_dict[flags] = build_result
-
- task_list = [t for t in waiting_list if t.get_flags() == flags]
- for duplicate_task in task_list:
- duplicate_task.set_build_result(build_result)
- result_queue.put(duplicate_task)
- waiting_list.remove(duplicate_task)
-
-
-def build_worker(task, helper_queue, result_queue):
- """Build worker.
-
- This method calls the compile method of the input task and distributes the
- result to the helper and the next stage.
-
- Args:
- task: Input task that needs to be built.
- helper_queue: Queue that holds the completed tasks and the build results.
- This is a communication channel between the worker and the helper.
- result_queue: Queue that holds the completed tasks and the build results.
- This is a communication channel between the worker and the next stage.
- """
-
- # The task has not been compiled before.
- assert not task.compiled()
-
- task.compile()
- helper_queue.put((task.get_flags(), task.get_build_result()))
- result_queue.put(task)
diff --git a/bestflags/builder_test.py b/bestflags/builder_test.py
deleted file mode 100644
index a705ee1a..00000000
--- a/bestflags/builder_test.py
+++ /dev/null
@@ -1,164 +0,0 @@
-"""Builder unittest.
-
-This module tests the build helper method and the worker method.
-"""
-
-__author__ = 'yuhenglong@google.com (Yuheng Long)'
-
-import multiprocessing
-import random
-import sys
-import unittest
-
-import builder
-import pipeline_process
-
-
-def MockTaskCostGenerator():
- """Calls a random number generator and returns a negative number."""
- return random.randint(-sys.maxint - 1, -1)
-
-
-class MockTask(object):
- """This class emulates an actual task.
-
- It does not do the actual compile, but simply returns the build result given
- when this task is constructed.
- """
-
- def __init__(self, flags, cost):
- """Set up the compile results for this task.
-
- Args:
- flags: the optimization flags of this task.
- cost: the mock build cost of this task.
-
- The _pre_cost field stores the 'compiled' cost. Once this task is
- compiled, i.e., by calling the compile method, the _cost field will have
- this 'compiled' cost.
- """
-
- self._flags = flags
- self._pre_cost = cost
-
- def get_flags(self):
- return self._flags
-
- def __eq__(self, other):
- if isinstance(other, MockTask):
- return self._flags == other._flags and self._cost == other._cost
- return False
-
- def set_build_result(self, cost):
- self._cost = cost
-
- def compile(self):
- self._cost = self._pre_cost
-
- def get_build_result(self):
- return self._cost
-
- def compiled(self):
- """Indicates whether the task has been compiled."""
-
- return '_cost' in self.__dict__
-
-
-class BuilderTest(unittest.TestCase):
- """This class tests the Builder.
-
- Given the same set of flags, the builder should produce the same image and
- cost.
- """
-
- def testHelper(self):
- """"Test the build helper.
-
- Call the build method twice, and test the results. The results should be the
- same, i.e., the cost should be the same.
- """
-
- # Set up the input, helper and output queue for the worker method.
- manager = multiprocessing.Manager()
- helper_queue = manager.Queue()
- output_queue = manager.Queue()
- built_queue = manager.Queue()
-
- # Set up the helper process that holds the helper method.
- helper_process = multiprocessing.Process(target=builder.build_helper,
- args=({}, helper_queue,
- built_queue, output_queue))
- helper_process.start()
-
- # A dictionary defines the mock compile result to the build_helper.
- mock_compile_result = {1: 1995, 2: 59, 9: 1027}
-
- # Test that when a task has been done before, the duplicate task gets the
- # same result. Two different scenarios are tested here: the mock results
- # are added to the built_queue both before and after the corresponding
- # mock tasks are added to the input queue.
- built_queue.put((9, mock_compile_result[9]))
-
- # The output of the helper should contain all the following tasks.
- results = [1, 1, 2, 9]
-
- # Test the handling of multiple tasks that share the same flags key, here 1.
- for result in results:
- helper_queue.put(MockTask(result, MockTaskCostGenerator()))
-
- built_queue.put((2, mock_compile_result[2]))
- built_queue.put((1, mock_compile_result[1]))
-
- # Signal there is no more duplicate task.
- helper_queue.put(pipeline_process.POISONPILL)
- helper_process.join()
-
- while results:
- task = output_queue.get()
- flags = task._flags
- cost = task._cost
- self.assertTrue(flags in results)
- if flags in mock_compile_result:
- self.assertEqual(cost, mock_compile_result[flags])
- results.remove(task._flags)
-
- def testWorker(self):
- """"Test the actual build worker method.
-
- The worker should process all the input tasks and output the tasks to the
- helper and result queue.
- """
-
- manager = multiprocessing.Manager()
- output_queue = manager.Queue()
- built_queue = manager.Queue()
-
- # A dictionary defines the mock tasks and their corresponding compile
- # results.
- mock_compile_tasks = {1: 86, 2: 788}
-
- mock_tasks = []
-
- for flag, cost in mock_compile_tasks.iteritems():
- mock_tasks.append(MockTask(flag, cost))
-
- # Submit the mock tasks to the build worker.
- for mock_task in mock_tasks:
- builder.build_worker(mock_task, built_queue, output_queue)
-
- # The tasks, from the output queue, should be the same as the input and
- # should be compiled.
- for task in mock_tasks:
- output = output_queue.get()
- self.assertEqual(output, task)
- self.assertTrue(output.compiled())
-
- # The tasks, from the built queue, should be defined in the
- # mock_compile_tasks dictionary.
- for flag, cost in mock_compile_tasks.iteritems():
- helper_input = built_queue.get()
- self.assertEqual(helper_input, (flag, cost))
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/bestflags/executor.py b/bestflags/executor.py
deleted file mode 100644
index 91dd9288..00000000
--- a/bestflags/executor.py
+++ /dev/null
@@ -1,64 +0,0 @@
-"""The Execution stage of the framework.
-
-Execute the image against a set of benchmarks. This stage sets up a number of
-processes, calls the actual execute method and caches the results.
-"""
-
-__author__ = 'yuhenglong@google.com (Yuheng Long)'
-
-import multiprocessing
-
-
-class Tester(object):
- """Execute the generated images against a set of benchmark applications."""
-
- def __init__(self, numProcess, costs):
- """Set up the process pool and the results cached.
-
- Args:
- numProcess: Maximum number of executions to run in parallel.
- costs: Executions that have been benchmarked before.
- """
-
- self._pool = multiprocessing.Pool(numProcess)
- self._costs = costs
-
- def _set_cost(self, image, cost):
- """Record the execution result for the current image.
-
- Args:
- image: The input image for the execution
- cost: the time it takes to execute the image
- """
-
- pass
-
- def _execute(self, task):
- """Execute the benchmarks on task.
-
- The concrete subclass should implement the actual execution.
-
- Args:
- task: The input task for the execution
- """
- # raise Exception('Must be implemented in child class')
- pass
-
- def _execute_task(self, task):
- """Execute the input task and record the cost.
-
- Args:
- task: The task to be compiled
- """
- pass
-
- def execute(self, generation):
- """Execute the image for all entities in a generation.
-
- Call them in parallel in processes.
-
- Args:
- generation: A new generation to be executed.
- """
-
- self._pool.map(self._execute_task, generation.task, 1)
diff --git a/bestflags/executor_test.py b/bestflags/executor_test.py
deleted file mode 100644
index 0f27fb9a..00000000
--- a/bestflags/executor_test.py
+++ /dev/null
@@ -1,42 +0,0 @@
-"""Tester unittest."""
-
-__author__ = 'yuhenglong@google.com (Yuheng Long)'
-
-import unittest
-
-import executor
-
-
-class TesterTest(unittest.TestCase):
- """This class test the Executor.
-
- Given the same flags set and/or checksum, the image and the cost should be the
- same from the Executor.
- """
-
- def setUp(self):
- """Create the Executor to be tested."""
-
- self.tester = executor.Tester(1, None)
-
- def testExecute(self):
- """"Test the execute method.
-
- Call the execute method twice, and test the results. The results should be
- the same, i.e., the cost should be the same.
- Either the execute method or the set_execution_result of the input
- Generation for the Tester should be called, but not both.
- """
- self.tester.execute(self)
-
- def testInit(self):
- """"Test the init method.
-
- If a certain checksum has been encountered before, the Tester should not
- reexecute the images with the same checksum.
- """
-
- pass
-
-if __name__ == '__main__':
- unittest.main()
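Illustrative sketch (not part of this commit): the deleted Tester took a costs
cache of executions that had been benchmarked before, and executor_test.py
states the intent that an image whose checksum has already been seen should
not be re-executed. A minimal version of that caching step might look like the
following; CachingTester and the task methods get_checksum and
set_execution_result are assumed names, not APIs defined in this diff.

class CachingTester(object):
  """Hypothetical Tester variant that skips images it has already executed."""

  def __init__(self, costs):
    self._costs = costs  # maps image checksum -> measured cost

  def _execute(self, task):
    # Stand-in for the real benchmark run.
    return len(str(task.get_checksum()))

  def execute_task(self, task):
    checksum = task.get_checksum()  # assumed task API
    if checksum in self._costs:
      # Seen this image before: reuse the cached cost, do not re-execute.
      task.set_execution_result(self._costs[checksum])
      return
    cost = self._execute(task)
    self._costs[checksum] = cost
    task.set_execution_result(cost)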
diff --git a/bestflags/pipeline_worker.py b/bestflags/pipeline_worker.py
new file mode 100644
index 00000000..6ab965e2
--- /dev/null
+++ b/bestflags/pipeline_worker.py
@@ -0,0 +1,137 @@
+"""The pipeline_worker functions of the build and test stage of the framework.
+
+This module defines the helper and the worker. If duplicate tasks, for example
+t1 and t2, need to be built/tested, only one of them, say t1, is built/tested;
+the helper waits for the result of t1 and sets the result of the other task,
+t2 here, to be the same as that of t1. Setting the result of t2 to be the same
+as t1 is referred to as resolving the result of t2.
+The worker invokes the work method of the tasks that are not duplicates.
+"""
+
+__author__ = 'yuhenglong@google.com (Yuheng Long)'
+
+import pipeline_process
+
+
+def helper(stage, done_dict, helper_queue, completed_queue, result_queue):
+ """Helper that filters duplicate tasks.
+
+ This method continuously pulls duplicate tasks from the helper_queue. The
+ duplicate tasks need not be compiled/tested. This method also pulls completed
+ tasks from the worker queue and lets the results of the duplicate tasks be the
+ same as those of their corresponding finished tasks.
+
+ Args:
+ stage: The current stage of the pipeline, for example, build stage or test
+ stage.
+ done_dict: A dictionary of tasks that are done. The key of the dictionary is
+ the identifier of the task. The value of the dictionary is the results of
+ performing the corresponding task.
+ helper_queue: A queue of duplicate tasks whose results need to be resolved.
+ This is a communication channel between the pipeline_process and this
+ helper process.
+ completed_queue: A queue of tasks that have been built/tested. The results
+ of these tasks are needed to resolve the results of the duplicate tasks.
+ This is the communication channel between the workers and this helper
+ process.
+ result_queue: After the results of the duplicate tasks have been resolved,
+ the duplicate tasks will be sent to the next stage via this queue.
+ """
+
+ # The list of duplicate tasks, the results of which need to be resolved.
+ waiting_list = []
+
+ while True:
+ # Pull duplicate task from the helper queue.
+ if not helper_queue.empty():
+ task = helper_queue.get()
+
+ if task == pipeline_process.POISONPILL:
+ # Poison pill means no more duplicate task from the helper queue.
+ break
+
+ # The task has not been performed before.
+ assert not task.done(stage)
+
+ # The identifier of this task.
+ identifier = task.get_identifier(stage)
+
+ # If a duplicate task comes before the corresponding resolved results from
+ # the completed_queue, it will be put in the waiting list. If the result
+ # arrives before the duplicate task, the duplicate task will be resolved
+ # right away.
+ if identifier in done_dict:
+ # This task has been encountered before and the result is available. The
+ # result can be resolved right away.
+ task.set_result(stage, done_dict[identifier])
+ result_queue.put(task)
+ else:
+ waiting_list.append(task)
+
+ # Check and get completed tasks from completed_queue.
+ get_result_from_completed_queue(stage, completed_queue, done_dict,
+ waiting_list, result_queue)
+
+ # Wait to resolve the results of the remaining duplicate tasks.
+ while waiting_list:
+ get_result_from_completed_queue(stage, completed_queue, done_dict,
+ waiting_list, result_queue)
+
+
+def get_result_from_completed_queue(stage, completed_queue, done_dict,
+ waiting_list, result_queue):
+ """Pull results from the completed queue and resolves duplicate tasks.
+
+ Args:
+ stage: The current stage of the pipeline, for example, build stage or test
+ stage.
+ completed_queue: A queue of tasks that have been performed. The results of
+ these tasks are needed to resolve the results of the duplicate tasks. This
+ is the communication channel between the workers and this method.
+ done_dict: A dictionary of tasks that are done. The key of the dictionary is
+ the identifier of the task. The value of the dictionary is the result of
+ performing the corresponding task.
+ waiting_list: The list of duplicate tasks, the results of which need to be
+ resolved.
+ result_queue: After the results of the duplicate tasks have been resolved,
+ the duplicate tasks will be sent to the next stage via this queue.
+
+ This helper method tries to pull a completed task from the completed queue.
+ If it gets a task from the queue, it resolves the results of all the relevant
+ duplicate tasks in the waiting list. Relevant tasks are the tasks that have
+ the same identifier as the result just received from the completed_queue.
+ """
+ # Pull completed task from the worker queue.
+ if not completed_queue.empty():
+ (identifier, result) = completed_queue.get()
+ done_dict[identifier] = result
+
+ tasks = [t for t in waiting_list if t.get_identifier(stage) == identifier]
+ for duplicate_task in tasks:
+ duplicate_task.set_result(stage, result)
+ result_queue.put(duplicate_task)
+ waiting_list.remove(duplicate_task)
+
+
+def worker(stage, task, helper_queue, result_queue):
+ """Worker that performs the task.
+
+ This method calls the work method of the input task and distributes the result
+ to the helper and the next stage.
+
+ Args:
+ stage: The current stage of the pipeline, for example, build stage or test
+ stage.
+ task: Input task that needs to be performed.
+ helper_queue: Queue that holds the completed tasks and the results. This is
+ the communication channel between the worker and the helper.
+ result_queue: Queue that holds the completed tasks and the results. This is
+ the communication channel between the worker and the next stage.
+ """
+
+ # The task has not been completed before.
+ assert not task.done(stage)
+
+ task.work(stage)
+ helper_queue.put((task.get_identifier(stage), task.get_result(stage)))
+ result_queue.put(task)
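Illustrative sketch (not part of this commit): one possible way to wire the
helper and worker above together with multiprocessing queues. run_stage and
BUILDSTAGE are hypothetical names; the real driver is pipeline_process, which
is not shown in this diff. The sketch assumes every duplicate task has a
corresponding unique task that eventually produces its result.

import multiprocessing

import pipeline_process
import pipeline_worker

BUILDSTAGE = 0


def run_stage(unique_tasks, duplicate_tasks):
  """Runs one stage and returns every task once its result is known."""
  manager = multiprocessing.Manager()
  helper_queue = manager.Queue()
  completed_queue = manager.Queue()
  result_queue = manager.Queue()

  # The helper resolves duplicates using the results that workers publish on
  # completed_queue.
  helper_process = multiprocessing.Process(
      target=pipeline_worker.helper,
      args=(BUILDSTAGE, {}, helper_queue, completed_queue, result_queue))
  helper_process.start()

  # Duplicates go to the helper; unique tasks are performed directly.
  for task in duplicate_tasks:
    helper_queue.put(task)
  for task in unique_tasks:
    pipeline_worker.worker(BUILDSTAGE, task, completed_queue, result_queue)

  # The poison pill tells the helper that no more duplicates will arrive.
  helper_queue.put(pipeline_process.POISONPILL)
  helper_process.join()

  # Every task, unique or duplicate, ends up on result_queue exactly once.
  total = len(unique_tasks) + len(duplicate_tasks)
  return [result_queue.get() for _ in range(total)]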
diff --git a/bestflags/pipeline_worker_test.py b/bestflags/pipeline_worker_test.py
new file mode 100644
index 00000000..340b3011
--- /dev/null
+++ b/bestflags/pipeline_worker_test.py
@@ -0,0 +1,173 @@
+"""Unittest for the pipeline_worker functions in the build/test stage.
+
+This module tests the helper method and the worker method.
+"""
+
+__author__ = 'yuhenglong@google.com (Yuheng Long)'
+
+import multiprocessing
+import random
+import sys
+import unittest
+
+import pipeline_process
+import pipeline_worker
+
+
+TESTSTAGE = 0
+
+
+def MockTaskCostGenerator():
+ """Calls a random number generator and returns a negative number."""
+ return random.randint(-sys.maxint - 1, -1)
+
+
+class MockTask(object):
+ """This class emulates an actual task.
+
+ It does not do the actual work, but simply returns the result as given when
+ this task is constructed.
+ """
+
+ def __init__(self, identifier, cost):
+ """Set up the results for this task.
+
+ Args:
+ identifier: the identifier of this task.
+ cost: the mock cost of this task.
+
+ The _pre_cost field stores the cost. Once this task is performed, i.e., by
+ calling the work method, the _cost field will have this cost.
+ """
+
+ self._identifier = identifier
+ self._pre_cost = cost
+
+ def get_identifier(self, stage):
+ assert stage == TESTSTAGE
+ return self._identifier
+
+ def __eq__(self, other):
+ if isinstance(other, MockTask):
+ return self._identifier == other._identifier and self._cost == other._cost
+ return False
+
+ def set_result(self, stage, cost):
+ assert stage == TESTSTAGE
+ self._cost = cost
+
+ def work(self, stage):
+ assert stage == TESTSTAGE
+ self._cost = self._pre_cost
+
+ def get_result(self, stage):
+ assert stage == TESTSTAGE
+ return self._cost
+
+ def done(self, stage):
+ """Indicates whether the task has been performed."""
+
+ assert stage == TESTSTAGE
+ return '_cost' in self.__dict__
+
+
+class AuxiliaryTest(unittest.TestCase):
+ """This class tests the pipeline_worker functions.
+
+ Given the same identifier, the pipeline_worker functions should produce the
+ same cost.
+ """
+
+ def testHelper(self):
+ """"Test the helper.
+
+ Call the helper method twice, and test the results. The results should be
+ the same, i.e., the cost should be the same.
+ """
+
+ # Set up the input, helper and output queue for the helper method.
+ manager = multiprocessing.Manager()
+ helper_queue = manager.Queue()
+ result_queue = manager.Queue()
+ completed_queue = manager.Queue()
+
+ # Set up the helper process that holds the helper method.
+ helper_process = multiprocessing.Process(target=pipeline_worker.helper,
+ args=(TESTSTAGE, {}, helper_queue,
+ completed_queue,
+ result_queue))
+ helper_process.start()
+
+ # A dictionary defines the mock result to the helper.
+ mock_result = {1: 1995, 2: 59, 9: 1027}
+
+ # Test that when a task has been done before, the duplicate task gets the
+ # same result. Two different scenarios are tested here: the mock results
+ # are added to the completed_queue both before and after the corresponding
+ # mock tasks are added to the input queue.
+ completed_queue.put((9, mock_result[9]))
+
+ # The output of the helper should contain all the following tasks.
+ results = [1, 1, 2, 9]
+
+ # Test the handling of multiple tasks that share the same identifier, here
+ # 1.
+ for result in results:
+ helper_queue.put(MockTask(result, MockTaskCostGenerator()))
+
+ completed_queue.put((2, mock_result[2]))
+ completed_queue.put((1, mock_result[1]))
+
+ # Signal there is no more duplicate task.
+ helper_queue.put(pipeline_process.POISONPILL)
+ helper_process.join()
+
+ while results:
+ task = result_queue.get()
+ identifier = task._identifier
+ cost = task._cost
+ self.assertTrue(identifier in results)
+ if identifier in mock_result:
+ self.assertEqual(cost, mock_result[identifier])
+ results.remove(task._identifier)
+
+ def testWorker(self):
+ """"Test the worker method.
+
+ The worker should process all the input tasks and output the tasks to the
+ helper and result queue.
+ """
+
+ manager = multiprocessing.Manager()
+ result_queue = manager.Queue()
+ completed_queue = manager.Queue()
+
+ # A dictionary defines the mock tasks and their corresponding results.
+ mock_work_tasks = {1: 86, 2: 788}
+
+ mock_tasks = []
+
+ for flag, cost in mock_work_tasks.iteritems():
+ mock_tasks.append(MockTask(flag, cost))
+
+ # Submit the mock tasks to the worker.
+ for mock_task in mock_tasks:
+ pipeline_worker.worker(TESTSTAGE, mock_task, completed_queue,
+ result_queue)
+
+ # The tasks, from the output queue, should be the same as the input and
+ # should be performed.
+ for task in mock_tasks:
+ output = result_queue.get()
+ self.assertEqual(output, task)
+ self.assertTrue(output.done(TESTSTAGE))
+
+ # The tasks, from the completed_queue, should be defined in the
+ # mock_work_tasks dictionary.
+ for flag, cost in mock_work_tasks.iteritems():
+ helper_input = completed_queue.get()
+ self.assertEqual(helper_input, (flag, cost))
+
+
+if __name__ == '__main__':
+ unittest.main()