diff options
Diffstat (limited to 'grpc/src/cpp/client/channel_cc.cc')
-rw-r--r-- | grpc/src/cpp/client/channel_cc.cc | 52 |
1 files changed, 38 insertions, 14 deletions
diff --git a/grpc/src/cpp/client/channel_cc.cc b/grpc/src/cpp/client/channel_cc.cc index 1395c72e..a9f20839 100644 --- a/grpc/src/cpp/client/channel_cc.cc +++ b/grpc/src/cpp/client/channel_cc.cc @@ -38,7 +38,9 @@ #include <grpcpp/support/channel_arguments.h> #include <grpcpp/support/config.h> #include <grpcpp/support/status.h> + #include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/surface/completion_queue.h" namespace grpc { @@ -55,8 +57,14 @@ Channel::Channel(const std::string& host, grpc_channel* channel, Channel::~Channel() { grpc_channel_destroy(c_channel_); - if (callback_cq_ != nullptr) { - callback_cq_->Shutdown(); + CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed); + if (callback_cq != nullptr) { + if (grpc_iomgr_run_in_background()) { + // gRPC-core provides the backing needed for the preferred CQ type + callback_cq->Shutdown(); + } else { + CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq); + } } } @@ -139,9 +147,9 @@ void ChannelResetConnectionBackoff(Channel* channel) { // ClientRpcInfo should be set before call because set_call also checks // whether the call has been cancelled, and if the call was cancelled, we // should notify the interceptors too. - auto* info = - context->set_client_rpc_info(method.name(), method.method_type(), this, - interceptor_creators_, interceptor_pos); + auto* info = context->set_client_rpc_info( + method.name(), method.suffix_for_stats(), method.method_type(), this, + interceptor_creators_, interceptor_pos); context->set_call(c_call, shared_from_this()); return ::grpc::internal::Call(c_call, this, cq, info); @@ -236,17 +244,33 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { ::grpc::CompletionQueue* Channel::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-channel CQ registered + CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire); + if (callback_cq != nullptr) { + return callback_cq; + } + // The callback_cq_ wasn't already set, so grab a lock and set it up exactly + // once for this channel. grpc::internal::MutexLock l(&mu_); - if (callback_cq_ == nullptr) { - auto* shutdown_callback = new ShutdownCallback; - callback_cq_ = new ::grpc::CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, - shutdown_callback}); - - // Transfer ownership of the new cq to its own shutdown callback - shutdown_callback->TakeCQ(callback_cq_); + callback_cq = callback_cq_.load(std::memory_order_relaxed); + if (callback_cq == nullptr) { + if (grpc_iomgr_run_in_background()) { + // gRPC-core provides the backing needed for the preferred CQ type + + auto* shutdown_callback = new ShutdownCallback; + callback_cq = + new ::grpc::CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, + GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); + + // Transfer ownership of the new cq to its own shutdown callback + shutdown_callback->TakeCQ(callback_cq); + } else { + // Otherwise we need to use the alternative CQ variant + callback_cq = CompletionQueue::CallbackAlternativeCQ(); + } + callback_cq_.store(callback_cq, std::memory_order_release); } - return callback_cq_; + return callback_cq; } } // namespace grpc |