diff options
Diffstat (limited to 'deprecated/automation/common')
-rw-r--r-- | deprecated/automation/common/__init__.py | 1 | ||||
-rw-r--r-- | deprecated/automation/common/command.py | 241 | ||||
-rw-r--r-- | deprecated/automation/common/command_executer.py | 230 | ||||
-rwxr-xr-x | deprecated/automation/common/command_executer_test.py | 210 | ||||
-rw-r--r-- | deprecated/automation/common/events.py | 149 | ||||
-rw-r--r-- | deprecated/automation/common/job.py | 178 | ||||
-rw-r--r-- | deprecated/automation/common/job_group.py | 73 | ||||
-rw-r--r-- | deprecated/automation/common/logger.py | 144 | ||||
-rw-r--r-- | deprecated/automation/common/machine.py | 70 | ||||
-rwxr-xr-x | deprecated/automation/common/machine_test.py | 26 | ||||
-rw-r--r-- | deprecated/automation/common/state_machine.py | 54 |
11 files changed, 1376 insertions, 0 deletions
# diff --git a/deprecated/automation/common/__init__.py (new file mode 100644)
# The package marker module is intentionally empty.

# diff --git a/deprecated/automation/common/command.py (new file mode 100644)
# Copyright 2011 Google Inc. All Rights Reserved.
"""Composable builders for shell command lines (rendered with str())."""

__author__ = 'kbaclawski@google.com (Krystian Baclawski)'

import abc
import collections
import os.path

try:
  # Python 3 keeps the abstract container classes in collections.abc.
  from collections import abc as collections_abc
except ImportError:
  # Python 2 exposes them directly on the collections module.
  collections_abc = collections


class Shell(object):
  """Class used to build a string representation of a shell command.

  Args:
    cmd: name of the program to run.
    *args: command line arguments; they are joined verbatim, no quoting is
      applied.
    **kwargs: only 'path' (directory joined in front of cmd) and
      'ignore_error' (wrap the command so that it always succeeds) are
      accepted.
  """

  def __init__(self, cmd, *args, **kwargs):
    assert all(key in ['path', 'ignore_error'] for key in kwargs)

    self._cmd = cmd
    self._args = list(args)
    self._path = kwargs.get('path', '')
    self._ignore_error = bool(kwargs.get('ignore_error', False))

  def __str__(self):
    cmdline = [os.path.join(self._path, self._cmd)]
    cmdline.extend(self._args)

    cmd = ' '.join(cmdline)

    if self._ignore_error:
      # The trailing 'true' forces a zero exit status for the whole group.
      cmd = '{ %s; true; }' % cmd

    return cmd

  def AddOption(self, option):
    """Appends one more argument to the command line."""
    self._args.append(option)


class Wrapper(object):
  """Wraps a command with environment which gets cleaned up after execution.

  Args:
    command: the command (anything accepted by Chain) to be wrapped.
    cwd: temporary working directory (entered with pushd, left with popd).
    env: dictionary of environment variables, set before and unset after.
    umask: temporary umask; the previous value is restored afterwards.
  """

  # Class-wide sequence number used to build unique shell variable names.
  _counter = 1

  def __init__(self, command, cwd=None, env=None, umask=None):
    self._command = command
    self._prefix = Chain()
    self._suffix = Chain()

    # Every setup step appended to the prefix gets its tear-down step inserted
    # at the front of the suffix, so the clean-up runs in reverse order.
    if cwd:
      self._prefix.append(Shell('pushd', cwd))
      self._suffix.insert(0, Shell('popd'))

    if env:
      for env_var, value in env.items():
        self._prefix.append(Shell('%s=%s' % (env_var, value)))
        self._suffix.insert(0, Shell('unset', env_var))

    if umask:
      umask_save_var = 'OLD_UMASK_%d' % self.counter

      self._prefix.append(Shell('%s=$(umask)' % umask_save_var))
      self._prefix.append(Shell('umask', umask))
      self._suffix.insert(0, Shell('umask', '$%s' % umask_save_var))

  @property
  def counter(self):
    """Returns a fresh sequence number on every access."""
    counter = Wrapper._counter
    # Increment the class attribute; 'self._counter += 1' would only create an
    # instance attribute and every wrapper would reuse the number 1.
    Wrapper._counter += 1
    return counter

  def __str__(self):
    return str(Chain(self._prefix, self._command, self._suffix))


class AbstractCommandContainer(collections_abc.MutableSequence):
  """Common base for all classes that behave like command container.

  Subclasses provide 'stored_types' (the command types that may be stored) and
  '__str__' (how the stored commands are joined).
  """

  def __init__(self, *commands):
    self._commands = list(commands)

  def __contains__(self, command):
    return command in self._commands

  def __iter__(self):
    return iter(self._commands)

  def __len__(self):
    return len(self._commands)

  def __getitem__(self, index):
    return self._commands[index]

  def __setitem__(self, index, command):
    self._commands[index] = self._ValidateCommandType(command)

  def __delitem__(self, index):
    del self._commands[index]

  def insert(self, index, command):
    self._commands.insert(index, self._ValidateCommandType(command))

  @abc.abstractmethod
  def __str__(self):
    pass

  @abc.abstractproperty
  def stored_types(self):
    pass

  def _ValidateCommandType(self, command):
    """Returns command unchanged or raises TypeError if it cannot be stored."""
    # isinstance() also accepts subclasses of the permitted types.
    if not isinstance(command, tuple(self.stored_types)):
      raise TypeError('Command cannot have %s type.' % type(command))

    return command

  def _StringifyCommands(self):
    """Returns the stored commands rendered as a list of strings.

    Nested containers holding more than one command are grouped with braces.
    Nested containers that render to nothing are skipped, otherwise the joined
    result would contain a dangling operator (e.g. ' && cmd').
    """
    cmds = []

    for cmd in self:
      text = str(cmd)

      if isinstance(cmd, AbstractCommandContainer):
        if not text:
          continue
        if len(cmd) > 1:
          text = '{ %s; }' % text

      cmds.append(text)

    return cmds


class Chain(AbstractCommandContainer):
  """Container that chains shell commands using (&&) shell operator."""

  @property
  def stored_types(self):
    return [str, Shell, Chain, Pipe]

  def __str__(self):
    return ' && '.join(self._StringifyCommands())


