aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCraig Tiller <ctiller@google.com>2022-04-21 07:37:14 -1000
committerGitHub <noreply@github.com>2022-04-21 10:37:14 -0700
commit99c5319a765802db8e8f6bc86ccd0568e43fd14f (patch)
tree2b013bc642211eddff58c1d2a27c9f51522fe339
parent4d118154a936ca1fc72c1cc08445bc37ddb45388 (diff)
downloadgrpc-grpc-99c5319a765802db8e8f6bc86ccd0568e43fd14f.tar.gz
[1.46][fault-injection] Backport fix for fault injection filter #29467
Backports #29288
-rw-r--r--BUILD1
-rw-r--r--src/core/ext/filters/fault_injection/fault_injection_filter.cc56
2 files changed, 43 insertions, 14 deletions
diff --git a/BUILD b/BUILD
index 907724d265..8d0717b54d 100644
--- a/BUILD
+++ b/BUILD
@@ -2688,6 +2688,7 @@ grpc_cc_library(
external_deps = ["absl/strings"],
language = "c++",
deps = [
+ "capture",
"gpr_base",
"grpc_base",
"grpc_service_config",
diff --git a/src/core/ext/filters/fault_injection/fault_injection_filter.cc b/src/core/ext/filters/fault_injection/fault_injection_filter.cc
index c93c084aa7..cdb01e5848 100644
--- a/src/core/ext/filters/fault_injection/fault_injection_filter.cc
+++ b/src/core/ext/filters/fault_injection/fault_injection_filter.cc
@@ -29,6 +29,7 @@
#include "src/core/ext/filters/fault_injection/service_config_parser.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/status_util.h"
+#include "src/core/lib/gprpp/capture.h"
#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/service_config/service_config_call_data.h"
@@ -62,6 +63,33 @@ inline bool UnderFraction(const uint32_t numerator,
return random_number < numerator;
}
+// Tracks an active faults lifetime.
+// Increments g_active_faults when created, and decrements it when destroyed.
+class FaultHandle {
+ public:
+ explicit FaultHandle(bool active) : active_(active) {
+ if (active) {
+ g_active_faults.fetch_add(1, std::memory_order_relaxed);
+ }
+ }
+ ~FaultHandle() {
+ if (active_) {
+ g_active_faults.fetch_sub(1, std::memory_order_relaxed);
+ }
+ }
+ FaultHandle(const FaultHandle&) = delete;
+ FaultHandle& operator=(const FaultHandle&) = delete;
+ FaultHandle(FaultHandle&& other) noexcept
+ : active_(absl::exchange(other.active_, false)) {}
+ FaultHandle& operator=(FaultHandle&& other) noexcept {
+ std::swap(active_, other.active_);
+ return *this;
+ }
+
+ private:
+ bool active_;
+};
+
} // namespace
class FaultInjectionFilter::InjectionDecision {
@@ -73,15 +101,16 @@ class FaultInjectionFilter::InjectionDecision {
abort_request_(abort_request) {}
std::string ToString() const;
- Timestamp DelayUntil() const;
+ Timestamp DelayUntil();
absl::Status MaybeAbort() const;
private:
- bool HaveActiveFaultsQuota(bool increment) const;
+ bool HaveActiveFaultsQuota() const;
uint32_t max_faults_;
Duration delay_time_;
absl::optional<absl::Status> abort_request_;
+ FaultHandle active_fault_{false};
};
absl::StatusOr<FaultInjectionFilter> FaultInjectionFilter::Create(
@@ -104,9 +133,12 @@ ArenaPromise<ServerMetadataHandle> FaultInjectionFilter::MakeCallPromise(
gpr_log(GPR_INFO, "chand=%p: Fault injection triggered %s", this,
decision.ToString().c_str());
}
+ auto delay = decision.DelayUntil();
return TrySeq(
- Sleep(decision.DelayUntil()),
- [decision]() { return decision.MaybeAbort(); },
+ Sleep(delay),
+ Capture(
+ [](InjectionDecision* decision) { return decision->MaybeAbort(); },
+ std::move(decision)),
next_promise_factory(std::move(call_args)));
}
@@ -190,17 +222,13 @@ FaultInjectionFilter::MakeInjectionDecision(
: absl::nullopt);
}
-bool FaultInjectionFilter::InjectionDecision::HaveActiveFaultsQuota(
- bool increment) const {
- if (g_active_faults.load(std::memory_order_acquire) >= max_faults_) {
- return false;
- }
- if (increment) g_active_faults.fetch_add(1, std::memory_order_relaxed);
- return true;
+bool FaultInjectionFilter::InjectionDecision::HaveActiveFaultsQuota() const {
+ return g_active_faults.load(std::memory_order_acquire) < max_faults_;
}
-Timestamp FaultInjectionFilter::InjectionDecision::DelayUntil() const {
- if (delay_time_ != Duration::Zero() && HaveActiveFaultsQuota(true)) {
+Timestamp FaultInjectionFilter::InjectionDecision::DelayUntil() {
+ if (delay_time_ != Duration::Zero() && HaveActiveFaultsQuota()) {
+ active_fault_ = FaultHandle{true};
return ExecCtx::Get()->Now() + delay_time_;
}
return Timestamp::InfPast();
@@ -208,7 +236,7 @@ Timestamp FaultInjectionFilter::InjectionDecision::DelayUntil() const {
absl::Status FaultInjectionFilter::InjectionDecision::MaybeAbort() const {
if (abort_request_.has_value() &&
- (delay_time_ != Duration::Zero() || HaveActiveFaultsQuota(false))) {
+ (delay_time_ != Duration::Zero() || HaveActiveFaultsQuota())) {
return abort_request_.value();
}
return absl::OkStatus();