diff options
Diffstat (limited to 'automation/common/job.py')
-rw-r--r-- | automation/common/job.py | 178 |
1 files changed, 178 insertions, 0 deletions
diff --git a/automation/common/job.py b/automation/common/job.py new file mode 100644 index 00000000..e845ab25 --- /dev/null +++ b/automation/common/job.py @@ -0,0 +1,178 @@ +# Copyright 2010 Google Inc. All Rights Reserved. +# +"""A module for a job in the infrastructure.""" + +__author__ = 'raymes@google.com (Raymes Khoury)' + +import os.path + +from automation.common import state_machine + +STATUS_NOT_EXECUTED = 'NOT_EXECUTED' +STATUS_SETUP = 'SETUP' +STATUS_COPYING = 'COPYING' +STATUS_RUNNING = 'RUNNING' +STATUS_SUCCEEDED = 'SUCCEEDED' +STATUS_FAILED = 'FAILED' + + +class FolderDependency(object): + + def __init__(self, job, src, dest=None): + if not dest: + dest = src + + # TODO(kbaclawski): rename to producer + self.job = job + self.src = src + self.dest = dest + + @property + def read_only(self): + return self.dest == self.src + + +class JobStateMachine(state_machine.BasicStateMachine): + state_machine = { + STATUS_NOT_EXECUTED: [STATUS_SETUP], + STATUS_SETUP: [STATUS_COPYING, STATUS_FAILED], + STATUS_COPYING: [STATUS_RUNNING, STATUS_FAILED], + STATUS_RUNNING: [STATUS_SUCCEEDED, STATUS_FAILED] + } + + final_states = [STATUS_SUCCEEDED, STATUS_FAILED] + + +class JobFailure(Exception): + + def __init__(self, message, exit_code): + Exception.__init__(self, message) + self.exit_code = exit_code + + +class Job(object): + """A class representing a job whose commands will be executed.""" + + WORKDIR_PREFIX = '/usr/local/google/tmp/automation' + + def __init__(self, label, command, timeout=4 * 60 * 60): + self._state = JobStateMachine(STATUS_NOT_EXECUTED) + self.predecessors = set() + self.successors = set() + self.machine_dependencies = [] + self.folder_dependencies = [] + self.id = 0 + self.machines = [] + self.command = command + self._has_primary_machine_spec = False + self.group = None + self.dry_run = None + self.label = label + self.timeout = timeout + + def _StateGet(self): + return self._state + + def _StateSet(self, new_state): + self._state.Change(new_state) + + status = property(_StateGet, _StateSet) + + @property + def timeline(self): + return self._state.timeline + + def __repr__(self): + return '{%s: %s}' % (self.__class__.__name__, self.id) + + def __str__(self): + res = [] + res.append('%d' % self.id) + res.append('Predecessors:') + res.extend(['%d' % pred.id for pred in self.predecessors]) + res.append('Successors:') + res.extend(['%d' % succ.id for succ in self.successors]) + res.append('Machines:') + res.extend(['%s' % machine for machine in self.machines]) + res.append(self.PrettyFormatCommand()) + res.append('%s' % self.status) + res.append(self.timeline.GetTransitionEventReport()) + return '\n'.join(res) + + @staticmethod + def _FormatCommand(cmd, substitutions): + for pattern, replacement in substitutions: + cmd = cmd.replace(pattern, replacement) + + return cmd + + def GetCommand(self): + substitutions = [ + ('$JOB_ID', str(self.id)), ('$JOB_TMP', self.work_dir), + ('$JOB_HOME', self.home_dir), + ('$PRIMARY_MACHINE', self.primary_machine.hostname) + ] + + if len(self.machines) > 1: + for num, machine in enumerate(self.machines[1:]): + substitutions.append(('$SECONDARY_MACHINES[%d]' % num, machine.hostname + )) + + return self._FormatCommand(str(self.command), substitutions) + + def PrettyFormatCommand(self): + # TODO(kbaclawski): This method doesn't belong here, but rather to + # non existing Command class. If one is created then PrettyFormatCommand + # shall become its method. + return self._FormatCommand(self.GetCommand(), [ + ('\{ ', ''), ('; \}', ''), ('\} ', '\n'), ('\s*&&\s*', '\n') + ]) + + def DependsOnFolder(self, dependency): + self.folder_dependencies.append(dependency) + self.DependsOn(dependency.job) + + @property + def results_dir(self): + return os.path.join(self.work_dir, 'results') + + @property + def logs_dir(self): + return os.path.join(self.home_dir, 'logs') + + @property + def log_filename_prefix(self): + return 'job-%d.log' % self.id + + @property + def work_dir(self): + return os.path.join(self.WORKDIR_PREFIX, 'job-%d' % self.id) + + @property + def home_dir(self): + return os.path.join(self.group.home_dir, 'job-%d' % self.id) + + @property + def primary_machine(self): + return self.machines[0] + + def DependsOn(self, job): + """Specifies Jobs to be finished before this job can be launched.""" + self.predecessors.add(job) + job.successors.add(self) + + @property + def is_ready(self): + """Check that all our dependencies have been executed.""" + return all(pred.status == STATUS_SUCCEEDED for pred in self.predecessors) + + def DependsOnMachine(self, machine_spec, primary=True): + # Job will run on arbitrarily chosen machine specified by + # MachineSpecification class instances passed to this method. + if primary: + if self._has_primary_machine_spec: + raise RuntimeError('Only one primary machine specification allowed.') + self._has_primary_machine_spec = True + self.machine_dependencies.insert(0, machine_spec) + else: + self.machine_dependencies.append(machine_spec) |