class Pipe(AbstractCommandContainer):
  """Container that chains shell commands using pipe (|) operator.

  Args:
    *commands: commands to be connected with pipes.
    **kwargs: only 'input' (file fed into the pipe with cat) and 'output'
      (file the result is copied to with tee) are accepted.
  """

  def __init__(self, *commands, **kwargs):
    assert all(key in ['input', 'output'] for key in kwargs)

    AbstractCommandContainer.__init__(self, *commands)

    self._input = kwargs.get('input', None)
    self._output = kwargs.get('output', None)

  @property
  def stored_types(self):
    return [str, Shell]

  def __str__(self):
    pipe = self._StringifyCommands()

    if self._input:
      # list.insert takes the index first; the original passed it to str().
      pipe.insert(0, str(Shell('cat', self._input)))

    if self._output:
      pipe.append(str(Shell('tee', self._output)))

    return ' | '.join(pipe)

# TODO(kbaclawski): Unfortunately we don't have any policy describing which
# directories can or cannot be touched by a job. Thus, I cannot decide how to
# protect a system against commands that are considered to be dangerous (like
# RmTree("${HOME}")). AFAIK we'll have to execute some commands with root access
# (especially for ChromeOS related jobs, which involve chroot-ing), which is
# even more scary.


def Copy(*args, **kwargs):
  """Returns a 'cp' command copying args.

  Args:
    *args: source paths (and the destination, unless to_dir is given).
    **kwargs: 'to_dir' (target directory, passed with -t) and 'recursive'
      (adds -r when true).
  """
  assert all(key in ['to_dir', 'recursive'] for key in kwargs)

  options = []

  if 'to_dir' in kwargs:
    options.extend(['-t', kwargs['to_dir']])

  # Honour the value of the flag: recursive=False must not add '-r'.
  if kwargs.get('recursive'):
    options.append('-r')

  options.extend(args)

  return Shell('cp', *options)


def RemoteCopyFrom(from_machine, from_path, to_path, username=None):
  """Returns commands that rsync a remote directory into a local one."""
  # NOTE(review): expanduser() runs on the local machine, so '~' in from_path
  # is expanded with the local home directory - confirm this is intended.
  from_path = os.path.expanduser(from_path) + '/'
  to_path = os.path.expanduser(to_path) + '/'

  if not username:
    login = from_machine
  else:
    login = '%s@%s' % (username, from_machine)

  return Chain(
      MakeDir(to_path),
      Shell('rsync', '-a', '%s:%s' % (login, from_path), to_path))


def MakeSymlink(to_path, link_name):
  """Returns a command that (re)creates symlink link_name pointing to to_path."""
  return Shell('ln', '-f', '-s', '-T', to_path, link_name)


def MakeDir(*dirs, **kwargs):
  """Returns a 'mkdir -p' command; the optional 'mode' is passed with -m."""
  options = ['-p']

  mode = kwargs.get('mode', None)

  if mode:
    # NOTE(review): an int mode is rendered in decimal by str(); callers
    # presumably pass a string such as '755' - confirm.
    options.extend(['-m', str(mode)])

  options.extend(dirs)

  return Shell('mkdir', *options)


def RmTree(*dirs):
  """Returns a command that forcibly removes the given directory trees."""
  return Shell('rm', '-r', '-f', *dirs)


def UnTar(tar_file, dest_dir):
  """Returns commands that extract tar_file into (created) dest_dir."""
  return Chain(
      MakeDir(dest_dir), Shell('tar', '-x', '-f', tar_file, '-C', dest_dir))


def Tar(tar_file, *args):
  """Returns commands that create tar_file out of args.

  The compression is chosen by the file extension (.tar, .tar.gz, .tar.bz2).
  """
  options = ['-c']

  if tar_file.endswith('.tar.bz2'):
    options.append('-j')
  elif tar_file.endswith('.tar.gz'):
    options.append('-z')
  else:
    assert tar_file.endswith('.tar')

  options.extend(['-f', tar_file])
  options.extend(args)

  tar_cmd = Shell('tar', *options)
  target_dir = os.path.dirname(tar_file)

  if not target_dir:
    # A bare file name has no directory part; 'mkdir -p' without an operand
    # fails and would prevent tar from running at all.
    return Chain(tar_cmd)

  return Chain(MakeDir(target_dir), tar_cmd)


# diff --git a/deprecated/automation/common/command_executer.py
# (new file mode 100644)
# Copyright 2011 Google Inc. All Rights Reserved.
#
"""Classes that help running commands in a subshell.

Commands can be run locally, or remotely using SSH connection. You may log the
output of a command to a terminal or a file, or any other destination.
"""

__author__ = 'kbaclawski@google.com (Krystian Baclawski)'

import fcntl
import logging
import os
import select
import subprocess
import sys
import time

try:
  from shlex import quote as _ShellQuote  # Python 3.3+
except ImportError:
  from pipes import quote as _ShellQuote  # Python 2

from automation.common import logger


