diff options
author | Armando Montanez <amontanez@google.com> | 2022-03-30 14:57:15 -0700 |
---|---|---|
committer | CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2022-04-02 00:06:14 +0000 |
commit | 5fac8d6b6c9eb2a45bff6c4be0e7e28b4f179fe1 (patch) | |
tree | 89d6cc03cc2a4b35be83ec3f1ec54718588f86ae | |
parent | 92511aace1c2caaf521bc15c777fed19912aa4e9 (diff) | |
download | pigweed-5fac8d6b6c9eb2a45bff6c4be0e7e28b4f179fe1.tar.gz |
pw_rpc: Add ChannelManipulator
Adds a channel-level processor so RPC packets can be manipulated at
ingress and egress points, increasing testing flexibility.
Change-Id: If638910d534b41abfb7e69d52b0b46d42e8ebf11
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/89383
Pigweed-Auto-Submit: Armando Montanez <amontanez@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Reviewed-by: Wyatt Hepler <hepler@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
-rw-r--r-- | pw_hdlc/py/pw_hdlc/rpc.py | 78 | ||||
-rw-r--r-- | pw_rpc/py/docs.rst | 7 | ||||
-rw-r--r-- | pw_rpc/py/pw_rpc/__init__.py | 2 | ||||
-rw-r--r-- | pw_rpc/py/pw_rpc/descriptors.py | 52 | ||||
-rwxr-xr-x | pw_transfer/py/tests/python_cpp_transfer_test.py | 25 |
5 files changed, 109 insertions, 55 deletions
diff --git a/pw_hdlc/py/pw_hdlc/rpc.py b/pw_hdlc/py/pw_hdlc/rpc.py index 786f09702..b41275331 100644 --- a/pw_hdlc/py/pw_hdlc/rpc.py +++ b/pw_hdlc/py/pw_hdlc/rpc.py @@ -139,7 +139,8 @@ class HdlcRpcClient: output: Callable[[bytes], Any] = write_to_file, client_impl: pw_rpc.client.ClientImpl = None, *, - _incoming_packet_filter_for_testing: '_PacketFilter' = None): + _incoming_packet_filter_for_testing: pw_rpc. + ChannelManipulator = None): """Creates an RPC client configured to communicate using HDLC. Args: @@ -159,10 +160,13 @@ class HdlcRpcClient: self.client = pw_rpc.Client.from_modules(client_impl, channels, self.protos.modules()) - self._test_filter = _incoming_packet_filter_for_testing + rpc_output: Callable[[bytes], Any] = self._handle_rpc_packet + if _incoming_packet_filter_for_testing is not None: + _incoming_packet_filter_for_testing.send_packet = rpc_output + rpc_output = _incoming_packet_filter_for_testing frame_handlers: FrameHandlers = { - DEFAULT_ADDRESS: self._handle_rpc_packet, + DEFAULT_ADDRESS: lambda frame: rpc_output(frame.data), STDOUT_ADDRESS: lambda frame: output(frame.data), } @@ -184,12 +188,9 @@ class HdlcRpcClient: return self.client.channel(channel_id).rpcs - def _handle_rpc_packet(self, frame: Frame) -> None: - if self._test_filter and not self._test_filter.keep_packet(frame.data): - return - - if not self.client.process_packet(frame.data): - _LOG.error('Packet not handled by RPC client: %s', frame.data) + def _handle_rpc_packet(self, packet: bytes) -> None: + if not self.client.process_packet(packet): + _LOG.error('Packet not handled by RPC client: %s', packet) def _try_connect(port: int, attempts: int = 10) -> socket.socket: @@ -241,16 +242,21 @@ class SocketSubprocess: self.close() -class _PacketFilter: +class PacketFilter(pw_rpc.ChannelManipulator): """Determines if a packet should be kept or dropped for testing purposes.""" _Action = Callable[[int], Tuple[bool, bool]] _KEEP = lambda _: (True, False) _DROP = lambda _: (False, False) def __init__(self, name: str) -> None: + super().__init__() self.name = name self.packet_count = 0 - self._actions: Deque[_PacketFilter._Action] = collections.deque() + self._actions: Deque[PacketFilter._Action] = collections.deque() + + def process_and_send(self, packet: bytes): + if self.keep_packet(packet): + self.send_packet(packet) def reset(self) -> None: self.packet_count = 0 @@ -258,11 +264,11 @@ class _PacketFilter: def keep(self, count: int) -> None: """Keeps the next count packets.""" - self._actions.extend(_PacketFilter._KEEP for _ in range(count)) + self._actions.extend(PacketFilter._KEEP for _ in range(count)) def drop(self, count: int) -> None: """Drops the next count packets.""" - self._actions.extend(_PacketFilter._DROP for _ in range(count)) + self._actions.extend(PacketFilter._DROP for _ in range(count)) def drop_every(self, every: int) -> None: """Drops every Nth packet forever.""" @@ -290,33 +296,21 @@ class _PacketFilter: return keep -class _TestChannelOutput: - def __init__(self, send: Callable[[bytes], Any]) -> None: - self._send = send - self.packets = _PacketFilter('outgoing RPC') - - def __call__(self, data: bytes) -> None: - if self.packets.keep_packet(data): - self._send(data) - - class HdlcRpcLocalServerAndClient: """Runs an RPC server in a subprocess and connects to it over a socket. This can be used to run a local RPC server in an integration test. """ - def __init__(self, - server_command: Sequence, - port: int, - protos: PathsModulesOrProtoLibrary, - *, - for_testing: bool = False) -> None: - """Creates a new HdlcRpcLocalServerAndClient. - - If for_testing=True, the HdlcRpcLocalServerAndClient will have - outgoing_packets and incoming_packets _PacketFilter members that can be - used to program packet loss for testing purposes. - """ + def __init__( + self, + server_command: Sequence, + port: int, + protos: PathsModulesOrProtoLibrary, + *, + incoming_processor: Optional[pw_rpc.ChannelManipulator] = None, + outgoing_processor: Optional[pw_rpc.ChannelManipulator] = None + ) -> None: + """Creates a new HdlcRpcLocalServerAndClient.""" self.server = SocketSubprocess(server_command, port) @@ -327,20 +321,18 @@ class HdlcRpcLocalServerAndClient: self.output = io.BytesIO() self.channel_output: Any = self.server.socket.sendall - if for_testing: - self.channel_output = _TestChannelOutput(self.channel_output) - self.outgoing_packets = self.channel_output.packets - self.incoming_packets = _PacketFilter('incoming RPC') - incoming_filter: Optional[_PacketFilter] = self.incoming_packets - else: - incoming_filter = None + + self._incoming_processor = incoming_processor + if outgoing_processor is not None: + outgoing_processor.send_packet = self.channel_output + self.channel_output = outgoing_processor self.client = HdlcRpcClient( self._bytes_queue.get, protos, default_channels(self.channel_output), self.output.write, - _incoming_packet_filter_for_testing=incoming_filter).client + _incoming_packet_filter_for_testing=incoming_processor).client def _read_from_socket(self): while True: diff --git a/pw_rpc/py/docs.rst b/pw_rpc/py/docs.rst index 9d29a107c..2c0e3060e 100644 --- a/pw_rpc/py/docs.rst +++ b/pw_rpc/py/docs.rst @@ -23,6 +23,13 @@ pw_rpc.callback_client ClientStreamingCall, BidirectionalStreamingCall, +pw_rpc.descriptors +================== +.. automodule:: pw_rpc.descriptors + :members: + Channel, + ChannelManipulator, + pw_rpc.console_tools ==================== .. automodule:: pw_rpc.console_tools diff --git a/pw_rpc/py/pw_rpc/__init__.py b/pw_rpc/py/pw_rpc/__init__.py index 1f1e72e41..ff1f8713b 100644 --- a/pw_rpc/py/pw_rpc/__init__.py +++ b/pw_rpc/py/pw_rpc/__init__.py @@ -14,4 +14,4 @@ """Package for calling Pigweed RPCs from Python.""" from pw_rpc.client import Client -from pw_rpc.descriptors import Channel +from pw_rpc.descriptors import Channel, ChannelManipulator diff --git a/pw_rpc/py/pw_rpc/descriptors.py b/pw_rpc/py/pw_rpc/descriptors.py index fdae73211..57ba98458 100644 --- a/pw_rpc/py/pw_rpc/descriptors.py +++ b/pw_rpc/py/pw_rpc/descriptors.py @@ -13,6 +13,7 @@ # the License. """Types representing the basic pw_rpc concepts: channel, service, method.""" +import abc from dataclasses import dataclass import enum from inspect import Parameter @@ -37,6 +38,57 @@ class Channel: return f'Channel({self.id})' +class ChannelManipulator(abc.ABC): + """A a pipe interface that may manipulate packets before they're sent. + + ``ChannelManipulator``s allow application-specific packet handling to be + injected into the packet processing pipeline for an ingress or egress + channel-like pathway. This is particularly useful for integration testing + resilience to things like packet loss on a usually-reliable transport. RPC + server integrations (e.g. ``HdlcRpcLocalServerAndClient``) may provide an + opportunity to inject a ``ChannelManipulator`` for this use case. + + A ``ChannelManipulator`` should not modify send_packet, as the consumer of a + ``ChannelManipulator`` will use ``send_packet`` to insert the provided + ``ChannelManipulator`` into a packet processing path. + + For example: + + .. code-block:: python + + class PacketLogger(ChannelManipulator): + def process_and_send(self, packet: bytes) -> None: + _LOG.debug('Received packet with payload: %s', str(packet)) + self.send_packet(packet) + + + packet_logger = PacketLogger() + + # Configure actual send command. + packet_logger.send_packet = socket.sendall + + # Route the output channel through the PacketLogger channel manipulator. + channels = tuple(Channel(_DEFAULT_CHANNEL, packet_logger)) + + # Create a RPC client. + client = HdlcRpcClient(socket.read, protos, channels, stdout) + """ + def __init__(self): + self.send_packet: Callable[[bytes], Any] = lambda _: None + + @abc.abstractmethod + def process_and_send(self, packet: bytes) -> None: + """Processes an incoming packet before optionally sending it. + + Implementations of this method may send the processed packet, multiple + packets, or no packets at all via the registered `send_packet()` + handler. + """ + + def __call__(self, data: bytes) -> None: + self.process_and_send(data) + + @dataclass(frozen=True, eq=False) class Service: """Describes an RPC service.""" diff --git a/pw_transfer/py/tests/python_cpp_transfer_test.py b/pw_transfer/py/tests/python_cpp_transfer_test.py index 136a18844..62e0d14eb 100755 --- a/pw_transfer/py/tests/python_cpp_transfer_test.py +++ b/pw_transfer/py/tests/python_cpp_transfer_test.py @@ -44,10 +44,13 @@ class TransferServiceIntegrationTest(unittest.TestCase): self.directory = Path(self._tempdir.name) command = (*self.test_server_command, str(self.directory)) + self._outgoing_filter = rpc.PacketFilter('outgoing RPC') + self._incoming_filter = rpc.PacketFilter('incoming RPC') self._context = rpc.HdlcRpcLocalServerAndClient( command, self.port, [transfer_pb2, test_server_pb2], - for_testing=True) + outgoing_processor=self._outgoing_filter, + incoming_processor=self._incoming_filter) service = self._context.client.channel(1).rpcs.pw.transfer.Transfer self.manager = pw_transfer.Manager( @@ -148,12 +151,12 @@ class TransferServiceIntegrationTest(unittest.TestCase): self.set_content(34, 'junk') # Allow the initial packet and first chunk, then drop the second chunk. - self._context.outgoing_packets.keep(2) - self._context.outgoing_packets.drop(1) + self._outgoing_filter.keep(2) + self._outgoing_filter.drop(1) # Allow the initial transfer parameters updates, then drop the next two. - self._context.incoming_packets.keep(1) - self._context.incoming_packets.drop(2) + self._incoming_filter.keep(1) + self._incoming_filter.drop(2) with self.assertLogs('pw_transfer', 'DEBUG') as logs: self.manager.write(34, _DATA_4096B) @@ -169,8 +172,8 @@ class TransferServiceIntegrationTest(unittest.TestCase): def test_write_regularly_drop_packets(self) -> None: self.set_content(35, 'junk') - self._context.outgoing_packets.drop_every(5) # drop one per window - self._context.incoming_packets.drop_every(3) + self._outgoing_filter.drop_every(5) # drop one per window + self._incoming_filter.drop_every(3) self.manager.write(35, _DATA_4096B) @@ -184,16 +187,16 @@ class TransferServiceIntegrationTest(unittest.TestCase): self.set_content(seed, 'junk') rand = random.Random(seed) - self._context.incoming_packets.randomly_drop(3, rand) - self._context.outgoing_packets.randomly_drop(3, rand) + self._incoming_filter.randomly_drop(3, rand) + self._outgoing_filter.randomly_drop(3, rand) data = bytes( rand.randrange(256) for _ in range(rand.randrange(16384))) self.manager.write(seed, data) self.assertEqual(self.get_content(seed), data) - self._context.incoming_packets.reset() - self._context.outgoing_packets.reset() + self._incoming_filter.reset() + self._outgoing_filter.reset() def _main(test_server_command: List[str], port: int, |