aboutsummaryrefslogtreecommitdiff
path: root/targets/rp2040/py/rp2040_utils
diff options
context:
space:
mode:
authorXin Li <delphij@google.com>2024-01-17 22:13:58 -0800
committerXin Li <delphij@google.com>2024-01-17 22:13:58 -0800
commit28d03a2a1cabbe01d7bcb6cf5166c10e50d3c2c6 (patch)
treec1643be8ab17fc607cea748a8bb1d621a5964873 /targets/rp2040/py/rp2040_utils
parentec2628a6ba2d0ecbe3ac10c8c772f6fc6acc345d (diff)
parentf054515492af5132f685cb23fe11891ee77104c9 (diff)
downloadpigweed-28d03a2a1cabbe01d7bcb6cf5166c10e50d3c2c6.tar.gz
Merge Android 24Q1 Release (ab/11220357)temp_319669529
Bug: 319669529 Merged-In: Iba357b308a79d0c8b560acd4f72b5423c9c83294 Change-Id: Icdf552029fb97a34e83c6dd7799433fc473a2506
Diffstat (limited to 'targets/rp2040/py/rp2040_utils')
-rw-r--r--targets/rp2040/py/rp2040_utils/__init__.py0
-rw-r--r--targets/rp2040/py/rp2040_utils/device_detector.py183
-rw-r--r--targets/rp2040/py/rp2040_utils/py.typed0
-rw-r--r--targets/rp2040/py/rp2040_utils/unit_test_client.py58
-rw-r--r--targets/rp2040/py/rp2040_utils/unit_test_runner.py195
-rw-r--r--targets/rp2040/py/rp2040_utils/unit_test_server.py139
6 files changed, 575 insertions, 0 deletions
diff --git a/targets/rp2040/py/rp2040_utils/__init__.py b/targets/rp2040/py/rp2040_utils/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/targets/rp2040/py/rp2040_utils/__init__.py
diff --git a/targets/rp2040/py/rp2040_utils/device_detector.py b/targets/rp2040/py/rp2040_utils/device_detector.py
new file mode 100644
index 000000000..04172c4ac
--- /dev/null
+++ b/targets/rp2040/py/rp2040_utils/device_detector.py
@@ -0,0 +1,183 @@
+#!/usr/bin/env python3
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Detects attached Raspberry Pi Pico boards."""
+
+from dataclasses import dataclass
+import logging
+from typing import Dict, List
+
+import serial.tools.list_ports # type: ignore
+import usb # type: ignore
+
+import pw_cli.log
+
+# Vendor/device ID to search for in USB devices.
+_RASPBERRY_PI_VENDOR_ID = 0x2E8A
+_PICO_USB_SERIAL_DEVICE_ID = 0x000A
+_PICO_BOOTLOADER_DEVICE_ID = 0x0003
+_PICO_DEVICE_IDS = (
+ _PICO_USB_SERIAL_DEVICE_ID,
+ _PICO_BOOTLOADER_DEVICE_ID,
+)
+
+_LOG = logging.getLogger('pi_pico_detector')
+
+
+@dataclass
+class BoardInfo:
+ """Information about a connected Pi Pico board."""
+
+ serial_port: str
+ bus: int
+ port: int
+
+ # As a board is flashed and reset, the USB address can change. This method
+ # uses the USB bus and port to try and find the desired device. Using the
+ # serial number sounds appealing, but unfortunately the application's serial
+ # number is different from the bootloader's.
+ def address(self) -> int:
+ devices = usb.core.find(
+ find_all=True,
+ idVendor=_RASPBERRY_PI_VENDOR_ID,
+ )
+ for device in devices:
+ if device.idProduct not in _PICO_DEVICE_IDS:
+ raise ValueError(
+ 'Unknown device type on bus %d port %d'
+ % (self.bus, self.port)
+ )
+ if device.port_number == self.port:
+ return device.address
+ raise ValueError(
+ (
+ 'No Pico found, it may have been disconnected or flashed with '
+ 'an incompatible application'
+ )
+ )
+
+
+@dataclass
+class _BoardSerialInfo:
+ """Information that ties a serial number to a serial com port."""
+
+ serial_port: str
+ serial_number: str
+
+
+@dataclass
+class _BoardUsbInfo:
+ """Information that ties a serial number to a USB information"""
+
+ serial_number: str
+ bus: int
+ port: int
+
+
+def _detect_pico_usb_info() -> Dict[str, _BoardUsbInfo]:
+ """Finds Raspberry Pi Pico devices and retrieves USB info for each one."""
+ boards: Dict[str, _BoardUsbInfo] = {}
+ devices = usb.core.find(
+ find_all=True,
+ idVendor=_RASPBERRY_PI_VENDOR_ID,
+ )
+
+ if not devices:
+ return boards
+
+ for device in devices:
+ if device.idProduct == _PICO_USB_SERIAL_DEVICE_ID:
+ boards[device.serial_number] = _BoardUsbInfo(
+ serial_number=device.serial_number,
+ bus=device.bus,
+ port=device.port_number,
+ )
+ elif device.idProduct == _PICO_BOOTLOADER_DEVICE_ID:
+ _LOG.error(
+ 'Found a Pi Pico in bootloader mode on bus %d address %d',
+ device.bus,
+ device.address,
+ )
+ _LOG.error(
+ (
+ 'Please flash and reboot the Pico into an application '
+ 'utilizing USB serial to properly detect it'
+ )
+ )
+
+ else:
+ _LOG.error('Unknown/incompatible Raspberry Pi detected')
+ _LOG.error(
+ (
+ 'Make sure your Pi Pico is running an application '
+ 'utilizing USB serial'
+ )
+ )
+ return boards
+
+
+def _detect_pico_serial_ports() -> Dict[str, _BoardSerialInfo]:
+ """Finds the serial com port associated with each Raspberry Pi Pico."""
+ boards = {}
+ all_devs = serial.tools.list_ports.comports()
+ for dev in all_devs:
+ if (
+ dev.vid == _RASPBERRY_PI_VENDOR_ID
+ and dev.pid == _PICO_USB_SERIAL_DEVICE_ID
+ ):
+ if dev.serial_number is None:
+ raise ValueError('Found pico with no serial number')
+ boards[dev.serial_number] = _BoardSerialInfo(
+ serial_port=dev.device,
+ serial_number=dev.serial_number,
+ )
+ return boards
+
+
+def detect_boards() -> List[BoardInfo]:
+ """Detects attached Raspberry Pi Pico boards in USB serial mode.
+
+ Returns:
+ A list of all found boards as BoardInfo objects.
+ """
+ serial_devices = _detect_pico_serial_ports()
+ pico_usb_info = _detect_pico_usb_info()
+ boards = []
+ for serial_number, usb_info in pico_usb_info.items():
+ if serial_number in serial_devices:
+ serial_info = serial_devices[serial_number]
+ boards.append(
+ BoardInfo(
+ serial_port=serial_info.serial_port,
+ bus=usb_info.bus,
+ port=usb_info.port,
+ )
+ )
+ return boards
+
+
+def main():
+ """Detects and then prints all attached Raspberry Pi Picos."""
+ pw_cli.log.install()
+
+ boards = detect_boards()
+ if not boards:
+ _LOG.info('No attached boards detected')
+ for idx, board in enumerate(boards):
+ _LOG.info('Board %d:', idx)
+ _LOG.info(' %s', board)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/targets/rp2040/py/rp2040_utils/py.typed b/targets/rp2040/py/rp2040_utils/py.typed
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/targets/rp2040/py/rp2040_utils/py.typed
diff --git a/targets/rp2040/py/rp2040_utils/unit_test_client.py b/targets/rp2040/py/rp2040_utils/unit_test_client.py
new file mode 100644
index 000000000..dfbc31208
--- /dev/null
+++ b/targets/rp2040/py/rp2040_utils/unit_test_client.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python3
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Launch a pw_target_runner client that sends a test request."""
+
+import argparse
+import subprocess
+import sys
+
+try:
+ from rp2040_utils import unit_test_server
+except ImportError:
+ # Load from this directory if rp2040_utils is not available.
+ import unit_test_server # type: ignore
+
+_TARGET_CLIENT_COMMAND = 'pw_target_runner_client'
+
+
+def parse_args():
+ """Parses command-line arguments."""
+
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument('binary', help='The target test binary to run')
+ parser.add_argument(
+ '--server-port',
+ type=int,
+ default=unit_test_server.DEFAULT_PORT,
+ help='Port the test server is located on',
+ )
+
+ return parser.parse_args()
+
+
+def launch_client(binary: str, server_port: int) -> int:
+ """Sends a test request to the specified server port."""
+ cmd = (_TARGET_CLIENT_COMMAND, '-binary', binary, '-port', str(server_port))
+ return subprocess.call(cmd)
+
+
+def main() -> int:
+ """Launch a test by sending a request to a pw_target_runner_server."""
+ args = parse_args()
+ return launch_client(args.binary, args.server_port)
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/targets/rp2040/py/rp2040_utils/unit_test_runner.py b/targets/rp2040/py/rp2040_utils/unit_test_runner.py
new file mode 100644
index 000000000..a53a9500c
--- /dev/null
+++ b/targets/rp2040/py/rp2040_utils/unit_test_runner.py
@@ -0,0 +1,195 @@
+#!/usr/bin/env python3
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""This script flashes and runs unit tests on Raspberry Pi Pico boards."""
+
+import argparse
+import logging
+import subprocess
+import sys
+import time
+from pathlib import Path
+
+import serial # type: ignore
+
+import pw_cli.log
+from pw_unit_test import serial_test_runner
+from pw_unit_test.serial_test_runner import (
+ SerialTestingDevice,
+ DeviceNotFound,
+ TestingFailure,
+)
+from rp2040_utils import device_detector
+from rp2040_utils.device_detector import BoardInfo
+
+
+_LOG = logging.getLogger("unit_test_runner")
+
+
+def parse_args():
+ """Parses command-line arguments."""
+
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument(
+ 'binary', type=Path, help='The target test binary to run'
+ )
+ parser.add_argument(
+ '--usb-bus',
+ type=int,
+ help='The bus this Pi Pico is on',
+ )
+ parser.add_argument(
+ '--usb-port',
+ type=int,
+ help='The port of this Pi Pico on the specified USB bus',
+ )
+ parser.add_argument(
+ '--serial-port',
+ type=str,
+ help='The name of the serial port to connect to when running tests',
+ )
+ parser.add_argument(
+ '-b',
+ '--baud',
+ type=int,
+ default=115200,
+ help='Baud rate to use for serial communication with target device',
+ )
+ parser.add_argument(
+ '--test-timeout',
+ type=float,
+ default=5.0,
+ help='Maximum communication delay in seconds before a '
+ 'test is considered unresponsive and aborted',
+ )
+ parser.add_argument(
+ '--verbose',
+ '-v',
+ dest='verbose',
+ action='store_true',
+ help='Output additional logs as the script runs',
+ )
+
+ return parser.parse_args()
+
+
+class PiPicoTestingDevice(SerialTestingDevice):
+ """A SerialTestingDevice implementation for the Pi Pico."""
+
+ def __init__(self, board_info: BoardInfo, baud_rate=115200):
+ self._board_info = board_info
+ self._baud_rate = baud_rate
+
+ def load_binary(self, binary: Path) -> None:
+ cmd = (
+ 'picotool',
+ 'load',
+ '-x',
+ str(binary),
+ '--bus',
+ str(self._board_info.bus),
+ '--address',
+ str(self._board_info.address()),
+ '-F',
+ )
+ process = subprocess.run(
+ cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
+ )
+ if process.returncode:
+ err = (
+ 'Command failed: ' + ' '.join(cmd),
+ str(self._board_info),
+ process.stdout.decode('utf-8', errors='replace'),
+ )
+ raise TestingFailure('\n\n'.join(err))
+ # Wait for serial port to enumerate. This will retry forever.
+ while True:
+ try:
+ serial.Serial(
+ baudrate=self.baud_rate(), port=self.serial_port()
+ )
+ except serial.serialutil.SerialException:
+ time.sleep(0.001)
+ else:
+ break
+
+ def serial_port(self) -> str:
+ return self._board_info.serial_port
+
+ def baud_rate(self) -> int:
+ return self._baud_rate
+
+
+def _run_test(
+ device: PiPicoTestingDevice, binary: Path, test_timeout: float
+) -> bool:
+ return serial_test_runner.run_device_test(device, binary, test_timeout)
+
+
+def run_device_test(
+ binary: Path,
+ test_timeout: float,
+ serial_port: str,
+ baud_rate: int,
+ usb_bus: int,
+ usb_port: int,
+) -> bool:
+ """Flashes, runs, and checks an on-device test binary.
+
+ Returns true on test pass.
+ """
+ board = BoardInfo(serial_port, usb_bus, usb_port)
+ return _run_test(
+ PiPicoTestingDevice(board, baud_rate), binary, test_timeout
+ )
+
+
+def detect_and_run_test(binary: Path, test_timeout: float, baud_rate: int):
+ _LOG.debug('Attempting to automatically detect dev board')
+ boards = device_detector.detect_boards()
+ if not boards:
+ error = 'Could not find an attached device'
+ _LOG.error(error)
+ raise DeviceNotFound(error)
+ return _run_test(
+ PiPicoTestingDevice(boards[0], baud_rate), binary, test_timeout
+ )
+
+
+def main():
+ """Set up runner, and then flash/run device test."""
+ args = parse_args()
+ log_level = logging.DEBUG if args.verbose else logging.INFO
+ pw_cli.log.install(level=log_level)
+
+ test_passed = False
+ if not args.serial_port:
+ test_passed = detect_and_run_test(
+ args.binary, args.test_timeout, args.baud
+ )
+ else:
+ test_passed = run_device_test(
+ args.binary,
+ args.test_timeout,
+ args.serial_port,
+ args.baud,
+ args.usb_bus,
+ args.usb_port,
+ )
+
+ sys.exit(0 if test_passed else 1)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/targets/rp2040/py/rp2040_utils/unit_test_server.py b/targets/rp2040/py/rp2040_utils/unit_test_server.py
new file mode 100644
index 000000000..62c5b9461
--- /dev/null
+++ b/targets/rp2040/py/rp2040_utils/unit_test_server.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python3
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Launch a pw_target_runner server to use for multi-device testing."""
+
+import argparse
+import logging
+import sys
+import tempfile
+from typing import IO, List, Optional
+
+import pw_cli.process
+import pw_cli.log
+
+try:
+ from rp2040_utils import device_detector
+except ImportError:
+ # Load from this directory if rp2040_utils is not available.
+ import device_detector # type: ignore
+
+_LOG = logging.getLogger('unit_test_server')
+
+DEFAULT_PORT = 34172
+
+_TEST_RUNNER_COMMAND = 'rp2040_unit_test_runner'
+_TEST_SERVER_COMMAND = 'pw_target_runner_server'
+
+
+def parse_args():
+ """Parses command-line arguments."""
+
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument(
+ '--server-port',
+ type=int,
+ default=DEFAULT_PORT,
+ help='Port to launch the pw_target_runner_server on',
+ )
+ parser.add_argument(
+ '--server-config',
+ type=argparse.FileType('r'),
+ help='Path to server config file',
+ )
+ parser.add_argument(
+ '--verbose',
+ '-v',
+ dest='verbose',
+ action="store_true",
+ help='Output additional logs as the script runs',
+ )
+
+ return parser.parse_args()
+
+
+def generate_runner(command: str, arguments: List[str]) -> str:
+ """Generates a text-proto style pw_target_runner_server configuration."""
+ # TODO(amontanez): Use a real proto library to generate this when we have
+ # one set up.
+ for i, arg in enumerate(arguments):
+ arguments[i] = f' args: "{arg}"'
+ runner = ['runner {', f' command:"{command}"']
+ runner.extend(arguments)
+ runner.append('}\n')
+ return '\n'.join(runner)
+
+
+def generate_server_config() -> IO[bytes]:
+ """Returns a temporary generated file for use as the server config."""
+ boards = device_detector.detect_boards()
+ if not boards:
+ _LOG.critical('No attached boards detected')
+ sys.exit(1)
+ config_file = tempfile.NamedTemporaryFile()
+ _LOG.debug('Generating test server config at %s', config_file.name)
+ _LOG.debug('Found %d attached devices', len(boards))
+
+ # TODO: b/290245354 - Multi-device flashing doesn't work due to limitations
+ # of picotool. Limit to one device even if multiple are connected.
+ if boards:
+ boards = boards[:1]
+
+ for board in boards:
+ test_runner_args = [
+ '--usb-bus',
+ str(board.bus),
+ '--usb-port',
+ str(board.port),
+ '--serial-port',
+ board.serial_port,
+ ]
+ config_file.write(
+ generate_runner(_TEST_RUNNER_COMMAND, test_runner_args).encode(
+ 'utf-8'
+ )
+ )
+ config_file.flush()
+ return config_file
+
+
+def launch_server(
+ server_config: Optional[IO[bytes]], server_port: Optional[int]
+) -> int:
+ """Launch a device test server with the provided arguments."""
+ if server_config is None:
+ # Auto-detect attached boards if no config is provided.
+ server_config = generate_server_config()
+
+ cmd = [_TEST_SERVER_COMMAND, '-config', server_config.name]
+
+ if server_port is not None:
+ cmd.extend(['-port', str(server_port)])
+
+ return pw_cli.process.run(*cmd, log_output=True).returncode
+
+
+def main():
+ """Launch a device test server with the provided arguments."""
+ args = parse_args()
+
+ log_level = logging.DEBUG if args.verbose else logging.INFO
+ pw_cli.log.install(level=log_level)
+
+ exit_code = launch_server(args.server_config, args.server_port)
+ sys.exit(exit_code)
+
+
+if __name__ == '__main__':
+ main()