diff options
Diffstat (limited to 'avatar/pandora.py')
-rw-r--r-- | avatar/pandora.py | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/avatar/pandora.py b/avatar/pandora.py new file mode 100644 index 0000000..31695ee --- /dev/null +++ b/avatar/pandora.py @@ -0,0 +1,67 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio + +from avatar import BumblePandoraDevice +from avatar import PandoraDevice +from bumble.device import Connection as BumbleConnection +from mobly.asserts import assert_equal # type: ignore +from mobly.asserts import assert_is_not_none # type: ignore +from pandora._utils import AioStream +from pandora.host_pb2 import AdvertiseResponse +from pandora.host_pb2 import Connection +from pandora.host_pb2 import OwnAddressType +from pandora.host_pb2 import ScanningResponse +from typing import Optional, Tuple + + +def get_raw_connection_handle(device: PandoraDevice, connection: Connection) -> int: + assert isinstance(device, BumblePandoraDevice) + return int.from_bytes(connection.cookie.value, 'big') + + +def get_raw_connection(device: PandoraDevice, connection: Connection) -> Optional[BumbleConnection]: + assert isinstance(device, BumblePandoraDevice) + return device.device.lookup_connection(get_raw_connection_handle(device, connection)) + + +async def connect(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]: + init_res, wait_res = await asyncio.gather( + initiator.aio.host.Connect(address=acceptor.address), + acceptor.aio.host.WaitConnection(address=initiator.address), + ) + assert_equal(init_res.result_variant(), 'connection') + assert_equal(wait_res.result_variant(), 'connection') + assert init_res.connection is not None and wait_res.connection is not None + return init_res.connection, wait_res.connection + + +async def connect_le( + initiator: PandoraDevice, + acceptor: AioStream[AdvertiseResponse], + scan: ScanningResponse, + own_address_type: OwnAddressType, + cancel_advertisement: bool = True, +) -> Tuple[Connection, Connection]: + (init_res, wait_res) = await asyncio.gather( + initiator.aio.host.ConnectLE(own_address_type=own_address_type, **scan.address_asdict()), + anext(aiter(acceptor)), # pytype: disable=name-error + ) + if cancel_advertisement: + acceptor.cancel() + assert_equal(init_res.result_variant(), 'connection') + assert_is_not_none(init_res.connection) + assert init_res.connection + return init_res.connection, wait_res.connection |