diff options
author | Tiancong Wang <tcwang@google.com> | 2020-02-13 21:08:49 +0000 |
---|---|---|
committer | Tiancong Wang <tcwang@google.com> | 2020-02-13 21:08:49 +0000 |
commit | b75f321fc8978b92ce3db6886ccb966768f0c7a8 (patch) | |
tree | 35fa0fbaeaaddd9cc2a126a05eee3527b51e83a8 /deprecated/automation/server/job_manager.py | |
parent | cddd960b0ba2eb62c372c0d3176c75f0bd05d5e8 (diff) | |
parent | e617e3393dd24003aa976ece5050bb291070041c (diff) | |
download | toolchain-utils-b75f321fc8978b92ce3db6886ccb966768f0c7a8.tar.gz |
Merging 18 commit(s) from Chromium's toolchain-utils am: 0ae38c8498 am: 2a19d36a82 am: e617e3393dr_aml_301500702android-mainline-12.0.0_r55android-mainline-11.0.0_r9android-mainline-11.0.0_r8android-mainline-11.0.0_r7android-mainline-11.0.0_r6android-mainline-11.0.0_r5android-mainline-11.0.0_r45android-mainline-11.0.0_r44android-mainline-11.0.0_r43android-mainline-11.0.0_r42android-mainline-11.0.0_r41android-mainline-11.0.0_r40android-mainline-11.0.0_r4android-mainline-11.0.0_r39android-mainline-11.0.0_r38android-mainline-11.0.0_r37android-mainline-11.0.0_r36android-mainline-11.0.0_r35android-mainline-11.0.0_r34android-mainline-11.0.0_r33android-mainline-11.0.0_r32android-mainline-11.0.0_r31android-mainline-11.0.0_r30android-mainline-11.0.0_r3android-mainline-11.0.0_r29android-mainline-11.0.0_r28android-mainline-11.0.0_r27android-mainline-11.0.0_r26android-mainline-11.0.0_r25android-mainline-11.0.0_r24android-mainline-11.0.0_r23android-mainline-11.0.0_r22android-mainline-11.0.0_r21android-mainline-11.0.0_r20android-mainline-11.0.0_r2android-mainline-11.0.0_r19android-mainline-11.0.0_r18android-mainline-11.0.0_r17android-mainline-11.0.0_r16android-mainline-11.0.0_r15android-mainline-11.0.0_r14android-mainline-11.0.0_r13android-mainline-11.0.0_r12android-mainline-11.0.0_r10android-mainline-11.0.0_r1android-11.0.0_r48android-11.0.0_r47android-11.0.0_r46android-11.0.0_r45android-11.0.0_r44android-11.0.0_r43android-11.0.0_r42android-11.0.0_r41android-11.0.0_r40android-11.0.0_r39android-11.0.0_r38android-11.0.0_r37android-11.0.0_r36android-11.0.0_r35android-11.0.0_r34android-11.0.0_r33android-11.0.0_r32android-11.0.0_r31android-11.0.0_r30android-11.0.0_r29android-11.0.0_r28android-11.0.0_r27android-11.0.0_r26android-11.0.0_r24android-11.0.0_r23android-11.0.0_r22android-11.0.0_r21android-11.0.0_r20android-11.0.0_r19android-11.0.0_r18android-11.0.0_r16android11-qpr3-s1-releaseandroid11-qpr3-releaseandroid11-qpr2-releaseandroid11-qpr1-s2-releaseandroid11-qpr1-s1-releaseandroid11-qpr1-releaseandroid11-qpr1-d-s1-releaseandroid11-qpr1-d-releaseandroid11-qpr1-c-releaseandroid11-mainline-tethering-releaseandroid11-mainline-sparse-2021-jan-releaseandroid11-mainline-sparse-2020-dec-releaseandroid11-mainline-releaseandroid11-mainline-permission-releaseandroid11-mainline-os-statsd-releaseandroid11-mainline-networkstack-releaseandroid11-mainline-media-swcodec-releaseandroid11-mainline-media-releaseandroid11-mainline-extservices-releaseandroid11-mainline-documentsui-releaseandroid11-mainline-conscrypt-releaseandroid11-mainline-cellbroadcast-releaseandroid11-mainline-captiveportallogin-releaseandroid11-devandroid11-d2-releaseandroid11-d1-b-release
Change-Id: I3f25c7ee034b2e20e37ed941b8eae24eec7043eb
Diffstat (limited to 'deprecated/automation/server/job_manager.py')
-rw-r--r-- | deprecated/automation/server/job_manager.py | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/deprecated/automation/server/job_manager.py b/deprecated/automation/server/job_manager.py new file mode 100644 index 00000000..7a65b918 --- /dev/null +++ b/deprecated/automation/server/job_manager.py @@ -0,0 +1,194 @@ +# Copyright 2010 Google Inc. All Rights Reserved. +# + +import logging +import os +import re +import threading + +from automation.common import job +from automation.common import logger +from automation.server.job_executer import JobExecuter + + +class IdProducerPolicy(object): + """Produces series of unique integer IDs. + + Example: + id_producer = IdProducerPolicy() + id_a = id_producer.GetNextId() + id_b = id_producer.GetNextId() + assert id_a != id_b + """ + + def __init__(self): + self._counter = 1 + + def Initialize(self, home_prefix, home_pattern): + """Find first available ID based on a directory listing. + + Args: + home_prefix: A directory to be traversed. + home_pattern: A regexp describing all files/directories that will be + considered. The regexp must contain exactly one match group with name + "id", which must match an integer number. + + Example: + id_producer.Initialize(JOBDIR_PREFIX, 'job-(?P<id>\d+)') + """ + harvested_ids = [] + + if os.path.isdir(home_prefix): + for filename in os.listdir(home_prefix): + path = os.path.join(home_prefix, filename) + + if os.path.isdir(path): + match = re.match(home_pattern, filename) + + if match: + harvested_ids.append(int(match.group('id'))) + + self._counter = max(harvested_ids or [0]) + 1 + + def GetNextId(self): + """Calculates another ID considered to be unique.""" + new_id = self._counter + self._counter += 1 + return new_id + + +class JobManager(threading.Thread): + + def __init__(self, machine_manager): + threading.Thread.__init__(self, name=self.__class__.__name__) + self.all_jobs = [] + self.ready_jobs = [] + self.job_executer_mapping = {} + + self.machine_manager = machine_manager + + self._lock = threading.Lock() + self._jobs_available = threading.Condition(self._lock) + self._exit_request = False + + self.listeners = [] + self.listeners.append(self) + + self._id_producer = IdProducerPolicy() + self._id_producer.Initialize(job.Job.WORKDIR_PREFIX, 'job-(?P<id>\d+)') + + self._logger = logging.getLogger(self.__class__.__name__) + + def StartJobManager(self): + self._logger.info('Starting...') + + with self._lock: + self.start() + self._jobs_available.notifyAll() + + def StopJobManager(self): + self._logger.info('Shutdown request received.') + + with self._lock: + for job_ in self.all_jobs: + self._KillJob(job_.id) + + # Signal to die + self._exit_request = True + self._jobs_available.notifyAll() + + # Wait for all job threads to finish + for executer in self.job_executer_mapping.values(): + executer.join() + + def KillJob(self, job_id): + """Kill a job by id. + + Does not block until the job is completed. + """ + with self._lock: + self._KillJob(job_id) + + def GetJob(self, job_id): + for job_ in self.all_jobs: + if job_.id == job_id: + return job_ + return None + + def _KillJob(self, job_id): + self._logger.info('Killing [Job: %d].', job_id) + + if job_id in self.job_executer_mapping: + self.job_executer_mapping[job_id].Kill() + for job_ in self.ready_jobs: + if job_.id == job_id: + self.ready_jobs.remove(job_) + break + + def AddJob(self, job_): + with self._lock: + job_.id = self._id_producer.GetNextId() + + self.all_jobs.append(job_) + # Only queue a job as ready if it has no dependencies + if job_.is_ready: + self.ready_jobs.append(job_) + + self._jobs_available.notifyAll() + + return job_.id + + def CleanUpJob(self, job_): + with self._lock: + if job_.id in self.job_executer_mapping: + self.job_executer_mapping[job_.id].CleanUpWorkDir() + del self.job_executer_mapping[job_.id] + # TODO(raymes): remove job from self.all_jobs + + def NotifyJobComplete(self, job_): + self.machine_manager.ReturnMachines(job_.machines) + + with self._lock: + self._logger.debug('Handling %r completion event.', job_) + + if job_.status == job.STATUS_SUCCEEDED: + for succ in job_.successors: + if succ.is_ready: + if succ not in self.ready_jobs: + self.ready_jobs.append(succ) + + self._jobs_available.notifyAll() + + def AddListener(self, listener): + self.listeners.append(listener) + + @logger.HandleUncaughtExceptions + def run(self): + self._logger.info('Started.') + + while not self._exit_request: + with self._lock: + # Get the next ready job, block if there are none + self._jobs_available.wait() + + while self.ready_jobs: + ready_job = self.ready_jobs.pop() + + required_machines = ready_job.machine_dependencies + for pred in ready_job.predecessors: + required_machines[0].AddPreferredMachine( + pred.primary_machine.hostname) + + machines = self.machine_manager.GetMachines(required_machines) + if not machines: + # If we can't get the necessary machines right now, simply wait + # for some jobs to complete + self.ready_jobs.insert(0, ready_job) + break + else: + # Mark as executing + executer = JobExecuter(ready_job, machines, self.listeners) + executer.start() + self.job_executer_mapping[ready_job.id] = executer + + self._logger.info('Stopped.') |