class CommandExecuter(object):
  """Runs a command in a subshell and dispatches its output to hooks.

  Override DataReceivedOnOutput / DataReceivedOnError to redirect the streams.
  """

  # Process-wide default, see Configure().
  DRY_RUN = False

  def __init__(self, dry_run=False):
    self._logger = logging.getLogger(self.__class__.__name__)
    self._dry_run = dry_run or self.DRY_RUN

  @classmethod
  def Configure(cls, dry_run):
    """Sets the default dry-run mode for executers created afterwards."""
    cls.DRY_RUN = dry_run

  def RunCommand(self,
                 cmd,
                 machine=None,
                 username=None,
                 command_terminator=None,
                 command_timeout=None):
    """Executes cmd and returns its exit code.

    Args:
      cmd: the command; it is converted with str().
      machine: when given, the command is executed on that host through ssh.
      username: optional login name used for the ssh connection.
      command_terminator: CommandTerminator used to interrupt the command.
      command_timeout: number of seconds after which the command is
        terminated.

    Returns:
      Exit code of the spawned process, 0 in dry-run mode, 1 if the terminator
      was already triggered before the command could start.
    """
    cmd = str(cmd)

    if self._dry_run:
      return 0

    if not command_terminator:
      command_terminator = CommandTerminator()

    if command_terminator.IsTerminated():
      self._logger.warning('Command has been already terminated!')
      return 1

    # Rewrite command for remote execution.
    if machine:
      if username:
        login = '%s@%s' % (username, machine)
      else:
        login = machine

      self._logger.debug("Executing '%s' on %s.", cmd, login)

      # FIXME(asharif): Remove this after crosbug.com/33007 is fixed.
      # Quote properly, so that a command containing single quotes reaches
      # the remote shell intact.
      cmd = 'ssh -t -t %s -- %s' % (login, _ShellQuote(cmd))
    else:
      self._logger.debug("Executing: '%s'.", cmd)

    child = self._SpawnProcess(cmd, command_terminator, command_timeout)

    self._logger.debug('{PID: %d} Finished with %d code.', child.pid,
                       child.returncode)

    return child.returncode

  def _Terminate(self, child, command_timeout, wait_timeout=10):
    """Gracefully shutdown the child by sending SIGTERM.

    Returns:
      The exit code of the child, or None if it is still alive after
      wait_timeout seconds.
    """

    if command_timeout:
      self._logger.warning('{PID: %d} Timeout of %s seconds reached since '
                           'process started.', child.pid, command_timeout)

    self._logger.warning('{PID: %d} Terminating child.', child.pid)

    try:
      child.terminate()
    except OSError:
      # The process is already gone; poll() below picks up its exit code.
      pass

    wait_started = time.time()

    # poll() returns None only while the child is running. Testing its
    # truthiness would mistake the exit code 0 for "still running".
    while child.poll() is None:
      if time.time() - wait_started >= wait_timeout:
        break
      time.sleep(0.1)

    return child.poll()

  def _Kill(self, child):
    """Kill the child with immediate result."""
    self._logger.warning('{PID: %d} Process still alive.', child.pid)
    self._logger.warning('{PID: %d} Killing child.', child.pid)
    child.kill()
    child.wait()

  def _SpawnProcess(self, cmd, command_terminator, command_timeout):
    """Runs cmd in a subshell, pumping its output until it finishes.

    Returns:
      The finished subprocess.Popen object (returncode is set).
    """
    # Create a child process executing provided command.
    child = subprocess.Popen(cmd,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             stdin=subprocess.PIPE,
                             shell=True)

    # Close stdin so the child won't be able to block on read.
    child.stdin.close()

    started_time = time.time()

    # Watch for data on process stdout, stderr.
    pipes = [child.stdout, child.stderr]

    # Put pipes into non-blocking mode.
    for pipe in pipes:
      fd = pipe.fileno()
      fd_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
      fcntl.fcntl(fd, fcntl.F_SETFL, fd_flags | os.O_NONBLOCK)

    already_terminated = False

    while pipes:
      # Maybe timeout reached?
      if command_timeout and time.time() - started_time > command_timeout:
        command_terminator.Terminate()

      # Check if terminate request was received.
      if command_terminator.IsTerminated() and not already_terminated:
        # Escalate to SIGKILL only if the child survived SIGTERM; a child that
        # exited (even with code 0) is already reaped and must not be killed.
        if self._Terminate(child, command_timeout) is None:
          self._Kill(child)
        # Don't exit the loop immediately. Firstly try to read everything that
        # was left on stdout and stderr.
        already_terminated = True

      # Wait for pipes to become ready.
      ready_pipes, _, _ = select.select(pipes, [], [], 0.1)

      # Handle file descriptors ready to be read.
      for pipe in ready_pipes:
        fd = pipe.fileno()

        data = os.read(fd, 4096)

        # check for end-of-file
        if not data:
          pipes.remove(pipe)
          continue

        # read all data that's available
        while data:
          if pipe == child.stdout:
            self.DataReceivedOnOutput(data)
          elif pipe == child.stderr:
            self.DataReceivedOnError(data)

          try:
            data = os.read(fd, 4096)
          except OSError:
            # terminate loop if EWOULDBLOCK (EAGAIN) is received
            data = ''

    if not already_terminated:
      self._logger.debug('Waiting for command to finish.')

    # wait() returns immediately if the exit code has been collected already.
    child.wait()

    return child

  def DataReceivedOnOutput(self, data):
    """Invoked when the child process wrote data to stdout."""
    sys.stdout.write(data)

  def DataReceivedOnError(self, data):
    """Invoked when the child process wrote data to stderr."""
    sys.stderr.write(data)


class LoggingCommandExecuter(CommandExecuter):
  """Executer that sends the command output to a dedicated logger."""

  def __init__(self, *args, **kwargs):
    super(LoggingCommandExecuter, self).__init__(*args, **kwargs)

    # Create a logger for command's stdout/stderr streams.
    self._output = logging.getLogger('%s.%s' % (self._logger.name, 'Output'))

  def OpenLog(self, log_path):
    """The messages are going to be saved to gzip compressed file."""
    formatter = logging.Formatter('%(asctime)s %(prefix)s: %(message)s',
                                  '%Y-%m-%d %H:%M:%S')
    handler = logger.CompressedFileHandler(log_path, delay=True)
    handler.setFormatter(formatter)
    self._output.addHandler(handler)

    # Set a flag to prevent log records from being propagated up the logger
    # hierarchy tree. We don't want for command output messages to appear in
    # the main log.
    self._output.propagate = 0

  def CloseLog(self):
    """Remove handlers and reattach the logger to its parent."""
    # Iterate over a copy, because removeHandler() mutates the handler list.
    for handler in list(self._output.handlers):
      self._output.removeHandler(handler)
      handler.flush()
      handler.close()

    self._output.propagate = 1

  def DataReceivedOnOutput(self, data):
    """Invoked when the child process wrote data to stdout."""
    for line in data.splitlines():
      self._output.info(line, extra={'prefix': 'STDOUT'})

  def DataReceivedOnError(self, data):
    """Invoked when the child process wrote data to stderr."""
    for line in data.splitlines():
      self._output.warning(line, extra={'prefix': 'STDERR'})


class CommandTerminator(object):
  """Flag object used to request termination of a running command."""

  def __init__(self):
    self.terminated = False

  def Terminate(self):
    self.terminated = True

  def IsTerminated(self):
    return self.terminated


# diff --git a/deprecated/automation/common/command_executer_test.py
# (new file mode 100755)
#!/usr/bin/python2
#
# Copyright 2011 Google Inc. All Rights Reserved.
#

__author__ = 'kbaclawski@google.com (Krystian Baclawski)'

import cStringIO
import logging
import os
import signal
import socket
import sys
import time
import unittest


def AddScriptDirToPath():
  """Required for remote python script execution."""
  path = os.path.abspath(__file__)

  # Strip the file name and two package directories to reach the directory
  # that contains the 'automation' package.
  for _ in range(3):
    path, _ = os.path.split(path)

  if path not in sys.path:
    sys.path.append(path)


AddScriptDirToPath()

from automation.common.command_executer import CommandExecuter


class LoggerMock(object):
  """Stand-in logger that forwards everything to the logging module."""

  def LogCmd(self, cmd, machine='', user=''):
    if machine:
      logging.info('[%s] Executing: %s', machine, cmd)
    else:
      logging.info('Executing: %s', cmd)

  def LogError(self, msg):
    logging.error(msg)

  def LogWarning(self, msg):
    logging.warning(msg)

  def LogOutput(self, msg):
    logging.info(msg)


