diff options
Diffstat (limited to 'gae/webapp/src/scheduler/periodic.py')
-rw-r--r-- | gae/webapp/src/scheduler/periodic.py | 301 |
1 files changed, 25 insertions, 276 deletions
diff --git a/gae/webapp/src/scheduler/periodic.py b/gae/webapp/src/scheduler/periodic.py index 1c8a3d0..627ec14 100644 --- a/gae/webapp/src/scheduler/periodic.py +++ b/gae/webapp/src/scheduler/periodic.py @@ -14,292 +14,41 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -import datetime -import logging import webapp2 -from webapp.src import vtslab_status as Status from webapp.src.proto import model -from webapp.src.utils import logger - -def StrGT(left, right): - """Returns true if `left` string is greater than `right` in value.""" - if len(left) > len(right): - right = "0" * (len(left) - len(right)) + right - elif len(right) > len(left): - left = "0" * (len(right) - len(left)) + left - return left > right +from google.appengine.api import taskqueue class PeriodicScheduler(webapp2.RequestHandler): """Main class for /tasks/schedule servlet. - This class creates jobs from registered schedules periodically. - - Attributes: - logger: Logger class + This class creates a task, which creates schedules, in given period. """ - logger = logger.Logger() - - def ReserveDevices(self, target_device_serials): - """Reserves devices. - - Args: - target_device_serials: a list of strings, containing target device - serial numbers. - """ - device_query = model.DeviceModel.query( - model.DeviceModel.serial.IN(target_device_serials)) - devices = device_query.fetch() - for device in devices: - device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[ - "reserved"] - device.put() - - def FindBuildId(self, new_job): - """Finds build ID for a new job. - - Args: - new_job: JobModel, a new job. - - Return: - string, build ID found. - """ - build_id = "" - build_query = model.BuildModel.query( - model.BuildModel.manifest_branch == new_job.manifest_branch) - builds = build_query.fetch() - - if builds: - self.logger.Println("-- Find build ID") - # Remove builds if build_id info is none - build_id_filled = [x for x in builds if x.build_id] - sorted_list = sorted( - build_id_filled, key=lambda x: int(x.build_id), reverse=True) - filtered_list = [ - x for x in sorted_list - if (all( - hasattr(x, attrs) - for attrs in ["build_target", "build_type", "build_id"]) - and x.build_target and x.build_type) - ] - for device_build in filtered_list: - candidate_build_target = "-".join( - [device_build.build_target, device_build.build_type]) - if (new_job.build_target == candidate_build_target and - (not new_job.require_signed_device_build or - device_build.signed)): - build_id = device_build.build_id - break - return build_id def get(self): - """Generates an HTML page based on the task schedules kept in DB.""" - self.logger.Clear() - - schedule_query = model.ScheduleModel.query() - schedules = schedule_query.fetch() - - if schedules: - for schedule in schedules: - self.logger.Println("Schedule: %s (%s %s)" % - (schedule.test_name, - schedule.manifest_branch, - schedule.build_target)) - self.logger.Indent() - if self.NewPeriod(schedule): - self.logger.Println("- Need new job") - target_host, target_device, target_device_serials =\ - self.SelectTargetLab(schedule) - self.logger.Println("- Target host: %s" % target_host) - self.logger.Println("- Target device: %s" % target_device) - self.logger.Println( - "- Target serials: %s" % target_device_serials) - # TODO: update device status - - # create job and add. - if target_host: - new_job = model.JobModel() - new_job.hostname = target_host - new_job.priority = schedule.priority - new_job.test_name = schedule.test_name - new_job.require_signed_device_build = ( - schedule.require_signed_device_build) - new_job.device = target_device - new_job.period = schedule.period - new_job.serial.extend(target_device_serials) - new_job.build_storage_type = schedule.build_storage_type - new_job.manifest_branch = schedule.manifest_branch - new_job.build_target = schedule.build_target - new_job.shards = schedule.shards - new_job.param = schedule.param - new_job.retry_count = schedule.retry_count - new_job.gsi_storage_type = schedule.gsi_storage_type - new_job.gsi_branch = schedule.gsi_branch - new_job.gsi_build_target = schedule.gsi_build_target - new_job.gsi_pab_account_id = schedule.gsi_pab_account_id - new_job.test_storage_type = schedule.test_storage_type - new_job.test_branch = schedule.test_branch - new_job.test_build_target = schedule.test_build_target - new_job.test_pab_account_id = ( - schedule.test_pab_account_id) - - new_job.build_id = "" - - if new_job.build_storage_type == ( - Status.STORAGE_TYPE_DICT["PAB"]): - new_job.build_id = self.FindBuildId(new_job) - if new_job.build_id: - self.ReserveDevices(target_device_serials) - new_job.status = Status.JOB_STATUS_DICT[ - "ready"] - new_job.timestamp = datetime.datetime.now() - new_job.put() - self.logger.Println("NEW JOB") - else: - self.logger.Println("NO BUILD FOUND") - elif new_job.build_storage_type == ( - Status.STORAGE_TYPE_DICT["GCS"]): - new_job.status = Status.JOB_STATUS_DICT["ready"] - new_job.timestamp = datetime.datetime.now() - new_job.put() - self.logger.Println("NEW JOB - GCS") - else: - self.logger.Println("Unexpected storage type.") - - self.logger.Unindent() - + """Enqueues a scheduling task if scheduler is enabled.""" + schedule_control = model.ScheduleControlModel.query() + schedule_control_dataset = schedule_control.fetch() + enabled = True + if schedule_control_dataset: + for schedule_control_data_tuple in schedule_control_dataset: + if (not schedule_control_data_tuple.schedule_name or + schedule_control_data_tuple.schedule_name == "global"): + enabled = schedule_control_data_tuple.enabled + + if not enabled: + self.response.write( + "<pre>\nScheduler not enabled.\n</pre>") + return + + task = taskqueue.add( + url="/worker/schedule_handler", + target="worker", + queue_name="queue-schedule", + transactional=False + ) self.response.write( - "<pre>\n" + "\n".join(self.logger.Get()) + "\n</pre>") - - def NewPeriod(self, schedule): - """Checks whether a new job creation is needed. - - Args: - schedule: a proto containing schedule information. - - Returns: - True if new job is required, False otherwise. - """ - job_query = model.JobModel.query( - model.JobModel.manifest_branch == schedule.manifest_branch, - model.JobModel.build_target == schedule.build_target, - model.JobModel.test_name == schedule.test_name, - model.JobModel.period == schedule.period, - model.JobModel.shards == schedule.shards, - model.JobModel.retry_count == schedule.retry_count, - model.JobModel.gsi_branch == schedule.gsi_branch, - model.JobModel.test_branch == schedule.test_branch) - same_jobs = job_query.fetch() - same_jobs = [ - x for x in same_jobs - if (set(x.param) == set(schedule.param) - and x.device in schedule.device) - ] - if not same_jobs: - return True - - outdated_jobs = [ - x for x in same_jobs - if (datetime.datetime.now() - x.timestamp > datetime.timedelta( - minutes=x.period)) - ] - outdated_ready_jobs = [ - x for x in outdated_jobs - if x.status == Status.JOB_STATUS_DICT["expired"] - ] - - if outdated_ready_jobs: - msg = ("Job key[{}] is(are) outdated. " - "They became infra-err status.").format( - ", ".join( - [str(x.key.id()) for x in outdated_ready_jobs])) - logging.debug(msg) - self.logger.Println(msg) - for job in outdated_ready_jobs: - job.status = Status.JOB_STATUS_DICT["infra-err"] - job.put() - - outdated_leased_jobs = [ - x for x in outdated_jobs - if x.status == Status.JOB_STATUS_DICT["leased"] - ] - if outdated_leased_jobs: - msg = ("Job key[{}] is(are) expected to be completed " - "however still in leased status.").format( - ", ".join( - [str(x.key.id()) for x in outdated_leased_jobs])) - logging.debug(msg) - self.logger.Println(msg) - - recent_jobs = [x for x in same_jobs if x not in outdated_jobs] - - if recent_jobs or outdated_leased_jobs: - return False - else: - return True - - def SelectTargetLab(self, schedule): - """Find target host and devices to schedule a new job. - - Args: - schedule: a proto containing the information of a schedule. - - Returns: - a string which represents hostname, - a string containing target lab and product with '/' separator, - a list of selected devices serial (see whether devices will be - selected later when the job is picked up.) - """ - for target_device in schedule.device: - if "/" not in target_device: - # device malformed - continue - - target_lab, target_product_type = target_device.split("/") - self.logger.Println("- Seeking product %s in lab %s" % - (target_product_type, target_lab)) - self.logger.Indent() - lab_query = model.LabModel.query(model.LabModel.name == target_lab) - target_labs = lab_query.fetch() - - available_devices = {} - if target_labs: - for lab in target_labs: - self.logger.Println("- target lab found") - self.logger.Println("- target device %s %s" % - (lab.hostname, target_product_type)) - self.logger.Indent() - device_query = model.DeviceModel.query( - model.DeviceModel.hostname == lab.hostname) - host_devices = device_query.fetch() - - for device in host_devices: - self.logger.Println("- check device %s %s" % - (device.status, device.product)) - if ((device.status in [ - Status.DEVICE_STATUS_DICT["fastboot"], - Status.DEVICE_STATUS_DICT["online"], - Status.DEVICE_STATUS_DICT["ready"] - ]) and (device.scheduling_status == - Status.DEVICE_SCHEDULING_STATUS_DICT["free"]) - and device.product == target_product_type): - self.logger.Println( - "- a device found %s" % device.serial) - if device.hostname not in available_devices: - available_devices[device.hostname] = set() - available_devices[device.hostname].add( - device.serial) - self.logger.Unindent() - for host in available_devices: - self.logger.Println("- len(devices) %s >= shards %s ?" % - (len(available_devices[host]), - schedule.shards)) - if len(available_devices[host]) >= schedule.shards: - self.logger.Unindent() - return host, target_device, list( - available_devices[host])[:schedule.shards] - self.logger.Unindent() - return None, None, [] + "<pre>\nScheduling task is enqueued. ETA {}\n</pre>".format( + task.eta)) |