summaryrefslogtreecommitdiff
path: root/tradefed_cluster/request_sync_monitor.py
blob: 4a9c461430533ee6bf0913db6bacd4436a066f87 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Monitor requests until they reach a final state."""

import datetime
import json
import logging

import flask

from tradefed_cluster import command_event
from tradefed_cluster import commander
from tradefed_cluster import common
from tradefed_cluster import datastore_entities
from tradefed_cluster import request_manager
from tradefed_cluster.services import task_scheduler
from tradefed_cluster.util import ndb_shim as ndb

# Name of the task queue that carries request sync tasks.
REQUEST_SYNC_QUEUE = 'request-sync-queue'
# Key under which the request ID is stored in the JSON task payload.
REQUEST_ID_KEY = 'request_id'

# Wait 15 seconds before the next sync after a sync pass processed events.
SHORT_SYNC_COUNTDOWN_SECS = 15

# Default wait (1 minute) before checking a request again.
LONG_SYNC_COUNTDOWN_SECS = 60

# Force a sync at least once every 5 minutes, even without new events.
FORCE_REQUEST_SYNC_SECS = 60 * 5

# Flask application that serves the request sync queue handler.
APP = flask.Flask(__name__)


class RequestSyncStatusNotFoundError(Exception):
  """Raised when a request has no RequestSyncStatus entity."""


def _AddRequestToQueue(request_id, countdown_secs=LONG_SYNC_COUNTDOWN_SECS):
  """Schedule a future sync task for a request.

  Args:
    request_id: ID of the request to sync.
    countdown_secs: Seconds from now until the task becomes due.
  """
  task_body = {REQUEST_ID_KEY: request_id}
  payload = json.dumps(task_body)

  eta = common.Now() + datetime.timedelta(seconds=countdown_secs)
  logging.debug('Queueing request %s to be synced at %s', request_id, eta)
  queued_task = task_scheduler.AddTask(
      queue_name=REQUEST_SYNC_QUEUE,
      payload=payload,
      eta=eta)
  logging.debug('Queued task: %s', queued_task)


def GetRequestSyncStatusKey(request_id):
  """Build the datastore key of a request's RequestSyncStatus entity.

  Args:
    request_id: ID of the request.

  Returns:
    An ndb.Key for the RequestSyncStatus model, using common.NAMESPACE.
  """
  status_model = datastore_entities.RequestSyncStatus
  return ndb.Key(status_model, request_id, namespace=common.NAMESPACE)


def Monitor(request_id):
  """Start monitoring a request.

  Queues a sync task for the request and, unless one already exists, creates
  its RequestSyncStatus entity.

  Args:
    request_id: ID of the request to monitor.
  """
  logging.info('Monitoring request: %s', request_id)

  # The sync task is queued before the status entity is looked up or written.
  _AddRequestToQueue(request_id)

  status_key = GetRequestSyncStatusKey(request_id)
  existing_status = status_key.get()
  if existing_status:
    logging.warning('Sync status already exists for %s', request_id)
    return

  datastore_entities.RequestSyncStatus(
      key=status_key, request_id=request_id).put()


@ndb.transactional()
def _UpdateSyncStatus(request_id):
  """Decide whether a request is due for a sync and record the decision.

  A sync is due when the status is flagged with new command events, when it
  has never been synced, or when the last sync is older than
  FORCE_REQUEST_SYNC_SECS. When a sync is due, the new-events flag is cleared
  (if it was set) and last_sync_time is updated before the entity is saved.

  Args:
    request_id: The request ID as a string

  Returns:
    True if the request should be synced. False otherwise.

  Raises:
    RequestSyncStatusNotFoundError: no RequestSyncStatus exists for the given
      request.
  """
  status = GetRequestSyncStatusKey(request_id).get()
  if not status:
    # Not expected: it would mean the put() in Monitor() failed after the
    # request was queued, which would have caused CreateRequest to fail too.
    raise RequestSyncStatusNotFoundError(
        'No RequestSyncStatus found for: %s' % request_id)

  if status.has_new_command_events:
    logging.info('Request %s has new command events.', request_id)
    status.has_new_command_events = False
  elif not status.last_sync_time:
    logging.info('Request %s does not have a last sync time.', request_id)
  elif (common.Now() - status.last_sync_time >
        datetime.timedelta(seconds=FORCE_REQUEST_SYNC_SECS)):
    logging.info('Request %s was last synced on %s.', request_id,
                 status.last_sync_time)
  else:
    # None of the sync conditions hold; leave the entity untouched.
    return False

  status.last_sync_time = common.Now()
  status.put()
  return True


def StoreCommandEvent(event):
  """Persist a command event so that a later sync can process it.

  Flags the request as having new command events, then writes the event as a
  RawCommandEvent entity.

  Args:
    event: a CommandEvent
  """
  # NOTE(review): the flag is set before the entity is written; confirm that a
  # sync running in between cannot clear the flag and miss this event.
  _SetNewCommandEvents(event.request_id)

  entity_fields = {
      'request_id': event.request_id,
      'command_id': event.command_id,
      'attempt_id': event.attempt_id,
      'event_timestamp': event.time,
      'payload': event.event_dict,
      'namespace': common.NAMESPACE,
  }
  datastore_entities.RawCommandEvent(**entity_fields).put()


def _ProcessCommandEvents(request_id):
  """Process all raw command events for the given request.

  Events are handled in ascending event_timestamp order. Processing stops at
  the first event that fails. Events handled before the failure are deleted
  from the datastore, and the failure is then re-raised to the caller.

  Args:
    request_id: ID of the request to process all its events for.

  Raises:
    Exception: the error raised while building or processing an event.
  """
  raw_events = datastore_entities.RawCommandEvent.query(
      datastore_entities.RawCommandEvent.request_id == request_id,
      namespace=common.NAMESPACE).order(
          datastore_entities.RawCommandEvent.event_timestamp)

  raw_events_keys_to_delete = []
  error = None

  for raw_event in raw_events:
    event = None
    try:
      # The CommandEvent is built inside the try block so that a malformed
      # payload is handled like any other processing failure: events that
      # already succeeded are still deleted below instead of being processed
      # again on the next sync.
      event = command_event.CommandEvent(**raw_event.payload)
      commander.ProcessCommandEvent(event)
      raw_events_keys_to_delete.append(raw_event.key)
    except Exception as e:  # pylint: disable=broad-except
      # `event` is still None when the payload could not be turned into a
      # CommandEvent; log the entity key so the bad record can be identified.
      logging.warning(
          'Error while processing event: %s',
          event if event is not None else raw_event.key,
          exc_info=True)
      error = e
      break

  logging.info('Processed %d events for request %s',
               len(raw_events_keys_to_delete), request_id)
  if raw_events_keys_to_delete:
    ndb.delete_multi(raw_events_keys_to_delete)

  # Compare against None so an exception object that evaluates as falsy
  # cannot be swallowed.
  if error is not None:
    # Re-raise any error
    logging.warning('Events were partially processed for %s', request_id)
    raise error


def _SetNewCommandEvents(request_id):
  """Flag a request's sync status as having unprocessed command events.

  Only logs an error when the request has no RequestSyncStatus entity.

  Args:
    request_id: ID of the request.
  """
  status = GetRequestSyncStatusKey(request_id).get()
  if not status:
    logging.error(
        'Unable find sync status for %s. This can happen when events '
        'arrived after the request is final.', request_id)
    return

  if status.has_new_command_events:
    # Already flagged; skip the redundant write.
    return

  status.has_new_command_events = True
  status.put()


def SyncRequest(request_id):
  """Sync the request for the given ID.

  Check for pending command events for the given request and process them in
  order of their timestamps until the request reaches a final state.

  If command events aren't available or the request isn't final, add the request
  back to the sync queue.

  Args:
    request_id: The request ID as a string

  Raises:
    RequestSyncStatusNotFoundError: raised by _UpdateSyncStatus when the
      request has no RequestSyncStatus.
    Exception: any error raised while processing command events is re-raised
      after the sync status has been restored.
  """
  # When this returns True, _UpdateSyncStatus has already cleared the
  # new-events flag (if set) and updated last_sync_time.
  should_sync = _UpdateSyncStatus(request_id)
  if not should_sync:
    # Nothing to do yet; check again after the default countdown.
    logging.debug('Not syncing request %s', request_id)
    _AddRequestToQueue(request_id)
    return

  sync_status_key = GetRequestSyncStatusKey(request_id)
  # Keep the entity around so it can be written back if processing fails below.
  sync_status = sync_status_key.get()
  request = request_manager.GetRequest(request_id)
  if not request:
    # This should not happen. Requires further debugging.
    logging.error('No request found with ID: %s', request_id)
    # Drop the status entity and return without re-queueing the request.
    sync_status_key.delete()
    return
  logging.info('Request %s: state=%s', request_id, request.state)

  if request.state == common.RequestState.UNKNOWN:
    # Re-queue with the default countdown and try again later.
    logging.debug(
        'Request %s is being scheduled; delaying sync', request_id)
    _AddRequestToQueue(request_id)
    return

  # If a request is in a final state, switch to on-demand event processing.
  last_sync = common.IsFinalRequestState(request.state)
  if last_sync:
    # Stop new events from being queued.
    sync_status_key.delete()

  logging.info('Syncing request %s', request_id)
  try:
    _ProcessCommandEvents(request_id)
  except:
    # Bare except: everything is caught here because the handler always
    # re-raises after restoring state.
    logging.exception('Failed to process events for %s', request_id)
    # Recover sync status.
    # The entity may have been deleted above (final state); write it back and
    # flag new events again.
    sync_status.put()
    _SetNewCommandEvents(request_id)
    raise

  if last_sync:
    # Final request: do not re-queue.
    logging.info('Request %s will no longer be synced', request_id)
    return

  # A sync pass just ran; check again after the short countdown.
  _AddRequestToQueue(request_id, countdown_secs=SHORT_SYNC_COUNTDOWN_SECS)


@APP.route('/_ah/queue/%s' % REQUEST_SYNC_QUEUE, methods=['POST'])
def HandleRequestTask():
  """Handle a task posted by the request sync queue.

  Decodes the JSON task payload and syncs the request it names. A missing
  RequestSyncStatus is logged and otherwise ignored; any other error
  propagates out of the handler.

  Returns:
    common.HTTP_OK
  """
  request_info = json.loads(flask.request.get_data())
  logging.info('RequestTaskHandler syncing %s', request_info)
  request_id = request_info[REQUEST_ID_KEY]
  try:
    SyncRequest(request_id)
  except RequestSyncStatusNotFoundError:
    # Do not retry missing RequestSyncStatus
    logging.warning('Missing request sync status for %s', request_info)

  return common.HTTP_OK