From ba64928c5dcbacbc70b4358881a89ad96227164d Mon Sep 17 00:00:00 2001 From: Han Shen Date: Wed, 5 Aug 2015 17:19:55 -0700 Subject: Crosperf schedv2 (1) - new option and integrating new scheduler. This CL introduces a new option '--schedv2' which uses the new scheduler to allocate jobs (benchmark_runs) to duts. With this option, schedv2 takes control of storing/allocating jobs and reimaging machines using the new algorithm. This CL leaves actual reimaging and running of jobs as no-ops (a random time sleep is used for each such op, which will be replaced in later CLs.) You may try this CL like this and see how schedv2 works - crosperf --locks_dir=/usr/local/google/home/shenhan/tmp --use_file_locks=True --logging_level=verbose --schedv2 some.exp Change-Id: If5bb7751b466c39e54c93fe8f0b4e363be6d9165 Reviewed-on: https://chrome-internal-review.googlesource.com/225515 Commit-Queue: Han Shen Tested-by: Han Shen Reviewed-by: Han Shen --- crosperf/benchmark_run.py | 26 ++- crosperf/crosperf.py | 7 +- crosperf/experiment.py | 51 ++++- crosperf/experiment_factory.py | 2 +- crosperf/experiment_runner.py | 308 ++++++++++++++++++++++++++- crosperf/experiment_status.py | 14 +- crosperf/label.py | 31 ++- crosperf/machine_image_manager.py | 266 +++++++++++++++++++++++ crosperf/machine_image_manager_unittest.py | 329 +++++++++++++++++++++++++++++ crosperf/machine_manager.py | 28 +-- image_chromeos.py | 2 +- 11 files changed, 1024 insertions(+), 40 deletions(-) create mode 100644 crosperf/machine_image_manager.py create mode 100644 crosperf/machine_image_manager_unittest.py diff --git a/crosperf/benchmark_run.py b/crosperf/benchmark_run.py index cee41dac..dabbdeb2 100644 --- a/crosperf/benchmark_run.py +++ b/crosperf/benchmark_run.py @@ -64,6 +64,9 @@ class BenchmarkRun(threading.Thread): self.timeline.Record(STATUS_PENDING) self.share_cache = share_cache + # This is used by schedv2. 
+ self.owner_thread = None + def ReadCache(self): # Just use the first machine for running the cached version, # without locking it. @@ -137,7 +140,10 @@ class BenchmarkRun(threading.Thread): self.timeline.Record(STATUS_FAILED) self.failure_reason = str(e) finally: - if self.machine: + if self.owner_thread is not None: + # In schedv2 mode, we do not lock machine locally. So noop here. + pass + elif self.machine: if not self.machine.IsReachable(): self._logger.LogOutput("Machine %s is not reachable, removing it." % self.machine.name) @@ -154,6 +160,10 @@ class BenchmarkRun(threading.Thread): self.failure_reason = "Thread terminated." def AcquireMachine(self): + if self.owner_thread is not None: + # No need to lock machine locally, DutWorker, which is a thread, is + # responsible for running br. + return self.owner_thread.dut() while True: machine = None if self.terminated: @@ -204,8 +214,13 @@ class BenchmarkRun(threading.Thread): def RunTest(self, machine): self.timeline.Record(STATUS_IMAGING) - self.machine_manager.ImageMachine(machine, - self.label) + if self.owner_thread is not None: + # In schedv2 mode, do not even call ImageMachine. Machine image is + # guarenteed. 
+ pass + else: + self.machine_manager.ImageMachine(machine, + self.label) self.timeline.Record(STATUS_RUNNING) [retval, out, err] = self.suite_runner.Run(machine.name, self.label, @@ -226,6 +241,11 @@ class BenchmarkRun(threading.Thread): def SetCacheConditions(self, cache_conditions): self.cache_conditions = cache_conditions + def __str__(self): + """For better debugging.""" + + return 'BenchmarkRun[name="{}"]'.format(self.name) + class MockBenchmarkRun(BenchmarkRun): """Inherited from BenchmarkRuna.""" diff --git a/crosperf/crosperf.py b/crosperf/crosperf.py index 88e510fd..b0f0b0c9 100755 --- a/crosperf/crosperf.py +++ b/crosperf/crosperf.py @@ -64,6 +64,10 @@ def Main(argv): formatter=MyIndentedHelpFormatter(), version="%prog 3.0") + parser.add_option("--schedv2", + dest="schedv2", + action="store_true", + help="Use crosperf scheduler v2 (feature in progress).") parser.add_option("-l", "--log_dir", dest="log_dir", default="", @@ -101,7 +105,8 @@ def Main(argv): if options.dry_run: runner = MockExperimentRunner(experiment) else: - runner = ExperimentRunner(experiment) + runner = ExperimentRunner(experiment, using_schedv2=options.schedv2) + runner.Run() if __name__ == "__main__": diff --git a/crosperf/experiment.py b/crosperf/experiment.py index 9f88ed01..59e932f6 100644 --- a/crosperf/experiment.py +++ b/crosperf/experiment.py @@ -10,6 +10,7 @@ import os import time import afe_lock_machine +from threading import Lock from utils import logger from utils import misc @@ -79,6 +80,15 @@ class Experiment(object): self.start_time = None self.benchmark_runs = self._GenerateBenchmarkRuns() + self._schedv2 = None + self._internal_counter_lock = Lock() + + def set_schedv2(self, schedv2): + self._schedv2 = schedv2 + + def schedv2(self): + return self._schedv2 + def _GenerateBenchmarkRuns(self): """Generate benchmark runs from labels and benchmark defintions.""" benchmark_runs = [] @@ -109,12 +119,17 @@ class Experiment(object): pass def Terminate(self): - for t in 
self.benchmark_runs: - if t.isAlive(): - self.l.LogError("Terminating run: '%s'." % t.name) - t.Terminate() + if self._schedv2 is not None: + self._schedv2.terminate() + else: + for t in self.benchmark_runs: + if t.isAlive(): + self.l.LogError("Terminating run: '%s'." % t.name) + t.Terminate() def IsComplete(self): + if self._schedv2: + return self._schedv2.is_complete() if self.active_threads: for t in self.active_threads: if t.isAlive(): @@ -127,14 +142,30 @@ class Experiment(object): return False return True + def BenchmarkRunFinished(self, br): + """Update internal counters after br finishes. + + Note this is only used by schedv2 and is called by multiple threads. + Never throw any exception here. + """ + + assert self._schedv2 is not None + with self._internal_counter_lock: + self.num_complete += 1 + if not br.cache_hit: + self.num_run_complete += 1 + def Run(self): self.start_time = time.time() - self.active_threads = [] - for benchmark_run in self.benchmark_runs: - # Set threads to daemon so program exits when ctrl-c is pressed. - benchmark_run.daemon = True - benchmark_run.start() - self.active_threads.append(benchmark_run) + if self._schedv2 is not None: + self._schedv2.run_sched() + else: + self.active_threads = [] + for benchmark_run in self.benchmark_runs: + # Set threads to daemon so program exits when ctrl-c is pressed. 
+ benchmark_run.daemon = True + benchmark_run.start() + self.active_threads.append(benchmark_run) def SetCacheConditions(self, cache_conditions): for benchmark_run in self.benchmark_runs: diff --git a/crosperf/experiment_factory.py b/crosperf/experiment_factory.py index 65d7ce8d..0f90383c 100644 --- a/crosperf/experiment_factory.py +++ b/crosperf/experiment_factory.py @@ -248,7 +248,7 @@ class ExperimentFactory(object): image_args, cache_dir, cache_only, chrome_src) else: label = Label(label_name, image, chromeos_root, board, my_remote, - image_args, cache_dir, cache_only, chrome_src) + image_args, cache_dir, cache_only, log_level, chrome_src) labels.append(label) email = global_settings.GetField("email") diff --git a/crosperf/experiment_runner.py b/crosperf/experiment_runner.py index 541bebe0..90c07e77 100644 --- a/crosperf/experiment_runner.py +++ b/crosperf/experiment_runner.py @@ -5,14 +5,21 @@ """The experiment runner module.""" import getpass import os +import random +import sys import time +import traceback import afe_lock_machine +from machine_image_manager import MachineImageManager +from collections import defaultdict from utils import command_executer from utils import logger from utils.email_sender import EmailSender from utils.file_utils import FileUtils +from threading import Lock +from threading import Thread import config from experiment_status import ExperimentStatus @@ -26,7 +33,7 @@ class ExperimentRunner(object): STATUS_TIME_DELAY = 30 THREAD_MONITOR_DELAY = 2 - def __init__(self, experiment, log=None, cmd_exec=None): + def __init__(self, experiment, using_schedv2=False, log=None, cmd_exec=None): self._experiment = experiment self.l = log or logger.GetLogger(experiment.log_dir) self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l) @@ -35,6 +42,9 @@ class ExperimentRunner(object): if experiment.log_level != "verbose": self.STATUS_TIME_DELAY = 10 + # Setting this to True will use crosperf sched v2 (feature in progress). 
+ self._using_schedv2 = using_schedv2 + def _GetMachineList(self): """Return a list of all requested machines. @@ -88,7 +98,8 @@ class ExperimentRunner(object): self.locked_machines = lock_mgr.UpdateMachines(True) self._experiment.locked_machines = self.locked_machines self._UpdateMachineList(self.locked_machines) - self._experiment.machine_manager.RemoveNonLockedMachines(self.locked_machines) + self._experiment.machine_manager.RemoveNonLockedMachines( + self.locked_machines) if len(self.locked_machines) == 0: raise RuntimeError("Unable to lock any machines.") @@ -116,6 +127,9 @@ class ExperimentRunner(object): try: if not experiment.locks_dir: self._LockAllMachines(experiment) + if self._using_schedv2: + schedv2 = Schedv2(experiment) + experiment.set_schedv2(schedv2) status = ExperimentStatus(experiment) experiment.Run() last_status_time = 0 @@ -223,6 +237,296 @@ class ExperimentRunner(object): self._StoreResults(self._experiment) self._Email(self._experiment) +class DutWorker(Thread): + + def __init__(self, dut, sched): + super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name)) + self._dut = dut + self._sched = sched + self._stat_num_br_run = 0 + self._stat_num_reimage = 0 + self._stat_annotation = "" + self._l = logger.GetLogger(self._sched._experiment.log_dir) + self.daemon = True + self._terminated = False + self._active_br = None + # Race condition accessing _active_br between _execute_benchmark_run and + # _terminate, so lock it up. + self._active_br_lock = Lock() + + def terminate(self): + self._terminated = True + with self._active_br_lock: + if self._active_br is not None: + # BenchmarkRun.Terminate() terminates any running testcase via + # suite_runner.Terminate and updates timeline. + self._active_br.Terminate() + + def run(self): + """Do the "run-test->(optionally reimage)->run-test" chore. + + Note - 'br' below means 'benchmark_run'. 
+ """ + + self._setup_dut_label() + try: + self._l.LogOutput("{} started.".format(self)) + while not self._terminated: + br = self._sched.get_benchmark_run(self._dut) + if br is None: + # No br left for this label. Considering reimaging. + label = self._sched.allocate_label(self._dut) + if label is None: + # No br even for other labels. We are done. + self._l.LogOutput("ImageManager found no label " + "for dut, stopping working " + "thread {}.".format(self)) + break + if self._reimage(label): + # Reimage to run other br fails, dut is doomed, stop + # this thread. + self._l.LogWarning("Re-image failed, dut " + "in an unstable state, stopping " + "working thread {}.".format(self)) + break + else: + # Execute the br. + self._execute_benchmark_run(br) + finally: + self._stat_annotation = "finished" + # Thread finishes. Notify scheduler that I'm done. + self._sched.dut_worker_finished(self) + + def _reimage(self, label): + """Reimage image to label. + + Args: + label: the label to remimage onto dut. + + Returns: + 0 if successful, otherwise 1. + """ + + # Termination could happen anywhere, check it. + if self._terminated: + return 1 + + self._l.LogOutput('Reimaging {} using {}'.format(self, label)) + self._stat_num_reimage += 1 + self._stat_annotation = 'reimaging using "{}"'.format(label.name) + try: + # Note, only 1 reimage at any given time, this is guaranteed in + # ImageMachine, so no sync needed below. + retval = self._sched._experiment.machine_manager.ImageMachine( + self._dut, label) + if retval: + return 1 + except: + return 1 + + self._dut.label = label + return 0 + + def _execute_benchmark_run(self, br): + """Execute a single benchmark_run. + + Note - this function never throws exceptions. + """ + + # Termination could happen anywhere, check it. 
+ if self._terminated: + return + + self._l.LogOutput('{} started working on {}'.format(self, br)) + self._stat_num_br_run += 1 + self._stat_annotation = 'executing {}'.format(br) + # benchmark_run.run does not throws, but just play it safe here. + try: + assert br.owner_thread is None + br.owner_thread = self + with self._active_br_lock: + self._active_br = br + br.run() + finally: + self._sched._experiment.BenchmarkRunFinished(br) + with self._active_br_lock: + self._active_br = None + + def _setup_dut_label(self): + """Try to match dut image with a certain experiment label. + + If such match is found, we just skip doing reimage and jump to execute + some benchmark_runs. + """ + + checksum_file = "/usr/local/osimage_checksum_file" + try: + rv, checksum, _ = command_executer.GetCommandExecuter().\ + CrosRunCommand( + "cat " + checksum_file, + return_output=True, + chromeos_root=self._sched._labels[0].chromeos_root, + machine=self._dut.name) + if rv == 0: + checksum = checksum.strip() + for l in self._sched._labels: + if l.checksum == checksum: + self._l.LogOutput( + "Dut '{}' is pre-installed with '{}'".format( + self._dut.name, l)) + self._dut.label = l + return + except: + traceback.print_exc(file=sys.stdout) + self._dut.label = None + + def __str__(self): + return 'DutWorker[dut="{}", label="{}"]'.format( + self._dut.name, self._dut.label.name if self._dut.label else "None") + + def dut(self): + return self._dut + + def status_str(self): + """Report thread status.""" + + return ('Worker thread "{}", label="{}", benchmark_run={}, ' + 'reimage={}, now {}'.format( + self._dut.name, + 'None' if self._dut.label is None else self._dut.label.name, + self._stat_num_br_run, + self._stat_num_reimage, + self._stat_annotation)) + + +class Schedv2(object): + """New scheduler for crosperf.""" + + def __init__(self, experiment): + self._experiment = experiment + self._l = logger.GetLogger(experiment.log_dir) + + # Create shortcuts to nested data structure. 
"_duts" points to a list of + # locked machines. _labels points to a list of all labels. + self._duts = self._experiment.machine_manager._all_machines + self._labels = self._experiment.labels + + # Mapping from label to a list of benchmark_runs. + self._label_brl_map = dict([(l, []) for l in self._labels]) + for br in self._experiment.benchmark_runs: + assert br.label in self._label_brl_map + self._label_brl_map[br.label].append(br) + + # Use machine image manager to calculate initial label allocation. + self._mim = MachineImageManager(self._labels, self._duts) + self._mim.compute_initial_allocation() + + # Create worker thread, 1 per dut. + self._active_workers = [DutWorker(dut, self) for dut in self._duts] + self._finished_workers = [] + + # Bookkeeping for synchronization. + self._workers_lock = Lock() + self._lock_map = defaultdict(lambda: Lock()) + + # Termination flag. + self._terminated = False + + def run_sched(self): + """Start all dut worker threads and return immediately.""" + + [w.start() for w in self._active_workers] + + def get_benchmark_run(self, dut): + """Get a benchmark_run (br) object for a certain dut. + + Arguments: + dut: the dut for which a br is returned. + + Returns: + A br with its label matching that of the dut. If no such br could be + found, return None (this usually means a reimage is required for the + dut). + """ + + # If terminated, stop providing any br. + if self._terminated: + return None + + # If dut bears an unrecognized label, return None. + if dut.label is None: + return None + + # If br list for the dut's label is empty (that means all brs for this + # label have been done) , return None. + with self._lock_on(dut.label): + brl = self._label_brl_map[dut.label] + if not brl: + return None + # Return the first br. + return brl.pop(0) + + def allocate_label(self, dut): + """Allocate a label to a dut. + + The work is delegated to MachineImageManager. 
+ + The dut_worker calling this method is responsible for reimage the dut to + this label. + + Arguments: + dut: the new label that is to be reimaged onto the dut. + + Returns: + The label or None. + """ + + if self._terminated: + return None + + return self._mim.allocate(dut) + + def dut_worker_finished(self, dut_worker): + """Notify schedv2 that the dut_worker thread finished. + + Arguemnts: + dut_worker: the thread that is about to end.""" + + self._l.LogOutput("{} finished.".format(dut_worker)) + with self._workers_lock: + self._active_workers.remove(dut_worker) + self._finished_workers.append(dut_worker) + + def is_complete(self): + return len(self._active_workers) == 0 + + def _lock_on(self, object): + return self._lock_map[object] + + def terminate(self): + """Mark flag so we stop providing br/reimages. + + Also terminate each DutWorker, so they refuse to execute br or reimage. + """ + + self._terminated = True + for dut_worker in self._active_workers: + dut_worker.terminate() + + def threads_status_as_string(self): + """Report the dut worker threads status.""" + + status = "{} active threads, {} finished threads.\n".format( + len(self._active_workers), len(self._finished_workers)) + status += " Active threads:" + for dw in self._active_workers: + status += '\n ' + dw.status_str() + if self._finished_workers: + status += "\n Finished threads:" + for dw in self._finished_workers: + status += '\n ' + dw.status_str() + return status + class MockExperimentRunner(ExperimentRunner): """Mocked ExperimentRunner for testing.""" diff --git a/crosperf/experiment_status.py b/crosperf/experiment_status.py index f855ca68..ae18dbca 100644 --- a/crosperf/experiment_status.py +++ b/crosperf/experiment_status.py @@ -95,11 +95,19 @@ class ExperimentStatus(object): for key, val in status_bins.items(): status_strings.append("%s: %s" % (key, self._GetNamesAndIterations(val))) - result = "Thread Status:\n%s" % "\n".join(status_strings) + thread_status = "" if 
self.experiment.log_level == "verbose": - # Add the machine manager status. - result += "\n" + self.experiment.machine_manager.AsString() + "\n" + thread_status_format = "Thread Status: \n{}\n" + if self.experiment.schedv2() is None: + # Add the machine manager status. + thread_status = thread_status_format.format( + self.experiment.machine_manager.AsString()) + else: + thread_status = thread_status_format.format( + self.experiment.schedv2().threads_status_as_string()) + + result = "{}{}".format(thread_status, "\n".join(status_strings)) return result diff --git a/crosperf/label.py b/crosperf/label.py index 78fc1314..2f84e77b 100644 --- a/crosperf/label.py +++ b/crosperf/label.py @@ -7,13 +7,15 @@ """The label of benchamrks.""" import os + +from image_checksummer import ImageChecksummer from utils.file_utils import FileUtils from utils import misc class Label(object): def __init__(self, name, chromeos_image, chromeos_root, board, remote, - image_args, cache_dir, cache_only, chrome_src=None): + image_args, cache_dir, cache_only, log_level, chrome_src=None): self.image_type = self._GetImageType(chromeos_image) @@ -29,6 +31,7 @@ class Label(object): self.image_args = image_args self.cache_dir = cache_dir self.cache_only = cache_only + self.log_level = log_level if not chromeos_root: if self.image_type == "local": @@ -57,6 +60,17 @@ class Label(object): % (name, chrome_src)) self.chrome_src = chromeos_src + self._SetupChecksum() + + def _SetupChecksum(self): + """Compute label checksum only once.""" + + self.checksum = None + if self.image_type == "local": + self.checksum = ImageChecksummer().Checksum(self, self.log_level) + elif self.image_type == "trybot": + self.checksum = hashlib.md5(self.chromeos_image).hexdigest() + def _GetImageType(self, chromeos_image): image_type = None if chromeos_image.find("xbuddy://") < 0: @@ -67,6 +81,21 @@ class Label(object): image_type = "official" return image_type + def __hash__(self): + """Label objects are used in a map, so provide 
"hash" and "equal".""" + + return hash(self.name) + + def __eq__(self, other): + """Label objects are used in a map, so provide "hash" and "equal".""" + + return isinstance(other, Label) and other.name == self.name + + def __str__(self): + """For better debugging.""" + + return 'label[name="{}"]'.format(self.name) + class MockLabel(object): def __init__(self, name, chromeos_image, chromeos_root, board, remote, image_args, cache_dir, cache_only, chrome_src=None): diff --git a/crosperf/machine_image_manager.py b/crosperf/machine_image_manager.py new file mode 100644 index 00000000..7cc081b5 --- /dev/null +++ b/crosperf/machine_image_manager.py @@ -0,0 +1,266 @@ +#!/usr/bin/python + +# Copyright 2015 Google Inc. All Rights Reserved. + +class MachineImageManager(object): + """Management of allocating images to duts. + + * Data structure we have - + + duts_ - list of duts, for each duts, we assume the following 2 properties + exist - label_ (the current label the duts_ carries or None, if it has an + alien image) and name (a string) + + labels_ - a list of labels, for each label, we assume these properties + exist - remote (a set/vector/list of dut names (not dut object), to each + of which this image is compatible), remote could be none, which means + universal compatible. + + label_duts_ - for each label, we maintain a list of duts, onto which the + label is imaged. Note this is an array of lists. Each element of each list + is an integer which is dut oridnal. We access this array using label + ordinal. + + allocate_log_ - a list of allocation record. For example, if we allocate + l1 to d1, then l2 to d2, then allocate_log_ would be [(1, 1), (2, 2)]. + This is used for debug/log, etc. All tuples in the list are integer pairs + (label_ordinal, dut_ordinal). + + n_duts_ - number of duts. + + n_labels_ - number of labels. + + dut_name_ordinal_ - mapping from dut name (a string) to an integer, + starting from 0. So that duts_[dut_name_ordinal_[a_dut.name]]= a_dut. 
+ + * Problem abstraction - + + Assume we have the following matrix - label X machine (row X col). A 'X' + in (i, j) in the matrix means machine and lable is not compatible, or that + we cannot image li to Mj. + + M1 M2 M3 + L1 X + + L2 X + + L3 X X + + Now that we'll try to find a way to fill Ys in the matrix so that - + + a) - each row at least get a Y, this ensures that each label get imaged + at least once, an apparent prerequiste. + + b) - each column get at most N Ys. This make sure we can successfully + finish all tests by re-image each machine at most N times. That being + said, we could *OPTIONALLY* reimage some machines more than N times to + *accelerate* the test speed. + + How to choose initial N for b) - + If number of duts (nd) is equal to or more than that of labels (nl), we + start from N == 1. Else we start from N = nl - nd + 1. + + We will begin the search with pre-defined N, if we fail to find such a + solution for such N, we increase N by 1 and continue the search till we + get N == nl, at this case we fails. + + Such a solution ensures minimal number of reimages. + + * Solution representation + + The solution will be placed inside the matrix, like below + + M1 M2 M3 M4 + L1 X X Y + + L2 Y X + + L3 X Y X + + * Allocation algorithm + + When Mj asks for a image, we check column j, pick the first cell that + contains a 'Y', and mark the cell '_'. If no such 'Y' exists (like M4 in + the above solution matrix), we just pick an image that the minimal reimage + number. + + After allocate for M3 + M1 M2 M3 M4 + L1 X X _ + + L2 Y X + + L3 X Y X + + After allocate for M4 + M1 M2 M3 M4 + L1 X X _ + + L2 Y X _ + + L3 X Y X + + After allocate for M2 + M1 M2 M3 M4 + L1 X X _ + + L2 Y X _ + + L3 X _ X + + After allocate for M1 + M1 M2 M3 M4 + L1 X X _ + + L2 _ X _ + + L3 X _ X + + After allocate for M2 + M1 M2 M3 M4 + L1 X X _ + + L2 _ _ X _ + + L3 X _ X + + If we try to allocate for M1 or M2 or M3 again, we get None. 
+ + * Special / common case to handle seperately + + We have only 1 dut or if we have only 1 label, that's simple enough. + + """ + + def __init__(self, labels, duts): + self.labels_ = labels + self.duts_ = duts + self.n_labels_ = len(labels) + self.n_duts_ = len(duts) + self.dut_name_ordinal_ = dict() + for idx, dut in enumerate(self.duts_): + self.dut_name_ordinal_[dut.name] = idx + + # Generate initial matrix containg 'X' or ' '. + self.matrix_ = [['X' if (l.remote and len(l.remote)) else ' ' \ + for d in range(self.n_duts_)] for l in self.labels_] + for ol, l in enumerate(self.labels_): + if l.remote: + for r in l.remote: + self.matrix_[ol][self.dut_name_ordinal_[r]] = ' ' + + self.label_duts_ = [[] for _ in range(self.n_labels_)] + self.allocate_log_ = [] + + def compute_initial_allocation(self): + """Compute the initial label-dut allocation. + + This method finds the most efficient way that every label gets imaged at + least once. + + Returns: + False, only if not all labels could be imaged to a certain machine, + otherwise True. + """ + + if self.n_duts_ == 1: + for i, v in self.matrix_vertical_generator(0): + if v != 'X': + self.matrix_[i][0] = 'Y' + return + + if self.n_labels_ == 1: + for j, v in self.matrix_horizontal_generator(0): + if v != 'X': + self.matrix_[0][j] = 'Y' + return + + if self.n_duts_ >= self.n_labels_: + n = 1 + else: + n = self.n_labels_ - self.n_duts_ + 1 + while n <= self.n_labels_: + if self._compute_initial_allocation_internal(0, n): + break + n += 1 + + return n <= self.n_labels_ + + def _record_allocate_log(self, label_i, dut_j): + self.allocate_log_.append((label_i, dut_j)) + self.label_duts_[label_i].append(dut_j) + + def allocate(self, dut): + """Allocate a label for dut. + + Arguments: + dut: the dut that asks for a new image. + + Returns: + a label to image onto the dut or None if no more available images for + the dut. 
+ """ + j = self.dut_name_ordinal_[dut.name] + min_reimage_number = 999 + min_i = 999 + min_label = None + for i, v in self.matrix_vertical_generator(j): + label = self.labels_[i] + if v == 'Y': + self.matrix_[i][j] = '_' + self._record_allocate_log(i, j) + return label + if v == ' ': + label_reimage_number = len(self.label_duts_[i]) + if label_reimage_number < min_reimage_number: + min_reimage_number = label_reimage_number + min_i = i + min_label = self.labels_[min_i] + + # All labels are marked either '_' (already taken) or 'X' (not + # compatible), so return None to notify machine thread to quit. + if min_label is None: + return None + + # At this point, we don't find any 'Y' for the machine, so we go the + # 'min' approach. + self.matrix_[min_i][j] = '_' + self._record_allocate_log(min_i, j) + return min_label + + def matrix_vertical_generator(self, col): + """Iterate matrix vertically at column 'col'. + + Yield row number i and value at matrix_[i][col]. + """ + for i, l in enumerate(self.labels_): + yield i, self.matrix_[i][col] + + def matrix_horizontal_generator(self, row): + """Iterate matrix horizontally at row 'row'. + + Yield col number j and value at matrix_[row][j]. + """ + for j, d in enumerate(self.duts_): + yield j, self.matrix_[row][j] + + + def _compute_initial_allocation_internal(self, level, N): + """ Search matrix for d with N. """ + + if level == self.n_labels_: + return True + + for j, v in self.matrix_horizontal_generator(level): + if v == ' ': + # Before we put a 'Y', we check how many Y column 'j' has. + # Note y[0] is row idx, y[1] is the cell value. 
+ ny = reduce(lambda x, y: x + 1 if (y[1] == 'Y') else x, + self.matrix_vertical_generator(j), 0) + if ny < N: + self.matrix_[level][j] = 'Y' + if self._compute_initial_allocation_internal(level + 1, N): + return True + self.matrix_[level][j] = ' ' + + return False diff --git a/crosperf/machine_image_manager_unittest.py b/crosperf/machine_image_manager_unittest.py new file mode 100644 index 00000000..1e8df33d --- /dev/null +++ b/crosperf/machine_image_manager_unittest.py @@ -0,0 +1,329 @@ +#!/usr/bin/python + +# Copyright 2015 Google Inc. All Rights Reserved. + +import random +import unittest + +from machine_image_manager import MachineImageManager + +class MockLabel(object): + + def __init__(self, name, remotes=None): + self.name = name + self.remote = remotes + + def __hash__(self): + """Provide hash function for label. + This is required because Label object is used inside a dict as key. + """ + return hash(self.name) + + def __eq__(self, other): + """Provide eq function for label. + This is required because Label object is used inside a dict as key. + """ + return isinstance(other, MockLabel) and other.name == self.name + +class MockDut(object): + + def __init__(self, name, label=None): + self.name = name + self.label_ = label + + +class MachineImageManagerTester(unittest.TestCase): + + def gen_duts_by_name(self, *names): + duts = [] + for n in names: + duts.append(MockDut(n)) + return duts + + def print_matrix(self, matrix): + for r in matrix: + for v in r: + print '{} '.format('.' 
if v == ' ' else v), + print('') + + def create_labels_and_duts_from_pattern(self, pattern): + labels = [] + duts = [] + for i, r in enumerate(pattern): + l = MockLabel('l{}'.format(i), []) + for j, v in enumerate(r.split()): + if v == '.': + l.remote.append('m{}'.format(j)) + if i == 0: + duts.append(MockDut('m{}'.format(j))) + labels.append(l) + return labels, duts + + def check_matrix_against_pattern(self, matrix, pattern): + for i, s in enumerate(pattern): + for j, v in enumerate(s.split()): + self.assertTrue(v == '.' and matrix[i][j] == ' ' or + v == matrix[i][j]) + + def pattern_based_test(self, input, output): + labels, duts = self.create_labels_and_duts_from_pattern(input) + mim = MachineImageManager(labels, duts) + self.assertTrue(mim.compute_initial_allocation()) + self.check_matrix_against_pattern(mim.matrix_, output) + return mim + + def test_single_dut(self): + labels = [MockLabel('l1'), + MockLabel('l2'), + MockLabel('l3')] + dut = MockDut('m1') + mim = MachineImageManager(labels, [dut]) + mim.compute_initial_allocation() + self.assertTrue(mim.matrix_ == [['Y'], ['Y'], ['Y']]) + + def test_single_label(self): + labels = [MockLabel('l1')] + duts = self.gen_duts_by_name('m1', 'm2', 'm3') + mim = MachineImageManager(labels, duts) + mim.compute_initial_allocation() + self.assertTrue(mim.matrix_ == [['Y', 'Y', 'Y']]) + + def test_case1(self): + labels = [MockLabel('l1', ['m1', 'm2']), + MockLabel('l2', ['m2', 'm3']), + MockLabel('l3', ['m1'])] + duts = [MockDut('m1'), MockDut('m2'), MockDut('m3')] + mim = MachineImageManager(labels, duts) + self.assertTrue(mim.matrix_ == [[' ', ' ', 'X'], + ['X', ' ', ' '], + [' ', 'X', 'X']]) + mim.compute_initial_allocation() + self.assertTrue(mim.matrix_ == [[' ', 'Y', 'X'], + ['X', ' ', 'Y'], + ['Y', 'X', 'X']]) + + def test_case2(self): + labels = [MockLabel('l1', ['m1', 'm2']), + MockLabel('l2', ['m2', 'm3']), + MockLabel('l3', ['m1'])] + duts = [MockDut('m1'), MockDut('m2'), MockDut('m3')] + mim = 
MachineImageManager(labels, duts) + self.assertTrue(mim.matrix_ == [[' ', ' ', 'X'], + ['X', ' ', ' '], + [' ', 'X', 'X']]) + mim.compute_initial_allocation() + self.assertTrue(mim.matrix_ == [[' ', 'Y', 'X'], + ['X', ' ', 'Y'], + ['Y', 'X', 'X']]) + + def test_case3(self): + labels = [MockLabel('l1', ['m1', 'm2']), + MockLabel('l2', ['m2', 'm3']), + MockLabel('l3', ['m1'])] + duts = [MockDut('m1', labels[0]), MockDut('m2'), MockDut('m3')] + mim = MachineImageManager(labels, duts) + mim.compute_initial_allocation() + self.assertTrue(mim.matrix_ == [[' ', 'Y', 'X'], + ['X', ' ', 'Y'], + ['Y', 'X', 'X']]) + + def test_case4(self): + labels = [MockLabel('l1', ['m1', 'm2']), + MockLabel('l2', ['m2', 'm3']), + MockLabel('l3', ['m1'])] + duts = [MockDut('m1'), MockDut('m2', labels[0]), MockDut('m3')] + mim = MachineImageManager(labels, duts) + mim.compute_initial_allocation() + self.assertTrue(mim.matrix_ == [[' ', 'Y', 'X'], + ['X', ' ', 'Y'], + ['Y', 'X', 'X']]) + + def test_case5(self): + labels = [MockLabel('l1', ['m3']), + MockLabel('l2', ['m3']), + MockLabel('l3', ['m1'])] + duts = self.gen_duts_by_name('m1', 'm2', 'm3') + mim = MachineImageManager(labels, duts) + self.assertTrue(mim.compute_initial_allocation()) + self.assertTrue(mim.matrix_ == [['X', 'X', 'Y'], + ['X', 'X', 'Y'], + ['Y', 'X', 'X']]) + + def test_2x2_with_allocation(self): + labels = [MockLabel('l0'), MockLabel('l1')] + duts = [MockDut('m0'), MockDut('m1')] + mim = MachineImageManager(labels, duts) + self.assertTrue(mim.compute_initial_allocation()) + self.assertTrue(mim.allocate(duts[0]) == labels[0]) + self.assertTrue(mim.allocate(duts[0]) == labels[1]) + self.assertTrue(mim.allocate(duts[0]) is None) + self.assertTrue(mim.matrix_[0][0] == '_') + self.assertTrue(mim.matrix_[1][0] == '_') + self.assertTrue(mim.allocate(duts[1]) == labels[1]) + + def test_10x10_general(self): + """Gen 10x10 matrix.""" + n = 10 + labels = [] + duts = [] + for i in range(n): + 
labels.append(MockLabel('l{}'.format(i))) + duts.append(MockDut('m{}'.format(i))) + mim = MachineImageManager(labels, duts) + self.assertTrue(mim.compute_initial_allocation()) + for i in range(n): + for j in range(n): + if i == j: + self.assertTrue(mim.matrix_[i][j] == 'Y') + else: + self.assertTrue(mim.matrix_[i][j] == ' ') + self.assertTrue(mim.allocate(duts[3]).name == 'l3') + + def test_random_generated(self): + n = 10 + labels = [] + duts = [] + for i in range(10): + # generate 3-5 machines that is compatible with this label + l = MockLabel('l{}'.format(i), []) + r = random.random() + for _ in range(4): + t = int(r * 10) % n + r *= 10 + l.remote.append('m{}'.format(t)) + labels.append(l) + duts.append(MockDut('m{}'.format(i))) + mim = MachineImageManager(labels, duts) + self.assertTrue(mim.compute_initial_allocation()) + + def test_10x10_fully_random(self): + input = ['X . . . X X . X X .', + 'X X . X . X . X X .', + 'X X X . . X . X . X', + 'X . X X . . X X . X', + 'X X X X . . . X . .', + 'X X . X . X . . X .', + '. X . X . X X X . .', + '. X . X X . X X . .', + 'X X . . . X X X . .', + '. X X X X . . . . X'] + output = ['X Y . . X X . X X .', + 'X X Y X . X . X X .', + 'X X X Y . X . X . X', + 'X . X X Y . X X . X', + 'X X X X . Y . X . .', + 'X X . X . X Y . X .', + 'Y X . X . X X X . .', + '. X . X X . X X Y .', + 'X X . . . X X X . Y', + '. X X X X . . Y . X'] + self.pattern_based_test(input, output) + + def test_10x10_fully_random2(self): + input = ['X . X . . X . X X X', + 'X X X X X X . . X .', + 'X . X X X X X . . X', + 'X X X . X . X X . .', + '. X . X . X X X X X', + 'X X X X X X X . . X', + 'X . X X X X X . . X', + 'X X X . X X X X . .', + 'X X X . . . X X X X', + '. X X . X X X . X X'] + output = ['X . X Y . X . X X X', + 'X X X X X X Y . X .', + 'X Y X X X X X . . X', + 'X X X . X Y X X . .', + '. X Y X . X X X X X', + 'X X X X X X X Y . X', + 'X . X X X X X . Y X', + 'X X X . X X X X . Y', + 'X X X . Y . X X X X', + 'Y X X . X X X . 
X X'] + self.pattern_based_test(input, output) + + def test_3x4_with_allocation(self): + input = ['X X . .', + '. . X .', + 'X . X .'] + output = ['X X Y .', + 'Y . X .', + 'X Y X .'] + mim = self.pattern_based_test(input, output) + self.assertTrue(mim.allocate(mim.duts_[2]) == mim.labels_[0]) + self.assertTrue(mim.allocate(mim.duts_[3]) == mim.labels_[1]) + self.assertTrue(mim.allocate(mim.duts_[0]) == mim.labels_[1]) + self.assertTrue(mim.allocate(mim.duts_[1]) == mim.labels_[2]) + self.assertTrue(mim.allocate(mim.duts_[3]) == mim.labels_[0]) + self.assertTrue(mim.allocate(mim.duts_[3]) == mim.labels_[2]) + self.assertTrue(mim.allocate(mim.duts_[3]) is None) + self.assertTrue(mim.allocate(mim.duts_[2]) is None) + self.assertTrue(mim.allocate(mim.duts_[1]) == mim.labels_[1]) + self.assertTrue(mim.allocate(mim.duts_[1]) == None) + self.assertTrue(mim.allocate(mim.duts_[0]) == None) + self.assertTrue(mim.label_duts_[0] == [2, 3]) + self.assertTrue(mim.label_duts_[1] == [3, 0, 1]) + self.assertTrue(mim.label_duts_[2] == [1, 3]) + self.assertTrue(mim.allocate_log_ == + [(0, 2), + (1, 3), + (1, 0), + (2, 1), + (0, 3), + (2, 3), + (1, 1)]) + + def test_cornercase_1(self): + """This corner case is brought up by Caroline. + + The description is - + + If you have multiple labels and multiple machines, (so we don't + automatically fall into the 1 dut or 1 label case), but all of the + labels specify the same 1 remote, then instead of assigning the same + machine to all the labels, your algorithm fails to assign any... + + So first step is to create an initial matrix like below, l0, l1 and l2 + all specify the same 1 remote - m0. + + m0 m1 m2 + l0 . X X + + l1 . X X + + l2 . 
X X + + The search process will be like this - + a) try to find a solution with at most 1 'Y's per column (but ensure at + least 1 Y per row), fail + b) try to find a solution with at most 2 'Y's per column (but ensure at + least 1 Y per row), fail + c) try to find a solution with at most 3 'Y's per column (but ensure at + least 1 Y per row), succeed, so we end up having this solution + + m0 m1 m2 + l0 Y X X + + l1 Y X X + + l2 Y X X + + """ + + input = ['. X X', + '. X X', + '. X X', ] + output = ['Y X X', + 'Y X X', + 'Y X X', ] + mim = self.pattern_based_test(input, output) + self.assertTrue(mim.allocate(mim.duts_[1]) is None) + self.assertTrue(mim.allocate(mim.duts_[2]) is None) + self.assertTrue(mim.allocate(mim.duts_[0]) == mim.labels_[0]) + self.assertTrue(mim.allocate(mim.duts_[0]) == mim.labels_[1]) + self.assertTrue(mim.allocate(mim.duts_[0]) == mim.labels_[2]) + self.assertTrue(mim.allocate(mim.duts_[0]) is None) + + +if __name__ == '__main__': + unittest.main() diff --git a/crosperf/machine_manager.py b/crosperf/machine_manager.py index 79423481..91fa1302 100644 --- a/crosperf/machine_manager.py +++ b/crosperf/machine_manager.py @@ -20,8 +20,6 @@ from utils import logger from utils import misc from utils.file_utils import FileUtils -from image_checksummer import ImageChecksummer - CHECKSUM_FILE = "/usr/local/osimage_checksum_file" class NonMatchingMachines(Exception): @@ -34,6 +32,9 @@ class CrosMachine(object): def __init__(self, name, chromeos_root, log_level, cmd_exec=None): self.name = name self.image = None + # We relate a dut with a label if we reimage the dut using label or we + # detect at the very beginning that the dut is running this label. 
+ self.label = None self.checksum = None self.locked = False self.released_time = time.time() @@ -216,12 +217,7 @@ class MachineManager(object): self._machines.remove(m) def ImageMachine(self, machine, label): - if label.image_type == "local": - checksum = ImageChecksummer().Checksum(label, self.log_level) - elif label.image_type == "trybot": - checksum = machine._GetMD5Checksum(label.chromeos_image) - else: - checksum = None + checksum = label.checksum if checksum and (machine.checksum == checksum): return @@ -247,9 +243,9 @@ class MachineManager(object): with self.image_lock: if self.log_level != "verbose": self.logger.LogOutput("Pushing image onto machine.") - self.logger.LogOutput("CMD : python %s " + self.logger.LogOutput("Running image_chromeos.DoImage with %s" % " ".join(image_chromeos_args)) - retval = self.ce.RunCommand(" ".join(["python"] + image_chromeos_args)) + retval = image_chromeos.DoImage(image_chromeos_args) if retval: cmd = "reboot && exit" if self.log_level != "verbose": @@ -259,15 +255,16 @@ class MachineManager(object): time.sleep(60) if self.log_level != "verbose": self.logger.LogOutput("Pushing image onto machine.") - self.logger.LogOutput("CMD : python %s " + self.logger.LogOutput("Running image_chromeos.DoImage with %s" % " ".join(image_chromeos_args)) - retval = self.ce.RunCommand(" ".join(["python"] + image_chromeos_args)) + retval = image_chromeos.DoImage(image_chromeos_args) if retval: raise Exception("Could not image machine: '%s'." 
% machine.name) else: self.num_reimages += 1 machine.checksum = checksum machine.image = label.chromeos_image + machine.label = label self.ce.log_level = save_ce_log_level return retval @@ -344,12 +341,7 @@ class MachineManager(object): m.SetUpChecksumInfo() def AcquireMachine(self, chromeos_image, label, throw=False): - if label.image_type == "local": - image_checksum = ImageChecksummer().Checksum(label, self.log_level) - elif label.image_type == "trybot": - image_checksum = hashlib.md5(chromeos_image).hexdigest() - else: - image_checksum = None + image_checksum = label.checksum machines = self.GetMachines(label) check_interval_time = 120 with self._lock: diff --git a/image_chromeos.py b/image_chromeos.py index dd9a336f..1d977e03 100755 --- a/image_chromeos.py +++ b/image_chromeos.py @@ -50,7 +50,7 @@ def CheckForCrosFlash(chromeos_root, remote, log_level): def DoImage(argv): - """Build ChromeOS.""" + """Image ChromeOS.""" parser = optparse.OptionParser() parser.add_option("-c", "--chromeos_root", dest="chromeos_root", -- cgit v1.2.3