diff options
Diffstat (limited to 'grpc/src/core/lib/iomgr/ev_epollex_linux.cc')
-rw-r--r-- | grpc/src/core/lib/iomgr/ev_epollex_linux.cc | 101 |
1 files changed, 52 insertions, 49 deletions
diff --git a/grpc/src/core/lib/iomgr/ev_epollex_linux.cc b/grpc/src/core/lib/iomgr/ev_epollex_linux.cc index acd095a4..72d7da76 100644 --- a/grpc/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/grpc/src/core/lib/iomgr/ev_epollex_linux.cc @@ -134,7 +134,7 @@ static std::string pollable_desc(pollable* p) { /// added static pollable* g_empty_pollable; -static grpc_error* pollable_create(pollable_type type, pollable** p); +static grpc_error_handle pollable_create(pollable_type type, pollable** p); static pollable* pollable_ref(pollable* p, const grpc_core::DebugLocation& dbg_loc, const char* reason) { @@ -314,7 +314,7 @@ struct grpc_pollset_set { * Common helpers */ -static bool append_error(grpc_error** composite, grpc_error* error, +static bool append_error(grpc_error_handle* composite, grpc_error_handle error, const char* desc) { if (error == GRPC_ERROR_NONE) return true; if (*composite == GRPC_ERROR_NONE) { @@ -372,7 +372,7 @@ static void ref_by(grpc_fd* fd, int n) { } /* Uninitialize and add to the freelist */ -static void fd_destroy(void* arg, grpc_error* /*error*/) { +static void fd_destroy(void* arg, grpc_error_handle /*error*/) { grpc_fd* fd = static_cast<grpc_fd*>(arg); fd->destroy(); @@ -509,7 +509,7 @@ static bool fd_is_shutdown(grpc_fd* fd) { } /* Might be called multiple times */ -static void fd_shutdown(grpc_fd* fd, grpc_error* why) { +static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) { if (fd->read_closure.SetShutdown(GRPC_ERROR_REF(why))) { if (shutdown(fd->fd, SHUT_RDWR)) { if (errno != ENOTCONN) { @@ -537,7 +537,7 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) { const int epfd = pollset->active_pollable->epfd; - grpc_core::MutexLock lock(&fd->pollable_mu); + grpc_core::MutexLockForGprMu lock(&fd->pollable_mu); for (size_t i = 0; i < fd->pollset_fds.size(); ++i) { if (fd->pollset_fds[i] == epfd) { return true; @@ -548,7 +548,7 @@ static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) { static void fd_add_pollset(grpc_fd* fd, grpc_pollset* pollset) { const int epfd = pollset->active_pollable->epfd; - grpc_core::MutexLock lock(&fd->pollable_mu); + grpc_core::MutexLockForGprMu lock(&fd->pollable_mu); fd->pollset_fds.push_back(epfd); } @@ -556,7 +556,7 @@ static void fd_add_pollset(grpc_fd* fd, grpc_pollset* pollset) { * Pollable Definitions */ -static grpc_error* pollable_create(pollable_type type, pollable** p) { +static grpc_error_handle pollable_create(pollable_type type, pollable** p) { *p = nullptr; int epfd = epoll_create1(EPOLL_CLOEXEC); @@ -565,7 +565,7 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) { } GRPC_FD_TRACE("Pollable_create: created epfd: %d (type: %d)", epfd, type); *p = static_cast<pollable*>(gpr_malloc(sizeof(**p))); - grpc_error* err = grpc_wakeup_fd_init(&(*p)->wakeup); + grpc_error_handle err = grpc_wakeup_fd_init(&(*p)->wakeup); if (err != GRPC_ERROR_NONE) { GRPC_FD_TRACE( "Pollable_create: closed epfd: %d (type: %d). wakeupfd_init error", @@ -609,8 +609,8 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) { return GRPC_ERROR_NONE; } -static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { - grpc_error* error = GRPC_ERROR_NONE; +static grpc_error_handle pollable_add_fd(pollable* p, grpc_fd* fd) { + grpc_error_handle error = GRPC_ERROR_NONE; static const char* err_desc = "pollable_add_fd"; const int epfd = p->epfd; if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) { @@ -647,7 +647,7 @@ GPR_TLS_DECL(g_current_thread_pollset); GPR_TLS_DECL(g_current_thread_worker); /* Global state management */ -static grpc_error* pollset_global_init(void) { +static grpc_error_handle pollset_global_init(void) { gpr_tls_init(&g_current_thread_pollset); gpr_tls_init(&g_current_thread_worker); return pollable_create(PO_EMPTY, &g_empty_pollable); @@ -681,10 +681,10 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) { /* pollset->mu must be held before calling this function, * pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be * held */ -static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) { +static grpc_error_handle kick_one_worker(grpc_pollset_worker* specific_worker) { GPR_TIMER_SCOPE("kick_one_worker", 0); pollable* p = specific_worker->pollable_obj; - grpc_core::MutexLock lock(&p->mu); + grpc_core::MutexLockForGprMu lock(&p->mu); GPR_ASSERT(specific_worker != nullptr); if (specific_worker->kicked) { if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) { @@ -708,7 +708,7 @@ static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) { gpr_log(GPR_INFO, "PS:%p kicked_specific_via_wakeup_fd", p); } specific_worker->kicked = true; - grpc_error* error = grpc_wakeup_fd_wakeup(&p->wakeup); + grpc_error_handle error = grpc_wakeup_fd_wakeup(&p->wakeup); return error; } if (specific_worker->initialized_cv) { @@ -725,8 +725,8 @@ static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) { return GRPC_ERROR_NONE; } -static grpc_error* pollset_kick(grpc_pollset* pollset, - grpc_pollset_worker* specific_worker) { +static grpc_error_handle pollset_kick(grpc_pollset* pollset, + grpc_pollset_worker* specific_worker) { GPR_TIMER_SCOPE("pollset_kick", 0); GRPC_STATS_INC_POLLSET_KICK(); if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) { @@ -777,9 +777,9 @@ static grpc_error* pollset_kick(grpc_pollset* pollset, } } -static grpc_error* pollset_kick_all(grpc_pollset* pollset) { +static grpc_error_handle pollset_kick_all(grpc_pollset* pollset) { GPR_TIMER_SCOPE("pollset_kick_all", 0); - grpc_error* error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; const char* err_desc = "pollset_kick_all"; grpc_pollset_worker* w = pollset->root_worker; if (w != nullptr) { @@ -828,9 +828,9 @@ static void fd_has_errors(grpc_fd* fd) { fd->error_closure.SetReady(); } * * Note that if a pollable object is already attached to the fd, it may be of * either PO_FD or PO_MULTI type */ -static grpc_error* get_fd_pollable(grpc_fd* fd, pollable** p) { +static grpc_error_handle get_fd_pollable(grpc_fd* fd, pollable** p) { gpr_mu_lock(&fd->pollable_mu); - grpc_error* error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; static const char* err_desc = "get_fd_pollable"; if (fd->pollable_obj == nullptr) { if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj), @@ -863,8 +863,9 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { pollset_maybe_finish_shutdown(pollset); } -static grpc_error* pollable_process_events(grpc_pollset* pollset, - pollable* pollable_obj, bool drain) { +static grpc_error_handle pollable_process_events(grpc_pollset* pollset, + pollable* pollable_obj, + bool drain) { GPR_TIMER_SCOPE("pollable_process_events", 0); static const char* err_desc = "pollset_process_events"; // Use a simple heuristic to determine how many fd events to process @@ -877,7 +878,7 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, if (handle_count == 0) { handle_count = 1; } - grpc_error* error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; for (int i = 0; (drain || i < handle_count) && pollable_obj->event_cursor != pollable_obj->event_count; i++) { @@ -932,7 +933,7 @@ static void pollset_destroy(grpc_pollset* pollset) { gpr_mu_destroy(&pollset->mu); } -static grpc_error* pollable_epoll(pollable* p, grpc_millis deadline) { +static grpc_error_handle pollable_epoll(pollable* p, grpc_millis deadline) { GPR_TIMER_SCOPE("pollable_epoll", 0); int timeout = poll_deadline_to_millis_timeout(deadline); @@ -1103,9 +1104,9 @@ static long sys_gettid(void) { return syscall(__NR_gettid); } The function pollset_work() may temporarily release the lock (pollset->po.mu) during the course of its execution but it will always re-acquire the lock and ensure that it is held by the time the function returns */ -static grpc_error* pollset_work(grpc_pollset* pollset, - grpc_pollset_worker** worker_hdl, - grpc_millis deadline) { +static grpc_error_handle pollset_work(grpc_pollset* pollset, + grpc_pollset_worker** worker_hdl, + grpc_millis deadline) { GPR_TIMER_SCOPE("pollset_work", 0); #ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP grpc_pollset_worker* worker = @@ -1126,7 +1127,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, deadline, pollset->kicked_without_poller, pollset->active_pollable); } static const char* err_desc = "pollset_work"; - grpc_error* error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; if (pollset->kicked_without_poller) { pollset->kicked_without_poller = false; } else { @@ -1155,10 +1156,10 @@ static grpc_error* pollset_work(grpc_pollset* pollset, return error; } -static grpc_error* pollset_transition_pollable_from_empty_to_fd_locked( +static grpc_error_handle pollset_transition_pollable_from_empty_to_fd_locked( grpc_pollset* pollset, grpc_fd* fd) { static const char* err_desc = "pollset_transition_pollable_from_empty_to_fd"; - grpc_error* error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) { gpr_log(GPR_INFO, "PS:%p add fd %p (%d); transition pollable from empty to fd", @@ -1171,10 +1172,10 @@ static grpc_error* pollset_transition_pollable_from_empty_to_fd_locked( return error; } -static grpc_error* pollset_transition_pollable_from_fd_to_multi_locked( +static grpc_error_handle pollset_transition_pollable_from_fd_to_multi_locked( grpc_pollset* pollset, grpc_fd* and_add_fd) { static const char* err_desc = "pollset_transition_pollable_from_fd_to_multi"; - grpc_error* error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) { gpr_log( GPR_INFO, @@ -1200,8 +1201,9 @@ static grpc_error* pollset_transition_pollable_from_fd_to_multi_locked( } /* expects pollsets locked, flag whether fd is locked or not */ -static grpc_error* pollset_add_fd_locked(grpc_pollset* pollset, grpc_fd* fd) { - grpc_error* error = GRPC_ERROR_NONE; +static grpc_error_handle pollset_add_fd_locked(grpc_pollset* pollset, + grpc_fd* fd) { + grpc_error_handle error = GRPC_ERROR_NONE; pollable* po_at_start = POLLABLE_REF(pollset->active_pollable, "pollset_add_fd"); switch (pollset->active_pollable->type) { @@ -1236,9 +1238,9 @@ static grpc_error* pollset_add_fd_locked(grpc_pollset* pollset, grpc_fd* fd) { return error; } -static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset, - pollable** pollable_obj) { - grpc_error* error = GRPC_ERROR_NONE; +static grpc_error_handle pollset_as_multipollable_locked( + grpc_pollset* pollset, pollable** pollable_obj) { + grpc_error_handle error = GRPC_ERROR_NONE; pollable* po_at_start = POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable"); switch (pollset->active_pollable->type) { @@ -1296,8 +1298,8 @@ static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) { return; } - grpc_core::MutexLock lock(&pollset->mu); - grpc_error* error = pollset_add_fd_locked(pollset, fd); + grpc_core::MutexLockForGprMu lock(&pollset->mu); + grpc_error_handle error = pollset_add_fd_locked(pollset, fd); // If we are in PO_MULTI mode, we should update the pollsets of the FD. if (gpr_atm_no_barrier_load(&pollset->active_pollable_type) == PO_MULTI) { @@ -1354,7 +1356,7 @@ static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) { if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) { gpr_log(GPR_INFO, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd); } - grpc_error* error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; static const char* err_desc = "pollset_set_add_fd"; pss = pss_lock_adam(pss); for (size_t i = 0; i < pss->pollset_count; i++) { @@ -1421,13 +1423,14 @@ static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) { // add all fds to pollables, and output a new array of unorphaned out_fds // assumes pollsets are multipollable -static grpc_error* add_fds_to_pollsets(grpc_fd** fds, size_t fd_count, - grpc_pollset** pollsets, - size_t pollset_count, - const char* err_desc, grpc_fd** out_fds, - size_t* out_fd_count) { +static grpc_error_handle add_fds_to_pollsets(grpc_fd** fds, size_t fd_count, + grpc_pollset** pollsets, + size_t pollset_count, + const char* err_desc, + grpc_fd** out_fds, + size_t* out_fd_count) { GPR_TIMER_SCOPE("add_fds_to_pollsets", 0); - grpc_error* error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; for (size_t i = 0; i < fd_count; i++) { gpr_mu_lock(&fds[i]->orphan_mu); if ((gpr_atm_no_barrier_load(&fds[i]->refst) & 1) == 0) { @@ -1451,7 +1454,7 @@ static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) { if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) { gpr_log(GPR_INFO, "PSS:%p: add pollset %p", pss, ps); } - grpc_error* error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; static const char* err_desc = "pollset_set_add_pollset"; pollable* pollable_obj = nullptr; gpr_mu_lock(&ps->mu); @@ -1488,7 +1491,7 @@ static void pollset_set_add_pollset_set(grpc_pollset_set* a, if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) { gpr_log(GPR_INFO, "PSS: merge (%p, %p)", a, b); } - grpc_error* error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; static const char* err_desc = "pollset_set_add_fd"; for (;;) { if (a == b) { @@ -1573,7 +1576,7 @@ static bool is_any_background_poller_thread(void) { return false; } static void shutdown_background_closure(void) {} static bool add_closure_to_background_poller(grpc_closure* /*closure*/, - grpc_error* /*error*/) { + grpc_error_handle /*error*/) { return false; } |