class CommandExecuterUnderTest(CommandExecuter):
  """Executer that records the output of the command instead of printing it."""

  def __init__(self):
    # CommandExecuter.__init__ accepts only 'dry_run'; passing the former
    # 'logger_to_set' keyword raised TypeError before any test could run.
    CommandExecuter.__init__(self)

    # We will record stdout and stderr.
    self._stderr = cStringIO.StringIO()
    self._stdout = cStringIO.StringIO()

  @property
  def stdout(self):
    return self._stdout.getvalue()

  @property
  def stderr(self):
    return self._stderr.getvalue()

  def DataReceivedOnOutput(self, data):
    self._stdout.write(data)

  def DataReceivedOnError(self, data):
    self._stderr.write(data)


class CommandExecuterLocalTests(unittest.TestCase):
  """Runs helper methods of this script as local child processes."""

  # None means "execute locally"; overridden by the remote test case.
  HOSTNAME = None

  def setUp(self):
    self._executer = CommandExecuterUnderTest()

  def tearDown(self):
    pass

  def RunCommand(self, method, **kwargs):
    """Re-invokes this script so that it runs the given helper method."""
    program = os.path.abspath(sys.argv[0])

    return self._executer.RunCommand('%s runHelper %s' % (program, method),
                                     machine=self.HOSTNAME,
                                     **kwargs)

  def testCommandTimeout(self):
    exit_code = self.RunCommand('SleepForMinute', command_timeout=3)

    self.assertTrue(-exit_code in [signal.SIGTERM, signal.SIGKILL],
                    'Invalid exit code: %d' % exit_code)

  def testCommandTimeoutIfSigTermIgnored(self):
    exit_code = self.RunCommand('IgnoreSigTerm', command_timeout=3)

    self.assertTrue(-exit_code in [signal.SIGTERM, signal.SIGKILL])

  def testCommandSucceeded(self):
    self.assertFalse(self.RunCommand('ReturnTrue'))

  def testCommandFailed(self):
    self.assertTrue(self.RunCommand('ReturnFalse'))

  def testStringOnOutputStream(self):
    self.assertFalse(self.RunCommand('EchoToOutputStream'))
    self.assertEqual(self._executer.stderr, '')
    self.assertEqual(self._executer.stdout, 'test')

  def testStringOnErrorStream(self):
    self.assertFalse(self.RunCommand('EchoToErrorStream'))
    self.assertEqual(self._executer.stderr, 'test')
    self.assertEqual(self._executer.stdout, '')

  def testOutputStreamNonInteractive(self):
    self.assertFalse(
        self.RunCommand('IsOutputStreamInteractive'),
        'stdout stream is a terminal!')

  def testErrorStreamNonInteractive(self):
    self.assertFalse(
        self.RunCommand('IsErrorStreamInteractive'),
        'stderr stream is a terminal!')

  def testAttemptToRead(self):
    self.assertFalse(self.RunCommand('WaitForInput', command_timeout=3))

  def testInterruptedProcess(self):
    self.assertEqual(self.RunCommand('TerminateBySigAbrt'), -signal.SIGABRT)


class CommandExecuterRemoteTests(CommandExecuterLocalTests):
  """Repeats the local tests through an ssh connection to this host."""

  HOSTNAME = socket.gethostname()

  def testCommandTimeoutIfSigTermIgnored(self):
    exit_code = self.RunCommand('IgnoreSigTerm', command_timeout=6)

    self.assertEqual(exit_code, 255)

    # The helper prints its pid first; use it to check the process is gone.
    lines = self._executer.stdout.splitlines()
    pid = int(lines[0])

    try:
      with open('/proc/%d/cmdline' % pid) as f:
        cmdline = f.read()
    except IOError:
      cmdline = ''

    self.assertFalse('IgnoreSigTerm' in cmdline, 'Process is still alive.')


class CommandExecuterTestHelpers(object):
  """Child-process behaviours; the return value becomes the exit code."""

  def SleepForMinute(self):
    time.sleep(60)
    return 1

  def ReturnTrue(self):
    return 0

  def ReturnFalse(self):
    return 1

  def EchoToOutputStream(self):
    sys.stdout.write('test')
    return 0

  def EchoToErrorStream(self):
    sys.stderr.write('test')
    return 0

  def IsOutputStreamInteractive(self):
    return sys.stdout.isatty()

  def IsErrorStreamInteractive(self):
    return sys.stderr.isatty()

  def IgnoreSigTerm(self):
    os.write(1, '%d' % os.getpid())
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    time.sleep(30)
    return 0

  def WaitForInput(self):
    try:
      # can only read end-of-file marker
      return os.read(0, 1) != ''
    except OSError:
      # that means that stdin descriptor is closed
      return 0

  def TerminateBySigAbrt(self):
    os.kill(os.getpid(), signal.SIGABRT)
    return 0


if __name__ == '__main__':
  FORMAT = '%(asctime)-15s %(levelname)s %(message)s'
  logging.basicConfig(format=FORMAT, level=logging.DEBUG)

  if len(sys.argv) > 1:
    if sys.argv[1] == 'runHelper':
      helpers = CommandExecuterTestHelpers()
      sys.exit(getattr(helpers, sys.argv[2])())

  unittest.main()

# diff --git a/deprecated/automation/common/events.py
# (new file mode 100644, index 00000000..ad3ec844)
# --- /dev/null +++ b/deprecated/automation/common/events.py
# -*- coding: utf-8 -*-
#
# Copyright 2011 Google Inc. All Rights Reserved.
#
"""Tools for recording and reporting timeline of abstract events.

You can store any events provided that they can be stringified.
"""

__author__ = 'kbaclawski@google.com (Krystian Baclawski)'

import collections
import datetime
import time

try:
  # Python 3 keeps the abstract container classes in collections.abc.
  from collections import abc as collections_abc
except ImportError:
  # Python 2 exposes them directly on the collections module.
  collections_abc = collections


