diff options
Diffstat (limited to 'crosperf/schedv2.py')
-rw-r--r-- | crosperf/schedv2.py | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/crosperf/schedv2.py b/crosperf/schedv2.py new file mode 100644 index 00000000..50df8edd --- /dev/null +++ b/crosperf/schedv2.py @@ -0,0 +1,308 @@ +#!/usr/bin/python + +# Copyright 2015 Google Inc. All Rights Reserved. + +import sys +import test_flag +import traceback + +from collections import defaultdict +from machine_image_manager import MachineImageManager +from threading import Lock +from threading import Thread +from utils import logger + + +class DutWorker(Thread): + """Working thread for a dut.""" + + def __init__(self, dut, sched): + super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name)) + self._dut = dut + self._sched = sched + self._stat_num_br_run = 0 + self._stat_num_reimage = 0 + self._stat_annotation = "" + self._l = logger.GetLogger(self._sched._experiment.log_dir) + self.daemon = True + self._terminated = False + self._active_br = None + # Race condition accessing _active_br between _execute_benchmark_run and + # _terminate, so lock it up. + self._active_br_lock = Lock() + + def terminate(self): + self._terminated = True + with self._active_br_lock: + if self._active_br is not None: + # BenchmarkRun.Terminate() terminates any running testcase via + # suite_runner.Terminate and updates timeline. + self._active_br.Terminate() + + def run(self): + """Do the "run-test->(optionally reimage)->run-test" chore. + + Note - 'br' below means 'benchmark_run'. + """ + + self._setup_dut_label() + try: + self._l.LogOutput("{} started.".format(self)) + while not self._terminated: + br = self._sched.get_benchmark_run(self._dut) + if br is None: + # No br left for this label. Considering reimaging. + label = self._sched.allocate_label(self._dut) + if label is None: + # No br even for other labels. We are done. + self._l.LogOutput("ImageManager found no label " + "for dut, stopping working " + "thread {}.".format(self)) + break + if self._reimage(label): + # Reimage to run other br fails, dut is doomed, stop + # this thread. + self._l.LogWarning("Re-image failed, dut " + "in an unstable state, stopping " + "working thread {}.".format(self)) + break + else: + # Execute the br. + self._execute_benchmark_run(br) + finally: + self._stat_annotation = "finished" + # Thread finishes. Notify scheduler that I'm done. + self._sched.dut_worker_finished(self) + + def _reimage(self, label): + """Reimage image to label. + + Args: + label: the label to remimage onto dut. + + Returns: + 0 if successful, otherwise 1. + """ + + # Termination could happen anywhere, check it. + if self._terminated: + return 1 + + self._l.LogOutput('Reimaging {} using {}'.format(self, label)) + self._stat_num_reimage += 1 + self._stat_annotation = 'reimaging using "{}"'.format(label.name) + try: + # Note, only 1 reimage at any given time, this is guaranteed in + # ImageMachine, so no sync needed below. + retval = self._sched._experiment.machine_manager.ImageMachine( + self._dut, label) + if retval: + return 1 + except: + return 1 + + self._dut.label = label + return 0 + + def _execute_benchmark_run(self, br): + """Execute a single benchmark_run. + + Note - this function never throws exceptions. + """ + + # Termination could happen anywhere, check it. + if self._terminated: + return + + self._l.LogOutput('{} started working on {}'.format(self, br)) + self._stat_num_br_run += 1 + self._stat_annotation = 'executing {}'.format(br) + # benchmark_run.run does not throws, but just play it safe here. + try: + assert br.owner_thread is None + br.owner_thread = self + with self._active_br_lock: + self._active_br = br + br.run() + finally: + self._sched._experiment.BenchmarkRunFinished(br) + with self._active_br_lock: + self._active_br = None + + def _setup_dut_label(self): + """Try to match dut image with a certain experiment label. + + If such match is found, we just skip doing reimage and jump to execute + some benchmark_runs. + """ + + checksum_file = "/usr/local/osimage_checksum_file" + try: + rv, checksum, _ = command_executer.GetCommandExecuter().\ + CrosRunCommand( + "cat " + checksum_file, + return_output=True, + chromeos_root=self._sched._labels[0].chromeos_root, + machine=self._dut.name) + if rv == 0: + checksum = checksum.strip() + for l in self._sched._labels: + if l.checksum == checksum: + self._l.LogOutput( + "Dut '{}' is pre-installed with '{}'".format( + self._dut.name, l)) + self._dut.label = l + return + except: + traceback.print_exc(file=sys.stdout) + self._dut.label = None + + def __str__(self): + return 'DutWorker[dut="{}", label="{}"]'.format( + self._dut.name, self._dut.label.name if self._dut.label else "None") + + def dut(self): + return self._dut + + def status_str(self): + """Report thread status.""" + + return ('Worker thread "{}", label="{}", benchmark_run={}, ' + 'reimage={}, now {}'.format( + self._dut.name, + 'None' if self._dut.label is None else self._dut.label.name, + self._stat_num_br_run, + self._stat_num_reimage, + self._stat_annotation)) + + +class Schedv2(object): + """New scheduler for crosperf.""" + + def __init__(self, experiment): + self._experiment = experiment + self._l = logger.GetLogger(experiment.log_dir) + + # Create shortcuts to nested data structure. "_duts" points to a list of + # locked machines. _labels points to a list of all labels. + self._duts = self._experiment.machine_manager._all_machines + self._labels = self._experiment.labels + + # Mapping from label to a list of benchmark_runs. + self._label_brl_map = dict([(l, []) for l in self._labels]) + for br in self._experiment.benchmark_runs: + assert br.label in self._label_brl_map + self._label_brl_map[br.label].append(br) + + # Use machine image manager to calculate initial label allocation. + self._mim = MachineImageManager(self._labels, self._duts) + self._mim.compute_initial_allocation() + + # Create worker thread, 1 per dut. + self._active_workers = [DutWorker(dut, self) for dut in self._duts] + self._finished_workers = [] + + # Bookkeeping for synchronization. + self._workers_lock = Lock() + self._lock_map = defaultdict(lambda: Lock()) + + # Termination flag. + self._terminated = False + + # Test mode flag + self._in_test_mode = test_flag.GetTestMode() + + def run_sched(self): + """Start all dut worker threads and return immediately.""" + + [w.start() for w in self._active_workers] + + def get_benchmark_run(self, dut): + """Get a benchmark_run (br) object for a certain dut. + + Arguments: + dut: the dut for which a br is returned. + + Returns: + A br with its label matching that of the dut. If no such br could be + found, return None (this usually means a reimage is required for the + dut). + """ + + # If terminated, stop providing any br. + if self._terminated: + return None + + # If dut bears an unrecognized label, return None. + if dut.label is None: + return None + + # If br list for the dut's label is empty (that means all brs for this + # label have been done) , return None. + with self._lock_on(dut.label): + brl = self._label_brl_map[dut.label] + if not brl: + return None + # Return the first br. + return brl.pop(0) + + def allocate_label(self, dut): + """Allocate a label to a dut. + + The work is delegated to MachineImageManager. + + The dut_worker calling this method is responsible for reimage the dut to + this label. + + Arguments: + dut: the new label that is to be reimaged onto the dut. + + Returns: + The label or None. + """ + + if self._terminated: + return None + + return self._mim.allocate(dut, self) + + def dut_worker_finished(self, dut_worker): + """Notify schedv2 that the dut_worker thread finished. + + Arguemnts: + dut_worker: the thread that is about to end.""" + + self._l.LogOutput("{} finished.".format(dut_worker)) + with self._workers_lock: + self._active_workers.remove(dut_worker) + self._finished_workers.append(dut_worker) + + def is_complete(self): + return len(self._active_workers) == 0 + + def _lock_on(self, object): + return self._lock_map[object] + + def terminate(self): + """Mark flag so we stop providing br/reimages. + + Also terminate each DutWorker, so they refuse to execute br or reimage. + """ + + self._terminated = True + for dut_worker in self._active_workers: + dut_worker.terminate() + + def threads_status_as_string(self): + """Report the dut worker threads status.""" + + status = "{} active threads, {} finished threads.\n".format( + len(self._active_workers), len(self._finished_workers)) + status += " Active threads:" + for dw in self._active_workers: + status += '\n ' + dw.status_str() + if self._finished_workers: + status += "\n Finished threads:" + for dw in self._finished_workers: + status += '\n ' + dw.status_str() + return status |