From e066297f07a8d1e1ad3416b4b034b2943f47c648 Mon Sep 17 00:00:00 2001
From: Han Shen
Date: Fri, 18 Sep 2015 16:53:34 -0700
Subject: Fix cache reading problem.

The previous implementation images a machine (i.e. re-installs an OS image
on it) to match the br's (benchmark_run's) label before ReadCache is
performed; that imaging step is a huge waste. The new implementation starts
a few threads at the very beginning that go through all brs and put the
cache-hit brs into _cached_br_list. The DutWorker threads then drain
_cached_br_list first, which does not require the dut to bear any
particular image. After this step, we proceed as usual.

Unittest cases are also added.

Tested by manually launching crosperf and verifying that no re-imaging is
done. (A short sketch of the new flow and of the segment-size arithmetic is
appended after the diff.)

Change-Id: Ib611937f9fa28ae9bedce8a1a01ed303f14f838a
Reviewed-on: https://chrome-internal-review.googlesource.com/231585
Commit-Ready: Han Shen
Tested-by: Han Shen
Reviewed-by: Han Shen
---
 crosperf/benchmark_run.py    |  10 ++-
 crosperf/experiment.py       |  24 +++----
 crosperf/machine_manager.py  |   1 +
 crosperf/results_cache.py    |  30 +++++---
 crosperf/schedv2.py          | 145 +++++++++++++++++++++++++++++++++-----
 crosperf/schedv2_unittest.py | 164 +++++++++++++++++++++++++++++++++------
 6 files changed, 304 insertions(+), 70 deletions(-)

(limited to 'crosperf')

diff --git a/crosperf/benchmark_run.py b/crosperf/benchmark_run.py
index 3688647e..e72bc142 100644
--- a/crosperf/benchmark_run.py
+++ b/crosperf/benchmark_run.py
@@ -94,7 +94,6 @@ class BenchmarkRun(threading.Thread):
 
   def run(self):
     try:
-      self.ReadCache()
       if self.result:
         self._logger.LogOutput("%s: Cache hit." % self.name)
@@ -285,11 +284,10 @@ class MockBenchmarkRun(BenchmarkRun):
                                   self.label)
     self.timeline.Record(STATUS_RUNNING)
     [retval, out, err] = self.suite_runner.Run(machine.name,
-                                               self.label.chromeos_root,
-                                               self.label.board,
-                                               self.benchmark.test_name,
-                                               self.test_args,
-                                               self.profiler_args)
+                                               self.label,
+                                               self.benchmark,
+                                               self.test_args,
+                                               self.profiler_args)
     self.run_completed = True
     rr = MockResult("logger", self.label, self.log_level)
     rr.out = out
diff --git a/crosperf/experiment.py b/crosperf/experiment.py
index 7fa34c3f..e18556a1 100644
--- a/crosperf/experiment.py
+++ b/crosperf/experiment.py
@@ -15,7 +15,7 @@ from threading import Lock
 from utils import logger
 from utils import misc
 
-from benchmark_run import BenchmarkRun
+import benchmark_run
 from machine_manager import MachineManager
 from machine_manager import MockMachineManager
 import test_flag
@@ -110,17 +110,17 @@ class Experiment(object):
           logger_to_use = logger.Logger(self.log_dir,
                                         "run.%s" % (full_name),
                                         True)
-          benchmark_run = BenchmarkRun(benchmark_run_name,
-                                       benchmark,
-                                       label,
-                                       iteration,
-                                       self.cache_conditions,
-                                       self.machine_manager,
-                                       logger_to_use,
-                                       self.log_level,
-                                       self.share_cache)
-
-          benchmark_runs.append(benchmark_run)
+          benchmark_runs.append(benchmark_run.BenchmarkRun(
+              benchmark_run_name,
+              benchmark,
+              label,
+              iteration,
+              self.cache_conditions,
+              self.machine_manager,
+              logger_to_use,
+              self.log_level,
+              self.share_cache))
+
     return benchmark_runs
 
   def Build(self):
diff --git a/crosperf/machine_manager.py b/crosperf/machine_manager.py
index 40666c4a..d68454d7 100644
--- a/crosperf/machine_manager.py
+++ b/crosperf/machine_manager.py
@@ -536,6 +536,7 @@ class MockCrosMachine(CrosMachine):
     #In test, we assume "lumpy1", "lumpy2" are the same machine.
     self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
     self.log_level = log_level
+    self.label = None
 
   def IsReachable(self):
     return True
diff --git a/crosperf/results_cache.py b/crosperf/results_cache.py
index 8871c10c..ed3b6607 100644
--- a/crosperf/results_cache.py
+++ b/crosperf/results_cache.py
@@ -132,13 +132,14 @@ class Result(object):
     if not self._temp_dir:
       self._temp_dir = tempfile.mkdtemp(dir=results_in_chroot)
       command = "cp -r {0}/* {1}".format(self.results_dir, self._temp_dir)
-      self._ce.RunCommand(command)
+      self._ce.RunCommand(command, print_to_console=False)
 
     command = ("python generate_test_report --no-color --csv %s" %
                (os.path.join("/tmp", os.path.basename(self._temp_dir))))
     [_, out, _] = self._ce.ChrootRunCommand(self._chromeos_root,
                                             command,
-                                            return_output=True)
+                                            return_output=True,
+                                            print_to_console=False)
     keyvals_dict = {}
     tmp_dir_in_chroot = misc.GetInsideChrootPath(self._chromeos_root,
                                                  self._temp_dir)
@@ -173,7 +174,8 @@ class Result(object):
 
     command = "find %s %s" % (self.results_dir, find_args)
-    ret, out, _ = self._ce.RunCommand(command, return_output=True)
+    ret, out, _ = self._ce.RunCommand(command, return_output=True,
+                                      print_to_console=False)
     if ret:
       raise Exception("Could not run find command!")
     return out
@@ -301,7 +303,7 @@ class Result(object):
       command = ("cd %s && tar xf %s" %
                  (self._temp_dir,
                   os.path.join(cache_dir, AUTOTEST_TARBALL)))
-      ret = self._ce.RunCommand(command)
+      ret = self._ce.RunCommand(command, print_to_console=False)
       if ret:
         raise Exception("Could not untar cached tarball")
       self.results_dir = self._temp_dir
@@ -519,11 +521,14 @@ class ResultsCache(object):
 
   def _GetCacheDirForWrite(self, get_keylist=False):
     cache_path = self._FormCacheDir(self._GetCacheKeyList(False))[0]
    if get_keylist:
-      args_str = "%s_%s_%s" % (self.test_args, self.profiler_args, self.run_local)
-      version, image = results_report.ParseChromeosImage(self.label.chromeos_image)
-      keylist = [ version, image, self.label.board,
-                  self.machine.name, self.test_name, str(self.iteration),
-                  args_str]
+      args_str = "%s_%s_%s" % (self.test_args,
+                               self.profiler_args,
+                               self.run_local)
+      version, image = results_report.ParseChromeosImage(
+          self.label.chromeos_image)
+      keylist = [version, image, self.label.board,
+                 self.machine.name, self.test_name, str(self.iteration),
+                 args_str]
       return cache_path, keylist
     return cache_path
@@ -576,7 +581,9 @@ class ResultsCache(object):
           machine_id_checksum = machine.machine_id_checksum
           break
 
-    temp_test_args = "%s %s %s" % (self.test_args, self.profiler_args, self.run_local)
+    temp_test_args = "%s %s %s" % (self.test_args,
+                                   self.profiler_args,
+                                   self.run_local)
     test_args_checksum = hashlib.md5(
         "".join(temp_test_args)).hexdigest()
     return (image_path_checksum,
@@ -601,7 +608,8 @@ class ResultsCache(object):
     if not os.path.isdir(cache_dir):
       return None
 
-    self._logger.LogOutput("Trying to read from cache dir: %s" % cache_dir)
+    if self.log_level == 'verbose':
+      self._logger.LogOutput("Trying to read from cache dir: %s" % cache_dir)
     result = Result.CreateFromCacheHit(self._logger,
                                        self.log_level,
                                        self.label,
diff --git a/crosperf/schedv2.py b/crosperf/schedv2.py
index a8ef4d57..f33b30ad 100644
--- a/crosperf/schedv2.py
+++ b/crosperf/schedv2.py
@@ -2,6 +2,7 @@
 # Copyright 2015 Google Inc. All Rights Reserved.
 
+import math
 import sys
 import test_flag
 import traceback
@@ -24,7 +25,7 @@ class DutWorker(Thread):
         self._stat_num_br_run = 0
         self._stat_num_reimage = 0
         self._stat_annotation = ""
-        self._l = logger.GetLogger(self._sched._experiment.log_dir)
+        self._logger = logger.GetLogger(self._sched._experiment.log_dir)
         self.daemon = True
         self._terminated = False
         self._active_br = None
@@ -46,9 +47,20 @@ class DutWorker(Thread):
 
         Note - 'br' below means 'benchmark_run'.
         """
+        # Firstly, handle benchmarkruns that have cache hit.
+        br = self._sched.get_cached_benchmark_run()
+        while br:
+            try:
+                self._stat_annotation = 'finishing cached {}'.format(br)
+                br.run()
+            except:
+                traceback.print_exc(file=sys.stdout)
+            br = self._sched.get_cached_benchmark_run()
+
+        # Secondly, handle benchmarkruns that needs to be run on dut.
         self._setup_dut_label()
         try:
-            self._l.LogOutput("{} started.".format(self))
+            self._logger.LogOutput("{} started.".format(self))
             while not self._terminated:
                 br = self._sched.get_benchmark_run(self._dut)
                 if br is None:
@@ -56,14 +68,14 @@ class DutWorker(Thread):
                     label = self._sched.allocate_label(self._dut)
                     if label is None:
                         # No br even for other labels. We are done.
-                        self._l.LogOutput("ImageManager found no label "
+                        self._logger.LogOutput("ImageManager found no label "
                                           "for dut, stopping working "
                                           "thread {}.".format(self))
                         break
                     if self._reimage(label):
                         # Reimage to run other br fails, dut is doomed, stop
                         # this thread.
-                        self._l.LogWarning("Re-image failed, dut "
+                        self._logger.LogWarning("Re-image failed, dut "
                                            "in an unstable state, stopping "
                                            "working thread {}.".format(self))
                         break
@@ -89,7 +101,7 @@ class DutWorker(Thread):
         if self._terminated:
             return 1
 
-        self._l.LogOutput('Reimaging {} using {}'.format(self, label))
+        self._logger.LogOutput('Reimaging {} using {}'.format(self, label))
         self._stat_num_reimage += 1
         self._stat_annotation = 'reimaging using "{}"'.format(label.name)
         try:
@@ -115,7 +127,7 @@ class DutWorker(Thread):
         if self._terminated:
             return
 
-        self._l.LogOutput('{} started working on {}'.format(self, br))
+        self._logger.LogOutput('{} started working on {}'.format(self, br))
         self._stat_num_br_run += 1
         self._stat_annotation = 'executing {}'.format(br)
         # benchmark_run.run does not throws, but just play it safe here.
@@ -144,12 +156,13 @@ class DutWorker(Thread):
             "cat " + checksum_file,
             return_output=True,
             chromeos_root=self._sched._labels[0].chromeos_root,
-            machine=self._dut.name)
+            machine=self._dut.name,
+            print_to_console=False)
         if rv == 0:
             checksum = checksum.strip()
             for l in self._sched._labels:
                 if l.checksum == checksum:
-                    self._l.LogOutput(
+                    self._logger.LogOutput(
                         "Dut '{}' is pre-installed with '{}'".format(
                             self._dut.name, l))
                     self._dut.label = l
@@ -176,24 +189,62 @@ class DutWorker(Thread):
                 self._stat_num_reimage,
                 self._stat_annotation))
 
 
+class BenchmarkRunCacheReader(Thread):
+    """The thread to read cache for a list of benchmark_runs.
+
+    On creation, each instance of this class is given a br_list, which is a
+    subset of experiment._benchmark_runs.
+    """
+
+    def __init__(self, schedv2, br_list):
+        super(BenchmarkRunCacheReader, self).__init__()
+        self._schedv2 = schedv2
+        self._br_list = br_list
+        self._logger = self._schedv2._logger
+
+    def run(self):
+        for br in self._br_list:
+            try:
+                br.ReadCache()
+                if br.cache_hit:
+                    self._logger.LogOutput('Cache hit - {}'.format(br))
+                    with self._schedv2._lock_on('_cached_br_list'):
+                        self._schedv2._cached_br_list.append(br)
+                else:
+                    self._logger.LogOutput('Cache not hit - {}'.format(br))
+            except:
+                traceback.print_exc(file=sys.stderr)
+
+
 class Schedv2(object):
     """New scheduler for crosperf."""
 
     def __init__(self, experiment):
         self._experiment = experiment
-        self._l = logger.GetLogger(experiment.log_dir)
+        self._logger = logger.GetLogger(experiment.log_dir)
 
         # Create shortcuts to nested data structure. "_duts" points to a list of
         # locked machines. _labels points to a list of all labels.
         self._duts = self._experiment.machine_manager._all_machines
         self._labels = self._experiment.labels
 
+        # Bookkeeping for synchronization.
+        self._workers_lock = Lock()
+        self._lock_map = defaultdict(lambda: Lock())
+
+        # Test mode flag
+        self._in_test_mode = test_flag.GetTestMode()
+
+        # Read benchmarkrun cache.
+        self._read_br_cache()
+
         # Mapping from label to a list of benchmark_runs.
         self._label_brl_map = dict([(l, []) for l in self._labels])
         for br in self._experiment.benchmark_runs:
             assert br.label in self._label_brl_map
-            self._label_brl_map[br.label].append(br)
+            # Only put no-cache-hit br into the map.
+            if br not in self._cached_br_list:
+                self._label_brl_map[br.label].append(br)
 
         # Use machine image manager to calculate initial label allocation.
         self._mim = MachineImageManager(self._labels, self._duts)
@@ -203,21 +254,77 @@ class Schedv2(object):
         self._active_workers = [DutWorker(dut, self) for dut in self._duts]
         self._finished_workers = []
 
-        # Bookkeeping for synchronization.
-        self._workers_lock = Lock()
-        self._lock_map = defaultdict(lambda: Lock())
-
         # Termination flag.
         self._terminated = False
 
-        # Test mode flag
-        self._in_test_mode = test_flag.GetTestMode()
-
     def run_sched(self):
         """Start all dut worker threads and return immediately."""
         [w.start() for w in self._active_workers]
 
+    def _read_br_cache(self):
+        """Use multi-threading to read cache for all benchmarkruns.
+
+        We do this by firstly creating a few threads, and then assign each
+        thread a segment of all brs. Each thread will check cache status for
+        each br and put those with cache into '_cached_br_list'."""
+
+        self._cached_br_list = []
+        n_benchmarkruns = len(self._experiment.benchmark_runs)
+        if n_benchmarkruns <= 4:
+            # Use single thread to read cache.
+            self._logger.LogOutput(('Starting to read cache status for '
+                                    '{} benchmark runs ...').format(n_benchmarkruns))
+            BenchmarkRunCacheReader(self, self._experiment.benchmark_runs).run()
+            return
+
+        # Split benchmarkruns set into segments. Each segment will be handled by
+        # a thread. Note, we use (x+3)/4 to mimic math.ceil(x/4).
+        n_threads = max(2, min(20, (n_benchmarkruns + 3) / 4))
+        self._logger.LogOutput(('Starting {} threads to read cache status for '
+                                '{} benchmark runs ...').format(
+                                    n_threads, n_benchmarkruns))
+        benchmarkruns_per_thread = (n_benchmarkruns + n_threads - 1) / n_threads
+        benchmarkrun_segments = []
+        for i in range(n_threads - 1):
+            start = i * benchmarkruns_per_thread
+            end = (i + 1) * benchmarkruns_per_thread
+            benchmarkrun_segments.append(
+                self._experiment.benchmark_runs[start : end])
+        benchmarkrun_segments.append(self._experiment.benchmark_runs[
+            (n_threads - 1) * benchmarkruns_per_thread:])
+
+        # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs.
+        assert (sum([len(x) for x in benchmarkrun_segments]) == n_benchmarkruns)
+
+        # Create and start all readers.
+        cache_readers = [
+            BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments]
+
+        for x in cache_readers:
+            x.start()
+
+        # Wait till all readers finish.
+        for x in cache_readers:
+            x.join()
+
+        # Summarize.
+        self._logger.LogOutput(
+            'Total {} cache hit out of {} benchmark_runs.'.format(
+                len(self._cached_br_list), n_benchmarkruns))
+
+    def get_cached_benchmark_run(self):
+        """Get a benchmark_run with 'cache hit'.
+
+        return:
+            The benchmark that has cache hit, if any. Otherwise none.
+        """
+
+        with self._lock_on('_cached_br_list'):
+            if self._cached_br_list:
+                return self._cached_br_list.pop()
+            return None
+
     def get_benchmark_run(self, dut):
         """Get a benchmark_run (br) object for a certain dut.
 
@@ -239,7 +346,7 @@ class Schedv2(object):
             return None
 
         # If br list for the dut's label is empty (that means all brs for this
-        # label have been done) , return None.
+        # label have been done), return None.
         with self._lock_on(dut.label):
             brl = self._label_brl_map[dut.label]
             if not brl:
@@ -273,7 +380,7 @@ class Schedv2(object):
 
        Arguemnts:
            dut_worker: the thread that is about to end."""
 
-        self._l.LogOutput("{} finished.".format(dut_worker))
+        self._logger.LogOutput("{} finished.".format(dut_worker))
         with self._workers_lock:
             self._active_workers.remove(dut_worker)
             self._finished_workers.append(dut_worker)
diff --git a/crosperf/schedv2_unittest.py b/crosperf/schedv2_unittest.py
index 9025fb96..3ce4e15e 100644
--- a/crosperf/schedv2_unittest.py
+++ b/crosperf/schedv2_unittest.py
@@ -6,8 +6,11 @@
 import mock
 import unittest
 import StringIO
+import benchmark_run
 import machine_manager
+import schedv2
 import test_flag
+from benchmark_run import MockBenchmarkRun
 from experiment_factory import ExperimentFactory
 from experiment_file import ExperimentFile
 from experiment_runner import ExperimentRunner
@@ -18,25 +21,46 @@
 from experiment_runner_unittest import FakeLogger
 from schedv2 import Schedv2
 
 
-EXPERIMENT_FILE_1 = """
-          board: daisy
-          remote: chromeos-daisy1.cros chromeos-daisy2.cros
+EXPERIMENT_FILE_1 = """\
+board: daisy
+remote: chromeos-daisy1.cros chromeos-daisy2.cros
 
-          benchmark: kraken {
-            suite: telemetry_Crosperf
-            iterations: 3
-          }
+benchmark: kraken {
+  suite: telemetry_Crosperf
+  iterations: 3
+}
 
-          image1 {
-            chromeos_image: /chromeos/src/build/images/daisy/latest/cros_image1.bin
-            remote: chromeos-daisy3.cros
-          }
+image1 {
+  chromeos_image: /chromeos/src/build/images/daisy/latest/cros_image1.bin
+  remote: chromeos-daisy3.cros
+}
 
-          image2 {
-            chromeos_image: /chromeos/src/build/imaages/daisy/latest/cros_image2.bin
-            remote: chromeos-daisy4.cros chromeos-daisy5.cros
-          }
-          """
+image2 {
+  chromeos_image: /chromeos/src/build/imaages/daisy/latest/cros_image2.bin
+  remote: chromeos-daisy4.cros chromeos-daisy5.cros
+}
+"""
+
+
+EXPERIMENT_FILE_WITH_FORMAT = """\
+board: daisy
+remote: chromeos-daisy1.cros chromeos-daisy2.cros
+
+benchmark: kraken {{
+  suite: telemetry_Crosperf
+  iterations: {kraken_iterations}
+}}
+
+image1 {{
+  chromeos_image: /chromeos/src/build/images/daisy/latest/cros_image1.bin
+  remote: chromeos-daisy3.cros
+}}
+
+image2 {{
+  chromeos_image: /chromeos/src/build/imaages/daisy/latest/cros_image2.bin
+  remote: chromeos-daisy4.cros chromeos-daisy5.cros
+}}
+"""
 
 
 class Schedv2Test(unittest.TestCase):
 
     mock_logger = FakeLogger()
     mock_cmd_exec = mock.Mock(spec=CommandExecuter)
 
+    @mock.patch('benchmark_run.BenchmarkRun',
+                new=benchmark_run.MockBenchmarkRun)
     def _make_fake_experiment(self, expstr):
+        """Create fake experiment from string.
+
+        Note - we mock out BenchmarkRun in this step.
+        """
         experiment_file = ExperimentFile(StringIO.StringIO(expstr))
         experiment = ExperimentFactory().GetExperiment(
             experiment_file, working_directory="", log_dir="")
@@ -70,10 +100,8 @@ class Schedv2Test(unittest.TestCase):
             return (cm.name != 'chromeos-daisy3.cros' and
                     cm.name != 'chromeos-daisy5.cros')
 
-        originalIsReachable = MockCrosMachine.IsReachable
-
-        try:
-            MockCrosMachine.IsReachable = MockIsReachable
+        with mock.patch('machine_manager.MockCrosMachine.IsReachable',
+                        new=MockIsReachable) as f:
             self.exp = self._make_fake_experiment(EXPERIMENT_FILE_1)
             self.assertIn('chromeos-daisy1.cros', self.exp.remote)
             self.assertIn('chromeos-daisy2.cros', self.exp.remote)
@@ -87,8 +115,100 @@ class Schedv2Test(unittest.TestCase):
                 self.assertIn('chromeos-daisy4.cros', l.remote)
             elif l.name == 'image1':
                 self.assertNotIn('chromeos-daisy3.cros', l.remote)
-        finally:
-            MockCrosMachine.IsReachable = originalIsReachable
+
+    @mock.patch('schedv2.BenchmarkRunCacheReader')
+    def test_BenchmarkRunCacheReader_1(self, reader):
+        """Test benchmarkrun set is split into 5 segments."""
+
+        self.exp = self._make_fake_experiment(
+            EXPERIMENT_FILE_WITH_FORMAT.format(kraken_iterations=9))
+        schedv2 = Schedv2(self.exp)
+        # We have 9 * 2 == 18 brs, we use 5 threads, each reading 4, 4, 4,
+        # 4, 2 brs respectively.
+        # Assert that BenchmarkRunCacheReader() is called 5 times.
+        self.assertEquals(reader.call_count, 5)
+        # reader.call_args_list[n] - nth call.
+        # reader.call_args_list[n][0] - positioned args in nth call.
+        # reader.call_args_list[n][0][1] - the 2nd arg in nth call,
+        # that is 'br_list' in 'schedv2.BenchmarkRunCacheReader'.
+        self.assertEquals(len(reader.call_args_list[0][0][1]), 4)
+        self.assertEquals(len(reader.call_args_list[1][0][1]), 4)
+        self.assertEquals(len(reader.call_args_list[2][0][1]), 4)
+        self.assertEquals(len(reader.call_args_list[3][0][1]), 4)
+        self.assertEquals(len(reader.call_args_list[4][0][1]), 2)
+
+    @mock.patch('schedv2.BenchmarkRunCacheReader')
+    def test_BenchmarkRunCacheReader_2(self, reader):
+        """Test benchmarkrun set is split into 4 segments."""
+
+        self.exp = self._make_fake_experiment(
+            EXPERIMENT_FILE_WITH_FORMAT.format(kraken_iterations=8))
+        schedv2 = Schedv2(self.exp)
+        # We have 8 * 2 == 16 brs, we use 4 threads, each reading 4 brs.
+        self.assertEquals(reader.call_count, 4)
+        self.assertEquals(len(reader.call_args_list[0][0][1]), 4)
+        self.assertEquals(len(reader.call_args_list[1][0][1]), 4)
+        self.assertEquals(len(reader.call_args_list[2][0][1]), 4)
+        self.assertEquals(len(reader.call_args_list[3][0][1]), 4)
+
+    @mock.patch('schedv2.BenchmarkRunCacheReader')
+    def test_BenchmarkRunCacheReader_3(self, reader):
+        """Test benchmarkrun set is split into 2 segments."""
+
+        self.exp = self._make_fake_experiment(
+            EXPERIMENT_FILE_WITH_FORMAT.format(kraken_iterations=3))
+        schedv2 = Schedv2(self.exp)
+        # We have 3 * 2 == 6 brs, we use 2 threads.
+        self.assertEquals(reader.call_count, 2)
+        self.assertEquals(len(reader.call_args_list[0][0][1]), 3)
+        self.assertEquals(len(reader.call_args_list[1][0][1]), 3)
+
+    @mock.patch('schedv2.BenchmarkRunCacheReader')
+    def test_BenchmarkRunCacheReader_4(self, reader):
+        """Test benchmarkrun set is not splitted."""
+
+        self.exp = self._make_fake_experiment(
+            EXPERIMENT_FILE_WITH_FORMAT.format(kraken_iterations=1))
+        schedv2 = Schedv2(self.exp)
+        # We have 1 * 2 == 2 br, so only 1 instance.
+        self.assertEquals(reader.call_count, 1)
+        self.assertEquals(len(reader.call_args_list[0][0][1]), 2)
+
+    def test_cachehit(self):
+        """Test cache-hit and none-cache-hit brs are properly organized."""
+
+        def MockReadCache(br):
+            br.cache_hit = (br.label.name == 'image2')
+
+        with mock.patch('benchmark_run.MockBenchmarkRun.ReadCache',
+                        new=MockReadCache) as f:
+            # We have 2 * 30 brs, half of which are put into _cached_br_list.
+            self.exp = self._make_fake_experiment(
+                EXPERIMENT_FILE_WITH_FORMAT.format(kraken_iterations=30))
+            schedv2 = Schedv2(self.exp)
+            self.assertEquals(len(schedv2._cached_br_list), 30)
+            # The non-cache-hit brs are put into Schedv2._label_brl_map.
+            self.assertEquals(reduce(lambda a, x: a + len(x[1]),
+                                     schedv2._label_brl_map.iteritems(), 0),
+                              30)
+
+    def test_nocachehit(self):
+        """Test no cache-hit."""
+
+        def MockReadCache(br):
+            br.cache_hit = False
+
+        with mock.patch('benchmark_run.MockBenchmarkRun.ReadCache',
+                        new=MockReadCache) as f:
+            # We have 2 * 30 brs, none of which are put into _cached_br_list.
+            self.exp = self._make_fake_experiment(
+                EXPERIMENT_FILE_WITH_FORMAT.format(kraken_iterations=30))
+            schedv2 = Schedv2(self.exp)
+            self.assertEquals(len(schedv2._cached_br_list), 0)
+            # The non-cache-hit brs are put into Schedv2._label_brl_map.
+            self.assertEquals(reduce(lambda a, x: a + len(x[1]),
+                                     schedv2._label_brl_map.iteritems(), 0),
+                              60)
 
 
 if __name__ == '__main__':
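
For readers who want the shape of the change without walking the diff, here is a minimal, self-contained sketch of the flow the commit message describes. It is not the crosperf code: the class names only loosely mirror BenchmarkRun/Schedv2/BenchmarkRunCacheReader, the cache check is faked, and the runs are striped across reader threads rather than cut into contiguous segments the way _read_br_cache() does.

```python
# A standalone sketch (not the crosperf code) of the commit's two-phase flow:
# read all result caches in parallel first, then let workers drain the
# cache hits before any machine has to be re-imaged for a particular label.
import threading
from collections import defaultdict


class FakeBenchmarkRun(object):
    """Stand-in for crosperf's benchmark_run.BenchmarkRun."""

    def __init__(self, name, label):
        self.name = name
        self.label = label
        self.cache_hit = False

    def ReadCache(self):
        # The real ReadCache checks the on-disk results cache; here we just
        # pretend every other run is cached.
        self.cache_hit = (hash(self.name) % 2 == 0)

    def run(self):
        pass  # the real method replays the cached result or runs the benchmark


class SketchScheduler(object):
    """Loosely mirrors Schedv2: cache reading up front, per-key locks."""

    def __init__(self, benchmark_runs, n_threads=4):
        self._lock_map = defaultdict(threading.Lock)
        self._cached_br_list = []
        self._label_brl_map = defaultdict(list)

        # Phase 1: check cache status in parallel. No dut needs to carry any
        # particular image for this, so it can happen before any re-imaging.
        segments = [benchmark_runs[i::n_threads] for i in range(n_threads)]
        readers = [threading.Thread(target=self._read_segment, args=(seg,))
                   for seg in segments if seg]
        for reader in readers:
            reader.start()
        for reader in readers:
            reader.join()

        # Phase 2 bookkeeping: only cache misses stay in the per-label map,
        # i.e. only they can ever force a dut to be re-imaged.
        for br in benchmark_runs:
            if not br.cache_hit:
                self._label_brl_map[br.label].append(br)

    def _read_segment(self, segment):
        for br in segment:
            br.ReadCache()
            if br.cache_hit:
                with self._lock_map['_cached_br_list']:
                    self._cached_br_list.append(br)

    def get_cached_benchmark_run(self):
        """What a worker calls first: pop a cache hit, or None if drained."""
        with self._lock_map['_cached_br_list']:
            if self._cached_br_list:
                return self._cached_br_list.pop()
            return None


if __name__ == '__main__':
    runs = [FakeBenchmarkRun('kraken_{}_{}'.format(label, i), label)
            for label in ('image1', 'image2') for i in range(9)]
    sched = SketchScheduler(runs)
    print('{} cache hits; {} runs still need a dut'.format(
        len(sched._cached_br_list),
        sum(len(v) for v in sched._label_brl_map.values())))
```

The point of the two phases is the one the commit message makes: reading the cache needs no particular image on any dut, so every cache hit resolved up front is one fewer benchmark run that can trigger a re-image later.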
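The segment counts asserted in test_BenchmarkRunCacheReader_1 through _4 follow directly from the arithmetic in _read_br_cache(). A small worked example (plain Python; // is written here for the floor division that the Python 2 code gets from /):

```python
# Thread-count and segment-size arithmetic from _read_br_cache(), worked for
# 9 iterations x 2 labels = 18 benchmark runs (test_BenchmarkRunCacheReader_1).
n_benchmarkruns = 18

# (x + 3) // 4 mimics math.ceil(x / 4.0); clamp to between 2 and 20 threads.
n_threads = max(2, min(20, (n_benchmarkruns + 3) // 4))       # -> 5
per_thread = (n_benchmarkruns + n_threads - 1) // n_threads   # ceil(18/5) -> 4

# The first n_threads - 1 readers get per_thread brs each; the last reader
# gets whatever remains.
sizes = [per_thread] * (n_threads - 1)
sizes.append(n_benchmarkruns - per_thread * (n_threads - 1))
assert sizes == [4, 4, 4, 4, 2]

# With 16 brs the same formulas give 4 readers of 4 brs (test _2); with 6 brs,
# 2 readers of 3 (test _3); and 2 brs fall below the <= 4 threshold, so a
# single reader handles them inline (test _4).
```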