diff options
author | Kevin Lau Fang <fangk@google.com> | 2023-04-07 15:36:50 -0700 |
---|---|---|
committer | Copybara-Service <copybara-worker@google.com> | 2023-04-07 15:37:38 -0700 |
commit | 528766d26c49412708708d66e80f02ecfc179abd (patch) | |
tree | 4e68d382ada29e6b7e9dd6420a08e094a78e6cf0 | |
parent | d5958c822cb0a252e9d247e84c0a84c6ea56454e (diff) | |
download | tradefed_cluster-528766d26c49412708708d66e80f02ecfc179abd.tar.gz |
Internal change
PiperOrigin-RevId: 522699259
Change-Id: I851bb15dcb9c57b9a9d2e2795ced3b9fd6b64979
-rw-r--r-- | tradefed_cluster/BUILD | 7 | ||||
-rw-r--r-- | tradefed_cluster/command_attempt_monitor.py | 17 | ||||
-rw-r--r-- | tradefed_cluster/command_attempt_monitor_test.py | 28 | ||||
-rw-r--r-- | tradefed_cluster/command_manager.py | 14 | ||||
-rw-r--r-- | tradefed_cluster/command_task_api.py | 2 |
5 files changed, 61 insertions, 7 deletions
diff --git a/tradefed_cluster/BUILD b/tradefed_cluster/BUILD index 2a4a252..4c8fd0c 100644 --- a/tradefed_cluster/BUILD +++ b/tradefed_cluster/BUILD @@ -411,10 +411,17 @@ py_test( deps = [ ":command_attempt_monitor_lib", ":command_event_test_util", + ":command_lib", + ":commander_lib", + ":common_lib", + ":datastore_entities_lib", + ":env_config_lib", + ":request_manager_lib", ":testbed_dependent_test_lib", third_party("hamcrest"), third_party("mock"), third_party("webtest"), + "//third_party/py/six", "//tradefed_cluster/util:ndb_shim_lib", ], ) diff --git a/tradefed_cluster/command_attempt_monitor.py b/tradefed_cluster/command_attempt_monitor.py index 7ad3154..66530b0 100644 --- a/tradefed_cluster/command_attempt_monitor.py +++ b/tradefed_cluster/command_attempt_monitor.py @@ -72,6 +72,23 @@ def SyncCommandAttempt(request_id, command_id, attempt_id): logging.warning( 'No attempt found to sync. Request %s Command %s Attempt %s', request_id, command_id, attempt_id) + + # Make sure the command is still leasable if there was no attempt created + attempts = command_manager.GetCommandAttempts(request_id, command_id) + # There may be attempts under other IDs due to retries + if not attempts: + logging.warning( + 'Ensuring request %s command %s is leaseable as there are no' + ' attempts and %s was not found', + request_id, + command_id, + attempt_id, + ) + command = command_manager.GetCommand(request_id, command_id) + try: + command_manager.EnsureLeasable(command) + except command_manager.CommandTaskNotFoundError: + logging.info('command %s %s has no tasks', request_id, command_id) return if attempt.state in common.FINAL_COMMAND_STATES: diff --git a/tradefed_cluster/command_attempt_monitor_test.py b/tradefed_cluster/command_attempt_monitor_test.py index 972c463..3059c26 100644 --- a/tradefed_cluster/command_attempt_monitor_test.py +++ b/tradefed_cluster/command_attempt_monitor_test.py @@ -145,15 +145,39 @@ class CommandAttemptMonitorTest(testbed_dependent_test.TestbedDependentTest): mock_update.assert_not_called() sync.assert_has_calls([mock.call(attempt)]) + @mock.patch.object(command_manager, 'EnsureLeasable', autospec=True) @mock.patch.object( - command_manager, 'AddToSyncCommandAttemptQueue', autospec=True) + command_manager, 'AddToSyncCommandAttemptQueue', autospec=True + ) @mock.patch.object(commander, 'ProcessCommandEvent', autospec=True) - def testSyncCommandAttempt_noAttempt(self, mock_update, sync): + def testSyncCommandAttempt_noAttempt( + self, mock_update, sync, ensure_leasable + ): _, request_id, _, command_id = self.command.key.flat() command_attempt_monitor.SyncCommandAttempt(request_id, command_id, 'attempt_id') mock_update.assert_not_called() sync.assert_not_called() + ensure_leasable.assert_called_once_with(self.command) + + @mock.patch.object(command_manager, 'EnsureLeasable', autospec=True) + @mock.patch.object( + command_manager, 'AddToSyncCommandAttemptQueue', autospec=True + ) + @mock.patch.object(commander, 'ProcessCommandEvent', autospec=True) + def testSyncCommandAttempt_noAttempt_hasExisting( + self, mock_update, sync, ensure_leasable + ): + _, request_id, _, command_id = self.command.key.flat() + command_event_test_util.CreateCommandAttempt( + self.command, 'attempt_id2', state=common.CommandState.UNKNOWN + ) + command_attempt_monitor.SyncCommandAttempt( + request_id, command_id, 'attempt_id' + ) + mock_update.assert_not_called() + sync.assert_not_called() + ensure_leasable.assert_not_called() @mock.patch.object(command_attempt_monitor, 'Now', autospec=True) @mock.patch.object( diff --git a/tradefed_cluster/command_manager.py b/tradefed_cluster/command_manager.py index 019d2af..fc37b10 100644 --- a/tradefed_cluster/command_manager.py +++ b/tradefed_cluster/command_manager.py @@ -542,7 +542,9 @@ def _RescheduleOrDeleteTask(task_id, DeleteTask(task_id) -def AddToSyncCommandAttemptQueue(attempt): +def AddToSyncCommandAttemptQueue( + attempt, delay_minutes=MAX_COMMAND_EVENT_DELAY_MIN +): """Add a command to the sync queue.""" logging.info("Monitoring command attempt: %s", attempt.key) _, request_id, _, command_id, _, attempt_id = attempt.key.flat() @@ -555,7 +557,8 @@ def AddToSyncCommandAttemptQueue(attempt): task_scheduler.AddTask( queue_name=COMMAND_ATTEMPT_SYNC_QUEUE, payload=payload, - eta=update_time + datetime.timedelta(minutes=MAX_COMMAND_EVENT_DELAY_MIN)) + eta=update_time + datetime.timedelta(minutes=delay_minutes), + ) @ndb.transactional() @@ -576,8 +579,11 @@ def UpdateCommandAttempt(event): attempt_state_changed = False if not attempt_entity: logging.error( - "attempt cannot be found, request_id: %s, command_id: %s, attempt_id: %s", - event.request_id, event.command_id, event.attempt_id) + "attempt not found, request_id: %s, command_id: %s, attempt_id: %s", + event.request_id, + event.command_id, + event.attempt_id, + ) return False elif (attempt_entity.last_event_time and event.time and diff --git a/tradefed_cluster/command_task_api.py b/tradefed_cluster/command_task_api.py index 7abc3a9..54aa9b2 100644 --- a/tradefed_cluster/command_task_api.py +++ b/tradefed_cluster/command_task_api.py @@ -210,7 +210,7 @@ class CommandTaskApi(remote.Service): device_serial=task.device_serials[0], device_serials=task.device_serials, plugin_data=plugin_data_) - command_manager.AddToSyncCommandAttemptQueue(attempt_entity) + command_manager.AddToSyncCommandAttemptQueue(attempt_entity, 1) attempt_entity.put() stored_task = command_task_store.GetTask(task.task_id) |