aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAaron Green <aarongreen@google.com>2023-03-20 18:00:54 +0000
committerCQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>2023-03-20 18:00:54 +0000
commit3cbf0600ab0c909c471f81918942f61ed74a7235 (patch)
tree8d33d72b98c47bae9a9e4e1ce7052afdb05dc29e
parent1c3881e2c6bcc8807f2b26e58cabfe05203fec46 (diff)
downloadpigweed-3cbf0600ab0c909c471f81918942f61ed74a7235.tar.gz
pw_rpc: Add client_fuzz_test
This CL adds an integration test that uses the `test_rpc_server` and a client that generates a pseudorandom sequence of actions from a seed. These actions are performed in parallel by multiple threads using multiple `Call` objects, and include: * Creating new requests * Waiting for responses * Canceling requests * Abandoning requests * Moving `Call` objects * Destroying `Call` objects The callbacks for requests also perform pseudorandom actions, except that they will not wait for other responses or move or destroy the `Call` object represented by the callback. Change-Id: Id7e3448e77ad8e333b0d28269709ab6f2c12be9c Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/128372 Commit-Queue: Aaron Green <aarongreen@google.com> Reviewed-by: Wyatt Hepler <hepler@google.com>
-rw-r--r--BUILD.gn13
-rw-r--r--pw_rpc/BUILD.gn1
-rw-r--r--pw_rpc/benchmark.rst20
-rw-r--r--pw_rpc/fuzz/BUILD.gn140
-rw-r--r--pw_rpc/fuzz/alarm_timer_test.cc64
-rw-r--r--pw_rpc/fuzz/argparse.cc259
-rw-r--r--pw_rpc/fuzz/argparse_test.cc196
-rw-r--r--pw_rpc/fuzz/client_fuzzer.cc111
-rw-r--r--pw_rpc/fuzz/engine.cc553
-rw-r--r--pw_rpc/fuzz/engine_test.cc264
-rw-r--r--pw_rpc/fuzz/public/pw_rpc/fuzz/alarm_timer.h56
-rw-r--r--pw_rpc/fuzz/public/pw_rpc/fuzz/argparse.h230
-rw-r--r--pw_rpc/fuzz/public/pw_rpc/fuzz/engine.h339
-rw-r--r--pw_rpc/internal/integration_test_ports.gni1
-rw-r--r--pw_rpc/py/pw_rpc/testing.py30
15 files changed, 2271 insertions, 6 deletions
diff --git a/BUILD.gn b/BUILD.gn
index 6a13b3b1e..aa97568b9 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -287,9 +287,18 @@ group("integration_tests") {
# Build-only target for fuzzers.
group("fuzzers") {
- # Fuzzing is only supported on Linux and MacOS using clang.
+ deps = []
+
+ # TODO(b/274437709): The client_fuzzer encounters build errors on macos. Limit
+ # it to Linux hosts for now.
+ if (host_os == "linux") {
+ _default_tc = _default_toolchain_prefix + pw_DEFAULT_C_OPTIMIZATION_LEVEL
+ deps += [ "$dir_pw_rpc/fuzz:client_fuzzer($_default_tc)" ]
+ }
+
if (host_os != "win") {
- deps = [
+ # Coverage-guided fuzzing is only supported on Linux and MacOS using clang.
+ deps += [
"$dir_pw_bluetooth_hci:fuzzers($dir_pigweed/targets/host:host_clang_fuzz)",
"$dir_pw_fuzzer:fuzzers($dir_pigweed/targets/host:host_clang_fuzz)",
"$dir_pw_protobuf:fuzzers($dir_pigweed/targets/host:host_clang_fuzz)",
diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn
index ad7ff7b1e..18c8778e3 100644
--- a/pw_rpc/BUILD.gn
+++ b/pw_rpc/BUILD.gn
@@ -455,6 +455,7 @@ pw_test_group("tests") {
":service_test",
]
group_deps = [
+ "fuzz:tests",
"nanopb:tests",
"pwpb:tests",
"raw:tests",
diff --git a/pw_rpc/benchmark.rst b/pw_rpc/benchmark.rst
index 6e97ac1d0..5ecc136a8 100644
--- a/pw_rpc/benchmark.rst
+++ b/pw_rpc/benchmark.rst
@@ -49,3 +49,23 @@ Example
server.RegisterService(benchmark_service);
}
+Stress Test
+===========
+.. attention::
+ This section is experimental and liable to change.
+
+The Benchmark service is also used as part of a stress test of the ``pw_rpc``
+module. This stress test is implemented as an unguided fuzzer that uses
+multiple worker threads to perform generated sequences of actions using RPC
+``Call`` objects. The test is included as an integration test, and can found and
+be run locally using GN:
+
+.. code-block:: bash
+
+ $ gn desc out //:integration_tests deps | grep fuzz
+ //pw_rpc/fuzz:cpp_client_server_fuzz_test(//targets/host/pigweed_internal:pw_strict_host_clang_debug)
+
+ $ gn outputs out '//pw_rpc/fuzz:cpp_client_server_fuzz_test(//targets/host/pigweed_internal:pw_strict_host_clang_debug)'
+ pw_strict_host_clang_debug/gen/pw_rpc/fuzz/cpp_client_server_fuzz_test.pw_pystamp
+
+ $ ninja -C out pw_strict_host_clang_debug/gen/pw_rpc/fuzz/cpp_client_server_fuzz_test.pw_pystamp
diff --git a/pw_rpc/fuzz/BUILD.gn b/pw_rpc/fuzz/BUILD.gn
new file mode 100644
index 000000000..bec42fe44
--- /dev/null
+++ b/pw_rpc/fuzz/BUILD.gn
@@ -0,0 +1,140 @@
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+import("//build_overrides/pigweed.gni")
+
+import("$dir_pw_build/target_types.gni")
+import("$dir_pw_chrono/backend.gni")
+import("$dir_pw_rpc/internal/integration_test_ports.gni")
+import("$dir_pw_thread/backend.gni")
+import("$dir_pw_unit_test/test.gni")
+
+config("public_include_path") {
+ include_dirs = [
+ "public",
+ "$dir_pw_rpc/public",
+ ]
+ visibility = [ ":*" ]
+}
+
+pw_source_set("alarm_timer") {
+ public_configs = [ ":public_include_path" ]
+ public = [ "public/pw_rpc/fuzz/alarm_timer.h" ]
+ public_deps = [
+ "$dir_pw_chrono:system_clock",
+ "$dir_pw_chrono:system_timer",
+ ]
+ visibility = [ ":*" ]
+}
+
+pw_test("alarm_timer_test") {
+ enable_if = pw_chrono_SYSTEM_TIMER_BACKEND != ""
+ sources = [ "alarm_timer_test.cc" ]
+ deps = [
+ ":alarm_timer",
+ "$dir_pw_sync:binary_semaphore",
+ ]
+}
+
+pw_source_set("argparse") {
+ public_configs = [ ":public_include_path" ]
+ public = [ "public/pw_rpc/fuzz/argparse.h" ]
+ sources = [ "argparse.cc" ]
+ public_deps = [
+ "$dir_pw_containers:vector",
+ dir_pw_status,
+ ]
+ deps = [
+ "$dir_pw_string:builder",
+ dir_pw_assert,
+ dir_pw_log,
+ ]
+ visibility = [ ":*" ]
+}
+
+pw_test("argparse_test") {
+ sources = [ "argparse_test.cc" ]
+ deps = [ ":argparse" ]
+}
+
+pw_source_set("engine") {
+ public_configs = [ ":public_include_path" ]
+ public = [ "public/pw_rpc/fuzz/engine.h" ]
+ sources = [ "engine.cc" ]
+ public_deps = [
+ ":alarm_timer",
+ "$dir_pw_chrono:system_clock",
+ "$dir_pw_rpc:benchmark",
+ "$dir_pw_rpc:log_config",
+ "$dir_pw_rpc:protos.raw_rpc",
+ "$dir_pw_string:format",
+ "$dir_pw_sync:condition_variable",
+ "$dir_pw_sync:timed_mutex",
+ "$dir_pw_thread:thread",
+ dir_pw_random,
+ ]
+ deps = [ "$dir_pw_rpc:client" ]
+ visibility = [ ":*" ]
+}
+
+pw_test("engine_test") {
+ enable_if =
+ pw_chrono_SYSTEM_TIMER_BACKEND == "$dir_pw_chrono_stl:system_timer" &&
+ pw_thread_THREAD_BACKEND == "$dir_pw_thread_stl:thread"
+ sources = [ "engine_test.cc" ]
+ deps = [
+ ":engine",
+ "$dir_pw_rpc:client_server_testing_threaded",
+ "$dir_pw_thread:test_threads",
+ "$dir_pw_thread_stl:test_threads",
+ dir_pw_log,
+ pw_chrono_SYSTEM_TIMER_BACKEND,
+ ]
+}
+
+pw_executable("client_fuzzer") {
+ sources = [ "client_fuzzer.cc" ]
+ deps = [
+ ":argparse",
+ ":engine",
+ "$dir_pw_rpc:client",
+ "$dir_pw_rpc:integration_testing",
+ ]
+}
+
+pw_python_action("cpp_client_server_fuzz_test") {
+ script = "../py/pw_rpc/testing.py"
+ args = [
+ "--server",
+ "<TARGET_FILE($dir_pw_rpc:test_rpc_server)>",
+ "--client",
+ "<TARGET_FILE(:client_fuzzer)>",
+ "--",
+ "$pw_rpc_CPP_CLIENT_FUZZER_TEST_PORT",
+ ]
+ deps = [
+ ":client_fuzzer",
+ "$dir_pw_rpc:test_rpc_server",
+ ]
+
+ stamp = true
+}
+
+pw_test_group("tests") {
+ tests = [
+ ":argparse_test",
+ ":alarm_timer_test",
+ ":engine_test",
+ ]
+}
diff --git a/pw_rpc/fuzz/alarm_timer_test.cc b/pw_rpc/fuzz/alarm_timer_test.cc
new file mode 100644
index 000000000..ce8395a6b
--- /dev/null
+++ b/pw_rpc/fuzz/alarm_timer_test.cc
@@ -0,0 +1,64 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/fuzz/alarm_timer.h"
+
+#include <chrono>
+
+#include "gtest/gtest.h"
+#include "pw_sync/binary_semaphore.h"
+
+namespace pw::rpc::fuzz {
+namespace {
+
+using namespace std::chrono_literals;
+
+TEST(AlarmTimerTest, Start) {
+ sync::BinarySemaphore sem;
+ AlarmTimer timer([&sem](chrono::SystemClock::time_point) { sem.release(); });
+ timer.Start(10ms);
+ sem.acquire();
+}
+
+TEST(AlarmTimerTest, Restart) {
+ sync::BinarySemaphore sem;
+ AlarmTimer timer([&sem](chrono::SystemClock::time_point) { sem.release(); });
+ timer.Start(50ms);
+ for (size_t i = 0; i < 10; ++i) {
+ timer.Restart();
+ EXPECT_FALSE(sem.try_acquire_for(10us));
+ }
+ sem.acquire();
+}
+
+TEST(AlarmTimerTest, Cancel) {
+ sync::BinarySemaphore sem;
+ AlarmTimer timer([&sem](chrono::SystemClock::time_point) { sem.release(); });
+ timer.Start(50ms);
+ timer.Cancel();
+ EXPECT_FALSE(sem.try_acquire_for(100us));
+}
+
+TEST(AlarmTimerTest, Destroy) {
+ sync::BinarySemaphore sem;
+ {
+ AlarmTimer timer(
+ [&sem](chrono::SystemClock::time_point) { sem.release(); });
+ timer.Start(50ms);
+ }
+ EXPECT_FALSE(sem.try_acquire_for(100us));
+}
+
+} // namespace
+} // namespace pw::rpc::fuzz
diff --git a/pw_rpc/fuzz/argparse.cc b/pw_rpc/fuzz/argparse.cc
new file mode 100644
index 000000000..39a5dd694
--- /dev/null
+++ b/pw_rpc/fuzz/argparse.cc
@@ -0,0 +1,259 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/fuzz/argparse.h"
+
+#include <cctype>
+#include <cstring>
+
+#include "pw_assert/check.h"
+#include "pw_log/log.h"
+#include "pw_string/string_builder.h"
+
+namespace pw::rpc::fuzz {
+namespace {
+
+// Visitor to `ArgVariant` used by `ParseArgs` below.
+struct ParseVisitor {
+ std::string_view arg0;
+ std::string_view arg1;
+
+ template <typename Parser>
+ ParseStatus operator()(Parser& parser) {
+ return parser.Parse(arg0, arg1);
+ }
+};
+
+// Visitor to `ArgVariant` used by `GetArg` below.
+struct ValueVisitor {
+ std::string_view name;
+
+ template <typename Parser>
+ std::optional<ArgVariant> operator()(Parser& parser) {
+ std::optional<ArgVariant> result;
+ if (parser.short_name() == name || parser.long_name() == name) {
+ result.emplace(parser.value());
+ }
+ return result;
+ }
+};
+
+// Visitor to `ArgVariant` used by `PrintUsage` below.
+const size_t kMaxUsageLen = 256;
+struct UsageVisitor {
+ StringBuffer<kMaxUsageLen>* buffer;
+
+ void operator()(const BoolParser& parser) const {
+ auto short_name = parser.short_name();
+ auto long_name = parser.long_name();
+ *buffer << " [" << short_name << "|--[no-]" << long_name.substr(2) << "]";
+ }
+
+ template <typename T>
+ void operator()(const UnsignedParser<T>& parser) const {
+ auto short_name = parser.short_name();
+ auto long_name = parser.long_name();
+ *buffer << " ";
+ if (!parser.positional()) {
+ *buffer << "[";
+ if (!short_name.empty()) {
+ *buffer << short_name << "|";
+ }
+ *buffer << long_name << " ";
+ }
+ for (const auto& c : long_name) {
+ *buffer << static_cast<char>(toupper(c));
+ }
+ if (!parser.positional()) {
+ *buffer << "]";
+ }
+ }
+};
+
+// Visitor to `ArgVariant` used by `ResetArg` below.
+struct ResetVisitor {
+ std::string_view name;
+
+ template <typename Parser>
+ bool operator()(Parser& parser) {
+ if (parser.short_name() != name && parser.long_name() != name) {
+ return false;
+ }
+ parser.Reset();
+ return true;
+ }
+};
+
+} // namespace
+
+ArgParserBase::ArgParserBase(std::string_view name) : long_name_(name) {
+ PW_CHECK(!name.empty());
+ PW_CHECK(name != "--");
+ positional_ =
+ name[0] != '-' || (name.size() > 2 && name.substr(0, 2) != "--");
+}
+
+ArgParserBase::ArgParserBase(std::string_view shortopt,
+ std::string_view longopt)
+ : short_name_(shortopt), long_name_(longopt) {
+ PW_CHECK(shortopt.size() == 2);
+ PW_CHECK(shortopt[0] == '-');
+ PW_CHECK(shortopt != "--");
+ PW_CHECK(longopt.size() > 2);
+ PW_CHECK(longopt.substr(0, 2) == "--");
+ positional_ = false;
+}
+
+bool ArgParserBase::Match(std::string_view arg) {
+ if (arg.empty()) {
+ return false;
+ }
+ if (!positional_) {
+ return arg == short_name_ || arg == long_name_;
+ }
+ if (!std::holds_alternative<std::monostate>(value_)) {
+ return false;
+ }
+ if ((arg.size() == 2 && arg[0] == '-') ||
+ (arg.size() > 2 && arg.substr(0, 2) == "--")) {
+ PW_LOG_WARN("Argument parsed for '%s' appears to be a flag: '%s'",
+ long_name_.data(),
+ arg.data());
+ }
+ return true;
+}
+
+const ArgVariant& ArgParserBase::GetValue() const {
+ return std::holds_alternative<std::monostate>(value_) ? initial_ : value_;
+}
+
+BoolParser::BoolParser(std::string_view name) : ArgParserBase(name) {}
+BoolParser::BoolParser(std::string_view shortopt, std::string_view longopt)
+ : ArgParserBase(shortopt, longopt) {}
+
+BoolParser& BoolParser::set_default(bool value) {
+ set_initial(value);
+ return *this;
+}
+
+ParseStatus BoolParser::Parse(std::string_view arg0,
+ [[maybe_unused]] std::string_view arg1) {
+ if (Match(arg0)) {
+ set_value(true);
+ return kParsedOne;
+ }
+ if (arg0.size() > 5 && arg0.substr(0, 5) == "--no-" &&
+ arg0.substr(5) == long_name().substr(2)) {
+ set_value(false);
+ return kParsedOne;
+ }
+ return kParseMismatch;
+}
+
+UnsignedParserBase::UnsignedParserBase(std::string_view name)
+ : ArgParserBase(name) {}
+UnsignedParserBase::UnsignedParserBase(std::string_view shortopt,
+ std::string_view longopt)
+ : ArgParserBase(shortopt, longopt) {}
+
+ParseStatus UnsignedParserBase::Parse(std::string_view arg0,
+ std::string_view arg1,
+ uint64_t max) {
+ auto result = kParsedOne;
+ if (!Match(arg0)) {
+ return kParseMismatch;
+ }
+ if (!positional()) {
+ if (arg1.empty()) {
+ PW_LOG_ERROR("Missing value for flag '%s'", arg0.data());
+ return kParseFailure;
+ }
+ arg0 = arg1;
+ result = kParsedTwo;
+ }
+ char* endptr;
+ auto value = strtoull(arg0.data(), &endptr, 0);
+ if (*endptr) {
+ PW_LOG_ERROR("Failed to parse number from '%s'", arg0.data());
+ return kParseFailure;
+ }
+ if (value > max) {
+ PW_LOG_ERROR("Parsed value is too large: %llu", value);
+ return kParseFailure;
+ }
+ set_value(value);
+ return result;
+}
+
+Status ParseArgs(Vector<ArgParserVariant>& parsers, int argc, char** argv) {
+ for (int i = 1; i < argc; ++i) {
+ auto arg0 = std::string_view(argv[i]);
+ auto arg1 =
+ i == (argc - 1) ? std::string_view() : std::string_view(argv[i + 1]);
+ bool parsed = false;
+ for (auto& parser : parsers) {
+ switch (std::visit(ParseVisitor{.arg0 = arg0, .arg1 = arg1}, parser)) {
+ case kParsedOne:
+ break;
+ case kParsedTwo:
+ ++i;
+ break;
+ case kParseMismatch:
+ continue;
+ case kParseFailure:
+ PW_LOG_ERROR("Failed to parse '%s'", arg0.data());
+ return Status::InvalidArgument();
+ }
+ parsed = true;
+ break;
+ }
+ if (!parsed) {
+ PW_LOG_ERROR("Unrecognized argument: '%s'", arg0.data());
+ return Status::InvalidArgument();
+ }
+ }
+ return OkStatus();
+}
+
+void PrintUsage(const Vector<ArgParserVariant>& parsers,
+ std::string_view argv0) {
+ StringBuffer<kMaxUsageLen> buffer;
+ buffer << "usage: " << argv0;
+ for (auto& parser : parsers) {
+ std::visit(UsageVisitor{.buffer = &buffer}, parser);
+ }
+ PW_LOG_INFO("%s", buffer.c_str());
+}
+
+std::optional<ArgVariant> GetArg(const Vector<ArgParserVariant>& parsers,
+ std::string_view name) {
+ for (auto& parser : parsers) {
+ if (auto result = std::visit(ValueVisitor{.name = name}, parser);
+ result.has_value()) {
+ return result;
+ }
+ }
+ return std::optional<ArgVariant>();
+}
+
+Status ResetArg(Vector<ArgParserVariant>& parsers, std::string_view name) {
+ for (auto& parser : parsers) {
+ if (std::visit(ResetVisitor{.name = name}, parser)) {
+ return OkStatus();
+ }
+ }
+ return Status::InvalidArgument();
+}
+
+} // namespace pw::rpc::fuzz
diff --git a/pw_rpc/fuzz/argparse_test.cc b/pw_rpc/fuzz/argparse_test.cc
new file mode 100644
index 000000000..6f0ab3889
--- /dev/null
+++ b/pw_rpc/fuzz/argparse_test.cc
@@ -0,0 +1,196 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/fuzz/argparse.h"
+
+#include <cstdint>
+#include <limits>
+
+#include "gtest/gtest.h"
+
+namespace pw::rpc::fuzz {
+namespace {
+
+TEST(ArgsParseTest, ParseBoolFlag) {
+ auto parser1 = BoolParser("-t", "--true").set_default(true);
+ auto parser2 = BoolParser("-f").set_default(false);
+ EXPECT_TRUE(parser1.value());
+ EXPECT_FALSE(parser2.value());
+
+ EXPECT_EQ(parser1.Parse("-t"), ParseStatus::kParsedOne);
+ EXPECT_EQ(parser2.Parse("-t"), ParseStatus::kParseMismatch);
+ EXPECT_TRUE(parser1.value());
+ EXPECT_FALSE(parser2.value());
+
+ EXPECT_EQ(parser1.Parse("--true"), ParseStatus::kParsedOne);
+ EXPECT_EQ(parser2.Parse("--true"), ParseStatus::kParseMismatch);
+ EXPECT_TRUE(parser1.value());
+ EXPECT_FALSE(parser2.value());
+
+ EXPECT_EQ(parser1.Parse("--no-true"), ParseStatus::kParsedOne);
+ EXPECT_EQ(parser2.Parse("--no-true"), ParseStatus::kParseMismatch);
+ EXPECT_FALSE(parser1.value());
+ EXPECT_FALSE(parser2.value());
+
+ EXPECT_EQ(parser1.Parse("-f"), ParseStatus::kParseMismatch);
+ EXPECT_EQ(parser2.Parse("-f"), ParseStatus::kParsedOne);
+ EXPECT_FALSE(parser1.value());
+ EXPECT_TRUE(parser2.value());
+}
+
+template <typename T>
+void ParseUnsignedFlag() {
+ auto parser = UnsignedParser<T>("-u", "--unsigned").set_default(137);
+ EXPECT_EQ(parser.value(), 137u);
+
+ // Wrong name.
+ EXPECT_EQ(parser.Parse("-s"), ParseStatus::kParseMismatch);
+ EXPECT_EQ(parser.Parse("--signed"), ParseStatus::kParseMismatch);
+ EXPECT_EQ(parser.value(), 137u);
+
+ // Missing values.
+ EXPECT_EQ(parser.Parse("-u"), ParseStatus::kParseFailure);
+ EXPECT_EQ(parser.Parse("--unsigned"), ParseStatus::kParseFailure);
+ EXPECT_EQ(parser.value(), 137u);
+
+ // Non-numeric values.
+ EXPECT_EQ(parser.Parse("-u", "foo"), ParseStatus::kParseFailure);
+ EXPECT_EQ(parser.Parse("--unsigned", "bar"), ParseStatus::kParseFailure);
+ EXPECT_EQ(parser.value(), 137u);
+
+ // Minimum values.
+ EXPECT_EQ(parser.Parse("-u", "0"), ParseStatus::kParsedTwo);
+ EXPECT_EQ(parser.Parse("--unsigned", "0"), ParseStatus::kParsedTwo);
+ EXPECT_EQ(parser.value(), 0u);
+
+ // Maximum values.
+ T max = std::numeric_limits<T>::max();
+ StringBuffer<32> buf;
+ buf << max;
+ EXPECT_EQ(parser.Parse("-u", buf.c_str()), ParseStatus::kParsedTwo);
+ EXPECT_EQ(parser.value(), max);
+ EXPECT_EQ(parser.Parse("--unsigned", buf.c_str()), ParseStatus::kParsedTwo);
+ EXPECT_EQ(parser.value(), max);
+
+ // Out of-range value.
+ if (max < std::numeric_limits<uint64_t>::max()) {
+ buf.clear();
+ buf << (max + 1ULL);
+ EXPECT_EQ(parser.Parse("-u", buf.c_str()), ParseStatus::kParseFailure);
+ EXPECT_EQ(parser.Parse("--unsigned", buf.c_str()),
+ ParseStatus::kParseFailure);
+ EXPECT_EQ(parser.value(), max);
+ }
+}
+
+TEST(ArgsParseTest, ParseUnsignedFlags) {
+ ParseUnsignedFlag<uint8_t>();
+ ParseUnsignedFlag<uint16_t>();
+ ParseUnsignedFlag<uint32_t>();
+ ParseUnsignedFlag<uint64_t>();
+}
+
+TEST(ArgsParseTest, ParsePositional) {
+ auto parser = UnsignedParser<size_t>("positional").set_default(1);
+ EXPECT_EQ(parser.Parse("-p", "2"), ParseStatus::kParseFailure);
+ EXPECT_EQ(parser.value(), 1u);
+
+ EXPECT_EQ(parser.Parse("--positional", "2"), ParseStatus::kParseFailure);
+ EXPECT_EQ(parser.value(), 1u);
+
+ // Second arg is ignored..
+ EXPECT_EQ(parser.Parse("2", "3"), ParseStatus::kParsedOne);
+ EXPECT_EQ(parser.value(), 2u);
+
+ // Positional only matches once.
+ EXPECT_EQ(parser.Parse("3"), ParseStatus::kParseMismatch);
+ EXPECT_EQ(parser.value(), 2u);
+}
+
+TEST(ArgsParseTest, PrintUsage) {
+ // Just verify it compiles and runs.
+ Vector<ArgParserVariant, 3> parsers = {
+ BoolParser("-v", "--verbose").set_default(false),
+ UnsignedParser<size_t>("-r", "--runs").set_default(1000),
+ UnsignedParser<size_t>("port").set_default(11111),
+ };
+ PrintUsage(parsers, "test-bin");
+}
+
+void CheckArgs(Vector<ArgParserVariant>& parsers,
+ bool verbose,
+ size_t runs,
+ uint16_t port) {
+ bool actual_verbose;
+ EXPECT_EQ(GetArg(parsers, "--verbose", &actual_verbose), OkStatus());
+ EXPECT_EQ(verbose, actual_verbose);
+ EXPECT_EQ(ResetArg(parsers, "--verbose"), OkStatus());
+
+ size_t actual_runs;
+ EXPECT_EQ(GetArg(parsers, "--runs", &actual_runs), OkStatus());
+ EXPECT_EQ(runs, actual_runs);
+ EXPECT_EQ(ResetArg(parsers, "--runs"), OkStatus());
+
+ uint16_t actual_port;
+ EXPECT_EQ(GetArg(parsers, "port", &actual_port), OkStatus());
+ EXPECT_EQ(port, actual_port);
+ EXPECT_EQ(ResetArg(parsers, "port"), OkStatus());
+}
+
+TEST(ArgsParseTest, ParseArgs) {
+ Vector<ArgParserVariant, 3> parsers{
+ BoolParser("-v", "--verbose").set_default(false),
+ UnsignedParser<size_t>("-r", "--runs").set_default(1000),
+ UnsignedParser<uint16_t>("port").set_default(11111),
+ };
+
+ char const* argv1[] = {"test-bin"};
+ EXPECT_EQ(ParseArgs(parsers, 1, const_cast<char**>(argv1)), OkStatus());
+ CheckArgs(parsers, false, 1000, 11111);
+
+ char const* argv2[] = {"test-bin", "22222"};
+ EXPECT_EQ(ParseArgs(parsers, 2, const_cast<char**>(argv2)), OkStatus());
+ CheckArgs(parsers, false, 1000, 22222);
+
+ // Out of range argument.
+ char const* argv3[] = {"test-bin", "65536"};
+ EXPECT_EQ(ParseArgs(parsers, 2, const_cast<char**>(argv3)),
+ Status::InvalidArgument());
+
+ // Extra argument.
+ char const* argv4[] = {"test-bin", "1", "2"};
+ EXPECT_EQ(ParseArgs(parsers, 3, const_cast<char**>(argv4)),
+ Status::InvalidArgument());
+ EXPECT_EQ(ResetArg(parsers, "port"), OkStatus());
+
+ // Flag missing value.
+ char const* argv5[] = {"test-bin", "--runs"};
+ EXPECT_EQ(ParseArgs(parsers, 2, const_cast<char**>(argv5)),
+ Status::InvalidArgument());
+
+ char const* argv6[] = {"test-bin", "-v", "33333", "--runs", "300"};
+ EXPECT_EQ(ParseArgs(parsers, 5, const_cast<char**>(argv6)), OkStatus());
+ CheckArgs(parsers, true, 300, 33333);
+
+ char const* argv7[] = {"test-bin", "-r", "400", "--verbose"};
+ EXPECT_EQ(ParseArgs(parsers, 4, const_cast<char**>(argv7)), OkStatus());
+ CheckArgs(parsers, true, 400, 11111);
+
+ char const* argv8[] = {"test-bin", "--no-verbose", "-r", "5000", "55555"};
+ EXPECT_EQ(ParseArgs(parsers, 5, const_cast<char**>(argv8)), OkStatus());
+ CheckArgs(parsers, false, 5000, 55555);
+}
+
+} // namespace
+} // namespace pw::rpc::fuzz
diff --git a/pw_rpc/fuzz/client_fuzzer.cc b/pw_rpc/fuzz/client_fuzzer.cc
new file mode 100644
index 000000000..c5086becf
--- /dev/null
+++ b/pw_rpc/fuzz/client_fuzzer.cc
@@ -0,0 +1,111 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+// clang-format off
+#include "pw_rpc/internal/log_config.h" // PW_LOG_* macros must be first.
+// clang-format on
+
+#include <sys/socket.h>
+
+#include <cstring>
+
+#include "pw_log/log.h"
+#include "pw_rpc/fuzz/argparse.h"
+#include "pw_rpc/fuzz/engine.h"
+#include "pw_rpc/integration_testing.h"
+
+namespace pw::rpc::fuzz {
+namespace {
+
+// This client configures a socket read timeout to allow the RPC dispatch thread
+// to exit gracefully.
+constexpr timeval kSocketReadTimeout = {.tv_sec = 1, .tv_usec = 0};
+
+int FuzzClient(int argc, char** argv) {
+ // TODO(aarongreen): Incorporate descriptions into usage message.
+ Vector<ArgParserVariant, 5> parsers{
+ // Enables additional logging.
+ BoolParser("-v", "--verbose").set_default(false),
+
+ // The number of actions to perform as part of the test. A value of 0 runs
+ // indefinitely.
+ UnsignedParser<size_t>("-n", "--num-actions").set_default(256),
+
+ // The seed value for the PRNG. A value of 0 generates a seed.
+ UnsignedParser<uint64_t>("-s", "--seed").set_default(0),
+
+ // The time, in milliseconds, that can elapse without triggering an error.
+ UnsignedParser<size_t>("-t", "--timeout").set_default(5000),
+
+ // The port use to connect to the `test_rpc_server`.
+ UnsignedParser<uint16_t>("port").set_default(48000)};
+
+ if (!ParseArgs(parsers, argc, argv).ok()) {
+ PrintUsage(parsers, argv[0]);
+ return 1;
+ }
+
+ bool verbose;
+ size_t num_actions;
+ uint64_t seed;
+ size_t timeout_ms;
+ uint16_t port;
+ if (!GetArg(parsers, "--verbose", &verbose).ok() ||
+ !GetArg(parsers, "--num-actions", &num_actions).ok() ||
+ !GetArg(parsers, "--seed", &seed).ok() ||
+ !GetArg(parsers, "--timeout", &timeout_ms).ok() ||
+ !GetArg(parsers, "port", &port).ok()) {
+ return 1;
+ }
+
+ if (!seed) {
+ seed = chrono::SystemClock::now().time_since_epoch().count();
+ }
+
+ if (auto status = integration_test::InitializeClient(port); !status.ok()) {
+ PW_LOG_ERROR("Failed to initialize client: %s", pw_StatusString(status));
+ return 1;
+ }
+
+ // Set read timout on socket to allow
+ // pw::rpc::integration_test::TerminateClient() to complete.
+ int fd = integration_test::GetClientSocketFd();
+ if (setsockopt(fd,
+ SOL_SOCKET,
+ SO_RCVTIMEO,
+ &kSocketReadTimeout,
+ sizeof(kSocketReadTimeout)) != 0) {
+ PW_LOG_ERROR("Failed to configure socket receive timeout with errno=%d",
+ errno);
+ return 1;
+ }
+
+ if (num_actions == 0) {
+ num_actions = std::numeric_limits<size_t>::max();
+ }
+
+ Fuzzer fuzzer(integration_test::client(), integration_test::kChannelId);
+ fuzzer.set_verbose(verbose);
+ fuzzer.set_timeout(std::chrono::milliseconds(timeout_ms));
+ fuzzer.Run(seed, num_actions);
+ integration_test::TerminateClient();
+ return 0;
+}
+
+} // namespace
+} // namespace pw::rpc::fuzz
+
+int main(int argc, char** argv) {
+ return pw::rpc::fuzz::FuzzClient(argc, argv);
+}
diff --git a/pw_rpc/fuzz/engine.cc b/pw_rpc/fuzz/engine.cc
new file mode 100644
index 000000000..b19dfa398
--- /dev/null
+++ b/pw_rpc/fuzz/engine.cc
@@ -0,0 +1,553 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+// clang-format off
+#include "pw_rpc/internal/log_config.h" // PW_LOG_* macros must be first.
+
+#include "pw_rpc/fuzz/engine.h"
+// clang-format on
+
+#include <algorithm>
+#include <cctype>
+#include <chrono>
+#include <cinttypes>
+#include <limits>
+#include <mutex>
+
+#include "pw_assert/check.h"
+#include "pw_bytes/span.h"
+#include "pw_log/log.h"
+#include "pw_span/span.h"
+#include "pw_status/status.h"
+#include "pw_string/format.h"
+
+namespace pw::rpc::fuzz {
+namespace {
+
+using namespace std::chrono_literals;
+
+// Maximum number of bytes written in a single unary or stream request.
+constexpr size_t kMaxWriteLen = MaxSafePayloadSize();
+static_assert(kMaxWriteLen * 0x7E <= std::numeric_limits<uint16_t>::max());
+
+struct ActiveVisitor final {
+ using result_type = bool;
+ result_type operator()(std::monostate&) { return false; }
+ result_type operator()(pw::rpc::RawUnaryReceiver& call) {
+ return call.active();
+ }
+ result_type operator()(pw::rpc::RawClientReaderWriter& call) {
+ return call.active();
+ }
+};
+
+struct CloseClientStreamVisitor final {
+ using result_type = void;
+ result_type operator()(std::monostate&) {}
+ result_type operator()(pw::rpc::RawUnaryReceiver&) {}
+ result_type operator()(pw::rpc::RawClientReaderWriter& call) {
+ call.CloseClientStream().IgnoreError();
+ }
+};
+
+struct WriteVisitor final {
+ using result_type = bool;
+ result_type operator()(std::monostate&) { return false; }
+ result_type operator()(pw::rpc::RawUnaryReceiver&) { return false; }
+ result_type operator()(pw::rpc::RawClientReaderWriter& call) {
+ if (!call.active()) {
+ return false;
+ }
+ call.Write(data).IgnoreError();
+ return true;
+ }
+ ConstByteSpan data;
+};
+
+struct CancelVisitor final {
+ using result_type = void;
+ result_type operator()(std::monostate&) {}
+ result_type operator()(pw::rpc::RawUnaryReceiver& call) {
+ call.Cancel().IgnoreError();
+ }
+ result_type operator()(pw::rpc::RawClientReaderWriter& call) {
+ call.Cancel().IgnoreError();
+ }
+};
+
+struct AbandonVisitor final {
+ using result_type = void;
+ result_type operator()(std::monostate&) {}
+ result_type operator()(pw::rpc::RawUnaryReceiver& call) { call.Abandon(); }
+ result_type operator()(pw::rpc::RawClientReaderWriter& call) {
+ call.Abandon();
+ }
+};
+
+} // namespace
+
+// `Action` methods.
+
+Action::Action(uint32_t encoded) {
+ // The first byte is used to determine the operation. The ranges used set the
+ // relative likelihood of each result, e.g. `kWait` is more likely than
+ // `kAbandon`.
+ uint32_t raw = encoded & 0xFF;
+ if (raw == 0) {
+ op = kSkip;
+ } else if (raw < 0x60) {
+ op = kWait;
+ } else if (raw < 0x80) {
+ op = kWriteUnary;
+ } else if (raw < 0xA0) {
+ op = kWriteStream;
+ } else if (raw < 0xC0) {
+ op = kCloseClientStream;
+ } else if (raw < 0xD0) {
+ op = kCancel;
+ } else if (raw < 0xE0) {
+ op = kAbandon;
+ } else if (raw < 0xF0) {
+ op = kSwap;
+ } else {
+ op = kDestroy;
+ }
+ target = ((encoded & 0xFF00) >> 8) % Fuzzer::kMaxConcurrentCalls;
+ value = encoded >> 16;
+}
+
+Action::Action(Op op_, size_t target_, uint16_t value_)
+ : op(op_), target(target_), value(value_) {}
+
+Action::Action(Op op_, size_t target_, char val, size_t len)
+ : op(op_), target(target_) {
+ PW_ASSERT(op == kWriteUnary || op == kWriteStream);
+ value = static_cast<uint16_t>(((val % 0x80) * kMaxWriteLen) +
+ (len % kMaxWriteLen));
+}
+
+char Action::DecodeWriteValue(uint16_t value) {
+ return static_cast<char>((value / kMaxWriteLen) % 0x7F);
+}
+
+size_t Action::DecodeWriteLength(uint16_t value) {
+ return value % kMaxWriteLen;
+}
+
+uint32_t Action::Encode() const {
+ uint32_t encoded = 0;
+ switch (op) {
+ case kSkip:
+ encoded = 0x00;
+ break;
+ case kWait:
+ encoded = 0x5F;
+ break;
+ case kWriteUnary:
+ encoded = 0x7F;
+ break;
+ case kWriteStream:
+ encoded = 0x9F;
+ break;
+ case kCloseClientStream:
+ encoded = 0xBF;
+ break;
+ case kCancel:
+ encoded = 0xCF;
+ break;
+ case kAbandon:
+ encoded = 0xDF;
+ break;
+ case kSwap:
+ encoded = 0xEF;
+ break;
+ case kDestroy:
+ encoded = 0xFF;
+ break;
+ }
+ encoded |=
+ ((target < Fuzzer::kMaxConcurrentCalls ? target
+ : Fuzzer::kMaxConcurrentCalls) %
+ 0xFF)
+ << 8;
+ encoded |= (static_cast<uint32_t>(value) << 16);
+ return encoded;
+}
+
+void Action::Log(bool verbose, size_t num_actions, const char* fmt, ...) const {
+ if (!verbose) {
+ return;
+ }
+ char s1[16];
+ auto result = callback_id < Fuzzer::kMaxConcurrentCalls
+ ? string::Format(s1, "%-3zu", callback_id)
+ : string::Format(s1, "n/a");
+ va_list ap;
+ va_start(ap, fmt);
+ char s2[128];
+ if (result.ok()) {
+ result = string::FormatVaList(s2, fmt, ap);
+ }
+ va_end(ap);
+ if (result.ok()) {
+ PW_LOG_INFO("#%-12zu\tthread: %zu\tcallback for: %s\ttarget call: %zu\t%s",
+ num_actions,
+ thread_id,
+ s1,
+ target,
+ s2);
+ } else {
+ LogFailure(verbose, num_actions, result.status());
+ }
+}
+
+void Action::LogFailure(bool verbose, size_t num_actions, Status status) const {
+ if (verbose && !status.ok()) {
+ PW_LOG_INFO("#%-12zu\tthread: %zu\tFailed to log action: %s",
+ num_actions,
+ thread_id,
+ pw_StatusString(status));
+ }
+}
+
+// FuzzyCall methods.
+
+void FuzzyCall::RecordWrite(size_t num, bool append) {
+ std::lock_guard lock(mutex_);
+ if (append) {
+ last_write_ += num;
+ } else {
+ last_write_ = num;
+ }
+ total_written_ += num;
+ pending_ = true;
+}
+
+void FuzzyCall::Await() {
+ std::unique_lock<sync::Mutex> lock(mutex_);
+ cv_.wait(lock, [this]() PW_NO_LOCK_SAFETY_ANALYSIS { return !pending_; });
+}
+
+void FuzzyCall::Notify() {
+ if (pending_.exchange(false)) {
+ cv_.notify_all();
+ }
+}
+
+void FuzzyCall::Swap(FuzzyCall& other) {
+ if (index_ == other.index_) {
+ return;
+ }
+ // Manually acquire locks in an order based on call IDs to prevent deadlock.
+ if (index_ < other.index_) {
+ mutex_.lock();
+ other.mutex_.lock();
+ } else {
+ other.mutex_.lock();
+ mutex_.lock();
+ }
+ call_.swap(other.call_);
+ std::swap(id_, other.id_);
+ pending_ = other.pending_.exchange(pending_);
+ std::swap(last_write_, other.last_write_);
+ std::swap(total_written_, other.total_written_);
+ mutex_.unlock();
+ other.mutex_.unlock();
+ cv_.notify_all();
+ other.cv_.notify_all();
+}
+
+void FuzzyCall::Reset(Variant call) {
+ {
+ std::lock_guard lock(mutex_);
+ call_ = std::move(call);
+ }
+ cv_.notify_all();
+}
+
+void FuzzyCall::Log() {
+ if (mutex_.try_lock_for(100ms)) {
+ PW_LOG_INFO("call %zu:", index_);
+ PW_LOG_INFO(" active: %s",
+ std::visit(ActiveVisitor(), call_) ? "true" : "false");
+ PW_LOG_INFO(" request pending: %s ", pending_ ? "true" : "false");
+ PW_LOG_INFO(" last write: %zu bytes", last_write_);
+ PW_LOG_INFO(" total written: %zu bytes", total_written_);
+ mutex_.unlock();
+ } else {
+ PW_LOG_WARN("call %zu: failed to acquire lock", index_);
+ }
+}
+
+// `Fuzzer` methods.
+
+#define FUZZ_LOG_VERBOSE(...) \
+ if (verbose_) { \
+ PW_LOG_INFO(__VA_ARGS__); \
+ }
+
+Fuzzer::Fuzzer(Client& client, uint32_t channel_id)
+ : client_(client, channel_id),
+ timer_([this](chrono::SystemClock::time_point) {
+ PW_LOG_ERROR(
+ "Workers performed %zu actions before timing out without an "
+ "update.",
+ num_actions_.load());
+ PW_LOG_INFO("Additional call details:");
+ for (auto& call : fuzzy_calls_) {
+ call.Log();
+ }
+ PW_CRASH("Fuzzer found a fatal error condition: TIMEOUT.");
+ }) {
+ for (size_t index = 0; index < kMaxConcurrentCalls; ++index) {
+ fuzzy_calls_.emplace_back(index);
+ indices_.push_back(index);
+ contexts_.push_back(CallbackContext{.id = index, .fuzzer = this});
+ }
+}
+
+void Fuzzer::Run(uint64_t seed, size_t num_actions) {
+ FUZZ_LOG_VERBOSE("Fuzzing RPC client with:");
+ FUZZ_LOG_VERBOSE(" num_actions: %zu", num_actions);
+ FUZZ_LOG_VERBOSE(" seed: %" PRIu64, seed);
+ num_actions_.store(0);
+ random::XorShiftStarRng64 rng(seed);
+ while (true) {
+ {
+ size_t actions_done = num_actions_.load();
+ if (actions_done >= num_actions) {
+ FUZZ_LOG_VERBOSE("Fuzzing complete; %zu actions performed.",
+ actions_done);
+ break;
+ }
+ FUZZ_LOG_VERBOSE("%zu actions remaining.", num_actions - actions_done);
+ }
+ FUZZ_LOG_VERBOSE("Generating %zu random actions.", kMaxActions);
+ pw::Vector<uint32_t, kMaxActions> actions;
+ for (size_t i = 0; i < kNumThreads; ++i) {
+ size_t num_actions_for_thread;
+ rng.GetInt(num_actions_for_thread, kMaxActionsPerThread + 1);
+ for (size_t j = 0; j < num_actions_for_thread; ++j) {
+ uint32_t encoded = 0;
+ while (!encoded) {
+ rng.GetInt(encoded);
+ }
+ actions.push_back(encoded);
+ }
+ actions.push_back(0);
+ }
+ Run(actions);
+ }
+}
+
+void Fuzzer::Run(const pw::Vector<uint32_t>& actions) {
+ FUZZ_LOG_VERBOSE("Starting %zu threads to perform %zu actions:",
+ kNumThreads - 1,
+ actions.size());
+ FUZZ_LOG_VERBOSE(" timeout: %lldms", timer_.timeout() / 1ms);
+ auto iter = actions.begin();
+ timer_.Restart();
+ for (size_t thread_id = 0; thread_id < kNumThreads; ++thread_id) {
+ pw::Vector<uint32_t, kMaxActionsPerThread> thread_actions;
+ while (thread_actions.size() < kMaxActionsPerThread &&
+ iter != actions.end()) {
+ uint32_t encoded = *iter++;
+ if (!encoded) {
+ break;
+ }
+ thread_actions.push_back(encoded);
+ }
+ if (thread_id == 0) {
+ std::lock_guard lock(mutex_);
+ callback_actions_ = std::move(thread_actions);
+ callback_iterator_ = callback_actions_.begin();
+ } else {
+ threads_.emplace_back(
+ [this, thread_id, actions = std::move(thread_actions)]() {
+ for (const auto& encoded : actions) {
+ Action action(encoded);
+ action.set_thread_id(thread_id);
+ Perform(action);
+ }
+ });
+ }
+ }
+ for (auto& t : threads_) {
+ t.join();
+ }
+ for (auto& fuzzy_call : fuzzy_calls_) {
+ fuzzy_call.Reset();
+ }
+ timer_.Cancel();
+}
+
+void Fuzzer::Perform(const Action& action) {
+ FuzzyCall& fuzzy_call = FindCall(action.target);
+ switch (action.op) {
+ case Action::kSkip: {
+ if (action.thread_id == 0) {
+ action.Log(verbose_, ++num_actions_, "Callback chain completed");
+ }
+ break;
+ }
+ case Action::kWait: {
+ if (action.callback_id == action.target) {
+ // Don't wait in a callback of the target call.
+ break;
+ }
+ if (fuzzy_call.pending()) {
+ action.Log(verbose_, ++num_actions_, "Waiting for call.");
+ fuzzy_call.Await();
+ }
+ break;
+ }
+ case Action::kWriteUnary:
+ case Action::kWriteStream: {
+ if (action.callback_id == action.target) {
+ // Don't create a new call from the call's own callback.
+ break;
+ }
+ char buf[kMaxWriteLen];
+ char val = Action::DecodeWriteValue(action.value);
+ size_t len = Action::DecodeWriteLength(action.value);
+ memset(buf, val, len);
+ if (verbose_) {
+ char msg_buf[64];
+ span msg(msg_buf);
+ auto result = string::Format(
+ msg,
+ "Writing %s request of ",
+ action.op == Action::kWriteUnary ? "unary" : "stream");
+ if (result.ok()) {
+ size_t off = result.size();
+ result = string::Format(
+ msg.subspan(off),
+ isprint(val) ? "['%c'; %zu]." : "['\\x%02x'; %zu].",
+ val,
+ len);
+ }
+ size_t num_actions = ++num_actions_;
+ if (result.ok()) {
+ action.Log(verbose_, num_actions, "%s", msg.data());
+ } else if (verbose_) {
+ action.LogFailure(verbose_, num_actions, result.status());
+ }
+ }
+ bool append = false;
+ if (action.op == Action::kWriteUnary) {
+ // Send a unary request.
+ fuzzy_call.Reset(client_.UnaryEcho(
+ as_bytes(span(buf, len)),
+ /* on completed */
+ [context = GetContext(action.target)](ConstByteSpan, Status) {
+ context->fuzzer->OnCompleted(context->id);
+ },
+ /* on error */
+ [context = GetContext(action.target)](Status status) {
+ context->fuzzer->OnError(context->id, status);
+ }));
+
+ } else if (fuzzy_call.Visit(
+ WriteVisitor{.data = as_bytes(span(buf, len))})) {
+ // Append to an existing stream
+ append = true;
+ } else {
+ // .Open a new stream.
+ fuzzy_call.Reset(client_.BidirectionalEcho(
+ /* on next */
+ [context = GetContext(action.target)](ConstByteSpan) {
+ context->fuzzer->OnNext(context->id);
+ },
+ /* on completed */
+ [context = GetContext(action.target)](Status) {
+ context->fuzzer->OnCompleted(context->id);
+ },
+ /* on error */
+ [context = GetContext(action.target)](Status status) {
+ context->fuzzer->OnError(context->id, status);
+ }));
+ }
+ fuzzy_call.RecordWrite(len, append);
+ break;
+ }
+ case Action::kCloseClientStream:
+ action.Log(verbose_, ++num_actions_, "Closing stream.");
+ fuzzy_call.Visit(CloseClientStreamVisitor());
+ break;
+ case Action::kCancel:
+ action.Log(verbose_, ++num_actions_, "Canceling call.");
+ fuzzy_call.Visit(CancelVisitor());
+ break;
+ case Action::kAbandon: {
+ action.Log(verbose_, ++num_actions_, "Abandoning call.");
+ fuzzy_call.Visit(AbandonVisitor());
+ break;
+ }
+ case Action::kSwap: {
+ size_t other_target = action.value % kMaxConcurrentCalls;
+ if (action.callback_id == action.target ||
+ action.callback_id == other_target) {
+ // Don't move a call from within its own callback.
+ break;
+ }
+ action.Log(verbose_,
+ ++num_actions_,
+ "Swapping call with call %zu.",
+ other_target);
+ std::lock_guard lock(mutex_);
+ FuzzyCall& other = FindCallLocked(other_target);
+ std::swap(indices_[fuzzy_call.id()], indices_[other.id()]);
+ fuzzy_call.Swap(other);
+ break;
+ }
+ case Action::kDestroy: {
+ if (action.callback_id == action.target) {
+ // Don't destroy a call from within its own callback.
+ break;
+ }
+ action.Log(verbose_, ++num_actions_, "Destroying call.");
+ fuzzy_call.Reset();
+ break;
+ }
+ default:
+ break;
+ }
+ timer_.Restart();
+}
+
+void Fuzzer::OnNext(size_t callback_id) { FindCall(callback_id).Notify(); }
+
+void Fuzzer::OnCompleted(size_t callback_id) {
+ uint32_t encoded = 0;
+ {
+ std::lock_guard lock(mutex_);
+ if (callback_iterator_ != callback_actions_.end()) {
+ encoded = *callback_iterator_++;
+ }
+ }
+ Action action(encoded);
+ action.set_callback_id(callback_id);
+ Perform(action);
+ FindCall(callback_id).Notify();
+}
+
+void Fuzzer::OnError(size_t callback_id, Status status) {
+ FuzzyCall& call = FindCall(callback_id);
+ PW_LOG_WARN("Call %zu received an error from the server: %s",
+ call.id(),
+ pw_StatusString(status));
+ call.Notify();
+}
+
+} // namespace pw::rpc::fuzz
diff --git a/pw_rpc/fuzz/engine_test.cc b/pw_rpc/fuzz/engine_test.cc
new file mode 100644
index 000000000..5ac1a49a1
--- /dev/null
+++ b/pw_rpc/fuzz/engine_test.cc
@@ -0,0 +1,264 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_rpc/fuzz/engine.h"
+
+#include <chrono>
+
+#include "gtest/gtest.h"
+#include "pw_containers/vector.h"
+#include "pw_log/log.h"
+#include "pw_rpc/benchmark.h"
+#include "pw_rpc/internal/client_server_testing_threaded.h"
+#include "pw_rpc/internal/fake_channel_output.h"
+#include "pw_thread/test_threads.h"
+
+namespace pw::rpc::fuzz {
+namespace {
+
+using namespace std::literals::chrono_literals;
+
+// Maximum time, in milliseconds, that can elapse without a call completing or
+// being dropped in some way..
+const chrono::SystemClock::duration kTimeout = 5s;
+
+// These are fairly tight constraints in order to fit within the default
+// `PW_UNIT_TEST_CONFIG_MEMORY_POOL_SIZE`.
+constexpr size_t kMaxPackets = 128;
+constexpr size_t kMaxPayloadSize = 64;
+
+using BufferedChannelOutputBase =
+ internal::test::FakeChannelOutputBuffer<kMaxPackets, kMaxPayloadSize>;
+
+/// Channel output backed by a fixed buffer.
+class BufferedChannelOutput : public BufferedChannelOutputBase {
+ public:
+ BufferedChannelOutput() : BufferedChannelOutputBase() {}
+};
+
+using FuzzerChannelOutputBase =
+ internal::WatchableChannelOutput<BufferedChannelOutput,
+ kMaxPayloadSize,
+ kMaxPackets,
+ kMaxPayloadSize>;
+
+/// Channel output that can be waited on by the server.
+class FuzzerChannelOutput : public FuzzerChannelOutputBase {
+ public:
+ FuzzerChannelOutput() : FuzzerChannelOutputBase() {}
+};
+
+using FuzzerContextBase =
+ internal::ClientServerTestContextThreaded<FuzzerChannelOutput,
+ kMaxPayloadSize,
+ kMaxPackets,
+ kMaxPayloadSize>;
+class FuzzerContext : public FuzzerContextBase {
+ public:
+ static constexpr uint32_t kChannelId = 1;
+
+ FuzzerContext() : FuzzerContextBase(thread::test::TestOptionsThread0()) {}
+};
+
+class RpcFuzzTestingTest : public testing::Test {
+ protected:
+ void SetUp() override { context_.server().RegisterService(service_); }
+
+ void Add(Action::Op op, size_t target, uint16_t value) {
+ actions_.push_back(Action(op, target, value).Encode());
+ }
+
+ void Add(Action::Op op, size_t target, char val, size_t len) {
+ actions_.push_back(Action(op, target, val, len).Encode());
+ }
+
+ void NextThread() { actions_.push_back(0); }
+
+ void Run() {
+ Fuzzer fuzzer(context_.client(), FuzzerContext::kChannelId);
+ fuzzer.set_verbose(true);
+ fuzzer.set_timeout(kTimeout);
+ fuzzer.Run(actions_);
+ }
+
+ private:
+ FuzzerContext context_;
+ BenchmarkService service_;
+ Vector<uint32_t, Fuzzer::kMaxActions> actions_;
+};
+
+TEST_F(RpcFuzzTestingTest, SequentialRequests) {
+ // Callback thread
+ Add(Action::kWriteStream, 1, 'B', 1);
+ Add(Action::kSkip, 0, 0);
+ Add(Action::kWriteStream, 2, 'B', 2);
+ Add(Action::kSkip, 0, 0);
+ Add(Action::kWriteStream, 3, 'B', 3);
+ Add(Action::kSkip, 0, 0);
+ NextThread();
+
+ // Thread 1
+ Add(Action::kWriteStream, 0, 'A', 2);
+ Add(Action::kWait, 1, 0);
+ Add(Action::kWriteStream, 1, 'A', 4);
+ NextThread();
+
+ // Thread 2
+ NextThread();
+ Add(Action::kWait, 2, 0);
+ Add(Action::kWriteStream, 2, 'A', 6);
+
+ // Thread 3
+ NextThread();
+ Add(Action::kWait, 3, 0);
+
+ Run();
+}
+
+// TODO(b/274437709): Re-enable.
+TEST_F(RpcFuzzTestingTest, DISABLED_SimultaneousRequests) {
+ // Callback thread
+ NextThread();
+
+ // Thread 1
+ Add(Action::kWriteUnary, 1, 'A', 1);
+ Add(Action::kWait, 2, 0);
+ NextThread();
+
+ // Thread 2
+ Add(Action::kWriteUnary, 2, 'B', 2);
+ Add(Action::kWait, 3, 0);
+ NextThread();
+
+ // Thread 3
+ Add(Action::kWriteUnary, 3, 'C', 3);
+ Add(Action::kWait, 1, 0);
+ NextThread();
+
+ Run();
+}
+
+// TODO(b/274437709) This test currently does not pass as it exhausts the fake
+// channel. It will be re-enabled when the underlying stream is swapped for
+// a pw_ring_buffer-based approach.
+TEST_F(RpcFuzzTestingTest, DISABLED_CanceledRequests) {
+ // Callback thread
+ NextThread();
+
+ // Thread 1
+ for (size_t i = 0; i < 10; ++i) {
+ Add(Action::kWriteUnary, i % 3, 'A', i);
+ }
+ Add(Action::kWait, 0, 0);
+ Add(Action::kWait, 1, 0);
+ Add(Action::kWait, 2, 0);
+ NextThread();
+
+ // Thread 2
+ for (size_t i = 0; i < 10; ++i) {
+ Add(Action::kCancel, i % 3, 0);
+ }
+ NextThread();
+
+ // Thread 3
+ NextThread();
+
+ Run();
+}
+
+// TODO(b/274437709) This test currently does not pass as it exhausts the fake
+// channel. It will be re-enabled when the underlying stream is swapped for
+// a pw_ring_buffer-based approach.
+TEST_F(RpcFuzzTestingTest, DISABLED_AbandonedRequests) {
+ // Callback thread
+ NextThread();
+
+ // Thread 1
+ for (size_t i = 0; i < 10; ++i) {
+ Add(Action::kWriteUnary, i % 3, 'A', i);
+ }
+ Add(Action::kWait, 0, 0);
+ Add(Action::kWait, 1, 0);
+ Add(Action::kWait, 2, 0);
+ NextThread();
+
+ // Thread 2
+ for (size_t i = 0; i < 10; ++i) {
+ Add(Action::kAbandon, i % 3, 0);
+ }
+ NextThread();
+
+ // Thread 3
+ NextThread();
+
+ Run();
+}
+
+// TODO(b/274437709) This test currently does not pass as it exhausts the fake
+// channel. It will be re-enabled when the underlying stream is swapped for
+// a pw_ring_buffer-based approach.
+TEST_F(RpcFuzzTestingTest, DISABLED_SwappedRequests) {
+ Vector<uint32_t, Fuzzer::kMaxActions> actions;
+ // Callback thread
+ NextThread();
+ // Thread 1
+ for (size_t i = 0; i < 10; ++i) {
+ Add(Action::kWriteUnary, i % 3, 'A', i);
+ }
+ Add(Action::kWait, 0, 0);
+ Add(Action::kWait, 1, 0);
+ Add(Action::kWait, 2, 0);
+ NextThread();
+ // Thread 2
+ for (size_t i = 0; i < 100; ++i) {
+ auto j = i % 3;
+ Add(Action::kSwap, j, j + 1);
+ }
+ NextThread();
+ // Thread 3
+ NextThread();
+
+ Run();
+}
+
+// TODO(b/274437709) This test currently does not pass as it exhausts the fake
+// channel. It will be re-enabled when the underlying stream is swapped for
+// a pw_ring_buffer-based approach.
+TEST_F(RpcFuzzTestingTest, DISABLED_DestroyedRequests) {
+ // Callback thread
+ NextThread();
+
+ // Thread 1
+ for (size_t i = 0; i < 100; ++i) {
+ Add(Action::kWriteUnary, i % 3, 'A', i);
+ }
+ Add(Action::kWait, 0, 0);
+ Add(Action::kWait, 1, 0);
+ Add(Action::kWait, 2, 0);
+ NextThread();
+
+ // Thread 2
+ for (size_t i = 0; i < 100; ++i) {
+ Add(Action::kDestroy, i % 3, 0);
+ }
+ NextThread();
+
+ // Thread 3
+ NextThread();
+
+ Run();
+}
+
+} // namespace
+} // namespace pw::rpc::fuzz
diff --git a/pw_rpc/fuzz/public/pw_rpc/fuzz/alarm_timer.h b/pw_rpc/fuzz/public/pw_rpc/fuzz/alarm_timer.h
new file mode 100644
index 000000000..9ccd7ce0b
--- /dev/null
+++ b/pw_rpc/fuzz/public/pw_rpc/fuzz/alarm_timer.h
@@ -0,0 +1,56 @@
+// Copyright 2022 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include "pw_chrono/system_clock.h"
+#include "pw_chrono/system_timer.h"
+
+namespace pw::rpc::fuzz {
+
+/// Represents a timer that invokes a callback on timeout. Once started, it will
+/// invoke the callback after a provided duration unless it is restarted,
+/// canceled, or destroyed.
+class AlarmTimer {
+ public:
+ AlarmTimer(chrono::SystemTimer::ExpiryCallback&& on_timeout)
+ : timer_(std::move(on_timeout)) {}
+
+ chrono::SystemClock::duration timeout() const { return timeout_; }
+
+ /// "Arms" the timer. The callback will be invoked if `timeout` elapses
+ /// without a call to `Restart`, `Cancel`, or the destructor. Calling `Start`
+ /// again restarts the timer, possibly with a different `timeout` value.
+ void Start(chrono::SystemClock::duration timeout) {
+ timeout_ = timeout;
+ Restart();
+ }
+
+ /// Restarts the timer. This is equivalent to calling `Start` with the same
+ /// `timeout` as passed previously. Does nothing if `Start` has not been
+ /// called.
+ void Restart() {
+ Cancel();
+ timer_.InvokeAfter(timeout_);
+ }
+
+ /// "Disarms" the timer. The callback will not be invoked unless `Start` is
+ /// called again. Does nothing if `Start` has not been called.
+ void Cancel() { timer_.Cancel(); }
+
+ private:
+ chrono::SystemTimer timer_;
+ chrono::SystemClock::duration timeout_;
+};
+
+} // namespace pw::rpc::fuzz
diff --git a/pw_rpc/fuzz/public/pw_rpc/fuzz/argparse.h b/pw_rpc/fuzz/public/pw_rpc/fuzz/argparse.h
new file mode 100644
index 000000000..05a7633e6
--- /dev/null
+++ b/pw_rpc/fuzz/public/pw_rpc/fuzz/argparse.h
@@ -0,0 +1,230 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+/// Command line argument parsing.
+///
+/// The objects defined below can be used to parse command line arguments of
+/// different types. These objects are "just enough" defined for current use
+/// cases, but the design is intended to be extensible as new types and traits
+/// are needed.
+///
+/// Example:
+///
+/// Given a boolean flag "verbose", a numerical flag "runs", and a positional
+/// "port" argument to be parsed, we can create a vector of parsers. In this
+/// example, we modify the parsers during creation to set default values:
+///
+/// @code
+/// Vector<ArgParserVariant, 3> parsers = {
+/// BoolParser("-v", "--verbose").set_default(false),
+/// UnsignedParser<size_t>("-r", "--runs").set_default(1000),
+/// UnsignedParser<uint16_t>("port").set_default(11111),
+/// };
+/// @endcode
+///
+/// With this vector, we can then parse command line arguments and extract
+/// the values of arguments that were set, e.g.:
+///
+/// @code
+/// if (!ParseArgs(parsers, argc, argv).ok()) {
+/// PrintUsage(parsers, argv[0]);
+/// return 1;
+/// }
+/// bool verbose;
+/// size_t runs;
+/// uint16_t port;
+/// if (!GetArg(parsers, "--verbose", &verbose).ok() ||
+/// !GetArg(parsers, "--runs", &runs).ok() ||
+/// !GetArg(parsers, "port", &port).ok()) {
+/// // Shouldn't happen unless names do not match.
+/// return 1;
+/// }
+///
+/// // Do stuff with `verbose`, `runs`, and `port`...
+/// @endcode
+
+#include <cstddef>
+#include <cstdint>
+#include <string_view>
+#include <variant>
+
+#include "pw_containers/vector.h"
+#include "pw_status/status.h"
+
+namespace pw::rpc::fuzz {
+
+/// Enumerates the results of trying to parse a specific command line argument
+/// with a particular parsers.
+enum ParseStatus {
+ /// The argument matched the parser and was successfully parsed without a
+ /// value.
+ kParsedOne,
+
+ /// The argument matched the parser and was successfully parsed with a value.
+ kParsedTwo,
+
+ /// The argument did not match the parser. This is not necessarily an error;
+ /// the argument may match a different parser.
+ kParseMismatch,
+
+ /// The argument matched a parser, but could not be parsed. This may be due to
+ /// a missing value for a flag, a value of the wrong type, a provided value
+ /// being out of range, etc. Parsers should log additional details before
+ /// returning this value.
+ kParseFailure,
+};
+
+/// Holds parsed argument values of different types.
+using ArgVariant = std::variant<std::monostate, bool, uint64_t>;
+
+/// Base class for argument parsers.
+class ArgParserBase {
+ public:
+ virtual ~ArgParserBase() = default;
+
+ std::string_view short_name() const { return short_name_; }
+ std::string_view long_name() const { return long_name_; }
+ bool positional() const { return positional_; }
+
+ /// Clears the value. Typically, command line arguments are only parsed once,
+ /// but this method is useful for testing.
+ void Reset() { value_ = std::monostate(); }
+
+ protected:
+ /// Defines an argument parser with a single name. This may be a positional
+ /// argument or a flag.
+ ArgParserBase(std::string_view name);
+
+ /// Defines an argument parser for a flag with short and long names.
+ ArgParserBase(std::string_view shortopt, std::string_view longopt);
+
+ void set_initial(ArgVariant initial) { initial_ = initial; }
+ void set_value(ArgVariant value) { value_ = value; }
+
+ /// Examines if the given `arg` matches this parser. A parser for a flag can
+ /// match the short name (e.g. '-f') if set, or the long name (e.g. '--foo').
+ /// A parser for a positional argument will match anything until it has a
+ /// value set.
+ bool Match(std::string_view arg);
+
+ /// Returns the parsed value.
+ template <typename T>
+ T Get() const {
+ return std::get<T>(GetValue());
+ }
+
+ private:
+ const ArgVariant& GetValue() const;
+
+ std::string_view short_name_;
+ std::string_view long_name_;
+ bool positional_;
+
+ ArgVariant initial_;
+ ArgVariant value_;
+};
+
+// Argument parsers for boolean arguments. These arguments are always flags, and
+// can be specified as, e.g. "-f" (true), "--foo" (true) or "--no-foo" (false).
+class BoolParser : public ArgParserBase {
+ public:
+ BoolParser(std::string_view optname);
+ BoolParser(std::string_view shortopt, std::string_view longopt);
+
+ bool value() const { return Get<bool>(); }
+ BoolParser& set_default(bool value);
+
+ ParseStatus Parse(std::string_view arg0,
+ std::string_view arg1 = std::string_view());
+};
+
+// Type-erasing argument parser for unsigned integer arguments. This object
+// always parses values as `uint64_t`s and should not be used directly.
+// Instead, use `UnsignedParser<T>` with a type to explicitly narrow to.
+class UnsignedParserBase : public ArgParserBase {
+ protected:
+ UnsignedParserBase(std::string_view name);
+ UnsignedParserBase(std::string_view shortopt, std::string_view longopt);
+
+ ParseStatus Parse(std::string_view arg0, std::string_view arg1, uint64_t max);
+};
+
+// Argument parser for unsigned integer arguments. These arguments may be flags
+// or positional arguments.
+template <typename T, typename std::enable_if_t<std::is_unsigned_v<T>, int> = 0>
+class UnsignedParser : public UnsignedParserBase {
+ public:
+ UnsignedParser(std::string_view name) : UnsignedParserBase(name) {}
+ UnsignedParser(std::string_view shortopt, std::string_view longopt)
+ : UnsignedParserBase(shortopt, longopt) {}
+
+ T value() const { return static_cast<T>(Get<uint64_t>()); }
+
+ UnsignedParser& set_default(T value) {
+ set_initial(static_cast<uint64_t>(value));
+ return *this;
+ }
+
+ ParseStatus Parse(std::string_view arg0,
+ std::string_view arg1 = std::string_view()) {
+ return UnsignedParserBase::Parse(arg0, arg1, std::numeric_limits<T>::max());
+ }
+};
+
+// Holds argument parsers of different types.
+using ArgParserVariant =
+ std::variant<BoolParser, UnsignedParser<uint16_t>, UnsignedParser<size_t>>;
+
+// Parses the command line arguments and sets the values of the given `parsers`.
+Status ParseArgs(Vector<ArgParserVariant>& parsers, int argc, char** argv);
+
+// Logs a usage message based on the given `parsers` and the program name given
+// by `argv0`.
+void PrintUsage(const Vector<ArgParserVariant>& parsers,
+ std::string_view argv0);
+
+// Attempts to find the parser in `parsers` with the given `name`, and returns
+// its value if found.
+std::optional<ArgVariant> GetArg(const Vector<ArgParserVariant>& parsers,
+ std::string_view name);
+
+inline void GetArgValue(const ArgVariant& arg, bool* out) {
+ *out = std::get<bool>(arg);
+}
+
+template <typename T, typename std::enable_if_t<std::is_unsigned_v<T>, int> = 0>
+void GetArgValue(const ArgVariant& arg, T* out) {
+ *out = static_cast<T>(std::get<uint64_t>(arg));
+}
+
+// Like `GetArgVariant` above, but extracts the typed value from the variant
+// into `out`. Returns an error if no parser exists in `parsers` with the given
+// `name`.
+template <typename T>
+Status GetArg(const Vector<ArgParserVariant>& parsers,
+ std::string_view name,
+ T* out) {
+ const auto& arg = GetArg(parsers, name);
+ if (!arg.has_value()) {
+ return Status::InvalidArgument();
+ }
+ GetArgValue(*arg, out);
+ return OkStatus();
+}
+
+// Resets the parser with the given name. Returns an error if not found.
+Status ResetArg(Vector<ArgParserVariant>& parsers, std::string_view name);
+
+} // namespace pw::rpc::fuzz
diff --git a/pw_rpc/fuzz/public/pw_rpc/fuzz/engine.h b/pw_rpc/fuzz/public/pw_rpc/fuzz/engine.h
new file mode 100644
index 000000000..34e92c003
--- /dev/null
+++ b/pw_rpc/fuzz/public/pw_rpc/fuzz/engine.h
@@ -0,0 +1,339 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <atomic>
+#include <cstdarg>
+#include <cstddef>
+#include <cstdint>
+#include <thread>
+#include <variant>
+
+#include "pw_containers/vector.h"
+#include "pw_random/xor_shift.h"
+#include "pw_rpc/benchmark.h"
+#include "pw_rpc/benchmark.raw_rpc.pb.h"
+#include "pw_rpc/fuzz/alarm_timer.h"
+#include "pw_sync/condition_variable.h"
+#include "pw_sync/lock_annotations.h"
+#include "pw_sync/mutex.h"
+#include "pw_sync/timed_mutex.h"
+
+namespace pw::rpc::fuzz {
+
+/// Describes an action a fuzzing thread can perform on a call.
+struct Action {
+ enum Op : uint8_t {
+ /// No-op.
+ kSkip,
+
+ /// Waits for the call indicated by `target` to complete.
+ kWait,
+
+ /// Makes a new unary request using the call indicated by `target`. The data
+ /// written is derived from `value`.
+ kWriteUnary,
+
+ /// Writes to a stream request using the call indicated by `target`, or
+ /// makes
+ /// a new one if not currently a stream call. The data written is derived
+ /// from `value`.
+ kWriteStream,
+
+ /// Closes the stream if the call indicated by `target` is a stream call.
+ kCloseClientStream,
+
+ /// Cancels the call indicated by `target`.
+ kCancel,
+
+ /// Abandons the call indicated by `target`.
+ kAbandon,
+
+ /// Swaps the call indicated by `target` with a call indicated by `value`.
+ kSwap,
+
+ /// Sets the call indicated by `target` to an initial, unset state.
+ kDestroy,
+ };
+
+ constexpr Action() = default;
+ Action(uint32_t encoded);
+ Action(Op op, size_t target, uint16_t value);
+ Action(Op op, size_t target, char val, size_t len);
+ ~Action() = default;
+
+ void set_thread_id(size_t thread_id_) {
+ thread_id = thread_id_;
+ callback_id = std::numeric_limits<size_t>::max();
+ }
+
+ void set_callback_id(size_t callback_id_) {
+ thread_id = 0;
+ callback_id = callback_id_;
+ }
+
+ // For a write action's value, returns the character value to be written.
+ static char DecodeWriteValue(uint16_t value);
+
+ // For a write action's value, returns the number of characters to be written.
+ static size_t DecodeWriteLength(uint16_t value);
+
+ /// Returns a value that represents the fields of an action. Constructing an
+ /// `Action` with this value will produce the same fields.
+ uint32_t Encode() const;
+
+ /// Records details of the action being performed if verbose logging is
+ /// enabled.
+ void Log(bool verbose, size_t num_actions, const char* fmt, ...) const;
+
+ /// Records an encountered when trying to log an action.
+ void LogFailure(bool verbose, size_t num_actions, Status status) const;
+
+ Op op = kSkip;
+ size_t target = 0;
+ uint16_t value = 0;
+
+ size_t thread_id = 0;
+ size_t callback_id = std::numeric_limits<size_t>::max();
+};
+
+/// Wraps an RPC call that may be either a `RawUnaryReceiver` or
+/// `RawClientReaderWriter`. Allows applying `Action`s to each possible
+/// type of call.
+class FuzzyCall {
+ public:
+ using Variant =
+ std::variant<std::monostate, RawUnaryReceiver, RawClientReaderWriter>;
+
+ explicit FuzzyCall(size_t index) : index_(index), id_(index) {}
+ ~FuzzyCall() = default;
+
+ size_t id() {
+ std::lock_guard lock(mutex_);
+ return id_;
+ }
+
+ bool pending() {
+ std::lock_guard lock(mutex_);
+ return pending_;
+ }
+
+ /// Applies the given visitor to the call variant. If the action taken by the
+ /// visitor is expected to complete the call, it will notify any threads
+ /// waiting for the call to complete. This version of the method does not
+ /// return the result of the visiting the variant.
+ template <typename Visitor,
+ typename std::enable_if_t<
+ std::is_same_v<typename Visitor::result_type, void>,
+ int> = 0>
+ typename Visitor::result_type Visit(Visitor visitor, bool completes = true) {
+ {
+ std::lock_guard lock(mutex_);
+ std::visit(std::move(visitor), call_);
+ }
+ if (completes && pending_.exchange(false)) {
+ cv_.notify_all();
+ }
+ }
+
+ /// Applies the given visitor to the call variant. If the action taken by the
+ /// visitor is expected to complete the call, it will notify any threads
+ /// waiting for the call to complete. This version of the method returns the
+ /// result of the visiting the variant.
+ template <typename Visitor,
+ typename std::enable_if_t<
+ !std::is_same_v<typename Visitor::result_type, void>,
+ int> = 0>
+ typename Visitor::result_type Visit(Visitor visitor, bool completes = true) {
+ typename Visitor::result_type result;
+ {
+ std::lock_guard lock(mutex_);
+ result = std::visit(std::move(visitor), call_);
+ }
+ if (completes && pending_.exchange(false)) {
+ cv_.notify_all();
+ }
+ return result;
+ }
+
+ // Records the number of bytes written as part of a request. If `append` is
+ // true, treats the write as a continuation of a streaming request.
+ void RecordWrite(size_t num, bool append = false);
+
+ /// Waits to be notified that a callback has been invoked.
+ void Await() PW_LOCKS_EXCLUDED(mutex_);
+
+ /// Completes the call, notifying any waiters.
+ void Notify() PW_LOCKS_EXCLUDED(mutex_);
+
+ /// Exchanges the call represented by this object with another.
+ void Swap(FuzzyCall& other);
+
+ /// Resets the call wrapped by this object with a new one. Destorys the
+ /// previous call.
+ void Reset(Variant call = Variant()) PW_LOCKS_EXCLUDED(mutex_);
+
+ // Reports the state of this object.
+ void Log() PW_LOCKS_EXCLUDED(mutex_);
+
+ private:
+ /// This represents the index in the engine's list of calls. It is used to
+ /// ensure a consistent order of locking multiple calls.
+ const size_t index_;
+
+ sync::TimedMutex mutex_;
+ sync::ConditionVariable cv_;
+
+ /// An identifier that can be used find this object, e.g. by a callback, even
+ /// when it has been swapped with another call.
+ size_t id_ PW_GUARDED_BY(mutex_);
+
+ /// Holds the actual pw::rpc::Call object, when present.
+ Variant call_ PW_GUARDED_BY(mutex_);
+
+ /// Set when a request is sent, and cleared when a callback is invoked.
+ std::atomic_bool pending_ = false;
+
+ /// Bytes sent in the last unary request or stream write.
+ size_t last_write_ PW_GUARDED_BY(mutex_) = 0;
+
+ /// Total bytes sent using this call object.
+ size_t total_written_ PW_GUARDED_BY(mutex_) = 0;
+};
+
+/// The main RPC fuzzing engine.
+///
+/// This class takes or generates a sequence of actions, and dsitributes them to
+/// a number of threads that can perform them using an RPC client. Passing the
+/// same seed to the engine at construction will allow it to generate the same
+/// sequence of actions.
+class Fuzzer {
+ public:
+ /// Number of fuzzing threads. The first thread counted is the RPC dispatch
+ /// thread.
+ static constexpr size_t kNumThreads = 4;
+
+ /// Maximum number of actions that a single thread will try to perform before
+ /// exiting.
+ static constexpr size_t kMaxActionsPerThread = 255;
+
+ /// The number of call objects available to be used for fuzzing.
+ static constexpr size_t kMaxConcurrentCalls = 8;
+
+ /// The mxiumum number of individual fuzzing actions that the fuzzing threads
+ /// can perform. The `+ 1` is to allow the inclusion of a special `0` action
+ /// to separate each thread's actions when concatenated into a single list.
+ static constexpr size_t kMaxActions =
+ kNumThreads * (kMaxActionsPerThread + 1);
+
+ explicit Fuzzer(Client& client, uint32_t channel_id);
+
+ /// The fuzzer engine should remain pinned in memory since it is referenced by
+ /// the `CallbackContext`s.
+ Fuzzer(const Fuzzer&) = delete;
+ Fuzzer(Fuzzer&&) = delete;
+ Fuzzer& operator=(const Fuzzer&) = delete;
+ Fuzzer& operator=(Fuzzer&&) = delete;
+
+ void set_verbose(bool verbose) { verbose_ = verbose; }
+
+ /// Sets the timeout and starts the timer.
+ void set_timeout(chrono::SystemClock::duration timeout) {
+ timer_.Start(timeout);
+ }
+
+ /// Generates encoded actions from the RNG and `Run`s them.
+ void Run(uint64_t seed, size_t num_actions);
+
+ /// Splits the provided `actions` between the fuzzing threads and runs them to
+ /// completion.
+ void Run(const Vector<uint32_t>& actions);
+
+ private:
+ /// Information passed to the RPC callbacks, including the index of the
+ /// associated call and a pointer to the fuzzer object.
+ struct CallbackContext {
+ size_t id;
+ Fuzzer* fuzzer;
+ };
+
+ /// Restarts the alarm timer, delaying it from detecting a timeout. This is
+ /// called whenever actions complete and indicates progress is still being
+ /// made.
+ void ResetTimerLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+
+ /// Decodes the `encoded` action and performs it. The `thread_id` is used for
+ /// verbose diagnostics. When invoked from `PerformCallback` the `callback_id`
+ /// will be set to the index of the associated call. This allows avoiding
+ /// specific, prohibited actions, e.g. destroying a call from its own
+ /// callback.
+ void Perform(const Action& action) PW_LOCKS_EXCLUDED(mutex_);
+
+ /// Returns the call with the matching `id`.
+ FuzzyCall& FindCall(size_t id) PW_LOCKS_EXCLUDED(mutex_) {
+ std::lock_guard lock(mutex_);
+ return FindCallLocked(id);
+ }
+
+ FuzzyCall& FindCallLocked(size_t id) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
+ return fuzzy_calls_[indices_[id]];
+ }
+
+ /// Returns a pointer to callback context for the given call index.
+ CallbackContext* GetContext(size_t callback_id) PW_LOCKS_EXCLUDED(mutex_) {
+ std::lock_guard lock(mutex_);
+ return &contexts_[callback_id];
+ }
+
+ /// Callback for stream write made by the call with the given `callback_id`.
+ void OnNext(size_t callback_id) PW_LOCKS_EXCLUDED(mutex_);
+
+ /// Callback for completed request for the call with the given `callback_id`.
+ void OnCompleted(size_t callback_id) PW_LOCKS_EXCLUDED(mutex_);
+
+ /// Callback for an error for the call with the given `callback_id`.
+ void OnError(size_t callback_id, Status status) PW_LOCKS_EXCLUDED(mutex_);
+
+ bool verbose_ = false;
+ pw_rpc::raw::Benchmark::Client client_;
+ BenchmarkService service_;
+
+ /// Alarm thread that detects when no workers have made recent progress.
+ AlarmTimer timer_;
+
+ sync::Mutex mutex_;
+
+ /// Worker threads. The first thread is the RPC response dispatcher.
+ Vector<std::thread, kNumThreads> threads_;
+
+ /// RPC call objects.
+ Vector<FuzzyCall, kMaxConcurrentCalls> fuzzy_calls_;
+
+ /// Maps each call's IDs to its index. Since calls may be move before their
+ /// callbacks are invoked, this list can be used to find the original call.
+ Vector<size_t, kMaxConcurrentCalls> indices_ PW_GUARDED_BY(mutex_);
+
+ /// Context objects used to reference the engine and call.
+ Vector<CallbackContext, kMaxConcurrentCalls> contexts_ PW_GUARDED_BY(mutex_);
+
+ /// Set of actions performed as callbacks from other calls.
+ Vector<uint32_t, kMaxActionsPerThread> callback_actions_
+ PW_GUARDED_BY(mutex_);
+ Vector<uint32_t>::iterator callback_iterator_ PW_GUARDED_BY(mutex_);
+
+ /// Total actions performed by all workers.
+ std::atomic<size_t> num_actions_ = 0;
+};
+
+} // namespace pw::rpc::fuzz
diff --git a/pw_rpc/internal/integration_test_ports.gni b/pw_rpc/internal/integration_test_ports.gni
index 5413f8796..5197aab46 100644
--- a/pw_rpc/internal/integration_test_ports.gni
+++ b/pw_rpc/internal/integration_test_ports.gni
@@ -16,4 +16,5 @@
# in one place to prevent accidental conflicts between tests.
pw_rpc_PYTHON_CLIENT_CPP_SERVER_TEST_PORT = 30576
pw_rpc_CPP_CLIENT_INTEGRATION_TEST_PORT = 30577
+pw_rpc_CPP_CLIENT_FUZZER_TEST_PORT = 30578
pw_unit_test_RPC_SERVICE_TEST_PORT = 30580
diff --git a/pw_rpc/py/pw_rpc/testing.py b/pw_rpc/py/pw_rpc/testing.py
index 0f1475899..de8ab6223 100644
--- a/pw_rpc/py/pw_rpc/testing.py
+++ b/pw_rpc/py/pw_rpc/testing.py
@@ -14,6 +14,7 @@
"""Utilities for testing pw_rpc."""
import argparse
+import shlex
import subprocess
import sys
import tempfile
@@ -68,8 +69,22 @@ def _parse_subprocess_integration_test_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description='Executes a test between two subprocesses'
)
- parser.add_argument('--client', required=True, help='Client binary to run')
- parser.add_argument('--server', required=True, help='Server binary to run')
+ parser.add_argument(
+ '--client',
+ required=True,
+ help=(
+ 'Client command to run. '
+ 'Use quotes and whitespace to pass client-specifc arguments.'
+ ),
+ )
+ parser.add_argument(
+ '--server',
+ required=True,
+ help=(
+ 'Server command to run. '
+ 'Use quotes and whitespace to pass client-specifc arguments.'
+ ),
+ )
parser.add_argument(
'common_args',
metavar='-- ...',
@@ -96,6 +111,7 @@ def execute_integration_test(
common_args: Sequence[str],
setup_time_s: float = 0.2,
) -> int:
+ """Runs an RPC server and client as part of an integration test."""
temp_dir: Optional[tempfile.TemporaryDirectory] = None
if TEMP_DIR_MARKER in common_args:
@@ -105,11 +121,17 @@ def execute_integration_test(
]
try:
- server_process = subprocess.Popen([server, *common_args])
+ server_cmdline = shlex.split(server)
+ client_cmdline = shlex.split(client)
+ if common_args:
+ server_cmdline += [*common_args]
+ client_cmdline += [*common_args]
+
+ server_process = subprocess.Popen(server_cmdline)
# TODO(b/234879791): Replace this delay with some sort of IPC.
time.sleep(setup_time_s)
- result = subprocess.run([client, *common_args]).returncode
+ result = subprocess.run(client_cmdline).returncode
server_process.terminate()
server_process.communicate()