diff options
Diffstat (limited to 'grpc/src/cpp/server/server_cc.cc')
-rw-r--r-- | grpc/src/cpp/server/server_cc.cc | 530 |
1 files changed, 280 insertions, 250 deletions
diff --git a/grpc/src/cpp/server/server_cc.cc b/grpc/src/cpp/server/server_cc.cc index 2b5486af..a334cdd8 100644 --- a/grpc/src/cpp/server/server_cc.cc +++ b/grpc/src/cpp/server/server_cc.cc @@ -45,7 +45,9 @@ #include "absl/memory/memory.h" #include "src/core/ext/transport/inproc/inproc_transport.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/completion_queue.h" @@ -65,6 +67,15 @@ namespace { // max-threads set) to the server builder. #define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX +// Give a useful status error message if the resource is exhausted specifically +// because the server threadpool is full. +const char* kServerThreadpoolExhausted = "Server Threadpool Exhausted"; + +// Although we might like to give a useful status error message on unimplemented +// RPCs, it's not always possible since that also would need to be added across +// languages and isn't actually required by the spec. +const char* kUnknownRpcMethod = ""; + class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: ~DefaultGlobalCallbacks() override {} @@ -88,7 +99,7 @@ class ShutdownTag : public internal::CompletionQueueTag { } }; -class DummyTag : public internal::CompletionQueueTag { +class PhonyTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** /*tag*/, bool* /*status*/) override { return true; @@ -336,199 +347,171 @@ class Server::UnimplementedAsyncResponse final class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { public: - SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) - : method_(method), - method_tag_(method_tag), - in_flight_(false), - has_request_payload_(method->method_type() == - grpc::internal::RpcMethod::NORMAL_RPC || - method->method_type() == - grpc::internal::RpcMethod::SERVER_STREAMING), - call_details_(nullptr), - cq_(nullptr) { - grpc_metadata_array_init(&request_metadata_); - } - - ~SyncRequest() override { - if (call_details_) { - delete call_details_; - } - grpc_metadata_array_destroy(&request_metadata_); - } - - void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); } - - void TeardownRequest() { - grpc_completion_queue_destroy(cq_); - cq_ = nullptr; + SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method, + grpc_core::Server::RegisteredCallAllocation* data) + : SyncRequest(server, method) { + CommonSetup(data); + data->deadline = &deadline_; + data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr; } - void Request(grpc_server* server, grpc_completion_queue* notify_cq) { - GPR_ASSERT(cq_ && !in_flight_); - in_flight_ = true; - if (method_tag_) { - if (grpc_server_request_registered_call( - server, method_tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq_, - notify_cq, this) != GRPC_CALL_OK) { - TeardownRequest(); - return; - } - } else { - if (!call_details_) { - call_details_ = new grpc_call_details; - grpc_call_details_init(call_details_); - } - if (grpc_server_request_call(server, &call_, call_details_, - &request_metadata_, cq_, notify_cq, - this) != GRPC_CALL_OK) { - TeardownRequest(); - return; - } - } + SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method, + grpc_core::Server::BatchCallAllocation* data) + : SyncRequest(server, method) { + CommonSetup(data); + call_details_ = new grpc_call_details; + grpc_call_details_init(call_details_); + data->details = call_details_; } - void PostShutdownCleanup() { - if (call_) { - grpc_call_unref(call_); - call_ = nullptr; + ~SyncRequest() override { + // The destructor should only cleanup those objects created in the + // constructor, since some paths may or may not actually go through the + // Run stage where other objects are allocated. + if (has_request_payload_ && request_payload_) { + grpc_byte_buffer_destroy(request_payload_); } - if (cq_) { - grpc_completion_queue_destroy(cq_); - cq_ = nullptr; + if (call_details_ != nullptr) { + grpc_call_details_destroy(call_details_); + delete call_details_; } + grpc_metadata_array_destroy(&request_metadata_); + server_->UnrefWithPossibleNotify(); } bool FinalizeResult(void** /*tag*/, bool* status) override { if (!*status) { - grpc_completion_queue_destroy(cq_); - cq_ = nullptr; + delete this; + return false; } if (call_details_) { deadline_ = call_details_->deadline; - grpc_call_details_destroy(call_details_); - grpc_call_details_init(call_details_); } return true; } - // The CallData class represents a call that is "active" as opposed - // to just being requested. It wraps and takes ownership of the cq from - // the call request - class CallData final { - public: - explicit CallData(Server* server, SyncRequest* mrd) - : cq_(mrd->cq_), - ctx_(mrd->deadline_, &mrd->request_metadata_), - has_request_payload_(mrd->has_request_payload_), - request_payload_(has_request_payload_ ? mrd->request_payload_ - : nullptr), - request_(nullptr), - method_(mrd->method_), - call_( - mrd->call_, server, &cq_, server->max_receive_message_size(), - ctx_.set_server_rpc_info(method_->name(), method_->method_type(), - server->interceptor_creators_)), - server_(server), - global_callbacks_(nullptr), - resources_(false) { - ctx_.set_call(mrd->call_); - ctx_.cq_ = &cq_; - GPR_ASSERT(mrd->in_flight_); - mrd->in_flight_ = false; - mrd->request_metadata_.count = 0; + void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, + bool resources) { + ctx_.Init(deadline_, &request_metadata_); + wrapped_call_.Init( + call_, server_, &cq_, server_->max_receive_message_size(), + ctx_->ctx.set_server_rpc_info(method_->name(), method_->method_type(), + server_->interceptor_creators_)); + ctx_->ctx.set_call(call_); + ctx_->ctx.cq_ = &cq_; + request_metadata_.count = 0; + + global_callbacks_ = global_callbacks; + resources_ = resources; + + interceptor_methods_.SetCall(&*wrapped_call_); + interceptor_methods_.SetReverse(); + // Set interception point for RECV INITIAL METADATA + interceptor_methods_.AddInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + interceptor_methods_.SetRecvInitialMetadata(&ctx_->ctx.client_metadata_); + + if (has_request_payload_) { + // Set interception point for RECV MESSAGE + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); + deserialized_request_ = handler->Deserialize(call_, request_payload_, + &request_status_, nullptr); + + request_payload_ = nullptr; + interceptor_methods_.AddInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + interceptor_methods_.SetRecvMessage(deserialized_request_, nullptr); } - ~CallData() { - if (has_request_payload_ && request_payload_) { - grpc_byte_buffer_destroy(request_payload_); - } + if (interceptor_methods_.RunInterceptors( + [this]() { ContinueRunAfterInterception(); })) { + ContinueRunAfterInterception(); + } else { + // There were interceptors to be run, so ContinueRunAfterInterception + // will be run when interceptors are done. } + } - void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, - bool resources) { - global_callbacks_ = global_callbacks; - resources_ = resources; + void ContinueRunAfterInterception() { + ctx_->ctx.BeginCompletionOp(&*wrapped_call_, nullptr, nullptr); + global_callbacks_->PreSynchronousRequest(&ctx_->ctx); + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); + handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( + &*wrapped_call_, &ctx_->ctx, deserialized_request_, request_status_, + nullptr, nullptr)); + global_callbacks_->PostSynchronousRequest(&ctx_->ctx); - interceptor_methods_.SetCall(&call_); - interceptor_methods_.SetReverse(); - // Set interception point for RECV INITIAL METADATA - interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints:: - POST_RECV_INITIAL_METADATA); - interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); + cq_.Shutdown(); - if (has_request_payload_) { - // Set interception point for RECV MESSAGE - auto* handler = resources_ ? method_->handler() - : server_->resource_exhausted_handler_.get(); - request_ = handler->Deserialize(call_.call(), request_payload_, - &request_status_, nullptr); + grpc::internal::CompletionQueueTag* op_tag = ctx_->ctx.GetCompletionOpTag(); + cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); - request_payload_ = nullptr; - interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); - interceptor_methods_.SetRecvMessage(request_, nullptr); - } + // Ensure the cq_ is shutdown + grpc::PhonyTag ignored_tag; + GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); - if (interceptor_methods_.RunInterceptors( - [this]() { ContinueRunAfterInterception(); })) { - ContinueRunAfterInterception(); - } else { - // There were interceptors to be run, so ContinueRunAfterInterception - // will be run when interceptors are done. - } - } + // Cleanup structures allocated during Run/ContinueRunAfterInterception + wrapped_call_.Destroy(); + ctx_.Destroy(); - void ContinueRunAfterInterception() { - { - ctx_.BeginCompletionOp(&call_, nullptr, nullptr); - global_callbacks_->PreSynchronousRequest(&ctx_); - auto* handler = resources_ ? method_->handler() - : server_->resource_exhausted_handler_.get(); - handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( - &call_, &ctx_, request_, request_status_, nullptr, nullptr)); - request_ = nullptr; - global_callbacks_->PostSynchronousRequest(&ctx_); - - cq_.Shutdown(); - - grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); - cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); - - /* Ensure the cq_ is shutdown */ - grpc::DummyTag ignored_tag; - GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); - } - delete this; - } + delete this; + } - private: - grpc::CompletionQueue cq_; - grpc::ServerContext ctx_; - const bool has_request_payload_; - grpc_byte_buffer* request_payload_; - void* request_; - grpc::Status request_status_; - grpc::internal::RpcServiceMethod* const method_; - grpc::internal::Call call_; - Server* server_; - std::shared_ptr<GlobalCallbacks> global_callbacks_; - bool resources_; - grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; - }; + // For requests that must be only cleaned up but not actually Run + void Cleanup() { + cq_.Shutdown(); + grpc_call_unref(call_); + delete this; + } private: + SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method) + : server_(server), + method_(method), + has_request_payload_(method->method_type() == + grpc::internal::RpcMethod::NORMAL_RPC || + method->method_type() == + grpc::internal::RpcMethod::SERVER_STREAMING), + cq_(grpc_completion_queue_create_for_pluck(nullptr)) {} + + template <class CallAllocation> + void CommonSetup(CallAllocation* data) { + server_->Ref(); + grpc_metadata_array_init(&request_metadata_); + data->tag = static_cast<void*>(this); + data->call = &call_; + data->initial_metadata = &request_metadata_; + data->cq = cq_.cq(); + } + + Server* const server_; grpc::internal::RpcServiceMethod* const method_; - void* const method_tag_; - bool in_flight_; const bool has_request_payload_; grpc_call* call_; - grpc_call_details* call_details_; + grpc_call_details* call_details_ = nullptr; gpr_timespec deadline_; grpc_metadata_array request_metadata_; - grpc_byte_buffer* request_payload_; - grpc_completion_queue* cq_; + grpc_byte_buffer* request_payload_ = nullptr; + grpc::CompletionQueue cq_; + grpc::Status request_status_; + std::shared_ptr<GlobalCallbacks> global_callbacks_; + bool resources_; + void* deserialized_request_ = nullptr; + grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; + + // ServerContextWrapper allows ManualConstructor while using a private + // contructor of ServerContext via this friend class. + struct ServerContextWrapper { + ServerContext ctx; + + ServerContextWrapper(gpr_timespec deadline, grpc_metadata_array* arr) + : ctx(deadline, arr) {} + }; + + grpc_core::ManualConstructor<ServerContextWrapper> ctx_; + grpc_core::ManualConstructor<internal::Call> wrapped_call_; }; template <class ServerContextType> @@ -552,7 +535,10 @@ class Server::CallbackRequest final method->method_type() == grpc::internal::RpcMethod::SERVER_STREAMING), cq_(cq), - tag_(this) { + tag_(this), + ctx_(server_->context_allocator() != nullptr + ? server_->context_allocator()->NewCallbackServerContext() + : nullptr) { CommonSetup(server, data); data->deadline = &deadline_; data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr; @@ -567,7 +553,11 @@ class Server::CallbackRequest final has_request_payload_(false), call_details_(new grpc_call_details), cq_(cq), - tag_(this) { + tag_(this), + ctx_(server_->context_allocator() != nullptr + ? server_->context_allocator() + ->NewGenericCallbackServerContext() + : nullptr) { CommonSetup(server, data); grpc_call_details_init(call_details_); data->details = call_details_; @@ -579,6 +569,9 @@ class Server::CallbackRequest final if (has_request_payload_ && request_payload_) { grpc_byte_buffer_destroy(request_payload_); } + if (ctx_alloc_by_default_ || server_->context_allocator() == nullptr) { + default_ctx_.Destroy(); + } server_->UnrefWithPossibleNotify(); } @@ -631,10 +624,10 @@ class Server::CallbackRequest final } // Bind the call, deadline, and metadata from what we got - req_->ctx_.set_call(req_->call_); - req_->ctx_.cq_ = req_->cq_; - req_->ctx_.BindDeadlineAndMetadata(req_->deadline_, - &req_->request_metadata_); + req_->ctx_->set_call(req_->call_); + req_->ctx_->cq_ = req_->cq_; + req_->ctx_->BindDeadlineAndMetadata(req_->deadline_, + &req_->request_metadata_); req_->request_metadata_.count = 0; // Create a C++ Call to control the underlying core call @@ -643,7 +636,7 @@ class Server::CallbackRequest final grpc::internal::Call( req_->call_, req_->server_, req_->cq_, req_->server_->max_receive_message_size(), - req_->ctx_.set_server_rpc_info( + req_->ctx_->set_server_rpc_info( req_->method_name(), (req_->method_ != nullptr) ? req_->method_->method_type() @@ -657,7 +650,7 @@ class Server::CallbackRequest final grpc::experimental::InterceptionHookPoints:: POST_RECV_INITIAL_METADATA); req_->interceptor_methods_.SetRecvInitialMetadata( - &req_->ctx_.client_metadata_); + &req_->ctx_->client_metadata_); if (req_->has_request_payload_) { // Set interception point for RECV MESSAGE @@ -683,7 +676,7 @@ class Server::CallbackRequest final ? req_->method_->handler() : req_->server_->generic_handler_.get(); handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( - call_, &req_->ctx_, req_->request_, req_->request_status_, + call_, req_->ctx_, req_->request_, req_->request_status_, req_->handler_data_, [this] { delete req_; })); } }; @@ -692,9 +685,16 @@ class Server::CallbackRequest final void CommonSetup(Server* server, CallAllocation* data) { server->Ref(); grpc_metadata_array_init(&request_metadata_); - data->tag = &tag_; + data->tag = static_cast<void*>(&tag_); data->call = &call_; data->initial_metadata = &request_metadata_; + if (ctx_ == nullptr) { + default_ctx_.Init(); + ctx_ = &*default_ctx_; + ctx_alloc_by_default_ = true; + } + ctx_->set_context_allocator(server->context_allocator()); + data->cq = cq_->cq(); } Server* const server_; @@ -709,8 +709,10 @@ class Server::CallbackRequest final gpr_timespec deadline_; grpc_metadata_array request_metadata_; grpc::CompletionQueue* const cq_; + bool ctx_alloc_by_default_ = false; CallbackCallTag tag_; - ServerContextType ctx_; + ServerContextType* ctx_ = nullptr; + grpc_core::ManualConstructor<ServerContextType> default_ctx_; grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; @@ -727,8 +729,8 @@ bool Server::CallbackRequest< if (*status) { deadline_ = call_details_->deadline; // TODO(yangg) remove the copy here - ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method); - ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host); + ctx_->method_ = grpc::StringFromCopiedSlice(call_details_->method); + ctx_->host_ = grpc::StringFromCopiedSlice(call_details_->host); } grpc_slice_unref(call_details_->method); grpc_slice_unref(call_details_->host); @@ -744,7 +746,7 @@ const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name() template <> const char* Server::CallbackRequest< grpc::GenericCallbackServerContext>::method_name() const { - return ctx_.method().c_str(); + return ctx_->method().c_str(); } // Implementation of ThreadManager. Each instance of SyncRequestThreadManager @@ -783,44 +785,39 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { } void DoWork(void* tag, bool ok, bool resources) override { + (void)ok; SyncRequest* sync_req = static_cast<SyncRequest*>(tag); - if (!sync_req) { - // No tag. Nothing to work on. This is an unlikley scenario and possibly a - // bug in RPC Manager implementation. - gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag"); - return; - } - - if (ok) { - // Calldata takes ownership of the completion queue and interceptors - // inside sync_req - auto* cd = new SyncRequest::CallData(server_, sync_req); - // Prepare for the next request - if (!IsShutdown()) { - sync_req->SetupRequest(); // Create new completion queue for sync_req - sync_req->Request(server_->c_server(), server_cq_->cq()); - } + // Under the AllocatingRequestMatcher model we will never see an invalid tag + // here. + GPR_DEBUG_ASSERT(sync_req != nullptr); + GPR_DEBUG_ASSERT(ok); - GPR_TIMER_SCOPE("cd.Run()", 0); - cd->Run(global_callbacks_, resources); - } - // TODO (sreek) If ok is false here (which it isn't in case of - // grpc_request_registered_call), we should still re-queue the request - // object + GPR_TIMER_SCOPE("sync_req->Run()", 0); + sync_req->Run(global_callbacks_, resources); } void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) { - sync_requests_.emplace_back(new SyncRequest(method, tag)); + server_->server()->core_server->SetRegisteredMethodAllocator( + server_cq_->cq(), tag, [this, method] { + grpc_core::Server::RegisteredCallAllocation result; + new SyncRequest(server_, method, &result); + return result; + }); + has_sync_method_ = true; } void AddUnknownSyncMethod() { - if (!sync_requests_.empty()) { + if (has_sync_method_) { unknown_method_ = absl::make_unique<grpc::internal::RpcServiceMethod>( "unknown", grpc::internal::RpcMethod::BIDI_STREAMING, - new grpc::internal::UnknownMethodHandler); - sync_requests_.emplace_back( - new SyncRequest(unknown_method_.get(), nullptr)); + new grpc::internal::UnknownMethodHandler(kUnknownRpcMethod)); + server_->server()->core_server->SetBatchMethodAllocator( + server_cq_->cq(), [this] { + grpc_core::Server::BatchCallAllocation result; + new SyncRequest(server_, unknown_method_.get(), &result); + return result; + }); } } @@ -835,27 +832,14 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { void* tag; bool ok; while (server_cq_->Next(&tag, &ok)) { - if (ok) { - // If a request was pulled off the queue, it means that the thread - // handling the request added it to the completion queue after shutdown - // was called - because the thread had already started and checked the - // shutdown flag before shutdown was called. In this case, we simply - // clean it up here, *after* calling wait on all the worker threads, at - // which point we are certain no in-flight requests will add more to the - // queue. This fixes an intermittent memory leak on shutdown. - SyncRequest* sync_req = static_cast<SyncRequest*>(tag); - sync_req->PostShutdownCleanup(); - } + // This problem can arise if the server CQ gets a request queued to it + // before it gets shutdown but then pulls it after shutdown. + static_cast<SyncRequest*>(tag)->Cleanup(); } } void Start() { - if (!sync_requests_.empty()) { - for (const auto& value : sync_requests_) { - value->SetupRequest(); - value->Request(server_->c_server(), server_cq_->cq()); - } - + if (has_sync_method_) { Initialize(); // ThreadManager's Initialize() } } @@ -864,7 +848,7 @@ class Server::SyncRequestThreadManager : public grpc::ThreadManager { Server* server_; grpc::CompletionQueue* server_cq_; int cq_timeout_msec_; - std::vector<std::unique_ptr<SyncRequest>> sync_requests_; + bool has_sync_method_ = false; std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; @@ -948,16 +932,23 @@ Server::~Server() { { grpc::internal::ReleasableMutexLock lock(&mu_); if (started_ && !shutdown_) { - lock.Unlock(); + lock.Release(); Shutdown(); } else if (!started_) { // Shutdown the completion queues for (const auto& value : sync_req_mgrs_) { value->Shutdown(); } - if (callback_cq_ != nullptr) { - callback_cq_->Shutdown(); - callback_cq_ = nullptr; + CompletionQueue* callback_cq = + callback_cq_.load(std::memory_order_relaxed); + if (callback_cq != nullptr) { + if (grpc_iomgr_run_in_background()) { + // gRPC-core provides the backing needed for the preferred CQ type + callback_cq->Shutdown(); + } else { + CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq); + } + callback_cq_.store(nullptr, std::memory_order_release); } } } @@ -1124,7 +1115,9 @@ void Server::UnrefAndWaitLocked() { shutdown_done_ = true; return; // no need to wait on CV since done condition already set } - shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; }); + grpc::internal::WaitUntil( + &shutdown_done_cv_, &mu_, + [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return shutdown_done_; }); } void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { @@ -1173,13 +1166,27 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { } #endif - grpc_server_start(server_); + // If we have a generic service, all unmatched method names go there. + // Otherwise, we must provide at least one RPC request for an "unimplemented" + // RPC, which covers any RPC for a method name that isn't matched. If we + // have a sync service, let it be a sync unimplemented RPC, which must be + // registered before server start (to initialize an AllocatingRequestMatcher). + // If we have an AllocatingRequestMatcher, we can't also specify other + // unimplemented RPCs via explicit async requests, so we won't do so. If we + // only have async services, we can specify unimplemented RPCs on each async + // CQ so that some user polling thread will move them along as long as some + // progress is being made on any RPCs in the system. + bool unknown_rpc_needed = + !has_async_generic_service_ && !has_callback_generic_service_; + + if (unknown_rpc_needed && !sync_req_mgrs_.empty()) { + sync_req_mgrs_[0]->AddUnknownSyncMethod(); + unknown_rpc_needed = false; + } - if (!has_async_generic_service_ && !has_callback_generic_service_) { - for (const auto& value : sync_req_mgrs_) { - value->AddUnknownSyncMethod(); - } + grpc_server_start(server_); + if (unknown_rpc_needed) { for (size_t i = 0; i < num_cqs; i++) { if (cqs[i]->IsFrequentlyPolled()) { new UnimplementedAsyncRequest(this, cqs[i]); @@ -1188,6 +1195,7 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { if (health_check_cq != nullptr) { new UnimplementedAsyncRequest(this, health_check_cq); } + unknown_rpc_needed = false; } // If this server has any support for synchronous methods (has any sync @@ -1195,7 +1203,8 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // to deal with the case of thread exhaustion if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { resource_exhausted_handler_ = - absl::make_unique<grpc::internal::ResourceExhaustedHandler>(); + absl::make_unique<grpc::internal::ResourceExhaustedHandler>( + kServerThreadpoolExhausted); } for (const auto& value : sync_req_mgrs_) { @@ -1225,7 +1234,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { /// The completion queue to use for server shutdown completion notification grpc::CompletionQueue shutdown_cq; - grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag + grpc::ShutdownTag shutdown_tag; // Phony shutdown tag grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); shutdown_cq.Shutdown(); @@ -1243,6 +1252,9 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has // successfully shutdown + // Drop the shutdown ref and wait for all other refs to drop as well. + UnrefAndWaitLocked(); + // Shutdown all ThreadManagers. This will try to gracefully stop all the // threads in the ThreadManagers (once they process any inflight requests) for (const auto& value : sync_req_mgrs_) { @@ -1254,14 +1266,17 @@ void Server::ShutdownInternal(gpr_timespec deadline) { value->Wait(); } - // Drop the shutdown ref and wait for all other refs to drop as well. - UnrefAndWaitLocked(); - // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it // will delete itself at true shutdown. - if (callback_cq_ != nullptr) { - callback_cq_->Shutdown(); - callback_cq_ = nullptr; + CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed); + if (callback_cq != nullptr) { + if (grpc_iomgr_run_in_background()) { + // gRPC-core provides the backing needed for the preferred CQ type + callback_cq->Shutdown(); + } else { + CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq); + } + callback_cq_.store(nullptr, std::memory_order_release); } // Drain the shutdown queue (if the previous call to AsyncNext() timed out @@ -1271,7 +1286,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } shutdown_notified_ = true; - shutdown_cv_.Broadcast(); + shutdown_cv_.SignalAll(); #ifndef NDEBUG // Unregister this server with the CQs passed into it by the user so that @@ -1316,8 +1331,9 @@ bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { - grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, ""); - grpc::internal::UnknownMethodHandler::FillOps(request_->context(), this); + grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, kUnknownRpcMethod); + grpc::internal::UnknownMethodHandler::FillOps(request_->context(), + kUnknownRpcMethod, this); request_->stream()->call_.PerformOps(this); } @@ -1328,19 +1344,33 @@ grpc::ServerInitializer* Server::initializer() { grpc::CompletionQueue* Server::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-server CQ registered + CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire); + if (callback_cq != nullptr) { + return callback_cq; + } + // The callback_cq_ wasn't already set, so grab a lock and set it up exactly + // once for this server. grpc::internal::MutexLock l(&mu_); - if (callback_cq_ != nullptr) { - return callback_cq_; + callback_cq = callback_cq_.load(std::memory_order_relaxed); + if (callback_cq != nullptr) { + return callback_cq; + } + if (grpc_iomgr_run_in_background()) { + // gRPC-core provides the backing needed for the preferred CQ type + auto* shutdown_callback = new grpc::ShutdownCallback; + callback_cq = new grpc::CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, + shutdown_callback}); + + // Transfer ownership of the new cq to its own shutdown callback + shutdown_callback->TakeCQ(callback_cq); + } else { + // Otherwise we need to use the alternative CQ variant + callback_cq = CompletionQueue::CallbackAlternativeCQ(); } - auto* shutdown_callback = new grpc::ShutdownCallback; - callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, - shutdown_callback}); - - // Transfer ownership of the new cq to its own shutdown callback - shutdown_callback->TakeCQ(callback_cq_); - return callback_cq_; + callback_cq_.store(callback_cq, std::memory_order_release); + return callback_cq; } } // namespace grpc |