diff options
Diffstat (limited to 'crosperf/schedv2.py')
-rw-r--r-- | crosperf/schedv2.py | 145 |
1 files changed, 126 insertions, 19 deletions
diff --git a/crosperf/schedv2.py b/crosperf/schedv2.py index a8ef4d57..f33b30ad 100644 --- a/crosperf/schedv2.py +++ b/crosperf/schedv2.py @@ -2,6 +2,7 @@ # Copyright 2015 Google Inc. All Rights Reserved. +import math import sys import test_flag import traceback @@ -24,7 +25,7 @@ class DutWorker(Thread): self._stat_num_br_run = 0 self._stat_num_reimage = 0 self._stat_annotation = "" - self._l = logger.GetLogger(self._sched._experiment.log_dir) + self._logger = logger.GetLogger(self._sched._experiment.log_dir) self.daemon = True self._terminated = False self._active_br = None @@ -46,9 +47,20 @@ class DutWorker(Thread): Note - 'br' below means 'benchmark_run'. """ + # Firstly, handle benchmarkruns that have cache hit. + br = self._sched.get_cached_benchmark_run() + while br: + try: + self._stat_annotation = 'finishing cached {}'.format(br) + br.run() + except: + traceback.print_exc(file=sys.stdout) + br = self._sched.get_cached_benchmark_run() + + # Secondly, handle benchmarkruns that needs to be run on dut. self._setup_dut_label() try: - self._l.LogOutput("{} started.".format(self)) + self._logger.LogOutput("{} started.".format(self)) while not self._terminated: br = self._sched.get_benchmark_run(self._dut) if br is None: @@ -56,14 +68,14 @@ class DutWorker(Thread): label = self._sched.allocate_label(self._dut) if label is None: # No br even for other labels. We are done. - self._l.LogOutput("ImageManager found no label " + self._logger.LogOutput("ImageManager found no label " "for dut, stopping working " "thread {}.".format(self)) break if self._reimage(label): # Reimage to run other br fails, dut is doomed, stop # this thread. - self._l.LogWarning("Re-image failed, dut " + self._logger.LogWarning("Re-image failed, dut " "in an unstable state, stopping " "working thread {}.".format(self)) break @@ -89,7 +101,7 @@ class DutWorker(Thread): if self._terminated: return 1 - self._l.LogOutput('Reimaging {} using {}'.format(self, label)) + self._logger.LogOutput('Reimaging {} using {}'.format(self, label)) self._stat_num_reimage += 1 self._stat_annotation = 'reimaging using "{}"'.format(label.name) try: @@ -115,7 +127,7 @@ class DutWorker(Thread): if self._terminated: return - self._l.LogOutput('{} started working on {}'.format(self, br)) + self._logger.LogOutput('{} started working on {}'.format(self, br)) self._stat_num_br_run += 1 self._stat_annotation = 'executing {}'.format(br) # benchmark_run.run does not throws, but just play it safe here. @@ -144,12 +156,13 @@ class DutWorker(Thread): "cat " + checksum_file, return_output=True, chromeos_root=self._sched._labels[0].chromeos_root, - machine=self._dut.name) + machine=self._dut.name, + print_to_console=False) if rv == 0: checksum = checksum.strip() for l in self._sched._labels: if l.checksum == checksum: - self._l.LogOutput( + self._logger.LogOutput( "Dut '{}' is pre-installed with '{}'".format( self._dut.name, l)) self._dut.label = l @@ -176,24 +189,62 @@ class DutWorker(Thread): self._stat_num_reimage, self._stat_annotation)) +class BenchmarkRunCacheReader(Thread): + """The thread to read cache for a list of benchmark_runs. + + On creation, each instance of this class is given a br_list, which is a + subset of experiment._benchmark_runs. + """ + + def __init__(self, schedv2, br_list): + super(BenchmarkRunCacheReader, self).__init__() + self._schedv2 = schedv2 + self._br_list = br_list + self._logger = self._schedv2._logger + + def run(self): + for br in self._br_list: + try: + br.ReadCache() + if br.cache_hit: + self._logger.LogOutput('Cache hit - {}'.format(br)) + with self._schedv2._lock_on('_cached_br_list'): + self._schedv2._cached_br_list.append(br) + else: + self._logger.LogOutput('Cache not hit - {}'.format(br)) + except: + traceback.print_exc(file=sys.stderr) + class Schedv2(object): """New scheduler for crosperf.""" def __init__(self, experiment): self._experiment = experiment - self._l = logger.GetLogger(experiment.log_dir) + self._logger = logger.GetLogger(experiment.log_dir) # Create shortcuts to nested data structure. "_duts" points to a list of # locked machines. _labels points to a list of all labels. self._duts = self._experiment.machine_manager._all_machines self._labels = self._experiment.labels + # Bookkeeping for synchronization. + self._workers_lock = Lock() + self._lock_map = defaultdict(lambda: Lock()) + + # Test mode flag + self._in_test_mode = test_flag.GetTestMode() + + # Read benchmarkrun cache. + self._read_br_cache() + # Mapping from label to a list of benchmark_runs. self._label_brl_map = dict([(l, []) for l in self._labels]) for br in self._experiment.benchmark_runs: assert br.label in self._label_brl_map - self._label_brl_map[br.label].append(br) + # Only put no-cache-hit br into the map. + if br not in self._cached_br_list: + self._label_brl_map[br.label].append(br) # Use machine image manager to calculate initial label allocation. self._mim = MachineImageManager(self._labels, self._duts) @@ -203,21 +254,77 @@ class Schedv2(object): self._active_workers = [DutWorker(dut, self) for dut in self._duts] self._finished_workers = [] - # Bookkeeping for synchronization. - self._workers_lock = Lock() - self._lock_map = defaultdict(lambda: Lock()) - # Termination flag. self._terminated = False - # Test mode flag - self._in_test_mode = test_flag.GetTestMode() - def run_sched(self): """Start all dut worker threads and return immediately.""" [w.start() for w in self._active_workers] + def _read_br_cache(self): + """Use multi-threading to read cache for all benchmarkruns. + + We do this by firstly creating a few threads, and then assign each + thread a segment of all brs. Each thread will check cache status for + each br and put those with cache into '_cached_br_list'.""" + + self._cached_br_list = [] + n_benchmarkruns = len(self._experiment.benchmark_runs) + if n_benchmarkruns <= 4: + # Use single thread to read cache. + self._logger.LogOutput(('Starting to read cache status for ' + '{} benchmark runs ...').format(n_benchmarkruns)) + BenchmarkRunCacheReader(self, self._experiment.benchmark_runs).run() + return + + # Split benchmarkruns set into segments. Each segment will be handled by + # a thread. Note, we use (x+3)/4 to mimic math.ceil(x/4). + n_threads = max(2, min(20, (n_benchmarkruns + 3) / 4)) + self._logger.LogOutput(('Starting {} threads to read cache status for ' + '{} benchmark runs ...').format( + n_threads, n_benchmarkruns)) + benchmarkruns_per_thread = (n_benchmarkruns + n_threads - 1) / n_threads + benchmarkrun_segments = [] + for i in range(n_threads - 1): + start = i * benchmarkruns_per_thread + end = (i + 1) * benchmarkruns_per_thread + benchmarkrun_segments.append( + self._experiment.benchmark_runs[start : end]) + benchmarkrun_segments.append(self._experiment.benchmark_runs[ + (n_threads - 1) * benchmarkruns_per_thread:]) + + # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs. + assert (sum([len(x) for x in benchmarkrun_segments]) == n_benchmarkruns) + + # Create and start all readers. + cache_readers = [ + BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments] + + for x in cache_readers: + x.start() + + # Wait till all readers finish. + for x in cache_readers: + x.join() + + # Summarize. + self._logger.LogOutput( + 'Total {} cache hit out of {} benchmark_runs.'.format( + len(self._cached_br_list), n_benchmarkruns)) + + def get_cached_benchmark_run(self): + """Get a benchmark_run with 'cache hit'. + + return: + The benchmark that has cache hit, if any. Otherwise none. + """ + + with self._lock_on('_cached_br_list'): + if self._cached_br_list: + return self._cached_br_list.pop() + return None + def get_benchmark_run(self, dut): """Get a benchmark_run (br) object for a certain dut. @@ -239,7 +346,7 @@ class Schedv2(object): return None # If br list for the dut's label is empty (that means all brs for this - # label have been done) , return None. + # label have been done), return None. with self._lock_on(dut.label): brl = self._label_brl_map[dut.label] if not brl: @@ -273,7 +380,7 @@ class Schedv2(object): Arguemnts: dut_worker: the thread that is about to end.""" - self._l.LogOutput("{} finished.".format(dut_worker)) + self._logger.LogOutput("{} finished.".format(dut_worker)) with self._workers_lock: self._active_workers.remove(dut_worker) self._finished_workers.append(dut_worker) |