Diffstat (limited to 'automation/common')
-rw-r--r--  automation/common/__init__.py                  1
-rw-r--r--  automation/common/command.py                 241
-rw-r--r--  automation/common/command_executer.py        230
-rwxr-xr-x  automation/common/command_executer_test.py   210
-rw-r--r--  automation/common/events.py                  149
-rw-r--r--  automation/common/job.py                     178
-rw-r--r--  automation/common/job_group.py                73
-rw-r--r--  automation/common/logger.py                  144
-rw-r--r--  automation/common/machine.py                  70
-rwxr-xr-x  automation/common/machine_test.py             26
-rw-r--r--  automation/common/state_machine.py            54
11 files changed, 1376 insertions, 0 deletions
diff --git a/automation/common/__init__.py b/automation/common/__init__.py
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/automation/common/__init__.py
@@ -0,0 +1 @@
+
diff --git a/automation/common/command.py b/automation/common/command.py
new file mode 100644
index 00000000..c56e9fad
--- /dev/null
+++ b/automation/common/command.py
@@ -0,0 +1,241 @@
+# Copyright 2011 Google Inc. All Rights Reserved.
+
+__author__ = 'kbaclawski@google.com (Krystian Baclawski)'
+
+import abc
+import collections
+import os.path
+
+
+class Shell(object):
+ """Class used to build a string representation of a shell command."""
+
+ def __init__(self, cmd, *args, **kwargs):
+ assert all(key in ['path', 'ignore_error'] for key in kwargs)
+
+ self._cmd = cmd
+ self._args = list(args)
+ self._path = kwargs.get('path', '')
+ self._ignore_error = bool(kwargs.get('ignore_error', False))
+
+ def __str__(self):
+ cmdline = [os.path.join(self._path, self._cmd)]
+ cmdline.extend(self._args)
+
+ cmd = ' '.join(cmdline)
+
+ if self._ignore_error:
+ cmd = '{ %s; true; }' % cmd
+
+ return cmd
+
+ def AddOption(self, option):
+ self._args.append(option)
+
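+# Usage sketch (illustrative): Shell stringifies to a plain command line, and
+# ignore_error=True wraps it so a chained pipeline continues even if the
+# command fails.
+#
+#   print str(Shell('tar', '-c', '-f', 'out.tar', 'src', ignore_error=True))
+#   # -> { tar -c -f out.tar src; true; }
+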
+
+class Wrapper(object):
+ """Wraps a command with environment which gets cleaned up after execution."""
+
+ _counter = 1
+
+ def __init__(self, command, cwd=None, env=None, umask=None):
+ # @param cwd: temporary working directory
+ # @param env: dictionary of environment variables
+ self._command = command
+ self._prefix = Chain()
+ self._suffix = Chain()
+
+ if cwd:
+ self._prefix.append(Shell('pushd', cwd))
+ self._suffix.insert(0, Shell('popd'))
+
+ if env:
+ for env_var, value in env.items():
+ self._prefix.append(Shell('%s=%s' % (env_var, value)))
+ self._suffix.insert(0, Shell('unset', env_var))
+
+ if umask:
+ umask_save_var = 'OLD_UMASK_%d' % self.counter
+
+ self._prefix.append(Shell('%s=$(umask)' % umask_save_var))
+ self._prefix.append(Shell('umask', umask))
+ self._suffix.insert(0, Shell('umask', '$%s' % umask_save_var))
+
+  @property
+  def counter(self):
+    # Use the class-level counter so that every Wrapper instance gets a
+    # unique variable name for saving the umask.
+    counter = Wrapper._counter
+    Wrapper._counter += 1
+    return counter
+
+ def __str__(self):
+ return str(Chain(self._prefix, self._command, self._suffix))
+
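+# Usage sketch (illustrative): Wrapper brackets a command with setup and
+# cleanup steps, here a temporary working directory.
+#
+#   print str(Wrapper(Shell('make'), cwd='/tmp/build'))
+#   # -> pushd /tmp/build && make && popd
+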
+
+class AbstractCommandContainer(collections.MutableSequence):
+ """Common base for all classes that behave like command container."""
+
+ def __init__(self, *commands):
+ self._commands = list(commands)
+
+ def __contains__(self, command):
+ return command in self._commands
+
+ def __iter__(self):
+ return iter(self._commands)
+
+ def __len__(self):
+ return len(self._commands)
+
+ def __getitem__(self, index):
+ return self._commands[index]
+
+ def __setitem__(self, index, command):
+ self._commands[index] = self._ValidateCommandType(command)
+
+ def __delitem__(self, index):
+ del self._commands[index]
+
+ def insert(self, index, command):
+ self._commands.insert(index, self._ValidateCommandType(command))
+
+ @abc.abstractmethod
+ def __str__(self):
+ pass
+
+ @abc.abstractproperty
+ def stored_types(self):
+ pass
+
+ def _ValidateCommandType(self, command):
+ if type(command) not in self.stored_types:
+ raise TypeError('Command cannot have %s type.' % type(command))
+ else:
+ return command
+
+ def _StringifyCommands(self):
+ cmds = []
+
+ for cmd in self:
+ if isinstance(cmd, AbstractCommandContainer) and len(cmd) > 1:
+ cmds.append('{ %s; }' % cmd)
+ else:
+ cmds.append(str(cmd))
+
+ return cmds
+
+
+class Chain(AbstractCommandContainer):
+ """Container that chains shell commands using (&&) shell operator."""
+
+ @property
+ def stored_types(self):
+ return [str, Shell, Chain, Pipe]
+
+ def __str__(self):
+ return ' && '.join(self._StringifyCommands())
+
+
+class Pipe(AbstractCommandContainer):
+ """Container that chains shell commands using pipe (|) operator."""
+
+ def __init__(self, *commands, **kwargs):
+ assert all(key in ['input', 'output'] for key in kwargs)
+
+ AbstractCommandContainer.__init__(self, *commands)
+
+ self._input = kwargs.get('input', None)
+ self._output = kwargs.get('output', None)
+
+ @property
+ def stored_types(self):
+ return [str, Shell]
+
+ def __str__(self):
+ pipe = self._StringifyCommands()
+
+ if self._input:
+      pipe.insert(0, str(Shell('cat', self._input)))
+
+ if self._output:
+ pipe.append(str(Shell('tee', self._output)))
+
+ return ' | '.join(pipe)
+
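+# Usage sketch (illustrative): containers compose, and multi-element
+# sub-containers get grouped with braces when stringified.
+#
+#   cmd = Chain(Shell('mkdir', '-p', 'logs'),
+#               Pipe(Shell('dmesg'), Shell('grep', 'error'), output='logs/err'))
+#   print str(cmd)
+#   # -> mkdir -p logs && { dmesg | grep error | tee logs/err; }
+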
+# TODO(kbaclawski): Unfortunately we don't have any policy describing which
+# directories can or cannot be touched by a job. Thus, I cannot decide how to
+# protect a system against commands that are considered to be dangerous (like
+# RmTree("${HOME}")). AFAIK we'll have to execute some commands with root access
+# (especially for ChromeOS related jobs, which involve chroot-ing), which is
+# even more scary.
+
+
+def Copy(*args, **kwargs):
+ assert all(key in ['to_dir', 'recursive'] for key in kwargs.keys())
+
+ options = []
+
+ if 'to_dir' in kwargs:
+ options.extend(['-t', kwargs['to_dir']])
+
+ if 'recursive' in kwargs:
+ options.append('-r')
+
+ options.extend(args)
+
+ return Shell('cp', *options)
+
+
+def RemoteCopyFrom(from_machine, from_path, to_path, username=None):
+ from_path = os.path.expanduser(from_path) + '/'
+ to_path = os.path.expanduser(to_path) + '/'
+
+ if not username:
+ login = from_machine
+ else:
+ login = '%s@%s' % (username, from_machine)
+
+ return Chain(
+ MakeDir(to_path), Shell('rsync', '-a', '%s:%s' %
+ (login, from_path), to_path))
+
+
+def MakeSymlink(to_path, link_name):
+ return Shell('ln', '-f', '-s', '-T', to_path, link_name)
+
+
+def MakeDir(*dirs, **kwargs):
+ options = ['-p']
+
+ mode = kwargs.get('mode', None)
+
+ if mode:
+ options.extend(['-m', str(mode)])
+
+ options.extend(dirs)
+
+ return Shell('mkdir', *options)
+
+
+def RmTree(*dirs):
+ return Shell('rm', '-r', '-f', *dirs)
+
+
+def UnTar(tar_file, dest_dir):
+ return Chain(
+ MakeDir(dest_dir), Shell('tar', '-x', '-f', tar_file, '-C', dest_dir))
+
+
+def Tar(tar_file, *args):
+ options = ['-c']
+
+ if tar_file.endswith('.tar.bz2'):
+ options.append('-j')
+ elif tar_file.endswith('.tar.gz'):
+ options.append('-z')
+ else:
+ assert tar_file.endswith('.tar')
+
+ options.extend(['-f', tar_file])
+ options.extend(args)
+
+ return Chain(MakeDir(os.path.dirname(tar_file)), Shell('tar', *options))
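+
+# Usage sketch (illustrative): the helper factories above return Shell/Chain
+# objects that can be passed to a command executer.
+#
+#   print str(Tar('/tmp/out/results.tar.gz', 'results/'))
+#   # -> mkdir -p /tmp/out && tar -c -z -f /tmp/out/results.tar.gz results/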
diff --git a/automation/common/command_executer.py b/automation/common/command_executer.py
new file mode 100644
index 00000000..c0f314f5
--- /dev/null
+++ b/automation/common/command_executer.py
@@ -0,0 +1,230 @@
+# Copyright 2011 Google Inc. All Rights Reserved.
+#
+"""Classes that help running commands in a subshell.
+
+Commands can be run locally, or remotly using SSH connection. You may log the
+output of a command to a terminal or a file, or any other destination.
+"""
+
+__author__ = 'kbaclawski@google.com (Krystian Baclawski)'
+
+import fcntl
+import logging
+import os
+import select
+import subprocess
+import sys
+import time
+
+from automation.common import logger
+
+
+class CommandExecuter(object):
+ DRY_RUN = False
+
+ def __init__(self, dry_run=False):
+ self._logger = logging.getLogger(self.__class__.__name__)
+ self._dry_run = dry_run or self.DRY_RUN
+
+ @classmethod
+ def Configure(cls, dry_run):
+ cls.DRY_RUN = dry_run
+
+ def RunCommand(self,
+ cmd,
+ machine=None,
+ username=None,
+ command_terminator=None,
+ command_timeout=None):
+ cmd = str(cmd)
+
+ if self._dry_run:
+ return 0
+
+ if not command_terminator:
+ command_terminator = CommandTerminator()
+
+ if command_terminator.IsTerminated():
+      self._logger.warning('Command has already been terminated!')
+ return 1
+
+ # Rewrite command for remote execution.
+ if machine:
+ if username:
+ login = '%s@%s' % (username, machine)
+ else:
+ login = machine
+
+ self._logger.debug("Executing '%s' on %s.", cmd, login)
+
+ # FIXME(asharif): Remove this after crosbug.com/33007 is fixed.
+ cmd = "ssh -t -t %s -- '%s'" % (login, cmd)
+ else:
+ self._logger.debug("Executing: '%s'.", cmd)
+
+ child = self._SpawnProcess(cmd, command_terminator, command_timeout)
+
+    self._logger.debug('{PID: %d} Finished with code %d.', child.pid,
+                       child.returncode)
+
+ return child.returncode
+
+ def _Terminate(self, child, command_timeout, wait_timeout=10):
+ """Gracefully shutdown the child by sending SIGTERM."""
+
+ if command_timeout:
+ self._logger.warning('{PID: %d} Timeout of %s seconds reached since '
+ 'process started.', child.pid, command_timeout)
+
+ self._logger.warning('{PID: %d} Terminating child.', child.pid)
+
+ try:
+ child.terminate()
+ except OSError:
+ pass
+
+ wait_started = time.time()
+
+    while child.poll() is None:
+      if time.time() - wait_started >= wait_timeout:
+        break
+      time.sleep(0.1)
+
+    # Report whether the child actually exited within wait_timeout.
+    return child.poll() is not None
+
+ def _Kill(self, child):
+ """Kill the child with immediate result."""
+ self._logger.warning('{PID: %d} Process still alive.', child.pid)
+ self._logger.warning('{PID: %d} Killing child.', child.pid)
+ child.kill()
+ child.wait()
+
+ def _SpawnProcess(self, cmd, command_terminator, command_timeout):
+ # Create a child process executing provided command.
+ child = subprocess.Popen(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ stdin=subprocess.PIPE,
+ shell=True)
+
+ # Close stdin so the child won't be able to block on read.
+ child.stdin.close()
+
+ started_time = time.time()
+
+ # Watch for data on process stdout, stderr.
+ pipes = [child.stdout, child.stderr]
+
+ # Put pipes into non-blocking mode.
+ for pipe in pipes:
+ fd = pipe.fileno()
+ fd_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ fcntl.fcntl(fd, fcntl.F_SETFL, fd_flags | os.O_NONBLOCK)
+
+ already_terminated = False
+
+ while pipes:
+ # Maybe timeout reached?
+ if command_timeout and time.time() - started_time > command_timeout:
+ command_terminator.Terminate()
+
+ # Check if terminate request was received.
+ if command_terminator.IsTerminated() and not already_terminated:
+ if not self._Terminate(child, command_timeout):
+ self._Kill(child)
+ # Don't exit the loop immediately. Firstly try to read everything that
+ # was left on stdout and stderr.
+ already_terminated = True
+
+ # Wait for pipes to become ready.
+ ready_pipes, _, _ = select.select(pipes, [], [], 0.1)
+
+ # Handle file descriptors ready to be read.
+ for pipe in ready_pipes:
+ fd = pipe.fileno()
+
+ data = os.read(fd, 4096)
+
+ # check for end-of-file
+ if not data:
+ pipes.remove(pipe)
+ continue
+
+ # read all data that's available
+ while data:
+ if pipe == child.stdout:
+ self.DataReceivedOnOutput(data)
+ elif pipe == child.stderr:
+ self.DataReceivedOnError(data)
+
+ try:
+ data = os.read(fd, 4096)
+ except OSError:
+ # terminate loop if EWOULDBLOCK (EAGAIN) is received
+ data = ''
+
+ if not already_terminated:
+ self._logger.debug('Waiting for command to finish.')
+ child.wait()
+
+ return child
+
+ def DataReceivedOnOutput(self, data):
+ """Invoked when the child process wrote data to stdout."""
+ sys.stdout.write(data)
+
+ def DataReceivedOnError(self, data):
+ """Invoked when the child process wrote data to stderr."""
+ sys.stderr.write(data)
+
+
+class LoggingCommandExecuter(CommandExecuter):
+
+ def __init__(self, *args, **kwargs):
+ super(LoggingCommandExecuter, self).__init__(*args, **kwargs)
+
+ # Create a logger for command's stdout/stderr streams.
+ self._output = logging.getLogger('%s.%s' % (self._logger.name, 'Output'))
+
+ def OpenLog(self, log_path):
+ """The messages are going to be saved to gzip compressed file."""
+ formatter = logging.Formatter('%(asctime)s %(prefix)s: %(message)s',
+ '%Y-%m-%d %H:%M:%S')
+ handler = logger.CompressedFileHandler(log_path, delay=True)
+ handler.setFormatter(formatter)
+ self._output.addHandler(handler)
+
+    # Set a flag to prevent log records from being propagated up the logger
+    # hierarchy tree. We don't want command output messages to appear in the
+    # main log.
+ self._output.propagate = 0
+
+ def CloseLog(self):
+ """Remove handlers and reattach the logger to its parent."""
+ for handler in list(self._output.handlers):
+ self._output.removeHandler(handler)
+ handler.flush()
+ handler.close()
+
+ self._output.propagate = 1
+
+ def DataReceivedOnOutput(self, data):
+ """Invoked when the child process wrote data to stdout."""
+ for line in data.splitlines():
+ self._output.info(line, extra={'prefix': 'STDOUT'})
+
+ def DataReceivedOnError(self, data):
+ """Invoked when the child process wrote data to stderr."""
+ for line in data.splitlines():
+ self._output.warning(line, extra={'prefix': 'STDERR'})
+
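+# Usage sketch (illustrative; the hostname below is hypothetical):
+#
+#   executer = LoggingCommandExecuter()
+#   executer.OpenLog('/tmp/job.log')
+#   exit_code = executer.RunCommand('uname -a', machine='build-host',
+#                                   username='builder')
+#   executer.CloseLog()
+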
+
+class CommandTerminator(object):
+
+ def __init__(self):
+ self.terminated = False
+
+ def Terminate(self):
+ self.terminated = True
+
+ def IsTerminated(self):
+ return self.terminated
diff --git a/automation/common/command_executer_test.py b/automation/common/command_executer_test.py
new file mode 100755
index 00000000..4aa245f0
--- /dev/null
+++ b/automation/common/command_executer_test.py
@@ -0,0 +1,210 @@
+#!/usr/bin/python
+#
+# Copyright 2011 Google Inc. All Rights Reserved.
+#
+
+__author__ = 'kbaclawski@google.com (Krystian Baclawski)'
+
+import cStringIO
+import logging
+import os
+import signal
+import socket
+import sys
+import time
+import unittest
+
+
+def AddScriptDirToPath():
+ """Required for remote python script execution."""
+ path = os.path.abspath(__file__)
+
+ for _ in range(3):
+ path, _ = os.path.split(path)
+
+  if path not in sys.path:
+ sys.path.append(path)
+
+
+AddScriptDirToPath()
+
+from automation.common.command_executer import CommandExecuter
+
+
+class LoggerMock(object):
+
+ def LogCmd(self, cmd, machine='', user=''):
+ if machine:
+ logging.info('[%s] Executing: %s', machine, cmd)
+ else:
+ logging.info('Executing: %s', cmd)
+
+ def LogError(self, msg):
+ logging.error(msg)
+
+ def LogWarning(self, msg):
+ logging.warning(msg)
+
+ def LogOutput(self, msg):
+ logging.info(msg)
+
+
+class CommandExecuterUnderTest(CommandExecuter):
+
+ def __init__(self):
+    CommandExecuter.__init__(self)
+
+ # We will record stdout and stderr.
+ self._stderr = cStringIO.StringIO()
+ self._stdout = cStringIO.StringIO()
+
+ @property
+ def stdout(self):
+ return self._stdout.getvalue()
+
+ @property
+ def stderr(self):
+ return self._stderr.getvalue()
+
+ def DataReceivedOnOutput(self, data):
+ self._stdout.write(data)
+
+ def DataReceivedOnError(self, data):
+ self._stderr.write(data)
+
+
+class CommandExecuterLocalTests(unittest.TestCase):
+ HOSTNAME = None
+
+ def setUp(self):
+ self._executer = CommandExecuterUnderTest()
+
+ def tearDown(self):
+ pass
+
+ def RunCommand(self, method, **kwargs):
+ program = os.path.abspath(sys.argv[0])
+
+ return self._executer.RunCommand('%s runHelper %s' % (program, method),
+ machine=self.HOSTNAME,
+ **kwargs)
+
+ def testCommandTimeout(self):
+ exit_code = self.RunCommand('SleepForMinute', command_timeout=3)
+
+ self.assertTrue(-exit_code in [signal.SIGTERM, signal.SIGKILL],
+ 'Invalid exit code: %d' % exit_code)
+
+ def testCommandTimeoutIfSigTermIgnored(self):
+ exit_code = self.RunCommand('IgnoreSigTerm', command_timeout=3)
+
+ self.assertTrue(-exit_code in [signal.SIGTERM, signal.SIGKILL])
+
+ def testCommandSucceeded(self):
+ self.assertFalse(self.RunCommand('ReturnTrue'))
+
+ def testCommandFailed(self):
+ self.assertTrue(self.RunCommand('ReturnFalse'))
+
+ def testStringOnOutputStream(self):
+ self.assertFalse(self.RunCommand('EchoToOutputStream'))
+ self.assertEquals(self._executer.stderr, '')
+ self.assertEquals(self._executer.stdout, 'test')
+
+ def testStringOnErrorStream(self):
+ self.assertFalse(self.RunCommand('EchoToErrorStream'))
+ self.assertEquals(self._executer.stderr, 'test')
+ self.assertEquals(self._executer.stdout, '')
+
+ def testOutputStreamNonInteractive(self):
+ self.assertFalse(
+ self.RunCommand('IsOutputStreamInteractive'),
+ 'stdout stream is a terminal!')
+
+ def testErrorStreamNonInteractive(self):
+ self.assertFalse(
+ self.RunCommand('IsErrorStreamInteractive'),
+ 'stderr stream is a terminal!')
+
+ def testAttemptToRead(self):
+ self.assertFalse(self.RunCommand('WaitForInput', command_timeout=3))
+
+ def testInterruptedProcess(self):
+ self.assertEquals(self.RunCommand('TerminateBySigAbrt'), -signal.SIGABRT)
+
+
+class CommandExecuterRemoteTests(CommandExecuterLocalTests):
+ HOSTNAME = socket.gethostname()
+
+ def testCommandTimeoutIfSigTermIgnored(self):
+ exit_code = self.RunCommand('IgnoreSigTerm', command_timeout=6)
+
+ self.assertEquals(exit_code, 255)
+
+ lines = self._executer.stdout.splitlines()
+ pid = int(lines[0])
+
+ try:
+ with open('/proc/%d/cmdline' % pid) as f:
+ cmdline = f.read()
+ except IOError:
+ cmdline = ''
+
+ self.assertFalse('IgnoreSigTerm' in cmdline, 'Process is still alive.')
+
+
+class CommandExecuterTestHelpers(object):
+
+ def SleepForMinute(self):
+ time.sleep(60)
+ return 1
+
+ def ReturnTrue(self):
+ return 0
+
+ def ReturnFalse(self):
+ return 1
+
+ def EchoToOutputStream(self):
+ sys.stdout.write('test')
+ return 0
+
+ def EchoToErrorStream(self):
+ sys.stderr.write('test')
+ return 0
+
+ def IsOutputStreamInteractive(self):
+ return sys.stdout.isatty()
+
+ def IsErrorStreamInteractive(self):
+ return sys.stderr.isatty()
+
+ def IgnoreSigTerm(self):
+ os.write(1, '%d' % os.getpid())
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ time.sleep(30)
+ return 0
+
+ def WaitForInput(self):
+ try:
+ # can only read end-of-file marker
+ return os.read(0, 1) != ''
+ except OSError:
+ # that means that stdin descriptor is closed
+ return 0
+
+ def TerminateBySigAbrt(self):
+ os.kill(os.getpid(), signal.SIGABRT)
+ return 0
+
+
+if __name__ == '__main__':
+ FORMAT = '%(asctime)-15s %(levelname)s %(message)s'
+ logging.basicConfig(format=FORMAT, level=logging.DEBUG)
+
+ if len(sys.argv) > 1:
+ if sys.argv[1] == 'runHelper':
+ helpers = CommandExecuterTestHelpers()
+ sys.exit(getattr(helpers, sys.argv[2])())
+
+ unittest.main()
diff --git a/automation/common/events.py b/automation/common/events.py
new file mode 100644
index 00000000..ad3ec844
--- /dev/null
+++ b/automation/common/events.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2011 Google Inc. All Rights Reserved.
+#
+"""Tools for recording and reporting timeline of abstract events.
+
+You can store any events provided that they can be stringified.
+"""
+
+__author__ = 'kbaclawski@google.com (Krystian Baclawski)'
+
+import collections
+import datetime
+import time
+
+
+class _EventRecord(object):
+ """Internal class. Attaches extra information to an event."""
+
+ def __init__(self, event, time_started=None, time_elapsed=None):
+ self._event = event
+ self._time_started = time_started or time.time()
+ self._time_elapsed = None
+
+ if time_elapsed:
+ self.time_elapsed = time_elapsed
+
+ @property
+ def event(self):
+ return self._event
+
+ @property
+ def time_started(self):
+ return self._time_started
+
+ def _TimeElapsedGet(self):
+ if self.has_finished:
+ time_elapsed = self._time_elapsed
+ else:
+ time_elapsed = time.time() - self._time_started
+
+ return datetime.timedelta(seconds=time_elapsed)
+
+ def _TimeElapsedSet(self, time_elapsed):
+ if isinstance(time_elapsed, datetime.timedelta):
+ self._time_elapsed = time_elapsed.seconds
+ else:
+ self._time_elapsed = time_elapsed
+
+ time_elapsed = property(_TimeElapsedGet, _TimeElapsedSet)
+
+ @property
+ def has_finished(self):
+ return self._time_elapsed is not None
+
+ def GetTimeStartedFormatted(self):
+ return time.strftime('%m/%d/%Y %H:%M:%S', time.gmtime(self._time_started))
+
+ def GetTimeElapsedRounded(self):
+ return datetime.timedelta(seconds=int(self.time_elapsed.seconds))
+
+ def Finish(self):
+ if not self.has_finished:
+ self._time_elapsed = time.time() - self._time_started
+
+
+class _Transition(collections.namedtuple('_Transition', ('from_', 'to_'))):
+ """Internal class. Represents transition point between events / states."""
+
+ def __str__(self):
+ return '%s => %s' % (self.from_, self.to_)
+
+
+class EventHistory(collections.Sequence):
+ """Records events and provides human readable events timeline."""
+
+ def __init__(self, records=None):
+ self._records = records or []
+
+ def __len__(self):
+ return len(self._records)
+
+ def __iter__(self):
+ return iter(self._records)
+
+ def __getitem__(self, index):
+ return self._records[index]
+
+ @property
+ def last(self):
+ if self._records:
+ return self._records[-1]
+
+ def AddEvent(self, event):
+ if self.last:
+ self.last.Finish()
+
+ evrec = _EventRecord(event)
+ self._records.append(evrec)
+ return evrec
+
+ def GetTotalTime(self):
+ if self._records:
+ total_time_elapsed = sum(evrec.time_elapsed.seconds
+ for evrec in self._records)
+
+ return datetime.timedelta(seconds=int(total_time_elapsed))
+
+ def GetTransitionEventHistory(self):
+ records = []
+
+ if self._records:
+ for num, next_evrec in enumerate(self._records[1:], start=1):
+ evrec = self._records[num - 1]
+
+ records.append(_EventRecord(
+ _Transition(evrec.event, next_evrec.event), evrec.time_started,
+ evrec.time_elapsed))
+
+ if not self.last.has_finished:
+ records.append(_EventRecord(
+ _Transition(self.last.event,
+ 'NOW'), self.last.time_started, self.last.time_elapsed))
+
+ return EventHistory(records)
+
+ @staticmethod
+ def _GetReport(history, report_name):
+ report = [report_name]
+
+ for num, evrec in enumerate(history, start=1):
+      time_elapsed = str(evrec.GetTimeElapsedRounded())
+
+      if not evrec.has_finished:
+        time_elapsed += ' (not finished)'
+
+ report.append('%d) %s: %s: %s' % (num, evrec.GetTimeStartedFormatted(),
+ evrec.event, time_elapsed))
+
+ report.append('Total Time: %s' % history.GetTotalTime())
+
+ return '\n'.join(report)
+
+ def GetEventReport(self):
+ return EventHistory._GetReport(self, 'Timeline of events:')
+
+ def GetTransitionEventReport(self):
+ return EventHistory._GetReport(self.GetTransitionEventHistory(),
+ 'Timeline of transition events:')
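+
+# Usage sketch (illustrative): record a couple of events and print the
+# transition report.
+#
+#   history = EventHistory()
+#   history.AddEvent('SETUP')
+#   history.AddEvent('RUNNING')
+#   history.last.Finish()
+#   print history.GetTransitionEventReport()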
diff --git a/automation/common/job.py b/automation/common/job.py
new file mode 100644
index 00000000..e845ab25
--- /dev/null
+++ b/automation/common/job.py
@@ -0,0 +1,178 @@
+# Copyright 2010 Google Inc. All Rights Reserved.
+#
+"""A module for a job in the infrastructure."""
+
+__author__ = 'raymes@google.com (Raymes Khoury)'
+
+import os.path
+
+from automation.common import state_machine
+
+STATUS_NOT_EXECUTED = 'NOT_EXECUTED'
+STATUS_SETUP = 'SETUP'
+STATUS_COPYING = 'COPYING'
+STATUS_RUNNING = 'RUNNING'
+STATUS_SUCCEEDED = 'SUCCEEDED'
+STATUS_FAILED = 'FAILED'
+
+
+class FolderDependency(object):
+
+ def __init__(self, job, src, dest=None):
+ if not dest:
+ dest = src
+
+ # TODO(kbaclawski): rename to producer
+ self.job = job
+ self.src = src
+ self.dest = dest
+
+ @property
+ def read_only(self):
+ return self.dest == self.src
+
+
+class JobStateMachine(state_machine.BasicStateMachine):
+ state_machine = {
+ STATUS_NOT_EXECUTED: [STATUS_SETUP],
+ STATUS_SETUP: [STATUS_COPYING, STATUS_FAILED],
+ STATUS_COPYING: [STATUS_RUNNING, STATUS_FAILED],
+ STATUS_RUNNING: [STATUS_SUCCEEDED, STATUS_FAILED]
+ }
+
+ final_states = [STATUS_SUCCEEDED, STATUS_FAILED]
+
+
+class JobFailure(Exception):
+
+ def __init__(self, message, exit_code):
+ Exception.__init__(self, message)
+ self.exit_code = exit_code
+
+
+class Job(object):
+ """A class representing a job whose commands will be executed."""
+
+ WORKDIR_PREFIX = '/usr/local/google/tmp/automation'
+
+ def __init__(self, label, command, timeout=4 * 60 * 60):
+ self._state = JobStateMachine(STATUS_NOT_EXECUTED)
+ self.predecessors = set()
+ self.successors = set()
+ self.machine_dependencies = []
+ self.folder_dependencies = []
+ self.id = 0
+ self.machines = []
+ self.command = command
+ self._has_primary_machine_spec = False
+ self.group = None
+ self.dry_run = None
+ self.label = label
+ self.timeout = timeout
+
+ def _StateGet(self):
+ return self._state
+
+ def _StateSet(self, new_state):
+ self._state.Change(new_state)
+
+ status = property(_StateGet, _StateSet)
+
+ @property
+ def timeline(self):
+ return self._state.timeline
+
+ def __repr__(self):
+ return '{%s: %s}' % (self.__class__.__name__, self.id)
+
+ def __str__(self):
+ res = []
+ res.append('%d' % self.id)
+ res.append('Predecessors:')
+ res.extend(['%d' % pred.id for pred in self.predecessors])
+ res.append('Successors:')
+ res.extend(['%d' % succ.id for succ in self.successors])
+ res.append('Machines:')
+ res.extend(['%s' % machine for machine in self.machines])
+ res.append(self.PrettyFormatCommand())
+ res.append('%s' % self.status)
+ res.append(self.timeline.GetTransitionEventReport())
+ return '\n'.join(res)
+
+ @staticmethod
+ def _FormatCommand(cmd, substitutions):
+ for pattern, replacement in substitutions:
+ cmd = cmd.replace(pattern, replacement)
+
+ return cmd
+
+ def GetCommand(self):
+ substitutions = [
+ ('$JOB_ID', str(self.id)), ('$JOB_TMP', self.work_dir),
+ ('$JOB_HOME', self.home_dir),
+ ('$PRIMARY_MACHINE', self.primary_machine.hostname)
+ ]
+
+ if len(self.machines) > 1:
+ for num, machine in enumerate(self.machines[1:]):
+        substitutions.append(
+            ('$SECONDARY_MACHINES[%d]' % num, machine.hostname))
+
+ return self._FormatCommand(str(self.command), substitutions)
+
+ def PrettyFormatCommand(self):
+    # TODO(kbaclawski): This method doesn't belong here, but rather to a
+    # non-existent Command class. If one is created, PrettyFormatCommand
+    # should become its method.
+    # Note: _FormatCommand performs literal string replacement, so the
+    # patterns below are plain strings, not regular expressions.
+    return self._FormatCommand(self.GetCommand(), [
+        ('{ ', ''), ('; }', ''), ('} ', '\n'), (' && ', '\n')
+    ])
+
+ def DependsOnFolder(self, dependency):
+ self.folder_dependencies.append(dependency)
+ self.DependsOn(dependency.job)
+
+ @property
+ def results_dir(self):
+ return os.path.join(self.work_dir, 'results')
+
+ @property
+ def logs_dir(self):
+ return os.path.join(self.home_dir, 'logs')
+
+ @property
+ def log_filename_prefix(self):
+ return 'job-%d.log' % self.id
+
+ @property
+ def work_dir(self):
+ return os.path.join(self.WORKDIR_PREFIX, 'job-%d' % self.id)
+
+ @property
+ def home_dir(self):
+ return os.path.join(self.group.home_dir, 'job-%d' % self.id)
+
+ @property
+ def primary_machine(self):
+ return self.machines[0]
+
+ def DependsOn(self, job):
+ """Specifies Jobs to be finished before this job can be launched."""
+ self.predecessors.add(job)
+ job.successors.add(self)
+
+ @property
+ def is_ready(self):
+ """Check that all our dependencies have been executed."""
+ return all(pred.status == STATUS_SUCCEEDED for pred in self.predecessors)
+
+ def DependsOnMachine(self, machine_spec, primary=True):
+ # Job will run on arbitrarily chosen machine specified by
+ # MachineSpecification class instances passed to this method.
+ if primary:
+ if self._has_primary_machine_spec:
+ raise RuntimeError('Only one primary machine specification allowed.')
+ self._has_primary_machine_spec = True
+ self.machine_dependencies.insert(0, machine_spec)
+ else:
+ self.machine_dependencies.append(machine_spec)
diff --git a/automation/common/job_group.py b/automation/common/job_group.py
new file mode 100644
index 00000000..96912fc1
--- /dev/null
+++ b/automation/common/job_group.py
@@ -0,0 +1,73 @@
+# Copyright 2010 Google Inc. All Rights Reserved.
+#
+
+import getpass
+import os
+
+from automation.common.state_machine import BasicStateMachine
+
+STATUS_NOT_EXECUTED = 'NOT_EXECUTED'
+STATUS_EXECUTING = 'EXECUTING'
+STATUS_SUCCEEDED = 'SUCCEEDED'
+STATUS_FAILED = 'FAILED'
+
+
+class JobGroupStateMachine(BasicStateMachine):
+ state_machine = {
+ STATUS_NOT_EXECUTED: [STATUS_EXECUTING],
+ STATUS_EXECUTING: [STATUS_SUCCEEDED, STATUS_FAILED]
+ }
+
+ final_states = [STATUS_SUCCEEDED, STATUS_FAILED]
+
+
+class JobGroup(object):
+ HOMEDIR_PREFIX = os.path.join('/home', getpass.getuser(), 'www', 'automation')
+
+ def __init__(self,
+ label,
+ jobs=None,
+ cleanup_on_completion=True,
+ cleanup_on_failure=False,
+ description=''):
+ self._state = JobGroupStateMachine(STATUS_NOT_EXECUTED)
+ self.id = 0
+ self.label = label
+ self.jobs = []
+ self.cleanup_on_completion = cleanup_on_completion
+ self.cleanup_on_failure = cleanup_on_failure
+ self.description = description
+
+ if jobs:
+ for job in jobs:
+ self.AddJob(job)
+
+ def _StateGet(self):
+ return self._state
+
+ def _StateSet(self, new_state):
+ self._state.Change(new_state)
+
+ status = property(_StateGet, _StateSet)
+
+ @property
+ def home_dir(self):
+ return os.path.join(self.HOMEDIR_PREFIX, 'job-group-%d' % self.id)
+
+ @property
+ def time_submitted(self):
+ try:
+ return self.status.timeline[1].time_started
+ except IndexError:
+ return None
+
+ def __repr__(self):
+ return '{%s: %s}' % (self.__class__.__name__, self.id)
+
+ def __str__(self):
+    return '\n'.join(['Job-Group:', 'ID: %s' % self.id] +
+                     [str(job) for job in self.jobs])
+
+ def AddJob(self, job):
+ self.jobs.append(job)
+ job.group = self
diff --git a/automation/common/logger.py b/automation/common/logger.py
new file mode 100644
index 00000000..4aeee052
--- /dev/null
+++ b/automation/common/logger.py
@@ -0,0 +1,144 @@
+# Copyright 2010 Google Inc. All Rights Reserved.
+
+from itertools import chain
+import gzip
+import logging
+import logging.handlers
+import time
+import traceback
+
+
+def SetUpRootLogger(filename=None, level=None, display_flags={}):
+ console_handler = logging.StreamHandler()
+ console_handler.setFormatter(CustomFormatter(AnsiColorCoder(), display_flags))
+ logging.root.addHandler(console_handler)
+
+ if filename:
+ file_handler = logging.handlers.RotatingFileHandler(
+ filename,
+ maxBytes=10 * 1024 * 1024,
+ backupCount=9,
+ delay=True)
+ file_handler.setFormatter(CustomFormatter(NullColorCoder(), display_flags))
+ logging.root.addHandler(file_handler)
+
+ if level:
+ logging.root.setLevel(level)
+
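+# Usage sketch (illustrative): log to the console and to a rotating file,
+# hiding the logger name column.
+#
+#   SetUpRootLogger(filename='automation.log', level=logging.DEBUG,
+#                   display_flags={'name': False})
+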
+
+class NullColorCoder(object):
+
+ def __call__(self, *args):
+ return ''
+
+
+class AnsiColorCoder(object):
+ CODES = {'reset': (0,),
+ 'bold': (1, 22),
+ 'italics': (3, 23),
+ 'underline': (4, 24),
+ 'inverse': (7, 27),
+ 'strikethrough': (9, 29),
+ 'black': (30, 40),
+ 'red': (31, 41),
+ 'green': (32, 42),
+ 'yellow': (33, 43),
+ 'blue': (34, 44),
+ 'magenta': (35, 45),
+ 'cyan': (36, 46),
+ 'white': (37, 47)}
+
+ def __call__(self, *args):
+ codes = []
+
+ for arg in args:
+ if arg.startswith('bg-') or arg.startswith('no-'):
+ codes.append(self.CODES[arg[3:]][1])
+ else:
+ codes.append(self.CODES[arg][0])
+
+ return '\033[%sm' % ';'.join(map(str, codes))
+
+
+class CustomFormatter(logging.Formatter):
+ COLORS = {'DEBUG': ('white',),
+ 'INFO': ('green',),
+ 'WARN': ('yellow', 'bold'),
+ 'ERROR': ('red', 'bold'),
+ 'CRIT': ('red', 'inverse', 'bold')}
+
+ def __init__(self, coder, display_flags={}):
+ items = []
+
+ if display_flags.get('datetime', True):
+ items.append('%(asctime)s')
+ if display_flags.get('level', True):
+ items.append('%(levelname)s')
+ if display_flags.get('name', True):
+ items.append(coder('cyan') + '[%(threadName)s:%(name)s]' + coder('reset'))
+ items.append('%(prefix)s%(message)s')
+
+ logging.Formatter.__init__(self, fmt=' '.join(items))
+
+ self._coder = coder
+
+ def formatTime(self, record):
+ ct = self.converter(record.created)
+ t = time.strftime('%Y-%m-%d %H:%M:%S', ct)
+ return '%s.%02d' % (t, record.msecs / 10)
+
+ def formatLevelName(self, record):
+ if record.levelname in ['WARNING', 'CRITICAL']:
+ levelname = record.levelname[:4]
+ else:
+ levelname = record.levelname
+
+ return ''.join([self._coder(*self.COLORS[levelname]), levelname,
+ self._coder('reset')])
+
+ def formatMessagePrefix(self, record):
+ try:
+ return ' %s%s:%s ' % (self._coder('black', 'bold'), record.prefix,
+ self._coder('reset'))
+ except AttributeError:
+ return ''
+
+ def format(self, record):
+ if record.exc_info:
+ if not record.exc_text:
+ record.exc_text = self.formatException(record.exc_info)
+ else:
+ record.exc_text = ''
+
+ fmt = record.__dict__.copy()
+ fmt.update({'levelname': self.formatLevelName(record),
+ 'asctime': self.formatTime(record),
+ 'prefix': self.formatMessagePrefix(record)})
+
+ s = []
+
+ for line in chain(record.getMessage().splitlines(),
+ record.exc_text.splitlines()):
+ fmt['message'] = line
+
+ s.append(self._fmt % fmt)
+
+ return '\n'.join(s)
+
+
+class CompressedFileHandler(logging.FileHandler):
+
+ def _open(self):
+ return gzip.open(self.baseFilename + '.gz', self.mode, 9)
+
+
+def HandleUncaughtExceptions(fun):
+ """Catches all exceptions that would go outside decorated fun scope."""
+
+ def _Interceptor(*args, **kwargs):
+ try:
+ return fun(*args, **kwargs)
+ except StandardError:
+ logging.exception('Uncaught exception:')
+
+ return _Interceptor
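+
+# Usage sketch (illustrative): exceptions escaping the decorated function are
+# logged instead of propagating.
+#
+#   @HandleUncaughtExceptions
+#   def Run():
+#     raise RuntimeError('boom')
+#
+#   Run()  # logs 'Uncaught exception:' with a traceback and returns None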
diff --git a/automation/common/machine.py b/automation/common/machine.py
new file mode 100644
index 00000000..4db0db0d
--- /dev/null
+++ b/automation/common/machine.py
@@ -0,0 +1,70 @@
+# Copyright 2010 Google Inc. All Rights Reserved.
+
+__author__ = 'asharif@google.com (Ahmad Sharif)'
+
+from fnmatch import fnmatch
+
+
+class Machine(object):
+ """Stores information related to machine and its state."""
+
+ def __init__(self, hostname, label, cpu, cores, os, username):
+ self.hostname = hostname
+ self.label = label
+ self.cpu = cpu
+ self.cores = cores
+ self.os = os
+ self.username = username
+
+ # MachineManager related attributes.
+ self.uses = 0
+ self.locked = False
+
+ def Acquire(self, exclusively):
+ assert not self.locked
+
+ if exclusively:
+ self.locked = True
+ self.uses += 1
+
+ def Release(self):
+ assert self.uses > 0
+
+ self.uses -= 1
+
+ if not self.uses:
+ self.locked = False
+
+ def __repr__(self):
+ return '{%s: %s@%s}' % (self.__class__.__name__, self.username,
+ self.hostname)
+
+ def __str__(self):
+    return '\n'.join(['Machine Information:',
+                      'Hostname: %s' % self.hostname,
+                      'Label: %s' % self.label,
+                      'CPU: %s' % self.cpu,
+                      'Cores: %d' % self.cores,
+                      'OS: %s' % self.os,
+                      'Uses: %d' % self.uses,
+                      'Locked: %s' % self.locked])
+
+
+class MachineSpecification(object):
+ """Helper class used to find a machine matching your requirements."""
+
+ def __init__(self, hostname='*', label='*', os='*', lock_required=False):
+ self.hostname = hostname
+ self.label = label
+ self.os = os
+ self.lock_required = lock_required
+ self.preferred_machines = []
+
+  def __str__(self):
+    return '\n'.join(['Machine Specification:',
+                      'Hostname: %s' % self.hostname,
+                      'OS: %s' % self.os,
+                      'Lock required: %s' % self.lock_required])
+
+  def IsMatch(self, machine):
+    return all([not machine.locked,
+                fnmatch(machine.hostname, self.hostname),
+                fnmatch(machine.label, self.label),
+                fnmatch(machine.os, self.os)])
+
+ def AddPreferredMachine(self, hostname):
+ if hostname not in self.preferred_machines:
+ self.preferred_machines.append(hostname)
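+
+# Usage sketch (illustrative; hostnames are hypothetical):
+#
+#   spec = MachineSpecification(hostname='build*', os='linux')
+#   mach = Machine('build42.mtv', 'mtv', 'core2duo', 4, 'linux', 'builder')
+#   assert spec.IsMatch(mach)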
diff --git a/automation/common/machine_test.py b/automation/common/machine_test.py
new file mode 100755
index 00000000..c9c200a9
--- /dev/null
+++ b/automation/common/machine_test.py
@@ -0,0 +1,26 @@
+#!/usr/bin/python
+#
+# Copyright 2010 Google Inc. All Rights Reserved.
+"""Machine manager unittest.
+
+MachineManagerTest tests MachineManager.
+"""
+
+__author__ = 'asharif@google.com (Ahmad Sharif)'
+
+import machine
+import unittest
+
+
+class MachineTest(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def testPrintMachine(self):
+    mach = machine.Machine('ahmad.mtv', 'mtv', 'core2duo', 4, 'linux',
+                           'asharif')
+ self.assertTrue('ahmad.mtv' in str(mach))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/automation/common/state_machine.py b/automation/common/state_machine.py
new file mode 100644
index 00000000..d1cf42c8
--- /dev/null
+++ b/automation/common/state_machine.py
@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2011 Google Inc. All Rights Reserved.
+#
+
+__author__ = 'kbaclawski@google.com (Krystian Baclawski)'
+
+from automation.common import events
+
+
+class BasicStateMachine(object):
+ """Generic class for constructing state machines.
+
+  Keeps all states and possible transitions of a state machine. Ensures that
+  every transition between two states is valid. Also stores transition events
+ in a timeline object.
+ """
+ state_machine = {}
+ final_states = []
+
+ def __init__(self, initial_state):
+ assert initial_state in self.state_machine,\
+ 'Initial state does not belong to this state machine'
+
+ self._state = initial_state
+
+ self.timeline = events.EventHistory()
+ self.timeline.AddEvent(self._state)
+
+ def __str__(self):
+ return self._state
+
+ def __eq__(self, value):
+ if isinstance(value, BasicStateMachine):
+ value = str(value)
+
+ return self._state == value
+
+ def __ne__(self, value):
+ return not self == value
+
+ def _TransitionAllowed(self, to_state):
+ return to_state in self.state_machine.get(self._state, [])
+
+ def Change(self, new_state):
+ assert self._TransitionAllowed(new_state),\
+ 'Transition from %s to %s not possible' % (self._state, new_state)
+
+ self._state = new_state
+
+ self.timeline.AddEvent(self._state)
+
+ if self._state in self.final_states:
+ self.timeline.last.Finish()
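+
+
+# Usage sketch (illustrative): define the transition table in a subclass and
+# drive it with Change().
+#
+#   class DoorStateMachine(BasicStateMachine):
+#     state_machine = {'CLOSED': ['OPEN'], 'OPEN': ['CLOSED']}
+#     final_states = []
+#
+#   door = DoorStateMachine('CLOSED')
+#   door.Change('OPEN')
+#   assert door == 'OPEN'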