aboutsummaryrefslogtreecommitdiff
path: root/crosperf/experiment_runner.py
diff options
context:
space:
mode:
authorHan Shen <shenhan@google.com>2015-09-16 11:08:09 -0700
committerchrome-bot <chrome-bot@chromium.org>2015-09-16 17:52:37 -0700
commit7a939a38850f3afff46a30b53e731ccafa961f54 (patch)
treef2ea69af91b0bc7f15dcfc8cb9962886c2f8c87c /crosperf/experiment_runner.py
parente205d9066d4391035bd3d7f9652f45647a721486 (diff)
downloadtoolchain-utils-7a939a38850f3afff46a30b53e731ccafa961f54.tar.gz
Refactor 'schedv2' into a seperate file.
This CL also fixed a few minor errors in running unittest. This is to ease future development on schedv2. TEST=manually run crosperf with a test file. Change-Id: Iea9221929506d68b794b40f1a2319ceb5f3c8f5d Reviewed-on: https://chrome-internal-review.googlesource.com/230829 Commit-Ready: Han Shen <shenhan@google.com> Tested-by: Han Shen <shenhan@google.com> Reviewed-by: Caroline Tice <cmtice@google.com>
Diffstat (limited to 'crosperf/experiment_runner.py')
-rw-r--r--crosperf/experiment_runner.py297
1 files changed, 2 insertions, 295 deletions
diff --git a/crosperf/experiment_runner.py b/crosperf/experiment_runner.py
index ec74fdb1..479650cc 100644
--- a/crosperf/experiment_runner.py
+++ b/crosperf/experiment_runner.py
@@ -1,6 +1,6 @@
#!/usr/bin/python
-# Copyright 2011 Google Inc. All Rights Reserved.
+# Copyright 2011-2015 Google Inc. All Rights Reserved.
"""The experiment runner module."""
import getpass
@@ -9,18 +9,13 @@ import random
import shutil
import sys
import time
-import traceback
import afe_lock_machine
-from machine_image_manager import MachineImageManager
-from collections import defaultdict
from utils import command_executer
from utils import logger
from utils.email_sender import EmailSender
from utils.file_utils import FileUtils
-from threading import Lock
-from threading import Thread
import config
from experiment_status import ExperimentStatus
@@ -29,6 +24,7 @@ from results_cache import ResultsCache
from results_report import HTMLResultsReport
from results_report import TextResultsReport
from results_report import JSONResultsReport
+from schedv2 import Schedv2
class ExperimentRunner(object):
@@ -261,295 +257,6 @@ class ExperimentRunner(object):
self._StoreResults(self._experiment)
self._Email(self._experiment)
-class DutWorker(Thread):
-
- def __init__(self, dut, sched):
- super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
- self._dut = dut
- self._sched = sched
- self._stat_num_br_run = 0
- self._stat_num_reimage = 0
- self._stat_annotation = ""
- self._l = logger.GetLogger(self._sched._experiment.log_dir)
- self.daemon = True
- self._terminated = False
- self._active_br = None
- # Race condition accessing _active_br between _execute_benchmark_run and
- # _terminate, so lock it up.
- self._active_br_lock = Lock()
-
- def terminate(self):
- self._terminated = True
- with self._active_br_lock:
- if self._active_br is not None:
- # BenchmarkRun.Terminate() terminates any running testcase via
- # suite_runner.Terminate and updates timeline.
- self._active_br.Terminate()
-
- def run(self):
- """Do the "run-test->(optionally reimage)->run-test" chore.
-
- Note - 'br' below means 'benchmark_run'.
- """
-
- self._setup_dut_label()
- try:
- self._l.LogOutput("{} started.".format(self))
- while not self._terminated:
- br = self._sched.get_benchmark_run(self._dut)
- if br is None:
- # No br left for this label. Considering reimaging.
- label = self._sched.allocate_label(self._dut)
- if label is None:
- # No br even for other labels. We are done.
- self._l.LogOutput("ImageManager found no label "
- "for dut, stopping working "
- "thread {}.".format(self))
- break
- if self._reimage(label):
- # Reimage to run other br fails, dut is doomed, stop
- # this thread.
- self._l.LogWarning("Re-image failed, dut "
- "in an unstable state, stopping "
- "working thread {}.".format(self))
- break
- else:
- # Execute the br.
- self._execute_benchmark_run(br)
- finally:
- self._stat_annotation = "finished"
- # Thread finishes. Notify scheduler that I'm done.
- self._sched.dut_worker_finished(self)
-
- def _reimage(self, label):
- """Reimage image to label.
-
- Args:
- label: the label to remimage onto dut.
-
- Returns:
- 0 if successful, otherwise 1.
- """
-
- # Termination could happen anywhere, check it.
- if self._terminated:
- return 1
-
- self._l.LogOutput('Reimaging {} using {}'.format(self, label))
- self._stat_num_reimage += 1
- self._stat_annotation = 'reimaging using "{}"'.format(label.name)
- try:
- # Note, only 1 reimage at any given time, this is guaranteed in
- # ImageMachine, so no sync needed below.
- retval = self._sched._experiment.machine_manager.ImageMachine(
- self._dut, label)
- if retval:
- return 1
- except:
- return 1
-
- self._dut.label = label
- return 0
-
- def _execute_benchmark_run(self, br):
- """Execute a single benchmark_run.
-
- Note - this function never throws exceptions.
- """
-
- # Termination could happen anywhere, check it.
- if self._terminated:
- return
-
- self._l.LogOutput('{} started working on {}'.format(self, br))
- self._stat_num_br_run += 1
- self._stat_annotation = 'executing {}'.format(br)
- # benchmark_run.run does not throws, but just play it safe here.
- try:
- assert br.owner_thread is None
- br.owner_thread = self
- with self._active_br_lock:
- self._active_br = br
- br.run()
- finally:
- self._sched._experiment.BenchmarkRunFinished(br)
- with self._active_br_lock:
- self._active_br = None
-
- def _setup_dut_label(self):
- """Try to match dut image with a certain experiment label.
-
- If such match is found, we just skip doing reimage and jump to execute
- some benchmark_runs.
- """
-
- checksum_file = "/usr/local/osimage_checksum_file"
- try:
- rv, checksum, _ = command_executer.GetCommandExecuter().\
- CrosRunCommand(
- "cat " + checksum_file,
- return_output=True,
- chromeos_root=self._sched._labels[0].chromeos_root,
- machine=self._dut.name)
- if rv == 0:
- checksum = checksum.strip()
- for l in self._sched._labels:
- if l.checksum == checksum:
- self._l.LogOutput(
- "Dut '{}' is pre-installed with '{}'".format(
- self._dut.name, l))
- self._dut.label = l
- return
- except:
- traceback.print_exc(file=sys.stdout)
- self._dut.label = None
-
- def __str__(self):
- return 'DutWorker[dut="{}", label="{}"]'.format(
- self._dut.name, self._dut.label.name if self._dut.label else "None")
-
- def dut(self):
- return self._dut
-
- def status_str(self):
- """Report thread status."""
-
- return ('Worker thread "{}", label="{}", benchmark_run={}, '
- 'reimage={}, now {}'.format(
- self._dut.name,
- 'None' if self._dut.label is None else self._dut.label.name,
- self._stat_num_br_run,
- self._stat_num_reimage,
- self._stat_annotation))
-
-
-class Schedv2(object):
- """New scheduler for crosperf."""
-
- def __init__(self, experiment):
- self._experiment = experiment
- self._l = logger.GetLogger(experiment.log_dir)
-
- # Create shortcuts to nested data structure. "_duts" points to a list of
- # locked machines. _labels points to a list of all labels.
- self._duts = self._experiment.machine_manager._all_machines
- self._labels = self._experiment.labels
-
- # Mapping from label to a list of benchmark_runs.
- self._label_brl_map = dict([(l, []) for l in self._labels])
- for br in self._experiment.benchmark_runs:
- assert br.label in self._label_brl_map
- self._label_brl_map[br.label].append(br)
-
- # Use machine image manager to calculate initial label allocation.
- self._mim = MachineImageManager(self._labels, self._duts)
- self._mim.compute_initial_allocation()
-
- # Create worker thread, 1 per dut.
- self._active_workers = [DutWorker(dut, self) for dut in self._duts]
- self._finished_workers = []
-
- # Bookkeeping for synchronization.
- self._workers_lock = Lock()
- self._lock_map = defaultdict(lambda: Lock())
-
- # Termination flag.
- self._terminated = False
-
- def run_sched(self):
- """Start all dut worker threads and return immediately."""
- [w.start() for w in self._active_workers]
-
- def get_benchmark_run(self, dut):
- """Get a benchmark_run (br) object for a certain dut.
-
- Arguments:
- dut: the dut for which a br is returned.
-
- Returns:
- A br with its label matching that of the dut. If no such br could be
- found, return None (this usually means a reimage is required for the
- dut).
- """
-
- # If terminated, stop providing any br.
- if self._terminated:
- return None
-
- # If dut bears an unrecognized label, return None.
- if dut.label is None:
- return None
-
- # If br list for the dut's label is empty (that means all brs for this
- # label have been done) , return None.
- with self._lock_on(dut.label):
- brl = self._label_brl_map[dut.label]
- if not brl:
- return None
- # Return the first br.
- return brl.pop(0)
-
- def allocate_label(self, dut):
- """Allocate a label to a dut.
-
- The work is delegated to MachineImageManager.
-
- The dut_worker calling this method is responsible for reimage the dut to
- this label.
-
- Arguments:
- dut: the new label that is to be reimaged onto the dut.
-
- Returns:
- The label or None.
- """
-
- if self._terminated:
- return None
-
- return self._mim.allocate(dut, self)
-
- def dut_worker_finished(self, dut_worker):
- """Notify schedv2 that the dut_worker thread finished.
-
- Arguemnts:
- dut_worker: the thread that is about to end."""
-
- self._l.LogOutput("{} finished.".format(dut_worker))
- with self._workers_lock:
- self._active_workers.remove(dut_worker)
- self._finished_workers.append(dut_worker)
-
- def is_complete(self):
- return len(self._active_workers) == 0
-
- def _lock_on(self, object):
- return self._lock_map[object]
-
- def terminate(self):
- """Mark flag so we stop providing br/reimages.
-
- Also terminate each DutWorker, so they refuse to execute br or reimage.
- """
-
- self._terminated = True
- for dut_worker in self._active_workers:
- dut_worker.terminate()
-
- def threads_status_as_string(self):
- """Report the dut worker threads status."""
-
- status = "{} active threads, {} finished threads.\n".format(
- len(self._active_workers), len(self._finished_workers))
- status += " Active threads:"
- for dw in self._active_workers:
- status += '\n ' + dw.status_str()
- if self._finished_workers:
- status += "\n Finished threads:"
- for dw in self._finished_workers:
- status += '\n ' + dw.status_str()
- return status
-
class MockExperimentRunner(ExperimentRunner):
"""Mocked ExperimentRunner for testing."""