summaryrefslogtreecommitdiff
path: root/grpc/src/core/ext/filters/fault_injection/fault_injection_filter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/core/ext/filters/fault_injection/fault_injection_filter.cc')
-rw-r--r--grpc/src/core/ext/filters/fault_injection/fault_injection_filter.cc501
1 files changed, 501 insertions, 0 deletions
diff --git a/grpc/src/core/ext/filters/fault_injection/fault_injection_filter.cc b/grpc/src/core/ext/filters/fault_injection/fault_injection_filter.cc
new file mode 100644
index 00000000..5f8d7794
--- /dev/null
+++ b/grpc/src/core/ext/filters/fault_injection/fault_injection_filter.cc
@@ -0,0 +1,501 @@
+//
+// Copyright 2021 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/fault_injection/fault_injection_filter.h"
+
+#include "absl/strings/numbers.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/ext/filters/client_channel/service_config.h"
+#include "src/core/ext/filters/client_channel/service_config_call_data.h"
+#include "src/core/ext/filters/fault_injection/service_config_parser.h"
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/channel/status_util.h"
+#include "src/core/lib/gprpp/atomic.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/transport/status_conversion.h"
+
+namespace grpc_core {
+
+TraceFlag grpc_fault_injection_filter_trace(false, "fault_injection_filter");
+
+namespace {
+
+Atomic<uint32_t> g_active_faults{0};
+static_assert(
+ std::is_trivially_destructible<Atomic<uint32_t>>::value,
+ "the active fault counter needs to have a trivially destructible type");
+
+inline int GetLinkedMetadatumValueInt(grpc_linked_mdelem* md) {
+ int res;
+ if (absl::SimpleAtoi(StringViewFromSlice(GRPC_MDVALUE(md->md)), &res)) {
+ return res;
+ } else {
+ return -1;
+ }
+}
+
+inline uint32_t GetLinkedMetadatumValueUnsignedInt(grpc_linked_mdelem* md) {
+ uint32_t res;
+ if (absl::SimpleAtoi(StringViewFromSlice(GRPC_MDVALUE(md->md)), &res)) {
+ return res;
+ } else {
+ return -1;
+ }
+}
+
+inline int64_t GetLinkedMetadatumValueInt64(grpc_linked_mdelem* md) {
+ int64_t res;
+ if (absl::SimpleAtoi(StringViewFromSlice(GRPC_MDVALUE(md->md)), &res)) {
+ return res;
+ } else {
+ return -1;
+ }
+}
+
+inline bool UnderFraction(const uint32_t numerator,
+ const uint32_t denominator) {
+ if (numerator <= 0) return false;
+ if (numerator >= denominator) return true;
+ // Generate a random number in [0, denominator).
+ const uint32_t random_number = rand() % denominator;
+ return random_number < numerator;
+}
+
+class ChannelData {
+ public:
+ static grpc_error_handle Init(grpc_channel_element* elem,
+ grpc_channel_element_args* args);
+ static void Destroy(grpc_channel_element* elem);
+
+ int index() const { return index_; }
+
+ private:
+ ChannelData(grpc_channel_element* elem, grpc_channel_element_args* args);
+ ~ChannelData() = default;
+
+ // The relative index of instances of the same filter.
+ int index_;
+};
+
+class CallData {
+ public:
+ static grpc_error_handle Init(grpc_call_element* elem,
+ const grpc_call_element_args* args);
+
+ static void Destroy(grpc_call_element* elem,
+ const grpc_call_final_info* /*final_info*/,
+ grpc_closure* /*then_schedule_closure*/);
+
+ static void StartTransportStreamOpBatch(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
+
+ private:
+ class ResumeBatchCanceller;
+
+ CallData(grpc_call_element* elem, const grpc_call_element_args* args);
+ ~CallData();
+
+ void DecideWhetherToInjectFaults(grpc_metadata_batch* initial_metadata);
+
+ // Checks if current active faults exceed the allowed max faults.
+ bool HaveActiveFaultsQuota(bool increment);
+
+ // Returns true if this RPC needs to be delayed. If so, this call will be
+ // counted as an active fault.
+ bool MaybeDelay();
+
+ // Returns the aborted RPC status if this RPC needs to be aborted. If so,
+ // this call will be counted as an active fault. Otherwise, it returns
+ // GRPC_ERROR_NONE.
+ // If this call is already been delay injected, skip the active faults
+ // quota check.
+ grpc_error_handle MaybeAbort();
+
+ // Delays the stream operations batch.
+ void DelayBatch(grpc_call_element* elem,
+ grpc_transport_stream_op_batch* batch);
+
+ // Cancels the delay timer.
+ void CancelDelayTimer() { grpc_timer_cancel(&delay_timer_); }
+
+ // Finishes the fault injection, should only be called once.
+ void FaultInjectionFinished() {
+ g_active_faults.FetchSub(1, MemoryOrder::RELAXED);
+ }
+
+ // This is a callback that will be invoked after the delay timer is up.
+ static void ResumeBatch(void* arg, grpc_error_handle error);
+
+ // This is a callback invoked upon completion of recv_trailing_metadata.
+ // Injects the abort_error_ to the recv_trailing_metadata batch if needed.
+ static void HijackedRecvTrailingMetadataReady(void* arg, grpc_error_handle);
+
+ // Used to track the policy structs that needs to be destroyed in dtor.
+ bool fi_policy_owned_ = false;
+ const FaultInjectionMethodParsedConfig::FaultInjectionPolicy* fi_policy_;
+ grpc_call_stack* owning_call_;
+ Arena* arena_;
+ CallCombiner* call_combiner_;
+
+ // Indicates whether we are doing a delay and/or an abort for this call.
+ bool delay_request_ = false;
+ bool abort_request_ = false;
+
+ // Delay states
+ grpc_timer delay_timer_ ABSL_GUARDED_BY(delay_mu_);
+ ResumeBatchCanceller* resume_batch_canceller_ ABSL_GUARDED_BY(delay_mu_);
+ grpc_transport_stream_op_batch* delayed_batch_ ABSL_GUARDED_BY(delay_mu_);
+ // Abort states
+ grpc_error_handle abort_error_ = GRPC_ERROR_NONE;
+ grpc_closure recv_trailing_metadata_ready_;
+ grpc_closure* original_recv_trailing_metadata_ready_;
+ // Protects the asynchronous delay, resume, and cancellation.
+ Mutex delay_mu_;
+};
+
+// ChannelData
+
+grpc_error_handle ChannelData::Init(grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ GPR_ASSERT(elem->filter == &FaultInjectionFilterVtable);
+ new (elem->channel_data) ChannelData(elem, args);
+ return GRPC_ERROR_NONE;
+}
+
+void ChannelData::Destroy(grpc_channel_element* elem) {
+ auto* chand = static_cast<ChannelData*>(elem->channel_data);
+ chand->~ChannelData();
+}
+
+ChannelData::ChannelData(grpc_channel_element* elem,
+ grpc_channel_element_args* args)
+ : index_(grpc_channel_stack_filter_instance_number(args->channel_stack,
+ elem)) {}
+
+// CallData::ResumeBatchCanceller
+
+class CallData::ResumeBatchCanceller {
+ public:
+ explicit ResumeBatchCanceller(grpc_call_element* elem) : elem_(elem) {
+ auto* calld = static_cast<CallData*>(elem->call_data);
+ GRPC_CALL_STACK_REF(calld->owning_call_, "ResumeBatchCanceller");
+ GRPC_CLOSURE_INIT(&closure_, &Cancel, this, grpc_schedule_on_exec_ctx);
+ calld->call_combiner_->SetNotifyOnCancel(&closure_);
+ }
+
+ private:
+ static void Cancel(void* arg, grpc_error_handle error) {
+ auto* self = static_cast<ResumeBatchCanceller*>(arg);
+ auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
+ auto* calld = static_cast<CallData*>(self->elem_->call_data);
+ {
+ MutexLock lock(&calld->delay_mu_);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_fault_injection_filter_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: cancelling schdueled pick: "
+ "error=%s self=%p calld->resume_batch_canceller_=%p",
+ chand, calld, grpc_error_std_string(error).c_str(), self,
+ calld->resume_batch_canceller_);
+ }
+ if (error != GRPC_ERROR_NONE && calld->resume_batch_canceller_ == self) {
+ // Cancel the delayed pick.
+ calld->CancelDelayTimer();
+ calld->FaultInjectionFinished();
+ // Fail pending batches on the call.
+ grpc_transport_stream_op_batch_finish_with_failure(
+ calld->delayed_batch_, GRPC_ERROR_REF(error),
+ calld->call_combiner_);
+ }
+ }
+ GRPC_CALL_STACK_UNREF(calld->owning_call_, "ResumeBatchCanceller");
+ delete self;
+ }
+
+ grpc_call_element* elem_;
+ grpc_closure closure_;
+};
+
+// CallData
+
+grpc_error_handle CallData::Init(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ auto* calld = new (elem->call_data) CallData(elem, args);
+ if (calld->fi_policy_ == nullptr) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "failed to find fault injection policy");
+ }
+ return GRPC_ERROR_NONE;
+}
+
+void CallData::Destroy(grpc_call_element* elem,
+ const grpc_call_final_info* /*final_info*/,
+ grpc_closure* /*then_schedule_closure*/) {
+ auto* calld = static_cast<CallData*>(elem->call_data);
+ calld->~CallData();
+}
+
+void CallData::StartTransportStreamOpBatch(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+ auto* calld = static_cast<CallData*>(elem->call_data);
+ // There should only be one send_initial_metdata op, and fault injection also
+ // only need to be enforced once.
+ if (batch->send_initial_metadata) {
+ calld->DecideWhetherToInjectFaults(
+ batch->payload->send_initial_metadata.send_initial_metadata);
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_fault_injection_filter_trace)) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: Fault injection triggered delay=%d abort=%d",
+ elem->channel_data, calld, calld->delay_request_,
+ calld->abort_request_);
+ }
+ if (calld->MaybeDelay()) {
+ // Delay the batch, and pass down the batch in the scheduled closure.
+ calld->DelayBatch(elem, batch);
+ return;
+ }
+ grpc_error_handle abort_error = calld->MaybeAbort();
+ if (abort_error != GRPC_ERROR_NONE) {
+ calld->abort_error_ = abort_error;
+ grpc_transport_stream_op_batch_finish_with_failure(
+ batch, GRPC_ERROR_REF(calld->abort_error_), calld->call_combiner_);
+ return;
+ }
+ } else {
+ if (batch->recv_trailing_metadata) {
+ // Intercept recv_trailing_metadata callback so that we can inject the
+ // failure when aborting streaming calls, because their
+ // recv_trailing_metatdata op may not be on the same batch as the
+ // send_initial_metadata op.
+ calld->original_recv_trailing_metadata_ready_ =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready_;
+ }
+ if (calld->abort_error_ != GRPC_ERROR_NONE) {
+ // If we already decided to abort, then immediately fail this batch.
+ grpc_transport_stream_op_batch_finish_with_failure(
+ batch, GRPC_ERROR_REF(calld->abort_error_), calld->call_combiner_);
+ return;
+ }
+ }
+ // Chain to the next filter.
+ grpc_call_next_op(elem, batch);
+}
+
+CallData::CallData(grpc_call_element* elem, const grpc_call_element_args* args)
+ : owning_call_(args->call_stack),
+ arena_(args->arena),
+ call_combiner_(args->call_combiner) {
+ auto* chand = static_cast<ChannelData*>(elem->channel_data);
+ // Fetch the fault injection policy from the service config, based on the
+ // relative index for which policy should this CallData use.
+ auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
+ args->context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
+ auto* method_params = static_cast<FaultInjectionMethodParsedConfig*>(
+ service_config_call_data->GetMethodParsedConfig(
+ FaultInjectionServiceConfigParser::ParserIndex()));
+ if (method_params != nullptr) {
+ fi_policy_ = method_params->fault_injection_policy(chand->index());
+ }
+ GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
+ HijackedRecvTrailingMetadataReady, elem,
+ grpc_schedule_on_exec_ctx);
+}
+
+CallData::~CallData() {
+ if (fi_policy_owned_) {
+ fi_policy_->~FaultInjectionPolicy();
+ }
+ GRPC_ERROR_UNREF(abort_error_);
+}
+
+void CallData::DecideWhetherToInjectFaults(
+ grpc_metadata_batch* initial_metadata) {
+ FaultInjectionMethodParsedConfig::FaultInjectionPolicy* copied_policy =
+ nullptr;
+ // Update the policy with values in initial metadata.
+ if (!fi_policy_->abort_code_header.empty() ||
+ !fi_policy_->abort_percentage_header.empty() ||
+ !fi_policy_->delay_header.empty() ||
+ !fi_policy_->delay_percentage_header.empty()) {
+ // Defer the actual copy until the first matched header.
+ auto maybe_copy_policy_func = [this, &copied_policy]() {
+ if (copied_policy == nullptr) {
+ copied_policy =
+ arena_->New<FaultInjectionMethodParsedConfig::FaultInjectionPolicy>(
+ *fi_policy_);
+ }
+ };
+ for (grpc_linked_mdelem* md = initial_metadata->list.head; md != nullptr;
+ md = md->next) {
+ absl::string_view key = StringViewFromSlice(GRPC_MDKEY(md->md));
+ // Only perform string comparison if:
+ // 1. Needs to check this header;
+ // 2. The value is not been filled before.
+ if (!fi_policy_->abort_code_header.empty() &&
+ (copied_policy == nullptr ||
+ copied_policy->abort_code == GRPC_STATUS_OK) &&
+ key == fi_policy_->abort_code_header) {
+ maybe_copy_policy_func();
+ grpc_status_code_from_int(GetLinkedMetadatumValueInt(md),
+ &copied_policy->abort_code);
+ }
+ if (!fi_policy_->abort_percentage_header.empty() &&
+ key == fi_policy_->abort_percentage_header) {
+ maybe_copy_policy_func();
+ copied_policy->abort_percentage_numerator =
+ GPR_MIN(GetLinkedMetadatumValueUnsignedInt(md),
+ fi_policy_->abort_percentage_numerator);
+ }
+ if (!fi_policy_->delay_header.empty() &&
+ (copied_policy == nullptr || copied_policy->delay == 0) &&
+ key == fi_policy_->delay_header) {
+ maybe_copy_policy_func();
+ copied_policy->delay = static_cast<grpc_millis>(
+ GPR_MAX(GetLinkedMetadatumValueInt64(md), 0));
+ }
+ if (!fi_policy_->delay_percentage_header.empty() &&
+ key == fi_policy_->delay_percentage_header) {
+ maybe_copy_policy_func();
+ copied_policy->delay_percentage_numerator =
+ GPR_MIN(GetLinkedMetadatumValueUnsignedInt(md),
+ fi_policy_->delay_percentage_numerator);
+ }
+ }
+ if (copied_policy != nullptr) fi_policy_ = copied_policy;
+ }
+ // Roll the dice
+ delay_request_ = fi_policy_->delay != 0 &&
+ UnderFraction(fi_policy_->delay_percentage_numerator,
+ fi_policy_->delay_percentage_denominator);
+ abort_request_ = fi_policy_->abort_code != GRPC_STATUS_OK &&
+ UnderFraction(fi_policy_->abort_percentage_numerator,
+ fi_policy_->abort_percentage_denominator);
+ if (!delay_request_ && !abort_request_) {
+ if (copied_policy != nullptr) copied_policy->~FaultInjectionPolicy();
+ // No fault injection for this call
+ } else {
+ fi_policy_owned_ = copied_policy != nullptr;
+ }
+}
+
+bool CallData::HaveActiveFaultsQuota(bool increment) {
+ if (g_active_faults.Load(MemoryOrder::ACQUIRE) >= fi_policy_->max_faults) {
+ return false;
+ }
+ if (increment) g_active_faults.FetchAdd(1, MemoryOrder::RELAXED);
+ return true;
+}
+
+bool CallData::MaybeDelay() {
+ if (delay_request_) {
+ return HaveActiveFaultsQuota(true);
+ }
+ return false;
+}
+
+grpc_error_handle CallData::MaybeAbort() {
+ if (abort_request_ && (delay_request_ || HaveActiveFaultsQuota(false))) {
+ return grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_COPIED_STRING(fi_policy_->abort_message.c_str()),
+ GRPC_ERROR_INT_GRPC_STATUS, fi_policy_->abort_code);
+ }
+ return GRPC_ERROR_NONE;
+}
+
+void CallData::DelayBatch(grpc_call_element* elem,
+ grpc_transport_stream_op_batch* batch) {
+ MutexLock lock(&delay_mu_);
+ delayed_batch_ = batch;
+ resume_batch_canceller_ = new ResumeBatchCanceller(elem);
+ grpc_millis resume_time = ExecCtx::Get()->Now() + fi_policy_->delay;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure, ResumeBatch, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_timer_init(&delay_timer_, resume_time, &batch->handler_private.closure);
+}
+
+void CallData::ResumeBatch(void* arg, grpc_error_handle error) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+ auto* calld = static_cast<CallData*>(elem->call_data);
+ MutexLock lock(&calld->delay_mu_);
+ // Cancelled or canceller has already run
+ if (error == GRPC_ERROR_CANCELLED ||
+ calld->resume_batch_canceller_ == nullptr) {
+ return;
+ }
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_fault_injection_filter_trace)) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: Resuming delayed stream op batch %p",
+ elem->channel_data, calld, calld->delayed_batch_);
+ }
+ // Lame the canceller
+ calld->resume_batch_canceller_ = nullptr;
+ // Finish fault injection.
+ calld->FaultInjectionFinished();
+ // Abort if needed.
+ error = calld->MaybeAbort();
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ calld->delayed_batch_, error, calld->call_combiner_);
+ return;
+ }
+ // Chain to the next filter.
+ grpc_call_next_op(elem, calld->delayed_batch_);
+}
+
+void CallData::HijackedRecvTrailingMetadataReady(void* arg,
+ grpc_error_handle error) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+ auto* calld = static_cast<CallData*>(elem->call_data);
+ if (calld->abort_error_ != GRPC_ERROR_NONE) {
+ error = grpc_error_add_child(GRPC_ERROR_REF(error),
+ GRPC_ERROR_REF(calld->abort_error_));
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
+ error);
+}
+
+} // namespace
+
+extern const grpc_channel_filter FaultInjectionFilterVtable = {
+ CallData::StartTransportStreamOpBatch,
+ grpc_channel_next_op,
+ sizeof(CallData),
+ CallData::Init,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ CallData::Destroy,
+ sizeof(ChannelData),
+ ChannelData::Init,
+ ChannelData::Destroy,
+ grpc_channel_next_get_info,
+ "fault_injection_filter",
+};
+
+void FaultInjectionFilterInit(void) {
+ grpc_core::FaultInjectionServiceConfigParser::Register();
+}
+
+void FaultInjectionFilterShutdown(void) {}
+
+} // namespace grpc_core