summaryrefslogtreecommitdiff
path: root/gae/webapp/src/endpoint/job_queue_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'gae/webapp/src/endpoint/job_queue_test.py')
-rw-r--r--gae/webapp/src/endpoint/job_queue_test.py158
1 files changed, 158 insertions, 0 deletions
diff --git a/gae/webapp/src/endpoint/job_queue_test.py b/gae/webapp/src/endpoint/job_queue_test.py
new file mode 100644
index 0000000..140c4f4
--- /dev/null
+++ b/gae/webapp/src/endpoint/job_queue_test.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2018 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import unittest
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+from webapp.src import vtslab_status as Status
+from webapp.src.endpoint import job_queue
+from webapp.src.proto import model
+from webapp.src.testing import unittest_base
+
+
+class JobQueueTest(unittest_base.UnitTestBase):
+ """A class to test job_queue endpoint API."""
+
+ def setUp(self):
+ """Initializes test"""
+ super(JobQueueTest, self).setUp()
+
+ def testGetJobModel(self):
+ """Asserts job_queue/get API receives a job lease request."""
+ test_values = {
+ "test_type": Status.TEST_TYPE_DICT[Status.TEST_TYPE_TOT],
+ "hostname": self.GetRandomString(),
+ "priority": self.GetRandomString(),
+ "test_name": self.GetRandomString(),
+ "require_signed_device_build": False,
+ "has_bootloader_img": True,
+ "has_radio_img": False,
+ "device": self.GetRandomString(),
+ "serial": ["serial01", "serial02"],
+ "build_storage_type": Status.STORAGE_TYPE_DICT["GCS"],
+ "manifest_branch": self.GetRandomString(),
+ "build_target": self.GetRandomString(),
+ "build_id": self.GetRandomString(),
+ "pab_account_id": self.GetRandomString(),
+ "shards": 1,
+ "param": [""],
+ "status": Status.JOB_STATUS_DICT["ready"],
+ "period": 360,
+ "gsi_storage_type": Status.STORAGE_TYPE_DICT["GCS"],
+ "gsi_branch": self.GetRandomString(),
+ "gsi_build_target": self.GetRandomString(),
+ "gsi_build_id": self.GetRandomString(),
+ "gsi_pab_account_id": self.GetRandomString(),
+ "gsi_vendor_version": self.GetRandomString(),
+ "test_storage_type": Status.STORAGE_TYPE_DICT["GCS"],
+ "test_branch": self.GetRandomString(),
+ "test_build_target": self.GetRandomString(),
+ "test_build_id": self.GetRandomString(),
+ "test_pab_account_id": self.GetRandomString(),
+ "retry_count": 2,
+ "infra_log_url": self.GetRandomString(),
+ "image_package_repo_base": self.GetRandomString(),
+ "report_bucket": [self.GetRandomString()],
+ "report_spreadsheet_id": [self.GetRandomString()],
+ "report_persistent_url": [self.GetRandomString()],
+ "report_reference_url": [self.GetRandomString()],
+ }
+
+ for serial in test_values["serial"]:
+ self.GenerateDeviceModel(serial=serial).put()
+
+ job = model.JobModel()
+ for key in test_values:
+ setattr(job, key, test_values[key])
+ job.timestamp = datetime.datetime.now()
+ job.put()
+
+ container = (job_queue.JOB_QUEUE_RESOURCE.combined_message_class(
+ hostname=test_values["hostname"]))
+ api = job_queue.JobQueueApi()
+ response = api.lease(container)
+
+ self.assertEqual(response.return_code,
+ model.ReturnCodeMessage.SUCCESS)
+ self.assertEqual(len(response.jobs), 1)
+ for key in test_values:
+ if key is "status":
+ self.assertEqual(
+ getattr(response.jobs[0], key),
+ Status.JOB_STATUS_DICT["leased"])
+ else:
+ self.assertEqual(
+ getattr(response.jobs[0], key), test_values[key])
+
+ devices = model.DeviceModel.query().fetch()
+ for device in devices:
+ self.assertEqual(device.scheduling_status,
+ Status.DEVICE_SCHEDULING_STATUS_DICT["use"])
+
+ # test job heartbeat api
+ container = (job_queue.JOB_QUEUE_RESOURCE.combined_message_class(
+ hostname=response.jobs[0].hostname,
+ manifest_branch=response.jobs[0].manifest_branch,
+ build_target=response.jobs[0].build_target,
+ test_name=response.jobs[0].test_name,
+ serial=response.jobs[0].serial,
+ status=response.jobs[0].status,
+ ))
+ api = job_queue.JobQueueApi()
+ response = api.heartbeat(container)
+ self.assertEqual(response.return_code,
+ model.ReturnCodeMessage.SUCCESS)
+
+ jobs = model.JobModel.query().fetch()
+ self.assertEqual(len(jobs), 1)
+ self.assertEqual(jobs[0].status, Status.JOB_STATUS_DICT["leased"])
+ self.assertTrue(datetime.datetime.now() - jobs[0].heartbeat_stamp <
+ datetime.timedelta(seconds=1))
+
+ # test job heartbeat api to complete the job
+ container = (job_queue.JOB_QUEUE_RESOURCE.combined_message_class(
+ hostname=response.jobs[0].hostname,
+ manifest_branch=response.jobs[0].manifest_branch,
+ build_target=response.jobs[0].build_target,
+ test_name=response.jobs[0].test_name,
+ serial=response.jobs[0].serial,
+ status=Status.JOB_STATUS_DICT["complete"],
+ ))
+ api = job_queue.JobQueueApi()
+ response = api.heartbeat(container)
+ self.assertEqual(response.return_code,
+ model.ReturnCodeMessage.SUCCESS)
+
+ jobs = model.JobModel.query().fetch()
+ self.assertEqual(len(jobs), 1)
+ self.assertEqual(jobs[0].status, Status.JOB_STATUS_DICT["complete"])
+ self.assertTrue(datetime.datetime.now() - jobs[0].heartbeat_stamp <
+ datetime.timedelta(seconds=1))
+
+ devices = model.DeviceModel.query().fetch()
+ for device in devices:
+ self.assertEqual(device.scheduling_status,
+ Status.DEVICE_SCHEDULING_STATUS_DICT["free"])
+
+
+if __name__ == "__main__":
+ unittest.main()