diff options
Diffstat (limited to 'grpc/src/cpp/common/completion_queue_cc.cc')
-rw-r--r-- | grpc/src/cpp/common/completion_queue_cc.cc | 110 |
1 files changed, 109 insertions, 1 deletions
diff --git a/grpc/src/cpp/common/completion_queue_cc.cc b/grpc/src/cpp/common/completion_queue_cc.cc index 96a7105e..015b5c1d 100644 --- a/grpc/src/cpp/common/completion_queue_cc.cc +++ b/grpc/src/cpp/common/completion_queue_cc.cc @@ -20,13 +20,106 @@ #include <memory> #include <grpc/grpc.h> +#include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/support/time.h> +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/thd.h" + namespace grpc { +namespace { + +internal::GrpcLibraryInitializer g_gli_initializer; + +gpr_once g_once_init_callback_alternative = GPR_ONCE_INIT; +grpc_core::Mutex* g_callback_alternative_mu; + +// Implement a ref-counted callback CQ for global use in the alternative +// implementation so that its threads are only created once. Do this using +// explicit ref-counts and raw pointers rather than a shared-ptr since that +// has a non-trivial destructor and thus can't be used for global variables. +struct CallbackAlternativeCQ { + int refs ABSL_GUARDED_BY(g_callback_alternative_mu) = 0; + CompletionQueue* cq ABSL_GUARDED_BY(g_callback_alternative_mu); + std::vector<grpc_core::Thread>* nexting_threads + ABSL_GUARDED_BY(g_callback_alternative_mu); -static internal::GrpcLibraryInitializer g_gli_initializer; + CompletionQueue* Ref() { + grpc_core::MutexLock lock(&*g_callback_alternative_mu); + refs++; + if (refs == 1) { + cq = new CompletionQueue; + int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores() / 2, 2, 16); + nexting_threads = new std::vector<grpc_core::Thread>; + for (int i = 0; i < num_nexting_threads; i++) { + nexting_threads->emplace_back( + "nexting_thread", + [](void* arg) { + grpc_completion_queue* cq = + static_cast<CompletionQueue*>(arg)->cq(); + while (true) { + // Use the raw Core next function rather than the C++ Next since + // Next incorporates FinalizeResult and we actually want that + // called from the callback functor itself. + // TODO(vjpai): Migrate below to next without a timeout or idle + // phase. That's currently starving out some other polling, + // though. + auto ev = grpc_completion_queue_next( + cq, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(1000, GPR_TIMESPAN)), + nullptr); + if (ev.type == GRPC_QUEUE_SHUTDOWN) { + return; + } + if (ev.type == GRPC_QUEUE_TIMEOUT) { + gpr_sleep_until( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(100, GPR_TIMESPAN))); + continue; + } + GPR_DEBUG_ASSERT(ev.type == GRPC_OP_COMPLETE); + // We can always execute the callback inline rather than + // pushing it to another Executor thread because this + // thread is definitely running on a background thread, does not + // hold any application locks before executing the callback, + // and cannot be entered recursively. + auto* functor = + static_cast<grpc_experimental_completion_queue_functor*>( + ev.tag); + functor->functor_run(functor, ev.success); + } + }, + cq); + } + for (auto& th : *nexting_threads) { + th.Start(); + } + } + return cq; + } + + void Unref() { + grpc_core::MutexLock lock(g_callback_alternative_mu); + refs--; + if (refs == 0) { + cq->Shutdown(); + for (auto& th : *nexting_threads) { + th.Join(); + } + delete nexting_threads; + delete cq; + } + } +}; + +CallbackAlternativeCQ g_callback_alternative_cq; + +} // namespace // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here // i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create @@ -96,4 +189,19 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { return false; } +CompletionQueue* CompletionQueue::CallbackAlternativeCQ() { + gpr_once_init(&g_once_init_callback_alternative, + [] { g_callback_alternative_mu = new grpc_core::Mutex(); }); + return g_callback_alternative_cq.Ref(); +} + +void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq) + ABSL_NO_THREAD_SAFETY_ANALYSIS { + (void)cq; + // This accesses g_callback_alternative_cq without acquiring the mutex + // but it's considered safe because it just reads the pointer address. + GPR_DEBUG_ASSERT(cq == g_callback_alternative_cq.cq); + g_callback_alternative_cq.Unref(); +} + } // namespace grpc |