summaryrefslogtreecommitdiff
path: root/gae/webapp/src/scheduler/periodic.py
blob: 1c8a3d0f7c747558e2ec746028f5e14f65340f36 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
#!/usr/bin/env python
#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import datetime
import logging
import webapp2

from webapp.src import vtslab_status as Status
from webapp.src.proto import model
from webapp.src.utils import logger


def StrGT(left, right):
    """Compares two strings as if they were zero-padded to equal width.

    The shorter string is left-padded with "0" characters until both have
    the same length, then the two are compared lexicographically. For
    non-negative decimal strings this matches a numeric comparison.

    Args:
        left: string, the left-hand operand.
        right: string, the right-hand operand.

    Returns:
        True if the padded `left` sorts strictly after the padded `right`.
    """
    width = max(len(left), len(right))
    return left.rjust(width, "0") > right.rjust(width, "0")


class PeriodicScheduler(webapp2.RequestHandler):
    """Main class for /tasks/schedule servlet.

    This class creates jobs from registered schedules periodically.

    Attributes:
        logger: Logger class
    """
    # NOTE(review): this Logger is a class attribute, so it is shared by all
    # handler instances; get() clears it at the start of every request.
    logger = logger.Logger()

    def ReserveDevices(self, target_device_serials):
        """Marks the given devices as reserved for scheduling.

        Args:
            target_device_serials: a list of strings, containing target device
                                   serial numbers.
        """
        if not target_device_serials:
            # Nothing to reserve; also avoids issuing a datastore query with
            # an empty IN() value list.
            return

        device_query = model.DeviceModel.query(
            model.DeviceModel.serial.IN(target_device_serials))
        for device in device_query.fetch():
            device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[
                "reserved"]
            device.put()

    @staticmethod
    def _ParseBuildId(build_id):
        """Returns `build_id` as an int, or None if empty or not an integer."""
        if not build_id:
            return None
        try:
            return int(build_id)
        except (TypeError, ValueError):
            return None

    def FindBuildId(self, new_job):
        """Finds build ID for a new job.

        Looks at builds on the job's manifest branch, from the numerically
        largest build ID downwards. It returns the first build whose
        "<build_target>-<build_type>" equals new_job.build_target. When the
        job requires a signed device build, the build's `signed` flag must
        also be set. Builds whose build_id is empty or not an integer are
        skipped rather than aborting the whole scheduling run.

        Args:
            new_job: JobModel, a new job.

        Returns:
            string, build ID found, or "" if no matching build exists.
        """
        build_query = model.BuildModel.query(
            model.BuildModel.manifest_branch == new_job.manifest_branch)
        builds = build_query.fetch()
        if not builds:
            return ""

        self.logger.Println("-- Find build ID")
        numbered_builds = []
        for build in builds:
            numeric_id = self._ParseBuildId(build.build_id)
            if numeric_id is not None:
                numbered_builds.append((numeric_id, build))
        # Sort on the numeric ID only (never compare the model objects);
        # list.sort is stable, so ties keep their fetch order.
        numbered_builds.sort(key=lambda pair: pair[0], reverse=True)

        for _, device_build in numbered_builds:
            # Builds lacking target/type info cannot be matched.
            if not (getattr(device_build, "build_target", None) and
                    getattr(device_build, "build_type", None)):
                continue
            candidate_build_target = "-".join(
                [device_build.build_target, device_build.build_type])
            if (new_job.build_target == candidate_build_target and
                    (not new_job.require_signed_device_build or
                     device_build.signed)):
                return device_build.build_id
        return ""

    def get(self):
        """Creates jobs for due schedules and writes the run log as HTML.

        For every schedule in the DB for which NewPeriod() returns True,
        _ScheduleJob() selects a target host/devices and creates a job.
        The accumulated log lines are written inside a <pre> block.
        """
        self.logger.Clear()

        for schedule in model.ScheduleModel.query().fetch():
            self.logger.Println("Schedule: %s (%s %s)" %
                                (schedule.test_name,
                                 schedule.manifest_branch,
                                 schedule.build_target))
            self.logger.Indent()
            if self.NewPeriod(schedule):
                self.logger.Println("- Need new job")
                self._ScheduleJob(schedule)
            self.logger.Unindent()

        self.response.write(
            "<pre>\n" + "\n".join(self.logger.Get()) + "\n</pre>")

    def _ScheduleJob(self, schedule):
        """Selects a target lab for `schedule` and stores a new job if possible.

        Args:
            schedule: a proto containing schedule information.
        """
        target_host, target_device, target_device_serials = (
            self.SelectTargetLab(schedule))
        self.logger.Println("- Target host: %s" % target_host)
        self.logger.Println("- Target device: %s" % target_device)
        self.logger.Println(
            "- Target serials: %s" % target_device_serials)
        # TODO: update device status

        if not target_host:
            return

        new_job = self._NewJobFromSchedule(
            schedule, target_host, target_device, target_device_serials)

        if new_job.build_storage_type == Status.STORAGE_TYPE_DICT["PAB"]:
            new_job.build_id = self.FindBuildId(new_job)
            if not new_job.build_id:
                self.logger.Println("NO BUILD FOUND")
                return
            self.ReserveDevices(target_device_serials)
            self._SaveReadyJob(new_job)
            self.logger.Println("NEW JOB")
        elif new_job.build_storage_type == Status.STORAGE_TYPE_DICT["GCS"]:
            # NOTE(review): unlike the PAB branch, the selected devices are
            # not reserved here - confirm whether that is intentional.
            self._SaveReadyJob(new_job)
            self.logger.Println("NEW JOB - GCS")
        else:
            self.logger.Println("Unexpected storage type.")

    @staticmethod
    def _NewJobFromSchedule(schedule, target_host, target_device,
                            target_device_serials):
        """Returns an unsaved JobModel populated from `schedule`.

        Args:
            schedule: a proto containing schedule information.
            target_host: string, hostname selected for the job.
            target_device: string, "<lab>/<product>" selected for the job.
            target_device_serials: list of strings, selected device serials.

        Returns:
            JobModel with build_id set to "" and status/timestamp unset.
        """
        new_job = model.JobModel()
        new_job.hostname = target_host
        new_job.priority = schedule.priority
        new_job.test_name = schedule.test_name
        new_job.require_signed_device_build = (
            schedule.require_signed_device_build)
        new_job.device = target_device
        new_job.period = schedule.period
        new_job.serial.extend(target_device_serials)
        new_job.build_storage_type = schedule.build_storage_type
        new_job.manifest_branch = schedule.manifest_branch
        new_job.build_target = schedule.build_target
        new_job.shards = schedule.shards
        new_job.param = schedule.param
        new_job.retry_count = schedule.retry_count
        new_job.gsi_storage_type = schedule.gsi_storage_type
        new_job.gsi_branch = schedule.gsi_branch
        new_job.gsi_build_target = schedule.gsi_build_target
        new_job.gsi_pab_account_id = schedule.gsi_pab_account_id
        new_job.test_storage_type = schedule.test_storage_type
        new_job.test_branch = schedule.test_branch
        new_job.test_build_target = schedule.test_build_target
        new_job.test_pab_account_id = schedule.test_pab_account_id
        new_job.build_id = ""
        return new_job

    @staticmethod
    def _SaveReadyJob(new_job):
        """Sets `new_job` to "ready", stamps it with now(), and saves it."""
        new_job.status = Status.JOB_STATUS_DICT["ready"]
        new_job.timestamp = datetime.datetime.now()
        new_job.put()

    def NewPeriod(self, schedule):
        """Checks whether a new job creation is needed.

        A job "matches" the schedule when the queried fields are equal, its
        param set equals the schedule's, and its device is one of the
        schedule's devices. A matching job is outdated once more than
        `period` minutes have passed since its timestamp.

        As a side effect, outdated matching jobs in "expired" status are
        moved to "infra-err" and saved.

        Args:
            schedule: a proto containing schedule information.

        Returns:
            True if new job is required, False otherwise. It returns False
            when any matching job is still recent, or when an outdated
            matching job is still leased.
        """
        job_query = model.JobModel.query(
            model.JobModel.manifest_branch == schedule.manifest_branch,
            model.JobModel.build_target == schedule.build_target,
            model.JobModel.test_name == schedule.test_name,
            model.JobModel.period == schedule.period,
            model.JobModel.shards == schedule.shards,
            model.JobModel.retry_count == schedule.retry_count,
            model.JobModel.gsi_branch == schedule.gsi_branch,
            model.JobModel.test_branch == schedule.test_branch)
        same_jobs = [
            x for x in job_query.fetch()
            if (set(x.param) == set(schedule.param)
                and x.device in schedule.device)
        ]
        if not same_jobs:
            return True

        # Partition once against a single reference time instead of
        # re-scanning outdated_jobs for every job.
        now = datetime.datetime.now()
        outdated_jobs = []
        recent_jobs = []
        for job in same_jobs:
            if now - job.timestamp > datetime.timedelta(minutes=job.period):
                outdated_jobs.append(job)
            else:
                recent_jobs.append(job)

        # NOTE(review): this selects "expired" jobs, although the message
        # below only says they are outdated - confirm intended status.
        outdated_expired_jobs = [
            x for x in outdated_jobs
            if x.status == Status.JOB_STATUS_DICT["expired"]
        ]
        if outdated_expired_jobs:
            msg = ("Job key[{}] is(are) outdated. "
                   "They became infra-err status.").format(
                       ", ".join(
                           [str(x.key.id()) for x in outdated_expired_jobs]))
            logging.debug(msg)
            self.logger.Println(msg)
            for job in outdated_expired_jobs:
                job.status = Status.JOB_STATUS_DICT["infra-err"]
                job.put()

        outdated_leased_jobs = [
            x for x in outdated_jobs
            if x.status == Status.JOB_STATUS_DICT["leased"]
        ]
        if outdated_leased_jobs:
            msg = ("Job key[{}] is(are) expected to be completed "
                   "however still in leased status.").format(
                       ", ".join(
                           [str(x.key.id()) for x in outdated_leased_jobs]))
            logging.debug(msg)
            self.logger.Println(msg)

        return not (recent_jobs or outdated_leased_jobs)

    def _FindAvailableDevices(self, target_lab, target_product_type):
        """Collects schedulable devices of a product in a lab.

        A device qualifies if it is attached to a host of a lab named
        `target_lab`, and its product equals `target_product_type`. Its
        status must be fastboot, online or ready, and its scheduling
        status must be free.

        Args:
            target_lab: string, lab name.
            target_product_type: string, device product name.

        Returns:
            dict mapping hostname to a set of qualifying device serials.
        """
        available_devices = {}
        lab_query = model.LabModel.query(model.LabModel.name == target_lab)
        for lab in lab_query.fetch():
            self.logger.Println("- target lab found")
            self.logger.Println("- target device %s %s" %
                                (lab.hostname, target_product_type))
            self.logger.Indent()
            device_query = model.DeviceModel.query(
                model.DeviceModel.hostname == lab.hostname)
            for device in device_query.fetch():
                self.logger.Println("- check device %s %s" %
                                    (device.status, device.product))
                if ((device.status in [
                        Status.DEVICE_STATUS_DICT["fastboot"],
                        Status.DEVICE_STATUS_DICT["online"],
                        Status.DEVICE_STATUS_DICT["ready"]
                ]) and (device.scheduling_status ==
                        Status.DEVICE_SCHEDULING_STATUS_DICT["free"])
                        and device.product == target_product_type):
                    self.logger.Println(
                        "- a device found %s" % device.serial)
                    available_devices.setdefault(
                        device.hostname, set()).add(device.serial)
            self.logger.Unindent()
        return available_devices

    def SelectTargetLab(self, schedule):
        """Find target host and devices to schedule a new job.

        Entries of schedule.device must have the form "<lab>/<product>".
        Entries without exactly one "/" are skipped as malformed.

        Args:
            schedule: a proto containing the information of a schedule.

        Returns:
            a string which represents hostname,
            a string containing target lab and product with '/' separator,
            a list of selected devices serial (see whether devices will be
            selected later when the job is picked up.)
            (None, None, []) is returned when no host has at least
            `schedule.shards` available devices.
        """
        for target_device in schedule.device:
            parts = target_device.split("/")
            if len(parts) != 2:
                # device malformed; previously more than one "/" raised
                # ValueError here and aborted the whole run.
                continue

            target_lab, target_product_type = parts
            self.logger.Println("- Seeking product %s in lab %s" %
                                (target_product_type, target_lab))
            self.logger.Indent()
            available_devices = self._FindAvailableDevices(
                target_lab, target_product_type)
            for host, serials in available_devices.items():
                self.logger.Println("- len(devices) %s >= shards %s ?" %
                                    (len(serials), schedule.shards))
                if len(serials) >= schedule.shards:
                    self.logger.Unindent()
                    return host, target_device, list(
                        serials)[:schedule.shards]
            self.logger.Unindent()
        return None, None, []