diff options
Diffstat (limited to 'gae/webapp/src/scheduler/schedule_worker.py')
-rw-r--r-- | gae/webapp/src/scheduler/schedule_worker.py | 549 |
1 files changed, 549 insertions, 0 deletions
diff --git a/gae/webapp/src/scheduler/schedule_worker.py b/gae/webapp/src/scheduler/schedule_worker.py new file mode 100644 index 0000000..4c4b20f --- /dev/null +++ b/gae/webapp/src/scheduler/schedule_worker.py @@ -0,0 +1,549 @@ +#!/usr/bin/env python +# +# Copyright (C) 2018 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import datetime +import itertools +import logging +import re + +from google.appengine.ext import ndb + +from webapp.src import vtslab_status as Status +from webapp.src.proto import model +from webapp.src.utils import logger +import webapp2 + +MAX_LOG_CHARACTERS = 10000 # maximum number of characters per each log +BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS = 60 # retry minutes when boot-up error is occurred + +CREATE_JOB_SUCCESS = "success" +CREATE_JOB_FAILED_NO_BUILD = "no_build" +CREATE_JOB_FAILED_NO_DEVICE = "no_device" + + +def GetTestVersionType(manifest_branch, gsi_branch, test_type=0): + """Compares manifest branch and gsi branch to get test type. + + This function only completes two LSBs which represent version related + test type. + + Args: + manifest_branch: a string, manifest branch name. + gsi_branch: a string, gsi branch name. + test_type: an integer, previous test type value. + + Returns: + An integer, test type value. + """ + if not test_type: + value = 0 + else: + # clear two bits + value = test_type & ~(1 | 1 << 1) + + if not manifest_branch: + logging.debug("manifest branch cannot be empty or None.") + return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN] + + if not gsi_branch: + logging.debug("gsi_branch is empty.") + return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT] + + gcs_pattern = "^gs://.*/v([0-9.]*)/.*" + q_pattern = "(git_)?(aosp-)?q.*" + p_pattern = "(git_)?(aosp-)?p.*" + o_mr1_pattern = "(git_)?(aosp-)?o[^-]*-m.*" + o_pattern = "(git_)?(aosp-)?o.*" + master_pattern = "(git_)?(aosp-)?master" + + gcs_search = re.search(gcs_pattern, manifest_branch) + if gcs_search: + device_version = gcs_search.group(1) + elif re.match(q_pattern, manifest_branch): + device_version = "10.0" + elif re.match(p_pattern, manifest_branch): + device_version = "9.0" + elif re.match(o_mr1_pattern, manifest_branch): + device_version = "8.1" + elif re.match(o_pattern, manifest_branch): + device_version = "8.0" + elif re.match(master_pattern, manifest_branch): + device_version = "master" + else: + logging.debug("Unknown device version.") + return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN] + + gcs_search = re.search(gcs_pattern, gsi_branch) + if gcs_search: + gsi_version = gcs_search.group(1) + elif re.match(q_pattern, gsi_branch): + gsi_version = "10.0" + elif re.match(p_pattern, gsi_branch): + gsi_version = "9.0" + elif re.match(o_mr1_pattern, gsi_branch): + gsi_version = "8.1" + elif re.match(o_pattern, gsi_branch): + gsi_version = "8.0" + elif re.match(master_pattern, gsi_branch): + gsi_version = "master" + else: + logging.debug("Unknown gsi version.") + return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN] + + if device_version == gsi_version: + return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT] + else: + return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_OTA] + + +class ScheduleHandler(webapp2.RequestHandler): + """Background worker class for /worker/schedule_handler. + + This class pull tasks from 'queue-schedule' queue and processes in + background service 'worker'. + + Attributes: + logger: Logger class + """ + logger = logger.Logger() + + def ReserveDevices(self, target_device_serials): + """Reserves devices. + + Args: + target_device_serials: a list of strings, containing target device + serial numbers. + """ + device_query = model.DeviceModel.query( + model.DeviceModel.serial.IN(target_device_serials)) + devices = device_query.fetch() + devices_to_put = [] + for device in devices: + device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[ + "reserved"] + devices_to_put.append(device) + if devices_to_put: + ndb.put_multi(devices_to_put) + + def FindBuildId(self, artifact_type, manifest_branch, target, + signed=False): + """Finds a designated build ID. + + Args: + artifact_type: a string, build artifact type. + manifest_branch: a string, build manifest branch. + target: a string which build target and type are joined by '-'. + signed: a boolean to get a signed build. + + Return: + string, build ID found. + """ + build_id = "" + if "-" in target: + build_target, build_type = target.split("-") + else: + build_target = target + build_type = "" + if not artifact_type or not manifest_branch or not build_target: + self.logger.Println("The argument format is invalid.") + return build_id + build_query = model.BuildModel.query( + model.BuildModel.artifact_type == artifact_type, + model.BuildModel.manifest_branch == manifest_branch, + model.BuildModel.build_target == build_target, + model.BuildModel.build_type == build_type) + builds = build_query.fetch() + + if builds: + builds = [ + build for build in builds + if (build.timestamp > + datetime.datetime.now() - datetime.timedelta(hours=72)) + ] + + if builds: + self.logger.Println("-- Found build ID") + builds.sort(key=lambda x: x.build_id, reverse=True) + for build in builds: + if not signed or build.signed: + build_id = build.build_id + break + return build_id + + def post(self): + self.logger.Clear() + manual_job = False + schedule_key = self.request.get("schedule_key") + if schedule_key: + key = ndb.key.Key(urlsafe=schedule_key) + manual_job = True + schedules = [key.get()] + else: + schedule_query = model.ScheduleModel.query( + model.ScheduleModel.suspended != True) + schedules = schedule_query.fetch() + + if schedules: + # filter out the schedules which are not updated within 72 hours. + schedules = [ + schedule for schedule in schedules + if (schedule.timestamp > + datetime.datetime.now() - datetime.timedelta(hours=72)) + ] + schedules = self.FilterWithPeriod(schedules) + + if schedules: + schedules.sort(key=lambda x: self.GetProductName(x)) + group_by_product = [ + list(g) + for _, g in itertools.groupby(schedules, + lambda x: self.GetProductName(x)) + ] + for group in group_by_product: + group.sort(key=lambda x: x.priority_value if ( + x.priority_value) else Status.GetPriorityValue(x.priority)) + create_result = { + CREATE_JOB_SUCCESS: [], + CREATE_JOB_FAILED_NO_BUILD: [], + CREATE_JOB_FAILED_NO_DEVICE: [] + } + for schedule in group: + self.logger.Println("") + self.logger.Println("Schedule: %s (branch: %s)" % + (schedule.test_name, + schedule.manifest_branch)) + self.logger.Println( + "Build Target: %s" % schedule.build_target) + self.logger.Println("Device: %s" % schedule.device) + self.logger.Indent() + result, lab = self.CreateJob(schedule, manual_job) + if result == CREATE_JOB_SUCCESS: + create_result[result].append(lab) + else: + create_result[result].append(schedule) + self.logger.Unindent() + # if any schedule in group created a job, increase priority of + # the schedules which couldn't create due to out of devices. + schedules_to_put = [] + for lab in create_result[CREATE_JOB_SUCCESS]: + for schedule in create_result[CREATE_JOB_FAILED_NO_DEVICE]: + if any([lab in target for target in schedule.device + ]) and schedule not in schedules_to_put: + if schedule.priority_value is None: + schedule.priority_value = ( + Status.GetPriorityValue(schedule.priority)) + if schedule.priority_value > 0: + schedule.priority_value -= 1 + schedules_to_put.append(schedule) + if schedules_to_put: + ndb.put_multi(schedules_to_put) + + self.logger.Println("Scheduling completed.") + + lines = self.logger.Get() + lines = [line.strip() for line in lines] + outputs = [] + chars = 0 + for line in lines: + chars += len(line) + if chars > MAX_LOG_CHARACTERS: + logging.info("\n".join(outputs)) + outputs = [] + chars = len(line) + outputs.append(line) + logging.info("\n".join(outputs)) + + def CreateJob(self, schedule, manual_job=False): + """Creates a job for given schedule. + + Args: + schedule: model.ScheduleModel instance. + manual_job: True if a job is created by a user, False otherwise. + + Returns: + a string of job creation result message. + a string of lab name if job is created, otherwise empty string. + """ + target_host, target_device, target_device_serials = ( + self.SelectTargetLab(schedule)) + if not target_host: + return CREATE_JOB_FAILED_NO_DEVICE, "" + + self.logger.Println("- Target host: %s" % target_host) + self.logger.Println("- Target device: %s" % target_device) + self.logger.Println("- Target serials: %s" % target_device_serials) + + # create job and add. + new_job = model.JobModel() + new_job.hostname = target_host + new_job.priority = schedule.priority + new_job.test_name = schedule.test_name + new_job.require_signed_device_build = ( + schedule.require_signed_device_build) + new_job.device = target_device + new_job.period = schedule.period + new_job.serial.extend(target_device_serials) + new_job.build_storage_type = schedule.build_storage_type + new_job.manifest_branch = schedule.manifest_branch + new_job.build_target = schedule.build_target + new_job.pab_account_id = schedule.device_pab_account_id + new_job.shards = schedule.shards + new_job.param = schedule.param + new_job.retry_count = schedule.retry_count + new_job.gsi_storage_type = schedule.gsi_storage_type + new_job.gsi_branch = schedule.gsi_branch + new_job.gsi_build_target = schedule.gsi_build_target + new_job.gsi_pab_account_id = schedule.gsi_pab_account_id + new_job.gsi_vendor_version = schedule.gsi_vendor_version + new_job.test_storage_type = schedule.test_storage_type + new_job.test_branch = schedule.test_branch + new_job.test_build_target = schedule.test_build_target + new_job.test_pab_account_id = schedule.test_pab_account_id + new_job.parent_schedule = schedule.key + new_job.image_package_repo_base = schedule.image_package_repo_base + new_job.required_host_equipment = schedule.required_host_equipment + new_job.required_device_equipment = schedule.required_device_equipment + new_job.has_bootloader_img = schedule.has_bootloader_img + new_job.has_radio_img = schedule.has_radio_img + new_job.report_bucket = schedule.report_bucket + new_job.report_spreadsheet_id = schedule.report_spreadsheet_id + new_job.report_persistent_url = schedule.report_persistent_url + new_job.report_reference_url = schedule.report_reference_url + + # uses bit 0-1 to indicate version. + test_type = GetTestVersionType(schedule.manifest_branch, + schedule.gsi_branch) + # uses bit 2 + if schedule.require_signed_device_build: + test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_SIGNED] + + if manual_job: + test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_MANUAL] + + new_job.test_type = test_type + + new_job.build_id = "" + new_job.gsi_build_id = "" + new_job.test_build_id = "" + for artifact_type in ["device", "gsi", "test"]: + if artifact_type == "device": + storage_type_text = "build_storage_type" + manifest_branch_text = "manifest_branch" + build_target_text = "build_target" + build_id_text = "build_id" + signed = new_job.require_signed_device_build + else: + storage_type_text = artifact_type + "_storage_type" + manifest_branch_text = artifact_type + "_branch" + build_target_text = artifact_type + "_build_target" + build_id_text = artifact_type + "_build_id" + signed = False + + manifest_branch = getattr(new_job, manifest_branch_text) + build_target = getattr(new_job, build_target_text) + storage_type = getattr(new_job, storage_type_text) + if storage_type == Status.STORAGE_TYPE_DICT["PAB"]: + build_id = self.FindBuildId( + artifact_type=artifact_type, + manifest_branch=manifest_branch, + target=build_target, + signed=signed) + elif storage_type == Status.STORAGE_TYPE_DICT["GCS"]: + # temp value to distinguish from empty values. + build_id = "gcs" + else: + build_id = "" + self.logger.Println( + "Unexpected storage type (%s)." % storage_type) + setattr(new_job, build_id_text, build_id) + + if ((not new_job.manifest_branch or new_job.build_id) + and (not new_job.gsi_branch or new_job.gsi_build_id) + and (not new_job.test_branch or new_job.test_build_id)): + new_job.build_id = new_job.build_id.replace("gcs", "") + new_job.gsi_build_id = (new_job.gsi_build_id.replace("gcs", "")) + new_job.test_build_id = (new_job.test_build_id.replace("gcs", "")) + self.ReserveDevices(target_device_serials) + new_job.status = Status.JOB_STATUS_DICT["ready"] + new_job.timestamp = datetime.datetime.now() + new_job_key = new_job.put() + schedule.children_jobs.append(new_job_key) + schedule.priority_value = Status.GetPriorityValue( + schedule.priority) + schedule.put() + self.logger.Println("A new job has been created.") + labs = model.LabModel.query( + model.LabModel.hostname == target_host).fetch() + return CREATE_JOB_SUCCESS, labs[0].name + else: + self.logger.Println("Cannot find builds to create a job.") + self.logger.Println("- Device branch / build - {} / {}".format( + new_job.manifest_branch, new_job.build_id)) + self.logger.Println("- GSI branch / build - {} / {}".format( + new_job.gsi_branch, new_job.gsi_build_id)) + self.logger.Println("- Test branch / build - {} / {}".format( + new_job.test_branch, new_job.test_build_id)) + return CREATE_JOB_FAILED_NO_BUILD, "" + + def FilterWithPeriod(self, schedules): + """Filters schedules with period. + + This method filters schedules if any children jobs are created within + period time. + + Args: + schedules: a list of model.ScheduleModel instances. + + Returns: + a list of model.ScheduleModel instances which need to create a new + job. + """ + ret_list = [] + if not schedules: + return ret_list + + if type(schedules) is not list: + schedules = [schedules] + + for schedule in schedules: + if not schedule.children_jobs: + ret_list.append(schedule) + continue + + latest_job_key = schedule.children_jobs[-1] + latest_job = latest_job_key.get() + + if datetime.datetime.now() - latest_job.timestamp > ( + datetime.timedelta( + minutes=self.GetCorrectedPeriod(schedule))): + ret_list.append(schedule) + + return ret_list + + def SelectTargetLab(self, schedule): + """Find target host and devices to schedule a new job. + + Args: + schedule: a proto containing the information of a schedule. + + Returns: + a string which represents hostname, + a string containing target lab and product with '/' separator, + a list of selected devices serial (see whether devices will be + selected later when the job is picked up.) + """ + + available_devices = [] + for target_device in schedule.device: + if "/" not in target_device: + self.logger.Println( + "Device malformed - {}".format(target_device)) + continue + + target_lab, target_product_type = target_device.split("/") + self.logger.Println("- Lab %s" % target_lab) + self.logger.Indent() + host_query = model.LabModel.query( + model.LabModel.name == target_lab) + target_hosts = host_query.fetch() + + if target_hosts: + for host in target_hosts: + if not (set(schedule.required_host_equipment) <= set( + host.host_equipment)): + continue + self.logger.Println("- Host: %s" % host.hostname) + self.logger.Indent() + device_query = model.DeviceModel.query( + model.DeviceModel.hostname == host.hostname, + model.DeviceModel.scheduling_status == + Status.DEVICE_SCHEDULING_STATUS_DICT["free"], + model.DeviceModel.status.IN([ + Status.DEVICE_STATUS_DICT["fastboot"], + Status.DEVICE_STATUS_DICT["online"], + Status.DEVICE_STATUS_DICT["ready"] + ])) + host_devices = device_query.fetch() + host_devices = [ + x for x in host_devices + if x.product.lower() == target_product_type.lower() and + (set(schedule.required_device_equipment) <= set( + x.device_equipment)) + ] + if len(host_devices) < schedule.shards: + self.logger.Println( + "A host {} does not have enough devices. " + "# of devices = {}, shards = {}".format( + host.hostname, len(host_devices), + schedule.shards)) + self.logger.Unindent() + continue + host_devices.sort( + key=lambda x: (len(x.device_equipment) + if x.device_equipment else 0)) + available_devices.append((host_devices, target_device)) + self.logger.Unindent() + + self.logger.Unindent() + + if not available_devices: + self.logger.Println("No hosts have enough devices for schedule!") + return None, None, [] + + available_devices.sort(key=lambda x: ( + sum([len(y.device_equipment) for y in x[0][:schedule.shards]]))) + selected_host_devices = available_devices[0] + return selected_host_devices[0][0].hostname, selected_host_devices[ + 1], [x.serial for x in selected_host_devices[0][:schedule.shards]] + + def GetProductName(self, schedule): + """Gets a product name from schedule instance. + + Args: + schedule: a schedule instance. + + Returns: + a string, product name in lowercase. + """ + if not schedule or not schedule.device: + return "" + + if "/" not in schedule.device[0]: + return "" + + return schedule.device[0].split("/")[1].lower() + + def GetCorrectedPeriod(self, schedule): + """Corrects and returns period value based on latest children jobs. + + Args: + schedule: a model.ScheduleModel instance containing schedule + information. + + Returns: + an integer, corrected schedule period. + """ + if not schedule.error_count or not schedule.children_jobs or ( + schedule.period <= BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS): + return schedule.period + + latest_job = schedule.children_jobs[-1].get() + + if latest_job.status == Status.JOB_STATUS_DICT["bootup-err"]: + return BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS + else: + return schedule.period |