diff options
Diffstat (limited to 'pw_emu/py/tests')
-rw-r--r-- | pw_emu/py/tests/__init__.py | 14 | ||||
-rw-r--r-- | pw_emu/py/tests/cli_test.py | 276 | ||||
-rw-r--r-- | pw_emu/py/tests/common.py | 67 | ||||
-rw-r--r-- | pw_emu/py/tests/core_test.py | 480 | ||||
-rw-r--r-- | pw_emu/py/tests/frontend_test.py | 150 | ||||
-rw-r--r-- | pw_emu/py/tests/py.typed | 0 | ||||
-rw-r--r-- | pw_emu/py/tests/qemu_test.py | 280 | ||||
-rw-r--r-- | pw_emu/py/tests/renode_test.py | 238 |
8 files changed, 1505 insertions, 0 deletions
diff --git a/pw_emu/py/tests/__init__.py b/pw_emu/py/tests/__init__.py new file mode 100644 index 000000000..0f2cb9104 --- /dev/null +++ b/pw_emu/py/tests/__init__.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. diff --git a/pw_emu/py/tests/cli_test.py b/pw_emu/py/tests/cli_test.py new file mode 100644 index 000000000..86bde6432 --- /dev/null +++ b/pw_emu/py/tests/cli_test.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Tests for the command line interface""" + +import os +import signal +import subprocess +import sys +import time +import unittest + +from pathlib import Path +from typing import List + +from mock_emu_frontend import _mock_emu +from tests.common import ConfigHelper + + +# TODO: b/301382004 - The Python Pigweed package install (into python-venv) +# races with running this test and there is no way to add that package as a test +# depedency without creating circular depedencies. This means we can't rely on +# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper. +# +# Run the CLI directly instead of going through pw cli. +_cli_path = Path( + os.path.join(os.environ['PW_ROOT'], 'pw_emu', 'py', 'pw_emu', '__main__.py') +).resolve() + + +class TestCli(ConfigHelper): + """Test non-interactive commands""" + + _config = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'mock-emu': { + 'tcp_channel': True, + 'gdb_channel': True, + }, + 'gdb': _mock_emu + ['--exit', '--'], + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def _build_cmd(self, args: List[str]) -> List[str]: + cmd = [ + 'python', + str(_cli_path), + '--working-dir', + self._wdir.name, + '--config', + self._config_file, + ] + args + return cmd + + def _run(self, args: List[str], **kwargs) -> subprocess.CompletedProcess: + """Run the CLI and wait for completion""" + return subprocess.run(self._build_cmd(args), **kwargs) + + def _popen(self, args: List[str], **kwargs) -> subprocess.Popen: + """Run the CLI in the background""" + return subprocess.Popen(self._build_cmd(args), **kwargs) + + +class TestNonInteractive(TestCli): + """Test non interactive commands.""" + + def setUp(self) -> None: + super().setUp() + self.assertEqual(self._run(['start', 'test-target']).returncode, 0) + + def tearDown(self) -> None: + self.assertEqual(self._run(['stop']).returncode, 0) + super().tearDown() + + def test_already_running(self) -> None: + self.assertNotEqual(self._run(['start', 'test-target']).returncode, 0) + + def test_gdb_cmds(self) -> None: + status = self._run( + ['gdb-cmds', 'show version'], + ) + self.assertEqual(status.returncode, 0) + + def test_prop_ls(self) -> None: + status = self._run(['prop-ls', 'path1'], stdout=subprocess.PIPE) + self.assertEqual(status.returncode, 0) + self.assertTrue('prop1' in status.stdout.decode('ascii')) + status = self._run(['prop-ls', 'invalid path'], stdout=subprocess.PIPE) + self.assertNotEqual(status.returncode, 0) + + def test_prop_get(self) -> None: + status = self._run( + ['prop-get', 'invalid path', 'prop1'], + stdout=subprocess.PIPE, + ) + self.assertNotEqual(status.returncode, 0) + status = self._run( + ['prop-get', 'path1', 'invalid prop'], + stdout=subprocess.PIPE, + ) + self.assertNotEqual(status.returncode, 0) + status = self._run( + ['prop-get', 'path1', 'prop1'], + stdout=subprocess.PIPE, + ) + self.assertEqual(status.returncode, 0) + self.assertTrue('val1' in status.stdout.decode('ascii')) + + def test_prop_set(self) -> None: + status = self._run( + ['prop-set', 'invalid path', 'prop1', 'v'], + stdout=subprocess.PIPE, + ) + self.assertNotEqual(status.returncode, 0) + status = self._run( + ['prop-set', 'path1', 'invalid prop', 'v'], + stdout=subprocess.PIPE, + ) + self.assertNotEqual(status.returncode, 0) + status = self._run( + ['prop-set', 'path1', 'prop1', 'value'], + stdout=subprocess.PIPE, + ) + self.assertEqual(status.returncode, 0) + status = self._run( + ['prop-get', 'path1', 'prop1'], + stdout=subprocess.PIPE, + ) + self.assertEqual(status.returncode, 0) + self.assertTrue('value' in status.stdout.decode('ascii'), status.stdout) + + def test_reset(self) -> None: + self.assertEqual(self._run(['reset']).returncode, 0) + self.assertTrue(os.path.exists(os.path.join(self._wdir.name, 'reset'))) + + def test_load(self) -> None: + self.assertEqual(self._run(['load', 'executable']).returncode, 0) + + def test_resume(self) -> None: + self.assertEqual(self._run(['resume']).returncode, 0) + + +class TestForeground(TestCli): + """Test starting in foreground""" + + def _test_common(self, cmd) -> None: + # Run the CLI process in a new session so that we can terminate both the + # CLI and the mock emulator it spawns in the foreground. + args = {} + if sys.platform == 'win32': + args['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP + else: + args['start_new_session'] = True + proc = self._popen(cmd, stdout=subprocess.PIPE, **args) + assert proc.stdout + output = proc.stdout.readline() + self.assertTrue( + 'starting mock emulator' in output.decode('utf-8'), + output.decode('utf-8'), + ) + if sys.platform == 'win32': + # See https://bugs.python.org/issue26350 + os.kill(proc.pid, signal.CTRL_BREAK_EVENT) + else: + os.kill(-proc.pid, signal.SIGTERM) + proc.wait() + proc.stdout.close() + + def test_foreground(self) -> None: + self._test_common(['start', '--foreground', 'test-target']) + + def test_debug(self) -> None: + self._test_common(['start', '--debug', 'test-target']) + + +class TestInteractive(TestCli): + """Test interactive commands""" + + def setUp(self) -> None: + super().setUp() + self.assertEqual(self._run(['start', 'test-target']).returncode, 0) + + def tearDown(self) -> None: + self.assertEqual(self._run(['stop']).returncode, 0) + super().tearDown() + + @staticmethod + def _read_nonblocking(fd: int, size: int) -> bytes: + try: + return os.read(fd, size) + except BlockingIOError: + return b'' + + def test_term(self) -> None: + """Test the pw emu term command""" + + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + + # pylint: disable=import-outside-toplevel + # Can't import pty on win32. + import pty + + # pylint: disable=no-member + # Avoid pylint false positive on win32. + pid, fd = pty.fork() + if pid == 0: + status = self._run(['term', 'tcp']) + # pylint: disable=protected-access + # Use os._exit instead of os.exit after fork. + os._exit(status.returncode) + else: + expected = '--- Miniterm on tcp ---' + + # Read the expected string with a timeout. + os.set_blocking(fd, False) + deadline = time.monotonic() + 5 + data = self._read_nonblocking(fd, len(expected)) + while len(data) < len(expected): + time.sleep(0.1) + data += self._read_nonblocking(fd, len(expected) - len(data)) + if time.monotonic() > deadline: + break + self.assertTrue( + expected in data.decode('ascii'), + data + self._read_nonblocking(fd, 100), + ) + + # send CTRL + ']' to terminate miniterm + os.write(fd, b'\x1d') + + # wait for the process to exit, with a timeout + deadline = time.monotonic() + 5 + wait_pid, ret = os.waitpid(pid, os.WNOHANG) + while wait_pid == 0: + time.sleep(0.1) + # Discard input to avoid writer hang on MacOS, + # see https://github.com/python/cpython/issues/97001. + try: + self._read_nonblocking(fd, 100) + except OSError: + # Avoid read errors when the child pair of the pty + # closes when the child terminates. + pass + wait_pid, ret = os.waitpid(pid, os.WNOHANG) + if time.monotonic() > deadline: + break + self.assertEqual(wait_pid, pid) + self.assertEqual(ret, 0) + + def test_gdb(self) -> None: + res = self._run(['gdb', '-e', 'executable'], stdout=subprocess.PIPE) + self.assertEqual(res.returncode, 0) + output = res.stdout.decode('ascii') + self.assertTrue('target remote' in output, output) + self.assertTrue('executable' in output, output) + + +if __name__ == '__main__': + unittest.main() diff --git a/pw_emu/py/tests/common.py b/pw_emu/py/tests/common.py new file mode 100644 index 000000000..9d2513fcc --- /dev/null +++ b/pw_emu/py/tests/common.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Common utilities for tests.""" + +import json +import os +import subprocess +import tempfile +import unittest + +from pathlib import Path +from typing import Any, Optional, Dict + +from pw_emu.frontend import Emulator + + +def check_prog(prog: str) -> tuple: + msg = f'running {prog}' + try: + proc = subprocess.run([prog, '--help'], capture_output=True) + if proc.returncode != 0: + output = proc.stdout.decode('ascii') + proc.stderr.decode('ascii') + msg = f'error {msg}: {output}' + return (False, msg) + except OSError as err: + msg = f'error {msg}: {str(err)}' + return (False, msg) + return (True, msg) + + +class ConfigHelper(unittest.TestCase): + """Helper that setups and tears down the configuration file""" + + _config: Optional[Dict[str, Any]] = None + + def setUp(self) -> None: + self._wdir = tempfile.TemporaryDirectory() + with tempfile.NamedTemporaryFile('wt', delete=False) as file: + pw_emu_config: Dict[str, Any] = {'pw': {'pw_emu': {}}} + if self._config: + pw_emu_config['pw']['pw_emu'].update(self._config) + json.dump(pw_emu_config, file) + self._config_file = file.name + + def tearDown(self) -> None: + self._wdir.cleanup() + os.unlink(self._config_file) + + +class ConfigHelperWithEmulator(ConfigHelper): + """Helper that setups and tears down the configuration file""" + + def setUp(self) -> None: + super().setUp() + self._emu = Emulator(Path(self._wdir.name), Path(self._config_file)) diff --git a/pw_emu/py/tests/core_test.py b/pw_emu/py/tests/core_test.py new file mode 100644 index 000000000..1131d5dd3 --- /dev/null +++ b/pw_emu/py/tests/core_test.py @@ -0,0 +1,480 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Tests for core / infrastructure code.""" + +import copy +import io +import json +import os +import socket +import sys +import tempfile +import time +import unittest +from pathlib import Path +from typing import Any, Dict + +from unittest.mock import patch + +from pw_emu.core import ( + AlreadyRunning, + Config, + ConfigError, + Handles, + InvalidTarget, + InvalidChannelName, + InvalidChannelType, + Launcher, +) +from mock_emu_frontend import _mock_emu +from tests.common import ConfigHelper + + +class ConfigHelperWithLauncher(ConfigHelper): + def setUp(self) -> None: + super().setUp() + self._launcher = Launcher.get('mock-emu', Path(self._config_file)) + + +class TestInvalidTarget(ConfigHelperWithLauncher): + """Check that InvalidTarget is raised with an empty config.""" + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + } + + def test_invalid_target(self) -> None: + with self.assertRaises(InvalidTarget): + self._launcher.start(Path(self._wdir.name), 'test-target') + + +class TestStart(ConfigHelperWithLauncher): + """Start tests for valid config.""" + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def setUp(self) -> None: + super().setUp() + self._connector = self._launcher.start( + Path(self._wdir.name), 'test-target' + ) + + def tearDown(self) -> None: + self._connector.stop() + super().tearDown() + + def test_running(self) -> None: + self.assertTrue(self._connector.running()) + + def test_pid_file(self) -> None: + self.assertTrue( + os.path.exists(os.path.join(self._wdir.name, 'mock-emu.pid')) + ) + + def test_handles_file(self) -> None: + self.assertTrue( + os.path.exists(os.path.join(self._wdir.name, 'handles.json')) + ) + + def test_already_running(self) -> None: + with self.assertRaises(AlreadyRunning): + self._launcher.start(Path(self._wdir.name), 'test-target') + + def test_log(self) -> None: + exp = 'starting mock emulator' + path = os.path.join(self._wdir.name, 'mock-emu.log') + deadline = time.monotonic() + 100 + while os.path.getsize(path) < len(exp): + time.sleep(0.1) + if time.monotonic() > deadline: + break + + with open(os.path.join(self._wdir.name, 'mock-emu.log'), 'rt') as file: + data = file.read() + self.assertTrue(exp in data, data) + + +class TestPrePostStartCmds(ConfigHelperWithLauncher): + """Tests for configurations with pre-start commands.""" + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'targets': { + 'test-target': { + 'pre-start-cmds': { + 'pre-1': _mock_emu + ['pre-1'], + 'pre-2': _mock_emu + ['$pw_emu_wdir{pre-2}'], + }, + 'post-start-cmds': { + 'post-1': _mock_emu + + ['$pw_emu_channel_path{test_subst_pty}'], + 'post-2': _mock_emu + + [ + '$pw_emu_channel_host{test_subst_tcp}', + '$pw_emu_channel_port{test_subst_tcp}', + ], + }, + 'mock-emu': {}, + } + }, + } + + def setUp(self) -> None: + super().setUp() + self._connector = self._launcher.start( + Path(self._wdir.name), 'test-target' + ) + + def tearDown(self) -> None: + self._connector.stop() + super().tearDown() + + def test_running(self) -> None: + for proc in self._connector.get_procs().keys(): + self.assertTrue(self._connector.proc_running(proc)) + + def test_stop(self) -> None: + self._connector.stop() + for proc in self._connector.get_procs().keys(): + self.assertFalse(self._connector.proc_running(proc)) + + def test_pid_files(self) -> None: + for proc in ['pre-1', 'pre-2', 'post-1', 'post-2']: + self.assertTrue( + os.path.exists(os.path.join(self._wdir.name, f'{proc}.pid')) + ) + + def test_logs(self): + expect = { + 'pre-1.log': 'pre-1', + 'pre-2.log': os.path.join(self._wdir.name, 'pre-2'), + 'post-1.log': 'pty-path', + 'post-2.log': 'localhost 1234', + } + + for log, pattern in expect.items(): + path = os.path.join(self._wdir.name, log) + deadline = time.monotonic() + 100 + while os.path.getsize(path) < len(pattern): + time.sleep(0.1) + if time.monotonic() > deadline: + break + with open(os.path.join(self._wdir.name, log)) as file: + data = file.read() + self.assertTrue(pattern in data, f'`{pattern}` not in `{data}`') + + +class TestStop(ConfigHelperWithLauncher): + """Stop tests for valid config.""" + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def setUp(self) -> None: + super().setUp() + self._connector = self._launcher.start( + Path(self._wdir.name), 'test-target' + ) + self._connector.stop() + + def test_pid_files(self) -> None: + self.assertFalse( + os.path.exists(os.path.join(self._wdir.name, 'emu.pid')) + ) + + def test_target_file(self) -> None: + self.assertFalse( + os.path.exists(os.path.join(self._wdir.name, 'target')) + ) + + def test_running(self) -> None: + self.assertFalse(self._connector.running()) + + +class TestChannels(ConfigHelperWithLauncher): + """Test Connector channels APIs.""" + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'mock-emu': { + 'tcp_channel': True, + 'pty_channel': sys.platform != 'win32', + }, + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def setUp(self) -> None: + super().setUp() + self._connector = self._launcher.start( + Path(self._wdir.name), 'test-target' + ) + + def tearDown(self) -> None: + self._connector.stop() + super().tearDown() + + def test_tcp_channel_addr(self) -> None: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(self._connector.get_channel_addr('tcp')) + sock.close() + + def test_get_channel_type(self) -> None: + self.assertEqual(self._connector.get_channel_type('tcp'), 'tcp') + if sys.platform != 'win32': + self.assertEqual(self._connector.get_channel_type('pty'), 'pty') + with self.assertRaises(InvalidChannelName): + self._connector.get_channel_type('invalid channel') + + def test_pty_channel_path(self) -> None: + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + self.assertTrue(os.path.exists(self._connector.get_channel_path('pty'))) + with self.assertRaises(InvalidChannelType): + self._connector.get_channel_path('tcp') + + def _test_stream(self, stream: io.RawIOBase) -> None: + for char in [b'1', b'2', b'3', b'4']: + stream.write(char) + self.assertEqual(stream.read(1), char) + + def test_tcp_stream(self) -> None: + with self._connector.get_channel_stream('tcp') as stream: + self._test_stream(stream) + + def test_pty_stream(self) -> None: + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + with self._connector.get_channel_stream('pty') as stream: + self._test_stream(stream) + + +class TestTargetFragments(unittest.TestCase): + """Tests for configurations using target fragments.""" + + _config_templ: Dict[str, Any] = { + 'pw': { + 'pw_emu': { + 'target_files': [], + 'targets': { + 'test-target': { + 'test-key': 'test-value', + } + }, + } + } + } + + _tf1_config: Dict[str, Any] = { + 'targets': { + 'test-target1': {}, + 'test-target': { + 'test-key': 'test-value-file1', + }, + } + } + + _tf2_config: Dict[str, Any] = {'targets': {'test-target2': {}}} + + def setUp(self) -> None: + with tempfile.NamedTemporaryFile( + 'wt', delete=False + ) as config_file, tempfile.NamedTemporaryFile( + 'wt', delete=False + ) as targets1_file, tempfile.NamedTemporaryFile( + 'wt', delete=False + ) as targets2_file: + self._config_path = config_file.name + self._targets1_path = targets1_file.name + self._targets2_path = targets2_file.name + json.dump(self._tf1_config, targets1_file) + json.dump(self._tf2_config, targets2_file) + config = copy.deepcopy(self._config_templ) + config['pw']['pw_emu']['target_files'].append(self._targets1_path) + config['pw']['pw_emu']['target_files'].append(self._targets2_path) + json.dump(config, config_file) + self._config = Config(Path(self._config_path)) + + def tearDown(self) -> None: + os.unlink(self._config_path) + os.unlink(self._targets1_path) + os.unlink(self._targets2_path) + + def test_targets_loaded(self) -> None: + self.assertIsNotNone(self._config.get(['targets', 'test-target'])) + self.assertIsNotNone(self._config.get(['targets', 'test-target1'])) + self.assertIsNotNone(self._config.get(['targets', 'test-target2'])) + + def test_targets_priority(self) -> None: + self.assertEqual( + self._config.get(['targets', 'test-target', 'test-key']), + 'test-value', + ) + + +class TestHandles(unittest.TestCase): + """Tests for Handles.""" + + _config = { + 'emu': 'mock-emu', + 'config': 'test-config', + 'target': 'test-target', + 'gdb_cmd': ['test-gdb'], + 'channels': { + 'tcp_chan': {'type': 'tcp', 'host': 'localhost', 'port': 1234}, + 'pty_chan': {'type': 'pty', 'path': 'path'}, + }, + 'procs': {'proc0': {'pid': 1983}, 'proc1': {'pid': 1234}}, + } + + def test_serialize(self): + handles = Handles('mock-emu', 'test-config') + handles.add_channel_tcp('tcp_chan', 'localhost', 1234) + handles.add_channel_pty('pty_chan', 'path') + handles.add_proc('proc0', 1983) + handles.add_proc('proc1', 1234) + handles.set_target('test-target') + handles.set_gdb_cmd(['test-gdb']) + tmp = tempfile.TemporaryDirectory() + handles.save(Path(tmp.name)) + with open(os.path.join(tmp.name, 'handles.json'), 'rt') as file: + self.assertTrue(json.load(file) == self._config) + tmp.cleanup() + + def test_load(self): + tmp = tempfile.TemporaryDirectory() + with open(os.path.join(tmp.name, 'handles.json'), 'wt') as file: + json.dump(self._config, file) + handles = Handles.load(Path(tmp.name)) + self.assertEqual(handles.emu, 'mock-emu') + self.assertEqual(handles.gdb_cmd, ['test-gdb']) + self.assertEqual(handles.target, 'test-target') + self.assertEqual(handles.config, 'test-config') + tmp.cleanup() + + +class TestConfig(ConfigHelper): + """Stop tests for valid config.""" + + _config: Dict[str, Any] = { + 'top': 'entry', + 'multi': { + 'level': { + 'entry': 0, + }, + }, + 'subst': 'a/$pw_env{PW_EMU_TEST_ENV_SUBST}/c', + 'targets': { + 'test-target': { + 'entry': [1, 2, 3], + 'mock-emu': { + 'entry': 'test', + }, + } + }, + 'mock-emu': { + 'executable': _mock_emu, + }, + 'list': ['a', '$pw_env{PW_EMU_TEST_ENV_SUBST}', 'c'], + 'bad-subst-type': '$pw_bad_subst_type{test}', + } + + def setUp(self) -> None: + super().setUp() + self._cfg = Config(Path(self._config_file), 'test-target', 'mock-emu') + + def test_top_entry(self) -> None: + self.assertEqual(self._cfg.get(['top']), 'entry') + + def test_empty_subst(self) -> None: + with self.assertRaises(ConfigError): + self._cfg.get(['subst']) + + def test_subst(self) -> None: + with patch.dict('os.environ', {'PW_EMU_TEST_ENV_SUBST': 'b'}): + self.assertEqual(self._cfg.get(['subst']), 'a/b/c') + + def test_multi_level_entry(self) -> None: + self.assertEqual(self._cfg.get(['multi', 'level', 'entry']), 0) + + def test_get_target(self) -> None: + self.assertEqual(self._cfg.get_targets(), ['test-target']) + + def test_target(self) -> None: + self.assertEqual(self._cfg.get_target(['entry']), [1, 2, 3]) + + def test_target_emu(self) -> None: + self.assertEqual(self._cfg.get_target_emu(['entry']), 'test') + + def test_type_checking(self) -> None: + with self.assertRaises(ConfigError): + self._cfg.get(['top'], entry_type=int) + self._cfg.get(['top'], entry_type=str) + self._cfg.get_target(['entry'], entry_type=list) + self._cfg.get_target_emu(['entry'], entry_type=str) + self._cfg.get(['targets'], entry_type=dict) + + def test_non_optional(self) -> None: + with self.assertRaises(ConfigError): + self._cfg.get(['non-existing'], optional=False) + + def test_optional(self) -> None: + self.assertEqual(self._cfg.get(['non-existing']), None) + self.assertEqual(self._cfg.get(['non-existing'], entry_type=int), 0) + self.assertEqual(self._cfg.get(['non-existing'], entry_type=str), '') + self.assertEqual(self._cfg.get(['non-existing'], entry_type=list), []) + + def test_list(self) -> None: + with self.assertRaises(ConfigError): + self._cfg.get(['list']) + with patch.dict('os.environ', {'PW_EMU_TEST_ENV_SUBST': 'b'}): + self.assertEqual(self._cfg.get(['list']), ['a', 'b', 'c']) + + def test_bad_subst(self) -> None: + with self.assertRaises(ConfigError): + self._cfg.get(['bad-subst-type']) + + +if __name__ == '__main__': + unittest.main() diff --git a/pw_emu/py/tests/frontend_test.py b/pw_emu/py/tests/frontend_test.py new file mode 100644 index 000000000..36ea3d7b2 --- /dev/null +++ b/pw_emu/py/tests/frontend_test.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Emulator API tests.""" + +import unittest + +from typing import Any, Dict + +from pw_emu.core import ( + ConfigError, + InvalidChannelType, + InvalidProperty, + InvalidPropertyPath, +) +from mock_emu_frontend import _mock_emu +from tests.common import ConfigHelperWithEmulator + + +class TestEmulator(ConfigHelperWithEmulator): + """Test Emulator APIs.""" + + _config = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'mock-emu': { + 'gdb_channel': True, + }, + 'gdb': _mock_emu + ['--exit', '--'], + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def setUp(self) -> None: + super().setUp() + self._emu.start('test-target') + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_gdb_target_remote(self) -> None: + output = self._emu.run_gdb_cmds([]).stdout + host, port = self._emu.get_channel_addr('gdb') + self.assertTrue(f'{host}:{port}' in output.decode('utf-8')) + + def test_gdb_commands(self) -> None: + output = self._emu.run_gdb_cmds(['test_gdb_cmd']).stdout + self.assertTrue( + output and '-ex test_gdb_cmd' in output.decode('utf-8'), output + ) + + def test_gdb_executable(self) -> None: + output = self._emu.run_gdb_cmds([], 'test_gdb_exec').stdout + self.assertTrue('test_gdb_exec' in output.decode('utf-8')) + + def test_gdb_pause(self) -> None: + output = self._emu.run_gdb_cmds([], pause=True).stdout + self.assertTrue('-ex disconnect' in output.decode('utf-8')) + + # Minimal testing for APIs that are straight wrappers over Connector APIs. + def test_running(self) -> None: + self.assertTrue(self._emu.running()) + + def test_reset(self) -> None: + self._emu.reset() + + def test_cont(self) -> None: + self._emu.cont() + + def test_list_properties(self) -> None: + with self.assertRaises(InvalidPropertyPath): + self._emu.list_properties('invalid path') + self.assertEqual(self._emu.list_properties('path1'), ['prop1']) + + def test_get_property(self) -> None: + with self.assertRaises(InvalidProperty): + self._emu.get_property('path1', 'invalid property') + with self.assertRaises(InvalidPropertyPath): + self._emu.get_property('invalid path', 'prop1') + self.assertEqual(self._emu.get_property('path1', 'prop1'), 'val1') + + def test_set_property(self) -> None: + with self.assertRaises(InvalidPropertyPath): + self._emu.set_property('invalid path', 'prop1', 'test') + with self.assertRaises(InvalidProperty): + self._emu.set_property('path1', 'invalid property', 'test') + self._emu.set_property('path1', 'prop1', 'val2') + self.assertEqual(self._emu.get_property('path1', 'prop1'), 'val2') + + def test_get_channel_type(self) -> None: + self.assertEqual(self._emu.get_channel_type('gdb'), 'tcp') + + def test_get_channel_path(self) -> None: + with self.assertRaises(InvalidChannelType): + self._emu.get_channel_path('gdb') + + def test_get_channel_addr(self) -> None: + self.assertEqual(len(self._emu.get_channel_addr('gdb')), 2) + + def test_channel_stream(self) -> None: + with self._emu.get_channel_stream('gdb') as _: + pass + + +class TestGdbEmptyConfig(ConfigHelperWithEmulator): + """Check that ConfigError is raised when running gdb with an empty + gdb config. + + """ + + _config: Dict[str, Any] = { + 'emulators': { + 'mock-emu': { + 'launcher': 'mock_emu_frontend.MockEmuLauncher', + 'connector': 'mock_emu_frontend.MockEmuConnector', + } + }, + 'targets': {'test-target': {'mock-emu': {}}}, + } + + def setUp(self) -> None: + super().setUp() + self._emu.start('test-target') + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_gdb_config_error(self) -> None: + with self.assertRaises(ConfigError): + self._emu.run_gdb_cmds([]) + + +if __name__ == '__main__': + unittest.main() diff --git a/pw_emu/py/tests/py.typed b/pw_emu/py/tests/py.typed new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/pw_emu/py/tests/py.typed diff --git a/pw_emu/py/tests/qemu_test.py b/pw_emu/py/tests/qemu_test.py new file mode 100644 index 000000000..c84fb2b6d --- /dev/null +++ b/pw_emu/py/tests/qemu_test.py @@ -0,0 +1,280 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""QEMU emulator tests.""" + +import json +import os +import socket +import sys +import tempfile +import time +import unittest + +from pathlib import Path +from typing import Any, Dict, Optional + +from pw_emu.core import InvalidChannelName, InvalidChannelType +from tests.common import check_prog, ConfigHelperWithEmulator + + +# TODO: b/301382004 - The Python Pigweed package install (into python-venv) +# races with running this test and there is no way to add that package as a test +# depedency without creating circular depedencies. This means we can't rely on +# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper. +# +# run the arm_gdb.py wrapper directly +_arm_none_eabi_gdb_path = Path( + os.path.join( + os.environ['PW_ROOT'], + 'pw_env_setup', + 'py', + 'pw_env_setup', + 'entry_points', + 'arm_gdb.py', + ) +).resolve() + + +class TestQemu(ConfigHelperWithEmulator): + """Tests for a valid qemu configuration.""" + + _config = { + 'gdb': ['python', str(_arm_none_eabi_gdb_path)], + 'qemu': { + 'executable': 'qemu-system-arm', + }, + 'targets': { + 'test-target': { + 'ignore1': None, + 'qemu': { + 'machine': 'lm3s6965evb', + 'channels': { + 'chardevs': { + 'test_uart': { + 'id': 'serial0', + } + } + }, + }, + 'ignore2': None, + } + }, + } + + def setUp(self) -> None: + super().setUp() + # No image so start paused to avoid crashing. + self._emu.start(target='test-target', pause=True) + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_running(self) -> None: + self.assertTrue(self._emu.running()) + + def test_list_properties(self) -> None: + self.assertIsNotNone(self._emu.list_properties('/machine')) + + def test_get_property(self) -> None: + self.assertEqual( + self._emu.get_property('/machine', 'type'), 'lm3s6965evb-machine' + ) + + def test_set_property(self) -> None: + self._emu.set_property('/machine', 'graphics', False) + self.assertFalse(self._emu.get_property('/machine', 'graphics')) + + def test_bad_channel_name(self) -> None: + with self.assertRaises(InvalidChannelName): + self._emu.get_channel_addr('serial1') + + def get_reg(self, addr: int) -> bytes: + temp = tempfile.NamedTemporaryFile(delete=False) + temp.close() + + res = self._emu.run_gdb_cmds( + [ + f'dump val {temp.name} *(char*){addr}', + 'disconnect', + ] + ) + self.assertEqual(res.returncode, 0, res.stderr.decode('ascii')) + + with open(temp.name, 'rb') as file: + ret = file.read(1) + + self.assertNotEqual(ret, b'', res.stderr.decode('ascii')) + + os.unlink(temp.name) + + return ret + + def poll_data(self, timeout: int) -> Optional[bytes]: + uartris = 0x4000C03C + uartrd = 0x4000C000 + + deadline = time.monotonic() + timeout + while self.get_reg(uartris) == b'\x00': + time.sleep(0.1) + if time.monotonic() > deadline: + return None + return self.get_reg(uartrd) + + def test_channel_stream(self) -> None: + ok, msg = check_prog('arm-none-eabi-gdb') + if not ok: + self.skipTest(msg) + + stream = self._emu.get_channel_stream('test_uart') + stream.write('test\n'.encode('ascii')) + + self.assertEqual(self.poll_data(5), b't') + self.assertEqual(self.poll_data(5), b'e') + self.assertEqual(self.poll_data(5), b's') + self.assertEqual(self.poll_data(5), b't') + + def test_gdb(self) -> None: + self._emu.run_gdb_cmds(['c']) + deadline = time.monotonic() + 5 + while self._emu.running(): + if time.monotonic() > deadline: + return + self.assertFalse(self._emu.running()) + + +class TestQemuChannelsTcp(TestQemu): + """Tests for configurations using TCP channels.""" + + _config: Dict[str, Any] = {} + _config.update(json.loads(json.dumps(TestQemu._config))) + _config['qemu']['channels'] = {'type': 'tcp'} + + def test_get_channel_addr(self) -> None: + host, port = self._emu.get_channel_addr('test_uart') + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + sock.close() + + +class TestQemuChannelsPty(TestQemu): + """Tests for configurations using PTY channels.""" + + _config: Dict[str, Any] = {} + _config.update(json.loads(json.dumps(TestQemu._config))) + _config['qemu']['channels'] = {'type': 'pty'} + + def setUp(self): + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + super().setUp() + + def test_get_path(self) -> None: + self.assertTrue(os.path.exists(self._emu.get_channel_path('test_uart'))) + + +class TestQemuInvalidChannelType(ConfigHelperWithEmulator): + """Test invalid channel type configuration.""" + + _config = { + 'qemu': { + 'executable': 'qemu-system-arm', + 'channels': {'type': 'invalid'}, + }, + 'targets': { + 'test-target': { + 'qemu': { + 'machine': 'lm3s6965evb', + } + } + }, + } + + def test_start(self) -> None: + with self.assertRaises(InvalidChannelType): + self._emu.start('test-target', pause=True) + + +class TestQemuTargetChannelsMixed(ConfigHelperWithEmulator): + """Test configuration with mixed channels types.""" + + _config = { + 'qemu': { + 'executable': 'qemu-system-arm', + }, + 'targets': { + 'test-target': { + 'qemu': { + 'machine': 'lm3s6965evb', + 'channels': { + 'chardevs': { + 'test_uart0': { + 'id': 'serial0', + }, + 'test_uart1': { + 'id': 'serial1', + 'type': 'tcp', + }, + 'test_uart2': { + 'id': 'serial2', + 'type': 'pty', + }, + } + }, + } + } + }, + } + + def setUp(self) -> None: + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + super().setUp() + # no image to run so start paused + self._emu.start('test-target', pause=True) + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_uart0_addr(self) -> None: + host, port = self._emu.get_channel_addr('test_uart0') + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + sock.close() + + def test_uart1_addr(self) -> None: + host, port = self._emu.get_channel_addr('test_uart1') + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + sock.close() + + def test_uart2_path(self) -> None: + self.assertTrue( + os.path.exists(self._emu.get_channel_path('test_uart2')) + ) + + +def main() -> None: + ok, msg = check_prog('qemu-system-arm') + if not ok: + print(f'skipping tests: {msg}') + sys.exit(0) + + unittest.main() + + +if __name__ == '__main__': + main() diff --git a/pw_emu/py/tests/renode_test.py b/pw_emu/py/tests/renode_test.py new file mode 100644 index 000000000..acb9e47a9 --- /dev/null +++ b/pw_emu/py/tests/renode_test.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""renode emulator tests.""" + +import json +import os +import sys +import struct +import tempfile +import time +import unittest + +from pathlib import Path +from typing import Any, Dict, Optional + +from pw_emu.core import InvalidChannelName, InvalidChannelType +from tests.common import check_prog, ConfigHelperWithEmulator + + +# TODO: b/301382004 - The Python Pigweed package install (into python-venv) +# races with running this test and there is no way to add that package as a test +# depedency without creating circular depedencies. This means we can't rely on +# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper. +# +# run the arm_gdb.py wrapper directly +_arm_none_eabi_gdb_path = Path( + os.path.join( + os.environ['PW_ROOT'], + 'pw_env_setup', + 'py', + 'pw_env_setup', + 'entry_points', + 'arm_gdb.py', + ) +).resolve() + + +class TestRenode(ConfigHelperWithEmulator): + """Tests for a valid renode configuration.""" + + _config = { + 'gdb': ['python', str(_arm_none_eabi_gdb_path)], + 'renode': { + 'executable': 'renode', + }, + 'targets': { + 'test-target': { + 'renode': { + 'machine': 'platforms/boards/stm32f4_discovery-kit.repl', + } + } + }, + } + + def setUp(self) -> None: + super().setUp() + # no image to run so start paused + self._emu.start(target='test-target', pause=True) + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_running(self) -> None: + self.assertTrue(self._emu.running()) + + def test_list_properties(self) -> None: + self.assertIsNotNone(self._emu.list_properties('sysbus.usart1')) + + def test_get_property(self) -> None: + self.assertIsNotNone( + self._emu.get_property('sysbus.usart1', 'BaudRate') + ) + + def test_set_property(self) -> None: + self._emu.set_property('sysbus.timer1', 'Frequency', 100) + self.assertEqual( + int(self._emu.get_property('sysbus.timer1', 'Frequency'), 16), 100 + ) + + +class TestRenodeInvalidChannelType(ConfigHelperWithEmulator): + """Test invalid channel type configuration.""" + + _config = { + 'renode': { + 'executable': 'renode', + }, + 'targets': { + 'test-target': { + 'renode': { + 'machine': 'platforms/boards/stm32f4_discovery-kit.repl', + 'channels': { + 'terminals': { + 'test_uart': { + 'device-path': 'sysbus.usart1', + 'type': 'invalid', + } + } + }, + } + } + }, + } + + def test_start(self) -> None: + with self.assertRaises(InvalidChannelType): + self._emu.start('test-target', pause=True) + + +class TestRenodeChannels(ConfigHelperWithEmulator): + """Tests for a valid renode channels configuration.""" + + _config = { + 'gdb': ['python', str(_arm_none_eabi_gdb_path)], + 'renode': { + 'executable': 'renode', + }, + 'targets': { + 'test-target': { + 'renode': { + 'machine': 'platforms/boards/stm32f4_discovery-kit.repl', + 'channels': { + 'terminals': { + 'test_uart': { + 'device-path': 'sysbus.usart1', + } + } + }, + } + } + }, + } + + def setUp(self) -> None: + super().setUp() + # no image to run so start paused + self._emu.start(target='test-target', pause=True) + + def tearDown(self) -> None: + self._emu.stop() + super().tearDown() + + def test_bad_channel_name(self) -> None: + with self.assertRaises(InvalidChannelName): + self._emu.get_channel_addr('serial1') + + def set_reg(self, addr: int, val: int) -> None: + self._emu.run_gdb_cmds([f'set *(unsigned int*){addr}={val}']) + + def get_reg(self, addr: int) -> int: + temp = tempfile.NamedTemporaryFile(delete=False) + temp.close() + + res = self._emu.run_gdb_cmds( + [ + f'dump val {temp.name} *(char*){addr}', + 'disconnect', + ] + ) + self.assertEqual(res.returncode, 0, res.stderr.decode('ascii')) + + with open(temp.name, 'rb') as file: + ret = file.read(1) + + self.assertNotEqual(ret, b'', res.stderr.decode('ascii')) + + os.unlink(temp.name) + + return struct.unpack('B', ret)[0] + + def poll_data(self, timeout: int) -> Optional[int]: + usart_sr = 0x40011000 + usart_dr = 0x40011004 + deadline = time.monotonic() + timeout + while self.get_reg(usart_sr) & 0x20 == 0: + time.sleep(0.1) + if time.monotonic() > deadline: + return None + return self.get_reg(usart_dr) + + def test_channel_stream(self) -> None: + ok, msg = check_prog('arm-none-eabi-gdb') + if not ok: + self.skipTest(msg) + + usart_cr1 = 0x4001100C + # enable RX and TX + self.set_reg(usart_cr1, 0xC) + + stream = self._emu.get_channel_stream('test_uart') + stream.write('test\n'.encode('ascii')) + + self.assertEqual(self.poll_data(5), ord('t')) + self.assertEqual(self.poll_data(5), ord('e')) + self.assertEqual(self.poll_data(5), ord('s')) + self.assertEqual(self.poll_data(5), ord('t')) + + +class TestRenodeChannelsPty(TestRenodeChannels): + """Tests for configurations using PTY channels.""" + + _config: Dict[str, Any] = {} + _config.update(json.loads(json.dumps(TestRenodeChannels._config))) + _config['renode']['channels'] = {'terminals': {'type': 'pty'}} + + def setUp(self): + if sys.platform == 'win32': + self.skipTest('pty not supported on win32') + super().setUp() + + def test_get_path(self) -> None: + self.assertTrue(os.path.exists(self._emu.get_channel_path('test_uart'))) + + +def main() -> None: + ok, msg = check_prog('renode') + if not ok: + print(f'skipping tests: {msg}') + sys.exit(0) + + unittest.main() + + +if __name__ == '__main__': + main() |