aboutsummaryrefslogtreecommitdiff
path: root/automation/server/job_group_manager.py
blob: d66f5e0717a250da51cc85a7709da97c42c6c55a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# Copyright 2010 Google Inc. All Rights Reserved.
#

import copy
import logging
import threading

from automation.common import command as cmd
from automation.common import logger
from automation.common.command_executer import CommandExecuter
from automation.common import job
from automation.common import job_group
from automation.server.job_manager import IdProducerPolicy


class JobGroupManager(object):

  def __init__(self, job_manager):
    self.all_job_groups = []

    self.job_manager = job_manager
    self.job_manager.AddListener(self)

    self._lock = threading.Lock()
    self._job_group_finished = threading.Condition(self._lock)

    self._id_producer = IdProducerPolicy()
    self._id_producer.Initialize(job_group.JobGroup.HOMEDIR_PREFIX,
                                 'job-group-(?P<id>\d+)')

    self._logger = logging.getLogger(self.__class__.__name__)

  def GetJobGroup(self, group_id):
    with self._lock:
      for group in self.all_job_groups:
        if group.id == group_id:
          return group

      return None

  def GetAllJobGroups(self):
    with self._lock:
      return copy.deepcopy(self.all_job_groups)

  def AddJobGroup(self, group):
    with self._lock:
      group.id = self._id_producer.GetNextId()

    self._logger.debug('Creating runtime environment for %r.', group)

    CommandExecuter().RunCommand(cmd.Chain(
        cmd.RmTree(group.home_dir), cmd.MakeDir(group.home_dir)))

    with self._lock:
      self.all_job_groups.append(group)

      for job_ in group.jobs:
        self.job_manager.AddJob(job_)

      group.status = job_group.STATUS_EXECUTING

    self._logger.info('Added %r to queue.', group)

    return group.id

  def KillJobGroup(self, group):
    with self._lock:
      self._logger.debug('Killing all jobs that belong to %r.', group)

      for job_ in group.jobs:
        self.job_manager.KillJob(job_)

      self._logger.debug('Waiting for jobs to quit.')

      # Lets block until the group is killed so we know it is completed
      # when we return.
      while group.status not in [job_group.STATUS_SUCCEEDED,
                                 job_group.STATUS_FAILED]:
        self._job_group_finished.wait()

  def NotifyJobComplete(self, job_):
    self._logger.debug('Handling %r completion event.', job_)

    group = job_.group

    with self._lock:
      # We need to perform an action only if the group hasn't already failed.
      if group.status != job_group.STATUS_FAILED:
        if job_.status == job.STATUS_FAILED:
          # We have a failed job, abort the job group
          group.status = job_group.STATUS_FAILED
          if group.cleanup_on_failure:
            for job_ in group.jobs:
              # TODO(bjanakiraman): We should probably only kill dependent jobs
              # instead of the whole job group.
              self.job_manager.KillJob(job_)
              self.job_manager.CleanUpJob(job_)
        else:
          # The job succeeded successfully -- lets check to see if we are done.
          assert job_.status == job.STATUS_SUCCEEDED
          finished = True
          for other_job in group.jobs:
            assert other_job.status != job.STATUS_FAILED
            if other_job.status != job.STATUS_SUCCEEDED:
              finished = False
              break

          if finished and group.status != job_group.STATUS_SUCCEEDED:
            # TODO(kbaclawski): Without check performed above following code
            # could be called more than once. This would trigger StateMachine
            # crash, because it cannot transition from STATUS_SUCCEEDED to
            # STATUS_SUCCEEDED. Need to address that bug in near future.
            group.status = job_group.STATUS_SUCCEEDED
            if group.cleanup_on_completion:
              for job_ in group.jobs:
                self.job_manager.CleanUpJob(job_)

        self._job_group_finished.notifyAll()