# Copyright 2010 Google Inc. All Rights Reserved.
#

import copy
import logging
import threading

from automation.common import command as cmd
from automation.common import logger
from automation.common.command_executer import CommandExecuter
from automation.common import job
from automation.common import job_group
from automation.server.job_manager import IdProducerPolicy


class JobGroupManager(object):
  """Schedules job groups and tracks them until they reach a final status."""

  def __init__(self, job_manager):
    self.all_job_groups = []

    self.job_manager = job_manager
    self.job_manager.AddListener(self)

    self._lock = threading.Lock()
    self._job_group_finished = threading.Condition(self._lock)

    self._id_producer = IdProducerPolicy()
    self._id_producer.Initialize(job_group.JobGroup.HOMEDIR_PREFIX,
                                 r'job-group-(?P<id>\d+)')

    self._logger = logging.getLogger(self.__class__.__name__)

  def GetJobGroup(self, group_id):
    """Returns the group with the given id, or None if no such group exists."""
    with self._lock:
      for group in self.all_job_groups:
        if group.id == group_id:
          return group

      return None

  def GetAllJobGroups(self):
    """Returns a deep copy of all managed job groups."""
    with self._lock:
      return copy.deepcopy(self.all_job_groups)

  def AddJobGroup(self, group):
    """Registers the group, prepares its home directory and queues its jobs."""
    with self._lock:
      group.id = self._id_producer.GetNextId()

    self._logger.debug('Creating runtime environment for %r.', group)

    CommandExecuter().RunCommand(cmd.Chain(
        cmd.RmTree(group.home_dir), cmd.MakeDir(group.home_dir)))

    with self._lock:
      self.all_job_groups.append(group)

      for job_ in group.jobs:
        self.job_manager.AddJob(job_)

      group.status = job_group.STATUS_EXECUTING

    self._logger.info('Added %r to queue.', group)

    return group.id

  def KillJobGroup(self, group):
    """Kills every job in the group and blocks until the group terminates."""
    with self._lock:
      self._logger.debug('Killing all jobs that belong to %r.', group)

      for job_ in group.jobs:
        self.job_manager.KillJob(job_)

      self._logger.debug('Waiting for jobs to quit.')

      # Let's block until the group is killed, so it is guaranteed to have
      # completed by the time we return.
      while group.status not in [job_group.STATUS_SUCCEEDED,
                                 job_group.STATUS_FAILED]:
        self._job_group_finished.wait()

  def NotifyJobComplete(self, job_):
    """Callback invoked by the job manager whenever a job finishes."""
    self._logger.debug('Handling %r completion event.', job_)

    group = job_.group

    with self._lock:
      # We only need to act if the group hasn't already failed.
      if group.status != job_group.STATUS_FAILED:
        if job_.status == job.STATUS_FAILED:
          # A job failed; abort the whole job group.
          group.status = job_group.STATUS_FAILED
          if group.cleanup_on_failure:
            for grouped_job in group.jobs:
              # TODO(bjanakiraman): We should probably only kill dependent
              # jobs instead of the whole job group.
              self.job_manager.KillJob(grouped_job)
              self.job_manager.CleanUpJob(grouped_job)
        else:
          # The job succeeded; check whether the whole group is now done.
          assert job_.status == job.STATUS_SUCCEEDED
          finished = True
          for other_job in group.jobs:
            assert other_job.status != job.STATUS_FAILED
            if other_job.status != job.STATUS_SUCCEEDED:
              finished = False
              break

          if finished and group.status != job_group.STATUS_SUCCEEDED:
            # TODO(kbaclawski): Without the check above, the following code
            # could run more than once, which would crash the StateMachine:
            # it cannot transition from STATUS_SUCCEEDED to STATUS_SUCCEEDED.
            # This bug needs to be addressed in the near future.
            group.status = job_group.STATUS_SUCCEEDED
            if group.cleanup_on_completion:
              for grouped_job in group.jobs:
                self.job_manager.CleanUpJob(grouped_job)

      self._job_group_finished.notifyAll()
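
# ---------------------------------------------------------------------------
# Usage sketch (illustration only; not part of the original module). It shows
# the intended call sequence: wrap a JobManager in a JobGroupManager, submit a
# group, then optionally abort it. The JobManager and JobGroup constructor
# arguments shown below are assumptions -- the real signatures live in
# automation.server.job_manager and automation.common.job_group.
#
#   from automation.server.job_manager import JobManager
#
#   manager = JobManager(machine_manager)  # machine_manager is assumed here.
#   group_manager = JobGroupManager(manager)
#
#   group = job_group.JobGroup('nightly')  # Constructor args are assumed.
#   group_id = group_manager.AddJobGroup(group)  # Queues every job in group.
#
#   # From another thread, abort the group; KillJobGroup blocks until the
#   # group reaches STATUS_SUCCEEDED or STATUS_FAILED.
#   group_manager.KillJobGroup(group_manager.GetJobGroup(group_id))
# ---------------------------------------------------------------------------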