aboutsummaryrefslogtreecommitdiff
path: root/src/core/lib/surface/completion_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/completion_queue.cc')
-rw-r--r--src/core/lib/surface/completion_queue.cc222
1 files changed, 94 insertions, 128 deletions
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 21664f03c8..8abcb70d25 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -58,13 +58,12 @@ typedef struct {
bool can_listen;
size_t (*size)(void);
void (*init)(grpc_pollset *pollset, gpr_mu **mu);
- grpc_error *(*kick)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_error *(*kick)(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
- grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker, grpc_millis deadline);
- void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_closure *closure);
- void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
+ grpc_error *(*work)(grpc_pollset *pollset, grpc_pollset_worker **worker,
+ grpc_millis deadline);
+ void (*shutdown)(grpc_pollset *pollset, grpc_closure *closure);
+ void (*destroy)(grpc_pollset *pollset);
} cq_poller_vtable;
typedef struct non_polling_worker {
@@ -90,14 +89,12 @@ static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &npp->mu;
}
-static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset) {
+static void non_polling_poller_destroy(grpc_pollset *pollset) {
non_polling_poller *npp = (non_polling_poller *)pollset;
gpr_mu_destroy(&npp->mu);
}
-static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset,
+static grpc_error *non_polling_poller_work(grpc_pollset *pollset,
grpc_pollset_worker **worker,
grpc_millis deadline) {
non_polling_poller *npp = (non_polling_poller *)pollset;
@@ -122,7 +119,7 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
npp->root = w.next;
if (&w == npp->root) {
if (npp->shutdown) {
- GRPC_CLOSURE_SCHED(exec_ctx, npp->shutdown, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE);
}
npp->root = NULL;
}
@@ -135,8 +132,7 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
}
static grpc_error *non_polling_poller_kick(
- grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker *specific_worker) {
+ grpc_pollset *pollset, grpc_pollset_worker *specific_worker) {
non_polling_poller *p = (non_polling_poller *)pollset;
if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root;
if (specific_worker != NULL) {
@@ -149,14 +145,13 @@ static grpc_error *non_polling_poller_kick(
return GRPC_ERROR_NONE;
}
-static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset,
+static void non_polling_poller_shutdown(grpc_pollset *pollset,
grpc_closure *closure) {
non_polling_poller *p = (non_polling_poller *)pollset;
GPR_ASSERT(closure != NULL);
p->shutdown = closure;
if (p->root == NULL) {
- GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
} else {
non_polling_worker *w = p->root;
do {
@@ -183,13 +178,11 @@ typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
void (*init)(void *data);
- void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
+ void (*shutdown)(grpc_completion_queue *cq);
void (*destroy)(void *data);
bool (*begin_op)(grpc_completion_queue *cq, void *tag);
- void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
- grpc_error *error,
- void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
- grpc_cq_completion *storage),
+ void (*end_op)(grpc_completion_queue *cq, void *tag, grpc_error *error,
+ void (*done)(void *done_arg, grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved);
@@ -274,31 +267,23 @@ struct grpc_completion_queue {
};
/* Forward declarations */
-static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq);
-static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq);
-static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq);
-static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq);
+static void cq_finish_shutdown_next(grpc_completion_queue *cq);
+static void cq_finish_shutdown_pluck(grpc_completion_queue *cq);
+static void cq_shutdown_next(grpc_completion_queue *cq);
+static void cq_shutdown_pluck(grpc_completion_queue *cq);
static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
-static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq, void *tag,
+static void cq_end_op_for_next(grpc_completion_queue *cq, void *tag,
grpc_error *error,
- void (*done)(grpc_exec_ctx *exec_ctx,
- void *done_arg,
+ void (*done)(void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
-static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq, void *tag,
+static void cq_end_op_for_pluck(grpc_completion_queue *cq, void *tag,
grpc_error *error,
- void (*done)(grpc_exec_ctx *exec_ctx,
- void *done_arg,
+ void (*done)(void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
@@ -342,8 +327,7 @@ grpc_tracer_flag grpc_cq_event_timeout_trace =
gpr_free(_ev); \
}
-static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq,
- grpc_error *error);
+static void on_pollset_shutdown_done(void *cq, grpc_error *error);
static void cq_event_queue_init(grpc_cq_event_queue *q) {
gpr_mpscq_init(&q->queue);
@@ -362,23 +346,23 @@ static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
grpc_cq_completion *c = NULL;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ ExecCtx _local_exec_ctx;
if (gpr_spinlock_trylock(&q->queue_lock)) {
- GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx);
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
bool is_empty = false;
c = (grpc_cq_completion *)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
gpr_spinlock_unlock(&q->queue_lock);
if (c == NULL && !is_empty) {
- GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx);
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
}
} else {
- GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx);
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
}
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_exec_ctx_finish();
if (c) {
gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
@@ -409,9 +393,9 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
const cq_poller_vtable *poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_STATS_INC_CQS_CREATED(&exec_ctx);
- grpc_exec_ctx_finish(&exec_ctx);
+ ExecCtx _local_exec_ctx;
+ GRPC_STATS_INC_CQS_CREATED();
+ grpc_exec_ctx_finish();
cq = (grpc_completion_queue *)gpr_zalloc(sizeof(grpc_completion_queue) +
vtable->data_size +
@@ -493,15 +477,14 @@ void grpc_cq_internal_ref(grpc_completion_queue *cq) {
gpr_ref(&cq->owning_refs);
}
-static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void on_pollset_shutdown_done(void *arg, grpc_error *error) {
grpc_completion_queue *cq = (grpc_completion_queue *)arg;
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy");
+ GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
}
#ifndef NDEBUG
-void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
- const char *reason, const char *file, int line) {
+void grpc_cq_internal_unref(grpc_completion_queue *cq, const char *reason,
+ const char *file, int line) {
if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -509,12 +492,11 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
reason);
}
#else
-void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq) {
+void grpc_cq_internal_unref(grpc_completion_queue *cq) {
#endif
if (gpr_unref(&cq->owning_refs)) {
cq->vtable->destroy(DATA_FROM_CQ(cq));
- cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq));
+ cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
#ifndef NDEBUG
gpr_free(cq->outstanding_tags);
#endif
@@ -595,11 +577,9 @@ bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
-static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq, void *tag,
+static void cq_end_op_for_next(grpc_completion_queue *cq, void *tag,
grpc_error *error,
- void (*done)(grpc_exec_ctx *exec_ctx,
- void *done_arg,
+ void (*done)(void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
GPR_TIMER_BEGIN("cq_end_op_for_next", 0);
@@ -609,9 +589,9 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
error != GRPC_ERROR_NONE)) {
const char *errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
+ "cq_end_op_for_next(=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
+ 7, (&exec_ctx, cq, tag, errmsg, done, done_arg, storage));
if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
@@ -643,7 +623,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
if (is_first) {
gpr_mu_lock(cq->mu);
grpc_error *kick_error =
- cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
gpr_mu_unlock(cq->mu);
if (kick_error != GRPC_ERROR_NONE) {
@@ -655,17 +635,17 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(exec_ctx, cq);
+ cq_finish_shutdown_next(cq);
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
} else {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_atm_rel_store(&cqd->pending_events, 0);
gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(exec_ctx, cq);
+ cq_finish_shutdown_next(cq);
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
GPR_TIMER_END("cq_end_op_for_next", 0);
@@ -676,11 +656,9 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_PLUCK) */
-static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq, void *tag,
+static void cq_end_op_for_pluck(grpc_completion_queue *cq, void *tag,
grpc_error *error,
- void (*done)(grpc_exec_ctx *exec_ctx,
- void *done_arg,
+ void (*done)(void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
@@ -693,9 +671,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
error != GRPC_ERROR_NONE)) {
const char *errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
+ "cq_end_op_for_pluck(=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
+ 7, (&exec_ctx, cq, tag, errmsg, done, done_arg, storage));
if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
@@ -717,7 +695,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
cqd->completed_tail = storage;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_pluck(exec_ctx, cq);
+ cq_finish_shutdown_pluck(cq);
gpr_mu_unlock(cq->mu);
} else {
grpc_pollset_worker *pluck_worker = NULL;
@@ -729,7 +707,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
}
grpc_error *kick_error =
- cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), pluck_worker);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
gpr_mu_unlock(cq->mu);
@@ -746,12 +724,10 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
- void *tag, grpc_error *error,
- void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
- grpc_cq_completion *storage),
+void grpc_cq_end_op(grpc_completion_queue *cq, void *tag, grpc_error *error,
+ void (*done)(void *done_arg, grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage);
+ cq->vtable->end_op(cq, tag, error, done, done_arg, storage);
}
typedef struct {
@@ -763,7 +739,7 @@ typedef struct {
bool first_loop;
} cq_is_finished_arg;
-static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
+static bool cq_is_next_finished(void *arg) {
cq_is_finished_arg *a = (cq_is_finished_arg *)arg;
grpc_completion_queue *cq = a->cq;
cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
@@ -786,7 +762,7 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
return true;
}
}
- return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
+ return !a->first_loop && a->deadline < grpc_exec_ctx_now();
}
#ifndef NDEBUG
@@ -841,8 +817,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
NULL,
NULL,
true};
- grpc_exec_ctx exec_ctx =
- GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);
+ ExecCtx _local_exec_ctx(0, cq_is_next_finished, &is_finished_arg);
for (;;) {
grpc_millis iteration_deadline = deadline_millis;
@@ -852,7 +827,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(&exec_ctx, c->done_arg, c);
+ c->done(c->done_arg, c);
break;
}
@@ -862,7 +837,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(&exec_ctx, c->done_arg, c);
+ c->done(c->done_arg, c);
break;
} else {
/* If c == NULL it means either the queue is empty OR in an transient
@@ -892,8 +867,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
break;
}
- if (!is_finished_arg.first_loop &&
- grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
+ if (!is_finished_arg.first_loop && grpc_exec_ctx_now() >= deadline_millis) {
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cq);
@@ -903,8 +877,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
/* The main polling work happens in grpc_pollset_work */
gpr_mu_lock(cq->mu);
cq->num_polls++;
- grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
- NULL, iteration_deadline);
+ grpc_error *err =
+ cq->poller_vtable->work(POLLSET_FROM_CQ(cq), NULL, iteration_deadline);
gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
@@ -923,13 +897,13 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
gpr_atm_acq_load(&cqd->pending_events) > 0) {
gpr_mu_lock(cq->mu);
- cq->poller_vtable->kick(&exec_ctx, POLLSET_FROM_CQ(cq), NULL);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
gpr_mu_unlock(cq->mu);
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
- grpc_exec_ctx_finish(&exec_ctx);
+ GRPC_CQ_INTERNAL_UNREF(cq, "next");
+ grpc_exec_ctx_finish();
GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
GPR_TIMER_END("grpc_completion_queue_next", 0);
@@ -943,19 +917,16 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
- Must be called only once in completion queue's lifetime
- grpc_completion_queue_shutdown() MUST have been called before calling
this function */
-static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq) {
+static void cq_finish_shutdown_next(grpc_completion_queue *cq) {
cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
- cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
- &cq->pollset_shutdown_done);
+ cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
-static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq) {
+static void cq_shutdown_next(grpc_completion_queue *cq) {
cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
@@ -968,7 +939,7 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
return;
}
cqd->shutdown_called = true;
@@ -976,10 +947,10 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
* cq_begin_op_for_next and and cq_end_op_for_next functions which read/write
* on this counter without necessarily holding a lock on cq */
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_next(exec_ctx, cq);
+ cq_finish_shutdown_next(cq);
}
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
@@ -1012,7 +983,7 @@ static void del_plucker(grpc_completion_queue *cq, void *tag,
GPR_UNREACHABLE_CODE(return );
}
-static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
+static bool cq_is_pluck_finished(void *arg) {
cq_is_finished_arg *a = (cq_is_finished_arg *)arg;
grpc_completion_queue *cq = a->cq;
cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
@@ -1041,7 +1012,7 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
}
gpr_mu_unlock(cq->mu);
}
- return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
+ return !a->first_loop && a->deadline < grpc_exec_ctx_now();
}
static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
@@ -1078,8 +1049,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
NULL,
tag,
true};
- grpc_exec_ctx exec_ctx =
- GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
+ ExecCtx _local_exec_ctx(0, cq_is_pluck_finished, &is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cq->mu);
@@ -1088,7 +1058,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(&exec_ctx, c->done_arg, c);
+ c->done(c->done_arg, c);
break;
}
prev = &cqd->completed_head;
@@ -1103,7 +1073,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(&exec_ctx, c->done_arg, c);
+ c->done(c->done_arg, c);
goto done;
}
prev = c;
@@ -1126,8 +1096,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
dump_pending_tags(cq);
break;
}
- if (!is_finished_arg.first_loop &&
- grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
+ if (!is_finished_arg.first_loop && grpc_exec_ctx_now() >= deadline_millis) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
@@ -1136,8 +1105,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
break;
}
cq->num_polls++;
- grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
- &worker, deadline_millis);
+ grpc_error *err =
+ cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
if (err != GRPC_ERROR_NONE) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
@@ -1155,8 +1124,8 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck");
- grpc_exec_ctx_finish(&exec_ctx);
+ GRPC_CQ_INTERNAL_UNREF(cq, "pluck");
+ grpc_exec_ctx_finish();
GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
GPR_TIMER_END("grpc_completion_queue_pluck", 0);
@@ -1169,22 +1138,19 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
return cq->vtable->pluck(cq, tag, deadline, reserved);
}
-static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq) {
+static void cq_finish_shutdown_pluck(grpc_completion_queue *cq) {
cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
gpr_atm_no_barrier_store(&cqd->shutdown, 1);
- cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
- &cq->pollset_shutdown_done);
+ cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
* merging them is a bit tricky and probably not worth it */
-static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq) {
+static void cq_shutdown_pluck(grpc_completion_queue *cq) {
cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
@@ -1197,25 +1163,25 @@ static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
return;
}
cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_pluck(exec_ctx, cq);
+ cq_finish_shutdown_pluck(cq);
}
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cq) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ ExecCtx _local_exec_ctx;
GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
- cq->vtable->shutdown(&exec_ctx, cq);
- grpc_exec_ctx_finish(&exec_ctx);
+ cq->vtable->shutdown(cq);
+ grpc_exec_ctx_finish();
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
@@ -1224,9 +1190,9 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cq) {
GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
grpc_completion_queue_shutdown(cq);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy");
- grpc_exec_ctx_finish(&exec_ctx);
+ ExecCtx _local_exec_ctx;
+ GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
+ grpc_exec_ctx_finish();
GPR_TIMER_END("grpc_completion_queue_destroy", 0);
}