From db01bf793aeab78b8b8d85686977240afb56a536 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 11 Sep 2018 17:01:19 -0700 Subject: Add callback-based alarms --- src/cpp/common/alarm.cc | 47 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc index 15a373d8a5..5819a4210b 100644 --- a/src/cpp/common/alarm.cc +++ b/src/cpp/common/alarm.cc @@ -39,17 +39,6 @@ class AlarmImpl : public CompletionQueueTag { AlarmImpl() : cq_(nullptr), tag_(nullptr) { gpr_ref_init(&refs_, 1); grpc_timer_init_unset(&timer_); - GRPC_CLOSURE_INIT(&on_alarm_, - [](void* arg, grpc_error* error) { - // queue the op on the completion queue - AlarmImpl* alarm = static_cast(arg); - alarm->Ref(); - grpc_cq_end_op( - alarm->cq_, alarm, error, - [](void* arg, grpc_cq_completion* completion) {}, - arg, &alarm->completion_); - }, - this, grpc_schedule_on_exec_ctx); } ~AlarmImpl() { grpc_core::ExecCtx exec_ctx; @@ -68,6 +57,32 @@ class AlarmImpl : public CompletionQueueTag { cq_ = cq->cq(); tag_ = tag; GPR_ASSERT(grpc_cq_begin_op(cq_, this)); + GRPC_CLOSURE_INIT(&on_alarm_, + [](void* arg, grpc_error* error) { + // queue the op on the completion queue + AlarmImpl* alarm = static_cast(arg); + alarm->Ref(); + grpc_cq_end_op( + alarm->cq_, alarm, error, + [](void* arg, grpc_cq_completion* completion) {}, + arg, &alarm->completion_); + }, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), + &on_alarm_); + } + void Set(gpr_timespec deadline, std::function f) { + grpc_core::ExecCtx exec_ctx; + // Don't use any CQ at all. Instead just use the timer to fire the function + callback_ = std::move(f); + Ref(); + GRPC_CLOSURE_INIT(&on_alarm_, + [](void* arg, grpc_error* error) { + AlarmImpl* alarm = static_cast(arg); + alarm->callback_(error == GRPC_ERROR_NONE); + alarm->Unref(); + }, + this, grpc_schedule_on_exec_ctx); grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), &on_alarm_); } @@ -95,6 +110,7 @@ class AlarmImpl : public CompletionQueueTag { // completion queue where events about this alarm will be posted grpc_completion_queue* cq_; void* tag_; + std::function callback_; }; } // namespace internal @@ -113,6 +129,15 @@ void Alarm::SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag) { static_cast(alarm_)->Set(cq, deadline, tag); } +void Alarm::SetInternal(gpr_timespec deadline, std::function f) { + // Note that we know that alarm_ is actually an internal::AlarmImpl + // but we declared it as the base pointer to avoid a forward declaration + // or exposing core data structures in the C++ public headers. + // Thus it is safe to use a static_cast to the subclass here, and the + // C++ style guide allows us to do so in this case + static_cast(alarm_)->Set(deadline, std::move(f)); +} + Alarm::~Alarm() { if (alarm_ != nullptr) { static_cast(alarm_)->Destroy(); -- cgit v1.2.3