class _EventRecord(object):
  """Internal class. Attaches extra information to an event.

  Args:
    event: any object that can be stringified.
    time_started: start of the event in seconds since the epoch; defaults to
      the current time.
    time_elapsed: duration (number of seconds or datetime.timedelta); when
      given, the record is considered finished.
  """

  def __init__(self, event, time_started=None, time_elapsed=None):
    self._event = event
    self._time_started = time_started or time.time()
    self._time_elapsed = None

    # Compare with None: a zero duration is a valid, finished measurement.
    if time_elapsed is not None:
      self.time_elapsed = time_elapsed

  @property
  def event(self):
    return self._event

  @property
  def time_started(self):
    return self._time_started

  def _TimeElapsedGet(self):
    """Returns the duration so far (or the final one) as a timedelta."""
    if self.has_finished:
      time_elapsed = self._time_elapsed
    else:
      time_elapsed = time.time() - self._time_started

    return datetime.timedelta(seconds=time_elapsed)

  def _TimeElapsedSet(self, time_elapsed):
    """Stores the final duration given in seconds or as a timedelta."""
    if isinstance(time_elapsed, datetime.timedelta):
      # total_seconds() keeps whole days, '.seconds' silently drops them.
      self._time_elapsed = time_elapsed.total_seconds()
    else:
      self._time_elapsed = time_elapsed

  time_elapsed = property(_TimeElapsedGet, _TimeElapsedSet)

  @property
  def has_finished(self):
    return self._time_elapsed is not None

  def GetTimeStartedFormatted(self):
    """Returns the start time (UTC) formatted as 'MM/DD/YYYY HH:MM:SS'."""
    return time.strftime('%m/%d/%Y %H:%M:%S', time.gmtime(self._time_started))

  def GetTimeElapsedRounded(self):
    """Returns the elapsed time truncated to whole seconds."""
    return datetime.timedelta(seconds=int(self.time_elapsed.total_seconds()))

  def Finish(self):
    """Freezes the elapsed time; has no effect on a finished record."""
    if not self.has_finished:
      self._time_elapsed = time.time() - self._time_started


class _Transition(collections.namedtuple('_Transition', ('from_', 'to_'))):
  """Internal class. Represents transition point between events / states."""

  def __str__(self):
    return '%s => %s' % (self.from_, self.to_)


class EventHistory(collections_abc.Sequence):
  """Records events and provides human readable events timeline."""

  def __init__(self, records=None):
    self._records = records or []

  def __len__(self):
    return len(self._records)

  def __iter__(self):
    return iter(self._records)

  def __getitem__(self, index):
    return self._records[index]

  @property
  def last(self):
    """The most recent record, or None if nothing was recorded yet."""
    if self._records:
      return self._records[-1]

    return None

  def AddEvent(self, event):
    """Finishes the current event and starts recording a new one.

    Returns:
      The record created for the new event.
    """
    if self.last:
      self.last.Finish()

    evrec = _EventRecord(event)
    self._records.append(evrec)
    return evrec

  def GetTotalTime(self):
    """Returns the summed duration of all events, None if there are none."""
    if not self._records:
      return None

    total_time_elapsed = sum(evrec.time_elapsed.total_seconds()
                             for evrec in self._records)

    return datetime.timedelta(seconds=int(total_time_elapsed))

  def GetTransitionEventHistory(self):
    """Returns a new history whose events are transitions between events.

    An unfinished last event is reported as a transition to 'NOW'.
    """
    records = []

    if self._records:
      for num, next_evrec in enumerate(self._records[1:], start=1):
        evrec = self._records[num - 1]

        records.append(_EventRecord(
            _Transition(evrec.event, next_evrec.event), evrec.time_started,
            evrec.time_elapsed))

      if not self.last.has_finished:
        records.append(_EventRecord(
            _Transition(self.last.event, 'NOW'), self.last.time_started,
            self.last.time_elapsed))

    return EventHistory(records)

  @staticmethod
  def _GetReport(history, report_name):
    """Formats history as a numbered list preceded by report_name."""
    report = [report_name]

    for num, evrec in enumerate(history, start=1):
      time_elapsed = str(evrec.GetTimeElapsedRounded())

      if not evrec.has_finished:
        # Strings have no append(); concatenate the marker instead.
        time_elapsed += ' (not finished)'

      report.append('%d) %s: %s: %s' % (num, evrec.GetTimeStartedFormatted(),
                                        evrec.event, time_elapsed))

    report.append('Total Time: %s' % history.GetTotalTime())

    return '\n'.join(report)

  def GetEventReport(self):
    return EventHistory._GetReport(self, 'Timeline of events:')

  def GetTransitionEventReport(self):
    return EventHistory._GetReport(self.GetTransitionEventHistory(),
                                   'Timeline of transition events:')
# (continued from the previous line of this dump - the tail of
# EventHistory.GetTransitionEventReport in events.py:)
#     EventHistory._GetReport(self.GetTransitionEventHistory(),
#                             'Timeline of transition events:')

# diff --git a/deprecated/automation/common/job.py (new file mode 100644)
# Copyright 2010 Google Inc. All Rights Reserved.
#
"""A module for a job in the infrastructure."""

__author__ = 'raymes@google.com (Raymes Khoury)'

import os.path
import re

from automation.common import state_machine

STATUS_NOT_EXECUTED = 'NOT_EXECUTED'
STATUS_SETUP = 'SETUP'
STATUS_COPYING = 'COPYING'
STATUS_RUNNING = 'RUNNING'
STATUS_SUCCEEDED = 'SUCCEEDED'
STATUS_FAILED = 'FAILED'


class FolderDependency(object):
  """Describes a folder produced by one job and needed by another.

  Args:
    job: the job that produces the folder.
    src: path of the folder on the producer side.
    dest: path the folder is needed at; defaults to src.
  """

  def __init__(self, job, src, dest=None):
    if not dest:
      dest = src

    # TODO(kbaclawski): rename to producer
    self.job = job
    self.src = src
    self.dest = dest

  @property
  def read_only(self):
    return self.dest == self.src


class JobStateMachine(state_machine.BasicStateMachine):
  """Allowed status transitions of a job."""

  state_machine = {
      STATUS_NOT_EXECUTED: [STATUS_SETUP],
      STATUS_SETUP: [STATUS_COPYING, STATUS_FAILED],
      STATUS_COPYING: [STATUS_RUNNING, STATUS_FAILED],
      STATUS_RUNNING: [STATUS_SUCCEEDED, STATUS_FAILED]
  }

  final_states = [STATUS_SUCCEEDED, STATUS_FAILED]


class JobFailure(Exception):
  """Raised when a job fails; carries the exit code of the failed command."""

  def __init__(self, message, exit_code):
    Exception.__init__(self, message)
    self.exit_code = exit_code


