summaryrefslogtreecommitdiff
path: root/grpc/src/core/lib/iomgr/ev_epollex_linux.cc
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/core/lib/iomgr/ev_epollex_linux.cc')
-rw-r--r--grpc/src/core/lib/iomgr/ev_epollex_linux.cc101
1 files changed, 52 insertions, 49 deletions
diff --git a/grpc/src/core/lib/iomgr/ev_epollex_linux.cc b/grpc/src/core/lib/iomgr/ev_epollex_linux.cc
index acd095a4..72d7da76 100644
--- a/grpc/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/grpc/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -134,7 +134,7 @@ static std::string pollable_desc(pollable* p) {
/// added
static pollable* g_empty_pollable;
-static grpc_error* pollable_create(pollable_type type, pollable** p);
+static grpc_error_handle pollable_create(pollable_type type, pollable** p);
static pollable* pollable_ref(pollable* p,
const grpc_core::DebugLocation& dbg_loc,
const char* reason) {
@@ -314,7 +314,7 @@ struct grpc_pollset_set {
* Common helpers
*/
-static bool append_error(grpc_error** composite, grpc_error* error,
+static bool append_error(grpc_error_handle* composite, grpc_error_handle error,
const char* desc) {
if (error == GRPC_ERROR_NONE) return true;
if (*composite == GRPC_ERROR_NONE) {
@@ -372,7 +372,7 @@ static void ref_by(grpc_fd* fd, int n) {
}
/* Uninitialize and add to the freelist */
-static void fd_destroy(void* arg, grpc_error* /*error*/) {
+static void fd_destroy(void* arg, grpc_error_handle /*error*/) {
grpc_fd* fd = static_cast<grpc_fd*>(arg);
fd->destroy();
@@ -509,7 +509,7 @@ static bool fd_is_shutdown(grpc_fd* fd) {
}
/* Might be called multiple times */
-static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
+static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
if (fd->read_closure.SetShutdown(GRPC_ERROR_REF(why))) {
if (shutdown(fd->fd, SHUT_RDWR)) {
if (errno != ENOTCONN) {
@@ -537,7 +537,7 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) {
const int epfd = pollset->active_pollable->epfd;
- grpc_core::MutexLock lock(&fd->pollable_mu);
+ grpc_core::MutexLockForGprMu lock(&fd->pollable_mu);
for (size_t i = 0; i < fd->pollset_fds.size(); ++i) {
if (fd->pollset_fds[i] == epfd) {
return true;
@@ -548,7 +548,7 @@ static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) {
static void fd_add_pollset(grpc_fd* fd, grpc_pollset* pollset) {
const int epfd = pollset->active_pollable->epfd;
- grpc_core::MutexLock lock(&fd->pollable_mu);
+ grpc_core::MutexLockForGprMu lock(&fd->pollable_mu);
fd->pollset_fds.push_back(epfd);
}
@@ -556,7 +556,7 @@ static void fd_add_pollset(grpc_fd* fd, grpc_pollset* pollset) {
* Pollable Definitions
*/
-static grpc_error* pollable_create(pollable_type type, pollable** p) {
+static grpc_error_handle pollable_create(pollable_type type, pollable** p) {
*p = nullptr;
int epfd = epoll_create1(EPOLL_CLOEXEC);
@@ -565,7 +565,7 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) {
}
GRPC_FD_TRACE("Pollable_create: created epfd: %d (type: %d)", epfd, type);
*p = static_cast<pollable*>(gpr_malloc(sizeof(**p)));
- grpc_error* err = grpc_wakeup_fd_init(&(*p)->wakeup);
+ grpc_error_handle err = grpc_wakeup_fd_init(&(*p)->wakeup);
if (err != GRPC_ERROR_NONE) {
GRPC_FD_TRACE(
"Pollable_create: closed epfd: %d (type: %d). wakeupfd_init error",
@@ -609,8 +609,8 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) {
return GRPC_ERROR_NONE;
}
-static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
- grpc_error* error = GRPC_ERROR_NONE;
+static grpc_error_handle pollable_add_fd(pollable* p, grpc_fd* fd) {
+ grpc_error_handle error = GRPC_ERROR_NONE;
static const char* err_desc = "pollable_add_fd";
const int epfd = p->epfd;
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
@@ -647,7 +647,7 @@ GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
/* Global state management */
-static grpc_error* pollset_global_init(void) {
+static grpc_error_handle pollset_global_init(void) {
gpr_tls_init(&g_current_thread_pollset);
gpr_tls_init(&g_current_thread_worker);
return pollable_create(PO_EMPTY, &g_empty_pollable);
@@ -681,10 +681,10 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
/* pollset->mu must be held before calling this function,
* pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be
* held */
-static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
+static grpc_error_handle kick_one_worker(grpc_pollset_worker* specific_worker) {
GPR_TIMER_SCOPE("kick_one_worker", 0);
pollable* p = specific_worker->pollable_obj;
- grpc_core::MutexLock lock(&p->mu);
+ grpc_core::MutexLockForGprMu lock(&p->mu);
GPR_ASSERT(specific_worker != nullptr);
if (specific_worker->kicked) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
@@ -708,7 +708,7 @@ static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
gpr_log(GPR_INFO, "PS:%p kicked_specific_via_wakeup_fd", p);
}
specific_worker->kicked = true;
- grpc_error* error = grpc_wakeup_fd_wakeup(&p->wakeup);
+ grpc_error_handle error = grpc_wakeup_fd_wakeup(&p->wakeup);
return error;
}
if (specific_worker->initialized_cv) {
@@ -725,8 +725,8 @@ static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
return GRPC_ERROR_NONE;
}
-static grpc_error* pollset_kick(grpc_pollset* pollset,
- grpc_pollset_worker* specific_worker) {
+static grpc_error_handle pollset_kick(grpc_pollset* pollset,
+ grpc_pollset_worker* specific_worker) {
GPR_TIMER_SCOPE("pollset_kick", 0);
GRPC_STATS_INC_POLLSET_KICK();
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
@@ -777,9 +777,9 @@ static grpc_error* pollset_kick(grpc_pollset* pollset,
}
}
-static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
+static grpc_error_handle pollset_kick_all(grpc_pollset* pollset) {
GPR_TIMER_SCOPE("pollset_kick_all", 0);
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
const char* err_desc = "pollset_kick_all";
grpc_pollset_worker* w = pollset->root_worker;
if (w != nullptr) {
@@ -828,9 +828,9 @@ static void fd_has_errors(grpc_fd* fd) { fd->error_closure.SetReady(); }
*
* Note that if a pollable object is already attached to the fd, it may be of
* either PO_FD or PO_MULTI type */
-static grpc_error* get_fd_pollable(grpc_fd* fd, pollable** p) {
+static grpc_error_handle get_fd_pollable(grpc_fd* fd, pollable** p) {
gpr_mu_lock(&fd->pollable_mu);
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
static const char* err_desc = "get_fd_pollable";
if (fd->pollable_obj == nullptr) {
if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj),
@@ -863,8 +863,9 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
pollset_maybe_finish_shutdown(pollset);
}
-static grpc_error* pollable_process_events(grpc_pollset* pollset,
- pollable* pollable_obj, bool drain) {
+static grpc_error_handle pollable_process_events(grpc_pollset* pollset,
+ pollable* pollable_obj,
+ bool drain) {
GPR_TIMER_SCOPE("pollable_process_events", 0);
static const char* err_desc = "pollset_process_events";
// Use a simple heuristic to determine how many fd events to process
@@ -877,7 +878,7 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
if (handle_count == 0) {
handle_count = 1;
}
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
for (int i = 0; (drain || i < handle_count) &&
pollable_obj->event_cursor != pollable_obj->event_count;
i++) {
@@ -932,7 +933,7 @@ static void pollset_destroy(grpc_pollset* pollset) {
gpr_mu_destroy(&pollset->mu);
}
-static grpc_error* pollable_epoll(pollable* p, grpc_millis deadline) {
+static grpc_error_handle pollable_epoll(pollable* p, grpc_millis deadline) {
GPR_TIMER_SCOPE("pollable_epoll", 0);
int timeout = poll_deadline_to_millis_timeout(deadline);
@@ -1103,9 +1104,9 @@ static long sys_gettid(void) { return syscall(__NR_gettid); }
The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
-static grpc_error* pollset_work(grpc_pollset* pollset,
- grpc_pollset_worker** worker_hdl,
- grpc_millis deadline) {
+static grpc_error_handle pollset_work(grpc_pollset* pollset,
+ grpc_pollset_worker** worker_hdl,
+ grpc_millis deadline) {
GPR_TIMER_SCOPE("pollset_work", 0);
#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP
grpc_pollset_worker* worker =
@@ -1126,7 +1127,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
deadline, pollset->kicked_without_poller, pollset->active_pollable);
}
static const char* err_desc = "pollset_work";
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
if (pollset->kicked_without_poller) {
pollset->kicked_without_poller = false;
} else {
@@ -1155,10 +1156,10 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
return error;
}
-static grpc_error* pollset_transition_pollable_from_empty_to_fd_locked(
+static grpc_error_handle pollset_transition_pollable_from_empty_to_fd_locked(
grpc_pollset* pollset, grpc_fd* fd) {
static const char* err_desc = "pollset_transition_pollable_from_empty_to_fd";
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO,
"PS:%p add fd %p (%d); transition pollable from empty to fd",
@@ -1171,10 +1172,10 @@ static grpc_error* pollset_transition_pollable_from_empty_to_fd_locked(
return error;
}
-static grpc_error* pollset_transition_pollable_from_fd_to_multi_locked(
+static grpc_error_handle pollset_transition_pollable_from_fd_to_multi_locked(
grpc_pollset* pollset, grpc_fd* and_add_fd) {
static const char* err_desc = "pollset_transition_pollable_from_fd_to_multi";
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(
GPR_INFO,
@@ -1200,8 +1201,9 @@ static grpc_error* pollset_transition_pollable_from_fd_to_multi_locked(
}
/* expects pollsets locked, flag whether fd is locked or not */
-static grpc_error* pollset_add_fd_locked(grpc_pollset* pollset, grpc_fd* fd) {
- grpc_error* error = GRPC_ERROR_NONE;
+static grpc_error_handle pollset_add_fd_locked(grpc_pollset* pollset,
+ grpc_fd* fd) {
+ grpc_error_handle error = GRPC_ERROR_NONE;
pollable* po_at_start =
POLLABLE_REF(pollset->active_pollable, "pollset_add_fd");
switch (pollset->active_pollable->type) {
@@ -1236,9 +1238,9 @@ static grpc_error* pollset_add_fd_locked(grpc_pollset* pollset, grpc_fd* fd) {
return error;
}
-static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset,
- pollable** pollable_obj) {
- grpc_error* error = GRPC_ERROR_NONE;
+static grpc_error_handle pollset_as_multipollable_locked(
+ grpc_pollset* pollset, pollable** pollable_obj) {
+ grpc_error_handle error = GRPC_ERROR_NONE;
pollable* po_at_start =
POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable");
switch (pollset->active_pollable->type) {
@@ -1296,8 +1298,8 @@ static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
return;
}
- grpc_core::MutexLock lock(&pollset->mu);
- grpc_error* error = pollset_add_fd_locked(pollset, fd);
+ grpc_core::MutexLockForGprMu lock(&pollset->mu);
+ grpc_error_handle error = pollset_add_fd_locked(pollset, fd);
// If we are in PO_MULTI mode, we should update the pollsets of the FD.
if (gpr_atm_no_barrier_load(&pollset->active_pollable_type) == PO_MULTI) {
@@ -1354,7 +1356,7 @@ static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd);
}
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
static const char* err_desc = "pollset_set_add_fd";
pss = pss_lock_adam(pss);
for (size_t i = 0; i < pss->pollset_count; i++) {
@@ -1421,13 +1423,14 @@ static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
// add all fds to pollables, and output a new array of unorphaned out_fds
// assumes pollsets are multipollable
-static grpc_error* add_fds_to_pollsets(grpc_fd** fds, size_t fd_count,
- grpc_pollset** pollsets,
- size_t pollset_count,
- const char* err_desc, grpc_fd** out_fds,
- size_t* out_fd_count) {
+static grpc_error_handle add_fds_to_pollsets(grpc_fd** fds, size_t fd_count,
+ grpc_pollset** pollsets,
+ size_t pollset_count,
+ const char* err_desc,
+ grpc_fd** out_fds,
+ size_t* out_fd_count) {
GPR_TIMER_SCOPE("add_fds_to_pollsets", 0);
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
for (size_t i = 0; i < fd_count; i++) {
gpr_mu_lock(&fds[i]->orphan_mu);
if ((gpr_atm_no_barrier_load(&fds[i]->refst) & 1) == 0) {
@@ -1451,7 +1454,7 @@ static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, "PSS:%p: add pollset %p", pss, ps);
}
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
static const char* err_desc = "pollset_set_add_pollset";
pollable* pollable_obj = nullptr;
gpr_mu_lock(&ps->mu);
@@ -1488,7 +1491,7 @@ static void pollset_set_add_pollset_set(grpc_pollset_set* a,
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, "PSS: merge (%p, %p)", a, b);
}
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
static const char* err_desc = "pollset_set_add_fd";
for (;;) {
if (a == b) {
@@ -1573,7 +1576,7 @@ static bool is_any_background_poller_thread(void) { return false; }
static void shutdown_background_closure(void) {}
static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
- grpc_error* /*error*/) {
+ grpc_error_handle /*error*/) {
return false;
}