diff options
Diffstat (limited to 'src/core/lib/surface/completion_queue.cc')
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 222 |
1 files changed, 94 insertions, 128 deletions
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 21664f03c8..8abcb70d25 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -58,13 +58,12 @@ typedef struct { bool can_listen; size_t (*size)(void); void (*init)(grpc_pollset *pollset, gpr_mu **mu); - grpc_error *(*kick)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_error *(*kick)(grpc_pollset *pollset, grpc_pollset_worker *specific_worker); - grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, grpc_millis deadline); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_closure *closure); - void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); + grpc_error *(*work)(grpc_pollset *pollset, grpc_pollset_worker **worker, + grpc_millis deadline); + void (*shutdown)(grpc_pollset *pollset, grpc_closure *closure); + void (*destroy)(grpc_pollset *pollset); } cq_poller_vtable; typedef struct non_polling_worker { @@ -90,14 +89,12 @@ static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) { *mu = &npp->mu; } -static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset) { +static void non_polling_poller_destroy(grpc_pollset *pollset) { non_polling_poller *npp = (non_polling_poller *)pollset; gpr_mu_destroy(&npp->mu); } -static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, +static grpc_error *non_polling_poller_work(grpc_pollset *pollset, grpc_pollset_worker **worker, grpc_millis deadline) { non_polling_poller *npp = (non_polling_poller *)pollset; @@ -122,7 +119,7 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, npp->root = w.next; if (&w == npp->root) { if (npp->shutdown) { - GRPC_CLOSURE_SCHED(exec_ctx, npp->shutdown, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE); } npp->root = NULL; } @@ -135,8 +132,7 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, } static grpc_error *non_polling_poller_kick( - grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *specific_worker) { + grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { non_polling_poller *p = (non_polling_poller *)pollset; if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root; if (specific_worker != NULL) { @@ -149,14 +145,13 @@ static grpc_error *non_polling_poller_kick( return GRPC_ERROR_NONE; } -static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, +static void non_polling_poller_shutdown(grpc_pollset *pollset, grpc_closure *closure) { non_polling_poller *p = (non_polling_poller *)pollset; GPR_ASSERT(closure != NULL); p->shutdown = closure; if (p->root == NULL) { - GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); } else { non_polling_worker *w = p->root; do { @@ -183,13 +178,11 @@ typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; size_t data_size; void (*init)(void *data); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); + void (*shutdown)(grpc_completion_queue *cq); void (*destroy)(void *data); bool (*begin_op)(grpc_completion_queue *cq, void *tag); - void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, - grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, - grpc_cq_completion *storage), + void (*end_op)(grpc_completion_queue *cq, void *tag, grpc_error *error, + void (*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage); grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved); @@ -274,31 +267,23 @@ struct grpc_completion_queue { }; /* Forward declarations */ -static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq); -static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq); -static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq); -static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq); +static void cq_finish_shutdown_next(grpc_completion_queue *cq); +static void cq_finish_shutdown_pluck(grpc_completion_queue *cq); +static void cq_shutdown_next(grpc_completion_queue *cq); +static void cq_shutdown_pluck(grpc_completion_queue *cq); static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); -static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq, void *tag, +static void cq_end_op_for_next(grpc_completion_queue *cq, void *tag, grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, - void *done_arg, + void (*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage); -static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq, void *tag, +static void cq_end_op_for_pluck(grpc_completion_queue *cq, void *tag, grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, - void *done_arg, + void (*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage); @@ -342,8 +327,7 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = gpr_free(_ev); \ } -static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq, - grpc_error *error); +static void on_pollset_shutdown_done(void *cq, grpc_error *error); static void cq_event_queue_init(grpc_cq_event_queue *q) { gpr_mpscq_init(&q->queue); @@ -362,23 +346,23 @@ static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { grpc_cq_completion *c = NULL; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; if (gpr_spinlock_trylock(&q->queue_lock)) { - GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx); + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(); bool is_empty = false; c = (grpc_cq_completion *)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty); gpr_spinlock_unlock(&q->queue_lock); if (c == NULL && !is_empty) { - GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx); + GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(); } } else { - GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx); + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(); } - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_finish(); if (c) { gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); @@ -409,9 +393,9 @@ grpc_completion_queue *grpc_completion_queue_create_internal( const cq_poller_vtable *poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_STATS_INC_CQS_CREATED(&exec_ctx); - grpc_exec_ctx_finish(&exec_ctx); + ExecCtx _local_exec_ctx; + GRPC_STATS_INC_CQS_CREATED(); + grpc_exec_ctx_finish(); cq = (grpc_completion_queue *)gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size + @@ -493,15 +477,14 @@ void grpc_cq_internal_ref(grpc_completion_queue *cq) { gpr_ref(&cq->owning_refs); } -static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void on_pollset_shutdown_done(void *arg, grpc_error *error) { grpc_completion_queue *cq = (grpc_completion_queue *)arg; - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy"); + GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy"); } #ifndef NDEBUG -void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, - const char *reason, const char *file, int line) { +void grpc_cq_internal_unref(grpc_completion_queue *cq, const char *reason, + const char *file, int line) { if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) { gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, @@ -509,12 +492,11 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, reason); } #else -void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq) { +void grpc_cq_internal_unref(grpc_completion_queue *cq) { #endif if (gpr_unref(&cq->owning_refs)) { cq->vtable->destroy(DATA_FROM_CQ(cq)); - cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq)); + cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq)); #ifndef NDEBUG gpr_free(cq->outstanding_tags); #endif @@ -595,11 +577,9 @@ bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_NEXT) */ -static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq, void *tag, +static void cq_end_op_for_next(grpc_completion_queue *cq, void *tag, grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, - void *done_arg, + void (*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { GPR_TIMER_BEGIN("cq_end_op_for_next", 0); @@ -609,9 +589,9 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( - "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, " + "cq_end_op_for_next(=%p, cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage)); + 7, (&exec_ctx, cq, tag, errmsg, done, done_arg, storage)); if (GRPC_TRACER_ON(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); @@ -643,7 +623,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, if (is_first) { gpr_mu_lock(cq->mu); grpc_error *kick_error = - cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL); + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); gpr_mu_unlock(cq->mu); if (kick_error != GRPC_ERROR_NONE) { @@ -655,17 +635,17 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_mu_lock(cq->mu); - cq_finish_shutdown_next(exec_ctx, cq); + cq_finish_shutdown_next(cq); gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); } } else { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); gpr_atm_rel_store(&cqd->pending_events, 0); gpr_mu_lock(cq->mu); - cq_finish_shutdown_next(exec_ctx, cq); + cq_finish_shutdown_next(cq); gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); } GPR_TIMER_END("cq_end_op_for_next", 0); @@ -676,11 +656,9 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_PLUCK) */ -static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq, void *tag, +static void cq_end_op_for_pluck(grpc_completion_queue *cq, void *tag, grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, - void *done_arg, + void (*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); @@ -693,9 +671,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( - "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, " + "cq_end_op_for_pluck(=%p, cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage)); + 7, (&exec_ctx, cq, tag, errmsg, done, done_arg, storage)); if (GRPC_TRACER_ON(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); @@ -717,7 +695,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, cqd->completed_tail = storage; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { - cq_finish_shutdown_pluck(exec_ctx, cq); + cq_finish_shutdown_pluck(cq); gpr_mu_unlock(cq->mu); } else { grpc_pollset_worker *pluck_worker = NULL; @@ -729,7 +707,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, } grpc_error *kick_error = - cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), pluck_worker); + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker); gpr_mu_unlock(cq->mu); @@ -746,12 +724,10 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, - void *tag, grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, - grpc_cq_completion *storage), +void grpc_cq_end_op(grpc_completion_queue *cq, void *tag, grpc_error *error, + void (*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage); + cq->vtable->end_op(cq, tag, error, done, done_arg, storage); } typedef struct { @@ -763,7 +739,7 @@ typedef struct { bool first_loop; } cq_is_finished_arg; -static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { +static bool cq_is_next_finished(void *arg) { cq_is_finished_arg *a = (cq_is_finished_arg *)arg; grpc_completion_queue *cq = a->cq; cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); @@ -786,7 +762,7 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { return true; } } - return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx); + return !a->first_loop && a->deadline < grpc_exec_ctx_now(); } #ifndef NDEBUG @@ -841,8 +817,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, NULL, NULL, true}; - grpc_exec_ctx exec_ctx = - GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg); + ExecCtx _local_exec_ctx(0, cq_is_next_finished, &is_finished_arg); for (;;) { grpc_millis iteration_deadline = deadline_millis; @@ -852,7 +827,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(&exec_ctx, c->done_arg, c); + c->done(c->done_arg, c); break; } @@ -862,7 +837,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(&exec_ctx, c->done_arg, c); + c->done(c->done_arg, c); break; } else { /* If c == NULL it means either the queue is empty OR in an transient @@ -892,8 +867,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, break; } - if (!is_finished_arg.first_loop && - grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) { + if (!is_finished_arg.first_loop && grpc_exec_ctx_now() >= deadline_millis) { memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cq); @@ -903,8 +877,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, /* The main polling work happens in grpc_pollset_work */ gpr_mu_lock(cq->mu); cq->num_polls++; - grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), - NULL, iteration_deadline); + grpc_error *err = + cq->poller_vtable->work(POLLSET_FROM_CQ(cq), NULL, iteration_deadline); gpr_mu_unlock(cq->mu); if (err != GRPC_ERROR_NONE) { @@ -923,13 +897,13 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, if (cq_event_queue_num_items(&cqd->queue) > 0 && gpr_atm_acq_load(&cqd->pending_events) > 0) { gpr_mu_lock(cq->mu); - cq->poller_vtable->kick(&exec_ctx, POLLSET_FROM_CQ(cq), NULL); + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); gpr_mu_unlock(cq->mu); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next"); - grpc_exec_ctx_finish(&exec_ctx); + GRPC_CQ_INTERNAL_UNREF(cq, "next"); + grpc_exec_ctx_finish(); GPR_ASSERT(is_finished_arg.stolen_completion == NULL); GPR_TIMER_END("grpc_completion_queue_next", 0); @@ -943,19 +917,16 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, - Must be called only once in completion queue's lifetime - grpc_completion_queue_shutdown() MUST have been called before calling this function */ -static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq) { +static void cq_finish_shutdown_next(grpc_completion_queue *cq) { cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); - cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), - &cq->pollset_shutdown_done); + cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); } -static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq) { +static void cq_shutdown_next(grpc_completion_queue *cq) { cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: @@ -968,7 +939,7 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); return; } cqd->shutdown_called = true; @@ -976,10 +947,10 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, * cq_begin_op_for_next and and cq_end_op_for_next functions which read/write * on this counter without necessarily holding a lock on cq */ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { - cq_finish_shutdown_next(exec_ctx, cq); + cq_finish_shutdown_next(cq); } gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); } grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, @@ -1012,7 +983,7 @@ static void del_plucker(grpc_completion_queue *cq, void *tag, GPR_UNREACHABLE_CODE(return ); } -static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { +static bool cq_is_pluck_finished(void *arg) { cq_is_finished_arg *a = (cq_is_finished_arg *)arg; grpc_completion_queue *cq = a->cq; cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); @@ -1041,7 +1012,7 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { } gpr_mu_unlock(cq->mu); } - return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx); + return !a->first_loop && a->deadline < grpc_exec_ctx_now(); } static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, @@ -1078,8 +1049,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, NULL, tag, true}; - grpc_exec_ctx exec_ctx = - GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg); + ExecCtx _local_exec_ctx(0, cq_is_pluck_finished, &is_finished_arg); for (;;) { if (is_finished_arg.stolen_completion != NULL) { gpr_mu_unlock(cq->mu); @@ -1088,7 +1058,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(&exec_ctx, c->done_arg, c); + c->done(c->done_arg, c); break; } prev = &cqd->completed_head; @@ -1103,7 +1073,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(&exec_ctx, c->done_arg, c); + c->done(c->done_arg, c); goto done; } prev = c; @@ -1126,8 +1096,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, dump_pending_tags(cq); break; } - if (!is_finished_arg.first_loop && - grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) { + if (!is_finished_arg.first_loop && grpc_exec_ctx_now() >= deadline_millis) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); @@ -1136,8 +1105,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, break; } cq->num_polls++; - grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), - &worker, deadline_millis); + grpc_error *err = + cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis); if (err != GRPC_ERROR_NONE) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); @@ -1155,8 +1124,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, } done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck"); - grpc_exec_ctx_finish(&exec_ctx); + GRPC_CQ_INTERNAL_UNREF(cq, "pluck"); + grpc_exec_ctx_finish(); GPR_ASSERT(is_finished_arg.stolen_completion == NULL); GPR_TIMER_END("grpc_completion_queue_pluck", 0); @@ -1169,22 +1138,19 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, return cq->vtable->pluck(cq, tag, deadline, reserved); } -static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq) { +static void cq_finish_shutdown_pluck(grpc_completion_queue *cq) { cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); gpr_atm_no_barrier_store(&cqd->shutdown, 1); - cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), - &cq->pollset_shutdown_done); + cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); } /* NOTE: This function is almost exactly identical to cq_shutdown_next() but * merging them is a bit tricky and probably not worth it */ -static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cq) { +static void cq_shutdown_pluck(grpc_completion_queue *cq) { cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: @@ -1197,25 +1163,25 @@ static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)"); return; } cqd->shutdown_called = true; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { - cq_finish_shutdown_pluck(exec_ctx, cq); + cq_finish_shutdown_pluck(cq); } gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)"); + GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)"); } /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cq) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq)); - cq->vtable->shutdown(&exec_ctx, cq); - grpc_exec_ctx_finish(&exec_ctx); + cq->vtable->shutdown(cq); + grpc_exec_ctx_finish(); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } @@ -1224,9 +1190,9 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cq) { GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cq); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); + ExecCtx _local_exec_ctx; + GRPC_CQ_INTERNAL_UNREF(cq, "destroy"); + grpc_exec_ctx_finish(); GPR_TIMER_END("grpc_completion_queue_destroy", 0); } |