# -*- coding: utf-8 -*- # Copyright 2013 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Module of benchmark runs.""" import datetime import threading import time import traceback from cros_utils import command_executer from cros_utils import timeline from results_cache import MockResult from results_cache import MockResultsCache from results_cache import Result from results_cache import ResultsCache from suite_runner import SuiteRunner STATUS_FAILED = "FAILED" STATUS_SUCCEEDED = "SUCCEEDED" STATUS_IMAGING = "IMAGING" STATUS_RUNNING = "RUNNING" STATUS_WAITING = "WAITING" STATUS_PENDING = "PENDING" class BenchmarkRun(threading.Thread): """The benchmarkrun class.""" def __init__( self, name, benchmark, label, iteration, cache_conditions, machine_manager, logger_to_use, log_level, share_cache, dut_config, ): threading.Thread.__init__(self) self.name = name self._logger = logger_to_use self.log_level = log_level self.benchmark = benchmark self.iteration = iteration self.label = label self.result = None self.terminated = False self.retval = None self.run_completed = False self.machine_manager = machine_manager self.suite_runner = SuiteRunner( dut_config, self._logger, self.log_level ) self.machine = None self.cache_conditions = cache_conditions self.runs_complete = 0 self.cache_hit = False self.failure_reason = "" self.test_args = benchmark.test_args self.cache = None self.profiler_args = self.GetExtraAutotestArgs() self._ce = command_executer.GetCommandExecuter( self._logger, log_level=self.log_level ) self.timeline = timeline.Timeline() self.timeline.Record(STATUS_PENDING) self.share_cache = share_cache self.cache_has_been_read = False # This is used by schedv2. self.owner_thread = None def ReadCache(self): # Just use the first machine for running the cached version, # without locking it. self.cache = ResultsCache() self.cache.Init( self.label.chromeos_image, self.label.chromeos_root, self.benchmark.test_name, self.iteration, self.test_args, self.profiler_args, self.machine_manager, self.machine, self.label.board, self.cache_conditions, self._logger, self.log_level, self.label, self.share_cache, self.benchmark.suite, self.benchmark.show_all_results, self.benchmark.run_local, self.benchmark.cwp_dso, ) self.result = self.cache.ReadResult() self.cache_hit = self.result is not None self.cache_has_been_read = True def run(self): try: if not self.cache_has_been_read: self.ReadCache() if self.result: self._logger.LogOutput("%s: Cache hit." % self.name) self._logger.LogOutput(self.result.out, print_to_console=False) self._logger.LogError(self.result.err, print_to_console=False) elif self.label.cache_only: self._logger.LogOutput("%s: No cache hit." % self.name) output = "%s: No Cache hit." % self.name retval = 1 err = "No cache hit." self.result = Result.CreateFromRun( self._logger, self.log_level, self.label, self.machine, output, err, retval, self.benchmark.test_name, self.benchmark.suite, self.benchmark.cwp_dso, ) else: self._logger.LogOutput("%s: No cache hit." % self.name) self.timeline.Record(STATUS_WAITING) # Try to acquire a machine now. self.machine = self.AcquireMachine() self.cache.machine = self.machine self.result = self.RunTest(self.machine) self.cache.remote = self.machine.name self.label.chrome_version = ( self.machine_manager.GetChromeVersion(self.machine) ) self.cache.StoreResult(self.result) if not self.label.chrome_version: if self.machine: self.label.chrome_version = ( self.machine_manager.GetChromeVersion(self.machine) ) elif self.result.chrome_version: self.label.chrome_version = self.result.chrome_version if self.terminated: return if not self.result.retval: self.timeline.Record(STATUS_SUCCEEDED) else: if self.timeline.GetLastEvent() != STATUS_FAILED: self.failure_reason = ( "Return value of test suite was non-zero." ) self.timeline.Record(STATUS_FAILED) except Exception as e: self._logger.LogError( "Benchmark run: '%s' failed: %s" % (self.name, e) ) traceback.print_exc() if self.timeline.GetLastEvent() != STATUS_FAILED: self.timeline.Record(STATUS_FAILED) self.failure_reason = str(e) finally: if self.owner_thread is not None: # In schedv2 mode, we do not lock machine locally. So noop here. pass elif self.machine: if not self.machine.IsReachable(): self._logger.LogOutput( "Machine %s is not reachable, removing it." % self.machine.name ) self.machine_manager.RemoveMachine(self.machine.name) self._logger.LogOutput( "Releasing machine: %s" % self.machine.name ) self.machine_manager.ReleaseMachine(self.machine) self._logger.LogOutput( "Released machine: %s" % self.machine.name ) def Terminate(self): self.terminated = True self.suite_runner.Terminate() if self.timeline.GetLastEvent() != STATUS_FAILED: self.timeline.Record(STATUS_FAILED) self.failure_reason = "Thread terminated." def AcquireMachine(self): if self.owner_thread is not None: # No need to lock machine locally, DutWorker, which is a thread, is # responsible for running br. return self.owner_thread.dut() while True: machine = None if self.terminated: raise RuntimeError( "Thread terminated while trying to acquire machine." ) machine = self.machine_manager.AcquireMachine(self.label) if machine: self._logger.LogOutput( "%s: Machine %s acquired at %s" % (self.name, machine.name, datetime.datetime.now()) ) break time.sleep(10) return machine def GetExtraAutotestArgs(self): if ( self.benchmark.perf_args and self.benchmark.suite != "telemetry_Crosperf" ): self._logger.LogError( "Non-telemetry benchmark does not support profiler." ) self.benchmark.perf_args = "" if self.benchmark.perf_args: perf_args_list = self.benchmark.perf_args.split(" ") perf_args_list = [perf_args_list[0]] + ["-a"] + perf_args_list[1:] perf_args = " ".join(perf_args_list) if not perf_args_list[0] in ["record", "stat"]: raise SyntaxError( "perf_args must start with either record or stat" ) extra_test_args = [ "--profiler=custom_perf", ("--profiler_args='perf_options=\"%s\"'" % perf_args), ] return " ".join(extra_test_args) else: return "" def RunTest(self, machine): self.timeline.Record(STATUS_IMAGING) if self.owner_thread is not None: # In schedv2 mode, do not even call ImageMachine. Machine image is # guarenteed. pass else: self.machine_manager.ImageMachine(machine, self.label) self.timeline.Record(STATUS_RUNNING) retval, out, err = self.suite_runner.Run( machine, self.label, self.benchmark, self.test_args, self.profiler_args, ) self.run_completed = True return Result.CreateFromRun( self._logger, self.log_level, self.label, self.machine, out, err, retval, self.benchmark.test_name, self.benchmark.suite, self.benchmark.cwp_dso, ) def SetCacheConditions(self, cache_conditions): self.cache_conditions = cache_conditions def logger(self): """Return the logger, only used by unittest. Returns: self._logger """ return self._logger def __str__(self): """For better debugging.""" return 'BenchmarkRun[name="{}"]'.format(self.name) class MockBenchmarkRun(BenchmarkRun): """Inherited from BenchmarkRun.""" def ReadCache(self): # Just use the first machine for running the cached version, # without locking it. self.cache = MockResultsCache() self.cache.Init( self.label.chromeos_image, self.label.chromeos_root, self.benchmark.test_name, self.iteration, self.test_args, self.profiler_args, self.machine_manager, self.machine, self.label.board, self.cache_conditions, self._logger, self.log_level, self.label, self.share_cache, self.benchmark.suite, self.benchmark.show_all_results, self.benchmark.run_local, self.benchmark.cwp_dso, ) self.result = self.cache.ReadResult() self.cache_hit = self.result is not None def RunTest(self, machine): """Remove Result.CreateFromRun for testing.""" self.timeline.Record(STATUS_IMAGING) self.machine_manager.ImageMachine(machine, self.label) self.timeline.Record(STATUS_RUNNING) [retval, out, err] = self.suite_runner.Run( machine, self.label, self.benchmark, self.test_args, self.profiler_args, ) self.run_completed = True rr = MockResult("logger", self.label, self.log_level, machine) rr.out = out rr.err = err rr.retval = retval return rr