diff options
Diffstat (limited to 'gae/webapp/src/tasks/indexing.py')
-rw-r--r-- | gae/webapp/src/tasks/indexing.py | 278 |
1 files changed, 160 insertions, 118 deletions
diff --git a/gae/webapp/src/tasks/indexing.py b/gae/webapp/src/tasks/indexing.py index 4816363..2ebe687 100644 --- a/gae/webapp/src/tasks/indexing.py +++ b/gae/webapp/src/tasks/indexing.py @@ -15,129 +15,171 @@ # limitations under the License. # -import webapp2 +import logging from webapp.src import vtslab_status as Status from webapp.src.proto import model +from webapp.src.scheduler import schedule_worker +import webapp2 +from google.appengine.api import taskqueue +from google.appengine.ext import ndb -class CreateIndex(webapp2.RequestHandler): - """Main class for /tasks/indexing. - - By fetch and put all entities, indexing all existing entities. - """ - - def get(self): - """Fetch and put all entities and display complete message.""" - build_query = model.BuildModel.query() - builds = build_query.fetch() - for build in builds: - build.put() - - schedule_query = model.ScheduleModel.query() - schedules = schedule_query.fetch() - for schedule in schedules: - schedule.put() - - lab_query = model.LabModel.query() - labs = lab_query.fetch() - for lab in labs: - lab.put() - - device_query = model.DeviceModel.query() - devices = device_query.fetch() - for device in devices: - device.put() - - job_query = model.JobModel.query() - jobs = job_query.fetch() - for job in jobs: - job.put() - - self.response.write("<pre>Indexing has been completed.</pre>") - - -class CreateBuildModelIndex(webapp2.RequestHandler): - """Main class for /tasks/indexing/build. - - By fetch and put all entities, indexing all existing BuildModel entities. - """ - - def get(self): - """Fetch and put all BuildModel entities""" - build_query = model.BuildModel.query() - builds = build_query.fetch() - for build in builds: - build.put() - - self.response.write("<pre>BuildModel indexing has been completed.</pre>") - - -class CreateDeviceModelIndex(webapp2.RequestHandler): - """Main class for /tasks/indexing/device. - - By fetch and put all entities, indexing all existing DeviceModel entities. - """ - - def get(self): - """Fetch and put all DeviceModel entities""" - device_query = model.DeviceModel.query() - devices = device_query.fetch() - for device in devices: - device.put() - - self.response.write( - "<pre>DeviceModel indexing has been completed.</pre>") - - -class CreateJobModelIndex(webapp2.RequestHandler): - """Main class for /tasks/indexing/job. - - By fetch and put all entities, indexing all existing JobModel entities. - """ - - def get(self): - """Fetch and put all JobModel entities""" - job_query = model.JobModel.query() - jobs = job_query.fetch() - for job in jobs: - job.put() - - self.response.write( - "<pre>JobModel indexing has been completed.</pre>") - - -class CreateLabModelIndex(webapp2.RequestHandler): - """Main class for /tasks/indexing/lab. - - By fetch and put all entities, indexing all existing LabModel entities. - """ - - def get(self): - """Fetch and put all LabModel entities""" - lab_query = model.LabModel.query() - labs = lab_query.fetch() - for lab in labs: - lab.put() - - self.response.write( - "<pre>LabModel indexing has been completed.</pre>") - - -class CreateScheduleModelIndex(webapp2.RequestHandler): - """Main class for /tasks/indexing/schedule. - - By fetch and put all entities, indexing all existing ScheduleModel entities. - """ +PAGING_SIZE = 1000 +DICT_MODELS = { + "build": model.BuildModel, + "device": model.DeviceModel, + "lab": model.LabModel, + "job": model.JobModel, + "schedule": model.ScheduleModel +} - def get(self): - """Fetch and put all ScheduleModel entities""" - schedule_query = model.ScheduleModel.query() - schedules = schedule_query.fetch() - for schedule in schedules: - if schedule.build_storage_type is None: - schedule.build_storage_type = Status.STORAGE_TYPE_DICT["PAB"] - schedule.put() +class CreateIndex(webapp2.RequestHandler): + """Cron class for /tasks/indexing/{model}.""" + + def get(self, arg): + """Creates a task to re-index, with given URL format.""" + index_list = [] + if arg: + if arg.startswith("/") and arg[1:].lower() in DICT_MODELS.keys(): + index_list.append(arg[1:].lower()) + else: + self.response.write("<pre>Access Denied. Please visit " + "/tasks/indexing/{model}</pre>") + return + else: + # accessed by /tasks/indexing + index_list.extend(DICT_MODELS.keys()) self.response.write( - "<pre>ScheduleModel indexing has been completed.</pre>") + "<pre>Re-indexing task{} for {} {} going to be created.</pre>". + format("s" + if len(index_list) > 1 else "", ", ".join(index_list), "are" + if len(index_list) > 1 else "is")) + + for model_type in index_list: + task = taskqueue.add( + url="/worker/indexing", + target="worker", + queue_name="queue-indexing", + transactional=False, + params={ + "model_type": model_type + }) + self.response.write( + "<pre>Re-indexing task for {} is created. ETA: {}</pre>". + format(model_type, task.eta)) + + +class IndexingHandler(webapp2.RequestHandler): + """Task queue handler class to re-index ndb model.""" + + def post(self): + """Fetch entities and process model specific jobs.""" + reload(model) + model_type = self.request.get("model_type") + + num_updated = 0 + next_cursor = None + more = True + + while more: + query = DICT_MODELS[model_type].query() + entities, next_cursor, more = query.fetch_page( + PAGING_SIZE, start_cursor=next_cursor) + + to_put = [] + for entity in entities: + if model_type == "build": + pass + elif model_type == "device": + pass + elif model_type == "lab": + pass + elif model_type == "job": + # uses bits 0-1 to indicate version. + test_type = schedule_worker.GetTestVersionType( + entity.manifest_branch, entity.gsi_branch) + # uses bit 2 + if entity.require_signed_device_build: + test_type |= ( + Status.TEST_TYPE_DICT[Status.TEST_TYPE_SIGNED]) + entity.test_type = test_type + + if not entity.parent_schedule: + # finds and links to a parent schedule. + parent_schedule_query = model.ScheduleModel.query( + model.ScheduleModel.priority == entity.priority, + model.ScheduleModel.test_name == entity.test_name, + model.ScheduleModel.period == entity.period, + model.ScheduleModel.build_storage_type == ( + entity.build_storage_type), + model.ScheduleModel.manifest_branch == ( + entity.manifest_branch), + model.ScheduleModel.build_target == ( + entity.build_target), + model.ScheduleModel.device_pab_account_id == ( + entity.pab_account_id), + model.ScheduleModel.shards == entity.shards, + model.ScheduleModel.retry_count == ( + entity.retry_count), + model.ScheduleModel.gsi_storage_type == ( + entity.gsi_storage_type), + model.ScheduleModel.gsi_branch == ( + entity.gsi_branch), + model.ScheduleModel.gsi_build_target == ( + entity.gsi_build_target), + model.ScheduleModel.gsi_pab_account_id == ( + entity.gsi_pab_account_id), + model.ScheduleModel.gsi_vendor_version == ( + entity.gsi_vendor_version), + model.ScheduleModel.test_storage_type == ( + entity.test_storage_type), + model.ScheduleModel.test_branch == ( + entity.test_branch), + model.ScheduleModel.test_build_target == ( + entity.test_build_target), + model.ScheduleModel.test_pab_account_id == ( + entity.test_pab_account_id)) + parent_schedules = parent_schedule_query.fetch() + if not parent_schedules: + logging.error("Parent not found.") + else: + parent_schedule = parent_schedules[0] + parent_schedule.children_jobs.append(entity.key) + entity.parent_schedule = parent_schedule.key + to_put.append(parent_schedule) + + elif model_type == "schedule": + if entity.error_count is None: + entity.error_count = 0 + if entity.suspended is None: + entity.suspended = False + if entity.build_storage_type is None: + entity.build_storage_type = Status.STORAGE_TYPE_DICT[ + "PAB"] + # remove None children jobs. + if entity.children_jobs: + entity.children_jobs = [ + x for x in entity.children_jobs if x] + else: + entity.children_jobs = [] + + for attr in ["has_bootloader_img", "has_radio_img"]: + if getattr(entity, attr, None) is None: + setattr(entity, attr, True) + + # set priority_value for old schedules. + if entity.priority_value is None: + entity.priority_value = Status.GetPriorityValue( + entity.priority) + else: + pass + to_put.append(entity) + + if to_put: + ndb.put_multi(to_put) + num_updated += len(to_put) + + logging.info("{} indexing complete with {} updates!".format( + model_type, num_updated)) |