class Job(object):
  """A class representing a job whose commands will be executed.

  Args:
    label: human readable name of the job.
    command: the command to run; it is converted with str().
    timeout: time limit, four hours by default (presumably seconds - confirm
      against the code that consumes it).
  """

  WORKDIR_PREFIX = '/usr/local/google/tmp/automation'

  # (regular expression, replacement) pairs applied by PrettyFormatCommand to
  # strip the '{ ...; }' grouping and to put chained commands on own lines.
  _PRETTY_FORMAT_RULES = (
      (re.compile(r'\{ '), ''),
      (re.compile(r'; \}'), ''),
      (re.compile(r'\} '), '\n'),
      (re.compile(r'\s*&&\s*'), '\n'),
  )

  def __init__(self, label, command, timeout=4 * 60 * 60):
    self._state = JobStateMachine(STATUS_NOT_EXECUTED)
    self.predecessors = set()
    self.successors = set()
    self.machine_dependencies = []
    self.folder_dependencies = []
    self.id = 0
    self.machines = []
    self.command = command
    self._has_primary_machine_spec = False
    self.group = None
    self.dry_run = None
    self.label = label
    self.timeout = timeout

  def _StateGet(self):
    return self._state

  def _StateSet(self, new_state):
    self._state.Change(new_state)

  # Reading yields the state machine object itself; assigning requests a
  # transition through its Change() method.
  status = property(_StateGet, _StateSet)

  @property
  def timeline(self):
    return self._state.timeline

  def __repr__(self):
    return '{%s: %s}' % (self.__class__.__name__, self.id)

  def __str__(self):
    res = []
    res.append('%d' % self.id)
    res.append('Predecessors:')
    res.extend(['%d' % pred.id for pred in self.predecessors])
    res.append('Successors:')
    res.extend(['%d' % succ.id for succ in self.successors])
    res.append('Machines:')
    res.extend(['%s' % machine for machine in self.machines])
    res.append(self.PrettyFormatCommand())
    res.append('%s' % self.status)
    res.append(self.timeline.GetTransitionEventReport())
    return '\n'.join(res)

  @staticmethod
  def _FormatCommand(cmd, substitutions):
    """Replaces every literal pattern in cmd with its replacement."""
    for pattern, replacement in substitutions:
      cmd = cmd.replace(pattern, replacement)

    return cmd

  def GetCommand(self):
    """Returns the command with the $JOB_* / $*_MACHINE* placeholders filled.

    Needs at least one assigned machine and a group (for home_dir).
    """
    substitutions = [
        ('$JOB_ID', str(self.id)), ('$JOB_TMP', self.work_dir),
        ('$JOB_HOME', self.home_dir),
        ('$PRIMARY_MACHINE', self.primary_machine.hostname)
    ]

    if len(self.machines) > 1:
      for num, machine in enumerate(self.machines[1:]):
        substitutions.append(
            ('$SECONDARY_MACHINES[%d]' % num, machine.hostname))

    return self._FormatCommand(str(self.command), substitutions)

  def PrettyFormatCommand(self):
    """Returns the command with one chained command per line."""
    # TODO(kbaclawski): This method doesn't belong here, but rather to
    # non existing Command class. If one is created then PrettyFormatCommand
    # shall become its method.
    cmd = self.GetCommand()

    # The rules are regular expressions, hence re.sub; the literal
    # str.replace used by _FormatCommand would never match them.
    for pattern, replacement in self._PRETTY_FORMAT_RULES:
      cmd = pattern.sub(replacement, cmd)

    return cmd

  def DependsOnFolder(self, dependency):
    """Registers a folder dependency and the job that produces it."""
    self.folder_dependencies.append(dependency)
    self.DependsOn(dependency.job)

  @property
  def results_dir(self):
    return os.path.join(self.work_dir, 'results')

  @property
  def logs_dir(self):
    return os.path.join(self.home_dir, 'logs')

  @property
  def log_filename_prefix(self):
    return 'job-%d.log' % self.id

  @property
  def work_dir(self):
    return os.path.join(self.WORKDIR_PREFIX, 'job-%d' % self.id)

  @property
  def home_dir(self):
    return os.path.join(self.group.home_dir, 'job-%d' % self.id)

  @property
  def primary_machine(self):
    return self.machines[0]

  def DependsOn(self, job):
    """Specifies Jobs to be finished before this job can be launched."""
    self.predecessors.add(job)
    job.successors.add(self)

  @property
  def is_ready(self):
    """Check that all our dependencies have been executed."""
    # NOTE(review): relies on the state machine comparing equal to a status
    # string; BasicStateMachine is not visible here - confirm.
    return all(pred.status == STATUS_SUCCEEDED for pred in self.predecessors)

  def DependsOnMachine(self, machine_spec, primary=True):
    """Adds a machine requirement; at most one of them may be primary.

    Raises:
      RuntimeError: a primary machine specification was already given.
    """
    # Job will run on arbitrarily chosen machine specified by
    # MachineSpecification class instances passed to this method.
    if primary:
      if self._has_primary_machine_spec:
        raise RuntimeError('Only one primary machine specification allowed.')
      self._has_primary_machine_spec = True
      # The primary specification always stays first in the list.
      self.machine_dependencies.insert(0, machine_spec)
    else:
      self.machine_dependencies.append(machine_spec)


# diff --git a/deprecated/automation/common/job_group.py
# (new file mode 100644)
# Copyright 2010 Google Inc. All Rights Reserved.
#

import getpass
import os

from automation.common.state_machine import BasicStateMachine

STATUS_NOT_EXECUTED = 'NOT_EXECUTED'
STATUS_EXECUTING = 'EXECUTING'
STATUS_SUCCEEDED = 'SUCCEEDED'
STATUS_FAILED = 'FAILED'


class JobGroupStateMachine(BasicStateMachine):
  """Allowed status transitions of a job group."""

  state_machine = {
      STATUS_NOT_EXECUTED: [STATUS_EXECUTING],
      STATUS_EXECUTING: [STATUS_SUCCEEDED, STATUS_FAILED]
  }

  final_states = [STATUS_SUCCEEDED, STATUS_FAILED]


class JobGroup(object):
  """A labelled collection of jobs sharing one home directory.

  Args:
    label: human readable name of the group.
    jobs: optional iterable of jobs added to the group right away.
    cleanup_on_completion: stored flag, not interpreted by this class.
    cleanup_on_failure: stored flag, not interpreted by this class.
    description: free-form text.
  """

  # Computed once, at import time, for the user running the process.
  HOMEDIR_PREFIX = os.path.join('/home', getpass.getuser(), 'www', 'automation')

  def __init__(self,
               label,
               jobs=None,
               cleanup_on_completion=True,
               cleanup_on_failure=False,
               description=''):
    self._state = JobGroupStateMachine(STATUS_NOT_EXECUTED)
    self.id = 0
    self.label = label
    self.jobs = []
    self.cleanup_on_completion = cleanup_on_completion
    self.cleanup_on_failure = cleanup_on_failure
    self.description = description

    # Go through AddJob so that every job learns which group owns it.
    for member in jobs or []:
      self.AddJob(member)

  def _StateGet(self):
    return self._state

  def _StateSet(self, new_state):
    self._state.Change(new_state)

  # Reading yields the state machine object; assigning requests a transition.
  status = property(_StateGet, _StateSet)

  @property
  def home_dir(self):
    group_dir = 'job-group-%d' % self.id
    return os.path.join(self.HOMEDIR_PREFIX, group_dir)

  @property
  def time_submitted(self):
    """Start time of the second timeline entry, None if there is none yet."""
    timeline = self.status.timeline

    try:
      return timeline[1].time_started
    except IndexError:
      return None

  def __repr__(self):
    return '{%s: %s}' % (self.__class__.__name__, self.id)

  def __str__(self):
    lines = ['Job-Group:', 'ID: %s' % self.id]
    lines.extend(str(job) for job in self.jobs)
    return '\n'.join(lines)

  def AddJob(self, job):
    """Appends job to the group and points its 'group' attribute here."""
    self.jobs.append(job)
    job.group = self


