aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHan Shen <shenhan@google.com>2015-08-05 17:19:55 -0700
committerChromeOS Commit Bot <chromeos-commit-bot@chromium.org>2015-08-20 20:29:33 +0000
commitba64928c5dcbacbc70b4358881a89ad96227164d (patch)
tree4d2887d79204febd1c5b016810b2a65bb7551c91
parent0aa17b4641bcc205c52e6db0c529dd3126003ec5 (diff)
downloadtoolchain-utils-ba64928c5dcbacbc70b4358881a89ad96227164d.tar.gz
Crosperf schedv2 (1) - new option and integrating new scheduler.
This CL introduces a new option '--schedv2', which uses the new scheduler to allocate jobs (benchmark_runs) to duts. With this option, schedv2 takes control of storing/allocating jobs and reimaging machines using the new algorithm. This CL leaves actual reimaging and running of jobs as no-ops (a random-time sleep is used for each such op, which will be replaced in later CLs). You may try this CL as follows to see how schedv2 works - crosperf --locks_dir=/usr/local/google/home/shenhan/tmp --use_file_locks=True --logging_level=verbose --schedv2 some.exp Change-Id: If5bb7751b466c39e54c93fe8f0b4e363be6d9165 Reviewed-on: https://chrome-internal-review.googlesource.com/225515 Commit-Queue: Han Shen <shenhan@google.com> Tested-by: Han Shen <shenhan@google.com> Reviewed-by: Han Shen <shenhan@google.com>
-rw-r--r--crosperf/benchmark_run.py26
-rwxr-xr-xcrosperf/crosperf.py7
-rw-r--r--crosperf/experiment.py51
-rw-r--r--crosperf/experiment_factory.py2
-rw-r--r--crosperf/experiment_runner.py308
-rw-r--r--crosperf/experiment_status.py14
-rw-r--r--crosperf/label.py31
-rw-r--r--crosperf/machine_image_manager.py266
-rw-r--r--crosperf/machine_image_manager_unittest.py329
-rw-r--r--crosperf/machine_manager.py28
-rwxr-xr-ximage_chromeos.py2
11 files changed, 1024 insertions, 40 deletions
diff --git a/crosperf/benchmark_run.py b/crosperf/benchmark_run.py
index cee41dac..dabbdeb2 100644
--- a/crosperf/benchmark_run.py
+++ b/crosperf/benchmark_run.py
@@ -64,6 +64,9 @@ class BenchmarkRun(threading.Thread):
self.timeline.Record(STATUS_PENDING)
self.share_cache = share_cache
+ # This is used by schedv2.
+ self.owner_thread = None
+
def ReadCache(self):
# Just use the first machine for running the cached version,
# without locking it.
@@ -137,7 +140,10 @@ class BenchmarkRun(threading.Thread):
self.timeline.Record(STATUS_FAILED)
self.failure_reason = str(e)
finally:
- if self.machine:
+ if self.owner_thread is not None:
+ # In schedv2 mode, we do not lock machine locally. So noop here.
+ pass
+ elif self.machine:
if not self.machine.IsReachable():
self._logger.LogOutput("Machine %s is not reachable, removing it."
% self.machine.name)
@@ -154,6 +160,10 @@ class BenchmarkRun(threading.Thread):
self.failure_reason = "Thread terminated."
def AcquireMachine(self):
+ if self.owner_thread is not None:
+ # No need to lock machine locally, DutWorker, which is a thread, is
+ # responsible for running br.
+ return self.owner_thread.dut()
while True:
machine = None
if self.terminated:
@@ -204,8 +214,13 @@ class BenchmarkRun(threading.Thread):
def RunTest(self, machine):
self.timeline.Record(STATUS_IMAGING)
- self.machine_manager.ImageMachine(machine,
- self.label)
+ if self.owner_thread is not None:
+ # In schedv2 mode, do not even call ImageMachine. Machine image is
+ # guaranteed.
+ pass
+ else:
+ self.machine_manager.ImageMachine(machine,
+ self.label)
self.timeline.Record(STATUS_RUNNING)
[retval, out, err] = self.suite_runner.Run(machine.name,
self.label,
@@ -226,6 +241,11 @@ class BenchmarkRun(threading.Thread):
def SetCacheConditions(self, cache_conditions):
self.cache_conditions = cache_conditions
+ def __str__(self):
+ """For better debugging."""
+
+ return 'BenchmarkRun[name="{}"]'.format(self.name)
+
class MockBenchmarkRun(BenchmarkRun):
"""Inherited from BenchmarkRuna."""
diff --git a/crosperf/crosperf.py b/crosperf/crosperf.py
index 88e510fd..b0f0b0c9 100755
--- a/crosperf/crosperf.py
+++ b/crosperf/crosperf.py
@@ -64,6 +64,10 @@ def Main(argv):
formatter=MyIndentedHelpFormatter(),
version="%prog 3.0")
+ parser.add_option("--schedv2",
+ dest="schedv2",
+ action="store_true",
+ help="Use crosperf scheduler v2 (feature in progress).")
parser.add_option("-l", "--log_dir",
dest="log_dir",
default="",
@@ -101,7 +105,8 @@ def Main(argv):
if options.dry_run:
runner = MockExperimentRunner(experiment)
else:
- runner = ExperimentRunner(experiment)
+ runner = ExperimentRunner(experiment, using_schedv2=options.schedv2)
+
runner.Run()
if __name__ == "__main__":
diff --git a/crosperf/experiment.py b/crosperf/experiment.py
index 9f88ed01..59e932f6 100644
--- a/crosperf/experiment.py
+++ b/crosperf/experiment.py
@@ -10,6 +10,7 @@ import os
import time
import afe_lock_machine
+from threading import Lock
from utils import logger
from utils import misc
@@ -79,6 +80,15 @@ class Experiment(object):
self.start_time = None
self.benchmark_runs = self._GenerateBenchmarkRuns()
+ self._schedv2 = None
+ self._internal_counter_lock = Lock()
+
+ def set_schedv2(self, schedv2):
+ self._schedv2 = schedv2
+
+ def schedv2(self):
+ return self._schedv2
+
def _GenerateBenchmarkRuns(self):
"""Generate benchmark runs from labels and benchmark defintions."""
benchmark_runs = []
@@ -109,12 +119,17 @@ class Experiment(object):
pass
def Terminate(self):
- for t in self.benchmark_runs:
- if t.isAlive():
- self.l.LogError("Terminating run: '%s'." % t.name)
- t.Terminate()
+ if self._schedv2 is not None:
+ self._schedv2.terminate()
+ else:
+ for t in self.benchmark_runs:
+ if t.isAlive():
+ self.l.LogError("Terminating run: '%s'." % t.name)
+ t.Terminate()
def IsComplete(self):
+ if self._schedv2:
+ return self._schedv2.is_complete()
if self.active_threads:
for t in self.active_threads:
if t.isAlive():
@@ -127,14 +142,30 @@ class Experiment(object):
return False
return True
+ def BenchmarkRunFinished(self, br):
+ """Update internal counters after br finishes.
+
+ Note this is only used by schedv2 and is called by multiple threads.
+ Never throw any exception here.
+ """
+
+ assert self._schedv2 is not None
+ with self._internal_counter_lock:
+ self.num_complete += 1
+ if not br.cache_hit:
+ self.num_run_complete += 1
+
def Run(self):
self.start_time = time.time()
- self.active_threads = []
- for benchmark_run in self.benchmark_runs:
- # Set threads to daemon so program exits when ctrl-c is pressed.
- benchmark_run.daemon = True
- benchmark_run.start()
- self.active_threads.append(benchmark_run)
+ if self._schedv2 is not None:
+ self._schedv2.run_sched()
+ else:
+ self.active_threads = []
+ for benchmark_run in self.benchmark_runs:
+ # Set threads to daemon so program exits when ctrl-c is pressed.
+ benchmark_run.daemon = True
+ benchmark_run.start()
+ self.active_threads.append(benchmark_run)
def SetCacheConditions(self, cache_conditions):
for benchmark_run in self.benchmark_runs:
diff --git a/crosperf/experiment_factory.py b/crosperf/experiment_factory.py
index 65d7ce8d..0f90383c 100644
--- a/crosperf/experiment_factory.py
+++ b/crosperf/experiment_factory.py
@@ -248,7 +248,7 @@ class ExperimentFactory(object):
image_args, cache_dir, cache_only, chrome_src)
else:
label = Label(label_name, image, chromeos_root, board, my_remote,
- image_args, cache_dir, cache_only, chrome_src)
+ image_args, cache_dir, cache_only, log_level, chrome_src)
labels.append(label)
email = global_settings.GetField("email")
diff --git a/crosperf/experiment_runner.py b/crosperf/experiment_runner.py
index 541bebe0..90c07e77 100644
--- a/crosperf/experiment_runner.py
+++ b/crosperf/experiment_runner.py
@@ -5,14 +5,21 @@
"""The experiment runner module."""
import getpass
import os
+import random
+import sys
import time
+import traceback
import afe_lock_machine
+from machine_image_manager import MachineImageManager
+from collections import defaultdict
from utils import command_executer
from utils import logger
from utils.email_sender import EmailSender
from utils.file_utils import FileUtils
+from threading import Lock
+from threading import Thread
import config
from experiment_status import ExperimentStatus
@@ -26,7 +33,7 @@ class ExperimentRunner(object):
STATUS_TIME_DELAY = 30
THREAD_MONITOR_DELAY = 2
- def __init__(self, experiment, log=None, cmd_exec=None):
+ def __init__(self, experiment, using_schedv2=False, log=None, cmd_exec=None):
self._experiment = experiment
self.l = log or logger.GetLogger(experiment.log_dir)
self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
@@ -35,6 +42,9 @@ class ExperimentRunner(object):
if experiment.log_level != "verbose":
self.STATUS_TIME_DELAY = 10
+ # Setting this to True will use crosperf sched v2 (feature in progress).
+ self._using_schedv2 = using_schedv2
+
def _GetMachineList(self):
"""Return a list of all requested machines.
@@ -88,7 +98,8 @@ class ExperimentRunner(object):
self.locked_machines = lock_mgr.UpdateMachines(True)
self._experiment.locked_machines = self.locked_machines
self._UpdateMachineList(self.locked_machines)
- self._experiment.machine_manager.RemoveNonLockedMachines(self.locked_machines)
+ self._experiment.machine_manager.RemoveNonLockedMachines(
+ self.locked_machines)
if len(self.locked_machines) == 0:
raise RuntimeError("Unable to lock any machines.")
@@ -116,6 +127,9 @@ class ExperimentRunner(object):
try:
if not experiment.locks_dir:
self._LockAllMachines(experiment)
+ if self._using_schedv2:
+ schedv2 = Schedv2(experiment)
+ experiment.set_schedv2(schedv2)
status = ExperimentStatus(experiment)
experiment.Run()
last_status_time = 0
@@ -223,6 +237,296 @@ class ExperimentRunner(object):
self._StoreResults(self._experiment)
self._Email(self._experiment)
+class DutWorker(Thread):
+
+ def __init__(self, dut, sched):
+ super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
+ self._dut = dut
+ self._sched = sched
+ self._stat_num_br_run = 0
+ self._stat_num_reimage = 0
+ self._stat_annotation = ""
+ self._l = logger.GetLogger(self._sched._experiment.log_dir)
+ self.daemon = True
+ self._terminated = False
+ self._active_br = None
+ # Race condition accessing _active_br between _execute_benchmark_run and
+ # terminate, so lock it up.
+ self._active_br_lock = Lock()
+
+ def terminate(self):
+ self._terminated = True
+ with self._active_br_lock:
+ if self._active_br is not None:
+ # BenchmarkRun.Terminate() terminates any running testcase via
+ # suite_runner.Terminate and updates timeline.
+ self._active_br.Terminate()
+
+ def run(self):
+ """Do the "run-test->(optionally reimage)->run-test" chore.
+
+ Note - 'br' below means 'benchmark_run'.
+ """
+
+ self._setup_dut_label()
+ try:
+ self._l.LogOutput("{} started.".format(self))
+ while not self._terminated:
+ br = self._sched.get_benchmark_run(self._dut)
+ if br is None:
+ # No br left for this label. Considering reimaging.
+ label = self._sched.allocate_label(self._dut)
+ if label is None:
+ # No br even for other labels. We are done.
+ self._l.LogOutput("ImageManager found no label "
+ "for dut, stopping working "
+ "thread {}.".format(self))
+ break
+ if self._reimage(label):
+ # Reimaging to run other brs failed, dut is doomed, stop
+ # this thread.
+ self._l.LogWarning("Re-image failed, dut "
+ "in an unstable state, stopping "
+ "working thread {}.".format(self))
+ break
+ else:
+ # Execute the br.
+ self._execute_benchmark_run(br)
+ finally:
+ self._stat_annotation = "finished"
+ # Thread finishes. Notify scheduler that I'm done.
+ self._sched.dut_worker_finished(self)
+
+ def _reimage(self, label):
+ """Reimage image to label.
+
+ Args:
+ label: the label to reimage onto dut.
+
+ Returns:
+ 0 if successful, otherwise 1.
+ """
+
+ # Termination could happen anywhere, check it.
+ if self._terminated:
+ return 1
+
+ self._l.LogOutput('Reimaging {} using {}'.format(self, label))
+ self._stat_num_reimage += 1
+ self._stat_annotation = 'reimaging using "{}"'.format(label.name)
+ try:
+ # Note, only 1 reimage at any given time, this is guaranteed in
+ # ImageMachine, so no sync needed below.
+ retval = self._sched._experiment.machine_manager.ImageMachine(
+ self._dut, label)
+ if retval:
+ return 1
+ except:
+ return 1
+
+ self._dut.label = label
+ return 0
+
+ def _execute_benchmark_run(self, br):
+ """Execute a single benchmark_run.
+
+ Note - this function never throws exceptions.
+ """
+
+ # Termination could happen anywhere, check it.
+ if self._terminated:
+ return
+
+ self._l.LogOutput('{} started working on {}'.format(self, br))
+ self._stat_num_br_run += 1
+ self._stat_annotation = 'executing {}'.format(br)
+ # benchmark_run.run does not throw, but just play it safe here.
+ try:
+ assert br.owner_thread is None
+ br.owner_thread = self
+ with self._active_br_lock:
+ self._active_br = br
+ br.run()
+ finally:
+ self._sched._experiment.BenchmarkRunFinished(br)
+ with self._active_br_lock:
+ self._active_br = None
+
+ def _setup_dut_label(self):
+ """Try to match dut image with a certain experiment label.
+
+ If such match is found, we just skip doing reimage and jump to execute
+ some benchmark_runs.
+ """
+
+ checksum_file = "/usr/local/osimage_checksum_file"
+ try:
+ rv, checksum, _ = command_executer.GetCommandExecuter().\
+ CrosRunCommand(
+ "cat " + checksum_file,
+ return_output=True,
+ chromeos_root=self._sched._labels[0].chromeos_root,
+ machine=self._dut.name)
+ if rv == 0:
+ checksum = checksum.strip()
+ for l in self._sched._labels:
+ if l.checksum == checksum:
+ self._l.LogOutput(
+ "Dut '{}' is pre-installed with '{}'".format(
+ self._dut.name, l))
+ self._dut.label = l
+ return
+ except:
+ traceback.print_exc(file=sys.stdout)
+ self._dut.label = None
+
+ def __str__(self):
+ return 'DutWorker[dut="{}", label="{}"]'.format(
+ self._dut.name, self._dut.label.name if self._dut.label else "None")
+
+ def dut(self):
+ return self._dut
+
+ def status_str(self):
+ """Report thread status."""
+
+ return ('Worker thread "{}", label="{}", benchmark_run={}, '
+ 'reimage={}, now {}'.format(
+ self._dut.name,
+ 'None' if self._dut.label is None else self._dut.label.name,
+ self._stat_num_br_run,
+ self._stat_num_reimage,
+ self._stat_annotation))
+
+
+class Schedv2(object):
+ """New scheduler for crosperf."""
+
+ def __init__(self, experiment):
+ self._experiment = experiment
+ self._l = logger.GetLogger(experiment.log_dir)
+
+ # Create shortcuts to nested data structure. "_duts" points to a list of
+ # locked machines. _labels points to a list of all labels.
+ self._duts = self._experiment.machine_manager._all_machines
+ self._labels = self._experiment.labels
+
+ # Mapping from label to a list of benchmark_runs.
+ self._label_brl_map = dict([(l, []) for l in self._labels])
+ for br in self._experiment.benchmark_runs:
+ assert br.label in self._label_brl_map
+ self._label_brl_map[br.label].append(br)
+
+ # Use machine image manager to calculate initial label allocation.
+ self._mim = MachineImageManager(self._labels, self._duts)
+ self._mim.compute_initial_allocation()
+
+ # Create worker thread, 1 per dut.
+ self._active_workers = [DutWorker(dut, self) for dut in self._duts]
+ self._finished_workers = []
+
+ # Bookkeeping for synchronization.
+ self._workers_lock = Lock()
+ self._lock_map = defaultdict(lambda: Lock())
+
+ # Termination flag.
+ self._terminated = False
+
+ def run_sched(self):
+ """Start all dut worker threads and return immediately."""
+
+ [w.start() for w in self._active_workers]
+
+ def get_benchmark_run(self, dut):
+ """Get a benchmark_run (br) object for a certain dut.
+
+ Arguments:
+ dut: the dut for which a br is returned.
+
+ Returns:
+ A br with its label matching that of the dut. If no such br could be
+ found, return None (this usually means a reimage is required for the
+ dut).
+ """
+
+ # If terminated, stop providing any br.
+ if self._terminated:
+ return None
+
+ # If dut bears an unrecognized label, return None.
+ if dut.label is None:
+ return None
+
+ # If br list for the dut's label is empty (that means all brs for this
+ # label have been done) , return None.
+ with self._lock_on(dut.label):
+ brl = self._label_brl_map[dut.label]
+ if not brl:
+ return None
+ # Return the first br.
+ return brl.pop(0)
+
+ def allocate_label(self, dut):
+ """Allocate a label to a dut.
+
+ The work is delegated to MachineImageManager.
+
+ The dut_worker calling this method is responsible for reimage the dut to
+ this label.
+
+ Arguments:
+ dut: the dut onto which the returned label is to be reimaged.
+
+ Returns:
+ The label or None.
+ """
+
+ if self._terminated:
+ return None
+
+ return self._mim.allocate(dut)
+
+ def dut_worker_finished(self, dut_worker):
+ """Notify schedv2 that the dut_worker thread finished.
+
+ Arguments:
+ dut_worker: the thread that is about to end."""
+
+ self._l.LogOutput("{} finished.".format(dut_worker))
+ with self._workers_lock:
+ self._active_workers.remove(dut_worker)
+ self._finished_workers.append(dut_worker)
+
+ def is_complete(self):
+ return len(self._active_workers) == 0
+
+ def _lock_on(self, object):
+ return self._lock_map[object]
+
+ def terminate(self):
+ """Mark flag so we stop providing br/reimages.
+
+ Also terminate each DutWorker, so they refuse to execute br or reimage.
+ """
+
+ self._terminated = True
+ for dut_worker in self._active_workers:
+ dut_worker.terminate()
+
+ def threads_status_as_string(self):
+ """Report the dut worker threads status."""
+
+ status = "{} active threads, {} finished threads.\n".format(
+ len(self._active_workers), len(self._finished_workers))
+ status += " Active threads:"
+ for dw in self._active_workers:
+ status += '\n ' + dw.status_str()
+ if self._finished_workers:
+ status += "\n Finished threads:"
+ for dw in self._finished_workers:
+ status += '\n ' + dw.status_str()
+ return status
+
class MockExperimentRunner(ExperimentRunner):
"""Mocked ExperimentRunner for testing."""
diff --git a/crosperf/experiment_status.py b/crosperf/experiment_status.py
index f855ca68..ae18dbca 100644
--- a/crosperf/experiment_status.py
+++ b/crosperf/experiment_status.py
@@ -95,11 +95,19 @@ class ExperimentStatus(object):
for key, val in status_bins.items():
status_strings.append("%s: %s" %
(key, self._GetNamesAndIterations(val)))
- result = "Thread Status:\n%s" % "\n".join(status_strings)
+ thread_status = ""
if self.experiment.log_level == "verbose":
- # Add the machine manager status.
- result += "\n" + self.experiment.machine_manager.AsString() + "\n"
+ thread_status_format = "Thread Status: \n{}\n"
+ if self.experiment.schedv2() is None:
+ # Add the machine manager status.
+ thread_status = thread_status_format.format(
+ self.experiment.machine_manager.AsString())
+ else:
+ thread_status = thread_status_format.format(
+ self.experiment.schedv2().threads_status_as_string())
+
+ result = "{}{}".format(thread_status, "\n".join(status_strings))
return result
diff --git a/crosperf/label.py b/crosperf/label.py
index 78fc1314..2f84e77b 100644
--- a/crosperf/label.py
+++ b/crosperf/label.py
@@ -7,13 +7,15 @@
"""The label of benchamrks."""
import os
+
+from image_checksummer import ImageChecksummer
from utils.file_utils import FileUtils
from utils import misc
class Label(object):
def __init__(self, name, chromeos_image, chromeos_root, board, remote,
- image_args, cache_dir, cache_only, chrome_src=None):
+ image_args, cache_dir, cache_only, log_level, chrome_src=None):
self.image_type = self._GetImageType(chromeos_image)
@@ -29,6 +31,7 @@ class Label(object):
self.image_args = image_args
self.cache_dir = cache_dir
self.cache_only = cache_only
+ self.log_level = log_level
if not chromeos_root:
if self.image_type == "local":
@@ -57,6 +60,17 @@ class Label(object):
% (name, chrome_src))
self.chrome_src = chromeos_src
+ self._SetupChecksum()
+
+ def _SetupChecksum(self):
+ """Compute label checksum only once."""
+
+ self.checksum = None
+ if self.image_type == "local":
+ self.checksum = ImageChecksummer().Checksum(self, self.log_level)
+ elif self.image_type == "trybot":
+ self.checksum = hashlib.md5(self.chromeos_image).hexdigest()
+
def _GetImageType(self, chromeos_image):
image_type = None
if chromeos_image.find("xbuddy://") < 0:
@@ -67,6 +81,21 @@ class Label(object):
image_type = "official"
return image_type
+ def __hash__(self):
+ """Label objects are used in a map, so provide "hash" and "equal"."""
+
+ return hash(self.name)
+
+ def __eq__(self, other):
+ """Label objects are used in a map, so provide "hash" and "equal"."""
+
+ return isinstance(other, Label) and other.name == self.name
+
+ def __str__(self):
+ """For better debugging."""
+
+ return 'label[name="{}"]'.format(self.name)
+
class MockLabel(object):
def __init__(self, name, chromeos_image, chromeos_root, board, remote,
image_args, cache_dir, cache_only, chrome_src=None):
diff --git a/crosperf/machine_image_manager.py b/crosperf/machine_image_manager.py
new file mode 100644
index 00000000..7cc081b5
--- /dev/null
+++ b/crosperf/machine_image_manager.py
@@ -0,0 +1,266 @@
+#!/usr/bin/python
+
+# Copyright 2015 Google Inc. All Rights Reserved.
+
+class MachineImageManager(object):
+ """Management of allocating images to duts.
+
+ * Data structure we have -
+
+ duts_ - list of duts, for each duts, we assume the following 2 properties
+ exist - label_ (the current label the duts_ carries or None, if it has an
+ alien image) and name (a string)
+
+ labels_ - a list of labels, for each label, we assume these properties
+ exist - remote (a set/vector/list of dut names (not dut object), to each
+ of which this image is compatible), remote could be none, which means
+ universal compatible.
+
+ label_duts_ - for each label, we maintain a list of duts, onto which the
+ label is imaged. Note this is an array of lists. Each element of each list
+ is an integer which is the dut ordinal. We access this array using label
+ ordinal.
+
+ allocate_log_ - a list of allocation record. For example, if we allocate
+ l1 to d1, then l2 to d2, then allocate_log_ would be [(1, 1), (2, 2)].
+ This is used for debug/log, etc. All tuples in the list are integer pairs
+ (label_ordinal, dut_ordinal).
+
+ n_duts_ - number of duts.
+
+ n_labels_ - number of labels.
+
+ dut_name_ordinal_ - mapping from dut name (a string) to an integer,
+ starting from 0. So that duts_[dut_name_ordinal_[a_dut.name]]= a_dut.
+
+ * Problem abstraction -
+
+ Assume we have the following matrix - label X machine (row X col). A 'X'
+ in (i, j) in the matrix means machine and label are not compatible, or that
+ we cannot image Li to Mj.
+
+ M1 M2 M3
+ L1 X
+
+ L2 X
+
+ L3 X X
+
+ Now that we'll try to find a way to fill Ys in the matrix so that -
+
+ a) - each row gets at least one Y; this ensures that each label gets imaged
+ at least once, an obvious prerequisite.
+
+ b) - each column gets at most N Ys. This makes sure we can successfully
+ finish all tests by re-imaging each machine at most N times. That being
+ said, we could *OPTIONALLY* reimage some machines more than N times to
+ *accelerate* the test speed.
+
+ How to choose initial N for b) -
+ If number of duts (nd) is equal to or more than that of labels (nl), we
+ start from N == 1. Else we start from N = nl - nd + 1.
+
+ We begin the search with the pre-defined N. If we fail to find a
+ solution for that N, we increase N by 1 and continue the search. If no
+ solution is found even with N == nl, the search fails.
+
+ Such a solution ensures minimal number of reimages.
+
+ * Solution representation
+
+ The solution will be placed inside the matrix, like below
+
+ M1 M2 M3 M4
+ L1 X X Y
+
+ L2 Y X
+
+ L3 X Y X
+
+ * Allocation algorithm
+
+ When Mj asks for an image, we check column j, pick the first cell that
+ contains a 'Y', and mark the cell '_'. If no such 'Y' exists (like M4 in
+ the above solution matrix), we just pick an image that has the minimal
+ reimage number.
+
+ After allocate for M3
+ M1 M2 M3 M4
+ L1 X X _
+
+ L2 Y X
+
+ L3 X Y X
+
+ After allocate for M4
+ M1 M2 M3 M4
+ L1 X X _
+
+ L2 Y X _
+
+ L3 X Y X
+
+ After allocate for M2
+ M1 M2 M3 M4
+ L1 X X _
+
+ L2 Y X _
+
+ L3 X _ X
+
+ After allocate for M1
+ M1 M2 M3 M4
+ L1 X X _
+
+ L2 _ X _
+
+ L3 X _ X
+
+ After allocate for M2
+ M1 M2 M3 M4
+ L1 X X _
+
+ L2 _ _ X _
+
+ L3 X _ X
+
+ If we try to allocate for M1 or M2 or M3 again, we get None.
+
+ * Special / common cases to handle separately
+
+ If we have only 1 dut or only 1 label, the allocation is simple enough.
+
+ """
+
+ def __init__(self, labels, duts):
+ self.labels_ = labels
+ self.duts_ = duts
+ self.n_labels_ = len(labels)
+ self.n_duts_ = len(duts)
+ self.dut_name_ordinal_ = dict()
+ for idx, dut in enumerate(self.duts_):
+ self.dut_name_ordinal_[dut.name] = idx
+
+ # Generate initial matrix containing 'X' or ' '.
+ self.matrix_ = [['X' if (l.remote and len(l.remote)) else ' ' \
+ for d in range(self.n_duts_)] for l in self.labels_]
+ for ol, l in enumerate(self.labels_):
+ if l.remote:
+ for r in l.remote:
+ self.matrix_[ol][self.dut_name_ordinal_[r]] = ' '
+
+ self.label_duts_ = [[] for _ in range(self.n_labels_)]
+ self.allocate_log_ = []
+
+ def compute_initial_allocation(self):
+ """Compute the initial label-dut allocation.
+
+ This method finds the most efficient way that every label gets imaged at
+ least once.
+
+ Returns:
+ False, only if not all labels could be imaged to a certain machine,
+ otherwise True.
+ """
+
+ if self.n_duts_ == 1:
+ for i, v in self.matrix_vertical_generator(0):
+ if v != 'X':
+ self.matrix_[i][0] = 'Y'
+ return
+
+ if self.n_labels_ == 1:
+ for j, v in self.matrix_horizontal_generator(0):
+ if v != 'X':
+ self.matrix_[0][j] = 'Y'
+ return
+
+ if self.n_duts_ >= self.n_labels_:
+ n = 1
+ else:
+ n = self.n_labels_ - self.n_duts_ + 1
+ while n <= self.n_labels_:
+ if self._compute_initial_allocation_internal(0, n):
+ break
+ n += 1
+
+ return n <= self.n_labels_
+
+ def _record_allocate_log(self, label_i, dut_j):
+ self.allocate_log_.append((label_i, dut_j))
+ self.label_duts_[label_i].append(dut_j)
+
+ def allocate(self, dut):
+ """Allocate a label for dut.
+
+ Arguments:
+ dut: the dut that asks for a new image.
+
+ Returns:
+ a label to image onto the dut or None if no more available images for
+ the dut.
+ """
+ j = self.dut_name_ordinal_[dut.name]
+ min_reimage_number = 999
+ min_i = 999
+ min_label = None
+ for i, v in self.matrix_vertical_generator(j):
+ label = self.labels_[i]
+ if v == 'Y':
+ self.matrix_[i][j] = '_'
+ self._record_allocate_log(i, j)
+ return label
+ if v == ' ':
+ label_reimage_number = len(self.label_duts_[i])
+ if label_reimage_number < min_reimage_number:
+ min_reimage_number = label_reimage_number
+ min_i = i
+ min_label = self.labels_[min_i]
+
+ # All labels are marked either '_' (already taken) or 'X' (not
+ # compatible), so return None to notify machine thread to quit.
+ if min_label is None:
+ return None
+
+ # At this point, we don't find any 'Y' for the machine, so we go the
+ # 'min' approach.
+ self.matrix_[min_i][j] = '_'
+ self._record_allocate_log(min_i, j)
+ return min_label
+
+ def matrix_vertical_generator(self, col):
+ """Iterate matrix vertically at column 'col'.
+
+ Yield row number i and value at matrix_[i][col].
+ """
+ for i, l in enumerate(self.labels_):
+ yield i, self.matrix_[i][col]
+
+ def matrix_horizontal_generator(self, row):
+ """Iterate matrix horizontally at row 'row'.
+
+ Yield col number j and value at matrix_[row][j].
+ """
+ for j, d in enumerate(self.duts_):
+ yield j, self.matrix_[row][j]
+
+
+ def _compute_initial_allocation_internal(self, level, N):
+ """ Search matrix for d with N. """
+
+ if level == self.n_labels_:
+ return True
+
+ for j, v in self.matrix_horizontal_generator(level):
+ if v == ' ':
+ # Before we put a 'Y', we check how many Y column 'j' has.
+ # Note y[0] is row idx, y[1] is the cell value.
+ ny = reduce(lambda x, y: x + 1 if (y[1] == 'Y') else x,
+ self.matrix_vertical_generator(j), 0)
+ if ny < N:
+ self.matrix_[level][j] = 'Y'
+ if self._compute_initial_allocation_internal(level + 1, N):
+ return True
+ self.matrix_[level][j] = ' '
+
+ return False
diff --git a/crosperf/machine_image_manager_unittest.py b/crosperf/machine_image_manager_unittest.py
new file mode 100644
index 00000000..1e8df33d
--- /dev/null
+++ b/crosperf/machine_image_manager_unittest.py
@@ -0,0 +1,329 @@
+#!/usr/bin/python
+
+# Copyright 2015 Google Inc. All Rights Reserved.
+
+import random
+import unittest
+
+from machine_image_manager import MachineImageManager
+
+ class MockLabel(object):
+   """Mock of a label: a name plus the remotes passed to the constructor.
+
+   The 'remotes' argument is stored unchanged in the 'remote' attribute.
+   Equality and hashing are based on the name only.
+   """
+
+   def __init__(self, name, remotes=None):
+     self.name = name
+     self.remote = remotes
+
+   def __hash__(self):
+     """Provide hash function for label.
+     This is required because Label object is used inside a dict as key.
+     """
+     return hash(self.name)
+
+   def __eq__(self, other):
+     """Provide eq function for label.
+     This is required because Label object is used inside a dict as key.
+     """
+     return isinstance(other, MockLabel) and other.name == self.name
+
+   def __ne__(self, other):
+     """Keep != consistent with ==.
+     Python 2 does not derive __ne__ from __eq__, so without this two equal
+     labels would still compare as 'not equal'.
+     """
+     return not self == other
+
+ class MockDut(object):
+   """Mock of a dut: a name plus an optional label kept in 'label_'."""
+
+   def __init__(self, name, label=None):
+     self.label_ = label
+     self.name = name
+
+
+class MachineImageManagerTester(unittest.TestCase):
+
+ def gen_duts_by_name(self, *names):
+   """Return a list holding one MockDut per given name, in the same order."""
+   return [MockDut(dut_name) for dut_name in names]
+
+ def print_matrix(self, matrix):
+ """Print 'matrix' one row per line, showing ' ' cells as '.'."""
+ for r in matrix:
+ for v in r:
+ # Python 2 print statement; the trailing comma keeps the row on one line.
+ print '{} '.format('.' if v == ' ' else v),
+ # End the current row.
+ print('')
+
+ def create_labels_and_duts_from_pattern(self, pattern):
+   """Build mock labels and duts from a textual matrix pattern.
+
+   Each entry of 'pattern' is a whitespace-separated row of cell symbols.
+   Row i becomes label 'l<i>'; every column j holding '.' adds 'm<j>' to
+   that label's remote list. The first row determines the duts: one
+   MockDut 'm<j>' per column.
+
+   Returns:
+     A (labels, duts) tuple.
+   """
+   labels = []
+   duts = []
+   for row_idx, row in enumerate(pattern):
+     cells = row.split()
+     if row_idx == 0:
+       duts.extend(MockDut('m{}'.format(col)) for col in range(len(cells)))
+     remotes = ['m{}'.format(col)
+                for col, cell in enumerate(cells) if cell == '.']
+     labels.append(MockLabel('l{}'.format(row_idx), remotes))
+   return labels, duts
+
+ def check_matrix_against_pattern(self, matrix, pattern):
+   """Assert that 'matrix' agrees with the textual 'pattern'.
+
+   Each pattern row is a whitespace-separated string of cell symbols. A '.'
+   matches a ' ' cell of the matrix; any symbol also matches a cell holding
+   exactly that symbol. The failure message names the offending cell.
+   """
+   for i, s in enumerate(pattern):
+     for j, v in enumerate(s.split()):
+       cell = matrix[i][j]
+       self.assertTrue(
+           (v == '.' and cell == ' ') or v == cell,
+           'matrix[{}][{}] is {!r} but pattern has {!r}'.format(
+               i, j, cell, v))
+
+ def pattern_based_test(self, input, output):
+   """Run the initial allocation on pattern 'input' and compare to 'output'.
+
+   Asserts that compute_initial_allocation() succeeds and that the resulting
+   matrix matches the 'output' pattern, then returns the manager so callers
+   can keep exercising it.
+   """
+   pattern_labels, pattern_duts = (
+       self.create_labels_and_duts_from_pattern(input))
+   manager = MachineImageManager(pattern_labels, pattern_duts)
+   allocated = manager.compute_initial_allocation()
+   self.assertTrue(allocated)
+   self.check_matrix_against_pattern(manager.matrix_, output)
+   return manager
+
+ def test_single_dut(self):
+   """Three labels sharing a single dut all get a 'Y' on that dut."""
+   labels = [MockLabel('l1'),
+             MockLabel('l2'),
+             MockLabel('l3')]
+   dut = MockDut('m1')
+   mim = MachineImageManager(labels, [dut])
+   mim.compute_initial_allocation()
+   # assertEqual shows both matrices when they differ.
+   self.assertEqual(mim.matrix_, [['Y'], ['Y'], ['Y']])
+
+ def test_single_label(self):
+   """A single label gets a 'Y' on each of the three duts."""
+   labels = [MockLabel('l1')]
+   duts = self.gen_duts_by_name('m1', 'm2', 'm3')
+   mim = MachineImageManager(labels, duts)
+   mim.compute_initial_allocation()
+   # assertEqual shows both matrices when they differ.
+   self.assertEqual(mim.matrix_, [['Y', 'Y', 'Y']])
+
+ def test_case1(self):
+   """3x3 matrix with per-label remotes: check initial and allocated state."""
+   labels = [MockLabel('l1', ['m1', 'm2']),
+             MockLabel('l2', ['m2', 'm3']),
+             MockLabel('l3', ['m1'])]
+   duts = [MockDut('m1'), MockDut('m2'), MockDut('m3')]
+   mim = MachineImageManager(labels, duts)
+   # Cells for machines not listed in a label's remotes start out as 'X'.
+   self.assertEqual(mim.matrix_, [[' ', ' ', 'X'],
+                                  ['X', ' ', ' '],
+                                  [' ', 'X', 'X']])
+   mim.compute_initial_allocation()
+   self.assertEqual(mim.matrix_, [[' ', 'Y', 'X'],
+                                  ['X', ' ', 'Y'],
+                                  ['Y', 'X', 'X']])
+
+ def test_case2(self):
+ # NOTE(review): this body is identical to test_case1; presumably a
+ # different scenario was intended here -- confirm, or drop the duplicate.
+ labels = [MockLabel('l1', ['m1', 'm2']),
+ MockLabel('l2', ['m2', 'm3']),
+ MockLabel('l3', ['m1'])]
+ duts = [MockDut('m1'), MockDut('m2'), MockDut('m3')]
+ mim = MachineImageManager(labels, duts)
+ self.assertTrue(mim.matrix_ == [[' ', ' ', 'X'],
+ ['X', ' ', ' '],
+ [' ', 'X', 'X']])
+ mim.compute_initial_allocation()
+ self.assertTrue(mim.matrix_ == [[' ', 'Y', 'X'],
+ ['X', ' ', 'Y'],
+ ['Y', 'X', 'X']])
+
+ def test_case3(self):
+   """Same 3x3 setup, but dut m1 is created with label l1 attached."""
+   labels = [MockLabel('l1', ['m1', 'm2']),
+             MockLabel('l2', ['m2', 'm3']),
+             MockLabel('l3', ['m1'])]
+   duts = [MockDut('m1', labels[0]), MockDut('m2'), MockDut('m3')]
+   mim = MachineImageManager(labels, duts)
+   mim.compute_initial_allocation()
+   # assertEqual shows both matrices when they differ.
+   self.assertEqual(mim.matrix_, [[' ', 'Y', 'X'],
+                                  ['X', ' ', 'Y'],
+                                  ['Y', 'X', 'X']])
+
+ def test_case4(self):
+   """Same 3x3 setup, but dut m2 is created with label l1 attached."""
+   labels = [MockLabel('l1', ['m1', 'm2']),
+             MockLabel('l2', ['m2', 'm3']),
+             MockLabel('l3', ['m1'])]
+   duts = [MockDut('m1'), MockDut('m2', labels[0]), MockDut('m3')]
+   mim = MachineImageManager(labels, duts)
+   mim.compute_initial_allocation()
+   # assertEqual shows both matrices when they differ.
+   self.assertEqual(mim.matrix_, [[' ', 'Y', 'X'],
+                                  ['X', ' ', 'Y'],
+                                  ['Y', 'X', 'X']])
+
+ def test_case5(self):
+   """Two labels restricted to the same machine both get a 'Y' on it."""
+   labels = [MockLabel('l1', ['m3']),
+             MockLabel('l2', ['m3']),
+             MockLabel('l3', ['m1'])]
+   duts = self.gen_duts_by_name('m1', 'm2', 'm3')
+   mim = MachineImageManager(labels, duts)
+   self.assertTrue(mim.compute_initial_allocation())
+   # assertEqual shows both matrices when they differ.
+   self.assertEqual(mim.matrix_, [['X', 'X', 'Y'],
+                                  ['X', 'X', 'Y'],
+                                  ['Y', 'X', 'X']])
+
+ def test_2x2_with_allocation(self):
+   """Allocate repeatedly on an unrestricted 2x2 matrix."""
+   labels = [MockLabel('l0'), MockLabel('l1')]
+   duts = [MockDut('m0'), MockDut('m1')]
+   mim = MachineImageManager(labels, duts)
+   self.assertTrue(mim.compute_initial_allocation())
+   # m0 takes both labels in turn, after which nothing is left for it.
+   self.assertEqual(mim.allocate(duts[0]), labels[0])
+   self.assertEqual(mim.allocate(duts[0]), labels[1])
+   self.assertIsNone(mim.allocate(duts[0]))
+   self.assertEqual(mim.matrix_[0][0], '_')
+   self.assertEqual(mim.matrix_[1][0], '_')
+   self.assertEqual(mim.allocate(duts[1]), labels[1])
+
+ def test_10x10_general(self):
+   """Gen 10x10 matrix.
+
+   With no remotes specified, the initial allocation is expected to put the
+   'Y's on the diagonal and leave every other cell free.
+   """
+   n = 10
+   labels = []
+   duts = []
+   for i in range(n):
+     labels.append(MockLabel('l{}'.format(i)))
+     duts.append(MockDut('m{}'.format(i)))
+   mim = MachineImageManager(labels, duts)
+   self.assertTrue(mim.compute_initial_allocation())
+   for i in range(n):
+     for j in range(n):
+       # assertEqual reports the actual cell value on failure.
+       self.assertEqual(mim.matrix_[i][j], 'Y' if i == j else ' ')
+   self.assertEqual(mim.allocate(duts[3]).name, 'l3')
+
+ def test_random_generated(self):
+   """Initial allocation is expected to succeed on a random 10x10 setup."""
+   n = 10
+   # Draw the seed explicitly so that a failing run can be reproduced from
+   # the assertion message.
+   seed = random.randrange(1 << 30)
+   rng = random.Random(seed)
+   labels = []
+   duts = []
+   for i in range(n):
+     # Pick 4 machines compatible with this label, using the first four
+     # decimal digits of a random float (so duplicates are possible).
+     l = MockLabel('l{}'.format(i), [])
+     r = rng.random()
+     for _ in range(4):
+       t = int(r * 10) % n
+       r *= 10
+       l.remote.append('m{}'.format(t))
+     labels.append(l)
+     duts.append(MockDut('m{}'.format(i)))
+   mim = MachineImageManager(labels, duts)
+   self.assertTrue(mim.compute_initial_allocation(),
+                   'no initial allocation found (seed={})'.format(seed))
+
+ def test_10x10_fully_random(self):
+   """Check the initial allocation for a fixed 10x10 pattern."""
+   initial = [
+       'X . . . X X . X X .',
+       'X X . X . X . X X .',
+       'X X X . . X . X . X',
+       'X . X X . . X X . X',
+       'X X X X . . . X . .',
+       'X X . X . X . . X .',
+       '. X . X . X X X . .',
+       '. X . X X . X X . .',
+       'X X . . . X X X . .',
+       '. X X X X . . . . X',
+   ]
+   expected = [
+       'X Y . . X X . X X .',
+       'X X Y X . X . X X .',
+       'X X X Y . X . X . X',
+       'X . X X Y . X X . X',
+       'X X X X . Y . X . .',
+       'X X . X . X Y . X .',
+       'Y X . X . X X X . .',
+       '. X . X X . X X Y .',
+       'X X . . . X X X . Y',
+       '. X X X X . . Y . X',
+   ]
+   self.pattern_based_test(initial, expected)
+
+ def test_10x10_fully_random2(self):
+   """Check the initial allocation for a second fixed 10x10 pattern."""
+   initial = [
+       'X . X . . X . X X X',
+       'X X X X X X . . X .',
+       'X . X X X X X . . X',
+       'X X X . X . X X . .',
+       '. X . X . X X X X X',
+       'X X X X X X X . . X',
+       'X . X X X X X . . X',
+       'X X X . X X X X . .',
+       'X X X . . . X X X X',
+       '. X X . X X X . X X',
+   ]
+   expected = [
+       'X . X Y . X . X X X',
+       'X X X X X X Y . X .',
+       'X Y X X X X X . . X',
+       'X X X . X Y X X . .',
+       '. X Y X . X X X X X',
+       'X X X X X X X Y . X',
+       'X . X X X X X . Y X',
+       'X X X . X X X X . Y',
+       'X X X . Y . X X X X',
+       'Y X X . X X X . X X',
+   ]
+   self.pattern_based_test(initial, expected)
+
+ def test_3x4_with_allocation(self):
+   """Initial allocation on a 3x4 pattern, then a full allocate() sequence.
+
+   Also checks the per-label dut lists and the allocation log recorded
+   along the way.
+   """
+   initial = ['X X . .',
+              '. . X .',
+              'X . X .']
+   expected = ['X X Y .',
+               'Y . X .',
+               'X Y X .']
+   mim = self.pattern_based_test(initial, expected)
+   self.assertEqual(mim.allocate(mim.duts_[2]), mim.labels_[0])
+   self.assertEqual(mim.allocate(mim.duts_[3]), mim.labels_[1])
+   self.assertEqual(mim.allocate(mim.duts_[0]), mim.labels_[1])
+   self.assertEqual(mim.allocate(mim.duts_[1]), mim.labels_[2])
+   self.assertEqual(mim.allocate(mim.duts_[3]), mim.labels_[0])
+   self.assertEqual(mim.allocate(mim.duts_[3]), mim.labels_[2])
+   self.assertIsNone(mim.allocate(mim.duts_[3]))
+   self.assertIsNone(mim.allocate(mim.duts_[2]))
+   self.assertEqual(mim.allocate(mim.duts_[1]), mim.labels_[1])
+   self.assertIsNone(mim.allocate(mim.duts_[1]))
+   self.assertIsNone(mim.allocate(mim.duts_[0]))
+   self.assertEqual(mim.label_duts_[0], [2, 3])
+   self.assertEqual(mim.label_duts_[1], [3, 0, 1])
+   self.assertEqual(mim.label_duts_[2], [1, 3])
+   # Each log entry is a (label index, dut index) pair, in allocation order.
+   self.assertEqual(mim.allocate_log_,
+                    [(0, 2),
+                     (1, 3),
+                     (1, 0),
+                     (2, 1),
+                     (0, 3),
+                     (2, 3),
+                     (1, 1)])
+
+ def test_cornercase_1(self):
+   """This corner case is brought up by Caroline.
+
+   The description is -
+
+   If you have multiple labels and multiple machines, (so we don't
+   automatically fall into the 1 dut or 1 label case), but all of the
+   labels specify the same 1 remote, then instead of assigning the same
+   machine to all the labels, your algorithm fails to assign any...
+
+   So first step is to create an initial matrix like below, l0, l1 and l2
+   all specify the same 1 remote - m0.
+
+        m0  m1  m2
+    l0   .   X   X
+
+    l1   .   X   X
+
+    l2   .   X   X
+
+   The search process will be like this -
+   a) try to find a solution with at most 1 'Y's per column (but ensure at
+      least 1 Y per row), fail
+   b) try to find a solution with at most 2 'Y's per column (but ensure at
+      least 1 Y per row), fail
+   c) try to find a solution with at most 3 'Y's per column (but ensure at
+      least 1 Y per row), succeed, so we end up having this solution
+
+        m0  m1  m2
+    l0   Y   X   X
+
+    l1   Y   X   X
+
+    l2   Y   X   X
+
+   """
+
+   initial = ['. X X',
+              '. X X',
+              '. X X', ]
+   expected = ['Y X X',
+               'Y X X',
+               'Y X X', ]
+   mim = self.pattern_based_test(initial, expected)
+   # m1 and m2 are compatible with no label, so they get nothing.
+   self.assertIsNone(mim.allocate(mim.duts_[1]))
+   self.assertIsNone(mim.allocate(mim.duts_[2]))
+   self.assertEqual(mim.allocate(mim.duts_[0]), mim.labels_[0])
+   self.assertEqual(mim.allocate(mim.duts_[0]), mim.labels_[1])
+   self.assertEqual(mim.allocate(mim.duts_[0]), mim.labels_[2])
+   self.assertIsNone(mim.allocate(mim.duts_[0]))
+
+
+ if __name__ == '__main__':
+ # Run every test in this module when the file is executed directly.
+ unittest.main()
diff --git a/crosperf/machine_manager.py b/crosperf/machine_manager.py
index 79423481..91fa1302 100644
--- a/crosperf/machine_manager.py
+++ b/crosperf/machine_manager.py
@@ -20,8 +20,6 @@ from utils import logger
from utils import misc
from utils.file_utils import FileUtils
-from image_checksummer import ImageChecksummer
-
CHECKSUM_FILE = "/usr/local/osimage_checksum_file"
class NonMatchingMachines(Exception):
@@ -34,6 +32,9 @@ class CrosMachine(object):
def __init__(self, name, chromeos_root, log_level, cmd_exec=None):
self.name = name
self.image = None
+ # We relate a dut with a label if we reimage the dut using label or we
+ # detect at the very beginning that the dut is running this label.
+ self.label = None
self.checksum = None
self.locked = False
self.released_time = time.time()
@@ -216,12 +217,7 @@ class MachineManager(object):
self._machines.remove(m)
def ImageMachine(self, machine, label):
- if label.image_type == "local":
- checksum = ImageChecksummer().Checksum(label, self.log_level)
- elif label.image_type == "trybot":
- checksum = machine._GetMD5Checksum(label.chromeos_image)
- else:
- checksum = None
+ checksum = label.checksum
if checksum and (machine.checksum == checksum):
return
@@ -247,9 +243,9 @@ class MachineManager(object):
with self.image_lock:
if self.log_level != "verbose":
self.logger.LogOutput("Pushing image onto machine.")
- self.logger.LogOutput("CMD : python %s "
+ self.logger.LogOutput("Running image_chromeos.DoImage with %s"
% " ".join(image_chromeos_args))
- retval = self.ce.RunCommand(" ".join(["python"] + image_chromeos_args))
+ retval = image_chromeos.DoImage(image_chromeos_args)
if retval:
cmd = "reboot && exit"
if self.log_level != "verbose":
@@ -259,15 +255,16 @@ class MachineManager(object):
time.sleep(60)
if self.log_level != "verbose":
self.logger.LogOutput("Pushing image onto machine.")
- self.logger.LogOutput("CMD : python %s "
+ self.logger.LogOutput("Running image_chromeos.DoImage with %s"
% " ".join(image_chromeos_args))
- retval = self.ce.RunCommand(" ".join(["python"] + image_chromeos_args))
+ retval = image_chromeos.DoImage(image_chromeos_args)
if retval:
raise Exception("Could not image machine: '%s'." % machine.name)
else:
self.num_reimages += 1
machine.checksum = checksum
machine.image = label.chromeos_image
+ machine.label = label
self.ce.log_level = save_ce_log_level
return retval
@@ -344,12 +341,7 @@ class MachineManager(object):
m.SetUpChecksumInfo()
def AcquireMachine(self, chromeos_image, label, throw=False):
- if label.image_type == "local":
- image_checksum = ImageChecksummer().Checksum(label, self.log_level)
- elif label.image_type == "trybot":
- image_checksum = hashlib.md5(chromeos_image).hexdigest()
- else:
- image_checksum = None
+ image_checksum = label.checksum
machines = self.GetMachines(label)
check_interval_time = 120
with self._lock:
diff --git a/image_chromeos.py b/image_chromeos.py
index dd9a336f..1d977e03 100755
--- a/image_chromeos.py
+++ b/image_chromeos.py
@@ -50,7 +50,7 @@ def CheckForCrosFlash(chromeos_root, remote, log_level):
def DoImage(argv):
- """Build ChromeOS."""
+ """Image ChromeOS."""
parser = optparse.OptionParser()
parser.add_option("-c", "--chromeos_root", dest="chromeos_root",