Diffstat (limited to 'bestflags')
-rw-r--r--  bestflags/builder.py                132
-rw-r--r--  bestflags/builder_test.py           164
-rw-r--r--  bestflags/executor.py                64
-rw-r--r--  bestflags/executor_test.py           42
-rw-r--r--  bestflags/pipeline_worker.py        137
-rw-r--r--  bestflags/pipeline_worker_test.py   173
6 files changed, 310 insertions, 402 deletions
diff --git a/bestflags/builder.py b/bestflags/builder.py
deleted file mode 100644
index 535c1a14..00000000
--- a/bestflags/builder.py
+++ /dev/null
@@ -1,132 +0,0 @@
-"""The Build stage of the framework.
-
-This module defines the builder helper and the actual build worker. If there
-are duplicate tasks, for example t1 and t2, that need to be built, one of them,
-for example t1, will be built, and the helper waits for the result of t1 and
-sets the results of the other task, t2 here, to be the same as that of t1.
-Setting the result of t2 to be the same as t1 is referred to as resolving the
-result of t2. The build worker invokes the compile method of the tasks that
-are not duplicates.
-"""
-
-__author__ = 'yuhenglong@google.com (Yuheng Long)'
-
-import pipeline_process
-
-
-def build_helper(done_dict, helper_queue, built_queue, result_queue):
- """Build helper.
-
-  This method continuously pulls duplicate tasks from the helper_queue. The
-  duplicate tasks need not be compiled. This method also pulls completed tasks
-  from the worker queue and lets the results of the duplicate tasks be the
-  same as those of their corresponding finished tasks.
-
- Args:
- done_dict: A dictionary of tasks that are done. The key of the dictionary is
- the optimization flags of the task. The value of the dictionary is the
- compilation results of the corresponding task.
- helper_queue: A queue of duplicate tasks whose results need to be resolved.
- This is a communication channel between the pipeline_process and this
- helper process.
- built_queue: A queue of tasks that have been built. The results of these
- tasks are needed to resolve the results of the duplicate tasks. This is
- the communication channel between the actual build workers and this helper
- process.
- result_queue: After the results of the duplicate tasks have been resolved,
- the duplicate tasks will be sent to the next stage via this queue.
- """
-
- # The list of duplicate tasks, the results of which need to be resolved.
- waiting_list = []
-
- while True:
- # Pull duplicate task from the helper queue.
- if not helper_queue.empty():
- task = helper_queue.get()
-
- if task == pipeline_process.POISONPILL:
- # Poison pill means no more duplicate task from the helper queue.
- break
-
- # The task has not been compiled before.
- assert not task.compiled()
-
- # The optimization flags of this task.
- flags = task.get_flags()
-
-      # If a duplicate task comes before the corresponding resolved results
-      # from the built_queue, it will be put in the waiting list. If the result
- # arrives before the duplicate task, the duplicate task will be resolved
- # right away.
- if flags in done_dict:
- # This task has been encountered before and the result is available. The
- # result can be resolved right away.
- task.set_build_result(done_dict[flags])
- result_queue.put(task)
- else:
- waiting_list.append(task)
-
-    # Check and get compiled tasks from the built_queue.
- get_result_from_built_queue(built_queue, done_dict, waiting_list,
- result_queue)
-
- # Wait to resolve the results of the remaining duplicate tasks.
- while waiting_list:
- get_result_from_built_queue(built_queue, done_dict, waiting_list,
- result_queue)
-
-
-def get_result_from_built_queue(built_queue, done_dict, waiting_list,
- result_queue):
-  """Pulls results from the built queue and resolves duplicate tasks.
-
- Args:
- built_queue: A queue of tasks that have been built. The results of these
- tasks are needed to resolve the results of the duplicate tasks. This is
- the communication channel between the actual build workers and this helper
- process.
- done_dict: A dictionary of tasks that are done. The key of the dictionary is
- the optimization flags of the task. The value of the dictionary is the
- compilation results of the corresponding task.
- waiting_list: The list of duplicate tasks, the results of which need to be
- resolved.
- result_queue: After the results of the duplicate tasks have been resolved,
- the duplicate tasks will be sent to the next stage via this queue.
-
-  This helper method tries to pull a compiled task from the built queue.
- If it gets a task from the queue, it resolves the results of all the relevant
- duplicate tasks in the waiting list. Relevant tasks are the tasks that have
- the same flags as the currently received results from the built_queue.
- """
- # Pull completed task from the worker queue.
- if not built_queue.empty():
- (flags, build_result) = built_queue.get()
- done_dict[flags] = build_result
-
- task_list = [t for t in waiting_list if t.get_flags() == flags]
- for duplicate_task in task_list:
- duplicate_task.set_build_result(build_result)
- result_queue.put(duplicate_task)
- waiting_list.remove(duplicate_task)
-
-
-def build_worker(task, helper_queue, result_queue):
- """Build worker.
-
-  This method calls the compile method of the input task and distributes the
- result to the helper and the next stage.
-
- Args:
- task: Input task that needs to be built.
- helper_queue: Queue that holds the completed tasks and the build results.
- This is a communication channel between the worker and the helper.
- result_queue: Queue that holds the completed tasks and the build results.
- This is a communication channel between the worker and the next stage.
- """
-
- # The task has not been compiled before.
- assert not task.compiled()
-
- task.compile()
- helper_queue.put((task.get_flags(), task.get_build_result()))
- result_queue.put(task)
diff --git a/bestflags/builder_test.py b/bestflags/builder_test.py
deleted file mode 100644
index a705ee1a..00000000
--- a/bestflags/builder_test.py
+++ /dev/null
@@ -1,164 +0,0 @@
-"""Builder unittest.
-
-This module tests the build helper method and the worker method.
-"""
-
-__author__ = 'yuhenglong@google.com (Yuheng Long)'
-
-import multiprocessing
-import random
-import sys
-import unittest
-
-import builder
-import pipeline_process
-
-
-def MockTaskCostGenerator():
- """Calls a random number generator and returns a negative number."""
- return random.randint(-sys.maxint - 1, -1)
-
-
-class MockTask(object):
- """This class emulates an actual task.
-
-  It does not do the actual compile, but simply returns the build result as
-  given when this task is constructed.
- """
-
- def __init__(self, flags, cost):
- """Set up the compile results for this task.
-
- Args:
- flags: the optimization flags of this task.
-      cost: the mock build cost of this task.
-
-    The _pre_cost field stores the 'compiled' cost. Once this task is
-    compiled, i.e., by calling the compile method, the _cost field will have
-    this 'compiled' cost.
- """
-
- self._flags = flags
- self._pre_cost = cost
-
- def get_flags(self):
- return self._flags
-
- def __eq__(self, other):
- if isinstance(other, MockTask):
- return self._flags == other._flags and self._cost == other._cost
- return False
-
- def set_build_result(self, cost):
- self._cost = cost
-
- def compile(self):
- self._cost = self._pre_cost
-
- def get_build_result(self):
- return self._cost
-
- def compiled(self):
- """Indicates whether the task has been compiled."""
-
- return '_cost' in self.__dict__
-
-
-class BuilderTest(unittest.TestCase):
- """This class tests the Builder.
-
-  Given the same flags set, the image and the cost from the builder should be
-  the same.
- """
-
- def testHelper(self):
-    """Test the build helper.
-
- Call the build method twice, and test the results. The results should be the
- same, i.e., the cost should be the same.
- """
-
- # Set up the input, helper and output queue for the worker method.
- manager = multiprocessing.Manager()
- helper_queue = manager.Queue()
- output_queue = manager.Queue()
- built_queue = manager.Queue()
-
- # Set up the helper process that holds the helper method.
- helper_process = multiprocessing.Process(target=builder.build_helper,
- args=({}, helper_queue,
- built_queue, output_queue))
- helper_process.start()
-
- # A dictionary defines the mock compile result to the build_helper.
- mock_compile_result = {1: 1995, 2: 59, 9: 1027}
-
-    # Test that if a task has been done before, the duplicate task will have
-    # the same result. Two different scenarios are tested: the mock results
-    # are added to the built_queue before and after the corresponding mock
-    # tasks are added to the input queue.
- built_queue.put((9, mock_compile_result[9]))
-
- # The output of the helper should contain all the following tasks.
- results = [1, 1, 2, 9]
-
-    # Test the handling of tasks that have the same flags key, here 1.
- for result in results:
- helper_queue.put(MockTask(result, MockTaskCostGenerator()))
-
- built_queue.put((2, mock_compile_result[2]))
- built_queue.put((1, mock_compile_result[1]))
-
- # Signal there is no more duplicate task.
- helper_queue.put(pipeline_process.POISONPILL)
- helper_process.join()
-
- while results:
- task = output_queue.get()
- flags = task._flags
- cost = task._cost
- self.assertTrue(flags in results)
- if flags in mock_compile_result:
-        self.assertEqual(cost, mock_compile_result[flags])
- results.remove(task._flags)
-
- def testWorker(self):
-    """Test the actual build worker method.
-
- The worker should process all the input tasks and output the tasks to the
- helper and result queue.
- """
-
- manager = multiprocessing.Manager()
- output_queue = manager.Queue()
- built_queue = manager.Queue()
-
- # A dictionary defines the mock tasks and their corresponding compile
- # results.
- mock_compile_tasks = {1: 86, 2: 788}
-
- mock_tasks = []
-
- for flag, cost in mock_compile_tasks.iteritems():
- mock_tasks.append(MockTask(flag, cost))
-
- # Submit the mock tasks to the build worker.
- for mock_task in mock_tasks:
- builder.build_worker(mock_task, built_queue, output_queue)
-
- # The tasks, from the output queue, should be the same as the input and
- # should be compiled.
- for task in mock_tasks:
- output = output_queue.get()
- self.assertEqual(output, task)
- self.assertTrue(output.compiled())
-
- # The tasks, from the built queue, should be defined in the
- # mock_compile_tasks dictionary.
- for flag, cost in mock_compile_tasks.iteritems():
- helper_input = built_queue.get()
- self.assertEqual(helper_input, (flag, cost))
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/bestflags/executor.py b/bestflags/executor.py
deleted file mode 100644
index 91dd9288..00000000
--- a/bestflags/executor.py
+++ /dev/null
@@ -1,64 +0,0 @@
-"""The Execution stage of the framework.
-
-Execute the image against a set of benchmarks. This stage sets up a number of
-processes, calls the actual execute method and caches the results.
-"""
-
-__author__ = 'yuhenglong@google.com (Yuheng Long)'
-
-import multiprocessing
-
-
-class Tester(object):
- """Execute the generated images against a set of benchmark applications."""
-
- def __init__(self, numProcess, costs):
-    """Set up the process pool and the results cache.
-
- Args:
-      numProcess: Maximum number of executions to run in parallel.
-      costs: Executions that have been benchmarked before.
- """
-
- self._pool = multiprocessing.Pool(numProcess)
- self._costs = costs
-
- def _set_cost(self, image, cost):
- """Record the execution result for the current image.
-
- Args:
- image: The input image for the execution
- cost: the time it takes to execute the image
- """
-
- pass
-
- def _execute(self, task):
- """Execute the benchmarks on task.
-
- The concrete subclass should implement the actual execution.
-
- Args:
- task: The input task for the execution
- """
- # raise Exception('Must be implemented in child class')
- pass
-
- def _execute_task(self, task):
- """Execute the input task and record the cost.
-
- Args:
- task: The task to be compiled
- """
- pass
-
- def execute(self, generation):
- """Execute the image for all entities in a generation.
-
- Call them in parallel in processes.
-
- Args:
- generation: A new generation to be executed.
- """
-
- self._pool.map(self._execute_task, generation.task, 1)
diff --git a/bestflags/executor_test.py b/bestflags/executor_test.py
deleted file mode 100644
index 0f27fb9a..00000000
--- a/bestflags/executor_test.py
+++ /dev/null
@@ -1,42 +0,0 @@
-"""Tester unittest."""
-
-__author__ = 'yuhenglong@google.com (Yuheng Long)'
-
-import unittest
-
-import executor
-
-
-class TesterTest(unittest.TestCase):
-  """This class tests the Executor.
-
- Given the same flags set and/or checksum, the image and the cost should be the
- same from the Executor.
- """
-
- def setUp(self):
- """Create the Executor to be tested."""
-
- self.tester = executor.Tester(1, None)
-
- def testExecute(self):
-    """Test the execute method.
-
- Call the execute method twice, and test the results. The results should be
- the same, i.e., the cost should be the same.
- Either the execute method or the set_execution_result of the input
- Generation for the Tester should be called, but not both.
- """
- self.tester.execute(self)
-
- def testInit(self):
-    """Test the init method.
-
- If a certain checksum has been encountered before, the Tester should not
- reexecute the images with the same checksum.
- """
-
- pass
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/bestflags/pipeline_worker.py b/bestflags/pipeline_worker.py
new file mode 100644
index 00000000..6ab965e2
--- /dev/null
+++ b/bestflags/pipeline_worker.py
@@ -0,0 +1,137 @@
+"""The pipeline_worker functions of the build and test stage of the framework.
+
+This module defines the helper and the worker. If there are duplicate tasks,
+for example t1 and t2, that need to be built/tested, one of them, for example
+t1, will be built/tested, and the helper waits for the result of t1 and sets
+the results of the other task, t2 here, to be the same as that of t1. Setting
+the result of t2 to be the same as t1 is referred to as resolving the result
+of t2. The worker invokes the work method of the tasks that are not
+duplicates.
+"""
+
+__author__ = 'yuhenglong@google.com (Yuheng Long)'
+
+import pipeline_process
+
+
+def helper(stage, done_dict, helper_queue, completed_queue, result_queue):
+ """Helper that filters duplicate tasks.
+
+  This method continuously pulls duplicate tasks from the helper_queue. The
+  duplicate tasks need not be compiled/tested. This method also pulls completed
+  tasks from the worker queue and lets the results of the duplicate tasks be
+  the same as those of their corresponding finished tasks.
+
+ Args:
+ stage: The current stage of the pipeline, for example, build stage or test
+ stage.
+ done_dict: A dictionary of tasks that are done. The key of the dictionary is
+ the identifier of the task. The value of the dictionary is the results of
+ performing the corresponding task.
+ helper_queue: A queue of duplicate tasks whose results need to be resolved.
+ This is a communication channel between the pipeline_process and this
+ helper process.
+ completed_queue: A queue of tasks that have been built/tested. The results
+ of these tasks are needed to resolve the results of the duplicate tasks.
+ This is the communication channel between the workers and this helper
+ process.
+ result_queue: After the results of the duplicate tasks have been resolved,
+ the duplicate tasks will be sent to the next stage via this queue.
+ """
+
+ # The list of duplicate tasks, the results of which need to be resolved.
+ waiting_list = []
+
+ while True:
+ # Pull duplicate task from the helper queue.
+ if not helper_queue.empty():
+ task = helper_queue.get()
+
+ if task == pipeline_process.POISONPILL:
+ # Poison pill means no more duplicate task from the helper queue.
+ break
+
+ # The task has not been performed before.
+ assert not task.done(stage)
+
+ # The identifier of this task.
+ identifier = task.get_identifier(stage)
+
+ # If a duplicate task comes before the corresponding resolved results from
+ # the completed_queue, it will be put in the waiting list. If the result
+ # arrives before the duplicate task, the duplicate task will be resolved
+ # right away.
+ if identifier in done_dict:
+ # This task has been encountered before and the result is available. The
+ # result can be resolved right away.
+ task.set_result(stage, done_dict[identifier])
+ result_queue.put(task)
+ else:
+ waiting_list.append(task)
+
+ # Check and get completed tasks from completed_queue.
+ get_result_from_completed_queue(stage, completed_queue, done_dict,
+ waiting_list, result_queue)
+
+ # Wait to resolve the results of the remaining duplicate tasks.
+ while waiting_list:
+ get_result_from_completed_queue(stage, completed_queue, done_dict,
+ waiting_list, result_queue)
+
+
+def get_result_from_completed_queue(stage, completed_queue, done_dict,
+ waiting_list, result_queue):
+  """Pulls results from the completed queue and resolves duplicate tasks.
+
+  This helper method tries to pull a completed task from the completed queue.
+  If it gets a task from the queue, it resolves the results of all the relevant
+  duplicate tasks in the waiting list. Relevant tasks are the tasks that have
+  the same identifier as the currently received result from the
+  completed_queue.
+
+  Args:
+    stage: The current stage of the pipeline, for example, build stage or test
+      stage.
+    completed_queue: A queue of tasks that have been performed. The results of
+      these tasks are needed to resolve the results of the duplicate tasks. This
+      is the communication channel between the workers and this method.
+    done_dict: A dictionary of tasks that are done. The key of the dictionary is
+      the identifier of the task. The value of the dictionary is the results of
+      performing the corresponding task.
+    waiting_list: The list of duplicate tasks, the results of which need to be
+      resolved.
+    result_queue: After the results of the duplicate tasks have been resolved,
+      the duplicate tasks will be sent to the next stage via this queue.
+  """
+ # Pull completed task from the worker queue.
+ if not completed_queue.empty():
+ (identifier, result) = completed_queue.get()
+ done_dict[identifier] = result
+
+ tasks = [t for t in waiting_list if t.get_identifier(stage) == identifier]
+ for duplicate_task in tasks:
+ duplicate_task.set_result(stage, result)
+ result_queue.put(duplicate_task)
+ waiting_list.remove(duplicate_task)
+
+
+def worker(stage, task, helper_queue, result_queue):
+ """Worker that performs the task.
+
+  This method calls the work method of the input task and distributes the
+  result to the helper and the next stage.
+
+ Args:
+ stage: The current stage of the pipeline, for example, build stage or test
+ stage.
+ task: Input task that needs to be performed.
+ helper_queue: Queue that holds the completed tasks and the results. This is
+ the communication channel between the worker and the helper.
+ result_queue: Queue that holds the completed tasks and the results. This is
+ the communication channel between the worker and the next stage.
+ """
+
+ # The task has not been completed before.
+ assert not task.done(stage)
+
+ task.work(stage)
+ helper_queue.put((task.get_identifier(stage), task.get_result(stage)))
+ result_queue.put(task)
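
For context, here is a minimal sketch (not part of this commit) of the task
interface that helper() and worker() assume, pieced together from the calls
above. The SampleTask class, the BUILD_STAGE constant, and the flag string are
hypothetical illustrations rather than code from this repository.

import multiprocessing

import pipeline_worker

BUILD_STAGE = 0  # hypothetical stage identifier


class SampleTask(object):
  """Illustrative task exposing the methods pipeline_worker calls."""

  def __init__(self, identifier):
    self._identifier = identifier
    self._result = None
    self._done = False

  def done(self, stage):
    # Whether this task has already been performed in the given stage.
    return self._done

  def get_identifier(self, stage):
    # Key used to detect duplicates, e.g. the flag set in the build stage.
    return self._identifier

  def work(self, stage):
    # A real task would build or test here; this records a dummy result.
    self._result = len(self._identifier)
    self._done = True

  def set_result(self, stage, result):
    # Called by the helper to resolve a duplicate task.
    self._result = result
    self._done = True

  def get_result(self, stage):
    return self._result


if __name__ == '__main__':
  manager = multiprocessing.Manager()
  completed_queue = manager.Queue()
  result_queue = manager.Queue()

  # worker() performs the task, reports (identifier, result) to the helper's
  # completed queue, and forwards the task itself to the next stage.
  pipeline_worker.worker(BUILD_STAGE, SampleTask('-O2 -funroll-loops'),
                         completed_queue, result_queue)
  print(completed_queue.get())                       # ('-O2 -funroll-loops', 18)
  print(result_queue.get().get_result(BUILD_STAGE))  # 18

Any object that exposes these five stage-aware methods can flow through the
pipeline_worker functions; the MockTask in the unit test below has the same
shape.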
diff --git a/bestflags/pipeline_worker_test.py b/bestflags/pipeline_worker_test.py
new file mode 100644
index 00000000..340b3011
--- /dev/null
+++ b/bestflags/pipeline_worker_test.py
@@ -0,0 +1,173 @@
+"""Unittest for the pipeline_worker functions in the build/test stage.
+
+This module tests the helper method and the worker method.
+"""
+
+__author__ = 'yuhenglong@google.com (Yuheng Long)'
+
+import multiprocessing
+import random
+import sys
+import unittest
+
+import pipeline_process
+import pipeline_worker
+
+
+TESTSTAGE = 0
+
+
+def MockTaskCostGenerator():
+ """Calls a random number generator and returns a negative number."""
+ return random.randint(-sys.maxint - 1, -1)
+
+
+class MockTask(object):
+ """This class emulates an actual task.
+
+ It does not do the actual work, but simply returns the result as given when
+ this task is constructed.
+ """
+
+ def __init__(self, identifier, cost):
+ """Set up the results for this task.
+
+ Args:
+ identifier: the identifier of this task.
+ cost: the mock cost of this task.
+
+ The _pre_cost field stores the cost. Once this task is performed, i.e., by
+    calling the work method, the _cost field will have this cost.
+ """
+
+ self._identifier = identifier
+ self._pre_cost = cost
+
+ def get_identifier(self, stage):
+ assert stage == TESTSTAGE
+ return self._identifier
+
+ def __eq__(self, other):
+ if isinstance(other, MockTask):
+ return self._identifier == other._identifier and self._cost == other._cost
+ return False
+
+ def set_result(self, stage, cost):
+ assert stage == TESTSTAGE
+ self._cost = cost
+
+ def work(self, stage):
+ assert stage == TESTSTAGE
+ self._cost = self._pre_cost
+
+ def get_result(self, stage):
+ assert stage == TESTSTAGE
+ return self._cost
+
+ def done(self, stage):
+ """Indicates whether the task has been performed."""
+
+ assert stage == TESTSTAGE
+ return '_cost' in self.__dict__
+
+
+class AuxiliaryTest(unittest.TestCase):
+ """This class tests the pipeline_worker functions.
+
+  Given the same identifier, the cost from the pipeline_worker functions should
+  be the same.
+ """
+
+ def testHelper(self):
+    """Test the helper.
+
+    Send duplicate tasks to the helper, and test the results. The results
+    should be the same, i.e., the cost should be the same.
+ """
+
+ # Set up the input, helper and output queue for the helper method.
+ manager = multiprocessing.Manager()
+ helper_queue = manager.Queue()
+ result_queue = manager.Queue()
+ completed_queue = manager.Queue()
+
+ # Set up the helper process that holds the helper method.
+ helper_process = multiprocessing.Process(target=pipeline_worker.helper,
+ args=(TESTSTAGE, {}, helper_queue,
+ completed_queue,
+ result_queue))
+ helper_process.start()
+
+ # A dictionary defines the mock result to the helper.
+ mock_result = {1: 1995, 2: 59, 9: 1027}
+
+    # Test that if a task has been done before, the duplicate task will have
+    # the same result. Two different scenarios are tested: the mock results
+    # are added to the completed_queue before and after the corresponding mock
+    # tasks are added to the input queue.
+ completed_queue.put((9, mock_result[9]))
+
+ # The output of the helper should contain all the following tasks.
+ results = [1, 1, 2, 9]
+
+    # Test the handling of tasks that have the same identifier, here 1.
+ for result in results:
+ helper_queue.put(MockTask(result, MockTaskCostGenerator()))
+
+ completed_queue.put((2, mock_result[2]))
+ completed_queue.put((1, mock_result[1]))
+
+ # Signal there is no more duplicate task.
+ helper_queue.put(pipeline_process.POISONPILL)
+ helper_process.join()
+
+ while results:
+ task = result_queue.get()
+ identifier = task._identifier
+ cost = task._cost
+ self.assertTrue(identifier in results)
+ if identifier in mock_result:
+        self.assertEqual(cost, mock_result[identifier])
+ results.remove(task._identifier)
+
+ def testWorker(self):
+    """Test the worker method.
+
+ The worker should process all the input tasks and output the tasks to the
+ helper and result queue.
+ """
+
+ manager = multiprocessing.Manager()
+ result_queue = manager.Queue()
+ completed_queue = manager.Queue()
+
+ # A dictionary defines the mock tasks and their corresponding results.
+ mock_work_tasks = {1: 86, 2: 788}
+
+ mock_tasks = []
+
+ for flag, cost in mock_work_tasks.iteritems():
+ mock_tasks.append(MockTask(flag, cost))
+
+ # Submit the mock tasks to the worker.
+ for mock_task in mock_tasks:
+ pipeline_worker.worker(TESTSTAGE, mock_task, completed_queue,
+ result_queue)
+
+ # The tasks, from the output queue, should be the same as the input and
+ # should be performed.
+ for task in mock_tasks:
+ output = result_queue.get()
+ self.assertEqual(output, task)
+ self.assertTrue(output.done(TESTSTAGE))
+
+ # The tasks, from the completed_queue, should be defined in the
+ # mock_work_tasks dictionary.
+ for flag, cost in mock_work_tasks.iteritems():
+ helper_input = completed_queue.get()
+ self.assertEqual(helper_input, (flag, cost))
+
+
+if __name__ == '__main__':
+ unittest.main()
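
A further hypothetical sketch (also not part of this commit) of how the helper
and the worker could be wired together, following the pattern of the test
above: unique tasks go through worker(), duplicates go to the helper process,
and POISONPILL shuts the helper down once all duplicates have been submitted.
MockTask and TESTSTAGE are reused from pipeline_worker_test.

import multiprocessing

import pipeline_process
import pipeline_worker
from pipeline_worker_test import MockTask, TESTSTAGE

if __name__ == '__main__':
  manager = multiprocessing.Manager()
  helper_queue = manager.Queue()
  completed_queue = manager.Queue()
  result_queue = manager.Queue()

  # The helper runs in its own process and resolves duplicate tasks.
  helper = multiprocessing.Process(
      target=pipeline_worker.helper,
      args=(TESTSTAGE, {}, helper_queue, completed_queue, result_queue))
  helper.start()

  # Two tasks share identifier 1; only the first is actually worked on.
  pipeline_worker.worker(TESTSTAGE, MockTask(1, 42), completed_queue,
                         result_queue)
  helper_queue.put(MockTask(1, -1))  # duplicate, resolved by the helper

  # No more duplicates; the helper drains its waiting list and exits.
  helper_queue.put(pipeline_process.POISONPILL)
  helper.join()

  # Both tasks leave through result_queue with the same result, 42.
  print(result_queue.get().get_result(TESTSTAGE))
  print(result_queue.get().get_result(TESTSTAGE))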