summaryrefslogtreecommitdiff
path: root/harnesses/host_controller/invocation_thread.py
blob: b8760f39b45a3f9f8970dcb65a459f8e9ae7c97c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import socket
import threading

import httplib2
from googleapiclient import errors

from host_controller.tfc import command_attempt
from host_controller.tradefed import remote_operation


class InvocationThread(threading.Thread):
    """The thread that remotely executes a command task.

    Attributes:
        _remote_client: The RemoteClient which executes the command.
        _tfc_client: The TfcClient to which the command events are sent.
        _attempt: The CommandAttempt whose events are sent to TFC.
        _command: A list of strings, the command and arguments.
        device_serials: A list of strings, the serial numbers of the devices
                        which need to be allocated to the task.
        _allocated_serials: A list of strings, the serial numbers of the devices
                            which are successfully allocated.
        _tfc_heartbeat_interval: The interval of TestRunInProgress events in
                                 seconds.
    """

    def __init__(self,
                 remote_client,
                 tfc_client,
                 attempt,
                 command,
                 device_serials,
                 tfc_heartbeat_interval=5 * 60):
        """Initializes the attributes."""
        super(InvocationThread, self).__init__()
        self._remote_client = remote_client
        self._tfc_client = tfc_client
        self._attempt = attempt
        self._command = command
        self.device_serials = device_serials
        self._allocated_serials = None
        # The value in Java implementation is 5 minutes.
        self._tfc_heartbeat_interval = tfc_heartbeat_interval

    def _AllocateDevices(self):
        """Allocates all of device_serial."""
        for serial in self.device_serials:
            self._remote_client.SendOperation(
                    remote_operation.AllocateDevice(serial))
            self._allocated_serials.append(serial)

    def _StartInvocation(self):
        """Starts executing command and sends the event to TFC."""
        self._remote_client.SendOperation(
                remote_operation.ExecuteCommand(self.device_serials[0],
                                                *self._command))
        event = self._attempt.CreateCommandEvent(
                command_attempt.EventType.INVOCATION_STARTED)
        self._tfc_client.SubmitCommandEvents([event])

    def _WaitForCommandResult(self):
        """Waits for command result and keeps sending heartbeat to TFC

        Returns:
            A JSON object returned from TradeFed remote manager.
        """
        while True:
            result = self._remote_client.WaitForCommandResult(
                    self.device_serials[0], self._tfc_heartbeat_interval)
            if result:
                return result
            event = self._attempt.CreateCommandEvent(
                    command_attempt.EventType.TEST_RUN_IN_PROGRESS)
            self._tfc_client.SubmitCommandEvents([event])

    def _CompleteInvocation(self, result):
        """Sends InvocationCompleted event according to the result.

        Args:
            result: A JSON object returned from TradeFed remote manager.
        """
        if result["status"] == "INVOCATION_SUCCESS":
            event = self._attempt.CreateInvocationCompletedEvent(
                    str(result), 1, 0)
        else:
            event = self._attempt.CreateInvocationCompletedEvent(
                    str(result), 1, 1, error=str(result))
        self._tfc_client.SubmitCommandEvents([event])

    def _FreeAllocatedDevices(self):
        """Frees allocated devices and tolerates RemoteOperationException."""
        for serial in self._allocated_serials:
            try:
                self._remote_client.SendOperation(
                        remote_operation.FreeDevice(serial))
            except remote_operation.RemoteOperationException as e:
                logging.exception(e)
            except socket.error as e:
                logging.exception(e)
                break
        self._allocated_serials = []

    def _SubmitErrorEvent(self, event_type, error_msg):
        """Submits an error event and tolerates http exceptions.

        Args:
            event_type: A string, the type of the command event.
            error_msg: A string, the error message.
        """
        try:
            self._tfc_client.SubmitCommandEvents(
                [self._attempt.CreateCommandEvent(event_type, error_msg)])
        except (httplib2.HttpLib2Error, errors.HttpError) as e:
            logging.exception(e)

    # @Override
    def run(self):
        """Executes a command task with exception handling."""
        self._allocated_serials = []
        last_error = None
        error_event = command_attempt.EventType.ALLOCATION_FAILED
        try:
            self._AllocateDevices()
            error_event = command_attempt.EventType.EXECUTE_FAILED
            self._StartInvocation()
            result = self._WaitForCommandResult()
            self._CompleteInvocation(result)
            error_event = None
        except errors.HttpError as e:
            logging.exception(e)
            last_error = e
        except remote_operation.RemoteOperationException as e:
            logging.exception(e)
            last_error = e
            # ConfigurationException on TradeFed remote manager.
            if str(e).startswith("Config error: "):
                error_event = command_attempt.EventType.CONFIGURATION_ERROR
        except httplib2.HttpLib2Error as e:
            logging.exception("Cannot communicate with TradeFed cluster: %s\n"
                              "Skip submitting event %s.", e, error_event)
            last_error = e
            error_event = None
        except socket.error as e:
            logging.exception("Cannot communicate with TradeFed remote "
                              "manager: %s\nSkip freeing devices %s.",
                              e, self._allocated_serials)
            last_error = e
            self._allocated_serials = []
        finally:
            if error_event:
                self._SubmitErrorEvent(error_event, str(last_error))
            self._FreeAllocatedDevices()