aboutsummaryrefslogtreecommitdiff
path: root/gd/hci/cert/acl_manager_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'gd/hci/cert/acl_manager_test.py')
-rw-r--r--gd/hci/cert/acl_manager_test.py393
1 files changed, 74 insertions, 319 deletions
diff --git a/gd/hci/cert/acl_manager_test.py b/gd/hci/cert/acl_manager_test.py
index 1f25c7d51..ce8bea63a 100644
--- a/gd/hci/cert/acl_manager_test.py
+++ b/gd/hci/cert/acl_manager_test.py
@@ -14,351 +14,106 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
-import sys
-import logging
-
-from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
-from cert.event_callback_stream import EventCallbackStream
-from cert.event_asserts import EventAsserts
-from google.protobuf import empty_pb2 as empty_proto
-from facade import rootservice_pb2 as facade_rootservice
-from hci.facade import acl_manager_facade_pb2 as acl_manager_facade
+from cert.gd_base_test import GdBaseTestClass
+from cert.truth import assertThat
from neighbor.facade import facade_pb2 as neighbor_facade
-from hci.facade import controller_facade_pb2 as controller_facade
-from hci.facade import facade_pb2 as hci_facade
-import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import hci_packets
+from cert.py_hci import PyHci
+from cert.py_acl_manager import PyAclManager
-class AclManagerTest(GdFacadeOnlyBaseTestClass):
+class AclManagerTest(GdBaseTestClass):
- def setup_test(self):
- self.device_under_test.rootservice.StartStack(
- facade_rootservice.StartStackRequest(
- module_under_test=facade_rootservice.BluetoothModule.Value(
- 'HCI_INTERFACES'),))
- self.cert_device.rootservice.StartStack(
- facade_rootservice.StartStackRequest(
- module_under_test=facade_rootservice.BluetoothModule.Value(
- 'HCI'),))
+ def setup_class(self):
+ super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')
- self.device_under_test.wait_channel_ready()
- self.cert_device.wait_channel_ready()
+ # todo: move into GdBaseTestClass, based on modules inited
+ def setup_test(self):
+ super().setup_test()
+ self.cert_hci = PyHci(self.cert, acl_streaming=True)
+ self.dut_acl_manager = PyAclManager(self.dut)
def teardown_test(self):
- self.device_under_test.rootservice.StopStack(
- facade_rootservice.StopStackRequest())
- self.cert_device.rootservice.StopStack(
- facade_rootservice.StopStackRequest())
-
- def register_for_event(self, event_code):
- msg = hci_facade.EventCodeMsg(code=int(event_code))
- self.cert_device.hci.RegisterEventHandler(msg)
-
- def enqueue_hci_command(self, command, expect_complete):
- cmd_bytes = bytes(command.Serialize())
- cmd = hci_facade.CommandMsg(command=cmd_bytes)
- if (expect_complete):
- self.cert_device.hci.EnqueueCommandWithComplete(cmd)
- else:
- self.cert_device.hci.EnqueueCommandWithStatus(cmd)
-
- def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
- acl_msg = hci_facade.AclMsg(
- handle=int(handle),
- packet_boundary_flag=int(pb_flag),
- broadcast_flag=int(b_flag),
- data=acl)
- self.cert_device.hci.SendAclData(acl_msg)
+ self.cert_hci.close()
+ super().teardown_test()
def test_dut_connects(self):
- self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST)
- self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE)
- self.register_for_event(
- hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
- with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \
- EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
- EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
-
- cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
- acl_data_asserts = EventAsserts(acl_data_stream)
- cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
-
- # CERT Enables scans and gets its address
- self.enqueue_hci_command(
- hci_packets.WriteScanEnableBuilder(
- hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
-
- cert_address = None
-
- def get_address_from_complete(packet):
- packet_bytes = packet.event
- if b'\x0e\x0a\x01\x09\x10' in packet_bytes:
- nonlocal cert_address
- addr_view = hci_packets.ReadBdAddrCompleteView(
- hci_packets.CommandCompleteView(
- hci_packets.EventPacketView(
- bt_packets.PacketViewLittleEndian(
- list(packet_bytes)))))
- cert_address = addr_view.GetBdAddr()
- return True
- return False
-
- self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)
-
- cert_hci_event_asserts.assert_event_occurs(
- get_address_from_complete)
-
- with EventCallbackStream(
- self.device_under_test.hci_acl_manager.CreateConnection(
- acl_manager_facade.ConnectionMsg(
- address_type=int(
- hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS),
- address=bytes(cert_address,
- 'utf8')))) as connection_event_stream:
-
- connection_event_asserts = EventAsserts(connection_event_stream)
- connection_request = None
-
- def get_connect_request(packet):
- if b'\x04\x0a' in packet.event:
- nonlocal connection_request
- connection_request = hci_packets.ConnectionRequestView(
- hci_packets.EventPacketView(
- bt_packets.PacketViewLittleEndian(
- list(packet.event))))
- return True
- return False
-
- # Cert Accepts
- cert_hci_event_asserts.assert_event_occurs(get_connect_request)
- self.enqueue_hci_command(
- hci_packets.AcceptConnectionRequestBuilder(
- connection_request.GetBdAddr(),
- hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE),
- False)
-
- # Cert gets ConnectionComplete with a handle and sends ACL data
- handle = 0xfff
-
- def get_handle(packet):
- packet_bytes = packet.event
- if b'\x03\x0b\x00' in packet_bytes:
- nonlocal handle
- cc_view = hci_packets.ConnectionCompleteView(
- hci_packets.EventPacketView(
- bt_packets.PacketViewLittleEndian(
- list(packet_bytes))))
- handle = cc_view.GetConnectionHandle()
- return True
- return False
-
- cert_hci_event_asserts.assert_event_occurs(get_handle)
- cert_handle = handle
+ self.cert_hci.enable_inquiry_and_page_scan()
+ cert_address = self.cert_hci.read_own_address()
- self.enqueue_acl_data(
- cert_handle, hci_packets.PacketBoundaryFlag.
- FIRST_AUTOMATICALLY_FLUSHABLE,
- hci_packets.BroadcastFlag.POINT_TO_POINT,
- bytes(
- b'\x26\x00\x07\x00This is just SomeAclData from the Cert'
- ))
+ self.dut_acl_manager.initiate_connection(cert_address)
+ cert_acl = self.cert_hci.accept_connection()
+ with self.dut_acl_manager.complete_outgoing_connection() as dut_acl:
+ cert_acl.send_first(b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
+ dut_acl.send(b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT')
- # DUT gets a connection complete event and sends and receives
- handle = 0xfff
- connection_event_asserts.assert_event_occurs(get_handle)
-
- self.device_under_test.hci_acl_manager.SendAclData(
- acl_manager_facade.AclData(
- handle=handle,
- payload=bytes(
- b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT'
- )))
-
- cert_acl_data_asserts.assert_event_occurs(
- lambda packet: b'SomeMoreAclData' in packet.data)
- acl_data_asserts.assert_event_occurs(
- lambda packet: b'SomeAclData' in packet.payload)
+ assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
+ assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
def test_cert_connects(self):
- self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE)
- self.register_for_event(hci_packets.EventCode.ROLE_CHANGE)
- self.register_for_event(
- hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
- with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \
- EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
- EventCallbackStream(self.device_under_test.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \
- EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
-
- cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
- incoming_connection_asserts = EventAsserts(
- incoming_connection_stream)
- cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
- acl_data_asserts = EventAsserts(acl_data_stream)
-
- # DUT Enables scans and gets its address
- dut_address = self.device_under_test.hci_controller.GetMacAddress(
- empty_proto.Empty()).address
-
- self.device_under_test.neighbor.EnablePageScan(
- neighbor_facade.EnableMsg(enabled=True))
-
- # Cert connects
- self.enqueue_hci_command(
- hci_packets.CreateConnectionBuilder(
- dut_address.decode('utf-8'),
- 0xcc18, # Packet Type
- hci_packets.PageScanRepetitionMode.R1,
- 0x0,
- hci_packets.ClockOffsetValid.INVALID,
- hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH),
- False)
-
- conn_handle = 0xfff
-
- def get_handle(packet):
- packet_bytes = packet.event
- if b'\x03\x0b\x00' in packet_bytes:
- nonlocal conn_handle
- cc_view = hci_packets.ConnectionCompleteView(
- hci_packets.EventPacketView(
- bt_packets.PacketViewLittleEndian(
- list(packet_bytes))))
- conn_handle = cc_view.GetConnectionHandle()
- return True
- return False
-
- # DUT gets a connection request
- incoming_connection_asserts.assert_event_occurs(get_handle)
+ dut_address = self.dut.hci_controller.GetMacAddressSimple()
+ self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
- self.device_under_test.hci_acl_manager.SendAclData(
- acl_manager_facade.AclData(
- handle=conn_handle,
- payload=bytes(
- b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT'
- )))
+ self.dut_acl_manager.listen_for_an_incoming_connection()
+ self.cert_hci.initiate_connection(dut_address)
+ with self.dut_acl_manager.complete_incoming_connection() as dut_acl:
+ cert_acl = self.cert_hci.complete_connection()
- conn_handle = 0xfff
+ dut_acl.send(b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT')
- cert_hci_event_asserts.assert_event_occurs(get_handle)
- cert_handle = conn_handle
+ cert_acl.send_first(b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
- self.enqueue_acl_data(
- cert_handle,
- hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
- hci_packets.BroadcastFlag.POINT_TO_POINT,
- bytes(
- b'\x26\x00\x07\x00This is just SomeAclData from the Cert'))
+ assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
+ assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
- cert_acl_data_asserts.assert_event_occurs(
- lambda packet: b'SomeMoreAclData' in packet.data)
- acl_data_asserts.assert_event_occurs(
- lambda packet: b'SomeAclData' in packet.payload)
+ def test_reject_broadcast(self):
+ dut_address = self.dut.hci_controller.GetMacAddressSimple()
+ self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
- def test_recombination_l2cap_packet(self):
- self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST)
- self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE)
- self.register_for_event(
- hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
- with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \
- EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
- EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
-
- cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
- acl_data_asserts = EventAsserts(acl_data_stream)
-
- # CERT Enables scans and gets its address
- self.enqueue_hci_command(
- hci_packets.WriteScanEnableBuilder(
- hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
-
- cert_address = None
-
- def get_address_from_complete(packet):
- packet_bytes = packet.event
- if b'\x0e\x0a\x01\x09\x10' in packet_bytes:
- nonlocal cert_address
- addr_view = hci_packets.ReadBdAddrCompleteView(
- hci_packets.CommandCompleteView(
- hci_packets.EventPacketView(
- bt_packets.PacketViewLittleEndian(
- list(packet_bytes)))))
- cert_address = addr_view.GetBdAddr()
- return True
- return False
+ self.dut_acl_manager.listen_for_an_incoming_connection()
+ self.cert_hci.initiate_connection(dut_address)
+ with self.dut_acl_manager.complete_incoming_connection() as dut_acl:
+ cert_acl = self.cert_hci.complete_connection()
- self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)
+ cert_acl.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
+ hci_packets.BroadcastFlag.ACTIVE_PERIPHERAL_BROADCAST,
+ b'\x26\x00\x07\x00This is a Broadcast from the Cert')
+ assertThat(dut_acl).emitsNone()
- cert_hci_event_asserts.assert_event_occurs(
- get_address_from_complete)
+ cert_acl.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
+ hci_packets.BroadcastFlag.POINT_TO_POINT,
+ b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
+ assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
- with EventCallbackStream(
- self.device_under_test.hci_acl_manager.CreateConnection(
- acl_manager_facade.ConnectionMsg(
- address_type=int(
- hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS),
- address=bytes(cert_address,
- 'utf8')))) as connection_event_stream:
+ def test_cert_connects_disconnects(self):
+ dut_address = self.dut.hci_controller.GetMacAddressSimple()
+ self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
- connection_event_asserts = EventAsserts(connection_event_stream)
- connection_request = None
+ self.dut_acl_manager.listen_for_an_incoming_connection()
+ self.cert_hci.initiate_connection(dut_address)
+ with self.dut_acl_manager.complete_incoming_connection() as dut_acl:
+ cert_acl = self.cert_hci.complete_connection()
- def get_connect_request(packet):
- if b'\x04\x0a' in packet.event:
- nonlocal connection_request
- connection_request = hci_packets.ConnectionRequestView(
- hci_packets.EventPacketView(
- bt_packets.PacketViewLittleEndian(
- list(packet.event))))
- return True
- return False
+ dut_acl.send(b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT')
- # Cert Accepts
- cert_hci_event_asserts.assert_event_occurs(get_connect_request)
- self.enqueue_hci_command(
- hci_packets.AcceptConnectionRequestBuilder(
- connection_request.GetBdAddr(),
- hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE),
- False)
+ cert_acl.send_first(b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
- # Cert gets ConnectionComplete with a handle and sends ACL data
- handle = 0xfff
+ assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
+ assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
- def get_handle(packet):
- packet_bytes = packet.event
- if b'\x03\x0b\x00' in packet_bytes:
- nonlocal handle
- cc_view = hci_packets.ConnectionCompleteView(
- hci_packets.EventPacketView(
- bt_packets.PacketViewLittleEndian(
- list(packet_bytes))))
- handle = cc_view.GetConnectionHandle()
- return True
- return False
+ dut_acl.disconnect(hci_packets.DisconnectReason.REMOTE_USER_TERMINATED_CONNECTION)
+ dut_acl.wait_for_disconnection_complete()
- cert_hci_event_asserts.assert_event_occurs(get_handle)
- cert_handle = handle
-
- self.enqueue_acl_data(
- cert_handle, hci_packets.PacketBoundaryFlag.
- FIRST_AUTOMATICALLY_FLUSHABLE,
- hci_packets.BroadcastFlag.POINT_TO_POINT,
- bytes(b'\x06\x00\x07\x00Hello'))
- self.enqueue_acl_data(
- cert_handle,
- hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
- hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!'))
- self.enqueue_acl_data(
- cert_handle, hci_packets.PacketBoundaryFlag.
- FIRST_AUTOMATICALLY_FLUSHABLE,
- hci_packets.BroadcastFlag.POINT_TO_POINT,
- bytes(b'\xe8\x03\x07\x00' + b'Hello' * 200))
-
- # DUT gets a connection complete event and sends and receives
- connection_event_asserts.assert_event_occurs(get_handle)
-
- acl_data_asserts.assert_event_occurs(
- lambda packet: b'Hello!' in packet.payload)
- acl_data_asserts.assert_event_occurs(
- lambda packet: b'Hello' * 200 in packet.payload)
+ def test_recombination_l2cap_packet(self):
+ self.cert_hci.enable_inquiry_and_page_scan()
+ cert_address = self.cert_hci.read_own_address()
+
+ self.dut_acl_manager.initiate_connection(cert_address)
+ cert_acl = self.cert_hci.accept_connection()
+ with self.dut_acl_manager.complete_outgoing_connection() as dut_acl:
+ cert_acl.send_first(b'\x06\x00\x07\x00Hello')
+ cert_acl.send_continuing(b'!')
+ cert_acl.send_first(b'\xe8\x03\x07\x00' + b'Hello' * 200)
+
+ assertThat(dut_acl).emits(lambda packet: b'Hello!' in packet.payload,
+ lambda packet: b'Hello' * 200 in packet.payload).inOrder()