diff options
Diffstat (limited to 'pw_emu/py')
-rw-r--r-- | pw_emu/py/BUILD.gn | 68 | ||||
-rw-r--r-- | pw_emu/py/mock_emu.py | 111 | ||||
-rw-r--r-- | pw_emu/py/mock_emu_frontend.py | 154 | ||||
-rw-r--r-- | pw_emu/py/pw_emu/__init__.py | 14 | ||||
-rw-r--r-- | pw_emu/py/pw_emu/__main__.py | 448 | ||||
-rw-r--r-- | pw_emu/py/pw_emu/core.py | 956 | ||||
-rw-r--r-- | pw_emu/py/pw_emu/frontend.py | 328 | ||||
-rw-r--r-- | pw_emu/py/pw_emu/pigweed_emulators.py | 27 | ||||
-rw-r--r-- | pw_emu/py/pw_emu/py.typed | 0 | ||||
-rw-r--r-- | pw_emu/py/pw_emu/qemu.py | 343 | ||||
-rw-r--r-- | pw_emu/py/pw_emu/renode.py | 209 | ||||
-rw-r--r-- | pw_emu/py/pyproject.toml | 16 | ||||
-rw-r--r-- | pw_emu/py/setup.cfg | 28 | ||||
-rw-r--r-- | pw_emu/py/setup.py | 18 | ||||
-rw-r--r-- | pw_emu/py/tests/__init__.py | 14 | ||||
-rw-r--r-- | pw_emu/py/tests/cli_test.py | 276 | ||||
-rw-r--r-- | pw_emu/py/tests/common.py | 67 | ||||
-rw-r--r-- | pw_emu/py/tests/core_test.py | 480 | ||||
-rw-r--r-- | pw_emu/py/tests/frontend_test.py | 150 | ||||
-rw-r--r-- | pw_emu/py/tests/py.typed | 0 | ||||
-rw-r--r-- | pw_emu/py/tests/qemu_test.py | 280 | ||||
-rw-r--r-- | pw_emu/py/tests/renode_test.py | 238 |
22 files changed, 4225 insertions, 0 deletions
diff --git a/pw_emu/py/BUILD.gn b/pw_emu/py/BUILD.gn new file mode 100644 index 000000000..bcf26e44e --- /dev/null +++ b/pw_emu/py/BUILD.gn @@ -0,0 +1,68 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import("//build_overrides/pigweed.gni") + +import("$dir_pw_build/python.gni") + +pw_python_package("py") { + setup = [ + "pyproject.toml", + "setup.cfg", + "setup.py", + ] + sources = [ + "pw_emu/__init__.py", + "pw_emu/__main__.py", + "pw_emu/core.py", + "pw_emu/frontend.py", + "pw_emu/pigweed_emulators.py", + "pw_emu/qemu.py", + "pw_emu/renode.py", + ] + tests = [ + "tests/__init__.py", + "tests/cli_test.py", + "tests/common.py", + "tests/core_test.py", + "tests/frontend_test.py", + "tests/qemu_test.py", + "tests/renode_test.py", + ] + pylintrc = "$dir_pigweed/.pylintrc" + mypy_ini = "$dir_pigweed/.mypy.ini" + python_deps = [ + "$dir_pw_build/py", + "$dir_pw_env_setup/py", + ] +} + +pw_python_script("mock_emu") { + sources = [ "mock_emu.py" ] + pylintrc = "$dir_pigweed/.pylintrc" + mypy_ini = "$dir_pigweed/.mypy.ini" + action = { + stamp = true + } +} + +pw_python_script("mock_emu_frontend") { + sources = [ "mock_emu_frontend.py" ] + python_deps = [ "$dir_pw_env_setup/py" ] + pylintrc = "$dir_pigweed/.pylintrc" + mypy_ini = "$dir_pigweed/.mypy.ini" + action = { + stamp = true + } +} diff --git a/pw_emu/py/mock_emu.py b/pw_emu/py/mock_emu.py new file mode 100644 index 000000000..1a49b2ba2 --- /dev/null +++ b/pw_emu/py/mock_emu.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Mock emulator used for testing process and channel management.""" + +import argparse +import os +import socket +import sys +import time + +from threading import Thread + + +def _tcp_thread(sock: socket.socket) -> None: + conn, _ = sock.accept() + while True: + data = conn.recv(1) + conn.send(data) + + +def _pty_thread(fd: int) -> None: + while True: + data = os.read(fd, 1) + os.write(fd, data) + + +def _get_parser() -> argparse.ArgumentParser: + """Command line parser.""" + + parser = argparse.ArgumentParser() + parser.add_argument( + '-C', '--working-dir', metavar='PATH', help='working directory' + ) + parser.add_argument( + 'echo', metavar='STRING', nargs='*', help='write STRING to stdout' + ) + parser.add_argument( + '--tcp-channel', + action='append', + default=[], + metavar='NAME', + help='listen for TCP connections, write port WDIR/NAME', + ) + if sys.platform != 'win32': + parser.add_argument( + '--pty-channel', + action='append', + default=[], + metavar='NAME', + help='create pty channel and link in WDIR/NAME', + ) + parser.add_argument( + '--exit', action='store_true', default=False, help='exit when done' + ) + + return parser + + +def main() -> None: + """Mock emulator.""" + + args = _get_parser().parse_args() + + if len(args.echo) > 0: + print(' '.join(args.echo)) + sys.stdout.flush() + + threads = [] + + for chan in args.tcp_channel: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('localhost', 0)) + port = sock.getsockname()[1] + sock.listen() + with open(os.path.join(args.working_dir, chan), 'w') as file: + file.write(str(port)) + thread = Thread(target=_tcp_thread, args=(sock,)) + thread.start() + threads.append(thread) + + if sys.platform != 'win32': + for chan in args.pty_channel: + controller, tty = os.openpty() + with open(os.path.join(args.working_dir, chan), 'w') as file: + file.write(os.ttyname(tty)) + thread = Thread(target=_pty_thread, args=(controller,)) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + if not args.exit: + while True: + time.sleep(1) + + +if __name__ == '__main__': + main() diff --git a/pw_emu/py/mock_emu_frontend.py b/pw_emu/py/mock_emu_frontend.py new file mode 100644 index 000000000..5ab370ac9 --- /dev/null +++ b/pw_emu/py/mock_emu_frontend.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Launcher and Connector for mock emulator""" + +import json +import os +from pathlib import Path +from typing import Any, Optional, List, Union +import time + +from pw_emu.core import ( + Connector, + Launcher, + InvalidProperty, + InvalidPropertyPath, +) + +# mock emulator script +_mock_emu = [ + 'python', + os.path.join(Path(os.path.dirname(__file__)).resolve(), 'mock_emu.py'), +] + + +def wait_for_file_size( + path: Union[os.PathLike, str], size: int, timeout: int = 5 +) -> None: + deadline = time.monotonic() + timeout + while not os.path.exists(path): + if time.monotonic() > deadline: + break + time.sleep(0.1) + + while os.path.getsize(path) < size: + if time.monotonic() > deadline: + break + time.sleep(0.1) + + +class MockEmuLauncher(Launcher): + """Launcher for mock emulator""" + + def __init__( + self, + config_path: Path, + ): + super().__init__('mock-emu', config_path) + self._wdir: Optional[Path] = None + self.log = True + + def _pre_start( + self, + target: str, + file: Optional[Path] = None, + pause: bool = False, + debug: bool = False, + args: Optional[str] = None, + ) -> List[str]: + channels = [] + if self._config.get_target(['pre-start-cmds']): + self._handles.add_channel_tcp('test_subst_tcp', 'localhost', 1234) + self._handles.add_channel_pty('test_subst_pty', 'pty-path') + if self._config.get_emu(['gdb_channel']): + channels += ['--tcp-channel', 'gdb'] + if self._config.get_emu(['tcp_channel']): + channels += ['--tcp-channel', 'tcp'] + if self._config.get_emu(['pty_channel']): + channels += ['--pty-channel', 'pty'] + if len(channels) > 0: + channels += ['--working-dir', str(self._wdir)] + return _mock_emu + channels + ['starting mock emulator'] + + def _post_start(self) -> None: + if not self._wdir: + return + + if self._config.get_emu(['gdb_channel']): + path = os.path.join(self._wdir, 'gdb') + wait_for_file_size(path, 5, 5) + with open(path, 'r') as file: + port = int(file.read()) + self._handles.add_channel_tcp('gdb', 'localhost', port) + + if self._config.get_emu(['tcp_channel']): + path = os.path.join(self._wdir, 'tcp') + wait_for_file_size(path, 5, 5) + with open(path, 'r') as file: + port = int(file.read()) + self._handles.add_channel_tcp('tcp', 'localhost', port) + + if self._config.get_emu(['pty_channel']): + path = os.path.join(self._wdir, 'pty') + wait_for_file_size(path, 5, 5) + with open(path, 'r') as file: + pty_path = file.read() + self._handles.add_channel_pty('pty', pty_path) + + def _get_connector(self, wdir: Path) -> Connector: + return MockEmuConnector(wdir) + + +class MockEmuConnector(Connector): + """Connector for mock emulator""" + + _props = { + 'path1': { + 'prop1': 'val1', + } + } + + def reset(self) -> None: + Path(os.path.join(self._wdir, 'reset')).touch() + + def cont(self) -> None: + Path(os.path.join(self._wdir, 'cont')).touch() + + def list_properties(self, path: str) -> List[Any]: + try: + return list(self._props[path].keys()) + except KeyError: + raise InvalidPropertyPath(path) + + def set_property(self, path: str, prop: str, value: str) -> None: + if not self._props.get(path): + raise InvalidPropertyPath(path) + if not self._props[path].get(prop): + raise InvalidProperty(path, prop) + self._props[path][prop] = value + with open(os.path.join(self._wdir, 'props.json'), 'w') as file: + json.dump(self._props, file) + + def get_property(self, path: str, prop: str) -> Any: + try: + with open(os.path.join(self._wdir, 'props.json'), 'r') as file: + self._props = json.load(file) + except OSError: + pass + if not self._props.get(path): + raise InvalidPropertyPath(path) + if not self._props[path].get(prop): + raise InvalidProperty(path, prop) + return self._props[path][prop] diff --git a/pw_emu/py/pw_emu/__init__.py b/pw_emu/py/pw_emu/__init__.py new file mode 100644 index 000000000..0f2cb9104 --- /dev/null +++ b/pw_emu/py/pw_emu/__init__.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. diff --git a/pw_emu/py/pw_emu/__main__.py b/pw_emu/py/pw_emu/__main__.py new file mode 100644 index 000000000..253edd931 --- /dev/null +++ b/pw_emu/py/pw_emu/__main__.py @@ -0,0 +1,448 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Command line interface for the Pigweed emulators frontend""" + +import argparse +import json +import os +from pathlib import Path +import signal +import subprocess +import sys +import threading + +from typing import Any + +from pw_emu.core import Error +from pw_emu.frontend import Emulator +from serial import serial_for_url, SerialException +from serial.tools.miniterm import Miniterm, key_description + +_TERM_CMD = ['python', '-m', 'serial', '--raw'] + + +def _cmd_gdb_cmds(emu, args: argparse.Namespace) -> None: + """Run gdb commands in batch mode.""" + + emu.run_gdb_cmds(args.gdb_cmd, executable=args.executable, pause=args.pause) + + +def _cmd_load(emu: Emulator, args: argparse.Namespace) -> None: + """Load an executable image via gdb start executing it if pause is + not set""" + + args.gdb_cmd = ['load'] + _cmd_gdb_cmds(emu, args) + + +def _cmd_start(emu: Emulator, args: argparse.Namespace) -> None: + """Launch the emulator and start executing, unless pause is set.""" + + if args.runner: + emu.set_emu(args.runner) + + emu.start( + target=args.target, + file=args.file, + pause=args.pause, + args=args.args, + debug=args.debug, + foreground=args.foreground, + ) + + +def _get_miniterm(emu: Emulator, chan: str) -> Miniterm: + chan_type = emu.get_channel_type(chan) + if chan_type == 'tcp': + host, port = emu.get_channel_addr(chan) + url = f'socket://[{host}]:{port}' + elif chan_type == 'pty': + url = emu.get_channel_path(chan) + else: + raise Error(f'unknown channel type `{chan_type}`') + ser = serial_for_url(url) + ser.timeout = 1 + miniterm = Miniterm(ser) + miniterm.raw = True + miniterm.set_tx_encoding('UTF-8') + miniterm.set_rx_encoding('UTF-8') + + quit_key = key_description(miniterm.exit_character) + menu_key = key_description(miniterm.menu_character) + help_key = key_description('\x08') + help_desc = f'Help: {menu_key} followed by {help_key} ---' + + print(f'--- Miniterm on {chan} ---') + print(f'--- Quit: {quit_key} | Menu: {menu_key} | {help_desc}') + + # On POSIX systems miniterm uses TIOCSTI to "cancel" the TX thread + # (reading from the console, sending to the serial) which is + # disabled on Linux kernels > 6.2 see + # https://github.com/pyserial/pyserial/issues/243 + # + # On Windows the cancel method does not seem to work either with + # recent win10 versions. + # + # Workaround by terminating the process for exceptions in the read + # and write threads. + threading.excepthook = lambda args: signal.raise_signal(signal.SIGTERM) + + return miniterm + + +def _cmd_run(emu: Emulator, args: argparse.Namespace) -> None: + """Start the emulator and connect the terminal to a channel. Stop + the emulator when exiting the terminal""" + + emu.start( + target=args.target, + file=args.file, + pause=True, + args=args.args, + ) + + ctrl_chans = ['gdb', 'monitor', 'qmp', 'robot'] + if not args.channel: + for chan in emu.get_channels(): + if chan not in ctrl_chans: + args.channel = chan + break + if not args.channel: + raise Error(f'only control channels {ctrl_chans} found') + + try: + miniterm = _get_miniterm(emu, args.channel) + emu.cont() + miniterm.start() + miniterm.join(True) + print('--- exit ---') + miniterm.stop() + miniterm.join() + miniterm.close() + except SerialException as err: + raise Error(f'error connecting to channel `{args.channel}`: {err}') + finally: + emu.stop() + + +def _cmd_restart(emu: Emulator, args: argparse.Namespace) -> None: + """Restart the emulator and start executing, unless pause is set.""" + + if emu.running(): + emu.stop() + _cmd_start(emu, args) + + +def _cmd_stop(emu: Emulator, _args: argparse.Namespace) -> None: + """Stop the emulator""" + + emu.stop() + + +def _cmd_reset(emu: Emulator, _args: argparse.Namespace) -> None: + """Perform a software reset.""" + + emu.reset() + + +def _cmd_gdb(emu: Emulator, args: argparse.Namespace) -> None: + """Start a gdb interactive session""" + + executable = args.executable if args.executable else "" + + signal.signal(signal.SIGINT, signal.SIG_IGN) + try: + cmd = emu.get_gdb_cmd() + [ + '-ex', + f'target remote {emu.get_gdb_remote()}', + executable, + ] + subprocess.run(cmd) + finally: + signal.signal(signal.SIGINT, signal.SIG_DFL) + + +def _cmd_prop_ls(emu: Emulator, args: argparse.Namespace) -> None: + """List emulator object properties.""" + + props = emu.list_properties(args.path) + print(json.dumps(props, indent=4)) + + +def _cmd_prop_get(emu: Emulator, args: argparse.Namespace) -> None: + """Show the emulator's object properties.""" + + print(emu.get_property(args.path, args.property)) + + +def _cmd_prop_set(emu: Emulator, args: argparse.Namespace) -> None: + """Set emulator's object properties.""" + + emu.set_property(args.path, args.property, args.value) + + +def _cmd_term(emu: Emulator, args: argparse.Namespace) -> None: + """Connect with an interactive terminal to an emulator channel""" + + try: + miniterm = _get_miniterm(emu, args.channel) + miniterm.start() + miniterm.join(True) + print('--- exit ---') + miniterm.stop() + miniterm.join() + miniterm.close() + except SerialException as err: + raise Error(f'error connecting to channel `{args.channel}`: {err}') + + +def _cmd_resume(emu: Emulator, _args: argparse.Namespace) -> None: + """Resume the execution of a paused emulator.""" + + emu.cont() + + +def get_parser() -> argparse.ArgumentParser: + """Command line parser""" + + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + '-i', + '--instance', + help='instance to use (default: %(default)s)', + type=str, + metavar='STRING', + default='default', + ) + parser.add_argument( + '-C', + '--working-dir', + help='path to working directory (default: %(default)s)', + type=Path, + default=os.getenv('PW_EMU_WDIR'), + ) + parser.add_argument( + '-c', + '--config', + help='path config file (default: %(default)s)', + type=str, + default=None, + ) + + subparsers = parser.add_subparsers(dest='command', required=True) + + def add_cmd(name: str, func: Any) -> argparse.ArgumentParser: + subparser = subparsers.add_parser( + name, description=func.__doc__, help=func.__doc__ + ) + subparser.set_defaults(func=func) + return subparser + + start = add_cmd('start', _cmd_start) + restart = add_cmd('restart', _cmd_restart) + + for subparser in [start, restart]: + subparser.add_argument( + 'target', + type=str, + ) + subparser.add_argument( + '--file', + '-f', + metavar='FILE', + help='file to load before starting', + ) + subparser.add_argument( + '--runner', + '-r', + help='emulator to use, automatically detected if not set', + choices=[None, 'qemu', 'renode'], + default=None, + ) + subparser.add_argument( + '--args', + '-a', + help='options to pass to the emulator', + ) + subparser.add_argument( + '--pause', + '-p', + action='store_true', + help='pause the emulator after starting it', + ) + subparser.add_argument( + '--debug', + '-d', + action='store_true', + help='start the emulator in debug mode', + ) + subparser.add_argument( + '--foreground', + '-F', + action='store_true', + help='start the emulator in foreground mode', + ) + + run = add_cmd('run', _cmd_run) + run.add_argument( + 'target', + type=str, + ) + run.add_argument( + 'file', + metavar='FILE', + help='file to load before starting', + ) + run.add_argument( + '--args', + '-a', + help='options to pass to the emulator', + ) + run.add_argument( + '--channel', + '-n', + help='channel to connect the terminal to', + ) + + stop = add_cmd('stop', _cmd_stop) + + load = add_cmd('load', _cmd_load) + load.add_argument( + 'executable', + metavar='FILE', + help='file to load via gdb', + ) + load.add_argument( + '--pause', + '-p', + help='pause the emulator after loading the file', + action='store_true', + ) + load.add_argument( + '--offset', + '-o', + metavar='ADDRESS', + help='address to load the file at', + ) + + reset = add_cmd('reset', _cmd_reset) + + gdb = add_cmd('gdb', _cmd_gdb) + gdb.add_argument( + '--executable', + '-e', + metavar='FILE', + help='file to use for the debugging session', + ) + + prop_ls = add_cmd('prop-ls', _cmd_prop_ls) + prop_ls.add_argument( + 'path', + help='path of the emulator object', + ) + + prop_get = add_cmd('prop-get', _cmd_prop_get) + prop_get.add_argument( + 'path', + help='path of the emulator object', + ) + prop_get.add_argument( + 'property', + help='name of the object property', + ) + + prop_set = add_cmd('prop-set', _cmd_prop_set) + prop_set.add_argument( + 'path', + help='path of the emulator object', + ) + prop_set.add_argument( + 'property', + help='name of the object property', + ) + prop_set.add_argument( + 'value', + help='value to set for the object property', + ) + + gdb_cmds = add_cmd('gdb-cmds', _cmd_gdb_cmds) + gdb_cmds.add_argument( + '--pause', + '-p', + help='do not resume execution after running the commands', + action='store_true', + ) + gdb_cmds.add_argument( + '--executable', + '-e', + metavar='FILE', + help='executable to use while running the gdb commands', + ) + gdb_cmds.add_argument( + 'gdb_cmd', + nargs='+', + help='gdb command to execute', + ) + + term = add_cmd('term', _cmd_term) + term.add_argument( + 'channel', + help='channel name', + ) + + resume = add_cmd('resume', _cmd_resume) + + parser.epilog = f"""commands usage: + {start.format_usage().strip()} + {restart.format_usage().strip()} + {stop.format_usage().strip()} + {run.format_usage().strip()} + {load.format_usage().strip()} + {reset.format_usage().strip()} + {gdb.format_usage().strip()} + {prop_ls.format_usage().strip()} + {prop_get.format_usage().strip()} + {prop_set.format_usage().strip()} + {gdb_cmds.format_usage().strip()} + {term.format_usage().strip()} + {resume.format_usage().strip()} + """ + + return parser + + +def main() -> int: + """Emulators frontend command line interface.""" + + args = get_parser().parse_args() + if not args.working_dir: + args.working_dir = ( + f'{os.getenv("PW_PROJECT_ROOT")}/.pw_emu/{args.instance}' + ) + + try: + emu = Emulator(args.working_dir, args.config) + args.func(emu, args) + except Error as err: + print(err) + return 1 + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/pw_emu/py/pw_emu/core.py b/pw_emu/py/pw_emu/core.py new file mode 100644 index 000000000..1e5c2ea9f --- /dev/null +++ b/pw_emu/py/pw_emu/core.py @@ -0,0 +1,956 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Infrastructure used by the user interface or specific emulators.""" + +import io +import json +import logging +import os +import re +import socket +import subprocess +import sys +import time + +from abc import ABC, abstractmethod +from importlib import import_module +from pathlib import Path +from typing import Optional, Dict, List, Union, Any, Type + +import psutil # type: ignore + +from pw_emu.pigweed_emulators import pigweed_emulators +from pw_env_setup.config_file import load as pw_config_load +from pw_env_setup.config_file import path as pw_config_path +from serial import Serial + + +_LAUNCHER_LOG = logging.getLogger('pw_qemu.core.launcher') + + +def _stop_process(pid: int) -> None: + """Gracefully stop a running process.""" + + try: + proc = psutil.Process(pid) + proc.terminate() + try: + proc.wait(timeout=5) + except psutil.TimeoutExpired: + proc.kill() + except psutil.NoSuchProcess: + pass + + +def _get_class(name: str) -> type: + """Returns a class from a full qualified class name + (e.g. "package.module.Class"). + + """ + try: + module_path, class_name = name.rsplit('.', 1) + module = import_module(module_path) + return getattr(module, class_name) + except (ImportError, AttributeError): + raise ImportError(name) + + +class Error(Exception): + """Generic pw_emu exception.""" + + +class ConfigError(Error): + """Exception raised for configuration errors.""" + + def __init__(self, config: Optional[Path], err: str) -> None: + msg = f'{config}: {err}\n' + try: + if config: + with open(config, 'r') as file: + msg += json.dumps(json.load(file), indent=4) + except (OSError, json.decoder.JSONDecodeError): + pass + super().__init__(msg) + + +class AlreadyRunning(Error): + """Exception raised if an emulator process is already running.""" + + def __init__(self, wdir: Path) -> None: + super().__init__(f'{wdir}: emulator already started') + + +class NotRunning(Error): + """Exception raised if an emulator process is not running.""" + + def __init__(self, wdir: Path) -> None: + super().__init__(f'{wdir}: emulator not started') + + +class InvalidEmulator(Error): + """Exception raised if an different backend is running.""" + + def __init__(self, emu: str) -> None: + super().__init__(f'invalid emulator `{emu}`') + + +class InvalidTarget(Error): + """Exception raised if the target is invalid.""" + + def __init__(self, config: Path, emu: Optional[str], target: str) -> None: + emu_str = f'for `{emu}`' if emu else '' + super().__init__(f'{config}: invalid target `{target}` {emu_str}') + + +class InvalidChannelName(Error): + """Exception raised if a channel name is invalid.""" + + def __init__(self, name: str, target: str, valid: str) -> None: + msg = f""" + `{name}` is not a valid device name for {target}` + try: {valid} + """ + super().__init__(msg) + + +class InvalidChannelType(Error): + """Exception raised if a channel type is invalid.""" + + def __init__(self, name: str) -> None: + super().__init__(f'`{name}` is not a valid channel type') + + +class WrongEmulator(Error): + """Exception raised if an different backend is running.""" + + def __init__(self, exp: str, found: str) -> None: + super().__init__(f'wrong emulator: expected `{exp}, found {found}`') + + +class RunError(Error): + """Exception raised when a command failed to run.""" + + def __init__(self, proc: str, msg: str) -> None: + super().__init__(f'error running `{proc}`: {msg}') + + +class InvalidPropertyPath(Error): + """Exception raised for an invalid property path.""" + + def __init__(self, path: str) -> None: + super().__init__(f'invalid property path `{path}`') + + +class InvalidProperty(Error): + """Exception raised for an invalid property path.""" + + def __init__(self, path: str, name: str) -> None: + super().__init__(f'invalid property `{name}` at `{path}`') + + +class HandlesError(Error): + """Exception raised while trying to load emulator handles.""" + + def __init__(self, msg: str) -> None: + super().__init__(f'error loading handles: {msg}') + + +class Handles: + """Running emulator handles.""" + + class Channel: + def __init__(self, chan_type: str): + self.type = chan_type + + class PtyChannel(Channel): + def __init__(self, path: str): + super().__init__('pty') + self.path = path + + class TcpChannel(Channel): + def __init__(self, host: str, port: int): + super().__init__('tcp') + self.host = host + self.port = port + + class Proc: + def __init__(self, pid: int): + self.pid = pid + + @staticmethod + def _ser_obj(obj) -> Any: + if isinstance(obj, dict): + data = {} + for key, val in obj.items(): + data[key] = Handles._ser_obj(val) + return data + if hasattr(obj, "__iter__") and not isinstance(obj, str): + return [Handles._ser_obj(item) for item in obj] + if hasattr(obj, "__dict__"): + return Handles._ser_obj(obj.__dict__) + return obj + + def _serialize(self): + return Handles._ser_obj(self) + + def save(self, wdir: Path) -> None: + """Saves handles to the given working directory.""" + + with open(os.path.join(wdir, 'handles.json'), 'w') as file: + json.dump(self._serialize(), file) + + @staticmethod + def load(wdir: Path): + try: + with open(os.path.join(wdir, 'handles.json'), 'r') as file: + data = json.load(file) + except (KeyError, OSError, json.decoder.JSONDecodeError): + raise NotRunning(wdir) + + handles = Handles(data['emu'], data['config']) + handles.set_target(data['target']) + gdb_cmd = data.get('gdb_cmd') + if gdb_cmd: + handles.set_gdb_cmd(gdb_cmd) + for name, chan in data['channels'].items(): + chan_type = chan['type'] + if chan_type == 'tcp': + handles.add_channel_tcp(name, chan['host'], chan['port']) + elif chan_type == 'pty': + handles.add_channel_pty(name, chan['path']) + else: + raise InvalidChannelType(chan_type) + for name, proc in data['procs'].items(): + handles.add_proc(name, proc['pid']) + return handles + + def __init__(self, emu: str, config: str) -> None: + self.emu = emu + self.config = config + self.gdb_cmd: List[str] = [] + self.target = '' + self.channels: Dict[str, Handles.Channel] = {} + self.procs: Dict[str, Handles.Proc] = {} + + def add_channel_tcp(self, name: str, host: str, port: int) -> None: + """Add a TCP channel.""" + + self.channels[name] = self.TcpChannel(host, port) + + def add_channel_pty(self, name: str, path: str) -> None: + """Add a pty channel.""" + + self.channels[name] = self.PtyChannel(path) + + def add_proc(self, name: str, pid: int) -> None: + """Add a pid.""" + + self.procs[name] = self.Proc(pid) + + def set_target(self, target: str) -> None: + """Set the target.""" + + self.target = target + + def set_gdb_cmd(self, cmd: List[str]) -> None: + """Set the gdb command.""" + + self.gdb_cmd = cmd.copy() + + +def _stop_processes(handles: Handles, wdir: Path) -> None: + """Stop all processes for a (partially) running emulator instance. + + Remove pid files as well. + """ + + for _, proc in handles.procs.items(): + _stop_process(proc.pid) + path = os.path.join(wdir, f'{proc}.pid') + if os.path.exists(path): + os.unlink(path) + + +class Config: + """Get and validate options from the configuration file.""" + + def __init__( + self, + config_path: Optional[Path] = None, + target: Optional[str] = None, + emu: Optional[str] = None, + ) -> None: + """Load the emulator configuration. + + If no configuration file path is given, the root project + configuration is used. + + This method set ups the generic configuration (e.g. gdb). + + It loads emulator target files and gathers them under the 'targets' key + for each emulator backend. The 'targets' settings in the configuration + file takes precedence over the loaded target files. + + """ + try: + if config_path: + with open(config_path, 'r') as file: + config = json.load(file)['pw']['pw_emu'] + else: + config_path = pw_config_path() + config = pw_config_load()['pw']['pw_emu'] + except KeyError: + raise ConfigError(config_path, 'missing `pw_emu` configuration') + + if not config_path: + raise ConfigError(None, 'unable to deterine config path') + + if config.get('target_files'): + tmp = {} + for path in config['target_files']: + if not os.path.isabs(path): + path = os.path.join(os.path.dirname(config_path), path) + with open(path, 'r') as file: + tmp.update(json.load(file).get('targets')) + if config.get('targets'): + tmp.update(config['targets']) + config['targets'] = tmp + + self.path = config_path + self._config = {'emulators': pigweed_emulators} + self._config.update(config) + self._emu = emu + self._target = target + + def set_target(self, target: str) -> None: + """Sets the current target. + + The current target is used by the get_target method. + + """ + + self._target = target + try: + self.get(['targets', target], optional=False, entry_type=dict) + except ConfigError: + raise InvalidTarget(self.path, self._emu, self._target) + + def get_targets(self) -> List[str]: + return list(self.get(['targets'], entry_type=dict).keys()) + + def _subst(self, string: str) -> str: + """Substitutes $pw_<subst_type>{arg} statements.""" + + match = re.search(r'\$pw_([^{]+){([^}]+)}', string) + if not match: + return string + + subst_type = match.group(1) + arg = match.group(2) + + if subst_type == 'env': + value = os.environ.get(arg) + if value is None: + msg = f'Environment variable `{arg}` not set' + raise ConfigError(self.path, msg) + return string.replace(f'$pw_{subst_type}{{{arg}}}', value) + + raise ConfigError(self.path, f'Invalid substitution type: {subst_type}') + + def _subst_list(self, items: List[Any]) -> List[Any]: + new_list = [] + for item in items: + if isinstance(item, str): + new_list.append(self._subst(item)) + else: + new_list.append(item) + return new_list + + def get( + self, + keys: List[str], + optional: bool = True, + entry_type: Optional[Type] = None, + ) -> Any: + """Get a config entry. + + keys is a list of string that identifies the config entry, e.g. + ['targets', 'test-target'] is going to look in the config dicionary for + ['targets']['test-target']. + + If the option is not found and optional is True it returns None if + entry_type is none or a new (empty) object of type entry_type. + + If the option is not found an optional is False it raises ConfigError. + + If entry_type is not None it will check the option to be of + that type. If it is not it will raise ConfigError. + + """ + + keys_str = ': '.join(keys) + entry: Optional[Dict[str, Any]] = self._config + + for key in keys: + if not isinstance(entry, dict): + if optional: + if entry_type: + return entry_type() + return None + raise ConfigError(self.path, f'{keys_str}: not found') + entry = entry.get(key) + + if entry is None: + if optional: + if entry_type: + return entry_type() + return None + raise ConfigError(self.path, f'{keys_str}: not found') + + if entry_type and not isinstance(entry, entry_type): + msg = f'{keys_str}: expected entry of type `{entry_type}`' + raise ConfigError(self.path, msg) + + if isinstance(entry, str): + entry = self._subst(entry) + elif isinstance(entry, list): + entry = self._subst_list(entry) + + return entry + + def get_target( + self, + keys: List[str], + optional: bool = True, + entry_type: Optional[Type] = None, + ) -> Any: + """Get a config option starting at ['targets'][target].""" + + if not self._target: + raise Error('target not set') + return self.get(['targets', self._target] + keys, optional, entry_type) + + def get_emu( + self, + keys: List[str], + optional: bool = True, + entry_type: Optional[Type] = None, + ) -> Any: + """Get a config option starting at [emu].""" + + if not self._emu: + raise Error('emu not set') + return self.get([self._emu] + keys, optional, entry_type) + + def get_target_emu( + self, + keys: List[str], + optional: bool = True, + entry_type: Optional[Type] = None, + ) -> Any: + """Get a config option starting at ['targets'][target][emu].""" + + if not self._emu or not self._target: + raise Error('emu or target not set') + return self.get( + ['targets', self._target, self._emu] + keys, optional, entry_type + ) + + +class Connector(ABC): + """Interface between a running emulator and the user visible APIs.""" + + def __init__(self, wdir: Path) -> None: + self._wdir = wdir + self._handles = Handles.load(wdir) + self._channels = self._handles.channels + self._target = self._handles.target + + @staticmethod + def get(wdir: Path) -> Any: + """Return a connector instace for a given emulator type.""" + handles = Handles.load(wdir) + config = Config(handles.config) + emu = handles.emu + try: + name = config.get(['emulators', emu, 'connector']) + cls = _get_class(name) + except (ConfigError, ImportError): + raise InvalidEmulator(emu) + return cls(wdir) + + def get_emu(self) -> str: + """Returns the emulator type.""" + + return self._handles.emu + + def get_gdb_cmd(self) -> List[str]: + """Returns the configured gdb command.""" + return self._handles.gdb_cmd + + def get_config_path(self) -> Path: + """Returns the configuration path.""" + + return self._handles.config + + def get_procs(self) -> Dict[str, Handles.Proc]: + """Returns the running processes indexed by the process name.""" + + return self._handles.procs + + def get_channel_type(self, name: str) -> str: + """Returns the channel type.""" + + try: + return self._channels[name].type + except KeyError: + channels = ' '.join(self._channels.keys()) + raise InvalidChannelName(name, self._target, channels) + + def get_channel_path(self, name: str) -> str: + """Returns the channel path. Raises InvalidChannelType if this + is not a pty channel. + + """ + + try: + if self._channels[name].type != 'pty': + raise InvalidChannelType(self._channels[name].type) + return self._channels[name].path + except KeyError: + raise InvalidChannelName(name, self._target, self._channels.keys()) + + def get_channel_addr(self, name: str) -> tuple: + """Returns a pair of (host, port) for the channel. Raises + InvalidChannelType if this is not a tcp channel. + + """ + + try: + if self._channels[name].type != 'tcp': + raise InvalidChannelType(self._channels[name].type) + return (self._channels[name].host, self._channels[name].port) + except KeyError: + raise InvalidChannelName(name, self._target, self._channels.keys()) + + def get_channel_stream( + self, + name: str, + timeout: Optional[float] = None, + ) -> io.RawIOBase: + """Returns a file object for a given host exposed device. + + If timeout is None than reads and writes are blocking. If + timeout is zero the stream is operating in non-blocking + mode. Otherwise read and write will timeout after the given + value. + + """ + + chan_type = self.get_channel_type(name) + if chan_type == 'tcp': + host, port = self.get_channel_addr(name) + if ':' in host: + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + else: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + sock.settimeout(timeout) + ret = sock.makefile('rwb', buffering=0) + sock.close() + return ret + if chan_type == 'pty': + ser = Serial(self.get_channel_path(name)) + ser.timeout = timeout + return ser + raise InvalidChannelType(chan_type) + + def get_channels(self) -> List[str]: + return self._handles.channels.keys() + + def stop(self) -> None: + """Stop the emulator.""" + + _stop_processes(self._handles, self._wdir) + + try: + os.unlink(os.path.join(self._wdir, 'handles.json')) + except OSError: + pass + + def proc_running(self, proc: str) -> bool: + try: + return psutil.pid_exists(self._handles.procs[proc].pid) + except (NotRunning, KeyError): + return False + + def running(self) -> bool: + """Check if the main emulator process is already running.""" + + try: + return psutil.pid_exists(self._handles.procs[self._handles.emu].pid) + except (NotRunning, KeyError): + return False + + @abstractmethod + def reset(self) -> None: + """Perform a software reset.""" + + @abstractmethod + def cont(self) -> None: + """Resume the emulator's execution.""" + + @abstractmethod + def list_properties(self, path: str) -> List[Any]: + """Returns the property list for an emulator object.""" + + @abstractmethod + def set_property(self, path: str, prop: str, value: str) -> None: + """Sets the value of an emulator's object property.""" + + @abstractmethod + def get_property(self, path: str, prop: str) -> Any: + """Returns the value of an emulator's object property.""" + + +class Launcher(ABC): + """Starts an emulator based on the target and configuration file.""" + + def __init__( + self, + emu: str, + config_path: Optional[Path] = None, + ) -> None: + """Initializes a Launcher instance.""" + + self._wdir: Optional[Path] = None + """Working directory""" + + self._emu = emu + """Emulator type (e.g. "qemu", "renode").""" + + self._target: Optional[str] = None + """Target, initialized to None and set with _prep_start.""" + + self._config = Config(config_path, emu=emu) + """Global, emulator and target configuration.""" + + self._handles = Handles(self._emu, str(self._config.path)) + """Handles for processes, channels, etc.""" + + gdb_cmd = self._config.get(['gdb'], entry_type=list) + if gdb_cmd: + self._handles.set_gdb_cmd(gdb_cmd) + + @staticmethod + def get(emu: str, config_path: Optional[Path] = None) -> Any: + """Returns a launcher for a given emulator type.""" + config = Config(config_path) + try: + name = config.get(['emulators', emu, 'launcher']) + cls = _get_class(name) + except (ConfigError, ImportError): + raise InvalidEmulator(str(emu)) + return cls(config_path) + + @abstractmethod + def _pre_start( + self, + target: str, + file: Optional[Path] = None, + pause: bool = False, + debug: bool = False, + args: Optional[str] = None, + ) -> List[str]: + """Pre start work, returns command to start the emulator. + + The target and emulator configuration can be accessed through + :py:attr:`pw_emu.core.Launcher._config` with + :py:meth:`pw_emu.core.Config.get`, + :py:meth:`pw_emu.core.Config.get_target`, + :py:meth:`pw_emu.core.Config.get_emu`, + :py:meth:`pw_emu.core.Config.get_target_emu`. + """ + + @abstractmethod + def _post_start(self) -> None: + """Post start work, finalize emulator handles. + + Perform any post start emulator initialization and finalize the emulator + handles information. + + Typically an internal monitor channel is used to inquire information + about the the configured channels (e.g. TCP ports, pty paths) and + :py:attr:`pw_emu.core.Launcher._handles` is updated via + :py:meth:`pw_emu.core.Handles.add_channel_tcp`, + :py:meth:`pw_emu.core.Handles.add_channel_pty`, etc. + + """ + + @abstractmethod + def _get_connector(self, wdir: Path) -> Connector: + """Get a connector for this emulator type.""" + + def _path(self, name: Union[Path, str]) -> Path: + """Returns the full path for a given emulator file.""" + if self._wdir is None: + raise Error('internal error') + return Path(os.path.join(self._wdir, name)) + + def _subst_channel(self, subst_type: str, arg: str, string: str) -> str: + """Substitutes $pw_emu_channel_{func}{arg} statements.""" + + try: + chan = self._handles.channels[arg] + except KeyError: + return string + + if subst_type == 'channel_port': + if not isinstance(chan, Handles.TcpChannel): + return string + return str(chan.port) + + if subst_type == 'channel_host': + if not isinstance(chan, Handles.TcpChannel): + return string + return chan.host + + if subst_type == 'channel_path': + if not isinstance(chan, Handles.PtyChannel): + return string + return chan.path + + return string + + def _subst(self, string: str) -> str: + """Substitutes $pw_emu_<subst_type>{arg} statements.""" + + match = re.search(r'\$pw_emu_([^{]+){([^}]+)}', string) + if not match: + return string + + subst_type = match.group(1) + arg = match.group(2) + + if subst_type == 'wdir': + if self._wdir: + return os.path.join(self._wdir, arg) + return string + + if 'channel_' in subst_type: + return self._subst_channel(subst_type, arg, string) + + return string + + # pylint: disable=protected-access + # use os._exit after fork instead of os.exit + def _daemonize( + self, + name: str, + cmd: List[str], + ) -> None: + """Daemonize process for UNIX hosts.""" + + if sys.platform == 'win32': + raise Error('_daemonize not supported on win32') + + # pylint: disable=no-member + # avoid pylint false positive on win32 + pid = os.fork() + if pid < 0: + raise RunError(name, f'fork failed: {pid}') + if pid > 0: + return + + path: Path = Path('/dev/null') + fd = os.open(path, os.O_RDONLY) + os.dup2(fd, sys.stdin.fileno()) + os.close(fd) + + path = self._path(f'{name}.log') + fd = os.open(path, os.O_WRONLY | os.O_TRUNC | os.O_CREAT) + os.dup2(fd, sys.stdout.fileno()) + os.dup2(fd, sys.stderr.fileno()) + os.close(fd) + + os.setsid() + + if os.fork() > 0: + os._exit(0) + + try: + # Make the pid file create and pid write operations atomic to avoid + # races with readers. + with open(self._path(f'{name}.pid.tmp'), 'w') as file: + file.write(f'{os.getpid()}') + os.rename(self._path(f'{name}.pid.tmp'), self._path(f'{name}.pid')) + os.execvp(cmd[0], cmd) + finally: + os._exit(1) + + def _start_proc( + self, + name: str, + cmd: List[str], + foreground: bool = False, + ) -> Union[subprocess.Popen, None]: + """Run the main emulator process. + + The process pid is stored and can later be accessed by its name to + terminate it when the emulator is stopped. + + If foreground is True the process run in the foreground and a + subprocess.Popen object is returned. Otherwise the process is started in + the background and None is returned. + + When running in the background stdin is redirected to the NULL device + and stdout and stderr are redirected to a file named <name>.log which is + stored in the emulator's instance working directory. + + """ + for idx, item in enumerate(cmd): + cmd[idx] = self._subst(item) + + pid_file_path = self._path(f'{name}.pid') + if os.path.exists(pid_file_path): + os.unlink(pid_file_path) + + if foreground: + proc = subprocess.Popen(cmd) + self._handles.add_proc(name, proc.pid) + with open(pid_file_path, 'w') as file: + file.write(f'{proc.pid}') + return proc + + if sys.platform == 'win32': + file = open(self._path(f'{name}.log'), 'w') + proc = subprocess.Popen( + cmd, + stdin=subprocess.DEVNULL, + stdout=file, + stderr=file, + creationflags=subprocess.DETACHED_PROCESS, + ) + file.close() + with open(pid_file_path, 'w') as file: + file.write(f'{proc.pid}') + self._handles.add_proc(name, proc.pid) + # avoids resource warnings due to not calling wait which + # we don't want to do since we've started the process in + # the background + proc.returncode = 0 + else: + self._daemonize(name, cmd) + + # wait for the pid file to avoid double start race conditions + timeout = time.monotonic() + 30 + while not os.path.exists(self._path(f'{name}.pid')): + time.sleep(0.1) + if time.monotonic() > timeout: + break + if not os.path.exists(self._path(f'{name}.pid')): + raise RunError(name, 'pid file timeout') + try: + with open(pid_file_path, 'r') as file: + pid = int(file.readline()) + self._handles.add_proc(name, pid) + except (OSError, ValueError) as err: + raise RunError(name, str(err)) + + return None + + def _stop_procs(self): + """Stop all registered processes.""" + + for name, proc in self._handles.procs.items(): + _stop_process(proc.pid) + if os.path.exists(self._path(f'{name}.pid')): + os.unlink(self._path(f'{name}.pid')) + + def _start_procs(self, procs_list: str) -> None: + """Start additional processes besides the main emulator one.""" + + procs = self._config.get_target([procs_list], entry_type=dict) + for name, cmd in procs.items(): + self._start_proc(name, cmd) + + def start( + self, + wdir: Path, + target: str, + file: Optional[Path] = None, + pause: bool = False, + debug: bool = False, + foreground: bool = False, + args: Optional[str] = None, + ) -> Connector: + """Start the emulator for the given target. + + If file is set that the emulator will load the file before starting. + + If pause is True the emulator is paused. + + If debug is True the emulator is run in foreground with debug output + enabled. This is useful for seeing errors, traces, etc. + + If foreground is True the emulator is run in foreground otherwise it is + started in daemon mode. This is useful when there is another process + controlling the emulator's life cycle (e.g. cuttlefish) + + args are passed directly to the emulator + + """ + + try: + handles = Handles.load(wdir) + if psutil.pid_exists(handles.procs[handles.emu].pid): + raise AlreadyRunning(wdir) + except NotRunning: + pass + + self._wdir = wdir + self._target = target + self._config.set_target(target) + self._handles.set_target(target) + gdb_cmd = self._config.get_target(['gdb'], entry_type=list) + if gdb_cmd: + self._handles.set_gdb_cmd(gdb_cmd) + os.makedirs(wdir, mode=0o700, exist_ok=True) + + cmd = self._pre_start( + target=target, file=file, pause=pause, debug=debug, args=args + ) + + if debug: + foreground = True + _LAUNCHER_LOG.setLevel(logging.DEBUG) + + _LAUNCHER_LOG.debug('starting emulator with command: %s', ' '.join(cmd)) + + try: + self._start_procs('pre-start-cmds') + proc = self._start_proc(self._emu, cmd, foreground) + self._start_procs('post-start-cmds') + except RunError as err: + self._stop_procs() + raise err + + self._post_start() + self._handles.save(wdir) + + if proc: + proc.wait() + self._stop_procs() + + return self._get_connector(self._wdir) diff --git a/pw_emu/py/pw_emu/frontend.py b/pw_emu/py/pw_emu/frontend.py new file mode 100644 index 000000000..8ecf7d3fe --- /dev/null +++ b/pw_emu/py/pw_emu/frontend.py @@ -0,0 +1,328 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""User API""" + +import io +import os +import subprocess +import tempfile + +from pathlib import Path +from typing import Any, Optional, List, Union, Dict + +from pw_emu.core import ( + AlreadyRunning, + Config, + ConfigError, + Connector, + Launcher, + InvalidEmulator, + InvalidChannelType, + NotRunning, +) + + +class Emulator: + """Launches, controls and interacts with an emulator instance.""" + + def __init__(self, wdir: Path, config_path: Optional[Path] = None) -> None: + self._wdir = wdir + self._config_path = config_path + self._connector: Optional[Connector] = None + self._launcher: Optional[Launcher] = None + + def _get_launcher(self, target: str) -> Launcher: + """Returns an emulator for a given target. + + If there are multiple emulators for the same target it will return + an arbitrary emulator launcher. + """ + config = Config(self._config_path) + target_config = config.get( + ['targets', target], + optional=False, + entry_type=dict, + ) + for key in target_config.keys(): + try: + return Launcher.get(key, self._config_path) + except InvalidEmulator: + pass + raise ConfigError( + self._config_path, + f'could not determine emulator for target `{target}`', + ) + + def start( + self, + target: str, + file: Optional[Path] = None, + pause: bool = False, + debug: bool = False, + foreground: bool = False, + args: Optional[str] = None, + ) -> None: + """Start the emulator for the given target. + + If file is set the emulator will load the file before starting. + + If pause is True the emulator is paused until the debugger is + connected. + + If debug is True the emulator is run in foreground with debug + output enabled. This is useful for seeing errors, traces, etc. + + If foreground is True the emulator is run in foreground otherwise + it is started in daemon mode. This is useful when there is + another process controlling the emulator's life cycle + (e.g. cuttlefish) + + args are passed directly to the emulator + + """ + if self._connector: + raise AlreadyRunning(self._wdir) + + if self._launcher is None: + self._launcher = self._get_launcher(target) + self._connector = self._launcher.start( + wdir=self._wdir, + target=target, + file=file, + pause=pause, + debug=debug, + foreground=foreground, + args=args, + ) + + def _c(self) -> Connector: + if self._connector is None: + self._connector = Connector.get(self._wdir) + if not self.running(): + raise NotRunning(self._wdir) + return self._connector + + def running(self) -> bool: + """Check if the main emulator process is already running.""" + + try: + return self._c().running() + except NotRunning: + return False + + def _path(self, name: Union[Path, str]) -> Union[Path, str]: + """Returns the full path for a given emulator file.""" + + return os.path.join(self._wdir, name) + + def stop(self): + """Stop the emulator.""" + + return self._c().stop() + + def get_gdb_remote(self) -> str: + """Return a string that can be passed to the target remote gdb + command. + + """ + + chan_type = self._c().get_channel_type('gdb') + + if chan_type == 'tcp': + host, port = self._c().get_channel_addr('gdb') + return f'{host}:{port}' + + if chan_type == 'pty': + return self._c().get_channel_path('gdb') + + raise InvalidChannelType(chan_type) + + def get_gdb_cmd(self) -> List[str]: + """Returns the gdb command for current target.""" + return self._c().get_gdb_cmd() + + def run_gdb_cmds( + self, + commands: List[str], + executable: Optional[str] = None, + pause: bool = False, + ) -> subprocess.CompletedProcess: + """Connect to the target and run the given commands silently + in batch mode. + + The executable is optional but it may be required by some gdb + commands. + + If pause is set do not continue execution after running the + given commands. + + """ + + cmd = self._c().get_gdb_cmd().copy() + if not cmd: + raise ConfigError(self._c().get_config_path(), 'gdb not configured') + + cmd.append('-batch-silent') + cmd.append('-ex') + cmd.append(f'target remote {self.get_gdb_remote()}') + for gdb_cmd in commands: + cmd.append('-ex') + cmd.append(gdb_cmd) + if pause: + cmd.append('-ex') + cmd.append('disconnect') + if executable: + cmd.append(executable) + return subprocess.run(cmd, capture_output=True) + + def reset(self) -> None: + """Perform a software reset.""" + self._c().reset() + + def list_properties(self, path: str) -> List[Dict]: + """Returns the property list for an emulator object. + + The object is identified by a full path. The path is target + specific and the format of the path is backend specific. + + qemu path example: /machine/unattached/device[10] + + renode path example: sysbus.uart + + """ + return self._c().list_properties(path) + + def set_property(self, path: str, prop: str, value: Any) -> None: + """Sets the value of an emulator's object property.""" + + self._c().set_property(path, prop, value) + + def get_property(self, path: str, prop: str) -> Any: + """Returns the value of an emulator's object property.""" + + return self._c().get_property(path, prop) + + def get_channel_type(self, name: str) -> str: + """Returns the channel type + + Currently `pty` or `tcp` are the only supported types. + + """ + + return self._c().get_channel_type(name) + + def get_channel_path(self, name: str) -> str: + """Returns the channel path. Raises InvalidChannelType if this + is not a pty channel. + + """ + + return self._c().get_channel_path(name) + + def get_channel_addr(self, name: str) -> tuple: + """Returns a pair of (host, port) for the channel. Raises + InvalidChannelType if this is not a tcp channel. + + """ + + return self._c().get_channel_addr(name) + + def get_channel_stream( + self, + name: str, + timeout: Optional[float] = None, + ) -> io.RawIOBase: + """Returns a file object for a given host exposed device. + + If timeout is None than reads and writes are blocking. If + timeout is zero the stream is operating in non-blocking + mode. Otherwise read and write will timeout after the given + value. + + """ + + return self._c().get_channel_stream(name, timeout) + + def get_channels(self) -> List[str]: + """Returns the list of available channels.""" + + return self._c().get_channels() + + def set_emu(self, emu: str) -> None: + """Set the emulator type for this instance.""" + + self._launcher = Launcher.get(emu, self._config_path) + + def cont(self) -> None: + """Resume the emulator's execution.""" + + self._c().cont() + + +class TemporaryEmulator(Emulator): + """Temporary emulator instances. + + Manages emulator instances that run in temporary working + directories. The emulator instance is stopped and the working + directory is cleared when the with block completes. + + It also supports interoperability with the pw emu cli, i.e. + starting the emulator with the CLI and controlling / interacting + with it from the API. + + Usage example: + + .. code-block:: python + + # programatically start and load an executable then access it + with TemporaryEmulator() as emu: + emu.start(target, file) + with emu.get_channel_stream(chan) as stream: + ... + + .. code-block:: python + + # or start it form the command line then access it + with TemporaryEmulator() as emu: + build.bazel( + ctx, + "run", + exec_path, + "--run_under=pw emu start <target> --file " + ) + with emu.get_channel_stream(chan) as stream: + ... + + """ + + def __init__( + self, + config_path: Optional[Path] = None, + cleanup: bool = True, + ) -> None: + self._temp = tempfile.TemporaryDirectory() + self._cleanup = cleanup + super().__init__(Path(self._temp.name), config_path) + + def __enter__(self): + # Interoperability with pw emu cli. + os.environ["PW_EMU_WDIR"] = self._wdir + return self + + def __exit__(self, exc, value, traceback) -> None: + self.stop() + del os.environ["PW_EMU_WDIR"] + if self._cleanup: + self._temp.cleanup() diff --git a/pw_emu/py/pw_emu/pigweed_emulators.py b/pw_emu/py/pw_emu/pigweed_emulators.py new file mode 100644 index 000000000..764ed2ace --- /dev/null +++ b/pw_emu/py/pw_emu/pigweed_emulators.py @@ -0,0 +1,27 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Pigweed built-in emulators frontends.""" + +from typing import Dict + +pigweed_emulators: Dict[str, Dict[str, str]] = { + 'qemu': { + 'connector': 'pw_emu.qemu.QemuConnector', + 'launcher': 'pw_emu.qemu.QemuLauncher', + }, + 'renode': { + 'connector': 'pw_emu.renode.RenodeConnector', + 'launcher': 'pw_emu.renode.RenodeLauncher', + }, +} diff --git a/pw_emu/py/pw_emu/py.typed b/pw_emu/py/pw_emu/py.typed new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/pw_emu/py/pw_emu/py.typed diff --git a/pw_emu/py/pw_emu/qemu.py b/pw_emu/py/pw_emu/qemu.py new file mode 100644 index 000000000..50112cda3 --- /dev/null +++ b/pw_emu/py/pw_emu/qemu.py @@ -0,0 +1,343 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Pigweed qemu frontend.""" + + +import io +import json +import logging +import os +import re +import socket +import sys + +from pathlib import Path +from typing import Optional, Dict, List, Any + +from pw_emu.core import ( + ConfigError, + Connector, + Launcher, + Error, + InvalidChannelType, + WrongEmulator, +) + +_QMP_LOG = logging.getLogger('pw_qemu.qemu.qmp') + + +class QmpError(Error): + """Exception for QMP errors.""" + + def __init__(self, err: str): + super().__init__(err) + + +class QmpClient: + """Send qmp requests the server.""" + + def __init__(self, stream: io.RawIOBase): + self._stream = stream + + json.loads(self._stream.readline()) + cmd = json.dumps({'execute': 'qmp_capabilities'}) + self._stream.write(cmd.encode('utf-8')) + resp = json.loads(self._stream.readline().decode('ascii')) + if not 'return' in resp: + raise QmpError(f'qmp init failed: {resp.get("error")}') + + def request(self, cmd: str, args: Optional[Dict[str, Any]] = None) -> Any: + """Issue a command using the qmp interface. + + Returns a map with the response or None if there is no + response for this command. + + """ + + req: Dict[str, Any] = {'execute': cmd} + if args: + req['arguments'] = args + _QMP_LOG.debug(' -> {json.dumps(cmd)}') + self._stream.write(json.dumps(req).encode('utf-8')) + while True: + line = self._stream.readline() + _QMP_LOG.debug(' <- {line}') + resp = json.loads(line) + if 'error' in resp.keys(): + raise QmpError(resp['error']['desc']) + if 'return' in resp.keys(): + return resp['return'] + + +class QemuLauncher(Launcher): + """Start a new qemu process for a given target and config file.""" + + def __init__(self, config_path: Optional[Path] = None): + super().__init__('qemu', config_path) + self._start_cmd: List[str] = [] + self._chardevs_id_to_name = { + 'compat_monitor0': 'qmp', + 'compat_monitor1': 'monitor', + 'gdb': 'gdb', + } + self._chardevs: Dict[str, Any] = {} + self._qmp_init_sock: Optional[socket.socket] = None + + def _set_qemu_channel_tcp(self, name: str, filename: str) -> None: + """Parse a TCP chardev and return (host, port) tuple. + + Format for the tcp chardev backend: + + [disconnected|isconnected:]tcp:<host>:<port>[,<options>][ <-> + <host>:<port>] + + """ + + host_port: Any = filename.split(',')[0] + if host_port.split(':')[0] != 'tcp': + host_port = host_port.split(':')[2:] + else: + host_port = host_port.split(':')[1:] + # IPV6 hosts have : + host = ':'.join(host_port[0:-1]) + port = host_port[-1] + self._handles.add_channel_tcp(name, host, int(port)) + + def _set_qemu_channel_pty(self, name: str, filename: str) -> None: + """Parse a PTY chardev and return the path. + + Format for the pty chardev backend: pty:<path> + """ + + path = filename.split(':')[1] + + self._handles.add_channel_pty(name, path) + + if os.path.lexists(self._path(name)): + os.unlink(self._path(name)) + os.symlink(path, self._path(name)) + + def _set_qemu_channel(self, name: str, filename: str) -> None: + """Setups a chardev channel type.""" + + if filename.startswith('pty'): + self._set_qemu_channel_pty(name, filename) + elif 'tcp' in filename: + self._set_qemu_channel_tcp(name, filename) + + def _get_channels_config(self, chan: str, opt: str) -> Any: + val = self._config.get_emu(['channels', chan, opt]) + if val is not None: + return val + return self._config.get_emu(['channels', opt]) + + def _configure_default_channels(self) -> None: + """Configure the default channels.""" + + # keep qmp first so that it gets the compat_monitor0 label + for chan in ['qmp', 'monitor', 'gdb']: + chan_type = self._get_channels_config(chan, 'type') + if not chan_type: + chan_type = 'tcp' + if chan_type == 'pty': + if sys.platform == 'win32': + raise InvalidChannelType(chan_type) + backend = 'pty' + elif chan_type == 'tcp': + backend = 'tcp:localhost:0,ipv4=on,server=on,wait=off' + else: + raise InvalidChannelType(chan_type) + self._start_cmd.extend([f'-{chan}', backend]) + + def _get_chardev_config(self, name: str, opt: str) -> Any: + val = self._config.get_target_emu(['channels', 'chardevs', name, opt]) + if not val: + val = self._get_channels_config(name, opt) + return val + + def _configure_serial_channels(self, serials: Dict) -> None: + """Create "standard" serial devices. + + We can't control the serial allocation number for "standard" + -serial devices so fill the slots for the not needed serials + with null chardevs e.g. for serial3, serial1 generate the + following arguments, in this order: + + -serial null -serial {backend} -serial null - serial {backend} + + """ + + min_ser = sys.maxsize + max_ser = -1 + for serial in serials.keys(): + num = int(serial.split('serial')[1]) + if num < min_ser: + min_ser = num + if num > max_ser: + max_ser = num + for i in range(min_ser, max_ser + 1): + if serials.get(f'serial{i}'): + name = serials[f'serial{i}'] + chan_type = self._get_chardev_config(name, 'type') + if not chan_type: + chan_type = 'tcp' + if chan_type == 'pty': + backend = 'pty' + elif chan_type == 'tcp': + backend = 'tcp:localhost:0,ipv4=on,server=on,wait=off' + else: + raise InvalidChannelType(chan_type) + self._start_cmd.extend(['-serial', backend]) + else: + self._start_cmd.extend(['-serial', 'null']) + + def _configure_chardev_channels(self) -> None: + """Configure chardevs.""" + + self._chardevs = self._config.get_target_emu( + ['channels', 'chardevs'], True, dict + ) + + serials = {} + for name, config in self._chardevs.items(): + chardev_id = config['id'] + self._chardevs_id_to_name[chardev_id] = name + + chardev_type = self._get_chardev_config(name, 'type') + if chardev_type is None: + chardev_type = 'tcp' + + if chardev_type == 'pty': + backend = 'pty' + elif chardev_type == 'tcp': + backend = 'socket,host=localhost,port=0,server=on,wait=off' + else: + raise InvalidChannelType(chardev_type) + + # serials are configured differently + if re.search(r'serial[0-9]*', chardev_id): + serials[chardev_id] = name + else: + self._start_cmd.extend( + ['-chardev', f'{backend},id={chardev_id}'] + ) + + self._configure_serial_channels(serials) + + def _pre_start( + self, + target: str, + file: Optional[Path] = None, + pause: bool = False, + debug: bool = False, + args: Optional[str] = None, + ) -> List[str]: + qemu = self._config.get_target_emu(['executable']) + if not qemu: + qemu = self._config.get_emu(['executable'], optional=False) + machine = self._config.get_target_emu(['machine'], optional=False) + + self._start_cmd = [f'{qemu}', '-nographic', '-nodefaults'] + self._start_cmd.extend(['-display', 'none']) + self._start_cmd.extend(['-machine', f'{machine}']) + + try: + self._configure_default_channels() + self._configure_chardev_channels() + except KeyError as err: + raise ConfigError(self._config.path, str(err)) + + if pause: + self._start_cmd.append('-S') + if debug: + self._start_cmd.extend(['-d', 'guest_errors']) + + if file: + self._start_cmd.extend(['-kernel', str(file)]) + + self._start_cmd.extend(self._config.get_emu(['args'], entry_type=list)) + self._start_cmd.extend( + self._config.get_target_emu(['args'], entry_type=list) + ) + if args: + self._start_cmd.extend(args.split(' ')) + + # initial/bootstrap qmp connection + self._qmp_init_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._qmp_init_sock.bind(('localhost', 0)) + port = self._qmp_init_sock.getsockname()[1] + self._qmp_init_sock.listen() + self._qmp_init_sock.settimeout(30) + self._start_cmd.extend(['-qmp', f'tcp:localhost:{port}']) + + return self._start_cmd + + def _post_start(self) -> None: + assert self._qmp_init_sock is not None + conn, _ = self._qmp_init_sock.accept() + self._qmp_init_sock.close() + qmp = QmpClient(conn.makefile('rwb', buffering=0)) + conn.close() + + resp = qmp.request('query-chardev') + for chardev in resp: + label = chardev['label'] + name = self._chardevs_id_to_name.get(label) + if name: + self._set_qemu_channel(name, chardev['filename']) + + def _get_connector(self, wdir: Path) -> Connector: + return QemuConnector(wdir) + + +class QemuConnector(Connector): + """qemu implementation for the emulator specific connector methods.""" + + def __init__(self, wdir: Path) -> None: + super().__init__(wdir) + if self.get_emu() != 'qemu': + raise WrongEmulator('qemu', self.get_emu()) + self._qmp: Optional[QmpClient] = None + + def _q(self) -> QmpClient: + if not self._qmp: + self._qmp = QmpClient(self.get_channel_stream('qmp')) + return self._qmp + + def reset(self) -> None: + self._q().request('system_reset') + + def cont(self) -> None: + self._q().request('cont') + + def set_property(self, path: str, prop: str, value: Any) -> None: + args = { + 'path': '{}'.format(path), + 'property': prop, + 'value': value, + } + self._q().request('qom-set', args) + + def get_property(self, path: str, prop: str) -> Any: + args = { + 'path': '{}'.format(path), + 'property': prop, + } + return self._q().request('qom-get', args) + + def list_properties(self, path: str) -> List[Any]: + args = { + 'path': '{}'.format(path), + } + return self._q().request('qom-list', args) diff --git a/pw_emu/py/pw_emu/renode.py b/pw_emu/py/pw_emu/renode.py new file mode 100644 index 000000000..1e3ccb548 --- /dev/null +++ b/pw_emu/py/pw_emu/renode.py @@ -0,0 +1,209 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Pigweed renode frontend.""" + +import socket +import time +import xmlrpc.client + +from pathlib import Path +from typing import Optional, List, Any + +from pw_emu.core import ( + Connector, + Handles, + InvalidChannelType, + Launcher, + Error, + WrongEmulator, +) + + +class RenodeRobotError(Error): + """Exception for Renode robot errors.""" + + def __init__(self, err: str): + super().__init__(err) + + +class RenodeLauncher(Launcher): + """Start a new renode process for a given target and config file.""" + + def __init__(self, config_path: Optional[Path] = None): + super().__init__('renode', config_path) + self._start_cmd: List[str] = [] + + @staticmethod + def _allocate_port() -> int: + """Allocate renode ports. + + This is inherently racy but renode currently does not have proper + support for dynamic ports. It accecept 0 as a port and the OS allocates + a dynamic port but there is no API to retrive the port. + + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('localhost', 0)) + port = sock.getsockname()[1] + sock.close() + + return port + + def _pre_start( + self, + target: str, + file: Optional[Path] = None, + pause: bool = False, + debug: bool = False, + args: Optional[str] = None, + ) -> List[str]: + renode = self._config.get_target_emu(['executable']) + if not renode: + renode = self._config.get_emu(['executable'], optional=False) + + self._start_cmd.extend([f'{renode}', '--disable-xwt']) + port = self._allocate_port() + self._start_cmd.extend(['--robot-server-port', str(port)]) + self._handles.add_channel_tcp('robot', 'localhost', port) + + machine = self._config.get_target_emu(['machine'], optional=False) + self._start_cmd.extend(['--execute', f'mach add "{target}"']) + + self._start_cmd.extend( + ['--execute', f'machine LoadPlatformDescription @{machine}'] + ) + + terms = self._config.get_target_emu( + ['channels', 'terminals'], entry_type=dict + ) + for name in terms.keys(): + port = self._allocate_port() + dev_path = self._config.get_target_emu( + ['channels', 'terminals', name, 'device-path'], + optional=False, + entry_type=str, + ) + term_type = self._config.get_target_emu( + ['channels', 'terminals', name, 'type'], + entry_type=str, + ) + if not term_type: + term_type = self._config.get_emu( + ['channels', 'terminals', 'type'], + entry_type=str, + ) + if not term_type: + term_type = 'tcp' + + cmd = 'emulation ' + if term_type == 'tcp': + cmd += f'CreateServerSocketTerminal {port} "{name}" false' + self._handles.add_channel_tcp(name, 'localhost', port) + elif term_type == 'pty': + path = self._path(name) + cmd += f'CreateUartPtyTerminal "{name}" "{path}"' + self._handles.add_channel_pty(name, str(path)) + else: + raise InvalidChannelType(term_type) + + self._start_cmd.extend(['--execute', cmd]) + self._start_cmd.extend( + ['--execute', f'connector Connect {dev_path} {name}'] + ) + + port = self._allocate_port() + self._start_cmd.extend(['--execute', f'machine StartGdbServer {port}']) + self._handles.add_channel_tcp('gdb', 'localhost', port) + + if file: + self._start_cmd.extend(['--execute', f'sysbus LoadELF @{file}']) + + if not pause: + self._start_cmd.extend(['--execute', 'start']) + + return self._start_cmd + + def _post_start(self) -> None: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + robot = self._handles.channels['gdb'] + assert isinstance(robot, Handles.TcpChannel) + + # renode is slow to start especially during host load + deadline = time.monotonic() + 120 + connected = False + err = None + while time.monotonic() < deadline: + try: + sock.connect((robot.host, robot.port)) + connected = True + break + except OSError as exc: + err = exc + time.sleep(1) + + if not connected: + msg = 'failed to connect to robot channel' + msg += f'({robot.host}:{robot.port}): {err}' + raise RenodeRobotError(msg) + + sock.close() + + def _get_connector(self, wdir: Path) -> Connector: + return RenodeConnector(wdir) + + +class RenodeConnector(Connector): + """renode implementation for the emulator specific connector methods.""" + + def __init__(self, wdir: Path) -> None: + super().__init__(wdir) + if self.get_emu() != 'renode': + raise WrongEmulator('renode', self.get_emu()) + robot = self._handles.channels['robot'] + host = robot.host + port = robot.port + self._proxy = xmlrpc.client.ServerProxy(f'http://{host}:{port}/') + + def _request(self, cmd: str, args: List[str]) -> Any: + """Send a request using the robot interface. + + Using the robot interface is not ideal since it is designed + for testing. However, it is more robust than the ANSI colored, + echoed, log mixed, telnet interface. + + """ + + resp = self._proxy.run_keyword(cmd, args) + if not isinstance(resp, dict): + raise RenodeRobotError('expected dictionary in response') + if resp['status'] != 'PASS': + raise RenodeRobotError(resp['error']) + if resp.get('return'): + return resp['return'] + return None + + def reset(self) -> None: + self._request('ResetEmulation', []) + + def cont(self) -> None: + self._request('StartEmulation', []) + + def list_properties(self, path: str) -> List[Any]: + return self._request('ExecuteCommand', [f'{path}']) + + def get_property(self, path: str, prop: str) -> Any: + return self._request('ExecuteCommand', [f'{path} {prop}']) + + def set_property(self, path: str, prop: str, value: Any) -> None: + return self._request('ExecuteCommand', [f'{path} {prop} {value}']) diff --git a/pw_emu/py/pyproject.toml b/pw_emu/py/pyproject.toml new file mode 100644 index 000000000..78668a709 --- /dev/null +++ b/pw_emu/py/pyproject.toml @@ -0,0 +1,16 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +[build-system] +requires = ['setuptools', 'wheel'] +build-backend = 'setuptools.build_meta' diff --git a/pw_emu/py/setup.cfg b/pw_emu/py/setup.cfg new file mode 100644 index 000000000..5dd480aca --- /dev/null +++ b/pw_emu/py/setup.cfg @@ -0,0 +1,28 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +[metadata] +name = pw_emu +version = 0.0.1 +author = Pigweed Authors +author_email = pigweed-developers@googlegroups.com +description = Pigweed Emulators Frontend + +[options] +packages = find: +zip_safe = False +install_requires = + python-daemon + +[options.package_data] +pw_emu = py.typed diff --git a/pw_emu/py/setup.py b/pw_emu/py/setup.py new file mode 100644 index 000000000..7b1cffbda --- /dev/null +++ b/pw_emu/py/setup.py @@ -0,0 +1,18 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""pw_emu""" + +import setuptools # type: ignore + +setuptools.setup() # Package definition in setup.cfg diff --git a/pw_emu/py/tests/__init__.py b/pw_emu/py/tests/__init__.py new file mode 100644 index 000000000..0f2cb9104 --- /dev/null +++ b/pw_emu/py/tests/__init__.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. diff --git a/pw_emu/py/tests/cli_test.py b/pw_emu/py/tests/cli_test.py new file mode 100644 index 000000000..86bde6432 --- /dev/null +++ b/pw_emu/py/tests/cli_test.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Tests for the command line interface""" + +import os +import signal +import subprocess +import sys +import time +import unittest + +from pathlib import Path +from typing import List + +from mock_emu_frontend import _mock_emu +from tests.common import ConfigHelper + + +# TODO: b/301382004 - The Python Pigweed package install (into python-venv) +# races with running this test and there is no way to add that package as a test +# depedency without creating circular depedencies. This means we can't rely on +# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper. +# +# Run the CLI directly instead of going through pw cli. +_cli_path = Path( + os.path.join(os.environ['PW_ROOT'], 'pw_emu', 'py', 'pw_emu', '__main__.py') +).resolve() + + +class TestCli(ConfigHelper): + """Test non-interactive commands""" + + _config = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'mock-emu': { + 'tcp_channel': True, + 'gdb_channel': True, + }, + 'gdb': _mock_emu + ['--exit', '--'], + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def _build_cmd(self, args: List[str]) -> List[str]: + cmd = [ + 'python', + str(_cli_path), + '--working-dir', + self._wdir.name, + '--config', + self._config_file, + ] + args + return cmd + + def _run(self, args: List[str], **kwargs) -> subprocess.CompletedProcess: + """Run the CLI and wait for completion""" + return subprocess.run(self._build_cmd(args), **kwargs) + + def _popen(self, args: List[str], **kwargs) -> subprocess.Popen: + """Run the CLI in the background""" + return subprocess.Popen(self._build_cmd(args), **kwargs) + + +class TestNonInteractive(TestCli): + """Test non interactive commands.""" + + def setUp(self) -> None: + super().setUp() + self.assertEqual(self._run(['start', 'test-target']).returncode, 0) + + def tearDown(self) -> None: + self.assertEqual(self._run(['stop']).returncode, 0) + super().tearDown() + + def test_already_running(self) -> None: + self.assertNotEqual(self._run(['start', 'test-target']).returncode, 0) + + def test_gdb_cmds(self) -> None: + status = self._run( + ['gdb-cmds', 'show version'], + ) + self.assertEqual(status.returncode, 0) + + def test_prop_ls(self) -> None: + status = self._run(['prop-ls', 'path1'], stdout=subprocess.PIPE) + self.assertEqual(status.returncode, 0) + self.assertTrue('prop1' in status.stdout.decode('ascii')) + status = self._run(['prop-ls', 'invalid path'], stdout=subprocess.PIPE) + self.assertNotEqual(status.returncode, 0) + + def test_prop_get(self) -> None: + status = self._run( + ['prop-get', 'invalid path', 'prop1'], + stdout=subprocess.PIPE, + ) + self.assertNotEqual(status.returncode, 0) + status = self._run( + ['prop-get', 'path1', 'invalid prop'], + stdout=subprocess.PIPE, + ) + self.assertNotEqual(status.returncode, 0) + status = self._run( + ['prop-get', 'path1', 'prop1'], + stdout=subprocess.PIPE, + ) + self.assertEqual(status.returncode, 0) + self.assertTrue('val1' in status.stdout.decode('ascii')) + + def test_prop_set(self) -> None: + status = self._run( + ['prop-set', 'invalid path', 'prop1', 'v'], + stdout=subprocess.PIPE, + ) + self.assertNotEqual(status.returncode, 0) + status = self._run( + ['prop-set', 'path1', 'invalid prop', 'v'], + stdout=subprocess.PIPE, + ) + self.assertNotEqual(status.returncode, 0) + status = self._run( + ['prop-set', 'path1', 'prop1', 'value'], + stdout=subprocess.PIPE, + ) + self.assertEqual(status.returncode, 0) + status = self._run( + ['prop-get', 'path1', 'prop1'], + stdout=subprocess.PIPE, + ) + self.assertEqual(status.returncode, 0) + self.assertTrue('value' in status.stdout.decode('ascii'), status.stdout) + + def test_reset(self) -> None: + self.assertEqual(self._run(['reset']).returncode, 0) + self.assertTrue(os.path.exists(os.path.join(self._wdir.name, 'reset'))) + + def test_load(self) -> None: + self.assertEqual(self._run(['load', 'executable']).returncode, 0) + + def test_resume(self) -> None: + self.assertEqual(self._run(['resume']).returncode, 0) + + +class TestForeground(TestCli): + """Test starting in foreground""" + + def _test_common(self, cmd) -> None: + # Run the CLI process in a new session so that we can terminate both the + # CLI and the mock emulator it spawns in the foreground. + args = {} + if sys.platform == 'win32': + args['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP + else: + args['start_new_session'] = True + proc = self._popen(cmd, stdout=subprocess.PIPE, **args) + assert proc.stdout + output = proc.stdout.readline() + self.assertTrue( + 'starting mock emulator' in output.decode('utf-8'), + output.decode('utf-8'), + ) + if sys.platform == 'win32': + # See https://bugs.python.org/issue26350 + os.kill(proc.pid, signal.CTRL_BREAK_EVENT) + else: + os.kill(-proc.pid, signal.SIGTERM) + proc.wait() + proc.stdout.close() + + def test_foreground(self) -> None: + self._test_common(['start', '--foreground', 'test-target']) + + def test_debug(self) -> None: + self._test_common(['start', '--debug', 'test-target']) + + +class TestInteractive(TestCli): + """Test interactive commands""" + + def setUp(self) -> None: + super().setUp() + self.assertEqual(self._run(['start', 'test-target']).returncode, 0) + + def tearDown(self) -> None: + self.assertEqual(self._run(['stop']).returncode, 0) + super().tearDown() + + @staticmethod + def _read_nonblocking(fd: int, size: int) -> bytes: + try: + return os.read(fd, size) + except BlockingIOError: + return b'' + + def test_term(self) -> None: + """Test the pw emu term command""" + + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + + # pylint: disable=import-outside-toplevel + # Can't import pty on win32. + import pty + + # pylint: disable=no-member + # Avoid pylint false positive on win32. + pid, fd = pty.fork() + if pid == 0: + status = self._run(['term', 'tcp']) + # pylint: disable=protected-access + # Use os._exit instead of os.exit after fork. + os._exit(status.returncode) + else: + expected = '--- Miniterm on tcp ---' + + # Read the expected string with a timeout. + os.set_blocking(fd, False) + deadline = time.monotonic() + 5 + data = self._read_nonblocking(fd, len(expected)) + while len(data) < len(expected): + time.sleep(0.1) + data += self._read_nonblocking(fd, len(expected) - len(data)) + if time.monotonic() > deadline: + break + self.assertTrue( + expected in data.decode('ascii'), + data + self._read_nonblocking(fd, 100), + ) + + # send CTRL + ']' to terminate miniterm + os.write(fd, b'\x1d') + + # wait for the process to exit, with a timeout + deadline = time.monotonic() + 5 + wait_pid, ret = os.waitpid(pid, os.WNOHANG) + while wait_pid == 0: + time.sleep(0.1) + # Discard input to avoid writer hang on MacOS, + # see https://github.com/python/cpython/issues/97001. + try: + self._read_nonblocking(fd, 100) + except OSError: + # Avoid read errors when the child pair of the pty + # closes when the child terminates. + pass + wait_pid, ret = os.waitpid(pid, os.WNOHANG) + if time.monotonic() > deadline: + break + self.assertEqual(wait_pid, pid) + self.assertEqual(ret, 0) + + def test_gdb(self) -> None: + res = self._run(['gdb', '-e', 'executable'], stdout=subprocess.PIPE) + self.assertEqual(res.returncode, 0) + output = res.stdout.decode('ascii') + self.assertTrue('target remote' in output, output) + self.assertTrue('executable' in output, output) + + +if __name__ == '__main__': + unittest.main() diff --git a/pw_emu/py/tests/common.py b/pw_emu/py/tests/common.py new file mode 100644 index 000000000..9d2513fcc --- /dev/null +++ b/pw_emu/py/tests/common.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Common utilities for tests.""" + +import json +import os +import subprocess +import tempfile +import unittest + +from pathlib import Path +from typing import Any, Optional, Dict + +from pw_emu.frontend import Emulator + + +def check_prog(prog: str) -> tuple: + msg = f'running {prog}' + try: + proc = subprocess.run([prog, '--help'], capture_output=True) + if proc.returncode != 0: + output = proc.stdout.decode('ascii') + proc.stderr.decode('ascii') + msg = f'error {msg}: {output}' + return (False, msg) + except OSError as err: + msg = f'error {msg}: {str(err)}' + return (False, msg) + return (True, msg) + + +class ConfigHelper(unittest.TestCase): + """Helper that setups and tears down the configuration file""" + + _config: Optional[Dict[str, Any]] = None + + def setUp(self) -> None: + self._wdir = tempfile.TemporaryDirectory() + with tempfile.NamedTemporaryFile('wt', delete=False) as file: + pw_emu_config: Dict[str, Any] = {'pw': {'pw_emu': {}}} + if self._config: + pw_emu_config['pw']['pw_emu'].update(self._config) + json.dump(pw_emu_config, file) + self._config_file = file.name + + def tearDown(self) -> None: + self._wdir.cleanup() + os.unlink(self._config_file) + + +class ConfigHelperWithEmulator(ConfigHelper): + """Helper that setups and tears down the configuration file""" + + def setUp(self) -> None: + super().setUp() + self._emu = Emulator(Path(self._wdir.name), Path(self._config_file)) diff --git a/pw_emu/py/tests/core_test.py b/pw_emu/py/tests/core_test.py new file mode 100644 index 000000000..1131d5dd3 --- /dev/null +++ b/pw_emu/py/tests/core_test.py @@ -0,0 +1,480 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Tests for core / infrastructure code.""" + +import copy +import io +import json +import os +import socket +import sys +import tempfile +import time +import unittest +from pathlib import Path +from typing import Any, Dict + +from unittest.mock import patch + +from pw_emu.core import ( + AlreadyRunning, + Config, + ConfigError, + Handles, + InvalidTarget, + InvalidChannelName, + InvalidChannelType, + Launcher, +) +from mock_emu_frontend import _mock_emu +from tests.common import ConfigHelper + + +class ConfigHelperWithLauncher(ConfigHelper): + def setUp(self) -> None: + super().setUp() + self._launcher = Launcher.get('mock-emu', Path(self._config_file)) + + +class TestInvalidTarget(ConfigHelperWithLauncher): + """Check that InvalidTarget is raised with an empty config.""" + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + } + + def test_invalid_target(self) -> None: + with self.assertRaises(InvalidTarget): + self._launcher.start(Path(self._wdir.name), 'test-target') + + +class TestStart(ConfigHelperWithLauncher): + """Start tests for valid config.""" + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def setUp(self) -> None: + super().setUp() + self._connector = self._launcher.start( + Path(self._wdir.name), 'test-target' + ) + + def tearDown(self) -> None: + self._connector.stop() + super().tearDown() + + def test_running(self) -> None: + self.assertTrue(self._connector.running()) + + def test_pid_file(self) -> None: + self.assertTrue( + os.path.exists(os.path.join(self._wdir.name, 'mock-emu.pid')) + ) + + def test_handles_file(self) -> None: + self.assertTrue( + os.path.exists(os.path.join(self._wdir.name, 'handles.json')) + ) + + def test_already_running(self) -> None: + with self.assertRaises(AlreadyRunning): + self._launcher.start(Path(self._wdir.name), 'test-target') + + def test_log(self) -> None: + exp = 'starting mock emulator' + path = os.path.join(self._wdir.name, 'mock-emu.log') + deadline = time.monotonic() + 100 + while os.path.getsize(path) < len(exp): + time.sleep(0.1) + if time.monotonic() > deadline: + break + + with open(os.path.join(self._wdir.name, 'mock-emu.log'), 'rt') as file: + data = file.read() + self.assertTrue(exp in data, data) + + +class TestPrePostStartCmds(ConfigHelperWithLauncher): + """Tests for configurations with pre-start commands.""" + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'targets': { + 'test-target': { + 'pre-start-cmds': { + 'pre-1': _mock_emu + ['pre-1'], + 'pre-2': _mock_emu + ['$pw_emu_wdir{pre-2}'], + }, + 'post-start-cmds': { + 'post-1': _mock_emu + + ['$pw_emu_channel_path{test_subst_pty}'], + 'post-2': _mock_emu + + [ + '$pw_emu_channel_host{test_subst_tcp}', + '$pw_emu_channel_port{test_subst_tcp}', + ], + }, + 'mock-emu': {}, + } + }, + } + + def setUp(self) -> None: + super().setUp() + self._connector = self._launcher.start( + Path(self._wdir.name), 'test-target' + ) + + def tearDown(self) -> None: + self._connector.stop() + super().tearDown() + + def test_running(self) -> None: + for proc in self._connector.get_procs().keys(): + self.assertTrue(self._connector.proc_running(proc)) + + def test_stop(self) -> None: + self._connector.stop() + for proc in self._connector.get_procs().keys(): + self.assertFalse(self._connector.proc_running(proc)) + + def test_pid_files(self) -> None: + for proc in ['pre-1', 'pre-2', 'post-1', 'post-2']: + self.assertTrue( + os.path.exists(os.path.join(self._wdir.name, f'{proc}.pid')) + ) + + def test_logs(self): + expect = { + 'pre-1.log': 'pre-1', + 'pre-2.log': os.path.join(self._wdir.name, 'pre-2'), + 'post-1.log': 'pty-path', + 'post-2.log': 'localhost 1234', + } + + for log, pattern in expect.items(): + path = os.path.join(self._wdir.name, log) + deadline = time.monotonic() + 100 + while os.path.getsize(path) < len(pattern): + time.sleep(0.1) + if time.monotonic() > deadline: + break + with open(os.path.join(self._wdir.name, log)) as file: + data = file.read() + self.assertTrue(pattern in data, f'`{pattern}` not in `{data}`') + + +class TestStop(ConfigHelperWithLauncher): + """Stop tests for valid config.""" + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def setUp(self) -> None: + super().setUp() + self._connector = self._launcher.start( + Path(self._wdir.name), 'test-target' + ) + self._connector.stop() + + def test_pid_files(self) -> None: + self.assertFalse( + os.path.exists(os.path.join(self._wdir.name, 'emu.pid')) + ) + + def test_target_file(self) -> None: + self.assertFalse( + os.path.exists(os.path.join(self._wdir.name, 'target')) + ) + + def test_running(self) -> None: + self.assertFalse(self._connector.running()) + + +class TestChannels(ConfigHelperWithLauncher): + """Test Connector channels APIs.""" + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'mock-emu': { + 'tcp_channel': True, + 'pty_channel': sys.platform != 'win32', + }, + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def setUp(self) -> None: + super().setUp() + self._connector = self._launcher.start( + Path(self._wdir.name), 'test-target' + ) + + def tearDown(self) -> None: + self._connector.stop() + super().tearDown() + + def test_tcp_channel_addr(self) -> None: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(self._connector.get_channel_addr('tcp')) + sock.close() + + def test_get_channel_type(self) -> None: + self.assertEqual(self._connector.get_channel_type('tcp'), 'tcp') + if sys.platform != 'win32': + self.assertEqual(self._connector.get_channel_type('pty'), 'pty') + with self.assertRaises(InvalidChannelName): + self._connector.get_channel_type('invalid channel') + + def test_pty_channel_path(self) -> None: + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + self.assertTrue(os.path.exists(self._connector.get_channel_path('pty'))) + with self.assertRaises(InvalidChannelType): + self._connector.get_channel_path('tcp') + + def _test_stream(self, stream: io.RawIOBase) -> None: + for char in [b'1', b'2', b'3', b'4']: + stream.write(char) + self.assertEqual(stream.read(1), char) + + def test_tcp_stream(self) -> None: + with self._connector.get_channel_stream('tcp') as stream: + self._test_stream(stream) + + def test_pty_stream(self) -> None: + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + with self._connector.get_channel_stream('pty') as stream: + self._test_stream(stream) + + +class TestTargetFragments(unittest.TestCase): + """Tests for configurations using target fragments.""" + + _config_templ: Dict[str, Any] = { + 'pw': { + 'pw_emu': { + 'target_files': [], + 'targets': { + 'test-target': { + 'test-key': 'test-value', + } + }, + } + } + } + + _tf1_config: Dict[str, Any] = { + 'targets': { + 'test-target1': {}, + 'test-target': { + 'test-key': 'test-value-file1', + }, + } + } + + _tf2_config: Dict[str, Any] = {'targets': {'test-target2': {}}} + + def setUp(self) -> None: + with tempfile.NamedTemporaryFile( + 'wt', delete=False + ) as config_file, tempfile.NamedTemporaryFile( + 'wt', delete=False + ) as targets1_file, tempfile.NamedTemporaryFile( + 'wt', delete=False + ) as targets2_file: + self._config_path = config_file.name + self._targets1_path = targets1_file.name + self._targets2_path = targets2_file.name + json.dump(self._tf1_config, targets1_file) + json.dump(self._tf2_config, targets2_file) + config = copy.deepcopy(self._config_templ) + config['pw']['pw_emu']['target_files'].append(self._targets1_path) + config['pw']['pw_emu']['target_files'].append(self._targets2_path) + json.dump(config, config_file) + self._config = Config(Path(self._config_path)) + + def tearDown(self) -> None: + os.unlink(self._config_path) + os.unlink(self._targets1_path) + os.unlink(self._targets2_path) + + def test_targets_loaded(self) -> None: + self.assertIsNotNone(self._config.get(['targets', 'test-target'])) + self.assertIsNotNone(self._config.get(['targets', 'test-target1'])) + self.assertIsNotNone(self._config.get(['targets', 'test-target2'])) + + def test_targets_priority(self) -> None: + self.assertEqual( + self._config.get(['targets', 'test-target', 'test-key']), + 'test-value', + ) + + +class TestHandles(unittest.TestCase): + """Tests for Handles.""" + + _config = { + 'emu': 'mock-emu', + 'config': 'test-config', + 'target': 'test-target', + 'gdb_cmd': ['test-gdb'], + 'channels': { + 'tcp_chan': {'type': 'tcp', 'host': 'localhost', 'port': 1234}, + 'pty_chan': {'type': 'pty', 'path': 'path'}, + }, + 'procs': {'proc0': {'pid': 1983}, 'proc1': {'pid': 1234}}, + } + + def test_serialize(self): + handles = Handles('mock-emu', 'test-config') + handles.add_channel_tcp('tcp_chan', 'localhost', 1234) + handles.add_channel_pty('pty_chan', 'path') + handles.add_proc('proc0', 1983) + handles.add_proc('proc1', 1234) + handles.set_target('test-target') + handles.set_gdb_cmd(['test-gdb']) + tmp = tempfile.TemporaryDirectory() + handles.save(Path(tmp.name)) + with open(os.path.join(tmp.name, 'handles.json'), 'rt') as file: + self.assertTrue(json.load(file) == self._config) + tmp.cleanup() + + def test_load(self): + tmp = tempfile.TemporaryDirectory() + with open(os.path.join(tmp.name, 'handles.json'), 'wt') as file: + json.dump(self._config, file) + handles = Handles.load(Path(tmp.name)) + self.assertEqual(handles.emu, 'mock-emu') + self.assertEqual(handles.gdb_cmd, ['test-gdb']) + self.assertEqual(handles.target, 'test-target') + self.assertEqual(handles.config, 'test-config') + tmp.cleanup() + + +class TestConfig(ConfigHelper): + """Stop tests for valid config.""" + + _config: Dict[str, Any] = { + 'top': 'entry', + 'multi': { + 'level': { + 'entry': 0, + }, + }, + 'subst': 'a/$pw_env{PW_EMU_TEST_ENV_SUBST}/c', + 'targets': { + 'test-target': { + 'entry': [1, 2, 3], + 'mock-emu': { + 'entry': 'test', + }, + } + }, + 'mock-emu': { + 'executable': _mock_emu, + }, + 'list': ['a', '$pw_env{PW_EMU_TEST_ENV_SUBST}', 'c'], + 'bad-subst-type': '$pw_bad_subst_type{test}', + } + + def setUp(self) -> None: + super().setUp() + self._cfg = Config(Path(self._config_file), 'test-target', 'mock-emu') + + def test_top_entry(self) -> None: + self.assertEqual(self._cfg.get(['top']), 'entry') + + def test_empty_subst(self) -> None: + with self.assertRaises(ConfigError): + self._cfg.get(['subst']) + + def test_subst(self) -> None: + with patch.dict('os.environ', {'PW_EMU_TEST_ENV_SUBST': 'b'}): + self.assertEqual(self._cfg.get(['subst']), 'a/b/c') + + def test_multi_level_entry(self) -> None: + self.assertEqual(self._cfg.get(['multi', 'level', 'entry']), 0) + + def test_get_target(self) -> None: + self.assertEqual(self._cfg.get_targets(), ['test-target']) + + def test_target(self) -> None: + self.assertEqual(self._cfg.get_target(['entry']), [1, 2, 3]) + + def test_target_emu(self) -> None: + self.assertEqual(self._cfg.get_target_emu(['entry']), 'test') + + def test_type_checking(self) -> None: + with self.assertRaises(ConfigError): + self._cfg.get(['top'], entry_type=int) + self._cfg.get(['top'], entry_type=str) + self._cfg.get_target(['entry'], entry_type=list) + self._cfg.get_target_emu(['entry'], entry_type=str) + self._cfg.get(['targets'], entry_type=dict) + + def test_non_optional(self) -> None: + with self.assertRaises(ConfigError): + self._cfg.get(['non-existing'], optional=False) + + def test_optional(self) -> None: + self.assertEqual(self._cfg.get(['non-existing']), None) + self.assertEqual(self._cfg.get(['non-existing'], entry_type=int), 0) + self.assertEqual(self._cfg.get(['non-existing'], entry_type=str), '') + self.assertEqual(self._cfg.get(['non-existing'], entry_type=list), []) + + def test_list(self) -> None: + with self.assertRaises(ConfigError): + self._cfg.get(['list']) + with patch.dict('os.environ', {'PW_EMU_TEST_ENV_SUBST': 'b'}): + self.assertEqual(self._cfg.get(['list']), ['a', 'b', 'c']) + + def test_bad_subst(self) -> None: + with self.assertRaises(ConfigError): + self._cfg.get(['bad-subst-type']) + + +if __name__ == '__main__': + unittest.main() diff --git a/pw_emu/py/tests/frontend_test.py b/pw_emu/py/tests/frontend_test.py new file mode 100644 index 000000000..36ea3d7b2 --- /dev/null +++ b/pw_emu/py/tests/frontend_test.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Emulator API tests.""" + +import unittest + +from typing import Any, Dict + +from pw_emu.core import ( + ConfigError, + InvalidChannelType, + InvalidProperty, + InvalidPropertyPath, +) +from mock_emu_frontend import _mock_emu +from tests.common import ConfigHelperWithEmulator + + +class TestEmulator(ConfigHelperWithEmulator): + """Test Emulator APIs.""" + + _config = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'mock-emu': { + 'gdb_channel': True, + }, + 'gdb': _mock_emu + ['--exit', '--'], + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def setUp(self) -> None: + super().setUp() + self._emu.start('test-target') + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_gdb_target_remote(self) -> None: + output = self._emu.run_gdb_cmds([]).stdout + host, port = self._emu.get_channel_addr('gdb') + self.assertTrue(f'{host}:{port}' in output.decode('utf-8')) + + def test_gdb_commands(self) -> None: + output = self._emu.run_gdb_cmds(['test_gdb_cmd']).stdout + self.assertTrue( + output and '-ex test_gdb_cmd' in output.decode('utf-8'), output + ) + + def test_gdb_executable(self) -> None: + output = self._emu.run_gdb_cmds([], 'test_gdb_exec').stdout + self.assertTrue('test_gdb_exec' in output.decode('utf-8')) + + def test_gdb_pause(self) -> None: + output = self._emu.run_gdb_cmds([], pause=True).stdout + self.assertTrue('-ex disconnect' in output.decode('utf-8')) + + # Minimal testing for APIs that are straight wrappers over Connector APIs. + def test_running(self) -> None: + self.assertTrue(self._emu.running()) + + def test_reset(self) -> None: + self._emu.reset() + + def test_cont(self) -> None: + self._emu.cont() + + def test_list_properties(self) -> None: + with self.assertRaises(InvalidPropertyPath): + self._emu.list_properties('invalid path') + self.assertEqual(self._emu.list_properties('path1'), ['prop1']) + + def test_get_property(self) -> None: + with self.assertRaises(InvalidProperty): + self._emu.get_property('path1', 'invalid property') + with self.assertRaises(InvalidPropertyPath): + self._emu.get_property('invalid path', 'prop1') + self.assertEqual(self._emu.get_property('path1', 'prop1'), 'val1') + + def test_set_property(self) -> None: + with self.assertRaises(InvalidPropertyPath): + self._emu.set_property('invalid path', 'prop1', 'test') + with self.assertRaises(InvalidProperty): + self._emu.set_property('path1', 'invalid property', 'test') + self._emu.set_property('path1', 'prop1', 'val2') + self.assertEqual(self._emu.get_property('path1', 'prop1'), 'val2') + + def test_get_channel_type(self) -> None: + self.assertEqual(self._emu.get_channel_type('gdb'), 'tcp') + + def test_get_channel_path(self) -> None: + with self.assertRaises(InvalidChannelType): + self._emu.get_channel_path('gdb') + + def test_get_channel_addr(self) -> None: + self.assertEqual(len(self._emu.get_channel_addr('gdb')), 2) + + def test_channel_stream(self) -> None: + with self._emu.get_channel_stream('gdb') as _: + pass + + +class TestGdbEmptyConfig(ConfigHelperWithEmulator): + """Check that ConfigError is raised when running gdb with an empty + gdb config. + + """ + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def setUp(self) -> None: + super().setUp() + self._emu.start('test-target') + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_gdb_config_error(self) -> None: + with self.assertRaises(ConfigError): + self._emu.run_gdb_cmds([]) + + +if __name__ == '__main__': + unittest.main() diff --git a/pw_emu/py/tests/py.typed b/pw_emu/py/tests/py.typed new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/pw_emu/py/tests/py.typed diff --git a/pw_emu/py/tests/qemu_test.py b/pw_emu/py/tests/qemu_test.py new file mode 100644 index 000000000..c84fb2b6d --- /dev/null +++ b/pw_emu/py/tests/qemu_test.py @@ -0,0 +1,280 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""QEMU emulator tests.""" + +import json +import os +import socket +import sys +import tempfile +import time +import unittest + +from pathlib import Path +from typing import Any, Dict, Optional + +from pw_emu.core import InvalidChannelName, InvalidChannelType +from tests.common import check_prog, ConfigHelperWithEmulator + + +# TODO: b/301382004 - The Python Pigweed package install (into python-venv) +# races with running this test and there is no way to add that package as a test +# depedency without creating circular depedencies. This means we can't rely on +# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper. +# +# run the arm_gdb.py wrapper directly +_arm_none_eabi_gdb_path = Path( + os.path.join( + os.environ['PW_ROOT'], + 'pw_env_setup', + 'py', + 'pw_env_setup', + 'entry_points', + 'arm_gdb.py', + ) +).resolve() + + +class TestQemu(ConfigHelperWithEmulator): + """Tests for a valid qemu configuration.""" + + _config = { + 'gdb': ['python', str(_arm_none_eabi_gdb_path)], + 'qemu': { + 'executable': 'qemu-system-arm', + }, + 'targets': { + 'test-target': { + 'ignore1': None, + 'qemu': { + 'machine': 'lm3s6965evb', + 'channels': { + 'chardevs': { + 'test_uart': { + 'id': 'serial0', + } + } + }, + }, + 'ignore2': None, + } + }, + } + + def setUp(self) -> None: + super().setUp() + # No image so start paused to avoid crashing. + self._emu.start(target='test-target', pause=True) + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_running(self) -> None: + self.assertTrue(self._emu.running()) + + def test_list_properties(self) -> None: + self.assertIsNotNone(self._emu.list_properties('/machine')) + + def test_get_property(self) -> None: + self.assertEqual( + self._emu.get_property('/machine', 'type'), 'lm3s6965evb-machine' + ) + + def test_set_property(self) -> None: + self._emu.set_property('/machine', 'graphics', False) + self.assertFalse(self._emu.get_property('/machine', 'graphics')) + + def test_bad_channel_name(self) -> None: + with self.assertRaises(InvalidChannelName): + self._emu.get_channel_addr('serial1') + + def get_reg(self, addr: int) -> bytes: + temp = tempfile.NamedTemporaryFile(delete=False) + temp.close() + + res = self._emu.run_gdb_cmds( + [ + f'dump val {temp.name} *(char*){addr}', + 'disconnect', + ] + ) + self.assertEqual(res.returncode, 0, res.stderr.decode('ascii')) + + with open(temp.name, 'rb') as file: + ret = file.read(1) + + self.assertNotEqual(ret, b'', res.stderr.decode('ascii')) + + os.unlink(temp.name) + + return ret + + def poll_data(self, timeout: int) -> Optional[bytes]: + uartris = 0x4000C03C + uartrd = 0x4000C000 + + deadline = time.monotonic() + timeout + while self.get_reg(uartris) == b'\x00': + time.sleep(0.1) + if time.monotonic() > deadline: + return None + return self.get_reg(uartrd) + + def test_channel_stream(self) -> None: + ok, msg = check_prog('arm-none-eabi-gdb') + if not ok: + self.skipTest(msg) + + stream = self._emu.get_channel_stream('test_uart') + stream.write('test\n'.encode('ascii')) + + self.assertEqual(self.poll_data(5), b't') + self.assertEqual(self.poll_data(5), b'e') + self.assertEqual(self.poll_data(5), b's') + self.assertEqual(self.poll_data(5), b't') + + def test_gdb(self) -> None: + self._emu.run_gdb_cmds(['c']) + deadline = time.monotonic() + 5 + while self._emu.running(): + if time.monotonic() > deadline: + return + self.assertFalse(self._emu.running()) + + +class TestQemuChannelsTcp(TestQemu): + """Tests for configurations using TCP channels.""" + + _config: Dict[str, Any] = {} + _config.update(json.loads(json.dumps(TestQemu._config))) + _config['qemu']['channels'] = {'type': 'tcp'} + + def test_get_channel_addr(self) -> None: + host, port = self._emu.get_channel_addr('test_uart') + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + sock.close() + + +class TestQemuChannelsPty(TestQemu): + """Tests for configurations using PTY channels.""" + + _config: Dict[str, Any] = {} + _config.update(json.loads(json.dumps(TestQemu._config))) + _config['qemu']['channels'] = {'type': 'pty'} + + def setUp(self): + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + super().setUp() + + def test_get_path(self) -> None: + self.assertTrue(os.path.exists(self._emu.get_channel_path('test_uart'))) + + +class TestQemuInvalidChannelType(ConfigHelperWithEmulator): + """Test invalid channel type configuration.""" + + _config = { + 'qemu': { + 'executable': 'qemu-system-arm', + 'channels': {'type': 'invalid'}, + }, + 'targets': { + 'test-target': { + 'qemu': { + 'machine': 'lm3s6965evb', + } + } + }, + } + + def test_start(self) -> None: + with self.assertRaises(InvalidChannelType): + self._emu.start('test-target', pause=True) + + +class TestQemuTargetChannelsMixed(ConfigHelperWithEmulator): + """Test configuration with mixed channels types.""" + + _config = { + 'qemu': { + 'executable': 'qemu-system-arm', + }, + 'targets': { + 'test-target': { + 'qemu': { + 'machine': 'lm3s6965evb', + 'channels': { + 'chardevs': { + 'test_uart0': { + 'id': 'serial0', + }, + 'test_uart1': { + 'id': 'serial1', + 'type': 'tcp', + }, + 'test_uart2': { + 'id': 'serial2', + 'type': 'pty', + }, + } + }, + } + } + }, + } + + def setUp(self) -> None: + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + super().setUp() + # no image to run so start paused + self._emu.start('test-target', pause=True) + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_uart0_addr(self) -> None: + host, port = self._emu.get_channel_addr('test_uart0') + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + sock.close() + + def test_uart1_addr(self) -> None: + host, port = self._emu.get_channel_addr('test_uart1') + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + sock.close() + + def test_uart2_path(self) -> None: + self.assertTrue( + os.path.exists(self._emu.get_channel_path('test_uart2')) + ) + + +def main() -> None: + ok, msg = check_prog('qemu-system-arm') + if not ok: + print(f'skipping tests: {msg}') + sys.exit(0) + + unittest.main() + + +if __name__ == '__main__': + main() diff --git a/pw_emu/py/tests/renode_test.py b/pw_emu/py/tests/renode_test.py new file mode 100644 index 000000000..acb9e47a9 --- /dev/null +++ b/pw_emu/py/tests/renode_test.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""renode emulator tests.""" + +import json +import os +import sys +import struct +import tempfile +import time +import unittest + +from pathlib import Path +from typing import Any, Dict, Optional + +from pw_emu.core import InvalidChannelName, InvalidChannelType +from tests.common import check_prog, ConfigHelperWithEmulator + + +# TODO: b/301382004 - The Python Pigweed package install (into python-venv) +# races with running this test and there is no way to add that package as a test +# depedency without creating circular depedencies. This means we can't rely on +# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper. +# +# run the arm_gdb.py wrapper directly +_arm_none_eabi_gdb_path = Path( + os.path.join( + os.environ['PW_ROOT'], + 'pw_env_setup', + 'py', + 'pw_env_setup', + 'entry_points', + 'arm_gdb.py', + ) +).resolve() + + +class TestRenode(ConfigHelperWithEmulator): + """Tests for a valid renode configuration.""" + + _config = { + 'gdb': ['python', str(_arm_none_eabi_gdb_path)], + 'renode': { + 'executable': 'renode', + }, + 'targets': { + 'test-target': { + 'renode': { + 'machine': 'platforms/boards/stm32f4_discovery-kit.repl', + } + } + }, + } + + def setUp(self) -> None: + super().setUp() + # no image to run so start paused + self._emu.start(target='test-target', pause=True) + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_running(self) -> None: + self.assertTrue(self._emu.running()) + + def test_list_properties(self) -> None: + self.assertIsNotNone(self._emu.list_properties('sysbus.usart1')) + + def test_get_property(self) -> None: + self.assertIsNotNone( + self._emu.get_property('sysbus.usart1', 'BaudRate') + ) + + def test_set_property(self) -> None: + self._emu.set_property('sysbus.timer1', 'Frequency', 100) + self.assertEqual( + int(self._emu.get_property('sysbus.timer1', 'Frequency'), 16), 100 + ) + + +class TestRenodeInvalidChannelType(ConfigHelperWithEmulator): + """Test invalid channel type configuration.""" + + _config = { + 'renode': { + 'executable': 'renode', + }, + 'targets': { + 'test-target': { + 'renode': { + 'machine': 'platforms/boards/stm32f4_discovery-kit.repl', + 'channels': { + 'terminals': { + 'test_uart': { + 'device-path': 'sysbus.usart1', + 'type': 'invalid', + } + } + }, + } + } + }, + } + + def test_start(self) -> None: + with self.assertRaises(InvalidChannelType): + self._emu.start('test-target', pause=True) + + +class TestRenodeChannels(ConfigHelperWithEmulator): + """Tests for a valid renode channels configuration.""" + + _config = { + 'gdb': ['python', str(_arm_none_eabi_gdb_path)], + 'renode': { + 'executable': 'renode', + }, + 'targets': { + 'test-target': { + 'renode': { + 'machine': 'platforms/boards/stm32f4_discovery-kit.repl', + 'channels': { + 'terminals': { + 'test_uart': { + 'device-path': 'sysbus.usart1', + } + } + }, + } + } + }, + } + + def setUp(self) -> None: + super().setUp() + # no image to run so start paused + self._emu.start(target='test-target', pause=True) + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_bad_channel_name(self) -> None: + with self.assertRaises(InvalidChannelName): + self._emu.get_channel_addr('serial1') + + def set_reg(self, addr: int, val: int) -> None: + self._emu.run_gdb_cmds([f'set *(unsigned int*){addr}={val}']) + + def get_reg(self, addr: int) -> int: + temp = tempfile.NamedTemporaryFile(delete=False) + temp.close() + + res = self._emu.run_gdb_cmds( + [ + f'dump val {temp.name} *(char*){addr}', + 'disconnect', + ] + ) + self.assertEqual(res.returncode, 0, res.stderr.decode('ascii')) + + with open(temp.name, 'rb') as file: + ret = file.read(1) + + self.assertNotEqual(ret, b'', res.stderr.decode('ascii')) + + os.unlink(temp.name) + + return struct.unpack('B', ret)[0] + + def poll_data(self, timeout: int) -> Optional[int]: + usart_sr = 0x40011000 + usart_dr = 0x40011004 + deadline = time.monotonic() + timeout + while self.get_reg(usart_sr) & 0x20 == 0: + time.sleep(0.1) + if time.monotonic() > deadline: + return None + return self.get_reg(usart_dr) + + def test_channel_stream(self) -> None: + ok, msg = check_prog('arm-none-eabi-gdb') + if not ok: + self.skipTest(msg) + + usart_cr1 = 0x4001100C + # enable RX and TX + self.set_reg(usart_cr1, 0xC) + + stream = self._emu.get_channel_stream('test_uart') + stream.write('test\n'.encode('ascii')) + + self.assertEqual(self.poll_data(5), ord('t')) + self.assertEqual(self.poll_data(5), ord('e')) + self.assertEqual(self.poll_data(5), ord('s')) + self.assertEqual(self.poll_data(5), ord('t')) + + +class TestRenodeChannelsPty(TestRenodeChannels): + """Tests for configurations using PTY channels.""" + + _config: Dict[str, Any] = {} + _config.update(json.loads(json.dumps(TestRenodeChannels._config))) + _config['renode']['channels'] = {'terminals': {'type': 'pty'}} + + def setUp(self): + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + super().setUp() + + def test_get_path(self) -> None: + self.assertTrue(os.path.exists(self._emu.get_channel_path('test_uart'))) + + +def main() -> None: + ok, msg = check_prog('renode') + if not ok: + print(f'skipping tests: {msg}') + sys.exit(0) + + unittest.main() + + +if __name__ == '__main__': + main() |