summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKevin Lau Fang <fangk@google.com>2023-04-07 15:36:50 -0700
committerCopybara-Service <copybara-worker@google.com>2023-04-07 15:37:38 -0700
commit528766d26c49412708708d66e80f02ecfc179abd (patch)
tree4e68d382ada29e6b7e9dd6420a08e094a78e6cf0
parentd5958c822cb0a252e9d247e84c0a84c6ea56454e (diff)
downloadtradefed_cluster-528766d26c49412708708d66e80f02ecfc179abd.tar.gz
Internal change
PiperOrigin-RevId: 522699259 Change-Id: I851bb15dcb9c57b9a9d2e2795ced3b9fd6b64979
-rw-r--r--tradefed_cluster/BUILD7
-rw-r--r--tradefed_cluster/command_attempt_monitor.py17
-rw-r--r--tradefed_cluster/command_attempt_monitor_test.py28
-rw-r--r--tradefed_cluster/command_manager.py14
-rw-r--r--tradefed_cluster/command_task_api.py2
5 files changed, 61 insertions, 7 deletions
diff --git a/tradefed_cluster/BUILD b/tradefed_cluster/BUILD
index 2a4a252..4c8fd0c 100644
--- a/tradefed_cluster/BUILD
+++ b/tradefed_cluster/BUILD
@@ -411,10 +411,17 @@ py_test(
deps = [
":command_attempt_monitor_lib",
":command_event_test_util",
+ ":command_lib",
+ ":commander_lib",
+ ":common_lib",
+ ":datastore_entities_lib",
+ ":env_config_lib",
+ ":request_manager_lib",
":testbed_dependent_test_lib",
third_party("hamcrest"),
third_party("mock"),
third_party("webtest"),
+ "//third_party/py/six",
"//tradefed_cluster/util:ndb_shim_lib",
],
)
diff --git a/tradefed_cluster/command_attempt_monitor.py b/tradefed_cluster/command_attempt_monitor.py
index 7ad3154..66530b0 100644
--- a/tradefed_cluster/command_attempt_monitor.py
+++ b/tradefed_cluster/command_attempt_monitor.py
@@ -72,6 +72,23 @@ def SyncCommandAttempt(request_id, command_id, attempt_id):
logging.warning(
'No attempt found to sync. Request %s Command %s Attempt %s',
request_id, command_id, attempt_id)
+
+ # Make sure the command is still leasable if there was no attempt created
+ attempts = command_manager.GetCommandAttempts(request_id, command_id)
+ # There may be attempts under other IDs due to retries
+ if not attempts:
+ logging.warning(
+ 'Ensuring request %s command %s is leaseable as there are no'
+ ' attempts and %s was not found',
+ request_id,
+ command_id,
+ attempt_id,
+ )
+ command = command_manager.GetCommand(request_id, command_id)
+ try:
+ command_manager.EnsureLeasable(command)
+ except command_manager.CommandTaskNotFoundError:
+ logging.info('command %s %s has no tasks', request_id, command_id)
return
if attempt.state in common.FINAL_COMMAND_STATES:
diff --git a/tradefed_cluster/command_attempt_monitor_test.py b/tradefed_cluster/command_attempt_monitor_test.py
index 972c463..3059c26 100644
--- a/tradefed_cluster/command_attempt_monitor_test.py
+++ b/tradefed_cluster/command_attempt_monitor_test.py
@@ -145,15 +145,39 @@ class CommandAttemptMonitorTest(testbed_dependent_test.TestbedDependentTest):
mock_update.assert_not_called()
sync.assert_has_calls([mock.call(attempt)])
+ @mock.patch.object(command_manager, 'EnsureLeasable', autospec=True)
@mock.patch.object(
- command_manager, 'AddToSyncCommandAttemptQueue', autospec=True)
+ command_manager, 'AddToSyncCommandAttemptQueue', autospec=True
+ )
@mock.patch.object(commander, 'ProcessCommandEvent', autospec=True)
- def testSyncCommandAttempt_noAttempt(self, mock_update, sync):
+ def testSyncCommandAttempt_noAttempt(
+ self, mock_update, sync, ensure_leasable
+ ):
_, request_id, _, command_id = self.command.key.flat()
command_attempt_monitor.SyncCommandAttempt(request_id, command_id,
'attempt_id')
mock_update.assert_not_called()
sync.assert_not_called()
+ ensure_leasable.assert_called_once_with(self.command)
+
+ @mock.patch.object(command_manager, 'EnsureLeasable', autospec=True)
+ @mock.patch.object(
+ command_manager, 'AddToSyncCommandAttemptQueue', autospec=True
+ )
+ @mock.patch.object(commander, 'ProcessCommandEvent', autospec=True)
+ def testSyncCommandAttempt_noAttempt_hasExisting(
+ self, mock_update, sync, ensure_leasable
+ ):
+ _, request_id, _, command_id = self.command.key.flat()
+ command_event_test_util.CreateCommandAttempt(
+ self.command, 'attempt_id2', state=common.CommandState.UNKNOWN
+ )
+ command_attempt_monitor.SyncCommandAttempt(
+ request_id, command_id, 'attempt_id'
+ )
+ mock_update.assert_not_called()
+ sync.assert_not_called()
+ ensure_leasable.assert_not_called()
@mock.patch.object(command_attempt_monitor, 'Now', autospec=True)
@mock.patch.object(
diff --git a/tradefed_cluster/command_manager.py b/tradefed_cluster/command_manager.py
index 019d2af..fc37b10 100644
--- a/tradefed_cluster/command_manager.py
+++ b/tradefed_cluster/command_manager.py
@@ -542,7 +542,9 @@ def _RescheduleOrDeleteTask(task_id,
DeleteTask(task_id)
-def AddToSyncCommandAttemptQueue(attempt):
+def AddToSyncCommandAttemptQueue(
+ attempt, delay_minutes=MAX_COMMAND_EVENT_DELAY_MIN
+):
"""Add a command to the sync queue."""
logging.info("Monitoring command attempt: %s", attempt.key)
_, request_id, _, command_id, _, attempt_id = attempt.key.flat()
@@ -555,7 +557,8 @@ def AddToSyncCommandAttemptQueue(attempt):
task_scheduler.AddTask(
queue_name=COMMAND_ATTEMPT_SYNC_QUEUE,
payload=payload,
- eta=update_time + datetime.timedelta(minutes=MAX_COMMAND_EVENT_DELAY_MIN))
+ eta=update_time + datetime.timedelta(minutes=delay_minutes),
+ )
@ndb.transactional()
@@ -576,8 +579,11 @@ def UpdateCommandAttempt(event):
attempt_state_changed = False
if not attempt_entity:
logging.error(
- "attempt cannot be found, request_id: %s, command_id: %s, attempt_id: %s",
- event.request_id, event.command_id, event.attempt_id)
+ "attempt not found, request_id: %s, command_id: %s, attempt_id: %s",
+ event.request_id,
+ event.command_id,
+ event.attempt_id,
+ )
return False
elif (attempt_entity.last_event_time and
event.time and
diff --git a/tradefed_cluster/command_task_api.py b/tradefed_cluster/command_task_api.py
index 7abc3a9..54aa9b2 100644
--- a/tradefed_cluster/command_task_api.py
+++ b/tradefed_cluster/command_task_api.py
@@ -210,7 +210,7 @@ class CommandTaskApi(remote.Service):
device_serial=task.device_serials[0],
device_serials=task.device_serials,
plugin_data=plugin_data_)
- command_manager.AddToSyncCommandAttemptQueue(attempt_entity)
+ command_manager.AddToSyncCommandAttemptQueue(attempt_entity, 1)
attempt_entity.put()
stored_task = command_task_store.GetTask(task.task_id)