# diff --git a/deprecated/automation/common/logger.py (new file mode 100644)
# Copyright 2010 Google Inc. All Rights Reserved.

from itertools import chain
import functools
import gzip
import logging
import logging.handlers
import time
import traceback


def SetUpRootLogger(filename=None, level=None, display_flags=None):
  """Attaches a colored console handler (and a log file) to the root logger.

  Args:
    filename: when given, messages also go to this rotating file (10 MiB per
      file, 9 backups), without colors.
    level: when given, becomes the level of the root logger.
    display_flags: dictionary with optional 'datetime', 'level' and 'name'
      booleans selecting the parts of the message header (all on by default).
  """
  # A fresh dictionary per call; a '{}' default would be shared by all calls.
  display_flags = display_flags or {}

  console_handler = logging.StreamHandler()
  console_handler.setFormatter(CustomFormatter(AnsiColorCoder(), display_flags))
  logging.root.addHandler(console_handler)

  if filename:
    file_handler = logging.handlers.RotatingFileHandler(
        filename,
        maxBytes=10 * 1024 * 1024,
        backupCount=9,
        delay=True)
    file_handler.setFormatter(CustomFormatter(NullColorCoder(), display_flags))
    logging.root.addHandler(file_handler)

  if level:
    logging.root.setLevel(level)


class NullColorCoder(object):
  """Color coder that produces no escape sequences at all."""

  def __call__(self, *args):
    return ''


class AnsiColorCoder(object):
  """Translates attribute names into an ANSI escape sequence.

  A plain name selects the first code of its entry; a name prefixed with 'bg-'
  or 'no-' selects the second one (background color / attribute off).
  """

  CODES = {'reset': (0,),
           'bold': (1, 22),
           'italics': (3, 23),
           'underline': (4, 24),
           'inverse': (7, 27),
           'strikethrough': (9, 29),
           'black': (30, 40),
           'red': (31, 41),
           'green': (32, 42),
           'yellow': (33, 43),
           'blue': (34, 44),
           'magenta': (35, 45),
           'cyan': (36, 46),
           'white': (37, 47)}

  def __call__(self, *args):
    codes = []

    for arg in args:
      if arg.startswith('bg-') or arg.startswith('no-'):
        codes.append(self.CODES[arg[3:]][1])
      else:
        codes.append(self.CODES[arg][0])

    return '\033[%sm' % ';'.join(map(str, codes))


class CustomFormatter(logging.Formatter):
  """Formatter that colors the header and repeats it for every message line.

  Args:
    coder: callable turning attribute names into an escape sequence
      (AnsiColorCoder or NullColorCoder).
    display_flags: see SetUpRootLogger.
  """

  COLORS = {'DEBUG': ('white',),
            'INFO': ('green',),
            'WARN': ('yellow', 'bold'),
            'ERROR': ('red', 'bold'),
            'CRIT': ('red', 'inverse', 'bold')}

  def __init__(self, coder, display_flags=None):
    display_flags = display_flags or {}
    items = []

    if display_flags.get('datetime', True):
      items.append('%(asctime)s')
    if display_flags.get('level', True):
      items.append('%(levelname)s')
    if display_flags.get('name', True):
      items.append(coder('cyan') + '[%(threadName)s:%(name)s]' + coder('reset'))
    items.append('%(prefix)s%(message)s')

    logging.Formatter.__init__(self, fmt=' '.join(items))

    self._coder = coder

  def formatTime(self, record, datefmt=None):
    """Returns the creation time with hundredths of a second.

    datefmt is accepted for compatibility with logging.Formatter.formatTime
    and is ignored.
    """
    ct = self.converter(record.created)
    t = time.strftime('%Y-%m-%d %H:%M:%S', ct)
    return '%s.%02d' % (t, record.msecs / 10)

  def formatLevelName(self, record):
    """Returns the (possibly shortened) level name wrapped in its colors."""
    if record.levelname in ['WARNING', 'CRITICAL']:
      levelname = record.levelname[:4]
    else:
      levelname = record.levelname

    colors = self.COLORS.get(levelname)

    if not colors:
      # Custom levels have no color assigned; print them plain rather than
      # fail with KeyError in the middle of logging.
      return levelname

    return ''.join([self._coder(*colors), levelname, self._coder('reset')])

  def formatMessagePrefix(self, record):
    """Returns the optional 'prefix' attribute of the record, decorated."""
    try:
      return ' %s%s:%s ' % (self._coder('black', 'bold'), record.prefix,
                            self._coder('reset'))
    except AttributeError:
      # Records logged without extra={'prefix': ...} have no such attribute.
      return ''

  def format(self, record):
    """Returns the message and traceback, one fully prefixed line each."""
    if record.exc_info:
      if not record.exc_text:
        # Cache the traceback on the record, like logging.Formatter does.
        record.exc_text = self.formatException(record.exc_info)
      exc_text = record.exc_text or ''
    else:
      exc_text = ''

    fmt = record.__dict__.copy()
    fmt.update({'levelname': self.formatLevelName(record),
                'asctime': self.formatTime(record),
                'prefix': self.formatMessagePrefix(record)})

    # An empty message still produces one (header only) line.
    message_lines = record.getMessage().splitlines() or ['']

    s = []

    for line in chain(message_lines, exc_text.splitlines()):
      fmt['message'] = line

      s.append(self._fmt % fmt)

    return '\n'.join(s)


class CompressedFileHandler(logging.FileHandler):
  """File handler writing to a gzip compressed '<filename>.gz' file."""

  def _open(self):
    return gzip.open(self.baseFilename + '.gz', self.mode, 9)


def HandleUncaughtExceptions(fun):
  """Catches all exceptions that would go outside decorated fun scope.

  The exception is logged with its traceback and the call returns None.
  """

  @functools.wraps(fun)
  def _Interceptor(*args, **kwargs):
    try:
      return fun(*args, **kwargs)
    except Exception:  # StandardError does not exist in Python 3.
      logging.exception('Uncaught exception:')

  return _Interceptor


