summaryrefslogtreecommitdiff
path: root/grpc/src/cpp/server/server_cc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/cpp/server/server_cc.cc')
-rw-r--r--grpc/src/cpp/server/server_cc.cc530
1 files changed, 280 insertions, 250 deletions
diff --git a/grpc/src/cpp/server/server_cc.cc b/grpc/src/cpp/server/server_cc.cc
index 2b5486af..a334cdd8 100644
--- a/grpc/src/cpp/server/server_cc.cc
+++ b/grpc/src/cpp/server/server_cc.cc
@@ -45,7 +45,9 @@
#include "absl/memory/memory.h"
#include "src/core/ext/transport/inproc/inproc_transport.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/completion_queue.h"
@@ -65,6 +67,15 @@ namespace {
// max-threads set) to the server builder.
#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
+// Give a useful status error message if the resource is exhausted specifically
+// because the server threadpool is full.
+const char* kServerThreadpoolExhausted = "Server Threadpool Exhausted";
+
+// Although we might like to give a useful status error message on unimplemented
+// RPCs, it's not always possible since that also would need to be added across
+// languages and isn't actually required by the spec.
+const char* kUnknownRpcMethod = "";
+
class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
public:
~DefaultGlobalCallbacks() override {}
@@ -88,7 +99,7 @@ class ShutdownTag : public internal::CompletionQueueTag {
}
};
-class DummyTag : public internal::CompletionQueueTag {
+class PhonyTag : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** /*tag*/, bool* /*status*/) override {
return true;
@@ -336,199 +347,171 @@ class Server::UnimplementedAsyncResponse final
class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
public:
- SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag)
- : method_(method),
- method_tag_(method_tag),
- in_flight_(false),
- has_request_payload_(method->method_type() ==
- grpc::internal::RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- grpc::internal::RpcMethod::SERVER_STREAMING),
- call_details_(nullptr),
- cq_(nullptr) {
- grpc_metadata_array_init(&request_metadata_);
- }
-
- ~SyncRequest() override {
- if (call_details_) {
- delete call_details_;
- }
- grpc_metadata_array_destroy(&request_metadata_);
- }
-
- void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); }
-
- void TeardownRequest() {
- grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
+ SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method,
+ grpc_core::Server::RegisteredCallAllocation* data)
+ : SyncRequest(server, method) {
+ CommonSetup(data);
+ data->deadline = &deadline_;
+ data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
}
- void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
- GPR_ASSERT(cq_ && !in_flight_);
- in_flight_ = true;
- if (method_tag_) {
- if (grpc_server_request_registered_call(
- server, method_tag_, &call_, &deadline_, &request_metadata_,
- has_request_payload_ ? &request_payload_ : nullptr, cq_,
- notify_cq, this) != GRPC_CALL_OK) {
- TeardownRequest();
- return;
- }
- } else {
- if (!call_details_) {
- call_details_ = new grpc_call_details;
- grpc_call_details_init(call_details_);
- }
- if (grpc_server_request_call(server, &call_, call_details_,
- &request_metadata_, cq_, notify_cq,
- this) != GRPC_CALL_OK) {
- TeardownRequest();
- return;
- }
- }
+ SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method,
+ grpc_core::Server::BatchCallAllocation* data)
+ : SyncRequest(server, method) {
+ CommonSetup(data);
+ call_details_ = new grpc_call_details;
+ grpc_call_details_init(call_details_);
+ data->details = call_details_;
}
- void PostShutdownCleanup() {
- if (call_) {
- grpc_call_unref(call_);
- call_ = nullptr;
+ ~SyncRequest() override {
+ // The destructor should only cleanup those objects created in the
+ // constructor, since some paths may or may not actually go through the
+ // Run stage where other objects are allocated.
+ if (has_request_payload_ && request_payload_) {
+ grpc_byte_buffer_destroy(request_payload_);
}
- if (cq_) {
- grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
+ if (call_details_ != nullptr) {
+ grpc_call_details_destroy(call_details_);
+ delete call_details_;
}
+ grpc_metadata_array_destroy(&request_metadata_);
+ server_->UnrefWithPossibleNotify();
}
bool FinalizeResult(void** /*tag*/, bool* status) override {
if (!*status) {
- grpc_completion_queue_destroy(cq_);
- cq_ = nullptr;
+ delete this;
+ return false;
}
if (call_details_) {
deadline_ = call_details_->deadline;
- grpc_call_details_destroy(call_details_);
- grpc_call_details_init(call_details_);
}
return true;
}
- // The CallData class represents a call that is "active" as opposed
- // to just being requested. It wraps and takes ownership of the cq from
- // the call request
- class CallData final {
- public:
- explicit CallData(Server* server, SyncRequest* mrd)
- : cq_(mrd->cq_),
- ctx_(mrd->deadline_, &mrd->request_metadata_),
- has_request_payload_(mrd->has_request_payload_),
- request_payload_(has_request_payload_ ? mrd->request_payload_
- : nullptr),
- request_(nullptr),
- method_(mrd->method_),
- call_(
- mrd->call_, server, &cq_, server->max_receive_message_size(),
- ctx_.set_server_rpc_info(method_->name(), method_->method_type(),
- server->interceptor_creators_)),
- server_(server),
- global_callbacks_(nullptr),
- resources_(false) {
- ctx_.set_call(mrd->call_);
- ctx_.cq_ = &cq_;
- GPR_ASSERT(mrd->in_flight_);
- mrd->in_flight_ = false;
- mrd->request_metadata_.count = 0;
+ void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
+ bool resources) {
+ ctx_.Init(deadline_, &request_metadata_);
+ wrapped_call_.Init(
+ call_, server_, &cq_, server_->max_receive_message_size(),
+ ctx_->ctx.set_server_rpc_info(method_->name(), method_->method_type(),
+ server_->interceptor_creators_));
+ ctx_->ctx.set_call(call_);
+ ctx_->ctx.cq_ = &cq_;
+ request_metadata_.count = 0;
+
+ global_callbacks_ = global_callbacks;
+ resources_ = resources;
+
+ interceptor_methods_.SetCall(&*wrapped_call_);
+ interceptor_methods_.SetReverse();
+ // Set interception point for RECV INITIAL METADATA
+ interceptor_methods_.AddInterceptionHookPoint(
+ grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
+ interceptor_methods_.SetRecvInitialMetadata(&ctx_->ctx.client_metadata_);
+
+ if (has_request_payload_) {
+ // Set interception point for RECV MESSAGE
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ deserialized_request_ = handler->Deserialize(call_, request_payload_,
+ &request_status_, nullptr);
+
+ request_payload_ = nullptr;
+ interceptor_methods_.AddInterceptionHookPoint(
+ grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
+ interceptor_methods_.SetRecvMessage(deserialized_request_, nullptr);
}
- ~CallData() {
- if (has_request_payload_ && request_payload_) {
- grpc_byte_buffer_destroy(request_payload_);
- }
+ if (interceptor_methods_.RunInterceptors(
+ [this]() { ContinueRunAfterInterception(); })) {
+ ContinueRunAfterInterception();
+ } else {
+ // There were interceptors to be run, so ContinueRunAfterInterception
+ // will be run when interceptors are done.
}
+ }
- void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
- bool resources) {
- global_callbacks_ = global_callbacks;
- resources_ = resources;
+ void ContinueRunAfterInterception() {
+ ctx_->ctx.BeginCompletionOp(&*wrapped_call_, nullptr, nullptr);
+ global_callbacks_->PreSynchronousRequest(&ctx_->ctx);
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
+ &*wrapped_call_, &ctx_->ctx, deserialized_request_, request_status_,
+ nullptr, nullptr));
+ global_callbacks_->PostSynchronousRequest(&ctx_->ctx);
- interceptor_methods_.SetCall(&call_);
- interceptor_methods_.SetReverse();
- // Set interception point for RECV INITIAL METADATA
- interceptor_methods_.AddInterceptionHookPoint(
- grpc::experimental::InterceptionHookPoints::
- POST_RECV_INITIAL_METADATA);
- interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_);
+ cq_.Shutdown();
- if (has_request_payload_) {
- // Set interception point for RECV MESSAGE
- auto* handler = resources_ ? method_->handler()
- : server_->resource_exhausted_handler_.get();
- request_ = handler->Deserialize(call_.call(), request_payload_,
- &request_status_, nullptr);
+ grpc::internal::CompletionQueueTag* op_tag = ctx_->ctx.GetCompletionOpTag();
+ cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
- request_payload_ = nullptr;
- interceptor_methods_.AddInterceptionHookPoint(
- grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
- interceptor_methods_.SetRecvMessage(request_, nullptr);
- }
+ // Ensure the cq_ is shutdown
+ grpc::PhonyTag ignored_tag;
+ GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
- if (interceptor_methods_.RunInterceptors(
- [this]() { ContinueRunAfterInterception(); })) {
- ContinueRunAfterInterception();
- } else {
- // There were interceptors to be run, so ContinueRunAfterInterception
- // will be run when interceptors are done.
- }
- }
+ // Cleanup structures allocated during Run/ContinueRunAfterInterception
+ wrapped_call_.Destroy();
+ ctx_.Destroy();
- void ContinueRunAfterInterception() {
- {
- ctx_.BeginCompletionOp(&call_, nullptr, nullptr);
- global_callbacks_->PreSynchronousRequest(&ctx_);
- auto* handler = resources_ ? method_->handler()
- : server_->resource_exhausted_handler_.get();
- handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
- &call_, &ctx_, request_, request_status_, nullptr, nullptr));
- request_ = nullptr;
- global_callbacks_->PostSynchronousRequest(&ctx_);
-
- cq_.Shutdown();
-
- grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
- cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
-
- /* Ensure the cq_ is shutdown */
- grpc::DummyTag ignored_tag;
- GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
- }
- delete this;
- }
+ delete this;
+ }
- private:
- grpc::CompletionQueue cq_;
- grpc::ServerContext ctx_;
- const bool has_request_payload_;
- grpc_byte_buffer* request_payload_;
- void* request_;
- grpc::Status request_status_;
- grpc::internal::RpcServiceMethod* const method_;
- grpc::internal::Call call_;
- Server* server_;
- std::shared_ptr<GlobalCallbacks> global_callbacks_;
- bool resources_;
- grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
- };
+ // For requests that must be only cleaned up but not actually Run
+ void Cleanup() {
+ cq_.Shutdown();
+ grpc_call_unref(call_);
+ delete this;
+ }
private:
+ SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method)
+ : server_(server),
+ method_(method),
+ has_request_payload_(method->method_type() ==
+ grpc::internal::RpcMethod::NORMAL_RPC ||
+ method->method_type() ==
+ grpc::internal::RpcMethod::SERVER_STREAMING),
+ cq_(grpc_completion_queue_create_for_pluck(nullptr)) {}
+
+ template <class CallAllocation>
+ void CommonSetup(CallAllocation* data) {
+ server_->Ref();
+ grpc_metadata_array_init(&request_metadata_);
+ data->tag = static_cast<void*>(this);
+ data->call = &call_;
+ data->initial_metadata = &request_metadata_;
+ data->cq = cq_.cq();
+ }
+
+ Server* const server_;
grpc::internal::RpcServiceMethod* const method_;
- void* const method_tag_;
- bool in_flight_;
const bool has_request_payload_;
grpc_call* call_;
- grpc_call_details* call_details_;
+ grpc_call_details* call_details_ = nullptr;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
- grpc_byte_buffer* request_payload_;
- grpc_completion_queue* cq_;
+ grpc_byte_buffer* request_payload_ = nullptr;
+ grpc::CompletionQueue cq_;
+ grpc::Status request_status_;
+ std::shared_ptr<GlobalCallbacks> global_callbacks_;
+ bool resources_;
+ void* deserialized_request_ = nullptr;
+ grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
+
+ // ServerContextWrapper allows ManualConstructor while using a private
+ // contructor of ServerContext via this friend class.
+ struct ServerContextWrapper {
+ ServerContext ctx;
+
+ ServerContextWrapper(gpr_timespec deadline, grpc_metadata_array* arr)
+ : ctx(deadline, arr) {}
+ };
+
+ grpc_core::ManualConstructor<ServerContextWrapper> ctx_;
+ grpc_core::ManualConstructor<internal::Call> wrapped_call_;
};
template <class ServerContextType>
@@ -552,7 +535,10 @@ class Server::CallbackRequest final
method->method_type() ==
grpc::internal::RpcMethod::SERVER_STREAMING),
cq_(cq),
- tag_(this) {
+ tag_(this),
+ ctx_(server_->context_allocator() != nullptr
+ ? server_->context_allocator()->NewCallbackServerContext()
+ : nullptr) {
CommonSetup(server, data);
data->deadline = &deadline_;
data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
@@ -567,7 +553,11 @@ class Server::CallbackRequest final
has_request_payload_(false),
call_details_(new grpc_call_details),
cq_(cq),
- tag_(this) {
+ tag_(this),
+ ctx_(server_->context_allocator() != nullptr
+ ? server_->context_allocator()
+ ->NewGenericCallbackServerContext()
+ : nullptr) {
CommonSetup(server, data);
grpc_call_details_init(call_details_);
data->details = call_details_;
@@ -579,6 +569,9 @@ class Server::CallbackRequest final
if (has_request_payload_ && request_payload_) {
grpc_byte_buffer_destroy(request_payload_);
}
+ if (ctx_alloc_by_default_ || server_->context_allocator() == nullptr) {
+ default_ctx_.Destroy();
+ }
server_->UnrefWithPossibleNotify();
}
@@ -631,10 +624,10 @@ class Server::CallbackRequest final
}
// Bind the call, deadline, and metadata from what we got
- req_->ctx_.set_call(req_->call_);
- req_->ctx_.cq_ = req_->cq_;
- req_->ctx_.BindDeadlineAndMetadata(req_->deadline_,
- &req_->request_metadata_);
+ req_->ctx_->set_call(req_->call_);
+ req_->ctx_->cq_ = req_->cq_;
+ req_->ctx_->BindDeadlineAndMetadata(req_->deadline_,
+ &req_->request_metadata_);
req_->request_metadata_.count = 0;
// Create a C++ Call to control the underlying core call
@@ -643,7 +636,7 @@ class Server::CallbackRequest final
grpc::internal::Call(
req_->call_, req_->server_, req_->cq_,
req_->server_->max_receive_message_size(),
- req_->ctx_.set_server_rpc_info(
+ req_->ctx_->set_server_rpc_info(
req_->method_name(),
(req_->method_ != nullptr)
? req_->method_->method_type()
@@ -657,7 +650,7 @@ class Server::CallbackRequest final
grpc::experimental::InterceptionHookPoints::
POST_RECV_INITIAL_METADATA);
req_->interceptor_methods_.SetRecvInitialMetadata(
- &req_->ctx_.client_metadata_);
+ &req_->ctx_->client_metadata_);
if (req_->has_request_payload_) {
// Set interception point for RECV MESSAGE
@@ -683,7 +676,7 @@ class Server::CallbackRequest final
? req_->method_->handler()
: req_->server_->generic_handler_.get();
handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
- call_, &req_->ctx_, req_->request_, req_->request_status_,
+ call_, req_->ctx_, req_->request_, req_->request_status_,
req_->handler_data_, [this] { delete req_; }));
}
};
@@ -692,9 +685,16 @@ class Server::CallbackRequest final
void CommonSetup(Server* server, CallAllocation* data) {
server->Ref();
grpc_metadata_array_init(&request_metadata_);
- data->tag = &tag_;
+ data->tag = static_cast<void*>(&tag_);
data->call = &call_;
data->initial_metadata = &request_metadata_;
+ if (ctx_ == nullptr) {
+ default_ctx_.Init();
+ ctx_ = &*default_ctx_;
+ ctx_alloc_by_default_ = true;
+ }
+ ctx_->set_context_allocator(server->context_allocator());
+ data->cq = cq_->cq();
}
Server* const server_;
@@ -709,8 +709,10 @@ class Server::CallbackRequest final
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
grpc::CompletionQueue* const cq_;
+ bool ctx_alloc_by_default_ = false;
CallbackCallTag tag_;
- ServerContextType ctx_;
+ ServerContextType* ctx_ = nullptr;
+ grpc_core::ManualConstructor<ServerContextType> default_ctx_;
grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
@@ -727,8 +729,8 @@ bool Server::CallbackRequest<
if (*status) {
deadline_ = call_details_->deadline;
// TODO(yangg) remove the copy here
- ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method);
- ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host);
+ ctx_->method_ = grpc::StringFromCopiedSlice(call_details_->method);
+ ctx_->host_ = grpc::StringFromCopiedSlice(call_details_->host);
}
grpc_slice_unref(call_details_->method);
grpc_slice_unref(call_details_->host);
@@ -744,7 +746,7 @@ const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name()
template <>
const char* Server::CallbackRequest<
grpc::GenericCallbackServerContext>::method_name() const {
- return ctx_.method().c_str();
+ return ctx_->method().c_str();
}
// Implementation of ThreadManager. Each instance of SyncRequestThreadManager
@@ -783,44 +785,39 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
}
void DoWork(void* tag, bool ok, bool resources) override {
+ (void)ok;
SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
- if (!sync_req) {
- // No tag. Nothing to work on. This is an unlikley scenario and possibly a
- // bug in RPC Manager implementation.
- gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag");
- return;
- }
-
- if (ok) {
- // Calldata takes ownership of the completion queue and interceptors
- // inside sync_req
- auto* cd = new SyncRequest::CallData(server_, sync_req);
- // Prepare for the next request
- if (!IsShutdown()) {
- sync_req->SetupRequest(); // Create new completion queue for sync_req
- sync_req->Request(server_->c_server(), server_cq_->cq());
- }
+ // Under the AllocatingRequestMatcher model we will never see an invalid tag
+ // here.
+ GPR_DEBUG_ASSERT(sync_req != nullptr);
+ GPR_DEBUG_ASSERT(ok);
- GPR_TIMER_SCOPE("cd.Run()", 0);
- cd->Run(global_callbacks_, resources);
- }
- // TODO (sreek) If ok is false here (which it isn't in case of
- // grpc_request_registered_call), we should still re-queue the request
- // object
+ GPR_TIMER_SCOPE("sync_req->Run()", 0);
+ sync_req->Run(global_callbacks_, resources);
}
void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) {
- sync_requests_.emplace_back(new SyncRequest(method, tag));
+ server_->server()->core_server->SetRegisteredMethodAllocator(
+ server_cq_->cq(), tag, [this, method] {
+ grpc_core::Server::RegisteredCallAllocation result;
+ new SyncRequest(server_, method, &result);
+ return result;
+ });
+ has_sync_method_ = true;
}
void AddUnknownSyncMethod() {
- if (!sync_requests_.empty()) {
+ if (has_sync_method_) {
unknown_method_ = absl::make_unique<grpc::internal::RpcServiceMethod>(
"unknown", grpc::internal::RpcMethod::BIDI_STREAMING,
- new grpc::internal::UnknownMethodHandler);
- sync_requests_.emplace_back(
- new SyncRequest(unknown_method_.get(), nullptr));
+ new grpc::internal::UnknownMethodHandler(kUnknownRpcMethod));
+ server_->server()->core_server->SetBatchMethodAllocator(
+ server_cq_->cq(), [this] {
+ grpc_core::Server::BatchCallAllocation result;
+ new SyncRequest(server_, unknown_method_.get(), &result);
+ return result;
+ });
}
}
@@ -835,27 +832,14 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
void* tag;
bool ok;
while (server_cq_->Next(&tag, &ok)) {
- if (ok) {
- // If a request was pulled off the queue, it means that the thread
- // handling the request added it to the completion queue after shutdown
- // was called - because the thread had already started and checked the
- // shutdown flag before shutdown was called. In this case, we simply
- // clean it up here, *after* calling wait on all the worker threads, at
- // which point we are certain no in-flight requests will add more to the
- // queue. This fixes an intermittent memory leak on shutdown.
- SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
- sync_req->PostShutdownCleanup();
- }
+ // This problem can arise if the server CQ gets a request queued to it
+ // before it gets shutdown but then pulls it after shutdown.
+ static_cast<SyncRequest*>(tag)->Cleanup();
}
}
void Start() {
- if (!sync_requests_.empty()) {
- for (const auto& value : sync_requests_) {
- value->SetupRequest();
- value->Request(server_->c_server(), server_cq_->cq());
- }
-
+ if (has_sync_method_) {
Initialize(); // ThreadManager's Initialize()
}
}
@@ -864,7 +848,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager {
Server* server_;
grpc::CompletionQueue* server_cq_;
int cq_timeout_msec_;
- std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
+ bool has_sync_method_ = false;
std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
};
@@ -948,16 +932,23 @@ Server::~Server() {
{
grpc::internal::ReleasableMutexLock lock(&mu_);
if (started_ && !shutdown_) {
- lock.Unlock();
+ lock.Release();
Shutdown();
} else if (!started_) {
// Shutdown the completion queues
for (const auto& value : sync_req_mgrs_) {
value->Shutdown();
}
- if (callback_cq_ != nullptr) {
- callback_cq_->Shutdown();
- callback_cq_ = nullptr;
+ CompletionQueue* callback_cq =
+ callback_cq_.load(std::memory_order_relaxed);
+ if (callback_cq != nullptr) {
+ if (grpc_iomgr_run_in_background()) {
+ // gRPC-core provides the backing needed for the preferred CQ type
+ callback_cq->Shutdown();
+ } else {
+ CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
+ }
+ callback_cq_.store(nullptr, std::memory_order_release);
}
}
}
@@ -1124,7 +1115,9 @@ void Server::UnrefAndWaitLocked() {
shutdown_done_ = true;
return; // no need to wait on CV since done condition already set
}
- shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; });
+ grpc::internal::WaitUntil(
+ &shutdown_done_cv_, &mu_,
+ [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return shutdown_done_; });
}
void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
@@ -1173,13 +1166,27 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
}
#endif
- grpc_server_start(server_);
+ // If we have a generic service, all unmatched method names go there.
+ // Otherwise, we must provide at least one RPC request for an "unimplemented"
+ // RPC, which covers any RPC for a method name that isn't matched. If we
+ // have a sync service, let it be a sync unimplemented RPC, which must be
+ // registered before server start (to initialize an AllocatingRequestMatcher).
+ // If we have an AllocatingRequestMatcher, we can't also specify other
+ // unimplemented RPCs via explicit async requests, so we won't do so. If we
+ // only have async services, we can specify unimplemented RPCs on each async
+ // CQ so that some user polling thread will move them along as long as some
+ // progress is being made on any RPCs in the system.
+ bool unknown_rpc_needed =
+ !has_async_generic_service_ && !has_callback_generic_service_;
+
+ if (unknown_rpc_needed && !sync_req_mgrs_.empty()) {
+ sync_req_mgrs_[0]->AddUnknownSyncMethod();
+ unknown_rpc_needed = false;
+ }
- if (!has_async_generic_service_ && !has_callback_generic_service_) {
- for (const auto& value : sync_req_mgrs_) {
- value->AddUnknownSyncMethod();
- }
+ grpc_server_start(server_);
+ if (unknown_rpc_needed) {
for (size_t i = 0; i < num_cqs; i++) {
if (cqs[i]->IsFrequentlyPolled()) {
new UnimplementedAsyncRequest(this, cqs[i]);
@@ -1188,6 +1195,7 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
if (health_check_cq != nullptr) {
new UnimplementedAsyncRequest(this, health_check_cq);
}
+ unknown_rpc_needed = false;
}
// If this server has any support for synchronous methods (has any sync
@@ -1195,7 +1203,8 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
// to deal with the case of thread exhaustion
if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
resource_exhausted_handler_ =
- absl::make_unique<grpc::internal::ResourceExhaustedHandler>();
+ absl::make_unique<grpc::internal::ResourceExhaustedHandler>(
+ kServerThreadpoolExhausted);
}
for (const auto& value : sync_req_mgrs_) {
@@ -1225,7 +1234,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
/// The completion queue to use for server shutdown completion notification
grpc::CompletionQueue shutdown_cq;
- grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag
+ grpc::ShutdownTag shutdown_tag; // Phony shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
shutdown_cq.Shutdown();
@@ -1243,6 +1252,9 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
+ // Drop the shutdown ref and wait for all other refs to drop as well.
+ UnrefAndWaitLocked();
+
// Shutdown all ThreadManagers. This will try to gracefully stop all the
// threads in the ThreadManagers (once they process any inflight requests)
for (const auto& value : sync_req_mgrs_) {
@@ -1254,14 +1266,17 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
value->Wait();
}
- // Drop the shutdown ref and wait for all other refs to drop as well.
- UnrefAndWaitLocked();
-
// Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it
// will delete itself at true shutdown.
- if (callback_cq_ != nullptr) {
- callback_cq_->Shutdown();
- callback_cq_ = nullptr;
+ CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed);
+ if (callback_cq != nullptr) {
+ if (grpc_iomgr_run_in_background()) {
+ // gRPC-core provides the backing needed for the preferred CQ type
+ callback_cq->Shutdown();
+ } else {
+ CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
+ }
+ callback_cq_.store(nullptr, std::memory_order_release);
}
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
@@ -1271,7 +1286,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
}
shutdown_notified_ = true;
- shutdown_cv_.Broadcast();
+ shutdown_cv_.SignalAll();
#ifndef NDEBUG
// Unregister this server with the CQs passed into it by the user so that
@@ -1316,8 +1331,9 @@ bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
UnimplementedAsyncRequest* request)
: request_(request) {
- grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, "");
- grpc::internal::UnknownMethodHandler::FillOps(request_->context(), this);
+ grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, kUnknownRpcMethod);
+ grpc::internal::UnknownMethodHandler::FillOps(request_->context(),
+ kUnknownRpcMethod, this);
request_->stream()->call_.PerformOps(this);
}
@@ -1328,19 +1344,33 @@ grpc::ServerInitializer* Server::initializer() {
grpc::CompletionQueue* Server::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-server CQ registered
+ CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire);
+ if (callback_cq != nullptr) {
+ return callback_cq;
+ }
+ // The callback_cq_ wasn't already set, so grab a lock and set it up exactly
+ // once for this server.
grpc::internal::MutexLock l(&mu_);
- if (callback_cq_ != nullptr) {
- return callback_cq_;
+ callback_cq = callback_cq_.load(std::memory_order_relaxed);
+ if (callback_cq != nullptr) {
+ return callback_cq;
+ }
+ if (grpc_iomgr_run_in_background()) {
+ // gRPC-core provides the backing needed for the preferred CQ type
+ auto* shutdown_callback = new grpc::ShutdownCallback;
+ callback_cq = new grpc::CompletionQueue(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
+ shutdown_callback});
+
+ // Transfer ownership of the new cq to its own shutdown callback
+ shutdown_callback->TakeCQ(callback_cq);
+ } else {
+ // Otherwise we need to use the alternative CQ variant
+ callback_cq = CompletionQueue::CallbackAlternativeCQ();
}
- auto* shutdown_callback = new grpc::ShutdownCallback;
- callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
- shutdown_callback});
-
- // Transfer ownership of the new cq to its own shutdown callback
- shutdown_callback->TakeCQ(callback_cq_);
- return callback_cq_;
+ callback_cq_.store(callback_cq, std::memory_order_release);
+ return callback_cq;
}
} // namespace grpc