diff options
author | Aaron Green <aarongreen@google.com> | 2023-03-20 18:00:54 +0000 |
---|---|---|
committer | CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2023-03-20 18:00:54 +0000 |
commit | 3cbf0600ab0c909c471f81918942f61ed74a7235 (patch) | |
tree | 8d33d72b98c47bae9a9e4e1ce7052afdb05dc29e | |
parent | 1c3881e2c6bcc8807f2b26e58cabfe05203fec46 (diff) | |
download | pigweed-3cbf0600ab0c909c471f81918942f61ed74a7235.tar.gz |
pw_rpc: Add client_fuzz_test
This CL adds an integration test that uses the `test_rpc_server` and a
client that generates a pseudorandom sequence of actions from a seed.
These actions are performed in parallel by multiple threads using
multiple `Call` objects, and include:
* Creating new requests
* Waiting for responses
* Canceling requests
* Abandoning requests
* Moving `Call` objects
* Destroying `Call` objects
The callbacks for requests also perform pseudorandom actions, except
that they will not wait for other responses or move or destroy the
`Call` object represented by the callback.
Change-Id: Id7e3448e77ad8e333b0d28269709ab6f2c12be9c
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/128372
Commit-Queue: Aaron Green <aarongreen@google.com>
Reviewed-by: Wyatt Hepler <hepler@google.com>
-rw-r--r-- | BUILD.gn | 13 | ||||
-rw-r--r-- | pw_rpc/BUILD.gn | 1 | ||||
-rw-r--r-- | pw_rpc/benchmark.rst | 20 | ||||
-rw-r--r-- | pw_rpc/fuzz/BUILD.gn | 140 | ||||
-rw-r--r-- | pw_rpc/fuzz/alarm_timer_test.cc | 64 | ||||
-rw-r--r-- | pw_rpc/fuzz/argparse.cc | 259 | ||||
-rw-r--r-- | pw_rpc/fuzz/argparse_test.cc | 196 | ||||
-rw-r--r-- | pw_rpc/fuzz/client_fuzzer.cc | 111 | ||||
-rw-r--r-- | pw_rpc/fuzz/engine.cc | 553 | ||||
-rw-r--r-- | pw_rpc/fuzz/engine_test.cc | 264 | ||||
-rw-r--r-- | pw_rpc/fuzz/public/pw_rpc/fuzz/alarm_timer.h | 56 | ||||
-rw-r--r-- | pw_rpc/fuzz/public/pw_rpc/fuzz/argparse.h | 230 | ||||
-rw-r--r-- | pw_rpc/fuzz/public/pw_rpc/fuzz/engine.h | 339 | ||||
-rw-r--r-- | pw_rpc/internal/integration_test_ports.gni | 1 | ||||
-rw-r--r-- | pw_rpc/py/pw_rpc/testing.py | 30 |
15 files changed, 2271 insertions, 6 deletions
@@ -287,9 +287,18 @@ group("integration_tests") { # Build-only target for fuzzers. group("fuzzers") { - # Fuzzing is only supported on Linux and MacOS using clang. + deps = [] + + # TODO(b/274437709): The client_fuzzer encounters build errors on macos. Limit + # it to Linux hosts for now. + if (host_os == "linux") { + _default_tc = _default_toolchain_prefix + pw_DEFAULT_C_OPTIMIZATION_LEVEL + deps += [ "$dir_pw_rpc/fuzz:client_fuzzer($_default_tc)" ] + } + if (host_os != "win") { - deps = [ + # Coverage-guided fuzzing is only supported on Linux and MacOS using clang. + deps += [ "$dir_pw_bluetooth_hci:fuzzers($dir_pigweed/targets/host:host_clang_fuzz)", "$dir_pw_fuzzer:fuzzers($dir_pigweed/targets/host:host_clang_fuzz)", "$dir_pw_protobuf:fuzzers($dir_pigweed/targets/host:host_clang_fuzz)", diff --git a/pw_rpc/BUILD.gn b/pw_rpc/BUILD.gn index ad7ff7b1e..18c8778e3 100644 --- a/pw_rpc/BUILD.gn +++ b/pw_rpc/BUILD.gn @@ -455,6 +455,7 @@ pw_test_group("tests") { ":service_test", ] group_deps = [ + "fuzz:tests", "nanopb:tests", "pwpb:tests", "raw:tests", diff --git a/pw_rpc/benchmark.rst b/pw_rpc/benchmark.rst index 6e97ac1d0..5ecc136a8 100644 --- a/pw_rpc/benchmark.rst +++ b/pw_rpc/benchmark.rst @@ -49,3 +49,23 @@ Example server.RegisterService(benchmark_service); } +Stress Test +=========== +.. attention:: + This section is experimental and liable to change. + +The Benchmark service is also used as part of a stress test of the ``pw_rpc`` +module. This stress test is implemented as an unguided fuzzer that uses +multiple worker threads to perform generated sequences of actions using RPC +``Call`` objects. The test is included as an integration test, and can found and +be run locally using GN: + +.. code-block:: bash + + $ gn desc out //:integration_tests deps | grep fuzz + //pw_rpc/fuzz:cpp_client_server_fuzz_test(//targets/host/pigweed_internal:pw_strict_host_clang_debug) + + $ gn outputs out '//pw_rpc/fuzz:cpp_client_server_fuzz_test(//targets/host/pigweed_internal:pw_strict_host_clang_debug)' + pw_strict_host_clang_debug/gen/pw_rpc/fuzz/cpp_client_server_fuzz_test.pw_pystamp + + $ ninja -C out pw_strict_host_clang_debug/gen/pw_rpc/fuzz/cpp_client_server_fuzz_test.pw_pystamp diff --git a/pw_rpc/fuzz/BUILD.gn b/pw_rpc/fuzz/BUILD.gn new file mode 100644 index 000000000..bec42fe44 --- /dev/null +++ b/pw_rpc/fuzz/BUILD.gn @@ -0,0 +1,140 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import("//build_overrides/pigweed.gni") + +import("$dir_pw_build/target_types.gni") +import("$dir_pw_chrono/backend.gni") +import("$dir_pw_rpc/internal/integration_test_ports.gni") +import("$dir_pw_thread/backend.gni") +import("$dir_pw_unit_test/test.gni") + +config("public_include_path") { + include_dirs = [ + "public", + "$dir_pw_rpc/public", + ] + visibility = [ ":*" ] +} + +pw_source_set("alarm_timer") { + public_configs = [ ":public_include_path" ] + public = [ "public/pw_rpc/fuzz/alarm_timer.h" ] + public_deps = [ + "$dir_pw_chrono:system_clock", + "$dir_pw_chrono:system_timer", + ] + visibility = [ ":*" ] +} + +pw_test("alarm_timer_test") { + enable_if = pw_chrono_SYSTEM_TIMER_BACKEND != "" + sources = [ "alarm_timer_test.cc" ] + deps = [ + ":alarm_timer", + "$dir_pw_sync:binary_semaphore", + ] +} + +pw_source_set("argparse") { + public_configs = [ ":public_include_path" ] + public = [ "public/pw_rpc/fuzz/argparse.h" ] + sources = [ "argparse.cc" ] + public_deps = [ + "$dir_pw_containers:vector", + dir_pw_status, + ] + deps = [ + "$dir_pw_string:builder", + dir_pw_assert, + dir_pw_log, + ] + visibility = [ ":*" ] +} + +pw_test("argparse_test") { + sources = [ "argparse_test.cc" ] + deps = [ ":argparse" ] +} + +pw_source_set("engine") { + public_configs = [ ":public_include_path" ] + public = [ "public/pw_rpc/fuzz/engine.h" ] + sources = [ "engine.cc" ] + public_deps = [ + ":alarm_timer", + "$dir_pw_chrono:system_clock", + "$dir_pw_rpc:benchmark", + "$dir_pw_rpc:log_config", + "$dir_pw_rpc:protos.raw_rpc", + "$dir_pw_string:format", + "$dir_pw_sync:condition_variable", + "$dir_pw_sync:timed_mutex", + "$dir_pw_thread:thread", + dir_pw_random, + ] + deps = [ "$dir_pw_rpc:client" ] + visibility = [ ":*" ] +} + +pw_test("engine_test") { + enable_if = + pw_chrono_SYSTEM_TIMER_BACKEND == "$dir_pw_chrono_stl:system_timer" && + pw_thread_THREAD_BACKEND == "$dir_pw_thread_stl:thread" + sources = [ "engine_test.cc" ] + deps = [ + ":engine", + "$dir_pw_rpc:client_server_testing_threaded", + "$dir_pw_thread:test_threads", + "$dir_pw_thread_stl:test_threads", + dir_pw_log, + pw_chrono_SYSTEM_TIMER_BACKEND, + ] +} + +pw_executable("client_fuzzer") { + sources = [ "client_fuzzer.cc" ] + deps = [ + ":argparse", + ":engine", + "$dir_pw_rpc:client", + "$dir_pw_rpc:integration_testing", + ] +} + +pw_python_action("cpp_client_server_fuzz_test") { + script = "../py/pw_rpc/testing.py" + args = [ + "--server", + "<TARGET_FILE($dir_pw_rpc:test_rpc_server)>", + "--client", + "<TARGET_FILE(:client_fuzzer)>", + "--", + "$pw_rpc_CPP_CLIENT_FUZZER_TEST_PORT", + ] + deps = [ + ":client_fuzzer", + "$dir_pw_rpc:test_rpc_server", + ] + + stamp = true +} + +pw_test_group("tests") { + tests = [ + ":argparse_test", + ":alarm_timer_test", + ":engine_test", + ] +} diff --git a/pw_rpc/fuzz/alarm_timer_test.cc b/pw_rpc/fuzz/alarm_timer_test.cc new file mode 100644 index 000000000..ce8395a6b --- /dev/null +++ b/pw_rpc/fuzz/alarm_timer_test.cc @@ -0,0 +1,64 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_rpc/fuzz/alarm_timer.h" + +#include <chrono> + +#include "gtest/gtest.h" +#include "pw_sync/binary_semaphore.h" + +namespace pw::rpc::fuzz { +namespace { + +using namespace std::chrono_literals; + +TEST(AlarmTimerTest, Start) { + sync::BinarySemaphore sem; + AlarmTimer timer([&sem](chrono::SystemClock::time_point) { sem.release(); }); + timer.Start(10ms); + sem.acquire(); +} + +TEST(AlarmTimerTest, Restart) { + sync::BinarySemaphore sem; + AlarmTimer timer([&sem](chrono::SystemClock::time_point) { sem.release(); }); + timer.Start(50ms); + for (size_t i = 0; i < 10; ++i) { + timer.Restart(); + EXPECT_FALSE(sem.try_acquire_for(10us)); + } + sem.acquire(); +} + +TEST(AlarmTimerTest, Cancel) { + sync::BinarySemaphore sem; + AlarmTimer timer([&sem](chrono::SystemClock::time_point) { sem.release(); }); + timer.Start(50ms); + timer.Cancel(); + EXPECT_FALSE(sem.try_acquire_for(100us)); +} + +TEST(AlarmTimerTest, Destroy) { + sync::BinarySemaphore sem; + { + AlarmTimer timer( + [&sem](chrono::SystemClock::time_point) { sem.release(); }); + timer.Start(50ms); + } + EXPECT_FALSE(sem.try_acquire_for(100us)); +} + +} // namespace +} // namespace pw::rpc::fuzz diff --git a/pw_rpc/fuzz/argparse.cc b/pw_rpc/fuzz/argparse.cc new file mode 100644 index 000000000..39a5dd694 --- /dev/null +++ b/pw_rpc/fuzz/argparse.cc @@ -0,0 +1,259 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_rpc/fuzz/argparse.h" + +#include <cctype> +#include <cstring> + +#include "pw_assert/check.h" +#include "pw_log/log.h" +#include "pw_string/string_builder.h" + +namespace pw::rpc::fuzz { +namespace { + +// Visitor to `ArgVariant` used by `ParseArgs` below. +struct ParseVisitor { + std::string_view arg0; + std::string_view arg1; + + template <typename Parser> + ParseStatus operator()(Parser& parser) { + return parser.Parse(arg0, arg1); + } +}; + +// Visitor to `ArgVariant` used by `GetArg` below. +struct ValueVisitor { + std::string_view name; + + template <typename Parser> + std::optional<ArgVariant> operator()(Parser& parser) { + std::optional<ArgVariant> result; + if (parser.short_name() == name || parser.long_name() == name) { + result.emplace(parser.value()); + } + return result; + } +}; + +// Visitor to `ArgVariant` used by `PrintUsage` below. +const size_t kMaxUsageLen = 256; +struct UsageVisitor { + StringBuffer<kMaxUsageLen>* buffer; + + void operator()(const BoolParser& parser) const { + auto short_name = parser.short_name(); + auto long_name = parser.long_name(); + *buffer << " [" << short_name << "|--[no-]" << long_name.substr(2) << "]"; + } + + template <typename T> + void operator()(const UnsignedParser<T>& parser) const { + auto short_name = parser.short_name(); + auto long_name = parser.long_name(); + *buffer << " "; + if (!parser.positional()) { + *buffer << "["; + if (!short_name.empty()) { + *buffer << short_name << "|"; + } + *buffer << long_name << " "; + } + for (const auto& c : long_name) { + *buffer << static_cast<char>(toupper(c)); + } + if (!parser.positional()) { + *buffer << "]"; + } + } +}; + +// Visitor to `ArgVariant` used by `ResetArg` below. +struct ResetVisitor { + std::string_view name; + + template <typename Parser> + bool operator()(Parser& parser) { + if (parser.short_name() != name && parser.long_name() != name) { + return false; + } + parser.Reset(); + return true; + } +}; + +} // namespace + +ArgParserBase::ArgParserBase(std::string_view name) : long_name_(name) { + PW_CHECK(!name.empty()); + PW_CHECK(name != "--"); + positional_ = + name[0] != '-' || (name.size() > 2 && name.substr(0, 2) != "--"); +} + +ArgParserBase::ArgParserBase(std::string_view shortopt, + std::string_view longopt) + : short_name_(shortopt), long_name_(longopt) { + PW_CHECK(shortopt.size() == 2); + PW_CHECK(shortopt[0] == '-'); + PW_CHECK(shortopt != "--"); + PW_CHECK(longopt.size() > 2); + PW_CHECK(longopt.substr(0, 2) == "--"); + positional_ = false; +} + +bool ArgParserBase::Match(std::string_view arg) { + if (arg.empty()) { + return false; + } + if (!positional_) { + return arg == short_name_ || arg == long_name_; + } + if (!std::holds_alternative<std::monostate>(value_)) { + return false; + } + if ((arg.size() == 2 && arg[0] == '-') || + (arg.size() > 2 && arg.substr(0, 2) == "--")) { + PW_LOG_WARN("Argument parsed for '%s' appears to be a flag: '%s'", + long_name_.data(), + arg.data()); + } + return true; +} + +const ArgVariant& ArgParserBase::GetValue() const { + return std::holds_alternative<std::monostate>(value_) ? initial_ : value_; +} + +BoolParser::BoolParser(std::string_view name) : ArgParserBase(name) {} +BoolParser::BoolParser(std::string_view shortopt, std::string_view longopt) + : ArgParserBase(shortopt, longopt) {} + +BoolParser& BoolParser::set_default(bool value) { + set_initial(value); + return *this; +} + +ParseStatus BoolParser::Parse(std::string_view arg0, + [[maybe_unused]] std::string_view arg1) { + if (Match(arg0)) { + set_value(true); + return kParsedOne; + } + if (arg0.size() > 5 && arg0.substr(0, 5) == "--no-" && + arg0.substr(5) == long_name().substr(2)) { + set_value(false); + return kParsedOne; + } + return kParseMismatch; +} + +UnsignedParserBase::UnsignedParserBase(std::string_view name) + : ArgParserBase(name) {} +UnsignedParserBase::UnsignedParserBase(std::string_view shortopt, + std::string_view longopt) + : ArgParserBase(shortopt, longopt) {} + +ParseStatus UnsignedParserBase::Parse(std::string_view arg0, + std::string_view arg1, + uint64_t max) { + auto result = kParsedOne; + if (!Match(arg0)) { + return kParseMismatch; + } + if (!positional()) { + if (arg1.empty()) { + PW_LOG_ERROR("Missing value for flag '%s'", arg0.data()); + return kParseFailure; + } + arg0 = arg1; + result = kParsedTwo; + } + char* endptr; + auto value = strtoull(arg0.data(), &endptr, 0); + if (*endptr) { + PW_LOG_ERROR("Failed to parse number from '%s'", arg0.data()); + return kParseFailure; + } + if (value > max) { + PW_LOG_ERROR("Parsed value is too large: %llu", value); + return kParseFailure; + } + set_value(value); + return result; +} + +Status ParseArgs(Vector<ArgParserVariant>& parsers, int argc, char** argv) { + for (int i = 1; i < argc; ++i) { + auto arg0 = std::string_view(argv[i]); + auto arg1 = + i == (argc - 1) ? std::string_view() : std::string_view(argv[i + 1]); + bool parsed = false; + for (auto& parser : parsers) { + switch (std::visit(ParseVisitor{.arg0 = arg0, .arg1 = arg1}, parser)) { + case kParsedOne: + break; + case kParsedTwo: + ++i; + break; + case kParseMismatch: + continue; + case kParseFailure: + PW_LOG_ERROR("Failed to parse '%s'", arg0.data()); + return Status::InvalidArgument(); + } + parsed = true; + break; + } + if (!parsed) { + PW_LOG_ERROR("Unrecognized argument: '%s'", arg0.data()); + return Status::InvalidArgument(); + } + } + return OkStatus(); +} + +void PrintUsage(const Vector<ArgParserVariant>& parsers, + std::string_view argv0) { + StringBuffer<kMaxUsageLen> buffer; + buffer << "usage: " << argv0; + for (auto& parser : parsers) { + std::visit(UsageVisitor{.buffer = &buffer}, parser); + } + PW_LOG_INFO("%s", buffer.c_str()); +} + +std::optional<ArgVariant> GetArg(const Vector<ArgParserVariant>& parsers, + std::string_view name) { + for (auto& parser : parsers) { + if (auto result = std::visit(ValueVisitor{.name = name}, parser); + result.has_value()) { + return result; + } + } + return std::optional<ArgVariant>(); +} + +Status ResetArg(Vector<ArgParserVariant>& parsers, std::string_view name) { + for (auto& parser : parsers) { + if (std::visit(ResetVisitor{.name = name}, parser)) { + return OkStatus(); + } + } + return Status::InvalidArgument(); +} + +} // namespace pw::rpc::fuzz diff --git a/pw_rpc/fuzz/argparse_test.cc b/pw_rpc/fuzz/argparse_test.cc new file mode 100644 index 000000000..6f0ab3889 --- /dev/null +++ b/pw_rpc/fuzz/argparse_test.cc @@ -0,0 +1,196 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_rpc/fuzz/argparse.h" + +#include <cstdint> +#include <limits> + +#include "gtest/gtest.h" + +namespace pw::rpc::fuzz { +namespace { + +TEST(ArgsParseTest, ParseBoolFlag) { + auto parser1 = BoolParser("-t", "--true").set_default(true); + auto parser2 = BoolParser("-f").set_default(false); + EXPECT_TRUE(parser1.value()); + EXPECT_FALSE(parser2.value()); + + EXPECT_EQ(parser1.Parse("-t"), ParseStatus::kParsedOne); + EXPECT_EQ(parser2.Parse("-t"), ParseStatus::kParseMismatch); + EXPECT_TRUE(parser1.value()); + EXPECT_FALSE(parser2.value()); + + EXPECT_EQ(parser1.Parse("--true"), ParseStatus::kParsedOne); + EXPECT_EQ(parser2.Parse("--true"), ParseStatus::kParseMismatch); + EXPECT_TRUE(parser1.value()); + EXPECT_FALSE(parser2.value()); + + EXPECT_EQ(parser1.Parse("--no-true"), ParseStatus::kParsedOne); + EXPECT_EQ(parser2.Parse("--no-true"), ParseStatus::kParseMismatch); + EXPECT_FALSE(parser1.value()); + EXPECT_FALSE(parser2.value()); + + EXPECT_EQ(parser1.Parse("-f"), ParseStatus::kParseMismatch); + EXPECT_EQ(parser2.Parse("-f"), ParseStatus::kParsedOne); + EXPECT_FALSE(parser1.value()); + EXPECT_TRUE(parser2.value()); +} + +template <typename T> +void ParseUnsignedFlag() { + auto parser = UnsignedParser<T>("-u", "--unsigned").set_default(137); + EXPECT_EQ(parser.value(), 137u); + + // Wrong name. + EXPECT_EQ(parser.Parse("-s"), ParseStatus::kParseMismatch); + EXPECT_EQ(parser.Parse("--signed"), ParseStatus::kParseMismatch); + EXPECT_EQ(parser.value(), 137u); + + // Missing values. + EXPECT_EQ(parser.Parse("-u"), ParseStatus::kParseFailure); + EXPECT_EQ(parser.Parse("--unsigned"), ParseStatus::kParseFailure); + EXPECT_EQ(parser.value(), 137u); + + // Non-numeric values. + EXPECT_EQ(parser.Parse("-u", "foo"), ParseStatus::kParseFailure); + EXPECT_EQ(parser.Parse("--unsigned", "bar"), ParseStatus::kParseFailure); + EXPECT_EQ(parser.value(), 137u); + + // Minimum values. + EXPECT_EQ(parser.Parse("-u", "0"), ParseStatus::kParsedTwo); + EXPECT_EQ(parser.Parse("--unsigned", "0"), ParseStatus::kParsedTwo); + EXPECT_EQ(parser.value(), 0u); + + // Maximum values. + T max = std::numeric_limits<T>::max(); + StringBuffer<32> buf; + buf << max; + EXPECT_EQ(parser.Parse("-u", buf.c_str()), ParseStatus::kParsedTwo); + EXPECT_EQ(parser.value(), max); + EXPECT_EQ(parser.Parse("--unsigned", buf.c_str()), ParseStatus::kParsedTwo); + EXPECT_EQ(parser.value(), max); + + // Out of-range value. + if (max < std::numeric_limits<uint64_t>::max()) { + buf.clear(); + buf << (max + 1ULL); + EXPECT_EQ(parser.Parse("-u", buf.c_str()), ParseStatus::kParseFailure); + EXPECT_EQ(parser.Parse("--unsigned", buf.c_str()), + ParseStatus::kParseFailure); + EXPECT_EQ(parser.value(), max); + } +} + +TEST(ArgsParseTest, ParseUnsignedFlags) { + ParseUnsignedFlag<uint8_t>(); + ParseUnsignedFlag<uint16_t>(); + ParseUnsignedFlag<uint32_t>(); + ParseUnsignedFlag<uint64_t>(); +} + +TEST(ArgsParseTest, ParsePositional) { + auto parser = UnsignedParser<size_t>("positional").set_default(1); + EXPECT_EQ(parser.Parse("-p", "2"), ParseStatus::kParseFailure); + EXPECT_EQ(parser.value(), 1u); + + EXPECT_EQ(parser.Parse("--positional", "2"), ParseStatus::kParseFailure); + EXPECT_EQ(parser.value(), 1u); + + // Second arg is ignored.. + EXPECT_EQ(parser.Parse("2", "3"), ParseStatus::kParsedOne); + EXPECT_EQ(parser.value(), 2u); + + // Positional only matches once. + EXPECT_EQ(parser.Parse("3"), ParseStatus::kParseMismatch); + EXPECT_EQ(parser.value(), 2u); +} + +TEST(ArgsParseTest, PrintUsage) { + // Just verify it compiles and runs. + Vector<ArgParserVariant, 3> parsers = { + BoolParser("-v", "--verbose").set_default(false), + UnsignedParser<size_t>("-r", "--runs").set_default(1000), + UnsignedParser<size_t>("port").set_default(11111), + }; + PrintUsage(parsers, "test-bin"); +} + +void CheckArgs(Vector<ArgParserVariant>& parsers, + bool verbose, + size_t runs, + uint16_t port) { + bool actual_verbose; + EXPECT_EQ(GetArg(parsers, "--verbose", &actual_verbose), OkStatus()); + EXPECT_EQ(verbose, actual_verbose); + EXPECT_EQ(ResetArg(parsers, "--verbose"), OkStatus()); + + size_t actual_runs; + EXPECT_EQ(GetArg(parsers, "--runs", &actual_runs), OkStatus()); + EXPECT_EQ(runs, actual_runs); + EXPECT_EQ(ResetArg(parsers, "--runs"), OkStatus()); + + uint16_t actual_port; + EXPECT_EQ(GetArg(parsers, "port", &actual_port), OkStatus()); + EXPECT_EQ(port, actual_port); + EXPECT_EQ(ResetArg(parsers, "port"), OkStatus()); +} + +TEST(ArgsParseTest, ParseArgs) { + Vector<ArgParserVariant, 3> parsers{ + BoolParser("-v", "--verbose").set_default(false), + UnsignedParser<size_t>("-r", "--runs").set_default(1000), + UnsignedParser<uint16_t>("port").set_default(11111), + }; + + char const* argv1[] = {"test-bin"}; + EXPECT_EQ(ParseArgs(parsers, 1, const_cast<char**>(argv1)), OkStatus()); + CheckArgs(parsers, false, 1000, 11111); + + char const* argv2[] = {"test-bin", "22222"}; + EXPECT_EQ(ParseArgs(parsers, 2, const_cast<char**>(argv2)), OkStatus()); + CheckArgs(parsers, false, 1000, 22222); + + // Out of range argument. + char const* argv3[] = {"test-bin", "65536"}; + EXPECT_EQ(ParseArgs(parsers, 2, const_cast<char**>(argv3)), + Status::InvalidArgument()); + + // Extra argument. + char const* argv4[] = {"test-bin", "1", "2"}; + EXPECT_EQ(ParseArgs(parsers, 3, const_cast<char**>(argv4)), + Status::InvalidArgument()); + EXPECT_EQ(ResetArg(parsers, "port"), OkStatus()); + + // Flag missing value. + char const* argv5[] = {"test-bin", "--runs"}; + EXPECT_EQ(ParseArgs(parsers, 2, const_cast<char**>(argv5)), + Status::InvalidArgument()); + + char const* argv6[] = {"test-bin", "-v", "33333", "--runs", "300"}; + EXPECT_EQ(ParseArgs(parsers, 5, const_cast<char**>(argv6)), OkStatus()); + CheckArgs(parsers, true, 300, 33333); + + char const* argv7[] = {"test-bin", "-r", "400", "--verbose"}; + EXPECT_EQ(ParseArgs(parsers, 4, const_cast<char**>(argv7)), OkStatus()); + CheckArgs(parsers, true, 400, 11111); + + char const* argv8[] = {"test-bin", "--no-verbose", "-r", "5000", "55555"}; + EXPECT_EQ(ParseArgs(parsers, 5, const_cast<char**>(argv8)), OkStatus()); + CheckArgs(parsers, false, 5000, 55555); +} + +} // namespace +} // namespace pw::rpc::fuzz diff --git a/pw_rpc/fuzz/client_fuzzer.cc b/pw_rpc/fuzz/client_fuzzer.cc new file mode 100644 index 000000000..c5086becf --- /dev/null +++ b/pw_rpc/fuzz/client_fuzzer.cc @@ -0,0 +1,111 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +// clang-format off +#include "pw_rpc/internal/log_config.h" // PW_LOG_* macros must be first. +// clang-format on + +#include <sys/socket.h> + +#include <cstring> + +#include "pw_log/log.h" +#include "pw_rpc/fuzz/argparse.h" +#include "pw_rpc/fuzz/engine.h" +#include "pw_rpc/integration_testing.h" + +namespace pw::rpc::fuzz { +namespace { + +// This client configures a socket read timeout to allow the RPC dispatch thread +// to exit gracefully. +constexpr timeval kSocketReadTimeout = {.tv_sec = 1, .tv_usec = 0}; + +int FuzzClient(int argc, char** argv) { + // TODO(aarongreen): Incorporate descriptions into usage message. + Vector<ArgParserVariant, 5> parsers{ + // Enables additional logging. + BoolParser("-v", "--verbose").set_default(false), + + // The number of actions to perform as part of the test. A value of 0 runs + // indefinitely. + UnsignedParser<size_t>("-n", "--num-actions").set_default(256), + + // The seed value for the PRNG. A value of 0 generates a seed. + UnsignedParser<uint64_t>("-s", "--seed").set_default(0), + + // The time, in milliseconds, that can elapse without triggering an error. + UnsignedParser<size_t>("-t", "--timeout").set_default(5000), + + // The port use to connect to the `test_rpc_server`. + UnsignedParser<uint16_t>("port").set_default(48000)}; + + if (!ParseArgs(parsers, argc, argv).ok()) { + PrintUsage(parsers, argv[0]); + return 1; + } + + bool verbose; + size_t num_actions; + uint64_t seed; + size_t timeout_ms; + uint16_t port; + if (!GetArg(parsers, "--verbose", &verbose).ok() || + !GetArg(parsers, "--num-actions", &num_actions).ok() || + !GetArg(parsers, "--seed", &seed).ok() || + !GetArg(parsers, "--timeout", &timeout_ms).ok() || + !GetArg(parsers, "port", &port).ok()) { + return 1; + } + + if (!seed) { + seed = chrono::SystemClock::now().time_since_epoch().count(); + } + + if (auto status = integration_test::InitializeClient(port); !status.ok()) { + PW_LOG_ERROR("Failed to initialize client: %s", pw_StatusString(status)); + return 1; + } + + // Set read timout on socket to allow + // pw::rpc::integration_test::TerminateClient() to complete. + int fd = integration_test::GetClientSocketFd(); + if (setsockopt(fd, + SOL_SOCKET, + SO_RCVTIMEO, + &kSocketReadTimeout, + sizeof(kSocketReadTimeout)) != 0) { + PW_LOG_ERROR("Failed to configure socket receive timeout with errno=%d", + errno); + return 1; + } + + if (num_actions == 0) { + num_actions = std::numeric_limits<size_t>::max(); + } + + Fuzzer fuzzer(integration_test::client(), integration_test::kChannelId); + fuzzer.set_verbose(verbose); + fuzzer.set_timeout(std::chrono::milliseconds(timeout_ms)); + fuzzer.Run(seed, num_actions); + integration_test::TerminateClient(); + return 0; +} + +} // namespace +} // namespace pw::rpc::fuzz + +int main(int argc, char** argv) { + return pw::rpc::fuzz::FuzzClient(argc, argv); +} diff --git a/pw_rpc/fuzz/engine.cc b/pw_rpc/fuzz/engine.cc new file mode 100644 index 000000000..b19dfa398 --- /dev/null +++ b/pw_rpc/fuzz/engine.cc @@ -0,0 +1,553 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +// clang-format off +#include "pw_rpc/internal/log_config.h" // PW_LOG_* macros must be first. + +#include "pw_rpc/fuzz/engine.h" +// clang-format on + +#include <algorithm> +#include <cctype> +#include <chrono> +#include <cinttypes> +#include <limits> +#include <mutex> + +#include "pw_assert/check.h" +#include "pw_bytes/span.h" +#include "pw_log/log.h" +#include "pw_span/span.h" +#include "pw_status/status.h" +#include "pw_string/format.h" + +namespace pw::rpc::fuzz { +namespace { + +using namespace std::chrono_literals; + +// Maximum number of bytes written in a single unary or stream request. +constexpr size_t kMaxWriteLen = MaxSafePayloadSize(); +static_assert(kMaxWriteLen * 0x7E <= std::numeric_limits<uint16_t>::max()); + +struct ActiveVisitor final { + using result_type = bool; + result_type operator()(std::monostate&) { return false; } + result_type operator()(pw::rpc::RawUnaryReceiver& call) { + return call.active(); + } + result_type operator()(pw::rpc::RawClientReaderWriter& call) { + return call.active(); + } +}; + +struct CloseClientStreamVisitor final { + using result_type = void; + result_type operator()(std::monostate&) {} + result_type operator()(pw::rpc::RawUnaryReceiver&) {} + result_type operator()(pw::rpc::RawClientReaderWriter& call) { + call.CloseClientStream().IgnoreError(); + } +}; + +struct WriteVisitor final { + using result_type = bool; + result_type operator()(std::monostate&) { return false; } + result_type operator()(pw::rpc::RawUnaryReceiver&) { return false; } + result_type operator()(pw::rpc::RawClientReaderWriter& call) { + if (!call.active()) { + return false; + } + call.Write(data).IgnoreError(); + return true; + } + ConstByteSpan data; +}; + +struct CancelVisitor final { + using result_type = void; + result_type operator()(std::monostate&) {} + result_type operator()(pw::rpc::RawUnaryReceiver& call) { + call.Cancel().IgnoreError(); + } + result_type operator()(pw::rpc::RawClientReaderWriter& call) { + call.Cancel().IgnoreError(); + } +}; + +struct AbandonVisitor final { + using result_type = void; + result_type operator()(std::monostate&) {} + result_type operator()(pw::rpc::RawUnaryReceiver& call) { call.Abandon(); } + result_type operator()(pw::rpc::RawClientReaderWriter& call) { + call.Abandon(); + } +}; + +} // namespace + +// `Action` methods. + +Action::Action(uint32_t encoded) { + // The first byte is used to determine the operation. The ranges used set the + // relative likelihood of each result, e.g. `kWait` is more likely than + // `kAbandon`. + uint32_t raw = encoded & 0xFF; + if (raw == 0) { + op = kSkip; + } else if (raw < 0x60) { + op = kWait; + } else if (raw < 0x80) { + op = kWriteUnary; + } else if (raw < 0xA0) { + op = kWriteStream; + } else if (raw < 0xC0) { + op = kCloseClientStream; + } else if (raw < 0xD0) { + op = kCancel; + } else if (raw < 0xE0) { + op = kAbandon; + } else if (raw < 0xF0) { + op = kSwap; + } else { + op = kDestroy; + } + target = ((encoded & 0xFF00) >> 8) % Fuzzer::kMaxConcurrentCalls; + value = encoded >> 16; +} + +Action::Action(Op op_, size_t target_, uint16_t value_) + : op(op_), target(target_), value(value_) {} + +Action::Action(Op op_, size_t target_, char val, size_t len) + : op(op_), target(target_) { + PW_ASSERT(op == kWriteUnary || op == kWriteStream); + value = static_cast<uint16_t>(((val % 0x80) * kMaxWriteLen) + + (len % kMaxWriteLen)); +} + +char Action::DecodeWriteValue(uint16_t value) { + return static_cast<char>((value / kMaxWriteLen) % 0x7F); +} + +size_t Action::DecodeWriteLength(uint16_t value) { + return value % kMaxWriteLen; +} + +uint32_t Action::Encode() const { + uint32_t encoded = 0; + switch (op) { + case kSkip: + encoded = 0x00; + break; + case kWait: + encoded = 0x5F; + break; + case kWriteUnary: + encoded = 0x7F; + break; + case kWriteStream: + encoded = 0x9F; + break; + case kCloseClientStream: + encoded = 0xBF; + break; + case kCancel: + encoded = 0xCF; + break; + case kAbandon: + encoded = 0xDF; + break; + case kSwap: + encoded = 0xEF; + break; + case kDestroy: + encoded = 0xFF; + break; + } + encoded |= + ((target < Fuzzer::kMaxConcurrentCalls ? target + : Fuzzer::kMaxConcurrentCalls) % + 0xFF) + << 8; + encoded |= (static_cast<uint32_t>(value) << 16); + return encoded; +} + +void Action::Log(bool verbose, size_t num_actions, const char* fmt, ...) const { + if (!verbose) { + return; + } + char s1[16]; + auto result = callback_id < Fuzzer::kMaxConcurrentCalls + ? string::Format(s1, "%-3zu", callback_id) + : string::Format(s1, "n/a"); + va_list ap; + va_start(ap, fmt); + char s2[128]; + if (result.ok()) { + result = string::FormatVaList(s2, fmt, ap); + } + va_end(ap); + if (result.ok()) { + PW_LOG_INFO("#%-12zu\tthread: %zu\tcallback for: %s\ttarget call: %zu\t%s", + num_actions, + thread_id, + s1, + target, + s2); + } else { + LogFailure(verbose, num_actions, result.status()); + } +} + +void Action::LogFailure(bool verbose, size_t num_actions, Status status) const { + if (verbose && !status.ok()) { + PW_LOG_INFO("#%-12zu\tthread: %zu\tFailed to log action: %s", + num_actions, + thread_id, + pw_StatusString(status)); + } +} + +// FuzzyCall methods. + +void FuzzyCall::RecordWrite(size_t num, bool append) { + std::lock_guard lock(mutex_); + if (append) { + last_write_ += num; + } else { + last_write_ = num; + } + total_written_ += num; + pending_ = true; +} + +void FuzzyCall::Await() { + std::unique_lock<sync::Mutex> lock(mutex_); + cv_.wait(lock, [this]() PW_NO_LOCK_SAFETY_ANALYSIS { return !pending_; }); +} + +void FuzzyCall::Notify() { + if (pending_.exchange(false)) { + cv_.notify_all(); + } +} + +void FuzzyCall::Swap(FuzzyCall& other) { + if (index_ == other.index_) { + return; + } + // Manually acquire locks in an order based on call IDs to prevent deadlock. + if (index_ < other.index_) { + mutex_.lock(); + other.mutex_.lock(); + } else { + other.mutex_.lock(); + mutex_.lock(); + } + call_.swap(other.call_); + std::swap(id_, other.id_); + pending_ = other.pending_.exchange(pending_); + std::swap(last_write_, other.last_write_); + std::swap(total_written_, other.total_written_); + mutex_.unlock(); + other.mutex_.unlock(); + cv_.notify_all(); + other.cv_.notify_all(); +} + +void FuzzyCall::Reset(Variant call) { + { + std::lock_guard lock(mutex_); + call_ = std::move(call); + } + cv_.notify_all(); +} + +void FuzzyCall::Log() { + if (mutex_.try_lock_for(100ms)) { + PW_LOG_INFO("call %zu:", index_); + PW_LOG_INFO(" active: %s", + std::visit(ActiveVisitor(), call_) ? "true" : "false"); + PW_LOG_INFO(" request pending: %s ", pending_ ? "true" : "false"); + PW_LOG_INFO(" last write: %zu bytes", last_write_); + PW_LOG_INFO(" total written: %zu bytes", total_written_); + mutex_.unlock(); + } else { + PW_LOG_WARN("call %zu: failed to acquire lock", index_); + } +} + +// `Fuzzer` methods. + +#define FUZZ_LOG_VERBOSE(...) \ + if (verbose_) { \ + PW_LOG_INFO(__VA_ARGS__); \ + } + +Fuzzer::Fuzzer(Client& client, uint32_t channel_id) + : client_(client, channel_id), + timer_([this](chrono::SystemClock::time_point) { + PW_LOG_ERROR( + "Workers performed %zu actions before timing out without an " + "update.", + num_actions_.load()); + PW_LOG_INFO("Additional call details:"); + for (auto& call : fuzzy_calls_) { + call.Log(); + } + PW_CRASH("Fuzzer found a fatal error condition: TIMEOUT."); + }) { + for (size_t index = 0; index < kMaxConcurrentCalls; ++index) { + fuzzy_calls_.emplace_back(index); + indices_.push_back(index); + contexts_.push_back(CallbackContext{.id = index, .fuzzer = this}); + } +} + +void Fuzzer::Run(uint64_t seed, size_t num_actions) { + FUZZ_LOG_VERBOSE("Fuzzing RPC client with:"); + FUZZ_LOG_VERBOSE(" num_actions: %zu", num_actions); + FUZZ_LOG_VERBOSE(" seed: %" PRIu64, seed); + num_actions_.store(0); + random::XorShiftStarRng64 rng(seed); + while (true) { + { + size_t actions_done = num_actions_.load(); + if (actions_done >= num_actions) { + FUZZ_LOG_VERBOSE("Fuzzing complete; %zu actions performed.", + actions_done); + break; + } + FUZZ_LOG_VERBOSE("%zu actions remaining.", num_actions - actions_done); + } + FUZZ_LOG_VERBOSE("Generating %zu random actions.", kMaxActions); + pw::Vector<uint32_t, kMaxActions> actions; + for (size_t i = 0; i < kNumThreads; ++i) { + size_t num_actions_for_thread; + rng.GetInt(num_actions_for_thread, kMaxActionsPerThread + 1); + for (size_t j = 0; j < num_actions_for_thread; ++j) { + uint32_t encoded = 0; + while (!encoded) { + rng.GetInt(encoded); + } + actions.push_back(encoded); + } + actions.push_back(0); + } + Run(actions); + } +} + +void Fuzzer::Run(const pw::Vector<uint32_t>& actions) { + FUZZ_LOG_VERBOSE("Starting %zu threads to perform %zu actions:", + kNumThreads - 1, + actions.size()); + FUZZ_LOG_VERBOSE(" timeout: %lldms", timer_.timeout() / 1ms); + auto iter = actions.begin(); + timer_.Restart(); + for (size_t thread_id = 0; thread_id < kNumThreads; ++thread_id) { + pw::Vector<uint32_t, kMaxActionsPerThread> thread_actions; + while (thread_actions.size() < kMaxActionsPerThread && + iter != actions.end()) { + uint32_t encoded = *iter++; + if (!encoded) { + break; + } + thread_actions.push_back(encoded); + } + if (thread_id == 0) { + std::lock_guard lock(mutex_); + callback_actions_ = std::move(thread_actions); + callback_iterator_ = callback_actions_.begin(); + } else { + threads_.emplace_back( + [this, thread_id, actions = std::move(thread_actions)]() { + for (const auto& encoded : actions) { + Action action(encoded); + action.set_thread_id(thread_id); + Perform(action); + } + }); + } + } + for (auto& t : threads_) { + t.join(); + } + for (auto& fuzzy_call : fuzzy_calls_) { + fuzzy_call.Reset(); + } + timer_.Cancel(); +} + +void Fuzzer::Perform(const Action& action) { + FuzzyCall& fuzzy_call = FindCall(action.target); + switch (action.op) { + case Action::kSkip: { + if (action.thread_id == 0) { + action.Log(verbose_, ++num_actions_, "Callback chain completed"); + } + break; + } + case Action::kWait: { + if (action.callback_id == action.target) { + // Don't wait in a callback of the target call. + break; + } + if (fuzzy_call.pending()) { + action.Log(verbose_, ++num_actions_, "Waiting for call."); + fuzzy_call.Await(); + } + break; + } + case Action::kWriteUnary: + case Action::kWriteStream: { + if (action.callback_id == action.target) { + // Don't create a new call from the call's own callback. + break; + } + char buf[kMaxWriteLen]; + char val = Action::DecodeWriteValue(action.value); + size_t len = Action::DecodeWriteLength(action.value); + memset(buf, val, len); + if (verbose_) { + char msg_buf[64]; + span msg(msg_buf); + auto result = string::Format( + msg, + "Writing %s request of ", + action.op == Action::kWriteUnary ? "unary" : "stream"); + if (result.ok()) { + size_t off = result.size(); + result = string::Format( + msg.subspan(off), + isprint(val) ? "['%c'; %zu]." : "['\\x%02x'; %zu].", + val, + len); + } + size_t num_actions = ++num_actions_; + if (result.ok()) { + action.Log(verbose_, num_actions, "%s", msg.data()); + } else if (verbose_) { + action.LogFailure(verbose_, num_actions, result.status()); + } + } + bool append = false; + if (action.op == Action::kWriteUnary) { + // Send a unary request. + fuzzy_call.Reset(client_.UnaryEcho( + as_bytes(span(buf, len)), + /* on completed */ + [context = GetContext(action.target)](ConstByteSpan, Status) { + context->fuzzer->OnCompleted(context->id); + }, + /* on error */ + [context = GetContext(action.target)](Status status) { + context->fuzzer->OnError(context->id, status); + })); + + } else if (fuzzy_call.Visit( + WriteVisitor{.data = as_bytes(span(buf, len))})) { + // Append to an existing stream + append = true; + } else { + // .Open a new stream. + fuzzy_call.Reset(client_.BidirectionalEcho( + /* on next */ + [context = GetContext(action.target)](ConstByteSpan) { + context->fuzzer->OnNext(context->id); + }, + /* on completed */ + [context = GetContext(action.target)](Status) { + context->fuzzer->OnCompleted(context->id); + }, + /* on error */ + [context = GetContext(action.target)](Status status) { + context->fuzzer->OnError(context->id, status); + })); + } + fuzzy_call.RecordWrite(len, append); + break; + } + case Action::kCloseClientStream: + action.Log(verbose_, ++num_actions_, "Closing stream."); + fuzzy_call.Visit(CloseClientStreamVisitor()); + break; + case Action::kCancel: + action.Log(verbose_, ++num_actions_, "Canceling call."); + fuzzy_call.Visit(CancelVisitor()); + break; + case Action::kAbandon: { + action.Log(verbose_, ++num_actions_, "Abandoning call."); + fuzzy_call.Visit(AbandonVisitor()); + break; + } + case Action::kSwap: { + size_t other_target = action.value % kMaxConcurrentCalls; + if (action.callback_id == action.target || + action.callback_id == other_target) { + // Don't move a call from within its own callback. + break; + } + action.Log(verbose_, + ++num_actions_, + "Swapping call with call %zu.", + other_target); + std::lock_guard lock(mutex_); + FuzzyCall& other = FindCallLocked(other_target); + std::swap(indices_[fuzzy_call.id()], indices_[other.id()]); + fuzzy_call.Swap(other); + break; + } + case Action::kDestroy: { + if (action.callback_id == action.target) { + // Don't destroy a call from within its own callback. + break; + } + action.Log(verbose_, ++num_actions_, "Destroying call."); + fuzzy_call.Reset(); + break; + } + default: + break; + } + timer_.Restart(); +} + +void Fuzzer::OnNext(size_t callback_id) { FindCall(callback_id).Notify(); } + +void Fuzzer::OnCompleted(size_t callback_id) { + uint32_t encoded = 0; + { + std::lock_guard lock(mutex_); + if (callback_iterator_ != callback_actions_.end()) { + encoded = *callback_iterator_++; + } + } + Action action(encoded); + action.set_callback_id(callback_id); + Perform(action); + FindCall(callback_id).Notify(); +} + +void Fuzzer::OnError(size_t callback_id, Status status) { + FuzzyCall& call = FindCall(callback_id); + PW_LOG_WARN("Call %zu received an error from the server: %s", + call.id(), + pw_StatusString(status)); + call.Notify(); +} + +} // namespace pw::rpc::fuzz diff --git a/pw_rpc/fuzz/engine_test.cc b/pw_rpc/fuzz/engine_test.cc new file mode 100644 index 000000000..5ac1a49a1 --- /dev/null +++ b/pw_rpc/fuzz/engine_test.cc @@ -0,0 +1,264 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include "pw_rpc/fuzz/engine.h" + +#include <chrono> + +#include "gtest/gtest.h" +#include "pw_containers/vector.h" +#include "pw_log/log.h" +#include "pw_rpc/benchmark.h" +#include "pw_rpc/internal/client_server_testing_threaded.h" +#include "pw_rpc/internal/fake_channel_output.h" +#include "pw_thread/test_threads.h" + +namespace pw::rpc::fuzz { +namespace { + +using namespace std::literals::chrono_literals; + +// Maximum time, in milliseconds, that can elapse without a call completing or +// being dropped in some way.. +const chrono::SystemClock::duration kTimeout = 5s; + +// These are fairly tight constraints in order to fit within the default +// `PW_UNIT_TEST_CONFIG_MEMORY_POOL_SIZE`. +constexpr size_t kMaxPackets = 128; +constexpr size_t kMaxPayloadSize = 64; + +using BufferedChannelOutputBase = + internal::test::FakeChannelOutputBuffer<kMaxPackets, kMaxPayloadSize>; + +/// Channel output backed by a fixed buffer. +class BufferedChannelOutput : public BufferedChannelOutputBase { + public: + BufferedChannelOutput() : BufferedChannelOutputBase() {} +}; + +using FuzzerChannelOutputBase = + internal::WatchableChannelOutput<BufferedChannelOutput, + kMaxPayloadSize, + kMaxPackets, + kMaxPayloadSize>; + +/// Channel output that can be waited on by the server. +class FuzzerChannelOutput : public FuzzerChannelOutputBase { + public: + FuzzerChannelOutput() : FuzzerChannelOutputBase() {} +}; + +using FuzzerContextBase = + internal::ClientServerTestContextThreaded<FuzzerChannelOutput, + kMaxPayloadSize, + kMaxPackets, + kMaxPayloadSize>; +class FuzzerContext : public FuzzerContextBase { + public: + static constexpr uint32_t kChannelId = 1; + + FuzzerContext() : FuzzerContextBase(thread::test::TestOptionsThread0()) {} +}; + +class RpcFuzzTestingTest : public testing::Test { + protected: + void SetUp() override { context_.server().RegisterService(service_); } + + void Add(Action::Op op, size_t target, uint16_t value) { + actions_.push_back(Action(op, target, value).Encode()); + } + + void Add(Action::Op op, size_t target, char val, size_t len) { + actions_.push_back(Action(op, target, val, len).Encode()); + } + + void NextThread() { actions_.push_back(0); } + + void Run() { + Fuzzer fuzzer(context_.client(), FuzzerContext::kChannelId); + fuzzer.set_verbose(true); + fuzzer.set_timeout(kTimeout); + fuzzer.Run(actions_); + } + + private: + FuzzerContext context_; + BenchmarkService service_; + Vector<uint32_t, Fuzzer::kMaxActions> actions_; +}; + +TEST_F(RpcFuzzTestingTest, SequentialRequests) { + // Callback thread + Add(Action::kWriteStream, 1, 'B', 1); + Add(Action::kSkip, 0, 0); + Add(Action::kWriteStream, 2, 'B', 2); + Add(Action::kSkip, 0, 0); + Add(Action::kWriteStream, 3, 'B', 3); + Add(Action::kSkip, 0, 0); + NextThread(); + + // Thread 1 + Add(Action::kWriteStream, 0, 'A', 2); + Add(Action::kWait, 1, 0); + Add(Action::kWriteStream, 1, 'A', 4); + NextThread(); + + // Thread 2 + NextThread(); + Add(Action::kWait, 2, 0); + Add(Action::kWriteStream, 2, 'A', 6); + + // Thread 3 + NextThread(); + Add(Action::kWait, 3, 0); + + Run(); +} + +// TODO(b/274437709): Re-enable. +TEST_F(RpcFuzzTestingTest, DISABLED_SimultaneousRequests) { + // Callback thread + NextThread(); + + // Thread 1 + Add(Action::kWriteUnary, 1, 'A', 1); + Add(Action::kWait, 2, 0); + NextThread(); + + // Thread 2 + Add(Action::kWriteUnary, 2, 'B', 2); + Add(Action::kWait, 3, 0); + NextThread(); + + // Thread 3 + Add(Action::kWriteUnary, 3, 'C', 3); + Add(Action::kWait, 1, 0); + NextThread(); + + Run(); +} + +// TODO(b/274437709) This test currently does not pass as it exhausts the fake +// channel. It will be re-enabled when the underlying stream is swapped for +// a pw_ring_buffer-based approach. +TEST_F(RpcFuzzTestingTest, DISABLED_CanceledRequests) { + // Callback thread + NextThread(); + + // Thread 1 + for (size_t i = 0; i < 10; ++i) { + Add(Action::kWriteUnary, i % 3, 'A', i); + } + Add(Action::kWait, 0, 0); + Add(Action::kWait, 1, 0); + Add(Action::kWait, 2, 0); + NextThread(); + + // Thread 2 + for (size_t i = 0; i < 10; ++i) { + Add(Action::kCancel, i % 3, 0); + } + NextThread(); + + // Thread 3 + NextThread(); + + Run(); +} + +// TODO(b/274437709) This test currently does not pass as it exhausts the fake +// channel. It will be re-enabled when the underlying stream is swapped for +// a pw_ring_buffer-based approach. +TEST_F(RpcFuzzTestingTest, DISABLED_AbandonedRequests) { + // Callback thread + NextThread(); + + // Thread 1 + for (size_t i = 0; i < 10; ++i) { + Add(Action::kWriteUnary, i % 3, 'A', i); + } + Add(Action::kWait, 0, 0); + Add(Action::kWait, 1, 0); + Add(Action::kWait, 2, 0); + NextThread(); + + // Thread 2 + for (size_t i = 0; i < 10; ++i) { + Add(Action::kAbandon, i % 3, 0); + } + NextThread(); + + // Thread 3 + NextThread(); + + Run(); +} + +// TODO(b/274437709) This test currently does not pass as it exhausts the fake +// channel. It will be re-enabled when the underlying stream is swapped for +// a pw_ring_buffer-based approach. +TEST_F(RpcFuzzTestingTest, DISABLED_SwappedRequests) { + Vector<uint32_t, Fuzzer::kMaxActions> actions; + // Callback thread + NextThread(); + // Thread 1 + for (size_t i = 0; i < 10; ++i) { + Add(Action::kWriteUnary, i % 3, 'A', i); + } + Add(Action::kWait, 0, 0); + Add(Action::kWait, 1, 0); + Add(Action::kWait, 2, 0); + NextThread(); + // Thread 2 + for (size_t i = 0; i < 100; ++i) { + auto j = i % 3; + Add(Action::kSwap, j, j + 1); + } + NextThread(); + // Thread 3 + NextThread(); + + Run(); +} + +// TODO(b/274437709) This test currently does not pass as it exhausts the fake +// channel. It will be re-enabled when the underlying stream is swapped for +// a pw_ring_buffer-based approach. +TEST_F(RpcFuzzTestingTest, DISABLED_DestroyedRequests) { + // Callback thread + NextThread(); + + // Thread 1 + for (size_t i = 0; i < 100; ++i) { + Add(Action::kWriteUnary, i % 3, 'A', i); + } + Add(Action::kWait, 0, 0); + Add(Action::kWait, 1, 0); + Add(Action::kWait, 2, 0); + NextThread(); + + // Thread 2 + for (size_t i = 0; i < 100; ++i) { + Add(Action::kDestroy, i % 3, 0); + } + NextThread(); + + // Thread 3 + NextThread(); + + Run(); +} + +} // namespace +} // namespace pw::rpc::fuzz diff --git a/pw_rpc/fuzz/public/pw_rpc/fuzz/alarm_timer.h b/pw_rpc/fuzz/public/pw_rpc/fuzz/alarm_timer.h new file mode 100644 index 000000000..9ccd7ce0b --- /dev/null +++ b/pw_rpc/fuzz/public/pw_rpc/fuzz/alarm_timer.h @@ -0,0 +1,56 @@ +// Copyright 2022 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +#include "pw_chrono/system_clock.h" +#include "pw_chrono/system_timer.h" + +namespace pw::rpc::fuzz { + +/// Represents a timer that invokes a callback on timeout. Once started, it will +/// invoke the callback after a provided duration unless it is restarted, +/// canceled, or destroyed. +class AlarmTimer { + public: + AlarmTimer(chrono::SystemTimer::ExpiryCallback&& on_timeout) + : timer_(std::move(on_timeout)) {} + + chrono::SystemClock::duration timeout() const { return timeout_; } + + /// "Arms" the timer. The callback will be invoked if `timeout` elapses + /// without a call to `Restart`, `Cancel`, or the destructor. Calling `Start` + /// again restarts the timer, possibly with a different `timeout` value. + void Start(chrono::SystemClock::duration timeout) { + timeout_ = timeout; + Restart(); + } + + /// Restarts the timer. This is equivalent to calling `Start` with the same + /// `timeout` as passed previously. Does nothing if `Start` has not been + /// called. + void Restart() { + Cancel(); + timer_.InvokeAfter(timeout_); + } + + /// "Disarms" the timer. The callback will not be invoked unless `Start` is + /// called again. Does nothing if `Start` has not been called. + void Cancel() { timer_.Cancel(); } + + private: + chrono::SystemTimer timer_; + chrono::SystemClock::duration timeout_; +}; + +} // namespace pw::rpc::fuzz diff --git a/pw_rpc/fuzz/public/pw_rpc/fuzz/argparse.h b/pw_rpc/fuzz/public/pw_rpc/fuzz/argparse.h new file mode 100644 index 000000000..05a7633e6 --- /dev/null +++ b/pw_rpc/fuzz/public/pw_rpc/fuzz/argparse.h @@ -0,0 +1,230 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +/// Command line argument parsing. +/// +/// The objects defined below can be used to parse command line arguments of +/// different types. These objects are "just enough" defined for current use +/// cases, but the design is intended to be extensible as new types and traits +/// are needed. +/// +/// Example: +/// +/// Given a boolean flag "verbose", a numerical flag "runs", and a positional +/// "port" argument to be parsed, we can create a vector of parsers. In this +/// example, we modify the parsers during creation to set default values: +/// +/// @code +/// Vector<ArgParserVariant, 3> parsers = { +/// BoolParser("-v", "--verbose").set_default(false), +/// UnsignedParser<size_t>("-r", "--runs").set_default(1000), +/// UnsignedParser<uint16_t>("port").set_default(11111), +/// }; +/// @endcode +/// +/// With this vector, we can then parse command line arguments and extract +/// the values of arguments that were set, e.g.: +/// +/// @code +/// if (!ParseArgs(parsers, argc, argv).ok()) { +/// PrintUsage(parsers, argv[0]); +/// return 1; +/// } +/// bool verbose; +/// size_t runs; +/// uint16_t port; +/// if (!GetArg(parsers, "--verbose", &verbose).ok() || +/// !GetArg(parsers, "--runs", &runs).ok() || +/// !GetArg(parsers, "port", &port).ok()) { +/// // Shouldn't happen unless names do not match. +/// return 1; +/// } +/// +/// // Do stuff with `verbose`, `runs`, and `port`... +/// @endcode + +#include <cstddef> +#include <cstdint> +#include <string_view> +#include <variant> + +#include "pw_containers/vector.h" +#include "pw_status/status.h" + +namespace pw::rpc::fuzz { + +/// Enumerates the results of trying to parse a specific command line argument +/// with a particular parsers. +enum ParseStatus { + /// The argument matched the parser and was successfully parsed without a + /// value. + kParsedOne, + + /// The argument matched the parser and was successfully parsed with a value. + kParsedTwo, + + /// The argument did not match the parser. This is not necessarily an error; + /// the argument may match a different parser. + kParseMismatch, + + /// The argument matched a parser, but could not be parsed. This may be due to + /// a missing value for a flag, a value of the wrong type, a provided value + /// being out of range, etc. Parsers should log additional details before + /// returning this value. + kParseFailure, +}; + +/// Holds parsed argument values of different types. +using ArgVariant = std::variant<std::monostate, bool, uint64_t>; + +/// Base class for argument parsers. +class ArgParserBase { + public: + virtual ~ArgParserBase() = default; + + std::string_view short_name() const { return short_name_; } + std::string_view long_name() const { return long_name_; } + bool positional() const { return positional_; } + + /// Clears the value. Typically, command line arguments are only parsed once, + /// but this method is useful for testing. + void Reset() { value_ = std::monostate(); } + + protected: + /// Defines an argument parser with a single name. This may be a positional + /// argument or a flag. + ArgParserBase(std::string_view name); + + /// Defines an argument parser for a flag with short and long names. + ArgParserBase(std::string_view shortopt, std::string_view longopt); + + void set_initial(ArgVariant initial) { initial_ = initial; } + void set_value(ArgVariant value) { value_ = value; } + + /// Examines if the given `arg` matches this parser. A parser for a flag can + /// match the short name (e.g. '-f') if set, or the long name (e.g. '--foo'). + /// A parser for a positional argument will match anything until it has a + /// value set. + bool Match(std::string_view arg); + + /// Returns the parsed value. + template <typename T> + T Get() const { + return std::get<T>(GetValue()); + } + + private: + const ArgVariant& GetValue() const; + + std::string_view short_name_; + std::string_view long_name_; + bool positional_; + + ArgVariant initial_; + ArgVariant value_; +}; + +// Argument parsers for boolean arguments. These arguments are always flags, and +// can be specified as, e.g. "-f" (true), "--foo" (true) or "--no-foo" (false). +class BoolParser : public ArgParserBase { + public: + BoolParser(std::string_view optname); + BoolParser(std::string_view shortopt, std::string_view longopt); + + bool value() const { return Get<bool>(); } + BoolParser& set_default(bool value); + + ParseStatus Parse(std::string_view arg0, + std::string_view arg1 = std::string_view()); +}; + +// Type-erasing argument parser for unsigned integer arguments. This object +// always parses values as `uint64_t`s and should not be used directly. +// Instead, use `UnsignedParser<T>` with a type to explicitly narrow to. +class UnsignedParserBase : public ArgParserBase { + protected: + UnsignedParserBase(std::string_view name); + UnsignedParserBase(std::string_view shortopt, std::string_view longopt); + + ParseStatus Parse(std::string_view arg0, std::string_view arg1, uint64_t max); +}; + +// Argument parser for unsigned integer arguments. These arguments may be flags +// or positional arguments. +template <typename T, typename std::enable_if_t<std::is_unsigned_v<T>, int> = 0> +class UnsignedParser : public UnsignedParserBase { + public: + UnsignedParser(std::string_view name) : UnsignedParserBase(name) {} + UnsignedParser(std::string_view shortopt, std::string_view longopt) + : UnsignedParserBase(shortopt, longopt) {} + + T value() const { return static_cast<T>(Get<uint64_t>()); } + + UnsignedParser& set_default(T value) { + set_initial(static_cast<uint64_t>(value)); + return *this; + } + + ParseStatus Parse(std::string_view arg0, + std::string_view arg1 = std::string_view()) { + return UnsignedParserBase::Parse(arg0, arg1, std::numeric_limits<T>::max()); + } +}; + +// Holds argument parsers of different types. +using ArgParserVariant = + std::variant<BoolParser, UnsignedParser<uint16_t>, UnsignedParser<size_t>>; + +// Parses the command line arguments and sets the values of the given `parsers`. +Status ParseArgs(Vector<ArgParserVariant>& parsers, int argc, char** argv); + +// Logs a usage message based on the given `parsers` and the program name given +// by `argv0`. +void PrintUsage(const Vector<ArgParserVariant>& parsers, + std::string_view argv0); + +// Attempts to find the parser in `parsers` with the given `name`, and returns +// its value if found. +std::optional<ArgVariant> GetArg(const Vector<ArgParserVariant>& parsers, + std::string_view name); + +inline void GetArgValue(const ArgVariant& arg, bool* out) { + *out = std::get<bool>(arg); +} + +template <typename T, typename std::enable_if_t<std::is_unsigned_v<T>, int> = 0> +void GetArgValue(const ArgVariant& arg, T* out) { + *out = static_cast<T>(std::get<uint64_t>(arg)); +} + +// Like `GetArgVariant` above, but extracts the typed value from the variant +// into `out`. Returns an error if no parser exists in `parsers` with the given +// `name`. +template <typename T> +Status GetArg(const Vector<ArgParserVariant>& parsers, + std::string_view name, + T* out) { + const auto& arg = GetArg(parsers, name); + if (!arg.has_value()) { + return Status::InvalidArgument(); + } + GetArgValue(*arg, out); + return OkStatus(); +} + +// Resets the parser with the given name. Returns an error if not found. +Status ResetArg(Vector<ArgParserVariant>& parsers, std::string_view name); + +} // namespace pw::rpc::fuzz diff --git a/pw_rpc/fuzz/public/pw_rpc/fuzz/engine.h b/pw_rpc/fuzz/public/pw_rpc/fuzz/engine.h new file mode 100644 index 000000000..34e92c003 --- /dev/null +++ b/pw_rpc/fuzz/public/pw_rpc/fuzz/engine.h @@ -0,0 +1,339 @@ +// Copyright 2023 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +#include <atomic> +#include <cstdarg> +#include <cstddef> +#include <cstdint> +#include <thread> +#include <variant> + +#include "pw_containers/vector.h" +#include "pw_random/xor_shift.h" +#include "pw_rpc/benchmark.h" +#include "pw_rpc/benchmark.raw_rpc.pb.h" +#include "pw_rpc/fuzz/alarm_timer.h" +#include "pw_sync/condition_variable.h" +#include "pw_sync/lock_annotations.h" +#include "pw_sync/mutex.h" +#include "pw_sync/timed_mutex.h" + +namespace pw::rpc::fuzz { + +/// Describes an action a fuzzing thread can perform on a call. +struct Action { + enum Op : uint8_t { + /// No-op. + kSkip, + + /// Waits for the call indicated by `target` to complete. + kWait, + + /// Makes a new unary request using the call indicated by `target`. The data + /// written is derived from `value`. + kWriteUnary, + + /// Writes to a stream request using the call indicated by `target`, or + /// makes + /// a new one if not currently a stream call. The data written is derived + /// from `value`. + kWriteStream, + + /// Closes the stream if the call indicated by `target` is a stream call. + kCloseClientStream, + + /// Cancels the call indicated by `target`. + kCancel, + + /// Abandons the call indicated by `target`. + kAbandon, + + /// Swaps the call indicated by `target` with a call indicated by `value`. + kSwap, + + /// Sets the call indicated by `target` to an initial, unset state. + kDestroy, + }; + + constexpr Action() = default; + Action(uint32_t encoded); + Action(Op op, size_t target, uint16_t value); + Action(Op op, size_t target, char val, size_t len); + ~Action() = default; + + void set_thread_id(size_t thread_id_) { + thread_id = thread_id_; + callback_id = std::numeric_limits<size_t>::max(); + } + + void set_callback_id(size_t callback_id_) { + thread_id = 0; + callback_id = callback_id_; + } + + // For a write action's value, returns the character value to be written. + static char DecodeWriteValue(uint16_t value); + + // For a write action's value, returns the number of characters to be written. + static size_t DecodeWriteLength(uint16_t value); + + /// Returns a value that represents the fields of an action. Constructing an + /// `Action` with this value will produce the same fields. + uint32_t Encode() const; + + /// Records details of the action being performed if verbose logging is + /// enabled. + void Log(bool verbose, size_t num_actions, const char* fmt, ...) const; + + /// Records an encountered when trying to log an action. + void LogFailure(bool verbose, size_t num_actions, Status status) const; + + Op op = kSkip; + size_t target = 0; + uint16_t value = 0; + + size_t thread_id = 0; + size_t callback_id = std::numeric_limits<size_t>::max(); +}; + +/// Wraps an RPC call that may be either a `RawUnaryReceiver` or +/// `RawClientReaderWriter`. Allows applying `Action`s to each possible +/// type of call. +class FuzzyCall { + public: + using Variant = + std::variant<std::monostate, RawUnaryReceiver, RawClientReaderWriter>; + + explicit FuzzyCall(size_t index) : index_(index), id_(index) {} + ~FuzzyCall() = default; + + size_t id() { + std::lock_guard lock(mutex_); + return id_; + } + + bool pending() { + std::lock_guard lock(mutex_); + return pending_; + } + + /// Applies the given visitor to the call variant. If the action taken by the + /// visitor is expected to complete the call, it will notify any threads + /// waiting for the call to complete. This version of the method does not + /// return the result of the visiting the variant. + template <typename Visitor, + typename std::enable_if_t< + std::is_same_v<typename Visitor::result_type, void>, + int> = 0> + typename Visitor::result_type Visit(Visitor visitor, bool completes = true) { + { + std::lock_guard lock(mutex_); + std::visit(std::move(visitor), call_); + } + if (completes && pending_.exchange(false)) { + cv_.notify_all(); + } + } + + /// Applies the given visitor to the call variant. If the action taken by the + /// visitor is expected to complete the call, it will notify any threads + /// waiting for the call to complete. This version of the method returns the + /// result of the visiting the variant. + template <typename Visitor, + typename std::enable_if_t< + !std::is_same_v<typename Visitor::result_type, void>, + int> = 0> + typename Visitor::result_type Visit(Visitor visitor, bool completes = true) { + typename Visitor::result_type result; + { + std::lock_guard lock(mutex_); + result = std::visit(std::move(visitor), call_); + } + if (completes && pending_.exchange(false)) { + cv_.notify_all(); + } + return result; + } + + // Records the number of bytes written as part of a request. If `append` is + // true, treats the write as a continuation of a streaming request. + void RecordWrite(size_t num, bool append = false); + + /// Waits to be notified that a callback has been invoked. + void Await() PW_LOCKS_EXCLUDED(mutex_); + + /// Completes the call, notifying any waiters. + void Notify() PW_LOCKS_EXCLUDED(mutex_); + + /// Exchanges the call represented by this object with another. + void Swap(FuzzyCall& other); + + /// Resets the call wrapped by this object with a new one. Destorys the + /// previous call. + void Reset(Variant call = Variant()) PW_LOCKS_EXCLUDED(mutex_); + + // Reports the state of this object. + void Log() PW_LOCKS_EXCLUDED(mutex_); + + private: + /// This represents the index in the engine's list of calls. It is used to + /// ensure a consistent order of locking multiple calls. + const size_t index_; + + sync::TimedMutex mutex_; + sync::ConditionVariable cv_; + + /// An identifier that can be used find this object, e.g. by a callback, even + /// when it has been swapped with another call. + size_t id_ PW_GUARDED_BY(mutex_); + + /// Holds the actual pw::rpc::Call object, when present. + Variant call_ PW_GUARDED_BY(mutex_); + + /// Set when a request is sent, and cleared when a callback is invoked. + std::atomic_bool pending_ = false; + + /// Bytes sent in the last unary request or stream write. + size_t last_write_ PW_GUARDED_BY(mutex_) = 0; + + /// Total bytes sent using this call object. + size_t total_written_ PW_GUARDED_BY(mutex_) = 0; +}; + +/// The main RPC fuzzing engine. +/// +/// This class takes or generates a sequence of actions, and dsitributes them to +/// a number of threads that can perform them using an RPC client. Passing the +/// same seed to the engine at construction will allow it to generate the same +/// sequence of actions. +class Fuzzer { + public: + /// Number of fuzzing threads. The first thread counted is the RPC dispatch + /// thread. + static constexpr size_t kNumThreads = 4; + + /// Maximum number of actions that a single thread will try to perform before + /// exiting. + static constexpr size_t kMaxActionsPerThread = 255; + + /// The number of call objects available to be used for fuzzing. + static constexpr size_t kMaxConcurrentCalls = 8; + + /// The mxiumum number of individual fuzzing actions that the fuzzing threads + /// can perform. The `+ 1` is to allow the inclusion of a special `0` action + /// to separate each thread's actions when concatenated into a single list. + static constexpr size_t kMaxActions = + kNumThreads * (kMaxActionsPerThread + 1); + + explicit Fuzzer(Client& client, uint32_t channel_id); + + /// The fuzzer engine should remain pinned in memory since it is referenced by + /// the `CallbackContext`s. + Fuzzer(const Fuzzer&) = delete; + Fuzzer(Fuzzer&&) = delete; + Fuzzer& operator=(const Fuzzer&) = delete; + Fuzzer& operator=(Fuzzer&&) = delete; + + void set_verbose(bool verbose) { verbose_ = verbose; } + + /// Sets the timeout and starts the timer. + void set_timeout(chrono::SystemClock::duration timeout) { + timer_.Start(timeout); + } + + /// Generates encoded actions from the RNG and `Run`s them. + void Run(uint64_t seed, size_t num_actions); + + /// Splits the provided `actions` between the fuzzing threads and runs them to + /// completion. + void Run(const Vector<uint32_t>& actions); + + private: + /// Information passed to the RPC callbacks, including the index of the + /// associated call and a pointer to the fuzzer object. + struct CallbackContext { + size_t id; + Fuzzer* fuzzer; + }; + + /// Restarts the alarm timer, delaying it from detecting a timeout. This is + /// called whenever actions complete and indicates progress is still being + /// made. + void ResetTimerLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + /// Decodes the `encoded` action and performs it. The `thread_id` is used for + /// verbose diagnostics. When invoked from `PerformCallback` the `callback_id` + /// will be set to the index of the associated call. This allows avoiding + /// specific, prohibited actions, e.g. destroying a call from its own + /// callback. + void Perform(const Action& action) PW_LOCKS_EXCLUDED(mutex_); + + /// Returns the call with the matching `id`. + FuzzyCall& FindCall(size_t id) PW_LOCKS_EXCLUDED(mutex_) { + std::lock_guard lock(mutex_); + return FindCallLocked(id); + } + + FuzzyCall& FindCallLocked(size_t id) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { + return fuzzy_calls_[indices_[id]]; + } + + /// Returns a pointer to callback context for the given call index. + CallbackContext* GetContext(size_t callback_id) PW_LOCKS_EXCLUDED(mutex_) { + std::lock_guard lock(mutex_); + return &contexts_[callback_id]; + } + + /// Callback for stream write made by the call with the given `callback_id`. + void OnNext(size_t callback_id) PW_LOCKS_EXCLUDED(mutex_); + + /// Callback for completed request for the call with the given `callback_id`. + void OnCompleted(size_t callback_id) PW_LOCKS_EXCLUDED(mutex_); + + /// Callback for an error for the call with the given `callback_id`. + void OnError(size_t callback_id, Status status) PW_LOCKS_EXCLUDED(mutex_); + + bool verbose_ = false; + pw_rpc::raw::Benchmark::Client client_; + BenchmarkService service_; + + /// Alarm thread that detects when no workers have made recent progress. + AlarmTimer timer_; + + sync::Mutex mutex_; + + /// Worker threads. The first thread is the RPC response dispatcher. + Vector<std::thread, kNumThreads> threads_; + + /// RPC call objects. + Vector<FuzzyCall, kMaxConcurrentCalls> fuzzy_calls_; + + /// Maps each call's IDs to its index. Since calls may be move before their + /// callbacks are invoked, this list can be used to find the original call. + Vector<size_t, kMaxConcurrentCalls> indices_ PW_GUARDED_BY(mutex_); + + /// Context objects used to reference the engine and call. + Vector<CallbackContext, kMaxConcurrentCalls> contexts_ PW_GUARDED_BY(mutex_); + + /// Set of actions performed as callbacks from other calls. + Vector<uint32_t, kMaxActionsPerThread> callback_actions_ + PW_GUARDED_BY(mutex_); + Vector<uint32_t>::iterator callback_iterator_ PW_GUARDED_BY(mutex_); + + /// Total actions performed by all workers. + std::atomic<size_t> num_actions_ = 0; +}; + +} // namespace pw::rpc::fuzz diff --git a/pw_rpc/internal/integration_test_ports.gni b/pw_rpc/internal/integration_test_ports.gni index 5413f8796..5197aab46 100644 --- a/pw_rpc/internal/integration_test_ports.gni +++ b/pw_rpc/internal/integration_test_ports.gni @@ -16,4 +16,5 @@ # in one place to prevent accidental conflicts between tests. pw_rpc_PYTHON_CLIENT_CPP_SERVER_TEST_PORT = 30576 pw_rpc_CPP_CLIENT_INTEGRATION_TEST_PORT = 30577 +pw_rpc_CPP_CLIENT_FUZZER_TEST_PORT = 30578 pw_unit_test_RPC_SERVICE_TEST_PORT = 30580 diff --git a/pw_rpc/py/pw_rpc/testing.py b/pw_rpc/py/pw_rpc/testing.py index 0f1475899..de8ab6223 100644 --- a/pw_rpc/py/pw_rpc/testing.py +++ b/pw_rpc/py/pw_rpc/testing.py @@ -14,6 +14,7 @@ """Utilities for testing pw_rpc.""" import argparse +import shlex import subprocess import sys import tempfile @@ -68,8 +69,22 @@ def _parse_subprocess_integration_test_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description='Executes a test between two subprocesses' ) - parser.add_argument('--client', required=True, help='Client binary to run') - parser.add_argument('--server', required=True, help='Server binary to run') + parser.add_argument( + '--client', + required=True, + help=( + 'Client command to run. ' + 'Use quotes and whitespace to pass client-specifc arguments.' + ), + ) + parser.add_argument( + '--server', + required=True, + help=( + 'Server command to run. ' + 'Use quotes and whitespace to pass client-specifc arguments.' + ), + ) parser.add_argument( 'common_args', metavar='-- ...', @@ -96,6 +111,7 @@ def execute_integration_test( common_args: Sequence[str], setup_time_s: float = 0.2, ) -> int: + """Runs an RPC server and client as part of an integration test.""" temp_dir: Optional[tempfile.TemporaryDirectory] = None if TEMP_DIR_MARKER in common_args: @@ -105,11 +121,17 @@ def execute_integration_test( ] try: - server_process = subprocess.Popen([server, *common_args]) + server_cmdline = shlex.split(server) + client_cmdline = shlex.split(client) + if common_args: + server_cmdline += [*common_args] + client_cmdline += [*common_args] + + server_process = subprocess.Popen(server_cmdline) # TODO(b/234879791): Replace this delay with some sort of IPC. time.sleep(setup_time_s) - result = subprocess.run([client, *common_args]).returncode + result = subprocess.run(client_cmdline).returncode server_process.terminate() server_process.communicate() |