summaryrefslogtreecommitdiff
path: root/grpc/src/core/lib/surface/completion_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/core/lib/surface/completion_queue.cc')
-rw-r--r--grpc/src/core/lib/surface/completion_queue.cc93
1 files changed, 46 insertions, 47 deletions
diff --git a/grpc/src/core/lib/surface/completion_queue.cc b/grpc/src/core/lib/surface/completion_queue.cc
index 02ac5067..2b889e0d 100644
--- a/grpc/src/core/lib/surface/completion_queue.cc
+++ b/grpc/src/core/lib/surface/completion_queue.cc
@@ -70,10 +70,10 @@ struct cq_poller_vtable {
bool can_listen;
size_t (*size)(void);
void (*init)(grpc_pollset* pollset, gpr_mu** mu);
- grpc_error* (*kick)(grpc_pollset* pollset,
- grpc_pollset_worker* specific_worker);
- grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
- grpc_millis deadline);
+ grpc_error_handle (*kick)(grpc_pollset* pollset,
+ grpc_pollset_worker* specific_worker);
+ grpc_error_handle (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
+ grpc_millis deadline);
void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
void (*destroy)(grpc_pollset* pollset);
};
@@ -103,9 +103,9 @@ void non_polling_poller_destroy(grpc_pollset* pollset) {
gpr_mu_destroy(&npp->mu);
}
-grpc_error* non_polling_poller_work(grpc_pollset* pollset,
- grpc_pollset_worker** worker,
- grpc_millis deadline) {
+grpc_error_handle non_polling_poller_work(grpc_pollset* pollset,
+ grpc_pollset_worker** worker,
+ grpc_millis deadline) {
non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
if (npp->shutdown) return GRPC_ERROR_NONE;
if (npp->kicked_without_poller) {
@@ -145,8 +145,8 @@ grpc_error* non_polling_poller_work(grpc_pollset* pollset,
return GRPC_ERROR_NONE;
}
-grpc_error* non_polling_poller_kick(grpc_pollset* pollset,
- grpc_pollset_worker* specific_worker) {
+grpc_error_handle non_polling_poller_kick(
+ grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
if (specific_worker == nullptr) {
specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
@@ -202,7 +202,7 @@ struct cq_vtable {
void (*shutdown)(grpc_completion_queue* cq);
void (*destroy)(void* data);
bool (*begin_op)(grpc_completion_queue* cq, void* tag);
- void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
+ void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage, bool internal);
grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
@@ -376,17 +376,17 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
// safe to free up that storage. The storage MUST NOT be freed until the
// done callback is invoked.
static void cq_end_op_for_next(
- grpc_completion_queue* cq, void* tag, grpc_error* error,
+ grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal);
static void cq_end_op_for_pluck(
- grpc_completion_queue* cq, void* tag, grpc_error* error,
+ grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal);
static void cq_end_op_for_callback(
- grpc_completion_queue* cq, void* tag, grpc_error* error,
+ grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal);
@@ -439,7 +439,7 @@ grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
} \
} while (0)
-static void on_pollset_shutdown_done(void* arg, grpc_error* error);
+static void on_pollset_shutdown_done(void* arg, grpc_error_handle error);
void grpc_cq_global_init() {
gpr_tls_init(&g_cached_event);
@@ -604,7 +604,7 @@ void grpc_cq_internal_ref(grpc_completion_queue* cq) {
cq->owning_refs.Ref(debug_location, reason);
}
-static void on_pollset_shutdown_done(void* arg, grpc_error* /*error*/) {
+static void on_pollset_shutdown_done(void* arg, grpc_error_handle /*error*/) {
grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
}
@@ -690,7 +690,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
* completion
* type of GRPC_CQ_NEXT) */
static void cq_end_op_for_next(
- grpc_completion_queue* cq, void* tag, grpc_error* error,
+ grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool /*internal*/) {
GPR_TIMER_SCOPE("cq_end_op_for_next", 0);
@@ -698,14 +698,15 @@ static void cq_end_op_for_next(
if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
(GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE)) {
- const char* errmsg = grpc_error_string(error);
+ std::string errmsg = grpc_error_std_string(error);
GRPC_API_TRACE(
"cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 6, (cq, tag, errmsg, done, done_arg, storage));
+ 6, (cq, tag, errmsg.c_str(), done, done_arg, storage));
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
- gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
+ gpr_log(GPR_INFO, "Operation failed: tag=%p, error=%s", tag,
+ errmsg.c_str());
}
}
cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
@@ -736,13 +737,13 @@ static void cq_end_op_for_next(
/* Only kick if this is the first item queued */
if (is_first) {
gpr_mu_lock(cq->mu);
- grpc_error* kick_error =
+ grpc_error_handle kick_error =
cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
gpr_mu_unlock(cq->mu);
if (kick_error != GRPC_ERROR_NONE) {
- const char* msg = grpc_error_string(kick_error);
- gpr_log(GPR_ERROR, "Kick failed: %s", msg);
+ gpr_log(GPR_ERROR, "Kick failed: %s",
+ grpc_error_std_string(kick_error).c_str());
GRPC_ERROR_UNREF(kick_error);
}
}
@@ -771,7 +772,7 @@ static void cq_end_op_for_next(
* completion
* type of GRPC_CQ_PLUCK) */
static void cq_end_op_for_pluck(
- grpc_completion_queue* cq, void* tag, grpc_error* error,
+ grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool /*internal*/) {
GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);
@@ -782,14 +783,15 @@ static void cq_end_op_for_pluck(
if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
(GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE)) {
- const char* errmsg = grpc_error_string(error);
+ std::string errmsg = grpc_error_std_string(error).c_str();
GRPC_API_TRACE(
"cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 6, (cq, tag, errmsg, done, done_arg, storage));
+ 6, (cq, tag, errmsg.c_str(), done, done_arg, storage));
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
- gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
+ gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag,
+ errmsg.c_str());
}
}
@@ -820,15 +822,12 @@ static void cq_end_op_for_pluck(
}
}
- grpc_error* kick_error =
+ grpc_error_handle kick_error =
cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
-
gpr_mu_unlock(cq->mu);
-
if (kick_error != GRPC_ERROR_NONE) {
- const char* msg = grpc_error_string(kick_error);
- gpr_log(GPR_ERROR, "Kick failed: %s", msg);
-
+ gpr_log(GPR_ERROR, "Kick failed: %s",
+ grpc_error_std_string(kick_error).c_str());
GRPC_ERROR_UNREF(kick_error);
}
}
@@ -836,14 +835,14 @@ static void cq_end_op_for_pluck(
GRPC_ERROR_UNREF(error);
}
-static void functor_callback(void* arg, grpc_error* error) {
+static void functor_callback(void* arg, grpc_error_handle error) {
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg);
functor->functor_run(functor, error == GRPC_ERROR_NONE);
}
/* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */
static void cq_end_op_for_callback(
- grpc_completion_queue* cq, void* tag, grpc_error* error,
+ grpc_completion_queue* cq, void* tag, grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
grpc_cq_completion* storage, bool internal) {
GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
@@ -853,14 +852,15 @@ static void cq_end_op_for_callback(
if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
(GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE)) {
- const char* errmsg = grpc_error_string(error);
+ std::string errmsg = grpc_error_std_string(error);
GRPC_API_TRACE(
"cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 6, (cq, tag, errmsg, done, done_arg, storage));
+ 6, (cq, tag, errmsg.c_str(), done, done_arg, storage));
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
- gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
+ gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag,
+ errmsg.c_str());
}
}
@@ -896,7 +896,8 @@ static void cq_end_op_for_callback(
GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error);
}
-void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
+void grpc_cq_end_op(grpc_completion_queue* cq, void* tag,
+ grpc_error_handle error,
void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage,
bool internal) {
@@ -1056,14 +1057,13 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
/* The main polling work happens in grpc_pollset_work */
gpr_mu_lock(cq->mu);
cq->num_polls++;
- grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr,
- iteration_deadline);
+ grpc_error_handle err = cq->poller_vtable->work(
+ POLLSET_FROM_CQ(cq), nullptr, iteration_deadline);
gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
- const char* msg = grpc_error_string(err);
- gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
-
+ gpr_log(GPR_ERROR, "Completion queue next failed: %s",
+ grpc_error_std_string(err).c_str());
GRPC_ERROR_UNREF(err);
ret.type = GRPC_QUEUE_TIMEOUT;
ret.success = 0;
@@ -1299,14 +1299,13 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
break;
}
cq->num_polls++;
- grpc_error* err =
+ grpc_error_handle err =
cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
if (err != GRPC_ERROR_NONE) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
- const char* msg = grpc_error_string(err);
- gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);
-
+ gpr_log(GPR_ERROR, "Completion queue pluck failed: %s",
+ grpc_error_std_string(err).c_str());
GRPC_ERROR_UNREF(err);
ret.type = GRPC_QUEUE_TIMEOUT;
ret.success = 0;