diff options
Diffstat (limited to 'grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r-- | grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 97 |
1 files changed, 49 insertions, 48 deletions
diff --git a/grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 6a46a0bb..4b4f1ab6 100644 --- a/grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/grpc/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -90,6 +90,8 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/server_address.h" +#include "src/core/lib/address_utils/parse_address.h" +#include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" @@ -98,9 +100,7 @@ #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/iomgr/parse_address.h" #include "src/core/lib/iomgr/sockaddr.h" -#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -186,17 +186,17 @@ class GrpcLb : public LoadBalancingPolicy { void ScheduleNextClientLoadReportLocked(); void SendClientLoadReportLocked(); - static void MaybeSendClientLoadReport(void* arg, grpc_error* error); - static void ClientLoadReportDone(void* arg, grpc_error* error); - static void OnInitialRequestSent(void* arg, grpc_error* error); - static void OnBalancerMessageReceived(void* arg, grpc_error* error); - static void OnBalancerStatusReceived(void* arg, grpc_error* error); + static void MaybeSendClientLoadReport(void* arg, grpc_error_handle error); + static void ClientLoadReportDone(void* arg, grpc_error_handle error); + static void OnInitialRequestSent(void* arg, grpc_error_handle error); + static void OnBalancerMessageReceived(void* arg, grpc_error_handle error); + static void OnBalancerStatusReceived(void* arg, grpc_error_handle error); - void MaybeSendClientLoadReportLocked(grpc_error* error); - void ClientLoadReportDoneLocked(grpc_error* error); + void MaybeSendClientLoadReportLocked(grpc_error_handle error); + void ClientLoadReportDoneLocked(grpc_error_handle error); void OnInitialRequestSentLocked(); void OnBalancerMessageReceivedLocked(); - void OnBalancerStatusReceivedLocked(grpc_error* error); + void OnBalancerStatusReceivedLocked(grpc_error_handle error); // The owning LB policy. RefCountedPtr<LoadBalancingPolicy> grpclb_policy_; @@ -410,14 +410,14 @@ class GrpcLb : public LoadBalancingPolicy { // Methods for dealing with fallback state. void MaybeEnterFallbackModeAfterStartup(); - static void OnFallbackTimer(void* arg, grpc_error* error); - void OnFallbackTimerLocked(grpc_error* error); + static void OnFallbackTimer(void* arg, grpc_error_handle error); + void OnFallbackTimerLocked(grpc_error_handle error); // Methods for dealing with the balancer call. void StartBalancerCallLocked(); void StartBalancerCallRetryTimerLocked(); - static void OnBalancerCallRetryTimer(void* arg, grpc_error* error); - void OnBalancerCallRetryTimerLocked(grpc_error* error); + static void OnBalancerCallRetryTimer(void* arg, grpc_error_handle error); + void OnBalancerCallRetryTimerLocked(grpc_error_handle error); // Methods for dealing with the child policy. grpc_channel_args* CreateChildPolicyArgsLocked( @@ -893,6 +893,10 @@ void GrpcLb::BalancerCallState::StartQuery() { } void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { + // InvalidateNow to avoid getting stuck re-initializing this timer + // in a loop while draining the currently-held WorkSerializer. + // Also see https://github.com/grpc/grpc/issues/26079. + ExecCtx::Get()->InvalidateNow(); const grpc_millis next_client_load_report_time = ExecCtx::Get()->Now() + client_stats_report_interval_; GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport, @@ -902,8 +906,8 @@ void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { client_load_report_timer_callback_pending_ = true; } -void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(void* arg, - grpc_error* error) { +void GrpcLb::BalancerCallState::MaybeSendClientLoadReport( + void* arg, grpc_error_handle error) { BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); GRPC_ERROR_REF(error); // ref owned by lambda lb_calld->grpclb_policy()->work_serializer()->Run( @@ -912,7 +916,7 @@ void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(void* arg, } void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked( - grpc_error* error) { + grpc_error_handle error) { client_load_report_timer_callback_pending_ = false; if (error != GRPC_ERROR_NONE || this != grpclb_policy()->lb_calld_.get()) { Unref(DEBUG_LOCATION, "client_load_report"); @@ -982,7 +986,7 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { } void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg, - grpc_error* error) { + grpc_error_handle error) { BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); GRPC_ERROR_REF(error); // ref owned by lambda lb_calld->grpclb_policy()->work_serializer()->Run( @@ -990,7 +994,8 @@ void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg, DEBUG_LOCATION); } -void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(grpc_error* error) { +void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked( + grpc_error_handle error) { grpc_byte_buffer_destroy(send_message_payload_); send_message_payload_ = nullptr; if (error != GRPC_ERROR_NONE || this != grpclb_policy()->lb_calld_.get()) { @@ -1001,8 +1006,8 @@ void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(grpc_error* error) { ScheduleNextClientLoadReportLocked(); } -void GrpcLb::BalancerCallState::OnInitialRequestSent(void* arg, - grpc_error* /*error*/) { +void GrpcLb::BalancerCallState::OnInitialRequestSent( + void* arg, grpc_error_handle /*error*/) { BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); lb_calld->grpclb_policy()->work_serializer()->Run( [lb_calld]() { lb_calld->OnInitialRequestSentLocked(); }, DEBUG_LOCATION); @@ -1021,7 +1026,7 @@ void GrpcLb::BalancerCallState::OnInitialRequestSentLocked() { } void GrpcLb::BalancerCallState::OnBalancerMessageReceived( - void* arg, grpc_error* /*error*/) { + void* arg, grpc_error_handle /*error*/) { BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); lb_calld->grpclb_policy()->work_serializer()->Run( [lb_calld]() { lb_calld->OnBalancerMessageReceivedLocked(); }, @@ -1183,8 +1188,8 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() { } } -void GrpcLb::BalancerCallState::OnBalancerStatusReceived(void* arg, - grpc_error* error) { +void GrpcLb::BalancerCallState::OnBalancerStatusReceived( + void* arg, grpc_error_handle error) { BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); GRPC_ERROR_REF(error); // owned by lambda lb_calld->grpclb_policy()->work_serializer()->Run( @@ -1193,7 +1198,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceived(void* arg, } void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( - grpc_error* error) { + grpc_error_handle error) { GPR_ASSERT(lb_call_ != nullptr); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { char* status_details = grpc_slice_to_c_string(lb_call_status_details_); @@ -1201,7 +1206,7 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( "[grpclb %p] lb_calld=%p: Status from LB server received. " "Status = %d, details = '%s', (lb_call: %p), error '%s'", grpclb_policy(), this, lb_call_status_, status_details, lb_call_, - grpc_error_string(error)); + grpc_error_std_string(error).c_str()); gpr_free(status_details); } GRPC_ERROR_UNREF(error); @@ -1259,12 +1264,10 @@ ServerAddressList ExtractBalancerAddresses(const grpc_channel_args& args) { * stream for the reception of load balancing updates. * * Inputs: - * - \a addresses: corresponding to the balancers. * - \a response_generator: in order to propagate updates from the resolver * above the grpclb policy. * - \a args: other args inherited from the grpclb policy. */ grpc_channel_args* BuildBalancerChannelArgs( - const ServerAddressList& addresses, FakeResolverResponseGenerator* response_generator, const grpc_channel_args* args) { // Channel args to remove. @@ -1313,7 +1316,7 @@ grpc_channel_args* BuildBalancerChannelArgs( args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(), args_to_add.size()); // Make any necessary modifications for security. - return ModifyGrpclbBalancerChannelArgs(addresses, new_args); + return ModifyGrpclbBalancerChannelArgs(new_args); } // @@ -1422,13 +1425,12 @@ void GrpcLb::UpdateLocked(UpdateArgs args) { // Start watching the channel's connectivity state. If the channel // goes into state TRANSIENT_FAILURE before the timer fires, we go into // fallback mode even if the fallback timeout has not elapsed. - grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(lb_channel_)); - GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); + ClientChannel* client_channel = ClientChannel::GetFromChannel(lb_channel_); + GPR_ASSERT(client_channel != nullptr); // Ref held by callback. watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher")); - grpc_client_channel_start_connectivity_watch( - client_channel_elem, GRPC_CHANNEL_IDLE, + client_channel->AddConnectivityWatcher( + GRPC_CHANNEL_IDLE, OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_)); // Start balancer call. StartBalancerCallLocked(); @@ -1464,8 +1466,8 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked( &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); // Construct args for balancer channel. ServerAddressList balancer_addresses = ExtractBalancerAddresses(args); - grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs( - balancer_addresses, response_generator_.get(), &args); + grpc_channel_args* lb_channel_args = + BuildBalancerChannelArgs(response_generator_.get(), &args); // Create balancer channel if needed. if (lb_channel_ == nullptr) { std::string uri_str = absl::StrCat("fake:///", server_name_); @@ -1492,10 +1494,9 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked( } void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() { - grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(lb_channel_)); - GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); - grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_); + ClientChannel* client_channel = ClientChannel::GetFromChannel(lb_channel_); + GPR_ASSERT(client_channel != nullptr); + client_channel->RemoveConnectivityWatcher(watcher_); } // @@ -1538,7 +1539,7 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() { grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_); } -void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) { +void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error_handle error) { GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); GRPC_ERROR_REF(error); // ref owned by lambda grpclb_policy->work_serializer()->Run( @@ -1548,7 +1549,7 @@ void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) { DEBUG_LOCATION); } -void GrpcLb::OnBalancerCallRetryTimerLocked(grpc_error* error) { +void GrpcLb::OnBalancerCallRetryTimerLocked(grpc_error_handle error) { retry_timer_callback_pending_ = false; if (!shutting_down_ && error == GRPC_ERROR_NONE && lb_calld_ == nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { @@ -1582,7 +1583,7 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() { } } -void GrpcLb::OnFallbackTimer(void* arg, grpc_error* error) { +void GrpcLb::OnFallbackTimer(void* arg, grpc_error_handle error) { GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); GRPC_ERROR_REF(error); // ref owned by lambda grpclb_policy->work_serializer()->Run( @@ -1590,7 +1591,7 @@ void GrpcLb::OnFallbackTimer(void* arg, grpc_error* error) { DEBUG_LOCATION); } -void GrpcLb::OnFallbackTimerLocked(grpc_error* error) { +void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) { // If we receive a serverlist after the timer fires but before this callback // actually runs, don't fall back. if (fallback_at_startup_checks_pending_ && !shutting_down_ && @@ -1694,12 +1695,12 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { const char* name() const override { return kGrpclb; } RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( - const Json& json, grpc_error** error) const override { + const Json& json, grpc_error_handle* error) const override { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); if (json.type() == Json::Type::JSON_NULL) { return MakeRefCounted<GrpcLbConfig>(nullptr, ""); } - std::vector<grpc_error*> error_list; + std::vector<grpc_error_handle> error_list; Json child_policy_config_json_tmp; const Json* child_policy_config_json; std::string service_name; @@ -1722,12 +1723,12 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { } else { child_policy_config_json = &it->second; } - grpc_error* parse_error = GRPC_ERROR_NONE; + grpc_error_handle parse_error = GRPC_ERROR_NONE; RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( *child_policy_config_json, &parse_error); if (parse_error != GRPC_ERROR_NONE) { - std::vector<grpc_error*> child_errors; + std::vector<grpc_error_handle> child_errors; child_errors.push_back(parse_error); error_list.push_back( GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); |