# diff --git a/deprecated/automation/common/machine.py (new file mode 100644)
# Copyright 2010 Google Inc. All Rights Reserved.
__author__ = 'asharif@google.com (Ahmad Sharif)'

from fnmatch import fnmatch


class Machine(object):
    """Stores information related to machine and its state."""

    def __init__(self, hostname, label, cpu, cores, os, username):
        """Records the machine description and resets its usage state.

        Args:
          hostname: Host name of the machine.
          label: Label matched against MachineSpecification.label.
          cpu: CPU description.
          cores: Number of cores; formatted with '%d' by __str__.
          os: Operating system name.
          username: User name shown by __repr__.
        """
        self.hostname = hostname
        self.label = label
        self.cpu = cpu
        self.cores = cores
        self.os = os
        self.username = username

        # MachineManager related attributes.
        self.uses = 0
        self.locked = False

    def Acquire(self, exclusively):
        """Registers one more user of the machine.

        Args:
          exclusively: When true the machine is also marked as locked.

        Raises:
          AssertionError: The machine is already locked.
        """
        assert not self.locked

        if exclusively:
            self.locked = True
        self.uses += 1

    def Release(self):
        """Drops one user; the lock is cleared once nobody uses the machine.

        Raises:
          AssertionError: The machine is not in use.
        """
        assert self.uses > 0

        self.uses -= 1

        if not self.uses:
            self.locked = False

    def __repr__(self):
        return '{%s: %s@%s}' % (self.__class__.__name__, self.username,
                                self.hostname)

    def __str__(self):
        return '\n'.join(['Machine Information:',
                          'Hostname: %s' % self.hostname,
                          'Label: %s' % self.label,
                          'CPU: %s' % self.cpu,
                          'Cores: %d' % self.cores,
                          'OS: %s' % self.os,
                          'Uses: %d' % self.uses,
                          'Locked: %s' % self.locked])


class MachineSpecification(object):
    """Helper class used to find a machine matching your requirements."""

    def __init__(self, hostname='*', label='*', os='*', lock_required=False):
        """Stores the requirements.

        Args:
          hostname: fnmatch-style pattern for Machine.hostname.
          label: fnmatch-style pattern for Machine.label.
          os: fnmatch-style pattern for Machine.os.
          lock_required: Stored as-is; IsMatch does not consult it.
        """
        self.hostname = hostname
        self.label = label
        self.os = os
        self.lock_required = lock_required
        self.preferred_machines = []

    def __str__(self):
        # The original read self.name, an attribute that is never set, so
        # str() always raised AttributeError. The host pattern lives in
        # self.hostname; the layout mirrors Machine.__str__.
        return '\n'.join(['Machine Specification:',
                          'Hostname: %s' % self.hostname,
                          'Label: %s' % self.label,
                          'OS: %s' % self.os,
                          'Lock required: %s' % self.lock_required])

    def IsMatch(self, machine):
        """Tells whether an unlocked machine satisfies all three patterns.

        Args:
          machine: Object exposing locked, hostname, label and os.

        Returns:
          True when the machine is not locked and its hostname, label and os
          match the respective patterns; False otherwise.
        """
        # Short-circuits, so a locked machine is rejected without matching.
        return (not machine.locked and
                fnmatch(machine.hostname, self.hostname) and
                fnmatch(machine.label, self.label) and
                fnmatch(machine.os, self.os))

    def AddPreferredMachine(self, hostname):
        """Appends hostname to preferred_machines unless already present."""
        if hostname not in self.preferred_machines:
            self.preferred_machines.append(hostname)


# ---------------------------------------------------------------------------
# Next file in this diff: deprecated/automation/common/machine_test.py
# (new file mode 100755, index 00000000..f66299f5)
# ---------------------------------------------------------------------------
# File: deprecated/automation/common/machine_test.py (hunk @@ -0,0 +1,26 @@)
#!/usr/bin/python2
#
# Copyright 2010 Google Inc. All Rights Reserved.
"""Machine unittest.

MachineTest tests machine.Machine.
"""

__author__ = 'asharif@google.com (Ahmad Sharif)'

import machine
import unittest


class MachineTest(unittest.TestCase):
    """Checks the textual representation of machine.Machine."""

    def setUp(self):
        pass

    def testPrintMachine(self):
        # Machine.__init__ takes (hostname, label, cpu, cores, os, username).
        # The original call omitted the label and raised TypeError. Keyword
        # arguments keep the six fields from being mixed up again.
        mach = machine.Machine(hostname='ahmad.mtv',
                               label='test',
                               cpu='core2duo',
                               cores=4,
                               os='linux',
                               username='asharif')
        self.assertTrue('ahmad.mtv' in str(mach))


if __name__ == '__main__':
    unittest.main()


# ---------------------------------------------------------------------------
# File: deprecated/automation/common/state_machine.py
# (new file mode 100644, index 00000000..d1cf42c8, hunk @@ -0,0 +1,54 @@)
# -*- coding: utf-8 -*-
#
# Copyright 2011 Google Inc. All Rights Reserved.
#

__author__ = 'kbaclawski@google.com (Krystian Baclawski)'

from automation.common import events


class BasicStateMachine(object):
    """Generic class for constructing state machines.

    Keeps all states and possible transition of a state machine. Ensures that
    transition between two states is always valid. Also stores transition
    events in a timeline object.

    Subclasses are expected to override the two class attributes below.
    """
    # Maps a state to the collection of states reachable from it.
    state_machine = {}
    # States that close the last timeline event when entered.
    final_states = []

    def __init__(self, initial_state):
        """Enters initial_state and records it as the first timeline event.

        Raises:
          AssertionError: initial_state is not a key of state_machine.
        """
        assert initial_state in self.state_machine, \
            'Initial state does not belong to this state machine'

        self._state = initial_state

        self.timeline = events.EventHistory()
        self.timeline.AddEvent(self._state)

    def __str__(self):
        return self._state

    def __eq__(self, value):
        """Compares the current state with a state or another state machine."""
        if isinstance(value, BasicStateMachine):
            value = str(value)

        return self._state == value

    def __ne__(self, value):
        return not self == value

    def _TransitionAllowed(self, to_state):
        """Tells whether to_state is reachable from the current state."""
        return to_state in self.state_machine.get(self._state, [])

    def Change(self, new_state):
        """Moves to new_state and records the transition in the timeline.

        Raises:
          AssertionError: The transition is not listed in state_machine.
        """
        assert self._TransitionAllowed(new_state), \
            'Transition from %s to %s not possible' % (self._state, new_state)

        self._state = new_state

        self.timeline.AddEvent(self._state)

        # Entering a final state closes the event that was just added.
        if self._state in self.final_states:
            self.timeline.last.Finish()