aboutsummaryrefslogtreecommitdiff
path: root/deprecated/automation/common/job.py
diff options
context:
space:
mode:
Diffstat (limited to 'deprecated/automation/common/job.py')
-rw-r--r--deprecated/automation/common/job.py178
1 files changed, 178 insertions, 0 deletions
diff --git a/deprecated/automation/common/job.py b/deprecated/automation/common/job.py
new file mode 100644
index 00000000..e845ab25
--- /dev/null
+++ b/deprecated/automation/common/job.py
@@ -0,0 +1,178 @@
+# Copyright 2010 Google Inc. All Rights Reserved.
+#
+"""A module for a job in the infrastructure."""
+
+__author__ = 'raymes@google.com (Raymes Khoury)'
+
+import os.path
+
+from automation.common import state_machine
+
+STATUS_NOT_EXECUTED = 'NOT_EXECUTED'
+STATUS_SETUP = 'SETUP'
+STATUS_COPYING = 'COPYING'
+STATUS_RUNNING = 'RUNNING'
+STATUS_SUCCEEDED = 'SUCCEEDED'
+STATUS_FAILED = 'FAILED'
+
+
+class FolderDependency(object):
+
+ def __init__(self, job, src, dest=None):
+ if not dest:
+ dest = src
+
+ # TODO(kbaclawski): rename to producer
+ self.job = job
+ self.src = src
+ self.dest = dest
+
+ @property
+ def read_only(self):
+ return self.dest == self.src
+
+
+class JobStateMachine(state_machine.BasicStateMachine):
+ state_machine = {
+ STATUS_NOT_EXECUTED: [STATUS_SETUP],
+ STATUS_SETUP: [STATUS_COPYING, STATUS_FAILED],
+ STATUS_COPYING: [STATUS_RUNNING, STATUS_FAILED],
+ STATUS_RUNNING: [STATUS_SUCCEEDED, STATUS_FAILED]
+ }
+
+ final_states = [STATUS_SUCCEEDED, STATUS_FAILED]
+
+
+class JobFailure(Exception):
+
+ def __init__(self, message, exit_code):
+ Exception.__init__(self, message)
+ self.exit_code = exit_code
+
+
+class Job(object):
+ """A class representing a job whose commands will be executed."""
+
+ WORKDIR_PREFIX = '/usr/local/google/tmp/automation'
+
+ def __init__(self, label, command, timeout=4 * 60 * 60):
+ self._state = JobStateMachine(STATUS_NOT_EXECUTED)
+ self.predecessors = set()
+ self.successors = set()
+ self.machine_dependencies = []
+ self.folder_dependencies = []
+ self.id = 0
+ self.machines = []
+ self.command = command
+ self._has_primary_machine_spec = False
+ self.group = None
+ self.dry_run = None
+ self.label = label
+ self.timeout = timeout
+
+ def _StateGet(self):
+ return self._state
+
+ def _StateSet(self, new_state):
+ self._state.Change(new_state)
+
+ status = property(_StateGet, _StateSet)
+
+ @property
+ def timeline(self):
+ return self._state.timeline
+
+ def __repr__(self):
+ return '{%s: %s}' % (self.__class__.__name__, self.id)
+
+ def __str__(self):
+ res = []
+ res.append('%d' % self.id)
+ res.append('Predecessors:')
+ res.extend(['%d' % pred.id for pred in self.predecessors])
+ res.append('Successors:')
+ res.extend(['%d' % succ.id for succ in self.successors])
+ res.append('Machines:')
+ res.extend(['%s' % machine for machine in self.machines])
+ res.append(self.PrettyFormatCommand())
+ res.append('%s' % self.status)
+ res.append(self.timeline.GetTransitionEventReport())
+ return '\n'.join(res)
+
+ @staticmethod
+ def _FormatCommand(cmd, substitutions):
+ for pattern, replacement in substitutions:
+ cmd = cmd.replace(pattern, replacement)
+
+ return cmd
+
+ def GetCommand(self):
+ substitutions = [
+ ('$JOB_ID', str(self.id)), ('$JOB_TMP', self.work_dir),
+ ('$JOB_HOME', self.home_dir),
+ ('$PRIMARY_MACHINE', self.primary_machine.hostname)
+ ]
+
+ if len(self.machines) > 1:
+ for num, machine in enumerate(self.machines[1:]):
+ substitutions.append(('$SECONDARY_MACHINES[%d]' % num, machine.hostname
+ ))
+
+ return self._FormatCommand(str(self.command), substitutions)
+
+ def PrettyFormatCommand(self):
+ # TODO(kbaclawski): This method doesn't belong here, but rather to
+ # non existing Command class. If one is created then PrettyFormatCommand
+ # shall become its method.
+ return self._FormatCommand(self.GetCommand(), [
+ ('\{ ', ''), ('; \}', ''), ('\} ', '\n'), ('\s*&&\s*', '\n')
+ ])
+
+ def DependsOnFolder(self, dependency):
+ self.folder_dependencies.append(dependency)
+ self.DependsOn(dependency.job)
+
+ @property
+ def results_dir(self):
+ return os.path.join(self.work_dir, 'results')
+
+ @property
+ def logs_dir(self):
+ return os.path.join(self.home_dir, 'logs')
+
+ @property
+ def log_filename_prefix(self):
+ return 'job-%d.log' % self.id
+
+ @property
+ def work_dir(self):
+ return os.path.join(self.WORKDIR_PREFIX, 'job-%d' % self.id)
+
+ @property
+ def home_dir(self):
+ return os.path.join(self.group.home_dir, 'job-%d' % self.id)
+
+ @property
+ def primary_machine(self):
+ return self.machines[0]
+
+ def DependsOn(self, job):
+ """Specifies Jobs to be finished before this job can be launched."""
+ self.predecessors.add(job)
+ job.successors.add(self)
+
+ @property
+ def is_ready(self):
+ """Check that all our dependencies have been executed."""
+ return all(pred.status == STATUS_SUCCEEDED for pred in self.predecessors)
+
+ def DependsOnMachine(self, machine_spec, primary=True):
+ # Job will run on arbitrarily chosen machine specified by
+ # MachineSpecification class instances passed to this method.
+ if primary:
+ if self._has_primary_machine_spec:
+ raise RuntimeError('Only one primary machine specification allowed.')
+ self._has_primary_machine_spec = True
+ self.machine_dependencies.insert(0, machine_spec)
+ else:
+ self.machine_dependencies.append(machine_spec)