diff options
Diffstat (limited to 'automation/server/job_group_manager.py')
-rw-r--r-- | automation/server/job_group_manager.py | 118 |
1 files changed, 118 insertions, 0 deletions
# Copyright 2010 Google Inc. All Rights Reserved.
#

import copy
import logging
import threading

from automation.common import command as cmd
from automation.common import logger
from automation.common.command_executer import CommandExecuter
from automation.common import job
from automation.common import job_group
from automation.server.job_manager import IdProducerPolicy


class JobGroupManager(object):
  """Registry of JobGroups and broker between them and the JobManager.

  Tracks every submitted JobGroup, forwards the group's jobs to the
  JobManager, and (as a JobManager listener) reacts to per-job completion
  events to advance or abort the owning group.

  Thread-safety: one lock guards all_job_groups and every status
  transition; a condition on that lock is signalled whenever a job
  completes, which is what KillJobGroup() blocks on.
  """

  def __init__(self, job_manager):
    # All groups ever submitted, live and finished.
    self.all_job_groups = []

    # Register ourselves so NotifyJobComplete() is called for each job.
    self.job_manager = job_manager
    self.job_manager.AddListener(self)

    self._lock = threading.Lock()
    self._job_group_finished = threading.Condition(self._lock)

    # Resume id numbering from whatever job-group-<id> directories
    # already exist under the home dir prefix.
    # NOTE: raw string -- '\d' in a non-raw literal is an invalid escape
    # sequence in modern Python.
    self._id_producer = IdProducerPolicy()
    self._id_producer.Initialize(job_group.JobGroup.HOMEDIR_PREFIX,
                                 r'job-group-(?P<id>\d+)')

    self._logger = logging.getLogger(self.__class__.__name__)

  def GetJobGroup(self, group_id):
    """Returns the group with the given id, or None if unknown."""
    with self._lock:
      for group in self.all_job_groups:
        if group.id == group_id:
          return group

      return None

  def GetAllJobGroups(self):
    """Returns a deep-copied snapshot of all groups.

    The copy can be inspected without holding the manager's lock and
    without racing against concurrent status updates.
    """
    with self._lock:
      return copy.deepcopy(self.all_job_groups)

  def AddJobGroup(self, group):
    """Assigns an id to group, prepares its home dir and queues its jobs.

    Args:
      group: a job_group.JobGroup to execute.

    Returns:
      The id assigned to the group.
    """
    with self._lock:
      group.id = self._id_producer.GetNextId()

    self._logger.debug('Creating runtime environment for %r.', group)

    # Recreate the group's home directory from scratch (done outside the
    # lock -- it may be slow).
    CommandExecuter().RunCommand(cmd.Chain(
        cmd.RmTree(group.home_dir), cmd.MakeDir(group.home_dir)))

    with self._lock:
      self.all_job_groups.append(group)

      for job_ in group.jobs:
        self.job_manager.AddJob(job_)

      group.status = job_group.STATUS_EXECUTING

    self._logger.info('Added %r to queue.', group)

    return group.id

  def KillJobGroup(self, group):
    """Kills all of group's jobs and blocks until the group has finished."""
    with self._lock:
      self._logger.debug('Killing all jobs that belong to %r.', group)

      for job_ in group.jobs:
        self.job_manager.KillJob(job_)

      self._logger.debug('Waiting for jobs to quit.')

      # Lets block until the group is killed so we know it is completed
      # when we return.  wait() releases the lock, so NotifyJobComplete()
      # can run and signal us.
      while group.status not in [job_group.STATUS_SUCCEEDED,
                                 job_group.STATUS_FAILED]:
        self._job_group_finished.wait()

  def NotifyJobComplete(self, job_):
    """JobManager listener callback, invoked after job_ finishes.

    Advances job_'s group: marks it failed (and optionally cleans up) on
    the first job failure, or marks it succeeded once every job has
    succeeded.  Always wakes waiters in KillJobGroup().
    """
    self._logger.debug('Handling %r completion event.', job_)

    group = job_.group

    with self._lock:
      # We need to perform an action only if the group hasn't already failed.
      if group.status != job_group.STATUS_FAILED:
        if job_.status == job.STATUS_FAILED:
          # We have a failed job, abort the job group
          group.status = job_group.STATUS_FAILED
          if group.cleanup_on_failure:
            # Use a distinct loop variable; the original shadowed the
            # job_ parameter here.
            for group_job in group.jobs:
              # TODO(bjanakiraman): We should probably only kill dependent jobs
              # instead of the whole job group.
              self.job_manager.KillJob(group_job)
              self.job_manager.CleanUpJob(group_job)
        else:
          # The job succeeded successfully -- lets check to see if we are done.
          assert job_.status == job.STATUS_SUCCEEDED
          finished = True
          for other_job in group.jobs:
            assert other_job.status != job.STATUS_FAILED
            if other_job.status != job.STATUS_SUCCEEDED:
              finished = False
              break

          if finished and group.status != job_group.STATUS_SUCCEEDED:
            # TODO(kbaclawski): Without check performed above following code
            # could be called more than once. This would trigger StateMachine
            # crash, because it cannot transition from STATUS_SUCCEEDED to
            # STATUS_SUCCEEDED. Need to address that bug in near future.
            group.status = job_group.STATUS_SUCCEEDED
            if group.cleanup_on_completion:
              for group_job in group.jobs:
                self.job_manager.CleanUpJob(group_job)

      # Wake anyone blocked in KillJobGroup(), whatever the outcome.
      # notify_all() replaces the deprecated camelCase notifyAll() alias.
      self._job_group_finished.notify_all()