diff options
author | Xin Li <delphij@google.com> | 2024-01-17 22:13:58 -0800 |
---|---|---|
committer | Xin Li <delphij@google.com> | 2024-01-17 22:13:58 -0800 |
commit | 28d03a2a1cabbe01d7bcb6cf5166c10e50d3c2c6 (patch) | |
tree | c1643be8ab17fc607cea748a8bb1d621a5964873 /targets/rp2040/py/rp2040_utils | |
parent | ec2628a6ba2d0ecbe3ac10c8c772f6fc6acc345d (diff) | |
parent | f054515492af5132f685cb23fe11891ee77104c9 (diff) | |
download | pigweed-28d03a2a1cabbe01d7bcb6cf5166c10e50d3c2c6.tar.gz |
Merge Android 24Q1 Release (ab/11220357)temp_319669529
Bug: 319669529
Merged-In: Iba357b308a79d0c8b560acd4f72b5423c9c83294
Change-Id: Icdf552029fb97a34e83c6dd7799433fc473a2506
Diffstat (limited to 'targets/rp2040/py/rp2040_utils')
-rw-r--r-- | targets/rp2040/py/rp2040_utils/__init__.py | 0 | ||||
-rw-r--r-- | targets/rp2040/py/rp2040_utils/device_detector.py | 183 | ||||
-rw-r--r-- | targets/rp2040/py/rp2040_utils/py.typed | 0 | ||||
-rw-r--r-- | targets/rp2040/py/rp2040_utils/unit_test_client.py | 58 | ||||
-rw-r--r-- | targets/rp2040/py/rp2040_utils/unit_test_runner.py | 195 | ||||
-rw-r--r-- | targets/rp2040/py/rp2040_utils/unit_test_server.py | 139 |
6 files changed, 575 insertions, 0 deletions
diff --git a/targets/rp2040/py/rp2040_utils/__init__.py b/targets/rp2040/py/rp2040_utils/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/targets/rp2040/py/rp2040_utils/__init__.py diff --git a/targets/rp2040/py/rp2040_utils/device_detector.py b/targets/rp2040/py/rp2040_utils/device_detector.py new file mode 100644 index 000000000..04172c4ac --- /dev/null +++ b/targets/rp2040/py/rp2040_utils/device_detector.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Detects attached Raspberry Pi Pico boards.""" + +from dataclasses import dataclass +import logging +from typing import Dict, List + +import serial.tools.list_ports # type: ignore +import usb # type: ignore + +import pw_cli.log + +# Vendor/device ID to search for in USB devices. +_RASPBERRY_PI_VENDOR_ID = 0x2E8A +_PICO_USB_SERIAL_DEVICE_ID = 0x000A +_PICO_BOOTLOADER_DEVICE_ID = 0x0003 +_PICO_DEVICE_IDS = ( + _PICO_USB_SERIAL_DEVICE_ID, + _PICO_BOOTLOADER_DEVICE_ID, +) + +_LOG = logging.getLogger('pi_pico_detector') + + +@dataclass +class BoardInfo: + """Information about a connected Pi Pico board.""" + + serial_port: str + bus: int + port: int + + # As a board is flashed and reset, the USB address can change. This method + # uses the USB bus and port to try and find the desired device. Using the + # serial number sounds appealing, but unfortunately the application's serial + # number is different from the bootloader's. + def address(self) -> int: + devices = usb.core.find( + find_all=True, + idVendor=_RASPBERRY_PI_VENDOR_ID, + ) + for device in devices: + if device.idProduct not in _PICO_DEVICE_IDS: + raise ValueError( + 'Unknown device type on bus %d port %d' + % (self.bus, self.port) + ) + if device.port_number == self.port: + return device.address + raise ValueError( + ( + 'No Pico found, it may have been disconnected or flashed with ' + 'an incompatible application' + ) + ) + + +@dataclass +class _BoardSerialInfo: + """Information that ties a serial number to a serial com port.""" + + serial_port: str + serial_number: str + + +@dataclass +class _BoardUsbInfo: + """Information that ties a serial number to a USB information""" + + serial_number: str + bus: int + port: int + + +def _detect_pico_usb_info() -> Dict[str, _BoardUsbInfo]: + """Finds Raspberry Pi Pico devices and retrieves USB info for each one.""" + boards: Dict[str, _BoardUsbInfo] = {} + devices = usb.core.find( + find_all=True, + idVendor=_RASPBERRY_PI_VENDOR_ID, + ) + + if not devices: + return boards + + for device in devices: + if device.idProduct == _PICO_USB_SERIAL_DEVICE_ID: + boards[device.serial_number] = _BoardUsbInfo( + serial_number=device.serial_number, + bus=device.bus, + port=device.port_number, + ) + elif device.idProduct == _PICO_BOOTLOADER_DEVICE_ID: + _LOG.error( + 'Found a Pi Pico in bootloader mode on bus %d address %d', + device.bus, + device.address, + ) + _LOG.error( + ( + 'Please flash and reboot the Pico into an application ' + 'utilizing USB serial to properly detect it' + ) + ) + + else: + _LOG.error('Unknown/incompatible Raspberry Pi detected') + _LOG.error( + ( + 'Make sure your Pi Pico is running an application ' + 'utilizing USB serial' + ) + ) + return boards + + +def _detect_pico_serial_ports() -> Dict[str, _BoardSerialInfo]: + """Finds the serial com port associated with each Raspberry Pi Pico.""" + boards = {} + all_devs = serial.tools.list_ports.comports() + for dev in all_devs: + if ( + dev.vid == _RASPBERRY_PI_VENDOR_ID + and dev.pid == _PICO_USB_SERIAL_DEVICE_ID + ): + if dev.serial_number is None: + raise ValueError('Found pico with no serial number') + boards[dev.serial_number] = _BoardSerialInfo( + serial_port=dev.device, + serial_number=dev.serial_number, + ) + return boards + + +def detect_boards() -> List[BoardInfo]: + """Detects attached Raspberry Pi Pico boards in USB serial mode. + + Returns: + A list of all found boards as BoardInfo objects. + """ + serial_devices = _detect_pico_serial_ports() + pico_usb_info = _detect_pico_usb_info() + boards = [] + for serial_number, usb_info in pico_usb_info.items(): + if serial_number in serial_devices: + serial_info = serial_devices[serial_number] + boards.append( + BoardInfo( + serial_port=serial_info.serial_port, + bus=usb_info.bus, + port=usb_info.port, + ) + ) + return boards + + +def main(): + """Detects and then prints all attached Raspberry Pi Picos.""" + pw_cli.log.install() + + boards = detect_boards() + if not boards: + _LOG.info('No attached boards detected') + for idx, board in enumerate(boards): + _LOG.info('Board %d:', idx) + _LOG.info(' %s', board) + + +if __name__ == '__main__': + main() diff --git a/targets/rp2040/py/rp2040_utils/py.typed b/targets/rp2040/py/rp2040_utils/py.typed new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/targets/rp2040/py/rp2040_utils/py.typed diff --git a/targets/rp2040/py/rp2040_utils/unit_test_client.py b/targets/rp2040/py/rp2040_utils/unit_test_client.py new file mode 100644 index 000000000..dfbc31208 --- /dev/null +++ b/targets/rp2040/py/rp2040_utils/unit_test_client.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Launch a pw_target_runner client that sends a test request.""" + +import argparse +import subprocess +import sys + +try: + from rp2040_utils import unit_test_server +except ImportError: + # Load from this directory if rp2040_utils is not available. + import unit_test_server # type: ignore + +_TARGET_CLIENT_COMMAND = 'pw_target_runner_client' + + +def parse_args(): + """Parses command-line arguments.""" + + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('binary', help='The target test binary to run') + parser.add_argument( + '--server-port', + type=int, + default=unit_test_server.DEFAULT_PORT, + help='Port the test server is located on', + ) + + return parser.parse_args() + + +def launch_client(binary: str, server_port: int) -> int: + """Sends a test request to the specified server port.""" + cmd = (_TARGET_CLIENT_COMMAND, '-binary', binary, '-port', str(server_port)) + return subprocess.call(cmd) + + +def main() -> int: + """Launch a test by sending a request to a pw_target_runner_server.""" + args = parse_args() + return launch_client(args.binary, args.server_port) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/targets/rp2040/py/rp2040_utils/unit_test_runner.py b/targets/rp2040/py/rp2040_utils/unit_test_runner.py new file mode 100644 index 000000000..a53a9500c --- /dev/null +++ b/targets/rp2040/py/rp2040_utils/unit_test_runner.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""This script flashes and runs unit tests on Raspberry Pi Pico boards.""" + +import argparse +import logging +import subprocess +import sys +import time +from pathlib import Path + +import serial # type: ignore + +import pw_cli.log +from pw_unit_test import serial_test_runner +from pw_unit_test.serial_test_runner import ( + SerialTestingDevice, + DeviceNotFound, + TestingFailure, +) +from rp2040_utils import device_detector +from rp2040_utils.device_detector import BoardInfo + + +_LOG = logging.getLogger("unit_test_runner") + + +def parse_args(): + """Parses command-line arguments.""" + + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + 'binary', type=Path, help='The target test binary to run' + ) + parser.add_argument( + '--usb-bus', + type=int, + help='The bus this Pi Pico is on', + ) + parser.add_argument( + '--usb-port', + type=int, + help='The port of this Pi Pico on the specified USB bus', + ) + parser.add_argument( + '--serial-port', + type=str, + help='The name of the serial port to connect to when running tests', + ) + parser.add_argument( + '-b', + '--baud', + type=int, + default=115200, + help='Baud rate to use for serial communication with target device', + ) + parser.add_argument( + '--test-timeout', + type=float, + default=5.0, + help='Maximum communication delay in seconds before a ' + 'test is considered unresponsive and aborted', + ) + parser.add_argument( + '--verbose', + '-v', + dest='verbose', + action='store_true', + help='Output additional logs as the script runs', + ) + + return parser.parse_args() + + +class PiPicoTestingDevice(SerialTestingDevice): + """A SerialTestingDevice implementation for the Pi Pico.""" + + def __init__(self, board_info: BoardInfo, baud_rate=115200): + self._board_info = board_info + self._baud_rate = baud_rate + + def load_binary(self, binary: Path) -> None: + cmd = ( + 'picotool', + 'load', + '-x', + str(binary), + '--bus', + str(self._board_info.bus), + '--address', + str(self._board_info.address()), + '-F', + ) + process = subprocess.run( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) + if process.returncode: + err = ( + 'Command failed: ' + ' '.join(cmd), + str(self._board_info), + process.stdout.decode('utf-8', errors='replace'), + ) + raise TestingFailure('\n\n'.join(err)) + # Wait for serial port to enumerate. This will retry forever. + while True: + try: + serial.Serial( + baudrate=self.baud_rate(), port=self.serial_port() + ) + except serial.serialutil.SerialException: + time.sleep(0.001) + else: + break + + def serial_port(self) -> str: + return self._board_info.serial_port + + def baud_rate(self) -> int: + return self._baud_rate + + +def _run_test( + device: PiPicoTestingDevice, binary: Path, test_timeout: float +) -> bool: + return serial_test_runner.run_device_test(device, binary, test_timeout) + + +def run_device_test( + binary: Path, + test_timeout: float, + serial_port: str, + baud_rate: int, + usb_bus: int, + usb_port: int, +) -> bool: + """Flashes, runs, and checks an on-device test binary. + + Returns true on test pass. + """ + board = BoardInfo(serial_port, usb_bus, usb_port) + return _run_test( + PiPicoTestingDevice(board, baud_rate), binary, test_timeout + ) + + +def detect_and_run_test(binary: Path, test_timeout: float, baud_rate: int): + _LOG.debug('Attempting to automatically detect dev board') + boards = device_detector.detect_boards() + if not boards: + error = 'Could not find an attached device' + _LOG.error(error) + raise DeviceNotFound(error) + return _run_test( + PiPicoTestingDevice(boards[0], baud_rate), binary, test_timeout + ) + + +def main(): + """Set up runner, and then flash/run device test.""" + args = parse_args() + log_level = logging.DEBUG if args.verbose else logging.INFO + pw_cli.log.install(level=log_level) + + test_passed = False + if not args.serial_port: + test_passed = detect_and_run_test( + args.binary, args.test_timeout, args.baud + ) + else: + test_passed = run_device_test( + args.binary, + args.test_timeout, + args.serial_port, + args.baud, + args.usb_bus, + args.usb_port, + ) + + sys.exit(0 if test_passed else 1) + + +if __name__ == '__main__': + main() diff --git a/targets/rp2040/py/rp2040_utils/unit_test_server.py b/targets/rp2040/py/rp2040_utils/unit_test_server.py new file mode 100644 index 000000000..62c5b9461 --- /dev/null +++ b/targets/rp2040/py/rp2040_utils/unit_test_server.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Launch a pw_target_runner server to use for multi-device testing.""" + +import argparse +import logging +import sys +import tempfile +from typing import IO, List, Optional + +import pw_cli.process +import pw_cli.log + +try: + from rp2040_utils import device_detector +except ImportError: + # Load from this directory if rp2040_utils is not available. + import device_detector # type: ignore + +_LOG = logging.getLogger('unit_test_server') + +DEFAULT_PORT = 34172 + +_TEST_RUNNER_COMMAND = 'rp2040_unit_test_runner' +_TEST_SERVER_COMMAND = 'pw_target_runner_server' + + +def parse_args(): + """Parses command-line arguments.""" + + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + '--server-port', + type=int, + default=DEFAULT_PORT, + help='Port to launch the pw_target_runner_server on', + ) + parser.add_argument( + '--server-config', + type=argparse.FileType('r'), + help='Path to server config file', + ) + parser.add_argument( + '--verbose', + '-v', + dest='verbose', + action="store_true", + help='Output additional logs as the script runs', + ) + + return parser.parse_args() + + +def generate_runner(command: str, arguments: List[str]) -> str: + """Generates a text-proto style pw_target_runner_server configuration.""" + # TODO(amontanez): Use a real proto library to generate this when we have + # one set up. + for i, arg in enumerate(arguments): + arguments[i] = f' args: "{arg}"' + runner = ['runner {', f' command:"{command}"'] + runner.extend(arguments) + runner.append('}\n') + return '\n'.join(runner) + + +def generate_server_config() -> IO[bytes]: + """Returns a temporary generated file for use as the server config.""" + boards = device_detector.detect_boards() + if not boards: + _LOG.critical('No attached boards detected') + sys.exit(1) + config_file = tempfile.NamedTemporaryFile() + _LOG.debug('Generating test server config at %s', config_file.name) + _LOG.debug('Found %d attached devices', len(boards)) + + # TODO: b/290245354 - Multi-device flashing doesn't work due to limitations + # of picotool. Limit to one device even if multiple are connected. + if boards: + boards = boards[:1] + + for board in boards: + test_runner_args = [ + '--usb-bus', + str(board.bus), + '--usb-port', + str(board.port), + '--serial-port', + board.serial_port, + ] + config_file.write( + generate_runner(_TEST_RUNNER_COMMAND, test_runner_args).encode( + 'utf-8' + ) + ) + config_file.flush() + return config_file + + +def launch_server( + server_config: Optional[IO[bytes]], server_port: Optional[int] +) -> int: + """Launch a device test server with the provided arguments.""" + if server_config is None: + # Auto-detect attached boards if no config is provided. + server_config = generate_server_config() + + cmd = [_TEST_SERVER_COMMAND, '-config', server_config.name] + + if server_port is not None: + cmd.extend(['-port', str(server_port)]) + + return pw_cli.process.run(*cmd, log_output=True).returncode + + +def main(): + """Launch a device test server with the provided arguments.""" + args = parse_args() + + log_level = logging.DEBUG if args.verbose else logging.INFO + pw_cli.log.install(level=log_level) + + exit_code = launch_server(args.server_config, args.server_port) + sys.exit(exit_code) + + +if __name__ == '__main__': + main() |