From e5bc63bbed4e001b080c4ce0b18c5c78900d4786 Mon Sep 17 00:00:00 2001 From: cmtice Date: Wed, 27 May 2015 16:59:37 -0700 Subject: Implement new global locking scheme for machines. This CL implements a new machine locking mechanism using Autotest Front End servers. When locking/unlocking a lab machine, it uses the ChromeOS HW lab server; when locking/unlocking a local machine, it uses a local AFE server on chrotomation2. BUG=None TEST=Tested the script manually to lock/unlock machines and query status. Also tested with image_chromeos and with crosperf scripts. Change-Id: I2793bc1f7dc056e725694e81ded656d9f49d227b Reviewed-on: https://chrome-internal-review.googlesource.com/217560 Reviewed-by: Luis Lozano Reviewed-by: David Sharp Tested-by: Caroline Tice Commit-Queue: Caroline Tice --- crosperf/experiment.py | 23 ++++++++- crosperf/experiment_runner.py | 110 +++++++++++++++++++++++++++++++----------- crosperf/machine_manager.py | 60 ++++++++--------------- crosperf/settings_factory.py | 3 +- 4 files changed, 125 insertions(+), 71 deletions(-) (limited to 'crosperf') diff --git a/crosperf/experiment.py b/crosperf/experiment.py index 3bcd1a83..a7a4a7a0 100644 --- a/crosperf/experiment.py +++ b/crosperf/experiment.py @@ -9,6 +9,8 @@ import os import time +import afe_lock_machine + from utils import logger from utils import misc @@ -55,12 +57,20 @@ class Experiment(object): raise Exception("No chromeos_root given and could not determine one from " "the image path.") + # This is a local directory, where the machine manager will keep track of + # which machines are available for which benchmark run. The assumption is + # that all of the machines have been globally locked for this experiment, + # to keep other people/experiments from accessing them, but we still need the + # local locks directory to keep two or more benchmark runs within the same + # experiment from trying to use the same machine at the same time. + local_locks_directory = os.path.join(self.working_directory, + "local_locks") if test_flag.GetTestMode(): self.machine_manager = MockMachineManager(chromeos_root, acquire_timeout, log_level, locks_directory) else: self.machine_manager = MachineManager(chromeos_root, acquire_timeout, - log_level, locks_directory) + log_level, local_locks_directory) self.l = logger.GetLogger(log_dir) for machine in remote: @@ -134,4 +144,13 @@ class Experiment(object): benchmark_run.SetCacheConditions(cache_conditions) def Cleanup(self): - self.machine_manager.Cleanup() + """Make sure all machines are unlocked.""" + all_machines = self.remote + for l in self.labels: + all_machines += l.remote + lock_mgr = afe_lock_machine.AFELockManager(all_machines, "", + self.labels[0].chromeos_root, None) + machine_states = lock_mgr.GetMachineStates("unlock") + for k, state in machine_states.iteritems(): + if state["locked"]: + lock_mgr.UpdateLockInAFE(False, k) diff --git a/crosperf/experiment_runner.py b/crosperf/experiment_runner.py index d27144ed..ec109b5a 100644 --- a/crosperf/experiment_runner.py +++ b/crosperf/experiment_runner.py @@ -7,6 +7,8 @@ import getpass import os import time +import afe_lock_machine + from utils import command_executer from utils import logger from utils.email_sender import EmailSender @@ -32,38 +34,92 @@ class ExperimentRunner(object): if experiment.log_level != "verbose": self.STATUS_TIME_DELAY = 10 + def _GetMachineList(self): + """Return a list of all requested machines. + + Create a list of all the requested machines, both global requests and + label-specific requests, and return the list. + """ + machines = self._experiment.remote + for l in self._experiment.labels: + if l.remote: + machines += l.remote + return machines + + def _LockAllMachines(self, experiment): + """Attempt to globally lock all of the machines requested for run. + + This method will use the AFE server to globally lock all of the machines + requested for this crosperf run, to prevent any other crosperf runs from + being able to update/use the machines while this experiment is running. + """ + lock_mgr = afe_lock_machine.AFELockManager( + self._GetMachineList(), + "", + experiment.labels[0].chromeos_root, + None, + log=self.l, + ) + for m in lock_mgr.machines: + if not lock_mgr.MachineIsKnown(m): + lock_mgr.AddLocalMachine(m) + machine_states = lock_mgr.GetMachineStates("lock") + lock_mgr.CheckMachineLocks(machine_states, "lock") + lock_mgr.UpdateMachines(True) + + def _UnlockAllMachines(self, experiment): + """Attempt to globally unlock all of the machines requested for run. + + The method will use the AFE server to globally unlock all of the machines + requested for this crosperf run. + """ + lock_mgr = afe_lock_machine.AFELockManager( + self._GetMachineList(), + "", + experiment.labels[0].chromeos_root, + None, + log=self.l, + ) + machine_states = lock_mgr.GetMachineStates("unlock") + lock_mgr.CheckMachineLocks(machine_states, "unlock") + lock_mgr.UpdateMachines(False) + def _Run(self, experiment): - status = ExperimentStatus(experiment) - experiment.Run() - last_status_time = 0 - last_status_string = "" try: - if experiment.log_level != "verbose": - self.l.LogStartDots() - while not experiment.IsComplete(): - if last_status_time + self.STATUS_TIME_DELAY < time.time(): - last_status_time = time.time() - border = "==============================" - if experiment.log_level == "verbose": - self.l.LogOutput(border) - self.l.LogOutput(status.GetProgressString()) - self.l.LogOutput(status.GetStatusString()) - self.l.LogOutput(border) - else: - current_status_string = status.GetStatusString() - if (current_status_string != last_status_string): - self.l.LogEndDots() + self._LockAllMachines(experiment) + status = ExperimentStatus(experiment) + experiment.Run() + last_status_time = 0 + last_status_string = "" + try: + if experiment.log_level != "verbose": + self.l.LogStartDots() + while not experiment.IsComplete(): + if last_status_time + self.STATUS_TIME_DELAY < time.time(): + last_status_time = time.time() + border = "==============================" + if experiment.log_level == "verbose": self.l.LogOutput(border) - self.l.LogOutput(current_status_string) + self.l.LogOutput(status.GetProgressString()) + self.l.LogOutput(status.GetStatusString()) self.l.LogOutput(border) - last_status_string = current_status_string else: - self.l.LogAppendDot() - time.sleep(self.THREAD_MONITOR_DELAY) - except KeyboardInterrupt: - self._terminated = True - self.l.LogError("Ctrl-c pressed. Cleaning up...") - experiment.Terminate() + current_status_string = status.GetStatusString() + if (current_status_string != last_status_string): + self.l.LogEndDots() + self.l.LogOutput(border) + self.l.LogOutput(current_status_string) + self.l.LogOutput(border) + last_status_string = current_status_string + else: + self.l.LogAppendDot() + time.sleep(self.THREAD_MONITOR_DELAY) + except KeyboardInterrupt: + self._terminated = True + self.l.LogError("Ctrl-c pressed. Cleaning up...") + experiment.Terminate() + finally: + self._UnlockAllMachines(experiment) def _PrintTable(self, experiment): self.l.LogOutput(TextResultsReport(experiment).GetReport()) diff --git a/crosperf/machine_manager.py b/crosperf/machine_manager.py index 0941e91a..2155d377 100644 --- a/crosperf/machine_manager.py +++ b/crosperf/machine_manager.py @@ -6,7 +6,6 @@ import hashlib import image_chromeos -import lock_machine import math import os.path import re @@ -172,6 +171,16 @@ class CrosMachine(object): class MachineManager(object): + """Lock, image and unlock machines locally for benchmark runs. + + This class contains methods and calls to lock, unlock and image + machines and distribute machines to each benchmark run. The assumption is + that all of the machines for the experiment have been globally locked + (using an AFE server) in the ExperimentRunner, but the machines still need + to be locally locked/unlocked (allocated to benchmark runs) to prevent + multiple benchmark runs within the same experiment from trying to use the + same machine at the same time. + """ def __init__(self, chromeos_root, acquire_timeout, log_level, locks_dir, cmd_exec=None, lgr=None): self._lock = threading.RLock() @@ -184,20 +193,11 @@ class MachineManager(object): self.machine_checksum_string = {} self.acquire_timeout = acquire_timeout self.log_level = log_level - self.locks_dir = locks_dir + self.locks_dir = None self.ce = cmd_exec or command_executer.GetCommandExecuter( log_level=self.log_level) self.logger = lgr or logger.GetLogger() - if self.locks_dir != lock_machine.Machine.LOCKS_DIR: - msg = ("WARNING: If you use your own locks directory, there is no" - " guarantee that someone else might not hold a lock on the same" - " machine in a different locks directory.") - self.logger.LogOutput(msg) - - if not os.path.isdir(self.locks_dir): - raise MissingLocksDirectory ("Cannot access locks directory: %s" - % self.locks_dir) self._initialized_machines = [] self.chromeos_root = chromeos_root @@ -215,6 +215,7 @@ class MachineManager(object): if not chromeos_root: chromeos_root = self.chromeos_root image_chromeos_args = [image_chromeos.__file__, + "--no_lock", "--chromeos_root=%s" % chromeos_root, "--image=%s" % label.chromeos_image, "--image_args=%s" % label.image_args, @@ -275,19 +276,14 @@ class MachineManager(object): for m in self._machines: if m.name == cros_machine.name: return - locked = lock_machine.Machine(cros_machine.name, - self.locks_dir).Lock(True, - sys.argv[0]) - if locked: - self._machines.append(cros_machine) - command = "cat %s" % CHECKSUM_FILE - ret, out, _ = self.ce.CrosRunCommand( - command, return_output=True, chromeos_root=self.chromeos_root, - machine=cros_machine.name) - if ret == 0: - cros_machine.checksum = out.strip() - else: - self.logger.LogOutput("Couldn't lock: %s" % cros_machine.name) + + self._machines.append(cros_machine) + command = "cat %s" % CHECKSUM_FILE + ret, out, _ = self.ce.CrosRunCommand( + command, return_output=True, chromeos_root=self.chromeos_root, + machine=cros_machine.name) + if ret == 0: + cros_machine.checksum = out.strip() # This is called from single threaded mode. def AddMachine(self, machine_name): @@ -313,12 +309,6 @@ class MachineManager(object): with self._lock: self._machines = [m for m in self._machines if m.name != machine_name] - res = lock_machine.Machine(machine_name, - self.locks_dir).Unlock(True) - - if not res: - self.logger.LogError("Could not unlock machine: '%s'." - % m.name) def ForceSameImageToAllMachines(self, label): machines = self.GetMachines(label) @@ -420,16 +410,6 @@ class MachineManager(object): m.status = "Available" break - def Cleanup(self): - with self._lock: - # Unlock all machines. - for m in self._machines: - res = lock_machine.Machine(m.name, self.locks_dir).Unlock(True) - - if not res: - self.logger.LogError("Could not unlock machine: '%s'." - % m.name) - def __str__(self): with self._lock: l = ["MachineManager Status:"] diff --git a/crosperf/settings_factory.py b/crosperf/settings_factory.py index 9def4398..6793a608 100644 --- a/crosperf/settings_factory.py +++ b/crosperf/settings_factory.py @@ -138,8 +138,7 @@ class GlobalSettings(Settings): " separated by a \",\"")) self.AddField(TextField("results_dir", default="", description="The results dir")) - default_locks_dir = lock_machine.Machine.LOCKS_DIR - self.AddField(TextField("locks_dir", default=default_locks_dir, + self.AddField(TextField("locks_dir", default="", description="An alternate directory to use for " "storing/checking machine locks.\n" "WARNING: If you use your own locks directory, " -- cgit v1.2.3