aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArmando Montanez <amontanez@google.com>2022-03-30 14:57:15 -0700
committerCQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>2022-04-02 00:06:14 +0000
commit5fac8d6b6c9eb2a45bff6c4be0e7e28b4f179fe1 (patch)
tree89d6cc03cc2a4b35be83ec3f1ec54718588f86ae
parent92511aace1c2caaf521bc15c777fed19912aa4e9 (diff)
downloadpigweed-5fac8d6b6c9eb2a45bff6c4be0e7e28b4f179fe1.tar.gz
pw_rpc: Add ChannelManipulator
Adds a channel-level processor so RPC packets can be manipulated at ingress and egress points, increasing testing flexibility. Change-Id: If638910d534b41abfb7e69d52b0b46d42e8ebf11 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/89383 Pigweed-Auto-Submit: Armando Montanez <amontanez@google.com> Reviewed-by: Alexei Frolov <frolv@google.com> Reviewed-by: Wyatt Hepler <hepler@google.com> Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
-rw-r--r--pw_hdlc/py/pw_hdlc/rpc.py78
-rw-r--r--pw_rpc/py/docs.rst7
-rw-r--r--pw_rpc/py/pw_rpc/__init__.py2
-rw-r--r--pw_rpc/py/pw_rpc/descriptors.py52
-rwxr-xr-xpw_transfer/py/tests/python_cpp_transfer_test.py25
5 files changed, 109 insertions, 55 deletions
diff --git a/pw_hdlc/py/pw_hdlc/rpc.py b/pw_hdlc/py/pw_hdlc/rpc.py
index 786f09702..b41275331 100644
--- a/pw_hdlc/py/pw_hdlc/rpc.py
+++ b/pw_hdlc/py/pw_hdlc/rpc.py
@@ -139,7 +139,8 @@ class HdlcRpcClient:
output: Callable[[bytes], Any] = write_to_file,
client_impl: pw_rpc.client.ClientImpl = None,
*,
- _incoming_packet_filter_for_testing: '_PacketFilter' = None):
+ _incoming_packet_filter_for_testing: pw_rpc.
+ ChannelManipulator = None):
"""Creates an RPC client configured to communicate using HDLC.
Args:
@@ -159,10 +160,13 @@ class HdlcRpcClient:
self.client = pw_rpc.Client.from_modules(client_impl, channels,
self.protos.modules())
- self._test_filter = _incoming_packet_filter_for_testing
+ rpc_output: Callable[[bytes], Any] = self._handle_rpc_packet
+ if _incoming_packet_filter_for_testing is not None:
+ _incoming_packet_filter_for_testing.send_packet = rpc_output
+ rpc_output = _incoming_packet_filter_for_testing
frame_handlers: FrameHandlers = {
- DEFAULT_ADDRESS: self._handle_rpc_packet,
+ DEFAULT_ADDRESS: lambda frame: rpc_output(frame.data),
STDOUT_ADDRESS: lambda frame: output(frame.data),
}
@@ -184,12 +188,9 @@ class HdlcRpcClient:
return self.client.channel(channel_id).rpcs
- def _handle_rpc_packet(self, frame: Frame) -> None:
- if self._test_filter and not self._test_filter.keep_packet(frame.data):
- return
-
- if not self.client.process_packet(frame.data):
- _LOG.error('Packet not handled by RPC client: %s', frame.data)
+ def _handle_rpc_packet(self, packet: bytes) -> None:
+ if not self.client.process_packet(packet):
+ _LOG.error('Packet not handled by RPC client: %s', packet)
def _try_connect(port: int, attempts: int = 10) -> socket.socket:
@@ -241,16 +242,21 @@ class SocketSubprocess:
self.close()
-class _PacketFilter:
+class PacketFilter(pw_rpc.ChannelManipulator):
"""Determines if a packet should be kept or dropped for testing purposes."""
_Action = Callable[[int], Tuple[bool, bool]]
_KEEP = lambda _: (True, False)
_DROP = lambda _: (False, False)
def __init__(self, name: str) -> None:
+ super().__init__()
self.name = name
self.packet_count = 0
- self._actions: Deque[_PacketFilter._Action] = collections.deque()
+ self._actions: Deque[PacketFilter._Action] = collections.deque()
+
+ def process_and_send(self, packet: bytes):
+ if self.keep_packet(packet):
+ self.send_packet(packet)
def reset(self) -> None:
self.packet_count = 0
@@ -258,11 +264,11 @@ class _PacketFilter:
def keep(self, count: int) -> None:
"""Keeps the next count packets."""
- self._actions.extend(_PacketFilter._KEEP for _ in range(count))
+ self._actions.extend(PacketFilter._KEEP for _ in range(count))
def drop(self, count: int) -> None:
"""Drops the next count packets."""
- self._actions.extend(_PacketFilter._DROP for _ in range(count))
+ self._actions.extend(PacketFilter._DROP for _ in range(count))
def drop_every(self, every: int) -> None:
"""Drops every Nth packet forever."""
@@ -290,33 +296,21 @@ class _PacketFilter:
return keep
-class _TestChannelOutput:
- def __init__(self, send: Callable[[bytes], Any]) -> None:
- self._send = send
- self.packets = _PacketFilter('outgoing RPC')
-
- def __call__(self, data: bytes) -> None:
- if self.packets.keep_packet(data):
- self._send(data)
-
-
class HdlcRpcLocalServerAndClient:
"""Runs an RPC server in a subprocess and connects to it over a socket.
This can be used to run a local RPC server in an integration test.
"""
- def __init__(self,
- server_command: Sequence,
- port: int,
- protos: PathsModulesOrProtoLibrary,
- *,
- for_testing: bool = False) -> None:
- """Creates a new HdlcRpcLocalServerAndClient.
-
- If for_testing=True, the HdlcRpcLocalServerAndClient will have
- outgoing_packets and incoming_packets _PacketFilter members that can be
- used to program packet loss for testing purposes.
- """
+ def __init__(
+ self,
+ server_command: Sequence,
+ port: int,
+ protos: PathsModulesOrProtoLibrary,
+ *,
+ incoming_processor: Optional[pw_rpc.ChannelManipulator] = None,
+ outgoing_processor: Optional[pw_rpc.ChannelManipulator] = None
+ ) -> None:
+ """Creates a new HdlcRpcLocalServerAndClient."""
self.server = SocketSubprocess(server_command, port)
@@ -327,20 +321,18 @@ class HdlcRpcLocalServerAndClient:
self.output = io.BytesIO()
self.channel_output: Any = self.server.socket.sendall
- if for_testing:
- self.channel_output = _TestChannelOutput(self.channel_output)
- self.outgoing_packets = self.channel_output.packets
- self.incoming_packets = _PacketFilter('incoming RPC')
- incoming_filter: Optional[_PacketFilter] = self.incoming_packets
- else:
- incoming_filter = None
+
+ self._incoming_processor = incoming_processor
+ if outgoing_processor is not None:
+ outgoing_processor.send_packet = self.channel_output
+ self.channel_output = outgoing_processor
self.client = HdlcRpcClient(
self._bytes_queue.get,
protos,
default_channels(self.channel_output),
self.output.write,
- _incoming_packet_filter_for_testing=incoming_filter).client
+ _incoming_packet_filter_for_testing=incoming_processor).client
def _read_from_socket(self):
while True:
diff --git a/pw_rpc/py/docs.rst b/pw_rpc/py/docs.rst
index 9d29a107c..2c0e3060e 100644
--- a/pw_rpc/py/docs.rst
+++ b/pw_rpc/py/docs.rst
@@ -23,6 +23,13 @@ pw_rpc.callback_client
ClientStreamingCall,
BidirectionalStreamingCall,
+pw_rpc.descriptors
+==================
+.. automodule:: pw_rpc.descriptors
+ :members:
+ Channel,
+ ChannelManipulator,
+
pw_rpc.console_tools
====================
.. automodule:: pw_rpc.console_tools
diff --git a/pw_rpc/py/pw_rpc/__init__.py b/pw_rpc/py/pw_rpc/__init__.py
index 1f1e72e41..ff1f8713b 100644
--- a/pw_rpc/py/pw_rpc/__init__.py
+++ b/pw_rpc/py/pw_rpc/__init__.py
@@ -14,4 +14,4 @@
"""Package for calling Pigweed RPCs from Python."""
from pw_rpc.client import Client
-from pw_rpc.descriptors import Channel
+from pw_rpc.descriptors import Channel, ChannelManipulator
diff --git a/pw_rpc/py/pw_rpc/descriptors.py b/pw_rpc/py/pw_rpc/descriptors.py
index fdae73211..57ba98458 100644
--- a/pw_rpc/py/pw_rpc/descriptors.py
+++ b/pw_rpc/py/pw_rpc/descriptors.py
@@ -13,6 +13,7 @@
# the License.
"""Types representing the basic pw_rpc concepts: channel, service, method."""
+import abc
from dataclasses import dataclass
import enum
from inspect import Parameter
@@ -37,6 +38,57 @@ class Channel:
return f'Channel({self.id})'
+class ChannelManipulator(abc.ABC):
+ """A a pipe interface that may manipulate packets before they're sent.
+
+ ``ChannelManipulator``s allow application-specific packet handling to be
+ injected into the packet processing pipeline for an ingress or egress
+ channel-like pathway. This is particularly useful for integration testing
+ resilience to things like packet loss on a usually-reliable transport. RPC
+ server integrations (e.g. ``HdlcRpcLocalServerAndClient``) may provide an
+ opportunity to inject a ``ChannelManipulator`` for this use case.
+
+ A ``ChannelManipulator`` should not modify send_packet, as the consumer of a
+ ``ChannelManipulator`` will use ``send_packet`` to insert the provided
+ ``ChannelManipulator`` into a packet processing path.
+
+ For example:
+
+ .. code-block:: python
+
+ class PacketLogger(ChannelManipulator):
+ def process_and_send(self, packet: bytes) -> None:
+ _LOG.debug('Received packet with payload: %s', str(packet))
+ self.send_packet(packet)
+
+
+ packet_logger = PacketLogger()
+
+ # Configure actual send command.
+ packet_logger.send_packet = socket.sendall
+
+ # Route the output channel through the PacketLogger channel manipulator.
+ channels = tuple(Channel(_DEFAULT_CHANNEL, packet_logger))
+
+ # Create a RPC client.
+ client = HdlcRpcClient(socket.read, protos, channels, stdout)
+ """
+ def __init__(self):
+ self.send_packet: Callable[[bytes], Any] = lambda _: None
+
+ @abc.abstractmethod
+ def process_and_send(self, packet: bytes) -> None:
+ """Processes an incoming packet before optionally sending it.
+
+ Implementations of this method may send the processed packet, multiple
+ packets, or no packets at all via the registered `send_packet()`
+ handler.
+ """
+
+ def __call__(self, data: bytes) -> None:
+ self.process_and_send(data)
+
+
@dataclass(frozen=True, eq=False)
class Service:
"""Describes an RPC service."""
diff --git a/pw_transfer/py/tests/python_cpp_transfer_test.py b/pw_transfer/py/tests/python_cpp_transfer_test.py
index 136a18844..62e0d14eb 100755
--- a/pw_transfer/py/tests/python_cpp_transfer_test.py
+++ b/pw_transfer/py/tests/python_cpp_transfer_test.py
@@ -44,10 +44,13 @@ class TransferServiceIntegrationTest(unittest.TestCase):
self.directory = Path(self._tempdir.name)
command = (*self.test_server_command, str(self.directory))
+ self._outgoing_filter = rpc.PacketFilter('outgoing RPC')
+ self._incoming_filter = rpc.PacketFilter('incoming RPC')
self._context = rpc.HdlcRpcLocalServerAndClient(
command,
self.port, [transfer_pb2, test_server_pb2],
- for_testing=True)
+ outgoing_processor=self._outgoing_filter,
+ incoming_processor=self._incoming_filter)
service = self._context.client.channel(1).rpcs.pw.transfer.Transfer
self.manager = pw_transfer.Manager(
@@ -148,12 +151,12 @@ class TransferServiceIntegrationTest(unittest.TestCase):
self.set_content(34, 'junk')
# Allow the initial packet and first chunk, then drop the second chunk.
- self._context.outgoing_packets.keep(2)
- self._context.outgoing_packets.drop(1)
+ self._outgoing_filter.keep(2)
+ self._outgoing_filter.drop(1)
# Allow the initial transfer parameters updates, then drop the next two.
- self._context.incoming_packets.keep(1)
- self._context.incoming_packets.drop(2)
+ self._incoming_filter.keep(1)
+ self._incoming_filter.drop(2)
with self.assertLogs('pw_transfer', 'DEBUG') as logs:
self.manager.write(34, _DATA_4096B)
@@ -169,8 +172,8 @@ class TransferServiceIntegrationTest(unittest.TestCase):
def test_write_regularly_drop_packets(self) -> None:
self.set_content(35, 'junk')
- self._context.outgoing_packets.drop_every(5) # drop one per window
- self._context.incoming_packets.drop_every(3)
+ self._outgoing_filter.drop_every(5) # drop one per window
+ self._incoming_filter.drop_every(3)
self.manager.write(35, _DATA_4096B)
@@ -184,16 +187,16 @@ class TransferServiceIntegrationTest(unittest.TestCase):
self.set_content(seed, 'junk')
rand = random.Random(seed)
- self._context.incoming_packets.randomly_drop(3, rand)
- self._context.outgoing_packets.randomly_drop(3, rand)
+ self._incoming_filter.randomly_drop(3, rand)
+ self._outgoing_filter.randomly_drop(3, rand)
data = bytes(
rand.randrange(256) for _ in range(rand.randrange(16384)))
self.manager.write(seed, data)
self.assertEqual(self.get_content(seed), data)
- self._context.incoming_packets.reset()
- self._context.outgoing_packets.reset()
+ self._incoming_filter.reset()
+ self._outgoing_filter.reset()
def _main(test_server_command: List[str], port: int,