aboutsummaryrefslogtreecommitdiff
path: root/deprecated/automation/server/job_manager.py
diff options
context:
space:
mode:
authorTiancong Wang <tcwang@google.com>2020-02-13 21:08:49 +0000
committerTiancong Wang <tcwang@google.com>2020-02-13 21:08:49 +0000
commitb75f321fc8978b92ce3db6886ccb966768f0c7a8 (patch)
tree35fa0fbaeaaddd9cc2a126a05eee3527b51e83a8 /deprecated/automation/server/job_manager.py
parentcddd960b0ba2eb62c372c0d3176c75f0bd05d5e8 (diff)
parente617e3393dd24003aa976ece5050bb291070041c (diff)
downloadtoolchain-utils-b75f321fc8978b92ce3db6886ccb966768f0c7a8.tar.gz
Merging 18 commit(s) from Chromium's toolchain-utils am: 0ae38c8498 am: 2a19d36a82 am: e617e3393dr_aml_301500702android-mainline-12.0.0_r55android-mainline-11.0.0_r9android-mainline-11.0.0_r8android-mainline-11.0.0_r7android-mainline-11.0.0_r6android-mainline-11.0.0_r5android-mainline-11.0.0_r45android-mainline-11.0.0_r44android-mainline-11.0.0_r43android-mainline-11.0.0_r42android-mainline-11.0.0_r41android-mainline-11.0.0_r40android-mainline-11.0.0_r4android-mainline-11.0.0_r39android-mainline-11.0.0_r38android-mainline-11.0.0_r37android-mainline-11.0.0_r36android-mainline-11.0.0_r35android-mainline-11.0.0_r34android-mainline-11.0.0_r33android-mainline-11.0.0_r32android-mainline-11.0.0_r31android-mainline-11.0.0_r30android-mainline-11.0.0_r3android-mainline-11.0.0_r29android-mainline-11.0.0_r28android-mainline-11.0.0_r27android-mainline-11.0.0_r26android-mainline-11.0.0_r25android-mainline-11.0.0_r24android-mainline-11.0.0_r23android-mainline-11.0.0_r22android-mainline-11.0.0_r21android-mainline-11.0.0_r20android-mainline-11.0.0_r2android-mainline-11.0.0_r19android-mainline-11.0.0_r18android-mainline-11.0.0_r17android-mainline-11.0.0_r16android-mainline-11.0.0_r15android-mainline-11.0.0_r14android-mainline-11.0.0_r13android-mainline-11.0.0_r12android-mainline-11.0.0_r10android-mainline-11.0.0_r1android-11.0.0_r48android-11.0.0_r47android-11.0.0_r46android-11.0.0_r45android-11.0.0_r44android-11.0.0_r43android-11.0.0_r42android-11.0.0_r41android-11.0.0_r40android-11.0.0_r39android-11.0.0_r38android-11.0.0_r37android-11.0.0_r36android-11.0.0_r35android-11.0.0_r34android-11.0.0_r33android-11.0.0_r32android-11.0.0_r31android-11.0.0_r30android-11.0.0_r29android-11.0.0_r28android-11.0.0_r27android-11.0.0_r26android-11.0.0_r24android-11.0.0_r23android-11.0.0_r22android-11.0.0_r21android-11.0.0_r20android-11.0.0_r19android-11.0.0_r18android-11.0.0_r16android11-qpr3-s1-releaseandroid11-qpr3-releaseandroid11-qpr2-releaseandroid11-qpr1-s2-releaseandroid11-qpr1-s1-releaseandroid11-qpr1-releaseandroid11-qpr1-d-s1-releaseandroid11-qpr1-d-releaseandroid11-qpr1-c-releaseandroid11-mainline-tethering-releaseandroid11-mainline-sparse-2021-jan-releaseandroid11-mainline-sparse-2020-dec-releaseandroid11-mainline-releaseandroid11-mainline-permission-releaseandroid11-mainline-os-statsd-releaseandroid11-mainline-networkstack-releaseandroid11-mainline-media-swcodec-releaseandroid11-mainline-media-releaseandroid11-mainline-extservices-releaseandroid11-mainline-documentsui-releaseandroid11-mainline-conscrypt-releaseandroid11-mainline-cellbroadcast-releaseandroid11-mainline-captiveportallogin-releaseandroid11-devandroid11-d2-releaseandroid11-d1-b-release
Change-Id: I3f25c7ee034b2e20e37ed941b8eae24eec7043eb
Diffstat (limited to 'deprecated/automation/server/job_manager.py')
-rw-r--r--deprecated/automation/server/job_manager.py194
1 files changed, 194 insertions, 0 deletions
diff --git a/deprecated/automation/server/job_manager.py b/deprecated/automation/server/job_manager.py
new file mode 100644
index 00000000..7a65b918
--- /dev/null
+++ b/deprecated/automation/server/job_manager.py
@@ -0,0 +1,194 @@
+# Copyright 2010 Google Inc. All Rights Reserved.
+#
+
+import logging
+import os
+import re
+import threading
+
+from automation.common import job
+from automation.common import logger
+from automation.server.job_executer import JobExecuter
+
+
+class IdProducerPolicy(object):
+ """Produces series of unique integer IDs.
+
+ Example:
+ id_producer = IdProducerPolicy()
+ id_a = id_producer.GetNextId()
+ id_b = id_producer.GetNextId()
+ assert id_a != id_b
+ """
+
+ def __init__(self):
+ self._counter = 1
+
+ def Initialize(self, home_prefix, home_pattern):
+ """Find first available ID based on a directory listing.
+
+ Args:
+ home_prefix: A directory to be traversed.
+ home_pattern: A regexp describing all files/directories that will be
+ considered. The regexp must contain exactly one match group with name
+ "id", which must match an integer number.
+
+ Example:
+ id_producer.Initialize(JOBDIR_PREFIX, 'job-(?P<id>\d+)')
+ """
+ harvested_ids = []
+
+ if os.path.isdir(home_prefix):
+ for filename in os.listdir(home_prefix):
+ path = os.path.join(home_prefix, filename)
+
+ if os.path.isdir(path):
+ match = re.match(home_pattern, filename)
+
+ if match:
+ harvested_ids.append(int(match.group('id')))
+
+ self._counter = max(harvested_ids or [0]) + 1
+
+ def GetNextId(self):
+ """Calculates another ID considered to be unique."""
+ new_id = self._counter
+ self._counter += 1
+ return new_id
+
+
+class JobManager(threading.Thread):
+
+ def __init__(self, machine_manager):
+ threading.Thread.__init__(self, name=self.__class__.__name__)
+ self.all_jobs = []
+ self.ready_jobs = []
+ self.job_executer_mapping = {}
+
+ self.machine_manager = machine_manager
+
+ self._lock = threading.Lock()
+ self._jobs_available = threading.Condition(self._lock)
+ self._exit_request = False
+
+ self.listeners = []
+ self.listeners.append(self)
+
+ self._id_producer = IdProducerPolicy()
+ self._id_producer.Initialize(job.Job.WORKDIR_PREFIX, 'job-(?P<id>\d+)')
+
+ self._logger = logging.getLogger(self.__class__.__name__)
+
+ def StartJobManager(self):
+ self._logger.info('Starting...')
+
+ with self._lock:
+ self.start()
+ self._jobs_available.notifyAll()
+
+ def StopJobManager(self):
+ self._logger.info('Shutdown request received.')
+
+ with self._lock:
+ for job_ in self.all_jobs:
+ self._KillJob(job_.id)
+
+ # Signal to die
+ self._exit_request = True
+ self._jobs_available.notifyAll()
+
+ # Wait for all job threads to finish
+ for executer in self.job_executer_mapping.values():
+ executer.join()
+
+ def KillJob(self, job_id):
+ """Kill a job by id.
+
+ Does not block until the job is completed.
+ """
+ with self._lock:
+ self._KillJob(job_id)
+
+ def GetJob(self, job_id):
+ for job_ in self.all_jobs:
+ if job_.id == job_id:
+ return job_
+ return None
+
+ def _KillJob(self, job_id):
+ self._logger.info('Killing [Job: %d].', job_id)
+
+ if job_id in self.job_executer_mapping:
+ self.job_executer_mapping[job_id].Kill()
+ for job_ in self.ready_jobs:
+ if job_.id == job_id:
+ self.ready_jobs.remove(job_)
+ break
+
+ def AddJob(self, job_):
+ with self._lock:
+ job_.id = self._id_producer.GetNextId()
+
+ self.all_jobs.append(job_)
+ # Only queue a job as ready if it has no dependencies
+ if job_.is_ready:
+ self.ready_jobs.append(job_)
+
+ self._jobs_available.notifyAll()
+
+ return job_.id
+
+ def CleanUpJob(self, job_):
+ with self._lock:
+ if job_.id in self.job_executer_mapping:
+ self.job_executer_mapping[job_.id].CleanUpWorkDir()
+ del self.job_executer_mapping[job_.id]
+ # TODO(raymes): remove job from self.all_jobs
+
+ def NotifyJobComplete(self, job_):
+ self.machine_manager.ReturnMachines(job_.machines)
+
+ with self._lock:
+ self._logger.debug('Handling %r completion event.', job_)
+
+ if job_.status == job.STATUS_SUCCEEDED:
+ for succ in job_.successors:
+ if succ.is_ready:
+ if succ not in self.ready_jobs:
+ self.ready_jobs.append(succ)
+
+ self._jobs_available.notifyAll()
+
+ def AddListener(self, listener):
+ self.listeners.append(listener)
+
+ @logger.HandleUncaughtExceptions
+ def run(self):
+ self._logger.info('Started.')
+
+ while not self._exit_request:
+ with self._lock:
+ # Get the next ready job, block if there are none
+ self._jobs_available.wait()
+
+ while self.ready_jobs:
+ ready_job = self.ready_jobs.pop()
+
+ required_machines = ready_job.machine_dependencies
+ for pred in ready_job.predecessors:
+ required_machines[0].AddPreferredMachine(
+ pred.primary_machine.hostname)
+
+ machines = self.machine_manager.GetMachines(required_machines)
+ if not machines:
+ # If we can't get the necessary machines right now, simply wait
+ # for some jobs to complete
+ self.ready_jobs.insert(0, ready_job)
+ break
+ else:
+ # Mark as executing
+ executer = JobExecuter(ready_job, machines, self.listeners)
+ executer.start()
+ self.job_executer_mapping[ready_job.id] = executer
+
+ self._logger.info('Stopped.')