aboutsummaryrefslogtreecommitdiff
path: root/pw_emu/py
diff options
context:
space:
mode:
Diffstat (limited to 'pw_emu/py')
-rw-r--r--pw_emu/py/BUILD.gn68
-rw-r--r--pw_emu/py/mock_emu.py111
-rw-r--r--pw_emu/py/mock_emu_frontend.py154
-rw-r--r--pw_emu/py/pw_emu/__init__.py14
-rw-r--r--pw_emu/py/pw_emu/__main__.py448
-rw-r--r--pw_emu/py/pw_emu/core.py956
-rw-r--r--pw_emu/py/pw_emu/frontend.py328
-rw-r--r--pw_emu/py/pw_emu/pigweed_emulators.py27
-rw-r--r--pw_emu/py/pw_emu/py.typed0
-rw-r--r--pw_emu/py/pw_emu/qemu.py343
-rw-r--r--pw_emu/py/pw_emu/renode.py209
-rw-r--r--pw_emu/py/pyproject.toml16
-rw-r--r--pw_emu/py/setup.cfg28
-rw-r--r--pw_emu/py/setup.py18
-rw-r--r--pw_emu/py/tests/__init__.py14
-rw-r--r--pw_emu/py/tests/cli_test.py276
-rw-r--r--pw_emu/py/tests/common.py67
-rw-r--r--pw_emu/py/tests/core_test.py480
-rw-r--r--pw_emu/py/tests/frontend_test.py150
-rw-r--r--pw_emu/py/tests/py.typed0
-rw-r--r--pw_emu/py/tests/qemu_test.py280
-rw-r--r--pw_emu/py/tests/renode_test.py238
22 files changed, 4225 insertions, 0 deletions
diff --git a/pw_emu/py/BUILD.gn b/pw_emu/py/BUILD.gn
new file mode 100644
index 000000000..bcf26e44e
--- /dev/null
+++ b/pw_emu/py/BUILD.gn
@@ -0,0 +1,68 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+import("//build_overrides/pigweed.gni")
+
+import("$dir_pw_build/python.gni")
+
+pw_python_package("py") {
+ setup = [
+ "pyproject.toml",
+ "setup.cfg",
+ "setup.py",
+ ]
+ sources = [
+ "pw_emu/__init__.py",
+ "pw_emu/__main__.py",
+ "pw_emu/core.py",
+ "pw_emu/frontend.py",
+ "pw_emu/pigweed_emulators.py",
+ "pw_emu/qemu.py",
+ "pw_emu/renode.py",
+ ]
+ tests = [
+ "tests/__init__.py",
+ "tests/cli_test.py",
+ "tests/common.py",
+ "tests/core_test.py",
+ "tests/frontend_test.py",
+ "tests/qemu_test.py",
+ "tests/renode_test.py",
+ ]
+ pylintrc = "$dir_pigweed/.pylintrc"
+ mypy_ini = "$dir_pigweed/.mypy.ini"
+ python_deps = [
+ "$dir_pw_build/py",
+ "$dir_pw_env_setup/py",
+ ]
+}
+
+pw_python_script("mock_emu") {
+ sources = [ "mock_emu.py" ]
+ pylintrc = "$dir_pigweed/.pylintrc"
+ mypy_ini = "$dir_pigweed/.mypy.ini"
+ action = {
+ stamp = true
+ }
+}
+
+pw_python_script("mock_emu_frontend") {
+ sources = [ "mock_emu_frontend.py" ]
+ python_deps = [ "$dir_pw_env_setup/py" ]
+ pylintrc = "$dir_pigweed/.pylintrc"
+ mypy_ini = "$dir_pigweed/.mypy.ini"
+ action = {
+ stamp = true
+ }
+}
diff --git a/pw_emu/py/mock_emu.py b/pw_emu/py/mock_emu.py
new file mode 100644
index 000000000..1a49b2ba2
--- /dev/null
+++ b/pw_emu/py/mock_emu.py
@@ -0,0 +1,111 @@
+#!/usr/bin/env python
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Mock emulator used for testing process and channel management."""
+
+import argparse
+import os
+import socket
+import sys
+import time
+
+from threading import Thread
+
+
+def _tcp_thread(sock: socket.socket) -> None:
+ conn, _ = sock.accept()
+ while True:
+ data = conn.recv(1)
+ conn.send(data)
+
+
+def _pty_thread(fd: int) -> None:
+ while True:
+ data = os.read(fd, 1)
+ os.write(fd, data)
+
+
+def _get_parser() -> argparse.ArgumentParser:
+ """Command line parser."""
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '-C', '--working-dir', metavar='PATH', help='working directory'
+ )
+ parser.add_argument(
+ 'echo', metavar='STRING', nargs='*', help='write STRING to stdout'
+ )
+ parser.add_argument(
+ '--tcp-channel',
+ action='append',
+ default=[],
+ metavar='NAME',
+ help='listen for TCP connections, write port WDIR/NAME',
+ )
+ if sys.platform != 'win32':
+ parser.add_argument(
+ '--pty-channel',
+ action='append',
+ default=[],
+ metavar='NAME',
+ help='create pty channel and link in WDIR/NAME',
+ )
+ parser.add_argument(
+ '--exit', action='store_true', default=False, help='exit when done'
+ )
+
+ return parser
+
+
+def main() -> None:
+ """Mock emulator."""
+
+ args = _get_parser().parse_args()
+
+ if len(args.echo) > 0:
+ print(' '.join(args.echo))
+ sys.stdout.flush()
+
+ threads = []
+
+ for chan in args.tcp_channel:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.bind(('localhost', 0))
+ port = sock.getsockname()[1]
+ sock.listen()
+ with open(os.path.join(args.working_dir, chan), 'w') as file:
+ file.write(str(port))
+ thread = Thread(target=_tcp_thread, args=(sock,))
+ thread.start()
+ threads.append(thread)
+
+ if sys.platform != 'win32':
+ for chan in args.pty_channel:
+ controller, tty = os.openpty()
+ with open(os.path.join(args.working_dir, chan), 'w') as file:
+ file.write(os.ttyname(tty))
+ thread = Thread(target=_pty_thread, args=(controller,))
+ thread.start()
+ threads.append(thread)
+
+ for thread in threads:
+ thread.join()
+
+ if not args.exit:
+ while True:
+ time.sleep(1)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/pw_emu/py/mock_emu_frontend.py b/pw_emu/py/mock_emu_frontend.py
new file mode 100644
index 000000000..5ab370ac9
--- /dev/null
+++ b/pw_emu/py/mock_emu_frontend.py
@@ -0,0 +1,154 @@
+#!/usr/bin/env python
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Launcher and Connector for mock emulator"""
+
+import json
+import os
+from pathlib import Path
+from typing import Any, Optional, List, Union
+import time
+
+from pw_emu.core import (
+ Connector,
+ Launcher,
+ InvalidProperty,
+ InvalidPropertyPath,
+)
+
+# mock emulator script
+_mock_emu = [
+ 'python',
+ os.path.join(Path(os.path.dirname(__file__)).resolve(), 'mock_emu.py'),
+]
+
+
+def wait_for_file_size(
+ path: Union[os.PathLike, str], size: int, timeout: int = 5
+) -> None:
+ deadline = time.monotonic() + timeout
+ while not os.path.exists(path):
+ if time.monotonic() > deadline:
+ break
+ time.sleep(0.1)
+
+ while os.path.getsize(path) < size:
+ if time.monotonic() > deadline:
+ break
+ time.sleep(0.1)
+
+
+class MockEmuLauncher(Launcher):
+ """Launcher for mock emulator"""
+
+ def __init__(
+ self,
+ config_path: Path,
+ ):
+ super().__init__('mock-emu', config_path)
+ self._wdir: Optional[Path] = None
+ self.log = True
+
+ def _pre_start(
+ self,
+ target: str,
+ file: Optional[Path] = None,
+ pause: bool = False,
+ debug: bool = False,
+ args: Optional[str] = None,
+ ) -> List[str]:
+ channels = []
+ if self._config.get_target(['pre-start-cmds']):
+ self._handles.add_channel_tcp('test_subst_tcp', 'localhost', 1234)
+ self._handles.add_channel_pty('test_subst_pty', 'pty-path')
+ if self._config.get_emu(['gdb_channel']):
+ channels += ['--tcp-channel', 'gdb']
+ if self._config.get_emu(['tcp_channel']):
+ channels += ['--tcp-channel', 'tcp']
+ if self._config.get_emu(['pty_channel']):
+ channels += ['--pty-channel', 'pty']
+ if len(channels) > 0:
+ channels += ['--working-dir', str(self._wdir)]
+ return _mock_emu + channels + ['starting mock emulator']
+
+ def _post_start(self) -> None:
+ if not self._wdir:
+ return
+
+ if self._config.get_emu(['gdb_channel']):
+ path = os.path.join(self._wdir, 'gdb')
+ wait_for_file_size(path, 5, 5)
+ with open(path, 'r') as file:
+ port = int(file.read())
+ self._handles.add_channel_tcp('gdb', 'localhost', port)
+
+ if self._config.get_emu(['tcp_channel']):
+ path = os.path.join(self._wdir, 'tcp')
+ wait_for_file_size(path, 5, 5)
+ with open(path, 'r') as file:
+ port = int(file.read())
+ self._handles.add_channel_tcp('tcp', 'localhost', port)
+
+ if self._config.get_emu(['pty_channel']):
+ path = os.path.join(self._wdir, 'pty')
+ wait_for_file_size(path, 5, 5)
+ with open(path, 'r') as file:
+ pty_path = file.read()
+ self._handles.add_channel_pty('pty', pty_path)
+
+ def _get_connector(self, wdir: Path) -> Connector:
+ return MockEmuConnector(wdir)
+
+
+class MockEmuConnector(Connector):
+ """Connector for mock emulator"""
+
+ _props = {
+ 'path1': {
+ 'prop1': 'val1',
+ }
+ }
+
+ def reset(self) -> None:
+ Path(os.path.join(self._wdir, 'reset')).touch()
+
+ def cont(self) -> None:
+ Path(os.path.join(self._wdir, 'cont')).touch()
+
+ def list_properties(self, path: str) -> List[Any]:
+ try:
+ return list(self._props[path].keys())
+ except KeyError:
+ raise InvalidPropertyPath(path)
+
+ def set_property(self, path: str, prop: str, value: str) -> None:
+ if not self._props.get(path):
+ raise InvalidPropertyPath(path)
+ if not self._props[path].get(prop):
+ raise InvalidProperty(path, prop)
+ self._props[path][prop] = value
+ with open(os.path.join(self._wdir, 'props.json'), 'w') as file:
+ json.dump(self._props, file)
+
+ def get_property(self, path: str, prop: str) -> Any:
+ try:
+ with open(os.path.join(self._wdir, 'props.json'), 'r') as file:
+ self._props = json.load(file)
+ except OSError:
+ pass
+ if not self._props.get(path):
+ raise InvalidPropertyPath(path)
+ if not self._props[path].get(prop):
+ raise InvalidProperty(path, prop)
+ return self._props[path][prop]
diff --git a/pw_emu/py/pw_emu/__init__.py b/pw_emu/py/pw_emu/__init__.py
new file mode 100644
index 000000000..0f2cb9104
--- /dev/null
+++ b/pw_emu/py/pw_emu/__init__.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
diff --git a/pw_emu/py/pw_emu/__main__.py b/pw_emu/py/pw_emu/__main__.py
new file mode 100644
index 000000000..253edd931
--- /dev/null
+++ b/pw_emu/py/pw_emu/__main__.py
@@ -0,0 +1,448 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Command line interface for the Pigweed emulators frontend"""
+
+import argparse
+import json
+import os
+from pathlib import Path
+import signal
+import subprocess
+import sys
+import threading
+
+from typing import Any
+
+from pw_emu.core import Error
+from pw_emu.frontend import Emulator
+from serial import serial_for_url, SerialException
+from serial.tools.miniterm import Miniterm, key_description
+
+_TERM_CMD = ['python', '-m', 'serial', '--raw']
+
+
+def _cmd_gdb_cmds(emu, args: argparse.Namespace) -> None:
+ """Run gdb commands in batch mode."""
+
+ emu.run_gdb_cmds(args.gdb_cmd, executable=args.executable, pause=args.pause)
+
+
+def _cmd_load(emu: Emulator, args: argparse.Namespace) -> None:
+ """Load an executable image via gdb start executing it if pause is
+ not set"""
+
+ args.gdb_cmd = ['load']
+ _cmd_gdb_cmds(emu, args)
+
+
+def _cmd_start(emu: Emulator, args: argparse.Namespace) -> None:
+ """Launch the emulator and start executing, unless pause is set."""
+
+ if args.runner:
+ emu.set_emu(args.runner)
+
+ emu.start(
+ target=args.target,
+ file=args.file,
+ pause=args.pause,
+ args=args.args,
+ debug=args.debug,
+ foreground=args.foreground,
+ )
+
+
+def _get_miniterm(emu: Emulator, chan: str) -> Miniterm:
+ chan_type = emu.get_channel_type(chan)
+ if chan_type == 'tcp':
+ host, port = emu.get_channel_addr(chan)
+ url = f'socket://[{host}]:{port}'
+ elif chan_type == 'pty':
+ url = emu.get_channel_path(chan)
+ else:
+ raise Error(f'unknown channel type `{chan_type}`')
+ ser = serial_for_url(url)
+ ser.timeout = 1
+ miniterm = Miniterm(ser)
+ miniterm.raw = True
+ miniterm.set_tx_encoding('UTF-8')
+ miniterm.set_rx_encoding('UTF-8')
+
+ quit_key = key_description(miniterm.exit_character)
+ menu_key = key_description(miniterm.menu_character)
+ help_key = key_description('\x08')
+ help_desc = f'Help: {menu_key} followed by {help_key} ---'
+
+ print(f'--- Miniterm on {chan} ---')
+ print(f'--- Quit: {quit_key} | Menu: {menu_key} | {help_desc}')
+
+ # On POSIX systems miniterm uses TIOCSTI to "cancel" the TX thread
+ # (reading from the console, sending to the serial) which is
+ # disabled on Linux kernels > 6.2 see
+ # https://github.com/pyserial/pyserial/issues/243
+ #
+ # On Windows the cancel method does not seem to work either with
+ # recent win10 versions.
+ #
+ # Workaround by terminating the process for exceptions in the read
+ # and write threads.
+ threading.excepthook = lambda args: signal.raise_signal(signal.SIGTERM)
+
+ return miniterm
+
+
+def _cmd_run(emu: Emulator, args: argparse.Namespace) -> None:
+ """Start the emulator and connect the terminal to a channel. Stop
+ the emulator when exiting the terminal"""
+
+ emu.start(
+ target=args.target,
+ file=args.file,
+ pause=True,
+ args=args.args,
+ )
+
+ ctrl_chans = ['gdb', 'monitor', 'qmp', 'robot']
+ if not args.channel:
+ for chan in emu.get_channels():
+ if chan not in ctrl_chans:
+ args.channel = chan
+ break
+ if not args.channel:
+ raise Error(f'only control channels {ctrl_chans} found')
+
+ try:
+ miniterm = _get_miniterm(emu, args.channel)
+ emu.cont()
+ miniterm.start()
+ miniterm.join(True)
+ print('--- exit ---')
+ miniterm.stop()
+ miniterm.join()
+ miniterm.close()
+ except SerialException as err:
+ raise Error(f'error connecting to channel `{args.channel}`: {err}')
+ finally:
+ emu.stop()
+
+
+def _cmd_restart(emu: Emulator, args: argparse.Namespace) -> None:
+ """Restart the emulator and start executing, unless pause is set."""
+
+ if emu.running():
+ emu.stop()
+ _cmd_start(emu, args)
+
+
+def _cmd_stop(emu: Emulator, _args: argparse.Namespace) -> None:
+ """Stop the emulator"""
+
+ emu.stop()
+
+
+def _cmd_reset(emu: Emulator, _args: argparse.Namespace) -> None:
+ """Perform a software reset."""
+
+ emu.reset()
+
+
+def _cmd_gdb(emu: Emulator, args: argparse.Namespace) -> None:
+ """Start a gdb interactive session"""
+
+ executable = args.executable if args.executable else ""
+
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ try:
+ cmd = emu.get_gdb_cmd() + [
+ '-ex',
+ f'target remote {emu.get_gdb_remote()}',
+ executable,
+ ]
+ subprocess.run(cmd)
+ finally:
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+
+def _cmd_prop_ls(emu: Emulator, args: argparse.Namespace) -> None:
+ """List emulator object properties."""
+
+ props = emu.list_properties(args.path)
+ print(json.dumps(props, indent=4))
+
+
+def _cmd_prop_get(emu: Emulator, args: argparse.Namespace) -> None:
+ """Show the emulator's object properties."""
+
+ print(emu.get_property(args.path, args.property))
+
+
+def _cmd_prop_set(emu: Emulator, args: argparse.Namespace) -> None:
+ """Set emulator's object properties."""
+
+ emu.set_property(args.path, args.property, args.value)
+
+
+def _cmd_term(emu: Emulator, args: argparse.Namespace) -> None:
+ """Connect with an interactive terminal to an emulator channel"""
+
+ try:
+ miniterm = _get_miniterm(emu, args.channel)
+ miniterm.start()
+ miniterm.join(True)
+ print('--- exit ---')
+ miniterm.stop()
+ miniterm.join()
+ miniterm.close()
+ except SerialException as err:
+ raise Error(f'error connecting to channel `{args.channel}`: {err}')
+
+
+def _cmd_resume(emu: Emulator, _args: argparse.Namespace) -> None:
+ """Resume the execution of a paused emulator."""
+
+ emu.cont()
+
+
+def get_parser() -> argparse.ArgumentParser:
+ """Command line parser"""
+
+ parser = argparse.ArgumentParser(
+ description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ )
+ parser.add_argument(
+ '-i',
+ '--instance',
+ help='instance to use (default: %(default)s)',
+ type=str,
+ metavar='STRING',
+ default='default',
+ )
+ parser.add_argument(
+ '-C',
+ '--working-dir',
+ help='path to working directory (default: %(default)s)',
+ type=Path,
+ default=os.getenv('PW_EMU_WDIR'),
+ )
+ parser.add_argument(
+ '-c',
+ '--config',
+ help='path config file (default: %(default)s)',
+ type=str,
+ default=None,
+ )
+
+ subparsers = parser.add_subparsers(dest='command', required=True)
+
+ def add_cmd(name: str, func: Any) -> argparse.ArgumentParser:
+ subparser = subparsers.add_parser(
+ name, description=func.__doc__, help=func.__doc__
+ )
+ subparser.set_defaults(func=func)
+ return subparser
+
+ start = add_cmd('start', _cmd_start)
+ restart = add_cmd('restart', _cmd_restart)
+
+ for subparser in [start, restart]:
+ subparser.add_argument(
+ 'target',
+ type=str,
+ )
+ subparser.add_argument(
+ '--file',
+ '-f',
+ metavar='FILE',
+ help='file to load before starting',
+ )
+ subparser.add_argument(
+ '--runner',
+ '-r',
+ help='emulator to use, automatically detected if not set',
+ choices=[None, 'qemu', 'renode'],
+ default=None,
+ )
+ subparser.add_argument(
+ '--args',
+ '-a',
+ help='options to pass to the emulator',
+ )
+ subparser.add_argument(
+ '--pause',
+ '-p',
+ action='store_true',
+ help='pause the emulator after starting it',
+ )
+ subparser.add_argument(
+ '--debug',
+ '-d',
+ action='store_true',
+ help='start the emulator in debug mode',
+ )
+ subparser.add_argument(
+ '--foreground',
+ '-F',
+ action='store_true',
+ help='start the emulator in foreground mode',
+ )
+
+ run = add_cmd('run', _cmd_run)
+ run.add_argument(
+ 'target',
+ type=str,
+ )
+ run.add_argument(
+ 'file',
+ metavar='FILE',
+ help='file to load before starting',
+ )
+ run.add_argument(
+ '--args',
+ '-a',
+ help='options to pass to the emulator',
+ )
+ run.add_argument(
+ '--channel',
+ '-n',
+ help='channel to connect the terminal to',
+ )
+
+ stop = add_cmd('stop', _cmd_stop)
+
+ load = add_cmd('load', _cmd_load)
+ load.add_argument(
+ 'executable',
+ metavar='FILE',
+ help='file to load via gdb',
+ )
+ load.add_argument(
+ '--pause',
+ '-p',
+ help='pause the emulator after loading the file',
+ action='store_true',
+ )
+ load.add_argument(
+ '--offset',
+ '-o',
+ metavar='ADDRESS',
+ help='address to load the file at',
+ )
+
+ reset = add_cmd('reset', _cmd_reset)
+
+ gdb = add_cmd('gdb', _cmd_gdb)
+ gdb.add_argument(
+ '--executable',
+ '-e',
+ metavar='FILE',
+ help='file to use for the debugging session',
+ )
+
+ prop_ls = add_cmd('prop-ls', _cmd_prop_ls)
+ prop_ls.add_argument(
+ 'path',
+ help='path of the emulator object',
+ )
+
+ prop_get = add_cmd('prop-get', _cmd_prop_get)
+ prop_get.add_argument(
+ 'path',
+ help='path of the emulator object',
+ )
+ prop_get.add_argument(
+ 'property',
+ help='name of the object property',
+ )
+
+ prop_set = add_cmd('prop-set', _cmd_prop_set)
+ prop_set.add_argument(
+ 'path',
+ help='path of the emulator object',
+ )
+ prop_set.add_argument(
+ 'property',
+ help='name of the object property',
+ )
+ prop_set.add_argument(
+ 'value',
+ help='value to set for the object property',
+ )
+
+ gdb_cmds = add_cmd('gdb-cmds', _cmd_gdb_cmds)
+ gdb_cmds.add_argument(
+ '--pause',
+ '-p',
+ help='do not resume execution after running the commands',
+ action='store_true',
+ )
+ gdb_cmds.add_argument(
+ '--executable',
+ '-e',
+ metavar='FILE',
+ help='executable to use while running the gdb commands',
+ )
+ gdb_cmds.add_argument(
+ 'gdb_cmd',
+ nargs='+',
+ help='gdb command to execute',
+ )
+
+ term = add_cmd('term', _cmd_term)
+ term.add_argument(
+ 'channel',
+ help='channel name',
+ )
+
+ resume = add_cmd('resume', _cmd_resume)
+
+ parser.epilog = f"""commands usage:
+ {start.format_usage().strip()}
+ {restart.format_usage().strip()}
+ {stop.format_usage().strip()}
+ {run.format_usage().strip()}
+ {load.format_usage().strip()}
+ {reset.format_usage().strip()}
+ {gdb.format_usage().strip()}
+ {prop_ls.format_usage().strip()}
+ {prop_get.format_usage().strip()}
+ {prop_set.format_usage().strip()}
+ {gdb_cmds.format_usage().strip()}
+ {term.format_usage().strip()}
+ {resume.format_usage().strip()}
+ """
+
+ return parser
+
+
+def main() -> int:
+ """Emulators frontend command line interface."""
+
+ args = get_parser().parse_args()
+ if not args.working_dir:
+ args.working_dir = (
+ f'{os.getenv("PW_PROJECT_ROOT")}/.pw_emu/{args.instance}'
+ )
+
+ try:
+ emu = Emulator(args.working_dir, args.config)
+ args.func(emu, args)
+ except Error as err:
+ print(err)
+ return 1
+
+ return 0
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/pw_emu/py/pw_emu/core.py b/pw_emu/py/pw_emu/core.py
new file mode 100644
index 000000000..1e5c2ea9f
--- /dev/null
+++ b/pw_emu/py/pw_emu/core.py
@@ -0,0 +1,956 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Infrastructure used by the user interface or specific emulators."""
+
+import io
+import json
+import logging
+import os
+import re
+import socket
+import subprocess
+import sys
+import time
+
+from abc import ABC, abstractmethod
+from importlib import import_module
+from pathlib import Path
+from typing import Optional, Dict, List, Union, Any, Type
+
+import psutil # type: ignore
+
+from pw_emu.pigweed_emulators import pigweed_emulators
+from pw_env_setup.config_file import load as pw_config_load
+from pw_env_setup.config_file import path as pw_config_path
+from serial import Serial
+
+
+_LAUNCHER_LOG = logging.getLogger('pw_qemu.core.launcher')
+
+
+def _stop_process(pid: int) -> None:
+ """Gracefully stop a running process."""
+
+ try:
+ proc = psutil.Process(pid)
+ proc.terminate()
+ try:
+ proc.wait(timeout=5)
+ except psutil.TimeoutExpired:
+ proc.kill()
+ except psutil.NoSuchProcess:
+ pass
+
+
+def _get_class(name: str) -> type:
+ """Returns a class from a full qualified class name
+ (e.g. "package.module.Class").
+
+ """
+ try:
+ module_path, class_name = name.rsplit('.', 1)
+ module = import_module(module_path)
+ return getattr(module, class_name)
+ except (ImportError, AttributeError):
+ raise ImportError(name)
+
+
+class Error(Exception):
+ """Generic pw_emu exception."""
+
+
+class ConfigError(Error):
+ """Exception raised for configuration errors."""
+
+ def __init__(self, config: Optional[Path], err: str) -> None:
+ msg = f'{config}: {err}\n'
+ try:
+ if config:
+ with open(config, 'r') as file:
+ msg += json.dumps(json.load(file), indent=4)
+ except (OSError, json.decoder.JSONDecodeError):
+ pass
+ super().__init__(msg)
+
+
+class AlreadyRunning(Error):
+ """Exception raised if an emulator process is already running."""
+
+ def __init__(self, wdir: Path) -> None:
+ super().__init__(f'{wdir}: emulator already started')
+
+
+class NotRunning(Error):
+ """Exception raised if an emulator process is not running."""
+
+ def __init__(self, wdir: Path) -> None:
+ super().__init__(f'{wdir}: emulator not started')
+
+
+class InvalidEmulator(Error):
+ """Exception raised if an different backend is running."""
+
+ def __init__(self, emu: str) -> None:
+ super().__init__(f'invalid emulator `{emu}`')
+
+
+class InvalidTarget(Error):
+ """Exception raised if the target is invalid."""
+
+ def __init__(self, config: Path, emu: Optional[str], target: str) -> None:
+ emu_str = f'for `{emu}`' if emu else ''
+ super().__init__(f'{config}: invalid target `{target}` {emu_str}')
+
+
+class InvalidChannelName(Error):
+ """Exception raised if a channel name is invalid."""
+
+ def __init__(self, name: str, target: str, valid: str) -> None:
+ msg = f"""
+ `{name}` is not a valid device name for {target}`
+ try: {valid}
+ """
+ super().__init__(msg)
+
+
+class InvalidChannelType(Error):
+ """Exception raised if a channel type is invalid."""
+
+ def __init__(self, name: str) -> None:
+ super().__init__(f'`{name}` is not a valid channel type')
+
+
+class WrongEmulator(Error):
+ """Exception raised if an different backend is running."""
+
+ def __init__(self, exp: str, found: str) -> None:
+ super().__init__(f'wrong emulator: expected `{exp}, found {found}`')
+
+
+class RunError(Error):
+ """Exception raised when a command failed to run."""
+
+ def __init__(self, proc: str, msg: str) -> None:
+ super().__init__(f'error running `{proc}`: {msg}')
+
+
+class InvalidPropertyPath(Error):
+ """Exception raised for an invalid property path."""
+
+ def __init__(self, path: str) -> None:
+ super().__init__(f'invalid property path `{path}`')
+
+
+class InvalidProperty(Error):
+ """Exception raised for an invalid property path."""
+
+ def __init__(self, path: str, name: str) -> None:
+ super().__init__(f'invalid property `{name}` at `{path}`')
+
+
+class HandlesError(Error):
+ """Exception raised while trying to load emulator handles."""
+
+ def __init__(self, msg: str) -> None:
+ super().__init__(f'error loading handles: {msg}')
+
+
+class Handles:
+ """Running emulator handles."""
+
+ class Channel:
+ def __init__(self, chan_type: str):
+ self.type = chan_type
+
+ class PtyChannel(Channel):
+ def __init__(self, path: str):
+ super().__init__('pty')
+ self.path = path
+
+ class TcpChannel(Channel):
+ def __init__(self, host: str, port: int):
+ super().__init__('tcp')
+ self.host = host
+ self.port = port
+
+ class Proc:
+ def __init__(self, pid: int):
+ self.pid = pid
+
+ @staticmethod
+ def _ser_obj(obj) -> Any:
+ if isinstance(obj, dict):
+ data = {}
+ for key, val in obj.items():
+ data[key] = Handles._ser_obj(val)
+ return data
+ if hasattr(obj, "__iter__") and not isinstance(obj, str):
+ return [Handles._ser_obj(item) for item in obj]
+ if hasattr(obj, "__dict__"):
+ return Handles._ser_obj(obj.__dict__)
+ return obj
+
+ def _serialize(self):
+ return Handles._ser_obj(self)
+
+ def save(self, wdir: Path) -> None:
+ """Saves handles to the given working directory."""
+
+ with open(os.path.join(wdir, 'handles.json'), 'w') as file:
+ json.dump(self._serialize(), file)
+
+ @staticmethod
+ def load(wdir: Path):
+ try:
+ with open(os.path.join(wdir, 'handles.json'), 'r') as file:
+ data = json.load(file)
+ except (KeyError, OSError, json.decoder.JSONDecodeError):
+ raise NotRunning(wdir)
+
+ handles = Handles(data['emu'], data['config'])
+ handles.set_target(data['target'])
+ gdb_cmd = data.get('gdb_cmd')
+ if gdb_cmd:
+ handles.set_gdb_cmd(gdb_cmd)
+ for name, chan in data['channels'].items():
+ chan_type = chan['type']
+ if chan_type == 'tcp':
+ handles.add_channel_tcp(name, chan['host'], chan['port'])
+ elif chan_type == 'pty':
+ handles.add_channel_pty(name, chan['path'])
+ else:
+ raise InvalidChannelType(chan_type)
+ for name, proc in data['procs'].items():
+ handles.add_proc(name, proc['pid'])
+ return handles
+
+ def __init__(self, emu: str, config: str) -> None:
+ self.emu = emu
+ self.config = config
+ self.gdb_cmd: List[str] = []
+ self.target = ''
+ self.channels: Dict[str, Handles.Channel] = {}
+ self.procs: Dict[str, Handles.Proc] = {}
+
+ def add_channel_tcp(self, name: str, host: str, port: int) -> None:
+ """Add a TCP channel."""
+
+ self.channels[name] = self.TcpChannel(host, port)
+
+ def add_channel_pty(self, name: str, path: str) -> None:
+ """Add a pty channel."""
+
+ self.channels[name] = self.PtyChannel(path)
+
+ def add_proc(self, name: str, pid: int) -> None:
+ """Add a pid."""
+
+ self.procs[name] = self.Proc(pid)
+
+ def set_target(self, target: str) -> None:
+ """Set the target."""
+
+ self.target = target
+
+ def set_gdb_cmd(self, cmd: List[str]) -> None:
+ """Set the gdb command."""
+
+ self.gdb_cmd = cmd.copy()
+
+
+def _stop_processes(handles: Handles, wdir: Path) -> None:
+ """Stop all processes for a (partially) running emulator instance.
+
+ Remove pid files as well.
+ """
+
+ for _, proc in handles.procs.items():
+ _stop_process(proc.pid)
+ path = os.path.join(wdir, f'{proc}.pid')
+ if os.path.exists(path):
+ os.unlink(path)
+
+
+class Config:
+ """Get and validate options from the configuration file."""
+
+ def __init__(
+ self,
+ config_path: Optional[Path] = None,
+ target: Optional[str] = None,
+ emu: Optional[str] = None,
+ ) -> None:
+ """Load the emulator configuration.
+
+ If no configuration file path is given, the root project
+ configuration is used.
+
+ This method set ups the generic configuration (e.g. gdb).
+
+ It loads emulator target files and gathers them under the 'targets' key
+ for each emulator backend. The 'targets' settings in the configuration
+ file takes precedence over the loaded target files.
+
+ """
+ try:
+ if config_path:
+ with open(config_path, 'r') as file:
+ config = json.load(file)['pw']['pw_emu']
+ else:
+ config_path = pw_config_path()
+ config = pw_config_load()['pw']['pw_emu']
+ except KeyError:
+ raise ConfigError(config_path, 'missing `pw_emu` configuration')
+
+ if not config_path:
+ raise ConfigError(None, 'unable to deterine config path')
+
+ if config.get('target_files'):
+ tmp = {}
+ for path in config['target_files']:
+ if not os.path.isabs(path):
+ path = os.path.join(os.path.dirname(config_path), path)
+ with open(path, 'r') as file:
+ tmp.update(json.load(file).get('targets'))
+ if config.get('targets'):
+ tmp.update(config['targets'])
+ config['targets'] = tmp
+
+ self.path = config_path
+ self._config = {'emulators': pigweed_emulators}
+ self._config.update(config)
+ self._emu = emu
+ self._target = target
+
+ def set_target(self, target: str) -> None:
+ """Sets the current target.
+
+ The current target is used by the get_target method.
+
+ """
+
+ self._target = target
+ try:
+ self.get(['targets', target], optional=False, entry_type=dict)
+ except ConfigError:
+ raise InvalidTarget(self.path, self._emu, self._target)
+
+ def get_targets(self) -> List[str]:
+ return list(self.get(['targets'], entry_type=dict).keys())
+
+ def _subst(self, string: str) -> str:
+ """Substitutes $pw_<subst_type>{arg} statements."""
+
+ match = re.search(r'\$pw_([^{]+){([^}]+)}', string)
+ if not match:
+ return string
+
+ subst_type = match.group(1)
+ arg = match.group(2)
+
+ if subst_type == 'env':
+ value = os.environ.get(arg)
+ if value is None:
+ msg = f'Environment variable `{arg}` not set'
+ raise ConfigError(self.path, msg)
+ return string.replace(f'$pw_{subst_type}{{{arg}}}', value)
+
+ raise ConfigError(self.path, f'Invalid substitution type: {subst_type}')
+
+ def _subst_list(self, items: List[Any]) -> List[Any]:
+ new_list = []
+ for item in items:
+ if isinstance(item, str):
+ new_list.append(self._subst(item))
+ else:
+ new_list.append(item)
+ return new_list
+
+ def get(
+ self,
+ keys: List[str],
+ optional: bool = True,
+ entry_type: Optional[Type] = None,
+ ) -> Any:
+ """Get a config entry.
+
+ keys is a list of string that identifies the config entry, e.g.
+ ['targets', 'test-target'] is going to look in the config dicionary for
+ ['targets']['test-target'].
+
+ If the option is not found and optional is True it returns None if
+ entry_type is none or a new (empty) object of type entry_type.
+
+ If the option is not found an optional is False it raises ConfigError.
+
+ If entry_type is not None it will check the option to be of
+ that type. If it is not it will raise ConfigError.
+
+ """
+
+ keys_str = ': '.join(keys)
+ entry: Optional[Dict[str, Any]] = self._config
+
+ for key in keys:
+ if not isinstance(entry, dict):
+ if optional:
+ if entry_type:
+ return entry_type()
+ return None
+ raise ConfigError(self.path, f'{keys_str}: not found')
+ entry = entry.get(key)
+
+ if entry is None:
+ if optional:
+ if entry_type:
+ return entry_type()
+ return None
+ raise ConfigError(self.path, f'{keys_str}: not found')
+
+ if entry_type and not isinstance(entry, entry_type):
+ msg = f'{keys_str}: expected entry of type `{entry_type}`'
+ raise ConfigError(self.path, msg)
+
+ if isinstance(entry, str):
+ entry = self._subst(entry)
+ elif isinstance(entry, list):
+ entry = self._subst_list(entry)
+
+ return entry
+
+ def get_target(
+ self,
+ keys: List[str],
+ optional: bool = True,
+ entry_type: Optional[Type] = None,
+ ) -> Any:
+ """Get a config option starting at ['targets'][target]."""
+
+ if not self._target:
+ raise Error('target not set')
+ return self.get(['targets', self._target] + keys, optional, entry_type)
+
+ def get_emu(
+ self,
+ keys: List[str],
+ optional: bool = True,
+ entry_type: Optional[Type] = None,
+ ) -> Any:
+ """Get a config option starting at [emu]."""
+
+ if not self._emu:
+ raise Error('emu not set')
+ return self.get([self._emu] + keys, optional, entry_type)
+
+ def get_target_emu(
+ self,
+ keys: List[str],
+ optional: bool = True,
+ entry_type: Optional[Type] = None,
+ ) -> Any:
+ """Get a config option starting at ['targets'][target][emu]."""
+
+ if not self._emu or not self._target:
+ raise Error('emu or target not set')
+ return self.get(
+ ['targets', self._target, self._emu] + keys, optional, entry_type
+ )
+
+
+class Connector(ABC):
+ """Interface between a running emulator and the user visible APIs."""
+
+ def __init__(self, wdir: Path) -> None:
+ self._wdir = wdir
+ self._handles = Handles.load(wdir)
+ self._channels = self._handles.channels
+ self._target = self._handles.target
+
+ @staticmethod
+ def get(wdir: Path) -> Any:
+ """Return a connector instace for a given emulator type."""
+ handles = Handles.load(wdir)
+ config = Config(handles.config)
+ emu = handles.emu
+ try:
+ name = config.get(['emulators', emu, 'connector'])
+ cls = _get_class(name)
+ except (ConfigError, ImportError):
+ raise InvalidEmulator(emu)
+ return cls(wdir)
+
+ def get_emu(self) -> str:
+ """Returns the emulator type."""
+
+ return self._handles.emu
+
+ def get_gdb_cmd(self) -> List[str]:
+ """Returns the configured gdb command."""
+ return self._handles.gdb_cmd
+
+ def get_config_path(self) -> Path:
+ """Returns the configuration path."""
+
+ return self._handles.config
+
+ def get_procs(self) -> Dict[str, Handles.Proc]:
+ """Returns the running processes indexed by the process name."""
+
+ return self._handles.procs
+
+ def get_channel_type(self, name: str) -> str:
+ """Returns the channel type."""
+
+ try:
+ return self._channels[name].type
+ except KeyError:
+ channels = ' '.join(self._channels.keys())
+ raise InvalidChannelName(name, self._target, channels)
+
+ def get_channel_path(self, name: str) -> str:
+ """Returns the channel path. Raises InvalidChannelType if this
+ is not a pty channel.
+
+ """
+
+ try:
+ if self._channels[name].type != 'pty':
+ raise InvalidChannelType(self._channels[name].type)
+ return self._channels[name].path
+ except KeyError:
+ raise InvalidChannelName(name, self._target, self._channels.keys())
+
+ def get_channel_addr(self, name: str) -> tuple:
+ """Returns a pair of (host, port) for the channel. Raises
+ InvalidChannelType if this is not a tcp channel.
+
+ """
+
+ try:
+ if self._channels[name].type != 'tcp':
+ raise InvalidChannelType(self._channels[name].type)
+ return (self._channels[name].host, self._channels[name].port)
+ except KeyError:
+ raise InvalidChannelName(name, self._target, self._channels.keys())
+
+ def get_channel_stream(
+ self,
+ name: str,
+ timeout: Optional[float] = None,
+ ) -> io.RawIOBase:
+ """Returns a file object for a given host exposed device.
+
+ If timeout is None than reads and writes are blocking. If
+ timeout is zero the stream is operating in non-blocking
+ mode. Otherwise read and write will timeout after the given
+ value.
+
+ """
+
+ chan_type = self.get_channel_type(name)
+ if chan_type == 'tcp':
+ host, port = self.get_channel_addr(name)
+ if ':' in host:
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ else:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((host, port))
+ sock.settimeout(timeout)
+ ret = sock.makefile('rwb', buffering=0)
+ sock.close()
+ return ret
+ if chan_type == 'pty':
+ ser = Serial(self.get_channel_path(name))
+ ser.timeout = timeout
+ return ser
+ raise InvalidChannelType(chan_type)
+
+ def get_channels(self) -> List[str]:
+ return self._handles.channels.keys()
+
+ def stop(self) -> None:
+ """Stop the emulator."""
+
+ _stop_processes(self._handles, self._wdir)
+
+ try:
+ os.unlink(os.path.join(self._wdir, 'handles.json'))
+ except OSError:
+ pass
+
+ def proc_running(self, proc: str) -> bool:
+ try:
+ return psutil.pid_exists(self._handles.procs[proc].pid)
+ except (NotRunning, KeyError):
+ return False
+
+ def running(self) -> bool:
+ """Check if the main emulator process is already running."""
+
+ try:
+ return psutil.pid_exists(self._handles.procs[self._handles.emu].pid)
+ except (NotRunning, KeyError):
+ return False
+
+ @abstractmethod
+ def reset(self) -> None:
+ """Perform a software reset."""
+
+ @abstractmethod
+ def cont(self) -> None:
+ """Resume the emulator's execution."""
+
+ @abstractmethod
+ def list_properties(self, path: str) -> List[Any]:
+ """Returns the property list for an emulator object."""
+
+ @abstractmethod
+ def set_property(self, path: str, prop: str, value: str) -> None:
+ """Sets the value of an emulator's object property."""
+
+ @abstractmethod
+ def get_property(self, path: str, prop: str) -> Any:
+ """Returns the value of an emulator's object property."""
+
+
+class Launcher(ABC):
+ """Starts an emulator based on the target and configuration file."""
+
+ def __init__(
+ self,
+ emu: str,
+ config_path: Optional[Path] = None,
+ ) -> None:
+ """Initializes a Launcher instance."""
+
+ self._wdir: Optional[Path] = None
+ """Working directory"""
+
+ self._emu = emu
+ """Emulator type (e.g. "qemu", "renode")."""
+
+ self._target: Optional[str] = None
+ """Target, initialized to None and set with _prep_start."""
+
+ self._config = Config(config_path, emu=emu)
+ """Global, emulator and target configuration."""
+
+ self._handles = Handles(self._emu, str(self._config.path))
+ """Handles for processes, channels, etc."""
+
+ gdb_cmd = self._config.get(['gdb'], entry_type=list)
+ if gdb_cmd:
+ self._handles.set_gdb_cmd(gdb_cmd)
+
+ @staticmethod
+ def get(emu: str, config_path: Optional[Path] = None) -> Any:
+ """Returns a launcher for a given emulator type."""
+ config = Config(config_path)
+ try:
+ name = config.get(['emulators', emu, 'launcher'])
+ cls = _get_class(name)
+ except (ConfigError, ImportError):
+ raise InvalidEmulator(str(emu))
+ return cls(config_path)
+
+ @abstractmethod
+ def _pre_start(
+ self,
+ target: str,
+ file: Optional[Path] = None,
+ pause: bool = False,
+ debug: bool = False,
+ args: Optional[str] = None,
+ ) -> List[str]:
+ """Pre start work, returns command to start the emulator.
+
+ The target and emulator configuration can be accessed through
+ :py:attr:`pw_emu.core.Launcher._config` with
+ :py:meth:`pw_emu.core.Config.get`,
+ :py:meth:`pw_emu.core.Config.get_target`,
+ :py:meth:`pw_emu.core.Config.get_emu`,
+ :py:meth:`pw_emu.core.Config.get_target_emu`.
+ """
+
+ @abstractmethod
+ def _post_start(self) -> None:
+ """Post start work, finalize emulator handles.
+
+ Perform any post start emulator initialization and finalize the emulator
+ handles information.
+
+ Typically an internal monitor channel is used to inquire information
+ about the the configured channels (e.g. TCP ports, pty paths) and
+ :py:attr:`pw_emu.core.Launcher._handles` is updated via
+ :py:meth:`pw_emu.core.Handles.add_channel_tcp`,
+ :py:meth:`pw_emu.core.Handles.add_channel_pty`, etc.
+
+ """
+
+ @abstractmethod
+ def _get_connector(self, wdir: Path) -> Connector:
+ """Get a connector for this emulator type."""
+
+ def _path(self, name: Union[Path, str]) -> Path:
+ """Returns the full path for a given emulator file."""
+ if self._wdir is None:
+ raise Error('internal error')
+ return Path(os.path.join(self._wdir, name))
+
+ def _subst_channel(self, subst_type: str, arg: str, string: str) -> str:
+ """Substitutes $pw_emu_channel_{func}{arg} statements."""
+
+ try:
+ chan = self._handles.channels[arg]
+ except KeyError:
+ return string
+
+ if subst_type == 'channel_port':
+ if not isinstance(chan, Handles.TcpChannel):
+ return string
+ return str(chan.port)
+
+ if subst_type == 'channel_host':
+ if not isinstance(chan, Handles.TcpChannel):
+ return string
+ return chan.host
+
+ if subst_type == 'channel_path':
+ if not isinstance(chan, Handles.PtyChannel):
+ return string
+ return chan.path
+
+ return string
+
+ def _subst(self, string: str) -> str:
+ """Substitutes $pw_emu_<subst_type>{arg} statements."""
+
+ match = re.search(r'\$pw_emu_([^{]+){([^}]+)}', string)
+ if not match:
+ return string
+
+ subst_type = match.group(1)
+ arg = match.group(2)
+
+ if subst_type == 'wdir':
+ if self._wdir:
+ return os.path.join(self._wdir, arg)
+ return string
+
+ if 'channel_' in subst_type:
+ return self._subst_channel(subst_type, arg, string)
+
+ return string
+
+ # pylint: disable=protected-access
+ # use os._exit after fork instead of os.exit
+ def _daemonize(
+ self,
+ name: str,
+ cmd: List[str],
+ ) -> None:
+ """Daemonize process for UNIX hosts."""
+
+ if sys.platform == 'win32':
+ raise Error('_daemonize not supported on win32')
+
+ # pylint: disable=no-member
+ # avoid pylint false positive on win32
+ pid = os.fork()
+ if pid < 0:
+ raise RunError(name, f'fork failed: {pid}')
+ if pid > 0:
+ return
+
+ path: Path = Path('/dev/null')
+ fd = os.open(path, os.O_RDONLY)
+ os.dup2(fd, sys.stdin.fileno())
+ os.close(fd)
+
+ path = self._path(f'{name}.log')
+ fd = os.open(path, os.O_WRONLY | os.O_TRUNC | os.O_CREAT)
+ os.dup2(fd, sys.stdout.fileno())
+ os.dup2(fd, sys.stderr.fileno())
+ os.close(fd)
+
+ os.setsid()
+
+ if os.fork() > 0:
+ os._exit(0)
+
+ try:
+ # Make the pid file create and pid write operations atomic to avoid
+ # races with readers.
+ with open(self._path(f'{name}.pid.tmp'), 'w') as file:
+ file.write(f'{os.getpid()}')
+ os.rename(self._path(f'{name}.pid.tmp'), self._path(f'{name}.pid'))
+ os.execvp(cmd[0], cmd)
+ finally:
+ os._exit(1)
+
+ def _start_proc(
+ self,
+ name: str,
+ cmd: List[str],
+ foreground: bool = False,
+ ) -> Union[subprocess.Popen, None]:
+ """Run the main emulator process.
+
+ The process pid is stored and can later be accessed by its name to
+ terminate it when the emulator is stopped.
+
+ If foreground is True the process run in the foreground and a
+ subprocess.Popen object is returned. Otherwise the process is started in
+ the background and None is returned.
+
+ When running in the background stdin is redirected to the NULL device
+ and stdout and stderr are redirected to a file named <name>.log which is
+ stored in the emulator's instance working directory.
+
+ """
+ for idx, item in enumerate(cmd):
+ cmd[idx] = self._subst(item)
+
+ pid_file_path = self._path(f'{name}.pid')
+ if os.path.exists(pid_file_path):
+ os.unlink(pid_file_path)
+
+ if foreground:
+ proc = subprocess.Popen(cmd)
+ self._handles.add_proc(name, proc.pid)
+ with open(pid_file_path, 'w') as file:
+ file.write(f'{proc.pid}')
+ return proc
+
+ if sys.platform == 'win32':
+ file = open(self._path(f'{name}.log'), 'w')
+ proc = subprocess.Popen(
+ cmd,
+ stdin=subprocess.DEVNULL,
+ stdout=file,
+ stderr=file,
+ creationflags=subprocess.DETACHED_PROCESS,
+ )
+ file.close()
+ with open(pid_file_path, 'w') as file:
+ file.write(f'{proc.pid}')
+ self._handles.add_proc(name, proc.pid)
+ # avoids resource warnings due to not calling wait which
+ # we don't want to do since we've started the process in
+ # the background
+ proc.returncode = 0
+ else:
+ self._daemonize(name, cmd)
+
+ # wait for the pid file to avoid double start race conditions
+ timeout = time.monotonic() + 30
+ while not os.path.exists(self._path(f'{name}.pid')):
+ time.sleep(0.1)
+ if time.monotonic() > timeout:
+ break
+ if not os.path.exists(self._path(f'{name}.pid')):
+ raise RunError(name, 'pid file timeout')
+ try:
+ with open(pid_file_path, 'r') as file:
+ pid = int(file.readline())
+ self._handles.add_proc(name, pid)
+ except (OSError, ValueError) as err:
+ raise RunError(name, str(err))
+
+ return None
+
+ def _stop_procs(self):
+ """Stop all registered processes."""
+
+ for name, proc in self._handles.procs.items():
+ _stop_process(proc.pid)
+ if os.path.exists(self._path(f'{name}.pid')):
+ os.unlink(self._path(f'{name}.pid'))
+
+ def _start_procs(self, procs_list: str) -> None:
+ """Start additional processes besides the main emulator one."""
+
+ procs = self._config.get_target([procs_list], entry_type=dict)
+ for name, cmd in procs.items():
+ self._start_proc(name, cmd)
+
+ def start(
+ self,
+ wdir: Path,
+ target: str,
+ file: Optional[Path] = None,
+ pause: bool = False,
+ debug: bool = False,
+ foreground: bool = False,
+ args: Optional[str] = None,
+ ) -> Connector:
+ """Start the emulator for the given target.
+
+ If file is set that the emulator will load the file before starting.
+
+ If pause is True the emulator is paused.
+
+ If debug is True the emulator is run in foreground with debug output
+ enabled. This is useful for seeing errors, traces, etc.
+
+ If foreground is True the emulator is run in foreground otherwise it is
+ started in daemon mode. This is useful when there is another process
+ controlling the emulator's life cycle (e.g. cuttlefish)
+
+ args are passed directly to the emulator
+
+ """
+
+ try:
+ handles = Handles.load(wdir)
+ if psutil.pid_exists(handles.procs[handles.emu].pid):
+ raise AlreadyRunning(wdir)
+ except NotRunning:
+ pass
+
+ self._wdir = wdir
+ self._target = target
+ self._config.set_target(target)
+ self._handles.set_target(target)
+ gdb_cmd = self._config.get_target(['gdb'], entry_type=list)
+ if gdb_cmd:
+ self._handles.set_gdb_cmd(gdb_cmd)
+ os.makedirs(wdir, mode=0o700, exist_ok=True)
+
+ cmd = self._pre_start(
+ target=target, file=file, pause=pause, debug=debug, args=args
+ )
+
+ if debug:
+ foreground = True
+ _LAUNCHER_LOG.setLevel(logging.DEBUG)
+
+ _LAUNCHER_LOG.debug('starting emulator with command: %s', ' '.join(cmd))
+
+ try:
+ self._start_procs('pre-start-cmds')
+ proc = self._start_proc(self._emu, cmd, foreground)
+ self._start_procs('post-start-cmds')
+ except RunError as err:
+ self._stop_procs()
+ raise err
+
+ self._post_start()
+ self._handles.save(wdir)
+
+ if proc:
+ proc.wait()
+ self._stop_procs()
+
+ return self._get_connector(self._wdir)
diff --git a/pw_emu/py/pw_emu/frontend.py b/pw_emu/py/pw_emu/frontend.py
new file mode 100644
index 000000000..8ecf7d3fe
--- /dev/null
+++ b/pw_emu/py/pw_emu/frontend.py
@@ -0,0 +1,328 @@
+#!/usr/bin/env python
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""User API"""
+
+import io
+import os
+import subprocess
+import tempfile
+
+from pathlib import Path
+from typing import Any, Optional, List, Union, Dict
+
+from pw_emu.core import (
+ AlreadyRunning,
+ Config,
+ ConfigError,
+ Connector,
+ Launcher,
+ InvalidEmulator,
+ InvalidChannelType,
+ NotRunning,
+)
+
+
+class Emulator:
+ """Launches, controls and interacts with an emulator instance."""
+
+ def __init__(self, wdir: Path, config_path: Optional[Path] = None) -> None:
+ self._wdir = wdir
+ self._config_path = config_path
+ self._connector: Optional[Connector] = None
+ self._launcher: Optional[Launcher] = None
+
+ def _get_launcher(self, target: str) -> Launcher:
+ """Returns an emulator for a given target.
+
+ If there are multiple emulators for the same target it will return
+ an arbitrary emulator launcher.
+ """
+ config = Config(self._config_path)
+ target_config = config.get(
+ ['targets', target],
+ optional=False,
+ entry_type=dict,
+ )
+ for key in target_config.keys():
+ try:
+ return Launcher.get(key, self._config_path)
+ except InvalidEmulator:
+ pass
+ raise ConfigError(
+ self._config_path,
+ f'could not determine emulator for target `{target}`',
+ )
+
+ def start(
+ self,
+ target: str,
+ file: Optional[Path] = None,
+ pause: bool = False,
+ debug: bool = False,
+ foreground: bool = False,
+ args: Optional[str] = None,
+ ) -> None:
+ """Start the emulator for the given target.
+
+ If file is set the emulator will load the file before starting.
+
+ If pause is True the emulator is paused until the debugger is
+ connected.
+
+ If debug is True the emulator is run in foreground with debug
+ output enabled. This is useful for seeing errors, traces, etc.
+
+ If foreground is True the emulator is run in foreground otherwise
+ it is started in daemon mode. This is useful when there is
+ another process controlling the emulator's life cycle
+ (e.g. cuttlefish)
+
+ args are passed directly to the emulator
+
+ """
+ if self._connector:
+ raise AlreadyRunning(self._wdir)
+
+ if self._launcher is None:
+ self._launcher = self._get_launcher(target)
+ self._connector = self._launcher.start(
+ wdir=self._wdir,
+ target=target,
+ file=file,
+ pause=pause,
+ debug=debug,
+ foreground=foreground,
+ args=args,
+ )
+
+ def _c(self) -> Connector:
+ if self._connector is None:
+ self._connector = Connector.get(self._wdir)
+ if not self.running():
+ raise NotRunning(self._wdir)
+ return self._connector
+
+ def running(self) -> bool:
+ """Check if the main emulator process is already running."""
+
+ try:
+ return self._c().running()
+ except NotRunning:
+ return False
+
+ def _path(self, name: Union[Path, str]) -> Union[Path, str]:
+ """Returns the full path for a given emulator file."""
+
+ return os.path.join(self._wdir, name)
+
+ def stop(self):
+ """Stop the emulator."""
+
+ return self._c().stop()
+
+ def get_gdb_remote(self) -> str:
+ """Return a string that can be passed to the target remote gdb
+ command.
+
+ """
+
+ chan_type = self._c().get_channel_type('gdb')
+
+ if chan_type == 'tcp':
+ host, port = self._c().get_channel_addr('gdb')
+ return f'{host}:{port}'
+
+ if chan_type == 'pty':
+ return self._c().get_channel_path('gdb')
+
+ raise InvalidChannelType(chan_type)
+
+ def get_gdb_cmd(self) -> List[str]:
+ """Returns the gdb command for current target."""
+ return self._c().get_gdb_cmd()
+
+ def run_gdb_cmds(
+ self,
+ commands: List[str],
+ executable: Optional[str] = None,
+ pause: bool = False,
+ ) -> subprocess.CompletedProcess:
+ """Connect to the target and run the given commands silently
+ in batch mode.
+
+ The executable is optional but it may be required by some gdb
+ commands.
+
+ If pause is set do not continue execution after running the
+ given commands.
+
+ """
+
+ cmd = self._c().get_gdb_cmd().copy()
+ if not cmd:
+ raise ConfigError(self._c().get_config_path(), 'gdb not configured')
+
+ cmd.append('-batch-silent')
+ cmd.append('-ex')
+ cmd.append(f'target remote {self.get_gdb_remote()}')
+ for gdb_cmd in commands:
+ cmd.append('-ex')
+ cmd.append(gdb_cmd)
+ if pause:
+ cmd.append('-ex')
+ cmd.append('disconnect')
+ if executable:
+ cmd.append(executable)
+ return subprocess.run(cmd, capture_output=True)
+
+ def reset(self) -> None:
+ """Perform a software reset."""
+ self._c().reset()
+
+ def list_properties(self, path: str) -> List[Dict]:
+ """Returns the property list for an emulator object.
+
+ The object is identified by a full path. The path is target
+ specific and the format of the path is backend specific.
+
+ qemu path example: /machine/unattached/device[10]
+
+ renode path example: sysbus.uart
+
+ """
+ return self._c().list_properties(path)
+
+ def set_property(self, path: str, prop: str, value: Any) -> None:
+ """Sets the value of an emulator's object property."""
+
+ self._c().set_property(path, prop, value)
+
+ def get_property(self, path: str, prop: str) -> Any:
+ """Returns the value of an emulator's object property."""
+
+ return self._c().get_property(path, prop)
+
+ def get_channel_type(self, name: str) -> str:
+ """Returns the channel type
+
+ Currently `pty` or `tcp` are the only supported types.
+
+ """
+
+ return self._c().get_channel_type(name)
+
+ def get_channel_path(self, name: str) -> str:
+ """Returns the channel path. Raises InvalidChannelType if this
+ is not a pty channel.
+
+ """
+
+ return self._c().get_channel_path(name)
+
+ def get_channel_addr(self, name: str) -> tuple:
+ """Returns a pair of (host, port) for the channel. Raises
+ InvalidChannelType if this is not a tcp channel.
+
+ """
+
+ return self._c().get_channel_addr(name)
+
+ def get_channel_stream(
+ self,
+ name: str,
+ timeout: Optional[float] = None,
+ ) -> io.RawIOBase:
+ """Returns a file object for a given host exposed device.
+
+ If timeout is None than reads and writes are blocking. If
+ timeout is zero the stream is operating in non-blocking
+ mode. Otherwise read and write will timeout after the given
+ value.
+
+ """
+
+ return self._c().get_channel_stream(name, timeout)
+
+ def get_channels(self) -> List[str]:
+ """Returns the list of available channels."""
+
+ return self._c().get_channels()
+
+ def set_emu(self, emu: str) -> None:
+ """Set the emulator type for this instance."""
+
+ self._launcher = Launcher.get(emu, self._config_path)
+
+ def cont(self) -> None:
+ """Resume the emulator's execution."""
+
+ self._c().cont()
+
+
+class TemporaryEmulator(Emulator):
+ """Temporary emulator instances.
+
+ Manages emulator instances that run in temporary working
+ directories. The emulator instance is stopped and the working
+ directory is cleared when the with block completes.
+
+ It also supports interoperability with the pw emu cli, i.e.
+ starting the emulator with the CLI and controlling / interacting
+ with it from the API.
+
+ Usage example:
+
+ .. code-block:: python
+
+ # programatically start and load an executable then access it
+ with TemporaryEmulator() as emu:
+ emu.start(target, file)
+ with emu.get_channel_stream(chan) as stream:
+ ...
+
+ .. code-block:: python
+
+ # or start it form the command line then access it
+ with TemporaryEmulator() as emu:
+ build.bazel(
+ ctx,
+ "run",
+ exec_path,
+ "--run_under=pw emu start <target> --file "
+ )
+ with emu.get_channel_stream(chan) as stream:
+ ...
+
+ """
+
+ def __init__(
+ self,
+ config_path: Optional[Path] = None,
+ cleanup: bool = True,
+ ) -> None:
+ self._temp = tempfile.TemporaryDirectory()
+ self._cleanup = cleanup
+ super().__init__(Path(self._temp.name), config_path)
+
+ def __enter__(self):
+ # Interoperability with pw emu cli.
+ os.environ["PW_EMU_WDIR"] = self._wdir
+ return self
+
+ def __exit__(self, exc, value, traceback) -> None:
+ self.stop()
+ del os.environ["PW_EMU_WDIR"]
+ if self._cleanup:
+ self._temp.cleanup()
diff --git a/pw_emu/py/pw_emu/pigweed_emulators.py b/pw_emu/py/pw_emu/pigweed_emulators.py
new file mode 100644
index 000000000..764ed2ace
--- /dev/null
+++ b/pw_emu/py/pw_emu/pigweed_emulators.py
@@ -0,0 +1,27 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Pigweed built-in emulators frontends."""
+
+from typing import Dict
+
+pigweed_emulators: Dict[str, Dict[str, str]] = {
+ 'qemu': {
+ 'connector': 'pw_emu.qemu.QemuConnector',
+ 'launcher': 'pw_emu.qemu.QemuLauncher',
+ },
+ 'renode': {
+ 'connector': 'pw_emu.renode.RenodeConnector',
+ 'launcher': 'pw_emu.renode.RenodeLauncher',
+ },
+}
diff --git a/pw_emu/py/pw_emu/py.typed b/pw_emu/py/pw_emu/py.typed
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/pw_emu/py/pw_emu/py.typed
diff --git a/pw_emu/py/pw_emu/qemu.py b/pw_emu/py/pw_emu/qemu.py
new file mode 100644
index 000000000..50112cda3
--- /dev/null
+++ b/pw_emu/py/pw_emu/qemu.py
@@ -0,0 +1,343 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Pigweed qemu frontend."""
+
+
+import io
+import json
+import logging
+import os
+import re
+import socket
+import sys
+
+from pathlib import Path
+from typing import Optional, Dict, List, Any
+
+from pw_emu.core import (
+ ConfigError,
+ Connector,
+ Launcher,
+ Error,
+ InvalidChannelType,
+ WrongEmulator,
+)
+
+_QMP_LOG = logging.getLogger('pw_qemu.qemu.qmp')
+
+
+class QmpError(Error):
+ """Exception for QMP errors."""
+
+ def __init__(self, err: str):
+ super().__init__(err)
+
+
+class QmpClient:
+ """Send qmp requests the server."""
+
+ def __init__(self, stream: io.RawIOBase):
+ self._stream = stream
+
+ json.loads(self._stream.readline())
+ cmd = json.dumps({'execute': 'qmp_capabilities'})
+ self._stream.write(cmd.encode('utf-8'))
+ resp = json.loads(self._stream.readline().decode('ascii'))
+ if not 'return' in resp:
+ raise QmpError(f'qmp init failed: {resp.get("error")}')
+
+ def request(self, cmd: str, args: Optional[Dict[str, Any]] = None) -> Any:
+ """Issue a command using the qmp interface.
+
+ Returns a map with the response or None if there is no
+ response for this command.
+
+ """
+
+ req: Dict[str, Any] = {'execute': cmd}
+ if args:
+ req['arguments'] = args
+ _QMP_LOG.debug(' -> {json.dumps(cmd)}')
+ self._stream.write(json.dumps(req).encode('utf-8'))
+ while True:
+ line = self._stream.readline()
+ _QMP_LOG.debug(' <- {line}')
+ resp = json.loads(line)
+ if 'error' in resp.keys():
+ raise QmpError(resp['error']['desc'])
+ if 'return' in resp.keys():
+ return resp['return']
+
+
+class QemuLauncher(Launcher):
+ """Start a new qemu process for a given target and config file."""
+
+ def __init__(self, config_path: Optional[Path] = None):
+ super().__init__('qemu', config_path)
+ self._start_cmd: List[str] = []
+ self._chardevs_id_to_name = {
+ 'compat_monitor0': 'qmp',
+ 'compat_monitor1': 'monitor',
+ 'gdb': 'gdb',
+ }
+ self._chardevs: Dict[str, Any] = {}
+ self._qmp_init_sock: Optional[socket.socket] = None
+
+ def _set_qemu_channel_tcp(self, name: str, filename: str) -> None:
+ """Parse a TCP chardev and return (host, port) tuple.
+
+ Format for the tcp chardev backend:
+
+ [disconnected|isconnected:]tcp:<host>:<port>[,<options>][ <->
+ <host>:<port>]
+
+ """
+
+ host_port: Any = filename.split(',')[0]
+ if host_port.split(':')[0] != 'tcp':
+ host_port = host_port.split(':')[2:]
+ else:
+ host_port = host_port.split(':')[1:]
+ # IPV6 hosts have :
+ host = ':'.join(host_port[0:-1])
+ port = host_port[-1]
+ self._handles.add_channel_tcp(name, host, int(port))
+
+ def _set_qemu_channel_pty(self, name: str, filename: str) -> None:
+ """Parse a PTY chardev and return the path.
+
+ Format for the pty chardev backend: pty:<path>
+ """
+
+ path = filename.split(':')[1]
+
+ self._handles.add_channel_pty(name, path)
+
+ if os.path.lexists(self._path(name)):
+ os.unlink(self._path(name))
+ os.symlink(path, self._path(name))
+
+ def _set_qemu_channel(self, name: str, filename: str) -> None:
+ """Setups a chardev channel type."""
+
+ if filename.startswith('pty'):
+ self._set_qemu_channel_pty(name, filename)
+ elif 'tcp' in filename:
+ self._set_qemu_channel_tcp(name, filename)
+
+ def _get_channels_config(self, chan: str, opt: str) -> Any:
+ val = self._config.get_emu(['channels', chan, opt])
+ if val is not None:
+ return val
+ return self._config.get_emu(['channels', opt])
+
+ def _configure_default_channels(self) -> None:
+ """Configure the default channels."""
+
+ # keep qmp first so that it gets the compat_monitor0 label
+ for chan in ['qmp', 'monitor', 'gdb']:
+ chan_type = self._get_channels_config(chan, 'type')
+ if not chan_type:
+ chan_type = 'tcp'
+ if chan_type == 'pty':
+ if sys.platform == 'win32':
+ raise InvalidChannelType(chan_type)
+ backend = 'pty'
+ elif chan_type == 'tcp':
+ backend = 'tcp:localhost:0,ipv4=on,server=on,wait=off'
+ else:
+ raise InvalidChannelType(chan_type)
+ self._start_cmd.extend([f'-{chan}', backend])
+
+ def _get_chardev_config(self, name: str, opt: str) -> Any:
+ val = self._config.get_target_emu(['channels', 'chardevs', name, opt])
+ if not val:
+ val = self._get_channels_config(name, opt)
+ return val
+
+ def _configure_serial_channels(self, serials: Dict) -> None:
+ """Create "standard" serial devices.
+
+ We can't control the serial allocation number for "standard"
+ -serial devices so fill the slots for the not needed serials
+ with null chardevs e.g. for serial3, serial1 generate the
+ following arguments, in this order:
+
+ -serial null -serial {backend} -serial null - serial {backend}
+
+ """
+
+ min_ser = sys.maxsize
+ max_ser = -1
+ for serial in serials.keys():
+ num = int(serial.split('serial')[1])
+ if num < min_ser:
+ min_ser = num
+ if num > max_ser:
+ max_ser = num
+ for i in range(min_ser, max_ser + 1):
+ if serials.get(f'serial{i}'):
+ name = serials[f'serial{i}']
+ chan_type = self._get_chardev_config(name, 'type')
+ if not chan_type:
+ chan_type = 'tcp'
+ if chan_type == 'pty':
+ backend = 'pty'
+ elif chan_type == 'tcp':
+ backend = 'tcp:localhost:0,ipv4=on,server=on,wait=off'
+ else:
+ raise InvalidChannelType(chan_type)
+ self._start_cmd.extend(['-serial', backend])
+ else:
+ self._start_cmd.extend(['-serial', 'null'])
+
+ def _configure_chardev_channels(self) -> None:
+ """Configure chardevs."""
+
+ self._chardevs = self._config.get_target_emu(
+ ['channels', 'chardevs'], True, dict
+ )
+
+ serials = {}
+ for name, config in self._chardevs.items():
+ chardev_id = config['id']
+ self._chardevs_id_to_name[chardev_id] = name
+
+ chardev_type = self._get_chardev_config(name, 'type')
+ if chardev_type is None:
+ chardev_type = 'tcp'
+
+ if chardev_type == 'pty':
+ backend = 'pty'
+ elif chardev_type == 'tcp':
+ backend = 'socket,host=localhost,port=0,server=on,wait=off'
+ else:
+ raise InvalidChannelType(chardev_type)
+
+ # serials are configured differently
+ if re.search(r'serial[0-9]*', chardev_id):
+ serials[chardev_id] = name
+ else:
+ self._start_cmd.extend(
+ ['-chardev', f'{backend},id={chardev_id}']
+ )
+
+ self._configure_serial_channels(serials)
+
+ def _pre_start(
+ self,
+ target: str,
+ file: Optional[Path] = None,
+ pause: bool = False,
+ debug: bool = False,
+ args: Optional[str] = None,
+ ) -> List[str]:
+ qemu = self._config.get_target_emu(['executable'])
+ if not qemu:
+ qemu = self._config.get_emu(['executable'], optional=False)
+ machine = self._config.get_target_emu(['machine'], optional=False)
+
+ self._start_cmd = [f'{qemu}', '-nographic', '-nodefaults']
+ self._start_cmd.extend(['-display', 'none'])
+ self._start_cmd.extend(['-machine', f'{machine}'])
+
+ try:
+ self._configure_default_channels()
+ self._configure_chardev_channels()
+ except KeyError as err:
+ raise ConfigError(self._config.path, str(err))
+
+ if pause:
+ self._start_cmd.append('-S')
+ if debug:
+ self._start_cmd.extend(['-d', 'guest_errors'])
+
+ if file:
+ self._start_cmd.extend(['-kernel', str(file)])
+
+ self._start_cmd.extend(self._config.get_emu(['args'], entry_type=list))
+ self._start_cmd.extend(
+ self._config.get_target_emu(['args'], entry_type=list)
+ )
+ if args:
+ self._start_cmd.extend(args.split(' '))
+
+ # initial/bootstrap qmp connection
+ self._qmp_init_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._qmp_init_sock.bind(('localhost', 0))
+ port = self._qmp_init_sock.getsockname()[1]
+ self._qmp_init_sock.listen()
+ self._qmp_init_sock.settimeout(30)
+ self._start_cmd.extend(['-qmp', f'tcp:localhost:{port}'])
+
+ return self._start_cmd
+
+ def _post_start(self) -> None:
+ assert self._qmp_init_sock is not None
+ conn, _ = self._qmp_init_sock.accept()
+ self._qmp_init_sock.close()
+ qmp = QmpClient(conn.makefile('rwb', buffering=0))
+ conn.close()
+
+ resp = qmp.request('query-chardev')
+ for chardev in resp:
+ label = chardev['label']
+ name = self._chardevs_id_to_name.get(label)
+ if name:
+ self._set_qemu_channel(name, chardev['filename'])
+
+ def _get_connector(self, wdir: Path) -> Connector:
+ return QemuConnector(wdir)
+
+
+class QemuConnector(Connector):
+ """qemu implementation for the emulator specific connector methods."""
+
+ def __init__(self, wdir: Path) -> None:
+ super().__init__(wdir)
+ if self.get_emu() != 'qemu':
+ raise WrongEmulator('qemu', self.get_emu())
+ self._qmp: Optional[QmpClient] = None
+
+ def _q(self) -> QmpClient:
+ if not self._qmp:
+ self._qmp = QmpClient(self.get_channel_stream('qmp'))
+ return self._qmp
+
+ def reset(self) -> None:
+ self._q().request('system_reset')
+
+ def cont(self) -> None:
+ self._q().request('cont')
+
+ def set_property(self, path: str, prop: str, value: Any) -> None:
+ args = {
+ 'path': '{}'.format(path),
+ 'property': prop,
+ 'value': value,
+ }
+ self._q().request('qom-set', args)
+
+ def get_property(self, path: str, prop: str) -> Any:
+ args = {
+ 'path': '{}'.format(path),
+ 'property': prop,
+ }
+ return self._q().request('qom-get', args)
+
+ def list_properties(self, path: str) -> List[Any]:
+ args = {
+ 'path': '{}'.format(path),
+ }
+ return self._q().request('qom-list', args)
diff --git a/pw_emu/py/pw_emu/renode.py b/pw_emu/py/pw_emu/renode.py
new file mode 100644
index 000000000..1e3ccb548
--- /dev/null
+++ b/pw_emu/py/pw_emu/renode.py
@@ -0,0 +1,209 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Pigweed renode frontend."""
+
+import socket
+import time
+import xmlrpc.client
+
+from pathlib import Path
+from typing import Optional, List, Any
+
+from pw_emu.core import (
+ Connector,
+ Handles,
+ InvalidChannelType,
+ Launcher,
+ Error,
+ WrongEmulator,
+)
+
+
+class RenodeRobotError(Error):
+ """Exception for Renode robot errors."""
+
+ def __init__(self, err: str):
+ super().__init__(err)
+
+
+class RenodeLauncher(Launcher):
+ """Start a new renode process for a given target and config file."""
+
+ def __init__(self, config_path: Optional[Path] = None):
+ super().__init__('renode', config_path)
+ self._start_cmd: List[str] = []
+
+ @staticmethod
+ def _allocate_port() -> int:
+ """Allocate renode ports.
+
+ This is inherently racy but renode currently does not have proper
+ support for dynamic ports. It accecept 0 as a port and the OS allocates
+ a dynamic port but there is no API to retrive the port.
+
+ """
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.bind(('localhost', 0))
+ port = sock.getsockname()[1]
+ sock.close()
+
+ return port
+
+ def _pre_start(
+ self,
+ target: str,
+ file: Optional[Path] = None,
+ pause: bool = False,
+ debug: bool = False,
+ args: Optional[str] = None,
+ ) -> List[str]:
+ renode = self._config.get_target_emu(['executable'])
+ if not renode:
+ renode = self._config.get_emu(['executable'], optional=False)
+
+ self._start_cmd.extend([f'{renode}', '--disable-xwt'])
+ port = self._allocate_port()
+ self._start_cmd.extend(['--robot-server-port', str(port)])
+ self._handles.add_channel_tcp('robot', 'localhost', port)
+
+ machine = self._config.get_target_emu(['machine'], optional=False)
+ self._start_cmd.extend(['--execute', f'mach add "{target}"'])
+
+ self._start_cmd.extend(
+ ['--execute', f'machine LoadPlatformDescription @{machine}']
+ )
+
+ terms = self._config.get_target_emu(
+ ['channels', 'terminals'], entry_type=dict
+ )
+ for name in terms.keys():
+ port = self._allocate_port()
+ dev_path = self._config.get_target_emu(
+ ['channels', 'terminals', name, 'device-path'],
+ optional=False,
+ entry_type=str,
+ )
+ term_type = self._config.get_target_emu(
+ ['channels', 'terminals', name, 'type'],
+ entry_type=str,
+ )
+ if not term_type:
+ term_type = self._config.get_emu(
+ ['channels', 'terminals', 'type'],
+ entry_type=str,
+ )
+ if not term_type:
+ term_type = 'tcp'
+
+ cmd = 'emulation '
+ if term_type == 'tcp':
+ cmd += f'CreateServerSocketTerminal {port} "{name}" false'
+ self._handles.add_channel_tcp(name, 'localhost', port)
+ elif term_type == 'pty':
+ path = self._path(name)
+ cmd += f'CreateUartPtyTerminal "{name}" "{path}"'
+ self._handles.add_channel_pty(name, str(path))
+ else:
+ raise InvalidChannelType(term_type)
+
+ self._start_cmd.extend(['--execute', cmd])
+ self._start_cmd.extend(
+ ['--execute', f'connector Connect {dev_path} {name}']
+ )
+
+ port = self._allocate_port()
+ self._start_cmd.extend(['--execute', f'machine StartGdbServer {port}'])
+ self._handles.add_channel_tcp('gdb', 'localhost', port)
+
+ if file:
+ self._start_cmd.extend(['--execute', f'sysbus LoadELF @{file}'])
+
+ if not pause:
+ self._start_cmd.extend(['--execute', 'start'])
+
+ return self._start_cmd
+
+ def _post_start(self) -> None:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ robot = self._handles.channels['gdb']
+ assert isinstance(robot, Handles.TcpChannel)
+
+ # renode is slow to start especially during host load
+ deadline = time.monotonic() + 120
+ connected = False
+ err = None
+ while time.monotonic() < deadline:
+ try:
+ sock.connect((robot.host, robot.port))
+ connected = True
+ break
+ except OSError as exc:
+ err = exc
+ time.sleep(1)
+
+ if not connected:
+ msg = 'failed to connect to robot channel'
+ msg += f'({robot.host}:{robot.port}): {err}'
+ raise RenodeRobotError(msg)
+
+ sock.close()
+
+ def _get_connector(self, wdir: Path) -> Connector:
+ return RenodeConnector(wdir)
+
+
+class RenodeConnector(Connector):
+ """renode implementation for the emulator specific connector methods."""
+
+ def __init__(self, wdir: Path) -> None:
+ super().__init__(wdir)
+ if self.get_emu() != 'renode':
+ raise WrongEmulator('renode', self.get_emu())
+ robot = self._handles.channels['robot']
+ host = robot.host
+ port = robot.port
+ self._proxy = xmlrpc.client.ServerProxy(f'http://{host}:{port}/')
+
+ def _request(self, cmd: str, args: List[str]) -> Any:
+ """Send a request using the robot interface.
+
+ Using the robot interface is not ideal since it is designed
+ for testing. However, it is more robust than the ANSI colored,
+ echoed, log mixed, telnet interface.
+
+ """
+
+ resp = self._proxy.run_keyword(cmd, args)
+ if not isinstance(resp, dict):
+ raise RenodeRobotError('expected dictionary in response')
+ if resp['status'] != 'PASS':
+ raise RenodeRobotError(resp['error'])
+ if resp.get('return'):
+ return resp['return']
+ return None
+
+ def reset(self) -> None:
+ self._request('ResetEmulation', [])
+
+ def cont(self) -> None:
+ self._request('StartEmulation', [])
+
+ def list_properties(self, path: str) -> List[Any]:
+ return self._request('ExecuteCommand', [f'{path}'])
+
+ def get_property(self, path: str, prop: str) -> Any:
+ return self._request('ExecuteCommand', [f'{path} {prop}'])
+
+ def set_property(self, path: str, prop: str, value: Any) -> None:
+ return self._request('ExecuteCommand', [f'{path} {prop} {value}'])
diff --git a/pw_emu/py/pyproject.toml b/pw_emu/py/pyproject.toml
new file mode 100644
index 000000000..78668a709
--- /dev/null
+++ b/pw_emu/py/pyproject.toml
@@ -0,0 +1,16 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+[build-system]
+requires = ['setuptools', 'wheel']
+build-backend = 'setuptools.build_meta'
diff --git a/pw_emu/py/setup.cfg b/pw_emu/py/setup.cfg
new file mode 100644
index 000000000..5dd480aca
--- /dev/null
+++ b/pw_emu/py/setup.cfg
@@ -0,0 +1,28 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+[metadata]
+name = pw_emu
+version = 0.0.1
+author = Pigweed Authors
+author_email = pigweed-developers@googlegroups.com
+description = Pigweed Emulators Frontend
+
+[options]
+packages = find:
+zip_safe = False
+install_requires =
+ python-daemon
+
+[options.package_data]
+pw_emu = py.typed
diff --git a/pw_emu/py/setup.py b/pw_emu/py/setup.py
new file mode 100644
index 000000000..7b1cffbda
--- /dev/null
+++ b/pw_emu/py/setup.py
@@ -0,0 +1,18 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""pw_emu"""
+
+import setuptools # type: ignore
+
+setuptools.setup() # Package definition in setup.cfg
diff --git a/pw_emu/py/tests/__init__.py b/pw_emu/py/tests/__init__.py
new file mode 100644
index 000000000..0f2cb9104
--- /dev/null
+++ b/pw_emu/py/tests/__init__.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
diff --git a/pw_emu/py/tests/cli_test.py b/pw_emu/py/tests/cli_test.py
new file mode 100644
index 000000000..86bde6432
--- /dev/null
+++ b/pw_emu/py/tests/cli_test.py
@@ -0,0 +1,276 @@
+#!/usr/bin/env python
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Tests for the command line interface"""
+
+import os
+import signal
+import subprocess
+import sys
+import time
+import unittest
+
+from pathlib import Path
+from typing import List
+
+from mock_emu_frontend import _mock_emu
+from tests.common import ConfigHelper
+
+
+# TODO: b/301382004 - The Python Pigweed package install (into python-venv)
+# races with running this test and there is no way to add that package as a test
+# depedency without creating circular depedencies. This means we can't rely on
+# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper.
+#
+# Run the CLI directly instead of going through pw cli.
+_cli_path = Path(
+ os.path.join(os.environ['PW_ROOT'], 'pw_emu', 'py', 'pw_emu', '__main__.py')
+).resolve()
+
+
+class TestCli(ConfigHelper):
+ """Test non-interactive commands"""
+
+ _config = {
+ 'emulators': {
+ 'mock-emu': {
+ 'launcher': 'mock_emu_frontend.MockEmuLauncher',
+ 'connector': 'mock_emu_frontend.MockEmuConnector',
+ }
+ },
+ 'mock-emu': {
+ 'tcp_channel': True,
+ 'gdb_channel': True,
+ },
+ 'gdb': _mock_emu + ['--exit', '--'],
+ 'targets': {'test-target': {'mock-emu': {}}},
+ }
+
+ def _build_cmd(self, args: List[str]) -> List[str]:
+ cmd = [
+ 'python',
+ str(_cli_path),
+ '--working-dir',
+ self._wdir.name,
+ '--config',
+ self._config_file,
+ ] + args
+ return cmd
+
+ def _run(self, args: List[str], **kwargs) -> subprocess.CompletedProcess:
+ """Run the CLI and wait for completion"""
+ return subprocess.run(self._build_cmd(args), **kwargs)
+
+ def _popen(self, args: List[str], **kwargs) -> subprocess.Popen:
+ """Run the CLI in the background"""
+ return subprocess.Popen(self._build_cmd(args), **kwargs)
+
+
+class TestNonInteractive(TestCli):
+ """Test non interactive commands."""
+
+ def setUp(self) -> None:
+ super().setUp()
+ self.assertEqual(self._run(['start', 'test-target']).returncode, 0)
+
+ def tearDown(self) -> None:
+ self.assertEqual(self._run(['stop']).returncode, 0)
+ super().tearDown()
+
+ def test_already_running(self) -> None:
+ self.assertNotEqual(self._run(['start', 'test-target']).returncode, 0)
+
+ def test_gdb_cmds(self) -> None:
+ status = self._run(
+ ['gdb-cmds', 'show version'],
+ )
+ self.assertEqual(status.returncode, 0)
+
+ def test_prop_ls(self) -> None:
+ status = self._run(['prop-ls', 'path1'], stdout=subprocess.PIPE)
+ self.assertEqual(status.returncode, 0)
+ self.assertTrue('prop1' in status.stdout.decode('ascii'))
+ status = self._run(['prop-ls', 'invalid path'], stdout=subprocess.PIPE)
+ self.assertNotEqual(status.returncode, 0)
+
+ def test_prop_get(self) -> None:
+ status = self._run(
+ ['prop-get', 'invalid path', 'prop1'],
+ stdout=subprocess.PIPE,
+ )
+ self.assertNotEqual(status.returncode, 0)
+ status = self._run(
+ ['prop-get', 'path1', 'invalid prop'],
+ stdout=subprocess.PIPE,
+ )
+ self.assertNotEqual(status.returncode, 0)
+ status = self._run(
+ ['prop-get', 'path1', 'prop1'],
+ stdout=subprocess.PIPE,
+ )
+ self.assertEqual(status.returncode, 0)
+ self.assertTrue('val1' in status.stdout.decode('ascii'))
+
+ def test_prop_set(self) -> None:
+ status = self._run(
+ ['prop-set', 'invalid path', 'prop1', 'v'],
+ stdout=subprocess.PIPE,
+ )
+ self.assertNotEqual(status.returncode, 0)
+ status = self._run(
+ ['prop-set', 'path1', 'invalid prop', 'v'],
+ stdout=subprocess.PIPE,
+ )
+ self.assertNotEqual(status.returncode, 0)
+ status = self._run(
+ ['prop-set', 'path1', 'prop1', 'value'],
+ stdout=subprocess.PIPE,
+ )
+ self.assertEqual(status.returncode, 0)
+ status = self._run(
+ ['prop-get', 'path1', 'prop1'],
+ stdout=subprocess.PIPE,
+ )
+ self.assertEqual(status.returncode, 0)
+ self.assertTrue('value' in status.stdout.decode('ascii'), status.stdout)
+
+ def test_reset(self) -> None:
+ self.assertEqual(self._run(['reset']).returncode, 0)
+ self.assertTrue(os.path.exists(os.path.join(self._wdir.name, 'reset')))
+
+ def test_load(self) -> None:
+ self.assertEqual(self._run(['load', 'executable']).returncode, 0)
+
+ def test_resume(self) -> None:
+ self.assertEqual(self._run(['resume']).returncode, 0)
+
+
+class TestForeground(TestCli):
+ """Test starting in foreground"""
+
+ def _test_common(self, cmd) -> None:
+ # Run the CLI process in a new session so that we can terminate both the
+ # CLI and the mock emulator it spawns in the foreground.
+ args = {}
+ if sys.platform == 'win32':
+ args['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
+ else:
+ args['start_new_session'] = True
+ proc = self._popen(cmd, stdout=subprocess.PIPE, **args)
+ assert proc.stdout
+ output = proc.stdout.readline()
+ self.assertTrue(
+ 'starting mock emulator' in output.decode('utf-8'),
+ output.decode('utf-8'),
+ )
+ if sys.platform == 'win32':
+ # See https://bugs.python.org/issue26350
+ os.kill(proc.pid, signal.CTRL_BREAK_EVENT)
+ else:
+ os.kill(-proc.pid, signal.SIGTERM)
+ proc.wait()
+ proc.stdout.close()
+
+ def test_foreground(self) -> None:
+ self._test_common(['start', '--foreground', 'test-target'])
+
+ def test_debug(self) -> None:
+ self._test_common(['start', '--debug', 'test-target'])
+
+
+class TestInteractive(TestCli):
+ """Test interactive commands"""
+
+ def setUp(self) -> None:
+ super().setUp()
+ self.assertEqual(self._run(['start', 'test-target']).returncode, 0)
+
+ def tearDown(self) -> None:
+ self.assertEqual(self._run(['stop']).returncode, 0)
+ super().tearDown()
+
+ @staticmethod
+ def _read_nonblocking(fd: int, size: int) -> bytes:
+ try:
+ return os.read(fd, size)
+ except BlockingIOError:
+ return b''
+
+ def test_term(self) -> None:
+ """Test the pw emu term command"""
+
+ if sys.platform == 'win32':
+ self.skipTest('pty not supported on win32')
+
+ # pylint: disable=import-outside-toplevel
+ # Can't import pty on win32.
+ import pty
+
+ # pylint: disable=no-member
+ # Avoid pylint false positive on win32.
+ pid, fd = pty.fork()
+ if pid == 0:
+ status = self._run(['term', 'tcp'])
+ # pylint: disable=protected-access
+ # Use os._exit instead of os.exit after fork.
+ os._exit(status.returncode)
+ else:
+ expected = '--- Miniterm on tcp ---'
+
+ # Read the expected string with a timeout.
+ os.set_blocking(fd, False)
+ deadline = time.monotonic() + 5
+ data = self._read_nonblocking(fd, len(expected))
+ while len(data) < len(expected):
+ time.sleep(0.1)
+ data += self._read_nonblocking(fd, len(expected) - len(data))
+ if time.monotonic() > deadline:
+ break
+ self.assertTrue(
+ expected in data.decode('ascii'),
+ data + self._read_nonblocking(fd, 100),
+ )
+
+ # send CTRL + ']' to terminate miniterm
+ os.write(fd, b'\x1d')
+
+ # wait for the process to exit, with a timeout
+ deadline = time.monotonic() + 5
+ wait_pid, ret = os.waitpid(pid, os.WNOHANG)
+ while wait_pid == 0:
+ time.sleep(0.1)
+ # Discard input to avoid writer hang on MacOS,
+ # see https://github.com/python/cpython/issues/97001.
+ try:
+ self._read_nonblocking(fd, 100)
+ except OSError:
+ # Avoid read errors when the child pair of the pty
+ # closes when the child terminates.
+ pass
+ wait_pid, ret = os.waitpid(pid, os.WNOHANG)
+ if time.monotonic() > deadline:
+ break
+ self.assertEqual(wait_pid, pid)
+ self.assertEqual(ret, 0)
+
+ def test_gdb(self) -> None:
+ res = self._run(['gdb', '-e', 'executable'], stdout=subprocess.PIPE)
+ self.assertEqual(res.returncode, 0)
+ output = res.stdout.decode('ascii')
+ self.assertTrue('target remote' in output, output)
+ self.assertTrue('executable' in output, output)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/pw_emu/py/tests/common.py b/pw_emu/py/tests/common.py
new file mode 100644
index 000000000..9d2513fcc
--- /dev/null
+++ b/pw_emu/py/tests/common.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Common utilities for tests."""
+
+import json
+import os
+import subprocess
+import tempfile
+import unittest
+
+from pathlib import Path
+from typing import Any, Optional, Dict
+
+from pw_emu.frontend import Emulator
+
+
+def check_prog(prog: str) -> tuple:
+ msg = f'running {prog}'
+ try:
+ proc = subprocess.run([prog, '--help'], capture_output=True)
+ if proc.returncode != 0:
+ output = proc.stdout.decode('ascii') + proc.stderr.decode('ascii')
+ msg = f'error {msg}: {output}'
+ return (False, msg)
+ except OSError as err:
+ msg = f'error {msg}: {str(err)}'
+ return (False, msg)
+ return (True, msg)
+
+
+class ConfigHelper(unittest.TestCase):
+ """Helper that setups and tears down the configuration file"""
+
+ _config: Optional[Dict[str, Any]] = None
+
+ def setUp(self) -> None:
+ self._wdir = tempfile.TemporaryDirectory()
+ with tempfile.NamedTemporaryFile('wt', delete=False) as file:
+ pw_emu_config: Dict[str, Any] = {'pw': {'pw_emu': {}}}
+ if self._config:
+ pw_emu_config['pw']['pw_emu'].update(self._config)
+ json.dump(pw_emu_config, file)
+ self._config_file = file.name
+
+ def tearDown(self) -> None:
+ self._wdir.cleanup()
+ os.unlink(self._config_file)
+
+
+class ConfigHelperWithEmulator(ConfigHelper):
+ """Helper that setups and tears down the configuration file"""
+
+ def setUp(self) -> None:
+ super().setUp()
+ self._emu = Emulator(Path(self._wdir.name), Path(self._config_file))
diff --git a/pw_emu/py/tests/core_test.py b/pw_emu/py/tests/core_test.py
new file mode 100644
index 000000000..1131d5dd3
--- /dev/null
+++ b/pw_emu/py/tests/core_test.py
@@ -0,0 +1,480 @@
+#!/usr/bin/env python
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Tests for core / infrastructure code."""
+
+import copy
+import io
+import json
+import os
+import socket
+import sys
+import tempfile
+import time
+import unittest
+from pathlib import Path
+from typing import Any, Dict
+
+from unittest.mock import patch
+
+from pw_emu.core import (
+ AlreadyRunning,
+ Config,
+ ConfigError,
+ Handles,
+ InvalidTarget,
+ InvalidChannelName,
+ InvalidChannelType,
+ Launcher,
+)
+from mock_emu_frontend import _mock_emu
+from tests.common import ConfigHelper
+
+
+class ConfigHelperWithLauncher(ConfigHelper):
+ def setUp(self) -> None:
+ super().setUp()
+ self._launcher = Launcher.get('mock-emu', Path(self._config_file))
+
+
+class TestInvalidTarget(ConfigHelperWithLauncher):
+ """Check that InvalidTarget is raised with an empty config."""
+
+ _config: Dict[str, Any] = {
+ 'emulators': {
+ 'mock-emu': {
+ 'launcher': 'mock_emu_frontend.MockEmuLauncher',
+ 'connector': 'mock_emu_frontend.MockEmuConnector',
+ }
+ },
+ }
+
+ def test_invalid_target(self) -> None:
+ with self.assertRaises(InvalidTarget):
+ self._launcher.start(Path(self._wdir.name), 'test-target')
+
+
+class TestStart(ConfigHelperWithLauncher):
+ """Start tests for valid config."""
+
+ _config: Dict[str, Any] = {
+ 'emulators': {
+ 'mock-emu': {
+ 'launcher': 'mock_emu_frontend.MockEmuLauncher',
+ 'connector': 'mock_emu_frontend.MockEmuConnector',
+ }
+ },
+ 'targets': {'test-target': {'mock-emu': {}}},
+ }
+
+ def setUp(self) -> None:
+ super().setUp()
+ self._connector = self._launcher.start(
+ Path(self._wdir.name), 'test-target'
+ )
+
+ def tearDown(self) -> None:
+ self._connector.stop()
+ super().tearDown()
+
+ def test_running(self) -> None:
+ self.assertTrue(self._connector.running())
+
+ def test_pid_file(self) -> None:
+ self.assertTrue(
+ os.path.exists(os.path.join(self._wdir.name, 'mock-emu.pid'))
+ )
+
+ def test_handles_file(self) -> None:
+ self.assertTrue(
+ os.path.exists(os.path.join(self._wdir.name, 'handles.json'))
+ )
+
+ def test_already_running(self) -> None:
+ with self.assertRaises(AlreadyRunning):
+ self._launcher.start(Path(self._wdir.name), 'test-target')
+
+ def test_log(self) -> None:
+ exp = 'starting mock emulator'
+ path = os.path.join(self._wdir.name, 'mock-emu.log')
+ deadline = time.monotonic() + 100
+ while os.path.getsize(path) < len(exp):
+ time.sleep(0.1)
+ if time.monotonic() > deadline:
+ break
+
+ with open(os.path.join(self._wdir.name, 'mock-emu.log'), 'rt') as file:
+ data = file.read()
+ self.assertTrue(exp in data, data)
+
+
+class TestPrePostStartCmds(ConfigHelperWithLauncher):
+ """Tests for configurations with pre-start commands."""
+
+ _config: Dict[str, Any] = {
+ 'emulators': {
+ 'mock-emu': {
+ 'launcher': 'mock_emu_frontend.MockEmuLauncher',
+ 'connector': 'mock_emu_frontend.MockEmuConnector',
+ }
+ },
+ 'targets': {
+ 'test-target': {
+ 'pre-start-cmds': {
+ 'pre-1': _mock_emu + ['pre-1'],
+ 'pre-2': _mock_emu + ['$pw_emu_wdir{pre-2}'],
+ },
+ 'post-start-cmds': {
+ 'post-1': _mock_emu
+ + ['$pw_emu_channel_path{test_subst_pty}'],
+ 'post-2': _mock_emu
+ + [
+ '$pw_emu_channel_host{test_subst_tcp}',
+ '$pw_emu_channel_port{test_subst_tcp}',
+ ],
+ },
+ 'mock-emu': {},
+ }
+ },
+ }
+
+ def setUp(self) -> None:
+ super().setUp()
+ self._connector = self._launcher.start(
+ Path(self._wdir.name), 'test-target'
+ )
+
+ def tearDown(self) -> None:
+ self._connector.stop()
+ super().tearDown()
+
+ def test_running(self) -> None:
+ for proc in self._connector.get_procs().keys():
+ self.assertTrue(self._connector.proc_running(proc))
+
+ def test_stop(self) -> None:
+ self._connector.stop()
+ for proc in self._connector.get_procs().keys():
+ self.assertFalse(self._connector.proc_running(proc))
+
+ def test_pid_files(self) -> None:
+ for proc in ['pre-1', 'pre-2', 'post-1', 'post-2']:
+ self.assertTrue(
+ os.path.exists(os.path.join(self._wdir.name, f'{proc}.pid'))
+ )
+
+ def test_logs(self):
+ expect = {
+ 'pre-1.log': 'pre-1',
+ 'pre-2.log': os.path.join(self._wdir.name, 'pre-2'),
+ 'post-1.log': 'pty-path',
+ 'post-2.log': 'localhost 1234',
+ }
+
+ for log, pattern in expect.items():
+ path = os.path.join(self._wdir.name, log)
+ deadline = time.monotonic() + 100
+ while os.path.getsize(path) < len(pattern):
+ time.sleep(0.1)
+ if time.monotonic() > deadline:
+ break
+ with open(os.path.join(self._wdir.name, log)) as file:
+ data = file.read()
+ self.assertTrue(pattern in data, f'`{pattern}` not in `{data}`')
+
+
+class TestStop(ConfigHelperWithLauncher):
+ """Stop tests for valid config."""
+
+ _config: Dict[str, Any] = {
+ 'emulators': {
+ 'mock-emu': {
+ 'launcher': 'mock_emu_frontend.MockEmuLauncher',
+ 'connector': 'mock_emu_frontend.MockEmuConnector',
+ }
+ },
+ 'targets': {'test-target': {'mock-emu': {}}},
+ }
+
+ def setUp(self) -> None:
+ super().setUp()
+ self._connector = self._launcher.start(
+ Path(self._wdir.name), 'test-target'
+ )
+ self._connector.stop()
+
+ def test_pid_files(self) -> None:
+ self.assertFalse(
+ os.path.exists(os.path.join(self._wdir.name, 'emu.pid'))
+ )
+
+ def test_target_file(self) -> None:
+ self.assertFalse(
+ os.path.exists(os.path.join(self._wdir.name, 'target'))
+ )
+
+ def test_running(self) -> None:
+ self.assertFalse(self._connector.running())
+
+
+class TestChannels(ConfigHelperWithLauncher):
+ """Test Connector channels APIs."""
+
+ _config: Dict[str, Any] = {
+ 'emulators': {
+ 'mock-emu': {
+ 'launcher': 'mock_emu_frontend.MockEmuLauncher',
+ 'connector': 'mock_emu_frontend.MockEmuConnector',
+ }
+ },
+ 'mock-emu': {
+ 'tcp_channel': True,
+ 'pty_channel': sys.platform != 'win32',
+ },
+ 'targets': {'test-target': {'mock-emu': {}}},
+ }
+
+ def setUp(self) -> None:
+ super().setUp()
+ self._connector = self._launcher.start(
+ Path(self._wdir.name), 'test-target'
+ )
+
+ def tearDown(self) -> None:
+ self._connector.stop()
+ super().tearDown()
+
+ def test_tcp_channel_addr(self) -> None:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(self._connector.get_channel_addr('tcp'))
+ sock.close()
+
+ def test_get_channel_type(self) -> None:
+ self.assertEqual(self._connector.get_channel_type('tcp'), 'tcp')
+ if sys.platform != 'win32':
+ self.assertEqual(self._connector.get_channel_type('pty'), 'pty')
+ with self.assertRaises(InvalidChannelName):
+ self._connector.get_channel_type('invalid channel')
+
+ def test_pty_channel_path(self) -> None:
+ if sys.platform == 'win32':
+ self.skipTest('pty not supported on win32')
+ self.assertTrue(os.path.exists(self._connector.get_channel_path('pty')))
+ with self.assertRaises(InvalidChannelType):
+ self._connector.get_channel_path('tcp')
+
+ def _test_stream(self, stream: io.RawIOBase) -> None:
+ for char in [b'1', b'2', b'3', b'4']:
+ stream.write(char)
+ self.assertEqual(stream.read(1), char)
+
+ def test_tcp_stream(self) -> None:
+ with self._connector.get_channel_stream('tcp') as stream:
+ self._test_stream(stream)
+
+ def test_pty_stream(self) -> None:
+ if sys.platform == 'win32':
+ self.skipTest('pty not supported on win32')
+ with self._connector.get_channel_stream('pty') as stream:
+ self._test_stream(stream)
+
+
+class TestTargetFragments(unittest.TestCase):
+ """Tests for configurations using target fragments."""
+
+ _config_templ: Dict[str, Any] = {
+ 'pw': {
+ 'pw_emu': {
+ 'target_files': [],
+ 'targets': {
+ 'test-target': {
+ 'test-key': 'test-value',
+ }
+ },
+ }
+ }
+ }
+
+ _tf1_config: Dict[str, Any] = {
+ 'targets': {
+ 'test-target1': {},
+ 'test-target': {
+ 'test-key': 'test-value-file1',
+ },
+ }
+ }
+
+ _tf2_config: Dict[str, Any] = {'targets': {'test-target2': {}}}
+
+ def setUp(self) -> None:
+ with tempfile.NamedTemporaryFile(
+ 'wt', delete=False
+ ) as config_file, tempfile.NamedTemporaryFile(
+ 'wt', delete=False
+ ) as targets1_file, tempfile.NamedTemporaryFile(
+ 'wt', delete=False
+ ) as targets2_file:
+ self._config_path = config_file.name
+ self._targets1_path = targets1_file.name
+ self._targets2_path = targets2_file.name
+ json.dump(self._tf1_config, targets1_file)
+ json.dump(self._tf2_config, targets2_file)
+ config = copy.deepcopy(self._config_templ)
+ config['pw']['pw_emu']['target_files'].append(self._targets1_path)
+ config['pw']['pw_emu']['target_files'].append(self._targets2_path)
+ json.dump(config, config_file)
+ self._config = Config(Path(self._config_path))
+
+ def tearDown(self) -> None:
+ os.unlink(self._config_path)
+ os.unlink(self._targets1_path)
+ os.unlink(self._targets2_path)
+
+ def test_targets_loaded(self) -> None:
+ self.assertIsNotNone(self._config.get(['targets', 'test-target']))
+ self.assertIsNotNone(self._config.get(['targets', 'test-target1']))
+ self.assertIsNotNone(self._config.get(['targets', 'test-target2']))
+
+ def test_targets_priority(self) -> None:
+ self.assertEqual(
+ self._config.get(['targets', 'test-target', 'test-key']),
+ 'test-value',
+ )
+
+
+class TestHandles(unittest.TestCase):
+ """Tests for Handles."""
+
+ _config = {
+ 'emu': 'mock-emu',
+ 'config': 'test-config',
+ 'target': 'test-target',
+ 'gdb_cmd': ['test-gdb'],
+ 'channels': {
+ 'tcp_chan': {'type': 'tcp', 'host': 'localhost', 'port': 1234},
+ 'pty_chan': {'type': 'pty', 'path': 'path'},
+ },
+ 'procs': {'proc0': {'pid': 1983}, 'proc1': {'pid': 1234}},
+ }
+
+ def test_serialize(self):
+ handles = Handles('mock-emu', 'test-config')
+ handles.add_channel_tcp('tcp_chan', 'localhost', 1234)
+ handles.add_channel_pty('pty_chan', 'path')
+ handles.add_proc('proc0', 1983)
+ handles.add_proc('proc1', 1234)
+ handles.set_target('test-target')
+ handles.set_gdb_cmd(['test-gdb'])
+ tmp = tempfile.TemporaryDirectory()
+ handles.save(Path(tmp.name))
+ with open(os.path.join(tmp.name, 'handles.json'), 'rt') as file:
+ self.assertTrue(json.load(file) == self._config)
+ tmp.cleanup()
+
+ def test_load(self):
+ tmp = tempfile.TemporaryDirectory()
+ with open(os.path.join(tmp.name, 'handles.json'), 'wt') as file:
+ json.dump(self._config, file)
+ handles = Handles.load(Path(tmp.name))
+ self.assertEqual(handles.emu, 'mock-emu')
+ self.assertEqual(handles.gdb_cmd, ['test-gdb'])
+ self.assertEqual(handles.target, 'test-target')
+ self.assertEqual(handles.config, 'test-config')
+ tmp.cleanup()
+
+
+class TestConfig(ConfigHelper):
+ """Stop tests for valid config."""
+
+ _config: Dict[str, Any] = {
+ 'top': 'entry',
+ 'multi': {
+ 'level': {
+ 'entry': 0,
+ },
+ },
+ 'subst': 'a/$pw_env{PW_EMU_TEST_ENV_SUBST}/c',
+ 'targets': {
+ 'test-target': {
+ 'entry': [1, 2, 3],
+ 'mock-emu': {
+ 'entry': 'test',
+ },
+ }
+ },
+ 'mock-emu': {
+ 'executable': _mock_emu,
+ },
+ 'list': ['a', '$pw_env{PW_EMU_TEST_ENV_SUBST}', 'c'],
+ 'bad-subst-type': '$pw_bad_subst_type{test}',
+ }
+
+ def setUp(self) -> None:
+ super().setUp()
+ self._cfg = Config(Path(self._config_file), 'test-target', 'mock-emu')
+
+ def test_top_entry(self) -> None:
+ self.assertEqual(self._cfg.get(['top']), 'entry')
+
+ def test_empty_subst(self) -> None:
+ with self.assertRaises(ConfigError):
+ self._cfg.get(['subst'])
+
+ def test_subst(self) -> None:
+ with patch.dict('os.environ', {'PW_EMU_TEST_ENV_SUBST': 'b'}):
+ self.assertEqual(self._cfg.get(['subst']), 'a/b/c')
+
+ def test_multi_level_entry(self) -> None:
+ self.assertEqual(self._cfg.get(['multi', 'level', 'entry']), 0)
+
+ def test_get_target(self) -> None:
+ self.assertEqual(self._cfg.get_targets(), ['test-target'])
+
+ def test_target(self) -> None:
+ self.assertEqual(self._cfg.get_target(['entry']), [1, 2, 3])
+
+ def test_target_emu(self) -> None:
+ self.assertEqual(self._cfg.get_target_emu(['entry']), 'test')
+
+ def test_type_checking(self) -> None:
+ with self.assertRaises(ConfigError):
+ self._cfg.get(['top'], entry_type=int)
+ self._cfg.get(['top'], entry_type=str)
+ self._cfg.get_target(['entry'], entry_type=list)
+ self._cfg.get_target_emu(['entry'], entry_type=str)
+ self._cfg.get(['targets'], entry_type=dict)
+
+ def test_non_optional(self) -> None:
+ with self.assertRaises(ConfigError):
+ self._cfg.get(['non-existing'], optional=False)
+
+ def test_optional(self) -> None:
+ self.assertEqual(self._cfg.get(['non-existing']), None)
+ self.assertEqual(self._cfg.get(['non-existing'], entry_type=int), 0)
+ self.assertEqual(self._cfg.get(['non-existing'], entry_type=str), '')
+ self.assertEqual(self._cfg.get(['non-existing'], entry_type=list), [])
+
+ def test_list(self) -> None:
+ with self.assertRaises(ConfigError):
+ self._cfg.get(['list'])
+ with patch.dict('os.environ', {'PW_EMU_TEST_ENV_SUBST': 'b'}):
+ self.assertEqual(self._cfg.get(['list']), ['a', 'b', 'c'])
+
+ def test_bad_subst(self) -> None:
+ with self.assertRaises(ConfigError):
+ self._cfg.get(['bad-subst-type'])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/pw_emu/py/tests/frontend_test.py b/pw_emu/py/tests/frontend_test.py
new file mode 100644
index 000000000..36ea3d7b2
--- /dev/null
+++ b/pw_emu/py/tests/frontend_test.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Emulator API tests."""
+
+import unittest
+
+from typing import Any, Dict
+
+from pw_emu.core import (
+ ConfigError,
+ InvalidChannelType,
+ InvalidProperty,
+ InvalidPropertyPath,
+)
+from mock_emu_frontend import _mock_emu
+from tests.common import ConfigHelperWithEmulator
+
+
+class TestEmulator(ConfigHelperWithEmulator):
+ """Test Emulator APIs."""
+
+ _config = {
+ 'emulators': {
+ 'mock-emu': {
+ 'launcher': 'mock_emu_frontend.MockEmuLauncher',
+ 'connector': 'mock_emu_frontend.MockEmuConnector',
+ }
+ },
+ 'mock-emu': {
+ 'gdb_channel': True,
+ },
+ 'gdb': _mock_emu + ['--exit', '--'],
+ 'targets': {'test-target': {'mock-emu': {}}},
+ }
+
+ def setUp(self) -> None:
+ super().setUp()
+ self._emu.start('test-target')
+
+ def tearDown(self) -> None:
+ self._emu.stop()
+ super().tearDown()
+
+ def test_gdb_target_remote(self) -> None:
+ output = self._emu.run_gdb_cmds([]).stdout
+ host, port = self._emu.get_channel_addr('gdb')
+ self.assertTrue(f'{host}:{port}' in output.decode('utf-8'))
+
+ def test_gdb_commands(self) -> None:
+ output = self._emu.run_gdb_cmds(['test_gdb_cmd']).stdout
+ self.assertTrue(
+ output and '-ex test_gdb_cmd' in output.decode('utf-8'), output
+ )
+
+ def test_gdb_executable(self) -> None:
+ output = self._emu.run_gdb_cmds([], 'test_gdb_exec').stdout
+ self.assertTrue('test_gdb_exec' in output.decode('utf-8'))
+
+ def test_gdb_pause(self) -> None:
+ output = self._emu.run_gdb_cmds([], pause=True).stdout
+ self.assertTrue('-ex disconnect' in output.decode('utf-8'))
+
+ # Minimal testing for APIs that are straight wrappers over Connector APIs.
+ def test_running(self) -> None:
+ self.assertTrue(self._emu.running())
+
+ def test_reset(self) -> None:
+ self._emu.reset()
+
+ def test_cont(self) -> None:
+ self._emu.cont()
+
+ def test_list_properties(self) -> None:
+ with self.assertRaises(InvalidPropertyPath):
+ self._emu.list_properties('invalid path')
+ self.assertEqual(self._emu.list_properties('path1'), ['prop1'])
+
+ def test_get_property(self) -> None:
+ with self.assertRaises(InvalidProperty):
+ self._emu.get_property('path1', 'invalid property')
+ with self.assertRaises(InvalidPropertyPath):
+ self._emu.get_property('invalid path', 'prop1')
+ self.assertEqual(self._emu.get_property('path1', 'prop1'), 'val1')
+
+ def test_set_property(self) -> None:
+ with self.assertRaises(InvalidPropertyPath):
+ self._emu.set_property('invalid path', 'prop1', 'test')
+ with self.assertRaises(InvalidProperty):
+ self._emu.set_property('path1', 'invalid property', 'test')
+ self._emu.set_property('path1', 'prop1', 'val2')
+ self.assertEqual(self._emu.get_property('path1', 'prop1'), 'val2')
+
+ def test_get_channel_type(self) -> None:
+ self.assertEqual(self._emu.get_channel_type('gdb'), 'tcp')
+
+ def test_get_channel_path(self) -> None:
+ with self.assertRaises(InvalidChannelType):
+ self._emu.get_channel_path('gdb')
+
+ def test_get_channel_addr(self) -> None:
+ self.assertEqual(len(self._emu.get_channel_addr('gdb')), 2)
+
+ def test_channel_stream(self) -> None:
+ with self._emu.get_channel_stream('gdb') as _:
+ pass
+
+
+class TestGdbEmptyConfig(ConfigHelperWithEmulator):
+ """Check that ConfigError is raised when running gdb with an empty
+ gdb config.
+
+ """
+
+ _config: Dict[str, Any] = {
+ 'emulators': {
+ 'mock-emu': {
+ 'launcher': 'mock_emu_frontend.MockEmuLauncher',
+ 'connector': 'mock_emu_frontend.MockEmuConnector',
+ }
+ },
+ 'targets': {'test-target': {'mock-emu': {}}},
+ }
+
+ def setUp(self) -> None:
+ super().setUp()
+ self._emu.start('test-target')
+
+ def tearDown(self) -> None:
+ self._emu.stop()
+ super().tearDown()
+
+ def test_gdb_config_error(self) -> None:
+ with self.assertRaises(ConfigError):
+ self._emu.run_gdb_cmds([])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/pw_emu/py/tests/py.typed b/pw_emu/py/tests/py.typed
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/pw_emu/py/tests/py.typed
diff --git a/pw_emu/py/tests/qemu_test.py b/pw_emu/py/tests/qemu_test.py
new file mode 100644
index 000000000..c84fb2b6d
--- /dev/null
+++ b/pw_emu/py/tests/qemu_test.py
@@ -0,0 +1,280 @@
+#!/usr/bin/env python
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""QEMU emulator tests."""
+
+import json
+import os
+import socket
+import sys
+import tempfile
+import time
+import unittest
+
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+from pw_emu.core import InvalidChannelName, InvalidChannelType
+from tests.common import check_prog, ConfigHelperWithEmulator
+
+
+# TODO: b/301382004 - The Python Pigweed package install (into python-venv)
+# races with running this test and there is no way to add that package as a test
+# depedency without creating circular depedencies. This means we can't rely on
+# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper.
+#
+# run the arm_gdb.py wrapper directly
+_arm_none_eabi_gdb_path = Path(
+ os.path.join(
+ os.environ['PW_ROOT'],
+ 'pw_env_setup',
+ 'py',
+ 'pw_env_setup',
+ 'entry_points',
+ 'arm_gdb.py',
+ )
+).resolve()
+
+
+class TestQemu(ConfigHelperWithEmulator):
+ """Tests for a valid qemu configuration."""
+
+ _config = {
+ 'gdb': ['python', str(_arm_none_eabi_gdb_path)],
+ 'qemu': {
+ 'executable': 'qemu-system-arm',
+ },
+ 'targets': {
+ 'test-target': {
+ 'ignore1': None,
+ 'qemu': {
+ 'machine': 'lm3s6965evb',
+ 'channels': {
+ 'chardevs': {
+ 'test_uart': {
+ 'id': 'serial0',
+ }
+ }
+ },
+ },
+ 'ignore2': None,
+ }
+ },
+ }
+
+ def setUp(self) -> None:
+ super().setUp()
+ # No image so start paused to avoid crashing.
+ self._emu.start(target='test-target', pause=True)
+
+ def tearDown(self) -> None:
+ self._emu.stop()
+ super().tearDown()
+
+ def test_running(self) -> None:
+ self.assertTrue(self._emu.running())
+
+ def test_list_properties(self) -> None:
+ self.assertIsNotNone(self._emu.list_properties('/machine'))
+
+ def test_get_property(self) -> None:
+ self.assertEqual(
+ self._emu.get_property('/machine', 'type'), 'lm3s6965evb-machine'
+ )
+
+ def test_set_property(self) -> None:
+ self._emu.set_property('/machine', 'graphics', False)
+ self.assertFalse(self._emu.get_property('/machine', 'graphics'))
+
+ def test_bad_channel_name(self) -> None:
+ with self.assertRaises(InvalidChannelName):
+ self._emu.get_channel_addr('serial1')
+
+ def get_reg(self, addr: int) -> bytes:
+ temp = tempfile.NamedTemporaryFile(delete=False)
+ temp.close()
+
+ res = self._emu.run_gdb_cmds(
+ [
+ f'dump val {temp.name} *(char*){addr}',
+ 'disconnect',
+ ]
+ )
+ self.assertEqual(res.returncode, 0, res.stderr.decode('ascii'))
+
+ with open(temp.name, 'rb') as file:
+ ret = file.read(1)
+
+ self.assertNotEqual(ret, b'', res.stderr.decode('ascii'))
+
+ os.unlink(temp.name)
+
+ return ret
+
+ def poll_data(self, timeout: int) -> Optional[bytes]:
+ uartris = 0x4000C03C
+ uartrd = 0x4000C000
+
+ deadline = time.monotonic() + timeout
+ while self.get_reg(uartris) == b'\x00':
+ time.sleep(0.1)
+ if time.monotonic() > deadline:
+ return None
+ return self.get_reg(uartrd)
+
+ def test_channel_stream(self) -> None:
+ ok, msg = check_prog('arm-none-eabi-gdb')
+ if not ok:
+ self.skipTest(msg)
+
+ stream = self._emu.get_channel_stream('test_uart')
+ stream.write('test\n'.encode('ascii'))
+
+ self.assertEqual(self.poll_data(5), b't')
+ self.assertEqual(self.poll_data(5), b'e')
+ self.assertEqual(self.poll_data(5), b's')
+ self.assertEqual(self.poll_data(5), b't')
+
+ def test_gdb(self) -> None:
+ self._emu.run_gdb_cmds(['c'])
+ deadline = time.monotonic() + 5
+ while self._emu.running():
+ if time.monotonic() > deadline:
+ return
+ self.assertFalse(self._emu.running())
+
+
+class TestQemuChannelsTcp(TestQemu):
+ """Tests for configurations using TCP channels."""
+
+ _config: Dict[str, Any] = {}
+ _config.update(json.loads(json.dumps(TestQemu._config)))
+ _config['qemu']['channels'] = {'type': 'tcp'}
+
+ def test_get_channel_addr(self) -> None:
+ host, port = self._emu.get_channel_addr('test_uart')
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((host, port))
+ sock.close()
+
+
+class TestQemuChannelsPty(TestQemu):
+ """Tests for configurations using PTY channels."""
+
+ _config: Dict[str, Any] = {}
+ _config.update(json.loads(json.dumps(TestQemu._config)))
+ _config['qemu']['channels'] = {'type': 'pty'}
+
+ def setUp(self):
+ if sys.platform == 'win32':
+ self.skipTest('pty not supported on win32')
+ super().setUp()
+
+ def test_get_path(self) -> None:
+ self.assertTrue(os.path.exists(self._emu.get_channel_path('test_uart')))
+
+
+class TestQemuInvalidChannelType(ConfigHelperWithEmulator):
+ """Test invalid channel type configuration."""
+
+ _config = {
+ 'qemu': {
+ 'executable': 'qemu-system-arm',
+ 'channels': {'type': 'invalid'},
+ },
+ 'targets': {
+ 'test-target': {
+ 'qemu': {
+ 'machine': 'lm3s6965evb',
+ }
+ }
+ },
+ }
+
+ def test_start(self) -> None:
+ with self.assertRaises(InvalidChannelType):
+ self._emu.start('test-target', pause=True)
+
+
+class TestQemuTargetChannelsMixed(ConfigHelperWithEmulator):
+ """Test configuration with mixed channels types."""
+
+ _config = {
+ 'qemu': {
+ 'executable': 'qemu-system-arm',
+ },
+ 'targets': {
+ 'test-target': {
+ 'qemu': {
+ 'machine': 'lm3s6965evb',
+ 'channels': {
+ 'chardevs': {
+ 'test_uart0': {
+ 'id': 'serial0',
+ },
+ 'test_uart1': {
+ 'id': 'serial1',
+ 'type': 'tcp',
+ },
+ 'test_uart2': {
+ 'id': 'serial2',
+ 'type': 'pty',
+ },
+ }
+ },
+ }
+ }
+ },
+ }
+
+ def setUp(self) -> None:
+ if sys.platform == 'win32':
+ self.skipTest('pty not supported on win32')
+ super().setUp()
+ # no image to run so start paused
+ self._emu.start('test-target', pause=True)
+
+ def tearDown(self) -> None:
+ self._emu.stop()
+ super().tearDown()
+
+ def test_uart0_addr(self) -> None:
+ host, port = self._emu.get_channel_addr('test_uart0')
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((host, port))
+ sock.close()
+
+ def test_uart1_addr(self) -> None:
+ host, port = self._emu.get_channel_addr('test_uart1')
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((host, port))
+ sock.close()
+
+ def test_uart2_path(self) -> None:
+ self.assertTrue(
+ os.path.exists(self._emu.get_channel_path('test_uart2'))
+ )
+
+
+def main() -> None:
+ ok, msg = check_prog('qemu-system-arm')
+ if not ok:
+ print(f'skipping tests: {msg}')
+ sys.exit(0)
+
+ unittest.main()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/pw_emu/py/tests/renode_test.py b/pw_emu/py/tests/renode_test.py
new file mode 100644
index 000000000..acb9e47a9
--- /dev/null
+++ b/pw_emu/py/tests/renode_test.py
@@ -0,0 +1,238 @@
+#!/usr/bin/env python
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""renode emulator tests."""
+
+import json
+import os
+import sys
+import struct
+import tempfile
+import time
+import unittest
+
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+from pw_emu.core import InvalidChannelName, InvalidChannelType
+from tests.common import check_prog, ConfigHelperWithEmulator
+
+
+# TODO: b/301382004 - The Python Pigweed package install (into python-venv)
+# races with running this test and there is no way to add that package as a test
+# depedency without creating circular depedencies. This means we can't rely on
+# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper.
+#
+# run the arm_gdb.py wrapper directly
+_arm_none_eabi_gdb_path = Path(
+ os.path.join(
+ os.environ['PW_ROOT'],
+ 'pw_env_setup',
+ 'py',
+ 'pw_env_setup',
+ 'entry_points',
+ 'arm_gdb.py',
+ )
+).resolve()
+
+
+class TestRenode(ConfigHelperWithEmulator):
+ """Tests for a valid renode configuration."""
+
+ _config = {
+ 'gdb': ['python', str(_arm_none_eabi_gdb_path)],
+ 'renode': {
+ 'executable': 'renode',
+ },
+ 'targets': {
+ 'test-target': {
+ 'renode': {
+ 'machine': 'platforms/boards/stm32f4_discovery-kit.repl',
+ }
+ }
+ },
+ }
+
+ def setUp(self) -> None:
+ super().setUp()
+ # no image to run so start paused
+ self._emu.start(target='test-target', pause=True)
+
+ def tearDown(self) -> None:
+ self._emu.stop()
+ super().tearDown()
+
+ def test_running(self) -> None:
+ self.assertTrue(self._emu.running())
+
+ def test_list_properties(self) -> None:
+ self.assertIsNotNone(self._emu.list_properties('sysbus.usart1'))
+
+ def test_get_property(self) -> None:
+ self.assertIsNotNone(
+ self._emu.get_property('sysbus.usart1', 'BaudRate')
+ )
+
+ def test_set_property(self) -> None:
+ self._emu.set_property('sysbus.timer1', 'Frequency', 100)
+ self.assertEqual(
+ int(self._emu.get_property('sysbus.timer1', 'Frequency'), 16), 100
+ )
+
+
+class TestRenodeInvalidChannelType(ConfigHelperWithEmulator):
+ """Test invalid channel type configuration."""
+
+ _config = {
+ 'renode': {
+ 'executable': 'renode',
+ },
+ 'targets': {
+ 'test-target': {
+ 'renode': {
+ 'machine': 'platforms/boards/stm32f4_discovery-kit.repl',
+ 'channels': {
+ 'terminals': {
+ 'test_uart': {
+ 'device-path': 'sysbus.usart1',
+ 'type': 'invalid',
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ def test_start(self) -> None:
+ with self.assertRaises(InvalidChannelType):
+ self._emu.start('test-target', pause=True)
+
+
+class TestRenodeChannels(ConfigHelperWithEmulator):
+ """Tests for a valid renode channels configuration."""
+
+ _config = {
+ 'gdb': ['python', str(_arm_none_eabi_gdb_path)],
+ 'renode': {
+ 'executable': 'renode',
+ },
+ 'targets': {
+ 'test-target': {
+ 'renode': {
+ 'machine': 'platforms/boards/stm32f4_discovery-kit.repl',
+ 'channels': {
+ 'terminals': {
+ 'test_uart': {
+ 'device-path': 'sysbus.usart1',
+ }
+ }
+ },
+ }
+ }
+ },
+ }
+
+ def setUp(self) -> None:
+ super().setUp()
+ # no image to run so start paused
+ self._emu.start(target='test-target', pause=True)
+
+ def tearDown(self) -> None:
+ self._emu.stop()
+ super().tearDown()
+
+ def test_bad_channel_name(self) -> None:
+ with self.assertRaises(InvalidChannelName):
+ self._emu.get_channel_addr('serial1')
+
+ def set_reg(self, addr: int, val: int) -> None:
+ self._emu.run_gdb_cmds([f'set *(unsigned int*){addr}={val}'])
+
+ def get_reg(self, addr: int) -> int:
+ temp = tempfile.NamedTemporaryFile(delete=False)
+ temp.close()
+
+ res = self._emu.run_gdb_cmds(
+ [
+ f'dump val {temp.name} *(char*){addr}',
+ 'disconnect',
+ ]
+ )
+ self.assertEqual(res.returncode, 0, res.stderr.decode('ascii'))
+
+ with open(temp.name, 'rb') as file:
+ ret = file.read(1)
+
+ self.assertNotEqual(ret, b'', res.stderr.decode('ascii'))
+
+ os.unlink(temp.name)
+
+ return struct.unpack('B', ret)[0]
+
+ def poll_data(self, timeout: int) -> Optional[int]:
+ usart_sr = 0x40011000
+ usart_dr = 0x40011004
+ deadline = time.monotonic() + timeout
+ while self.get_reg(usart_sr) & 0x20 == 0:
+ time.sleep(0.1)
+ if time.monotonic() > deadline:
+ return None
+ return self.get_reg(usart_dr)
+
+ def test_channel_stream(self) -> None:
+ ok, msg = check_prog('arm-none-eabi-gdb')
+ if not ok:
+ self.skipTest(msg)
+
+ usart_cr1 = 0x4001100C
+ # enable RX and TX
+ self.set_reg(usart_cr1, 0xC)
+
+ stream = self._emu.get_channel_stream('test_uart')
+ stream.write('test\n'.encode('ascii'))
+
+ self.assertEqual(self.poll_data(5), ord('t'))
+ self.assertEqual(self.poll_data(5), ord('e'))
+ self.assertEqual(self.poll_data(5), ord('s'))
+ self.assertEqual(self.poll_data(5), ord('t'))
+
+
+class TestRenodeChannelsPty(TestRenodeChannels):
+ """Tests for configurations using PTY channels."""
+
+ _config: Dict[str, Any] = {}
+ _config.update(json.loads(json.dumps(TestRenodeChannels._config)))
+ _config['renode']['channels'] = {'terminals': {'type': 'pty'}}
+
+ def setUp(self):
+ if sys.platform == 'win32':
+ self.skipTest('pty not supported on win32')
+ super().setUp()
+
+ def test_get_path(self) -> None:
+ self.assertTrue(os.path.exists(self._emu.get_channel_path('test_uart')))
+
+
+def main() -> None:
+ ok, msg = check_prog('renode')
+ if not ok:
+ print(f'skipping tests: {msg}')
+ sys.exit(0)
+
+ unittest.main()
+
+
+if __name__ == '__main__':
+ main()