path: root/gae/webapp/src/scheduler/
diff options
Diffstat (limited to 'gae/webapp/src/scheduler/')
1 files changed, 549 insertions, 0 deletions
diff --git a/gae/webapp/src/scheduler/ b/gae/webapp/src/scheduler/
new file mode 100644
index 0000000..4c4b20f
--- /dev/null
+++ b/gae/webapp/src/scheduler/
@@ -0,0 +1,549 @@
+#!/usr/bin/env python
+# Copyright (C) 2018 The Android Open Source Project
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import datetime
+import itertools
+import logging
+import re
+from google.appengine.ext import ndb
+from webapp.src import vtslab_status as Status
+from webapp.src.proto import model
+from webapp.src.utils import logger
+import webapp2
+MAX_LOG_CHARACTERS = 10000 # maximum number of characters per each log
+BOOTUP_ERROR_RETRY_INTERVAL_IN_MINS = 60 # retry minutes when boot-up error is occurred
+def GetTestVersionType(manifest_branch, gsi_branch, test_type=0):
+ """Compares manifest branch and gsi branch to get test type.
+ This function only completes two LSBs which represent version related
+ test type.
+ Args:
+ manifest_branch: a string, manifest branch name.
+ gsi_branch: a string, gsi branch name.
+ test_type: an integer, previous test type value.
+ Returns:
+ An integer, test type value.
+ """
+ if not test_type:
+ value = 0
+ else:
+ # clear two bits
+ value = test_type & ~(1 | 1 << 1)
+ if not manifest_branch:
+ logging.debug("manifest branch cannot be empty or None.")
+ return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]
+ if not gsi_branch:
+ logging.debug("gsi_branch is empty.")
+ return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT]
+ gcs_pattern = "^gs://.*/v([0-9.]*)/.*"
+ q_pattern = "(git_)?(aosp-)?q.*"
+ p_pattern = "(git_)?(aosp-)?p.*"
+ o_mr1_pattern = "(git_)?(aosp-)?o[^-]*-m.*"
+ o_pattern = "(git_)?(aosp-)?o.*"
+ master_pattern = "(git_)?(aosp-)?master"
+ gcs_search =, manifest_branch)
+ if gcs_search:
+ device_version =
+ elif re.match(q_pattern, manifest_branch):
+ device_version = "10.0"
+ elif re.match(p_pattern, manifest_branch):
+ device_version = "9.0"
+ elif re.match(o_mr1_pattern, manifest_branch):
+ device_version = "8.1"
+ elif re.match(o_pattern, manifest_branch):
+ device_version = "8.0"
+ elif re.match(master_pattern, manifest_branch):
+ device_version = "master"
+ else:
+ logging.debug("Unknown device version.")
+ return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]
+ gcs_search =, gsi_branch)
+ if gcs_search:
+ gsi_version =
+ elif re.match(q_pattern, gsi_branch):
+ gsi_version = "10.0"
+ elif re.match(p_pattern, gsi_branch):
+ gsi_version = "9.0"
+ elif re.match(o_mr1_pattern, gsi_branch):
+ gsi_version = "8.1"
+ elif re.match(o_pattern, gsi_branch):
+ gsi_version = "8.0"
+ elif re.match(master_pattern, gsi_branch):
+ gsi_version = "master"
+ else:
+ logging.debug("Unknown gsi version.")
+ return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_UNKNOWN]
+ if device_version == gsi_version:
+ return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT]
+ else:
+ return value | Status.TEST_TYPE_DICT[Status.TEST_TYPE_OTA]
+class ScheduleHandler(webapp2.RequestHandler):
+ """Background worker class for /worker/schedule_handler.
+ This class pull tasks from 'queue-schedule' queue and processes in
+ background service 'worker'.
+ Attributes:
+ logger: Logger class
+ """
+ logger = logger.Logger()
+ def ReserveDevices(self, target_device_serials):
+ """Reserves devices.
+ Args:
+ target_device_serials: a list of strings, containing target device
+ serial numbers.
+ """
+ device_query = model.DeviceModel.query(
+ model.DeviceModel.serial.IN(target_device_serials))
+ devices = device_query.fetch()
+ devices_to_put = []
+ for device in devices:
+ device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[
+ "reserved"]
+ devices_to_put.append(device)
+ if devices_to_put:
+ ndb.put_multi(devices_to_put)
+ def FindBuildId(self, artifact_type, manifest_branch, target,
+ signed=False):
+ """Finds a designated build ID.
+ Args:
+ artifact_type: a string, build artifact type.
+ manifest_branch: a string, build manifest branch.
+ target: a string which build target and type are joined by '-'.
+ signed: a boolean to get a signed build.
+ Return:
+ string, build ID found.
+ """
+ build_id = ""
+ if "-" in target:
+ build_target, build_type = target.split("-")
+ else:
+ build_target = target
+ build_type = ""
+ if not artifact_type or not manifest_branch or not build_target:
+ self.logger.Println("The argument format is invalid.")
+ return build_id
+ build_query = model.BuildModel.query(
+ model.BuildModel.artifact_type == artifact_type,
+ model.BuildModel.manifest_branch == manifest_branch,
+ model.BuildModel.build_target == build_target,
+ model.BuildModel.build_type == build_type)
+ builds = build_query.fetch()
+ if builds:
+ builds = [
+ build for build in builds
+ if (build.timestamp >
+ - datetime.timedelta(hours=72))
+ ]
+ if builds:
+ self.logger.Println("-- Found build ID")
+ builds.sort(key=lambda x: x.build_id, reverse=True)
+ for build in builds:
+ if not signed or build.signed:
+ build_id = build.build_id
+ break
+ return build_id
+ def post(self):
+ self.logger.Clear()
+ manual_job = False
+ schedule_key = self.request.get("schedule_key")
+ if schedule_key:
+ key = ndb.key.Key(urlsafe=schedule_key)
+ manual_job = True
+ schedules = [key.get()]
+ else:
+ schedule_query = model.ScheduleModel.query(
+ model.ScheduleModel.suspended != True)
+ schedules = schedule_query.fetch()
+ if schedules:
+ # filter out the schedules which are not updated within 72 hours.
+ schedules = [
+ schedule for schedule in schedules
+ if (schedule.timestamp >
+ - datetime.timedelta(hours=72))
+ ]
+ schedules = self.FilterWithPeriod(schedules)
+ if schedules:
+ schedules.sort(key=lambda x: self.GetProductName(x))
+ group_by_product = [
+ list(g)
+ for _, g in itertools.groupby(schedules,
+ lambda x: self.GetProductName(x))
+ ]
+ for group in group_by_product:
+ group.sort(key=lambda x: x.priority_value if (
+ x.priority_value) else Status.GetPriorityValue(x.priority))
+ create_result = {
+ }
+ for schedule in group:
+ self.logger.Println("")
+ self.logger.Println("Schedule: %s (branch: %s)" %
+ (schedule.test_name,
+ schedule.manifest_branch))
+ self.logger.Println(
+ "Build Target: %s" % schedule.build_target)
+ self.logger.Println("Device: %s" % schedule.device)
+ self.logger.Indent()
+ result, lab = self.CreateJob(schedule, manual_job)
+ if result == CREATE_JOB_SUCCESS:
+ create_result[result].append(lab)
+ else:
+ create_result[result].append(schedule)
+ self.logger.Unindent()
+ # if any schedule in group created a job, increase priority of
+ # the schedules which couldn't create due to out of devices.
+ schedules_to_put = []
+ for lab in create_result[CREATE_JOB_SUCCESS]:
+ for schedule in create_result[CREATE_JOB_FAILED_NO_DEVICE]:
+ if any([lab in target for target in schedule.device
+ ]) and schedule not in schedules_to_put:
+ if schedule.priority_value is None:
+ schedule.priority_value = (
+ Status.GetPriorityValue(schedule.priority))
+ if schedule.priority_value > 0:
+ schedule.priority_value -= 1
+ schedules_to_put.append(schedule)
+ if schedules_to_put:
+ ndb.put_multi(schedules_to_put)
+ self.logger.Println("Scheduling completed.")
+ lines = self.logger.Get()
+ lines = [line.strip() for line in lines]
+ outputs = []
+ chars = 0
+ for line in lines:
+ chars += len(line)
+ if chars > MAX_LOG_CHARACTERS:
+ outputs = []
+ chars = len(line)
+ outputs.append(line)
+ def CreateJob(self, schedule, manual_job=False):
+ """Creates a job for given schedule.
+ Args:
+ schedule: model.ScheduleModel instance.
+ manual_job: True if a job is created by a user, False otherwise.
+ Returns:
+ a string of job creation result message.
+ a string of lab name if job is created, otherwise empty string.
+ """
+ target_host, target_device, target_device_serials = (
+ self.SelectTargetLab(schedule))
+ if not target_host:
+ self.logger.Println("- Target host: %s" % target_host)
+ self.logger.Println("- Target device: %s" % target_device)
+ self.logger.Println("- Target serials: %s" % target_device_serials)
+ # create job and add.
+ new_job = model.JobModel()
+ new_job.hostname = target_host
+ new_job.priority = schedule.priority
+ new_job.test_name = schedule.test_name
+ new_job.require_signed_device_build = (
+ schedule.require_signed_device_build)
+ new_job.device = target_device
+ new_job.period = schedule.period
+ new_job.serial.extend(target_device_serials)
+ new_job.build_storage_type = schedule.build_storage_type
+ new_job.manifest_branch = schedule.manifest_branch
+ new_job.build_target = schedule.build_target
+ new_job.pab_account_id = schedule.device_pab_account_id
+ new_job.shards = schedule.shards
+ new_job.param = schedule.param
+ new_job.retry_count = schedule.retry_count
+ new_job.gsi_storage_type = schedule.gsi_storage_type
+ new_job.gsi_branch = schedule.gsi_branch
+ new_job.gsi_build_target = schedule.gsi_build_target
+ new_job.gsi_pab_account_id = schedule.gsi_pab_account_id
+ new_job.gsi_vendor_version = schedule.gsi_vendor_version
+ new_job.test_storage_type = schedule.test_storage_type
+ new_job.test_branch = schedule.test_branch
+ new_job.test_build_target = schedule.test_build_target
+ new_job.test_pab_account_id = schedule.test_pab_account_id
+ new_job.parent_schedule = schedule.key
+ new_job.image_package_repo_base = schedule.image_package_repo_base
+ new_job.required_host_equipment = schedule.required_host_equipment
+ new_job.required_device_equipment = schedule.required_device_equipment
+ new_job.has_bootloader_img = schedule.has_bootloader_img
+ new_job.has_radio_img = schedule.has_radio_img
+ new_job.report_bucket = schedule.report_bucket
+ new_job.report_spreadsheet_id = schedule.report_spreadsheet_id
+ new_job.report_persistent_url = schedule.report_persistent_url
+ new_job.report_reference_url = schedule.report_reference_url
+ # uses bit 0-1 to indicate version.
+ test_type = GetTestVersionType(schedule.manifest_branch,
+ schedule.gsi_branch)
+ # uses bit 2
+ if schedule.require_signed_device_build:
+ test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_SIGNED]
+ if manual_job:
+ test_type |= Status.TEST_TYPE_DICT[Status.TEST_TYPE_MANUAL]
+ new_job.test_type = test_type
+ new_job.build_id = ""
+ new_job.gsi_build_id = ""
+ new_job.test_build_id = ""
+ for artifact_type in ["device", "gsi", "test"]:
+ if artifact_type == "device":
+ storage_type_text = "build_storage_type"
+ manifest_branch_text = "manifest_branch"
+ build_target_text = "build_target"
+ build_id_text = "build_id"
+ signed = new_job.require_signed_device_build
+ else:
+ storage_type_text = artifact_type + "_storage_type"
+ manifest_branch_text = artifact_type + "_branch"
+ build_target_text = artifact_type + "_build_target"
+ build_id_text = artifact_type + "_build_id"
+ signed = False
+ manifest_branch = getattr(new_job, manifest_branch_text)
+ build_target = getattr(new_job, build_target_text)
+ storage_type = getattr(new_job, storage_type_text)
+ if storage_type == Status.STORAGE_TYPE_DICT["PAB"]:
+ build_id = self.FindBuildId(
+ artifact_type=artifact_type,
+ manifest_branch=manifest_branch,
+ target=build_target,
+ signed=signed)
+ elif storage_type == Status.STORAGE_TYPE_DICT["GCS"]:
+ # temp value to distinguish from empty values.
+ build_id = "gcs"
+ else:
+ build_id = ""
+ self.logger.Println(
+ "Unexpected storage type (%s)." % storage_type)
+ setattr(new_job, build_id_text, build_id)
+ if ((not new_job.manifest_branch or new_job.build_id)
+ and (not new_job.gsi_branch or new_job.gsi_build_id)
+ and (not new_job.test_branch or new_job.test_build_id)):
+ new_job.build_id = new_job.build_id.replace("gcs", "")
+ new_job.gsi_build_id = (new_job.gsi_build_id.replace("gcs", ""))
+ new_job.test_build_id = (new_job.test_build_id.replace("gcs", ""))
+ self.ReserveDevices(target_device_serials)
+ new_job.status = Status.JOB_STATUS_DICT["ready"]
+ new_job.timestamp =
+ new_job_key = new_job.put()
+ schedule.children_jobs.append(new_job_key)
+ schedule.priority_value = Status.GetPriorityValue(
+ schedule.priority)
+ schedule.put()
+ self.logger.Println("A new job has been created.")
+ labs = model.LabModel.query(
+ model.LabModel.hostname == target_host).fetch()
+ return CREATE_JOB_SUCCESS, labs[0].name
+ else:
+ self.logger.Println("Cannot find builds to create a job.")
+ self.logger.Println("- Device branch / build - {} / {}".format(
+ new_job.manifest_branch, new_job.build_id))
+ self.logger.Println("- GSI branch / build - {} / {}".format(
+ new_job.gsi_branch, new_job.gsi_build_id))
+ self.logger.Println("- Test branch / build - {} / {}".format(
+ new_job.test_branch, new_job.test_build_id))
+ def FilterWithPeriod(self, schedules):
+ """Filters schedules with period.
+ This method filters schedules if any children jobs are created within
+ period time.
+ Args:
+ schedules: a list of model.ScheduleModel instances.
+ Returns:
+ a list of model.ScheduleModel instances which need to create a new
+ job.
+ """
+ ret_list = []
+ if not schedules:
+ return ret_list
+ if type(schedules) is not list:
+ schedules = [schedules]
+ for schedule in schedules:
+ if not schedule.children_jobs:
+ ret_list.append(schedule)
+ continue
+ latest_job_key = schedule.children_jobs[-1]
+ latest_job = latest_job_key.get()
+ if - latest_job.timestamp > (
+ datetime.timedelta(
+ minutes=self.GetCorrectedPeriod(schedule))):
+ ret_list.append(schedule)
+ return ret_list
+ def SelectTargetLab(self, schedule):
+ """Find target host and devices to schedule a new job.
+ Args:
+ schedule: a proto containing the information of a schedule.
+ Returns:
+ a string which represents hostname,
+ a string containing target lab and product with '/' separator,
+ a list of selected devices serial (see whether devices will be
+ selected later when the job is picked up.)
+ """
+ available_devices = []
+ for target_device in schedule.device:
+ if "/" not in target_device:
+ self.logger.Println(
+ "Device malformed - {}".format(target_device))
+ continue
+ target_lab, target_product_type = target_device.split("/")
+ self.logger.Println("- Lab %s" % target_lab)
+ self.logger.Indent()
+ host_query = model.LabModel.query(
+ == target_lab)
+ target_hosts = host_query.fetch()
+ if target_hosts:
+ for host in target_hosts:
+ if not (set(schedule.required_host_equipment) <= set(
+ host.host_equipment)):
+ continue
+ self.logger.Println("- Host: %s" % host.hostname)
+ self.logger.Indent()
+ device_query = model.DeviceModel.query(
+ model.DeviceModel.hostname == host.hostname,
+ model.DeviceModel.scheduling_status ==
+ model.DeviceModel.status.IN([
+ Status.DEVICE_STATUS_DICT["fastboot"],
+ Status.DEVICE_STATUS_DICT["online"],
+ Status.DEVICE_STATUS_DICT["ready"]
+ ]))
+ host_devices = device_query.fetch()
+ host_devices = [
+ x for x in host_devices
+ if x.product.lower() == target_product_type.lower() and
+ (set(schedule.required_device_equipment) <= set(
+ x.device_equipment))
+ ]
+ if len(host_devices) < schedule.shards:
+ self.logger.Println(
+ "A host {} does not have enough devices. "
+ "# of devices = {}, shards = {}".format(
+ host.hostname, len(host_devices),
+ schedule.shards))
+ self.logger.Unindent()
+ continue
+ host_devices.sort(
+ key=lambda x: (len(x.device_equipment)
+ if x.device_equipment else 0))
+ available_devices.append((host_devices, target_device))
+ self.logger.Unindent()
+ self.logger.Unindent()
+ if not available_devices:
+ self.logger.Println("No hosts have enough devices for schedule!")
+ return None, None, []
+ available_devices.sort(key=lambda x: (
+ sum([len(y.device_equipment) for y in x[0][:schedule.shards]])))
+ selected_host_devices = available_devices[0]
+ return selected_host_devices[0][0].hostname, selected_host_devices[
+ 1], [x.serial for x in selected_host_devices[0][:schedule.shards]]
+ def GetProductName(self, schedule):
+ """Gets a product name from schedule instance.
+ Args:
+ schedule: a schedule instance.
+ Returns:
+ a string, product name in lowercase.
+ """
+ if not schedule or not schedule.device:
+ return ""
+ if "/" not in schedule.device[0]:
+ return ""
+ return schedule.device[0].split("/")[1].lower()
+ def GetCorrectedPeriod(self, schedule):
+ """Corrects and returns period value based on latest children jobs.
+ Args:
+ schedule: a model.ScheduleModel instance containing schedule
+ information.
+ Returns:
+ an integer, corrected schedule period.
+ """
+ if not schedule.error_count or not schedule.children_jobs or (
+ return schedule.period
+ latest_job = schedule.children_jobs[-1].get()
+ if latest_job.status == Status.JOB_STATUS_DICT["bootup-err"]:
+ else:
+ return schedule.period