aboutsummaryrefslogtreecommitdiff
path: root/avatar/pandora_server.py
blob: aafc3fc78ae66e3ecb941d1ff2f64dd270645cc3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Interface for controller-specific Pandora server management."""

import asyncio
import avatar.aio
import grpc
import grpc.aio
import portpicker
import threading
import types

from avatar.controllers import bumble_device
from avatar.controllers import pandora_device
from avatar.pandora_client import BumblePandoraClient
from avatar.pandora_client import PandoraClient
from bumble import pandora as bumble_server
from bumble.pandora.device import PandoraDevice as BumblePandoraDevice
from contextlib import suppress
from mobly.controllers import android_device
from mobly.controllers.android_device import AndroidDevice
from typing import Generic, Optional, TypeVar

# Android package targeted by the `am instrument` / `am force-stop` commands below.
ANDROID_SERVER_PACKAGE = 'com.android.pandora'
# Device-side TCP port that the locally picked host port is forwarded to over adb.
ANDROID_SERVER_GRPC_PORT = 8999


# Type of the controller instance wrapped by a `PandoraServer`.
TDevice = TypeVar('TDevice')


class PandoraServer(Generic[TDevice]):
    """Base manager for the Pandora gRPC server tied to a device.

    This base implementation launches nothing: `start` expects the wrapped
    device to already be a `PandoraClient` and returns it as-is, and `stop` is
    a no-op. Subclasses override both for controllers that need a server to
    be started and torn down.
    """

    # Controller module paired with this server class
    # (presumably the module Mobly loads for it -- confirm against callers).
    MOBLY_CONTROLLER_MODULE: types.ModuleType = pandora_device

    # Controller instance this server is bound to.
    device: TDevice

    def __init__(self, device: TDevice) -> None:
        """Binds the server manager to a device.

        Args:
            device: A Mobly controller instance.
        """
        self.device = device

    def start(self) -> PandoraClient:
        """Returns the client for the device's Pandora server.

        Returns:
            The wrapped device itself, which must already be a `PandoraClient`.
        """
        client = self.device
        assert isinstance(client, PandoraClient)
        return client

    def stop(self) -> None:
        """Stops the Pandora server on the device; nothing to do in the base class."""


class BumblePandoraServer(PandoraServer[BumblePandoraDevice]):
    """Manages the Pandora gRPC server on a BumbleDevice.

    Between `start` and `stop` the server runs as an asyncio task scheduled on
    `avatar.aio.loop`.
    """

    MOBLY_CONTROLLER_MODULE = bumble_device

    # Task running `bumble_server.serve`; `None` while no server is started.
    _task: Optional[asyncio.Task[None]] = None

    def start(self) -> BumblePandoraClient:
        """Sets up and starts the Pandora server on the Bumble device.

        Asserts that no server task is currently tracked.

        Returns:
            A `BumblePandoraClient` created for `localhost:<port>`, where
            `<port>` is the port the gRPC server was bound to.
        """
        assert self._task is None

        # set the event loop to make sure the gRPC server use the avatar one.
        asyncio.set_event_loop(avatar.aio.loop)

        # create gRPC server & port.
        # Requesting port 0 lets gRPC choose a free port; the chosen port is returned.
        server = grpc.aio.server()
        port = server.add_insecure_port('localhost:0')

        config = bumble_server.Config()
        self._task = avatar.aio.loop.create_task(
            bumble_server.serve(self.device, config=config, grpc_server=server, port=port)
        )

        return BumblePandoraClient(f'localhost:{port}', self.device, config)

    def stop(self) -> None:
        """Stops and cleans up the Pandora server on the Bumble device.

        Cancels the serving task if it is still running, waits for it to
        finish, and clears it so that `start` may be called again.
        """

        async def server_stop() -> None:
            assert self._task is not None
            # A task that has already finished needs no cancellation.
            if not self._task.done():
                self._task.cancel()
                # Awaiting the cancelled task raises CancelledError, which is the expected outcome.
                with suppress(asyncio.CancelledError):
                    await self._task
            self._task = None

        avatar.aio.run_until_complete(server_stop())


class AndroidPandoraServer(PandoraServer[AndroidDevice]):
    """Manages the Pandora gRPC server on an AndroidDevice.

    The server is launched through an `am instrument` adb shell command that
    runs on a background thread, and is reached from the host through an adb
    TCP port forward.
    """

    MOBLY_CONTROLLER_MODULE = android_device

    # Thread running the `am instrument` command; `None` while no server is started.
    _instrumentation: Optional[threading.Thread] = None
    # Host-side port forwarded to the device; assigned by `start`.
    _port: int

    def start(self) -> PandoraClient:
        """Sets up and starts the Pandora server on the Android device.

        Asserts that no instrumentation thread is currently tracked.

        Returns:
            A `PandoraClient` created for `localhost:<port>`, where `<port>`
            is the host port forwarded to the device.
        """
        assert self._instrumentation is None

        # Reserve the host port first; `stop` needs it to remove the forward.
        self._port = portpicker.pick_unused_port()  # type: ignore

        # Launch the Pandora Android gRPC server on its own thread
        # (presumably because `-w` keeps the command running until the instrumentation exits).
        instrument_cmd = f'am instrument --no-hidden-api-checks -w {ANDROID_SERVER_PACKAGE}/.Main'
        worker = threading.Thread(
            target=lambda: self.device.adb._exec_adb_cmd(  # type: ignore
                'shell', instrument_cmd, shell=False, timeout=None, stderr=None
            )
        )
        self._instrumentation = worker
        worker.start()

        # Expose the device-side gRPC port on the host port picked above.
        host_endpoint = f'tcp:{self._port}'
        device_endpoint = f'tcp:{ANDROID_SERVER_GRPC_PORT}'
        self.device.adb.forward([host_endpoint, device_endpoint])  # type: ignore

        return PandoraClient(f'localhost:{self._port}', 'android')

    def stop(self) -> None:
        """Stops and cleans up the Pandora server on the Android device.

        Force-stops the server package, removes the port forward, waits for
        the instrumentation thread to end, and clears it so that `start` may
        be called again. Asserts that the server was started.
        """
        worker = self._instrumentation
        assert worker is not None

        adb = self.device.adb

        # Stop Pandora Android gRPC server.
        force_stop_cmd = f'am force-stop {ANDROID_SERVER_PACKAGE}'
        adb._exec_adb_cmd(  # type: ignore
            'shell', force_stop_cmd, shell=False, timeout=None, stderr=None
        )

        # Drop the host-side forward before waiting for the thread.
        adb.forward(['--remove', f'tcp:{self._port}'])  # type: ignore

        worker.join()
        self._instrumentation = None