aboutsummaryrefslogtreecommitdiff
path: root/crosperf/schedv2.py
diff options
context:
space:
mode:
Diffstat (limited to 'crosperf/schedv2.py')
-rw-r--r--crosperf/schedv2.py145
1 files changed, 126 insertions, 19 deletions
diff --git a/crosperf/schedv2.py b/crosperf/schedv2.py
index a8ef4d57..f33b30ad 100644
--- a/crosperf/schedv2.py
+++ b/crosperf/schedv2.py
@@ -2,6 +2,7 @@
# Copyright 2015 Google Inc. All Rights Reserved.
+import math
import sys
import test_flag
import traceback
@@ -24,7 +25,7 @@ class DutWorker(Thread):
self._stat_num_br_run = 0
self._stat_num_reimage = 0
self._stat_annotation = ""
- self._l = logger.GetLogger(self._sched._experiment.log_dir)
+ self._logger = logger.GetLogger(self._sched._experiment.log_dir)
self.daemon = True
self._terminated = False
self._active_br = None
@@ -46,9 +47,20 @@ class DutWorker(Thread):
Note - 'br' below means 'benchmark_run'.
"""
+ # Firstly, handle benchmarkruns that have cache hit.
+ br = self._sched.get_cached_benchmark_run()
+ while br:
+ try:
+ self._stat_annotation = 'finishing cached {}'.format(br)
+ br.run()
+ except:
+ traceback.print_exc(file=sys.stdout)
+ br = self._sched.get_cached_benchmark_run()
+
+ # Secondly, handle benchmarkruns that needs to be run on dut.
self._setup_dut_label()
try:
- self._l.LogOutput("{} started.".format(self))
+ self._logger.LogOutput("{} started.".format(self))
while not self._terminated:
br = self._sched.get_benchmark_run(self._dut)
if br is None:
@@ -56,14 +68,14 @@ class DutWorker(Thread):
label = self._sched.allocate_label(self._dut)
if label is None:
# No br even for other labels. We are done.
- self._l.LogOutput("ImageManager found no label "
+ self._logger.LogOutput("ImageManager found no label "
"for dut, stopping working "
"thread {}.".format(self))
break
if self._reimage(label):
# Reimage to run other br fails, dut is doomed, stop
# this thread.
- self._l.LogWarning("Re-image failed, dut "
+ self._logger.LogWarning("Re-image failed, dut "
"in an unstable state, stopping "
"working thread {}.".format(self))
break
@@ -89,7 +101,7 @@ class DutWorker(Thread):
if self._terminated:
return 1
- self._l.LogOutput('Reimaging {} using {}'.format(self, label))
+ self._logger.LogOutput('Reimaging {} using {}'.format(self, label))
self._stat_num_reimage += 1
self._stat_annotation = 'reimaging using "{}"'.format(label.name)
try:
@@ -115,7 +127,7 @@ class DutWorker(Thread):
if self._terminated:
return
- self._l.LogOutput('{} started working on {}'.format(self, br))
+ self._logger.LogOutput('{} started working on {}'.format(self, br))
self._stat_num_br_run += 1
self._stat_annotation = 'executing {}'.format(br)
# benchmark_run.run does not throws, but just play it safe here.
@@ -144,12 +156,13 @@ class DutWorker(Thread):
"cat " + checksum_file,
return_output=True,
chromeos_root=self._sched._labels[0].chromeos_root,
- machine=self._dut.name)
+ machine=self._dut.name,
+ print_to_console=False)
if rv == 0:
checksum = checksum.strip()
for l in self._sched._labels:
if l.checksum == checksum:
- self._l.LogOutput(
+ self._logger.LogOutput(
"Dut '{}' is pre-installed with '{}'".format(
self._dut.name, l))
self._dut.label = l
@@ -176,24 +189,62 @@ class DutWorker(Thread):
self._stat_num_reimage,
self._stat_annotation))
+class BenchmarkRunCacheReader(Thread):
+ """The thread to read cache for a list of benchmark_runs.
+
+ On creation, each instance of this class is given a br_list, which is a
+ subset of experiment._benchmark_runs.
+ """
+
+ def __init__(self, schedv2, br_list):
+ super(BenchmarkRunCacheReader, self).__init__()
+ self._schedv2 = schedv2
+ self._br_list = br_list
+ self._logger = self._schedv2._logger
+
+ def run(self):
+ for br in self._br_list:
+ try:
+ br.ReadCache()
+ if br.cache_hit:
+ self._logger.LogOutput('Cache hit - {}'.format(br))
+ with self._schedv2._lock_on('_cached_br_list'):
+ self._schedv2._cached_br_list.append(br)
+ else:
+ self._logger.LogOutput('Cache not hit - {}'.format(br))
+ except:
+ traceback.print_exc(file=sys.stderr)
+
class Schedv2(object):
"""New scheduler for crosperf."""
def __init__(self, experiment):
self._experiment = experiment
- self._l = logger.GetLogger(experiment.log_dir)
+ self._logger = logger.GetLogger(experiment.log_dir)
# Create shortcuts to nested data structure. "_duts" points to a list of
# locked machines. _labels points to a list of all labels.
self._duts = self._experiment.machine_manager._all_machines
self._labels = self._experiment.labels
+ # Bookkeeping for synchronization.
+ self._workers_lock = Lock()
+ self._lock_map = defaultdict(lambda: Lock())
+
+ # Test mode flag
+ self._in_test_mode = test_flag.GetTestMode()
+
+ # Read benchmarkrun cache.
+ self._read_br_cache()
+
# Mapping from label to a list of benchmark_runs.
self._label_brl_map = dict([(l, []) for l in self._labels])
for br in self._experiment.benchmark_runs:
assert br.label in self._label_brl_map
- self._label_brl_map[br.label].append(br)
+ # Only put no-cache-hit br into the map.
+ if br not in self._cached_br_list:
+ self._label_brl_map[br.label].append(br)
# Use machine image manager to calculate initial label allocation.
self._mim = MachineImageManager(self._labels, self._duts)
@@ -203,21 +254,77 @@ class Schedv2(object):
self._active_workers = [DutWorker(dut, self) for dut in self._duts]
self._finished_workers = []
- # Bookkeeping for synchronization.
- self._workers_lock = Lock()
- self._lock_map = defaultdict(lambda: Lock())
-
# Termination flag.
self._terminated = False
- # Test mode flag
- self._in_test_mode = test_flag.GetTestMode()
-
def run_sched(self):
"""Start all dut worker threads and return immediately."""
[w.start() for w in self._active_workers]
+ def _read_br_cache(self):
+ """Use multi-threading to read cache for all benchmarkruns.
+
+ We do this by firstly creating a few threads, and then assign each
+ thread a segment of all brs. Each thread will check cache status for
+ each br and put those with cache into '_cached_br_list'."""
+
+ self._cached_br_list = []
+ n_benchmarkruns = len(self._experiment.benchmark_runs)
+ if n_benchmarkruns <= 4:
+ # Use single thread to read cache.
+ self._logger.LogOutput(('Starting to read cache status for '
+ '{} benchmark runs ...').format(n_benchmarkruns))
+ BenchmarkRunCacheReader(self, self._experiment.benchmark_runs).run()
+ return
+
+ # Split benchmarkruns set into segments. Each segment will be handled by
+ # a thread. Note, we use (x+3)/4 to mimic math.ceil(x/4).
+ n_threads = max(2, min(20, (n_benchmarkruns + 3) / 4))
+ self._logger.LogOutput(('Starting {} threads to read cache status for '
+ '{} benchmark runs ...').format(
+ n_threads, n_benchmarkruns))
+ benchmarkruns_per_thread = (n_benchmarkruns + n_threads - 1) / n_threads
+ benchmarkrun_segments = []
+ for i in range(n_threads - 1):
+ start = i * benchmarkruns_per_thread
+ end = (i + 1) * benchmarkruns_per_thread
+ benchmarkrun_segments.append(
+ self._experiment.benchmark_runs[start : end])
+ benchmarkrun_segments.append(self._experiment.benchmark_runs[
+ (n_threads - 1) * benchmarkruns_per_thread:])
+
+ # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs.
+ assert (sum([len(x) for x in benchmarkrun_segments]) == n_benchmarkruns)
+
+ # Create and start all readers.
+ cache_readers = [
+ BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments]
+
+ for x in cache_readers:
+ x.start()
+
+ # Wait till all readers finish.
+ for x in cache_readers:
+ x.join()
+
+ # Summarize.
+ self._logger.LogOutput(
+ 'Total {} cache hit out of {} benchmark_runs.'.format(
+ len(self._cached_br_list), n_benchmarkruns))
+
+ def get_cached_benchmark_run(self):
+ """Get a benchmark_run with 'cache hit'.
+
+ return:
+ The benchmark that has cache hit, if any. Otherwise none.
+ """
+
+ with self._lock_on('_cached_br_list'):
+ if self._cached_br_list:
+ return self._cached_br_list.pop()
+ return None
+
def get_benchmark_run(self, dut):
"""Get a benchmark_run (br) object for a certain dut.
@@ -239,7 +346,7 @@ class Schedv2(object):
return None
# If br list for the dut's label is empty (that means all brs for this
- # label have been done) , return None.
+ # label have been done), return None.
with self._lock_on(dut.label):
brl = self._label_brl_map[dut.label]
if not brl:
@@ -273,7 +380,7 @@ class Schedv2(object):
Arguemnts:
dut_worker: the thread that is about to end."""
- self._l.LogOutput("{} finished.".format(dut_worker))
+ self._logger.LogOutput("{} finished.".format(dut_worker))
with self._workers_lock:
self._active_workers.remove(dut_worker)
self._finished_workers.append(dut_worker)