diff options
Diffstat (limited to 'gd/hci/cert/acl_manager_test.py')
-rw-r--r-- | gd/hci/cert/acl_manager_test.py | 393 |
1 files changed, 74 insertions, 319 deletions
diff --git a/gd/hci/cert/acl_manager_test.py b/gd/hci/cert/acl_manager_test.py index 1f25c7d51..ce8bea63a 100644 --- a/gd/hci/cert/acl_manager_test.py +++ b/gd/hci/cert/acl_manager_test.py @@ -14,351 +14,106 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import sys -import logging - -from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass -from cert.event_callback_stream import EventCallbackStream -from cert.event_asserts import EventAsserts -from google.protobuf import empty_pb2 as empty_proto -from facade import rootservice_pb2 as facade_rootservice -from hci.facade import acl_manager_facade_pb2 as acl_manager_facade +from cert.gd_base_test import GdBaseTestClass +from cert.truth import assertThat from neighbor.facade import facade_pb2 as neighbor_facade -from hci.facade import controller_facade_pb2 as controller_facade -from hci.facade import facade_pb2 as hci_facade -import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import hci_packets +from cert.py_hci import PyHci +from cert.py_acl_manager import PyAclManager -class AclManagerTest(GdFacadeOnlyBaseTestClass): +class AclManagerTest(GdBaseTestClass): - def setup_test(self): - self.device_under_test.rootservice.StartStack( - facade_rootservice.StartStackRequest( - module_under_test=facade_rootservice.BluetoothModule.Value( - 'HCI_INTERFACES'),)) - self.cert_device.rootservice.StartStack( - facade_rootservice.StartStackRequest( - module_under_test=facade_rootservice.BluetoothModule.Value( - 'HCI'),)) + def setup_class(self): + super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI') - self.device_under_test.wait_channel_ready() - self.cert_device.wait_channel_ready() + # todo: move into GdBaseTestClass, based on modules inited + def setup_test(self): + super().setup_test() + self.cert_hci = PyHci(self.cert, acl_streaming=True) + self.dut_acl_manager = PyAclManager(self.dut) def teardown_test(self): - self.device_under_test.rootservice.StopStack( - facade_rootservice.StopStackRequest()) - self.cert_device.rootservice.StopStack( - facade_rootservice.StopStackRequest()) - - def register_for_event(self, event_code): - msg = hci_facade.EventCodeMsg(code=int(event_code)) - self.cert_device.hci.RegisterEventHandler(msg) - - def enqueue_hci_command(self, command, expect_complete): - cmd_bytes = bytes(command.Serialize()) - cmd = hci_facade.CommandMsg(command=cmd_bytes) - if (expect_complete): - self.cert_device.hci.EnqueueCommandWithComplete(cmd) - else: - self.cert_device.hci.EnqueueCommandWithStatus(cmd) - - def enqueue_acl_data(self, handle, pb_flag, b_flag, acl): - acl_msg = hci_facade.AclMsg( - handle=int(handle), - packet_boundary_flag=int(pb_flag), - broadcast_flag=int(b_flag), - data=acl) - self.cert_device.hci.SendAclData(acl_msg) + self.cert_hci.close() + super().teardown_test() def test_dut_connects(self): - self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST) - self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) - self.register_for_event( - hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) - with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \ - EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ - EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: - - cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) - acl_data_asserts = EventAsserts(acl_data_stream) - cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) - - # CERT Enables scans and gets its address - self.enqueue_hci_command( - hci_packets.WriteScanEnableBuilder( - hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True) - - cert_address = None - - def get_address_from_complete(packet): - packet_bytes = packet.event - if b'\x0e\x0a\x01\x09\x10' in packet_bytes: - nonlocal cert_address - addr_view = hci_packets.ReadBdAddrCompleteView( - hci_packets.CommandCompleteView( - hci_packets.EventPacketView( - bt_packets.PacketViewLittleEndian( - list(packet_bytes))))) - cert_address = addr_view.GetBdAddr() - return True - return False - - self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) - - cert_hci_event_asserts.assert_event_occurs( - get_address_from_complete) - - with EventCallbackStream( - self.device_under_test.hci_acl_manager.CreateConnection( - acl_manager_facade.ConnectionMsg( - address_type=int( - hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS), - address=bytes(cert_address, - 'utf8')))) as connection_event_stream: - - connection_event_asserts = EventAsserts(connection_event_stream) - connection_request = None - - def get_connect_request(packet): - if b'\x04\x0a' in packet.event: - nonlocal connection_request - connection_request = hci_packets.ConnectionRequestView( - hci_packets.EventPacketView( - bt_packets.PacketViewLittleEndian( - list(packet.event)))) - return True - return False - - # Cert Accepts - cert_hci_event_asserts.assert_event_occurs(get_connect_request) - self.enqueue_hci_command( - hci_packets.AcceptConnectionRequestBuilder( - connection_request.GetBdAddr(), - hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE), - False) - - # Cert gets ConnectionComplete with a handle and sends ACL data - handle = 0xfff - - def get_handle(packet): - packet_bytes = packet.event - if b'\x03\x0b\x00' in packet_bytes: - nonlocal handle - cc_view = hci_packets.ConnectionCompleteView( - hci_packets.EventPacketView( - bt_packets.PacketViewLittleEndian( - list(packet_bytes)))) - handle = cc_view.GetConnectionHandle() - return True - return False - - cert_hci_event_asserts.assert_event_occurs(get_handle) - cert_handle = handle + self.cert_hci.enable_inquiry_and_page_scan() + cert_address = self.cert_hci.read_own_address() - self.enqueue_acl_data( - cert_handle, hci_packets.PacketBoundaryFlag. - FIRST_AUTOMATICALLY_FLUSHABLE, - hci_packets.BroadcastFlag.POINT_TO_POINT, - bytes( - b'\x26\x00\x07\x00This is just SomeAclData from the Cert' - )) + self.dut_acl_manager.initiate_connection(cert_address) + cert_acl = self.cert_hci.accept_connection() + with self.dut_acl_manager.complete_outgoing_connection() as dut_acl: + cert_acl.send_first(b'\x26\x00\x07\x00This is just SomeAclData from the Cert') + dut_acl.send(b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT') - # DUT gets a connection complete event and sends and receives - handle = 0xfff - connection_event_asserts.assert_event_occurs(get_handle) - - self.device_under_test.hci_acl_manager.SendAclData( - acl_manager_facade.AclData( - handle=handle, - payload=bytes( - b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT' - ))) - - cert_acl_data_asserts.assert_event_occurs( - lambda packet: b'SomeMoreAclData' in packet.data) - acl_data_asserts.assert_event_occurs( - lambda packet: b'SomeAclData' in packet.payload) + assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.payload) + assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload) def test_cert_connects(self): - self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) - self.register_for_event(hci_packets.EventCode.ROLE_CHANGE) - self.register_for_event( - hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) - with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \ - EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ - EventCallbackStream(self.device_under_test.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \ - EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: - - cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) - incoming_connection_asserts = EventAsserts( - incoming_connection_stream) - cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) - acl_data_asserts = EventAsserts(acl_data_stream) - - # DUT Enables scans and gets its address - dut_address = self.device_under_test.hci_controller.GetMacAddress( - empty_proto.Empty()).address - - self.device_under_test.neighbor.EnablePageScan( - neighbor_facade.EnableMsg(enabled=True)) - - # Cert connects - self.enqueue_hci_command( - hci_packets.CreateConnectionBuilder( - dut_address.decode('utf-8'), - 0xcc18, # Packet Type - hci_packets.PageScanRepetitionMode.R1, - 0x0, - hci_packets.ClockOffsetValid.INVALID, - hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH), - False) - - conn_handle = 0xfff - - def get_handle(packet): - packet_bytes = packet.event - if b'\x03\x0b\x00' in packet_bytes: - nonlocal conn_handle - cc_view = hci_packets.ConnectionCompleteView( - hci_packets.EventPacketView( - bt_packets.PacketViewLittleEndian( - list(packet_bytes)))) - conn_handle = cc_view.GetConnectionHandle() - return True - return False - - # DUT gets a connection request - incoming_connection_asserts.assert_event_occurs(get_handle) + dut_address = self.dut.hci_controller.GetMacAddressSimple() + self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) - self.device_under_test.hci_acl_manager.SendAclData( - acl_manager_facade.AclData( - handle=conn_handle, - payload=bytes( - b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT' - ))) + self.dut_acl_manager.listen_for_an_incoming_connection() + self.cert_hci.initiate_connection(dut_address) + with self.dut_acl_manager.complete_incoming_connection() as dut_acl: + cert_acl = self.cert_hci.complete_connection() - conn_handle = 0xfff + dut_acl.send(b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT') - cert_hci_event_asserts.assert_event_occurs(get_handle) - cert_handle = conn_handle + cert_acl.send_first(b'\x26\x00\x07\x00This is just SomeAclData from the Cert') - self.enqueue_acl_data( - cert_handle, - hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, - hci_packets.BroadcastFlag.POINT_TO_POINT, - bytes( - b'\x26\x00\x07\x00This is just SomeAclData from the Cert')) + assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.payload) + assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload) - cert_acl_data_asserts.assert_event_occurs( - lambda packet: b'SomeMoreAclData' in packet.data) - acl_data_asserts.assert_event_occurs( - lambda packet: b'SomeAclData' in packet.payload) + def test_reject_broadcast(self): + dut_address = self.dut.hci_controller.GetMacAddressSimple() + self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) - def test_recombination_l2cap_packet(self): - self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST) - self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) - self.register_for_event( - hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) - with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \ - EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ - EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: - - cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) - acl_data_asserts = EventAsserts(acl_data_stream) - - # CERT Enables scans and gets its address - self.enqueue_hci_command( - hci_packets.WriteScanEnableBuilder( - hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True) - - cert_address = None - - def get_address_from_complete(packet): - packet_bytes = packet.event - if b'\x0e\x0a\x01\x09\x10' in packet_bytes: - nonlocal cert_address - addr_view = hci_packets.ReadBdAddrCompleteView( - hci_packets.CommandCompleteView( - hci_packets.EventPacketView( - bt_packets.PacketViewLittleEndian( - list(packet_bytes))))) - cert_address = addr_view.GetBdAddr() - return True - return False + self.dut_acl_manager.listen_for_an_incoming_connection() + self.cert_hci.initiate_connection(dut_address) + with self.dut_acl_manager.complete_incoming_connection() as dut_acl: + cert_acl = self.cert_hci.complete_connection() - self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) + cert_acl.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, + hci_packets.BroadcastFlag.ACTIVE_PERIPHERAL_BROADCAST, + b'\x26\x00\x07\x00This is a Broadcast from the Cert') + assertThat(dut_acl).emitsNone() - cert_hci_event_asserts.assert_event_occurs( - get_address_from_complete) + cert_acl.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, + hci_packets.BroadcastFlag.POINT_TO_POINT, + b'\x26\x00\x07\x00This is just SomeAclData from the Cert') + assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload) - with EventCallbackStream( - self.device_under_test.hci_acl_manager.CreateConnection( - acl_manager_facade.ConnectionMsg( - address_type=int( - hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS), - address=bytes(cert_address, - 'utf8')))) as connection_event_stream: + def test_cert_connects_disconnects(self): + dut_address = self.dut.hci_controller.GetMacAddressSimple() + self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) - connection_event_asserts = EventAsserts(connection_event_stream) - connection_request = None + self.dut_acl_manager.listen_for_an_incoming_connection() + self.cert_hci.initiate_connection(dut_address) + with self.dut_acl_manager.complete_incoming_connection() as dut_acl: + cert_acl = self.cert_hci.complete_connection() - def get_connect_request(packet): - if b'\x04\x0a' in packet.event: - nonlocal connection_request - connection_request = hci_packets.ConnectionRequestView( - hci_packets.EventPacketView( - bt_packets.PacketViewLittleEndian( - list(packet.event)))) - return True - return False + dut_acl.send(b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT') - # Cert Accepts - cert_hci_event_asserts.assert_event_occurs(get_connect_request) - self.enqueue_hci_command( - hci_packets.AcceptConnectionRequestBuilder( - connection_request.GetBdAddr(), - hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE), - False) + cert_acl.send_first(b'\x26\x00\x07\x00This is just SomeAclData from the Cert') - # Cert gets ConnectionComplete with a handle and sends ACL data - handle = 0xfff + assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.payload) + assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload) - def get_handle(packet): - packet_bytes = packet.event - if b'\x03\x0b\x00' in packet_bytes: - nonlocal handle - cc_view = hci_packets.ConnectionCompleteView( - hci_packets.EventPacketView( - bt_packets.PacketViewLittleEndian( - list(packet_bytes)))) - handle = cc_view.GetConnectionHandle() - return True - return False + dut_acl.disconnect(hci_packets.DisconnectReason.REMOTE_USER_TERMINATED_CONNECTION) + dut_acl.wait_for_disconnection_complete() - cert_hci_event_asserts.assert_event_occurs(get_handle) - cert_handle = handle - - self.enqueue_acl_data( - cert_handle, hci_packets.PacketBoundaryFlag. - FIRST_AUTOMATICALLY_FLUSHABLE, - hci_packets.BroadcastFlag.POINT_TO_POINT, - bytes(b'\x06\x00\x07\x00Hello')) - self.enqueue_acl_data( - cert_handle, - hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, - hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!')) - self.enqueue_acl_data( - cert_handle, hci_packets.PacketBoundaryFlag. - FIRST_AUTOMATICALLY_FLUSHABLE, - hci_packets.BroadcastFlag.POINT_TO_POINT, - bytes(b'\xe8\x03\x07\x00' + b'Hello' * 200)) - - # DUT gets a connection complete event and sends and receives - connection_event_asserts.assert_event_occurs(get_handle) - - acl_data_asserts.assert_event_occurs( - lambda packet: b'Hello!' in packet.payload) - acl_data_asserts.assert_event_occurs( - lambda packet: b'Hello' * 200 in packet.payload) + def test_recombination_l2cap_packet(self): + self.cert_hci.enable_inquiry_and_page_scan() + cert_address = self.cert_hci.read_own_address() + + self.dut_acl_manager.initiate_connection(cert_address) + cert_acl = self.cert_hci.accept_connection() + with self.dut_acl_manager.complete_outgoing_connection() as dut_acl: + cert_acl.send_first(b'\x06\x00\x07\x00Hello') + cert_acl.send_continuing(b'!') + cert_acl.send_first(b'\xe8\x03\x07\x00' + b'Hello' * 200) + + assertThat(dut_acl).emits(lambda packet: b'Hello!' in packet.payload, + lambda packet: b'Hello' * 200 in packet.payload).inOrder() |