summaryrefslogtreecommitdiff
path: root/grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r--grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc97
1 files changed, 49 insertions, 48 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 6a46a0bb..4b4f1ab6 100644
--- a/grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -90,6 +90,8 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/server_address.h"
+#include "src/core/lib/address_utils/parse_address.h"
+#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
@@ -98,9 +100,7 @@
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/iomgr/parse_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
-#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@@ -186,17 +186,17 @@ class GrpcLb : public LoadBalancingPolicy {
void ScheduleNextClientLoadReportLocked();
void SendClientLoadReportLocked();
- static void MaybeSendClientLoadReport(void* arg, grpc_error* error);
- static void ClientLoadReportDone(void* arg, grpc_error* error);
- static void OnInitialRequestSent(void* arg, grpc_error* error);
- static void OnBalancerMessageReceived(void* arg, grpc_error* error);
- static void OnBalancerStatusReceived(void* arg, grpc_error* error);
+ static void MaybeSendClientLoadReport(void* arg, grpc_error_handle error);
+ static void ClientLoadReportDone(void* arg, grpc_error_handle error);
+ static void OnInitialRequestSent(void* arg, grpc_error_handle error);
+ static void OnBalancerMessageReceived(void* arg, grpc_error_handle error);
+ static void OnBalancerStatusReceived(void* arg, grpc_error_handle error);
- void MaybeSendClientLoadReportLocked(grpc_error* error);
- void ClientLoadReportDoneLocked(grpc_error* error);
+ void MaybeSendClientLoadReportLocked(grpc_error_handle error);
+ void ClientLoadReportDoneLocked(grpc_error_handle error);
void OnInitialRequestSentLocked();
void OnBalancerMessageReceivedLocked();
- void OnBalancerStatusReceivedLocked(grpc_error* error);
+ void OnBalancerStatusReceivedLocked(grpc_error_handle error);
// The owning LB policy.
RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
@@ -410,14 +410,14 @@ class GrpcLb : public LoadBalancingPolicy {
// Methods for dealing with fallback state.
void MaybeEnterFallbackModeAfterStartup();
- static void OnFallbackTimer(void* arg, grpc_error* error);
- void OnFallbackTimerLocked(grpc_error* error);
+ static void OnFallbackTimer(void* arg, grpc_error_handle error);
+ void OnFallbackTimerLocked(grpc_error_handle error);
// Methods for dealing with the balancer call.
void StartBalancerCallLocked();
void StartBalancerCallRetryTimerLocked();
- static void OnBalancerCallRetryTimer(void* arg, grpc_error* error);
- void OnBalancerCallRetryTimerLocked(grpc_error* error);
+ static void OnBalancerCallRetryTimer(void* arg, grpc_error_handle error);
+ void OnBalancerCallRetryTimerLocked(grpc_error_handle error);
// Methods for dealing with the child policy.
grpc_channel_args* CreateChildPolicyArgsLocked(
@@ -893,6 +893,10 @@ void GrpcLb::BalancerCallState::StartQuery() {
}
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
+ // InvalidateNow to avoid getting stuck re-initializing this timer
+ // in a loop while draining the currently-held WorkSerializer.
+ // Also see https://github.com/grpc/grpc/issues/26079.
+ ExecCtx::Get()->InvalidateNow();
const grpc_millis next_client_load_report_time =
ExecCtx::Get()->Now() + client_stats_report_interval_;
GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport,
@@ -902,8 +906,8 @@ void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
client_load_report_timer_callback_pending_ = true;
}
-void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(void* arg,
- grpc_error* error) {
+void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(
+ void* arg, grpc_error_handle error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
lb_calld->grpclb_policy()->work_serializer()->Run(
@@ -912,7 +916,7 @@ void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(void* arg,
}
void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
- grpc_error* error) {
+ grpc_error_handle error) {
client_load_report_timer_callback_pending_ = false;
if (error != GRPC_ERROR_NONE || this != grpclb_policy()->lb_calld_.get()) {
Unref(DEBUG_LOCATION, "client_load_report");
@@ -982,7 +986,7 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
}
void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg,
- grpc_error* error) {
+ grpc_error_handle error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
lb_calld->grpclb_policy()->work_serializer()->Run(
@@ -990,7 +994,8 @@ void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg,
DEBUG_LOCATION);
}
-void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(grpc_error* error) {
+void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(
+ grpc_error_handle error) {
grpc_byte_buffer_destroy(send_message_payload_);
send_message_payload_ = nullptr;
if (error != GRPC_ERROR_NONE || this != grpclb_policy()->lb_calld_.get()) {
@@ -1001,8 +1006,8 @@ void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(grpc_error* error) {
ScheduleNextClientLoadReportLocked();
}
-void GrpcLb::BalancerCallState::OnInitialRequestSent(void* arg,
- grpc_error* /*error*/) {
+void GrpcLb::BalancerCallState::OnInitialRequestSent(
+ void* arg, grpc_error_handle /*error*/) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->work_serializer()->Run(
[lb_calld]() { lb_calld->OnInitialRequestSentLocked(); }, DEBUG_LOCATION);
@@ -1021,7 +1026,7 @@ void GrpcLb::BalancerCallState::OnInitialRequestSentLocked() {
}
void GrpcLb::BalancerCallState::OnBalancerMessageReceived(
- void* arg, grpc_error* /*error*/) {
+ void* arg, grpc_error_handle /*error*/) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->work_serializer()->Run(
[lb_calld]() { lb_calld->OnBalancerMessageReceivedLocked(); },
@@ -1183,8 +1188,8 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
}
}
-void GrpcLb::BalancerCallState::OnBalancerStatusReceived(void* arg,
- grpc_error* error) {
+void GrpcLb::BalancerCallState::OnBalancerStatusReceived(
+ void* arg, grpc_error_handle error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
GRPC_ERROR_REF(error); // owned by lambda
lb_calld->grpclb_policy()->work_serializer()->Run(
@@ -1193,7 +1198,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceived(void* arg,
}
void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
- grpc_error* error) {
+ grpc_error_handle error) {
GPR_ASSERT(lb_call_ != nullptr);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
char* status_details = grpc_slice_to_c_string(lb_call_status_details_);
@@ -1201,7 +1206,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
"[grpclb %p] lb_calld=%p: Status from LB server received. "
"Status = %d, details = '%s', (lb_call: %p), error '%s'",
grpclb_policy(), this, lb_call_status_, status_details, lb_call_,
- grpc_error_string(error));
+ grpc_error_std_string(error).c_str());
gpr_free(status_details);
}
GRPC_ERROR_UNREF(error);
@@ -1259,12 +1264,10 @@ ServerAddressList ExtractBalancerAddresses(const grpc_channel_args& args) {
* stream for the reception of load balancing updates.
*
* Inputs:
- * - \a addresses: corresponding to the balancers.
* - \a response_generator: in order to propagate updates from the resolver
* above the grpclb policy.
* - \a args: other args inherited from the grpclb policy. */
grpc_channel_args* BuildBalancerChannelArgs(
- const ServerAddressList& addresses,
FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
// Channel args to remove.
@@ -1313,7 +1316,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
args_to_add.size());
// Make any necessary modifications for security.
- return ModifyGrpclbBalancerChannelArgs(addresses, new_args);
+ return ModifyGrpclbBalancerChannelArgs(new_args);
}
//
@@ -1422,13 +1425,12 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
// Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE before the timer fires, we go into
// fallback mode even if the fallback timeout has not elapsed.
- grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
- grpc_channel_get_channel_stack(lb_channel_));
- GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+ ClientChannel* client_channel = ClientChannel::GetFromChannel(lb_channel_);
+ GPR_ASSERT(client_channel != nullptr);
// Ref held by callback.
watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
- grpc_client_channel_start_connectivity_watch(
- client_channel_elem, GRPC_CHANNEL_IDLE,
+ client_channel->AddConnectivityWatcher(
+ GRPC_CHANNEL_IDLE,
OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
// Start balancer call.
StartBalancerCallLocked();
@@ -1464,8 +1466,8 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked(
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
// Construct args for balancer channel.
ServerAddressList balancer_addresses = ExtractBalancerAddresses(args);
- grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(
- balancer_addresses, response_generator_.get(), &args);
+ grpc_channel_args* lb_channel_args =
+ BuildBalancerChannelArgs(response_generator_.get(), &args);
// Create balancer channel if needed.
if (lb_channel_ == nullptr) {
std::string uri_str = absl::StrCat("fake:///", server_name_);
@@ -1492,10 +1494,9 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked(
}
void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
- grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
- grpc_channel_get_channel_stack(lb_channel_));
- GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
- grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
+ ClientChannel* client_channel = ClientChannel::GetFromChannel(lb_channel_);
+ GPR_ASSERT(client_channel != nullptr);
+ client_channel->RemoveConnectivityWatcher(watcher_);
}
//
@@ -1538,7 +1539,7 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() {
grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
}
-void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) {
+void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error_handle error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
grpclb_policy->work_serializer()->Run(
@@ -1548,7 +1549,7 @@ void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) {
DEBUG_LOCATION);
}
-void GrpcLb::OnBalancerCallRetryTimerLocked(grpc_error* error) {
+void GrpcLb::OnBalancerCallRetryTimerLocked(grpc_error_handle error) {
retry_timer_callback_pending_ = false;
if (!shutting_down_ && error == GRPC_ERROR_NONE && lb_calld_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
@@ -1582,7 +1583,7 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
}
}
-void GrpcLb::OnFallbackTimer(void* arg, grpc_error* error) {
+void GrpcLb::OnFallbackTimer(void* arg, grpc_error_handle error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
grpclb_policy->work_serializer()->Run(
@@ -1590,7 +1591,7 @@ void GrpcLb::OnFallbackTimer(void* arg, grpc_error* error) {
DEBUG_LOCATION);
}
-void GrpcLb::OnFallbackTimerLocked(grpc_error* error) {
+void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) {
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (fallback_at_startup_checks_pending_ && !shutting_down_ &&
@@ -1694,12 +1695,12 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
const char* name() const override { return kGrpclb; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
- const Json& json, grpc_error** error) const override {
+ const Json& json, grpc_error_handle* error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json.type() == Json::Type::JSON_NULL) {
return MakeRefCounted<GrpcLbConfig>(nullptr, "");
}
- std::vector<grpc_error*> error_list;
+ std::vector<grpc_error_handle> error_list;
Json child_policy_config_json_tmp;
const Json* child_policy_config_json;
std::string service_name;
@@ -1722,12 +1723,12 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
} else {
child_policy_config_json = &it->second;
}
- grpc_error* parse_error = GRPC_ERROR_NONE;
+ grpc_error_handle parse_error = GRPC_ERROR_NONE;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config =
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
*child_policy_config_json, &parse_error);
if (parse_error != GRPC_ERROR_NONE) {
- std::vector<grpc_error*> child_errors;
+ std::vector<grpc_error_handle> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));