summaryrefslogtreecommitdiff
path: root/grpc/src/cpp/client/channel_cc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/cpp/client/channel_cc.cc')
-rw-r--r--grpc/src/cpp/client/channel_cc.cc52
1 files changed, 38 insertions, 14 deletions
diff --git a/grpc/src/cpp/client/channel_cc.cc b/grpc/src/cpp/client/channel_cc.cc
index 1395c72e..a9f20839 100644
--- a/grpc/src/cpp/client/channel_cc.cc
+++ b/grpc/src/cpp/client/channel_cc.cc
@@ -38,7 +38,9 @@
#include <grpcpp/support/channel_arguments.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/status.h>
+
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/surface/completion_queue.h"
namespace grpc {
@@ -55,8 +57,14 @@ Channel::Channel(const std::string& host, grpc_channel* channel,
Channel::~Channel() {
grpc_channel_destroy(c_channel_);
- if (callback_cq_ != nullptr) {
- callback_cq_->Shutdown();
+ CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed);
+ if (callback_cq != nullptr) {
+ if (grpc_iomgr_run_in_background()) {
+ // gRPC-core provides the backing needed for the preferred CQ type
+ callback_cq->Shutdown();
+ } else {
+ CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
+ }
}
}
@@ -139,9 +147,9 @@ void ChannelResetConnectionBackoff(Channel* channel) {
// ClientRpcInfo should be set before call because set_call also checks
// whether the call has been cancelled, and if the call was cancelled, we
// should notify the interceptors too.
- auto* info =
- context->set_client_rpc_info(method.name(), method.method_type(), this,
- interceptor_creators_, interceptor_pos);
+ auto* info = context->set_client_rpc_info(
+ method.name(), method.suffix_for_stats(), method.method_type(), this,
+ interceptor_creators_, interceptor_pos);
context->set_call(c_call, shared_from_this());
return ::grpc::internal::Call(c_call, this, cq, info);
@@ -236,17 +244,33 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
::grpc::CompletionQueue* Channel::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-channel CQ registered
+ CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire);
+ if (callback_cq != nullptr) {
+ return callback_cq;
+ }
+ // The callback_cq_ wasn't already set, so grab a lock and set it up exactly
+ // once for this channel.
grpc::internal::MutexLock l(&mu_);
- if (callback_cq_ == nullptr) {
- auto* shutdown_callback = new ShutdownCallback;
- callback_cq_ = new ::grpc::CompletionQueue(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
- shutdown_callback});
-
- // Transfer ownership of the new cq to its own shutdown callback
- shutdown_callback->TakeCQ(callback_cq_);
+ callback_cq = callback_cq_.load(std::memory_order_relaxed);
+ if (callback_cq == nullptr) {
+ if (grpc_iomgr_run_in_background()) {
+ // gRPC-core provides the backing needed for the preferred CQ type
+
+ auto* shutdown_callback = new ShutdownCallback;
+ callback_cq =
+ new ::grpc::CompletionQueue(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK,
+ GRPC_CQ_DEFAULT_POLLING, shutdown_callback});
+
+ // Transfer ownership of the new cq to its own shutdown callback
+ shutdown_callback->TakeCQ(callback_cq);
+ } else {
+ // Otherwise we need to use the alternative CQ variant
+ callback_cq = CompletionQueue::CallbackAlternativeCQ();
+ }
+ callback_cq_.store(callback_cq, std::memory_order_release);
}
- return callback_cq_;
+ return callback_cq;
}
} // namespace grpc