diff options
Diffstat (limited to 'grpc/src/core/lib/surface/completion_queue.cc')
-rw-r--r-- | grpc/src/core/lib/surface/completion_queue.cc | 93 |
1 files changed, 46 insertions, 47 deletions
diff --git a/grpc/src/core/lib/surface/completion_queue.cc b/grpc/src/core/lib/surface/completion_queue.cc index 02ac5067..2b889e0d 100644 --- a/grpc/src/core/lib/surface/completion_queue.cc +++ b/grpc/src/core/lib/surface/completion_queue.cc @@ -70,10 +70,10 @@ struct cq_poller_vtable { bool can_listen; size_t (*size)(void); void (*init)(grpc_pollset* pollset, gpr_mu** mu); - grpc_error* (*kick)(grpc_pollset* pollset, - grpc_pollset_worker* specific_worker); - grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker, - grpc_millis deadline); + grpc_error_handle (*kick)(grpc_pollset* pollset, + grpc_pollset_worker* specific_worker); + grpc_error_handle (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker, + grpc_millis deadline); void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure); void (*destroy)(grpc_pollset* pollset); }; @@ -103,9 +103,9 @@ void non_polling_poller_destroy(grpc_pollset* pollset) { gpr_mu_destroy(&npp->mu); } -grpc_error* non_polling_poller_work(grpc_pollset* pollset, - grpc_pollset_worker** worker, - grpc_millis deadline) { +grpc_error_handle non_polling_poller_work(grpc_pollset* pollset, + grpc_pollset_worker** worker, + grpc_millis deadline) { non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset); if (npp->shutdown) return GRPC_ERROR_NONE; if (npp->kicked_without_poller) { @@ -145,8 +145,8 @@ grpc_error* non_polling_poller_work(grpc_pollset* pollset, return GRPC_ERROR_NONE; } -grpc_error* non_polling_poller_kick(grpc_pollset* pollset, - grpc_pollset_worker* specific_worker) { +grpc_error_handle non_polling_poller_kick( + grpc_pollset* pollset, grpc_pollset_worker* specific_worker) { non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset); if (specific_worker == nullptr) { specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root); @@ -202,7 +202,7 @@ struct cq_vtable { void (*shutdown)(grpc_completion_queue* cq); void (*destroy)(void* data); bool (*begin_op)(grpc_completion_queue* cq, void* tag); - void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error_handle error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, bool internal); grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline, @@ -376,17 +376,17 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); // safe to free up that storage. The storage MUST NOT be freed until the // done callback is invoked. static void cq_end_op_for_next( - grpc_completion_queue* cq, void* tag, grpc_error* error, + grpc_completion_queue* cq, void* tag, grpc_error_handle error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, bool internal); static void cq_end_op_for_pluck( - grpc_completion_queue* cq, void* tag, grpc_error* error, + grpc_completion_queue* cq, void* tag, grpc_error_handle error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, bool internal); static void cq_end_op_for_callback( - grpc_completion_queue* cq, void* tag, grpc_error* error, + grpc_completion_queue* cq, void* tag, grpc_error_handle error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, bool internal); @@ -439,7 +439,7 @@ grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck"); } \ } while (0) -static void on_pollset_shutdown_done(void* arg, grpc_error* error); +static void on_pollset_shutdown_done(void* arg, grpc_error_handle error); void grpc_cq_global_init() { gpr_tls_init(&g_cached_event); @@ -604,7 +604,7 @@ void grpc_cq_internal_ref(grpc_completion_queue* cq) { cq->owning_refs.Ref(debug_location, reason); } -static void on_pollset_shutdown_done(void* arg, grpc_error* /*error*/) { +static void on_pollset_shutdown_done(void* arg, grpc_error_handle /*error*/) { grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg); GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy"); } @@ -690,7 +690,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { * completion * type of GRPC_CQ_NEXT) */ static void cq_end_op_for_next( - grpc_completion_queue* cq, void* tag, grpc_error* error, + grpc_completion_queue* cq, void* tag, grpc_error_handle error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, bool /*internal*/) { GPR_TIMER_SCOPE("cq_end_op_for_next", 0); @@ -698,14 +698,15 @@ static void cq_end_op_for_next( if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE)) { - const char* errmsg = grpc_error_string(error); + std::string errmsg = grpc_error_std_string(error); GRPC_API_TRACE( "cq_end_op_for_next(cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 6, (cq, tag, errmsg, done, done_arg, storage)); + 6, (cq, tag, errmsg.c_str(), done, done_arg, storage)); if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); + gpr_log(GPR_INFO, "Operation failed: tag=%p, error=%s", tag, + errmsg.c_str()); } } cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq); @@ -736,13 +737,13 @@ static void cq_end_op_for_next( /* Only kick if this is the first item queued */ if (is_first) { gpr_mu_lock(cq->mu); - grpc_error* kick_error = + grpc_error_handle kick_error = cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr); gpr_mu_unlock(cq->mu); if (kick_error != GRPC_ERROR_NONE) { - const char* msg = grpc_error_string(kick_error); - gpr_log(GPR_ERROR, "Kick failed: %s", msg); + gpr_log(GPR_ERROR, "Kick failed: %s", + grpc_error_std_string(kick_error).c_str()); GRPC_ERROR_UNREF(kick_error); } } @@ -771,7 +772,7 @@ static void cq_end_op_for_next( * completion * type of GRPC_CQ_PLUCK) */ static void cq_end_op_for_pluck( - grpc_completion_queue* cq, void* tag, grpc_error* error, + grpc_completion_queue* cq, void* tag, grpc_error_handle error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, bool /*internal*/) { GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0); @@ -782,14 +783,15 @@ static void cq_end_op_for_pluck( if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE)) { - const char* errmsg = grpc_error_string(error); + std::string errmsg = grpc_error_std_string(error).c_str(); GRPC_API_TRACE( "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 6, (cq, tag, errmsg, done, done_arg, storage)); + 6, (cq, tag, errmsg.c_str(), done, done_arg, storage)); if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); + gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, + errmsg.c_str()); } } @@ -820,15 +822,12 @@ static void cq_end_op_for_pluck( } } - grpc_error* kick_error = + grpc_error_handle kick_error = cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker); - gpr_mu_unlock(cq->mu); - if (kick_error != GRPC_ERROR_NONE) { - const char* msg = grpc_error_string(kick_error); - gpr_log(GPR_ERROR, "Kick failed: %s", msg); - + gpr_log(GPR_ERROR, "Kick failed: %s", + grpc_error_std_string(kick_error).c_str()); GRPC_ERROR_UNREF(kick_error); } } @@ -836,14 +835,14 @@ static void cq_end_op_for_pluck( GRPC_ERROR_UNREF(error); } -static void functor_callback(void* arg, grpc_error* error) { +static void functor_callback(void* arg, grpc_error_handle error) { auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg); functor->functor_run(functor, error == GRPC_ERROR_NONE); } /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */ static void cq_end_op_for_callback( - grpc_completion_queue* cq, void* tag, grpc_error* error, + grpc_completion_queue* cq, void* tag, grpc_error_handle error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); @@ -853,14 +852,15 @@ static void cq_end_op_for_callback( if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE)) { - const char* errmsg = grpc_error_string(error); + std::string errmsg = grpc_error_std_string(error); GRPC_API_TRACE( "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 6, (cq, tag, errmsg, done, done_arg, storage)); + 6, (cq, tag, errmsg.c_str(), done, done_arg, storage)); if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); + gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, + errmsg.c_str()); } } @@ -896,7 +896,8 @@ static void cq_end_op_for_callback( GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error); } -void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, +void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, + grpc_error_handle error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, bool internal) { @@ -1056,14 +1057,13 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, /* The main polling work happens in grpc_pollset_work */ gpr_mu_lock(cq->mu); cq->num_polls++; - grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr, - iteration_deadline); + grpc_error_handle err = cq->poller_vtable->work( + POLLSET_FROM_CQ(cq), nullptr, iteration_deadline); gpr_mu_unlock(cq->mu); if (err != GRPC_ERROR_NONE) { - const char* msg = grpc_error_string(err); - gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); - + gpr_log(GPR_ERROR, "Completion queue next failed: %s", + grpc_error_std_string(err).c_str()); GRPC_ERROR_UNREF(err); ret.type = GRPC_QUEUE_TIMEOUT; ret.success = 0; @@ -1299,14 +1299,13 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, break; } cq->num_polls++; - grpc_error* err = + grpc_error_handle err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis); if (err != GRPC_ERROR_NONE) { del_plucker(cq, tag, &worker); gpr_mu_unlock(cq->mu); - const char* msg = grpc_error_string(err); - gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg); - + gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", + grpc_error_std_string(err).c_str()); GRPC_ERROR_UNREF(err); ret.type = GRPC_QUEUE_TIMEOUT; ret.success = 0; |