diff options
Diffstat (limited to 'grpc/src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r-- | grpc/src/core/lib/iomgr/tcp_posix.cc | 83 |
1 files changed, 41 insertions, 42 deletions
diff --git a/grpc/src/core/lib/iomgr/tcp_posix.cc b/grpc/src/core/lib/iomgr/tcp_posix.cc index 7813e044..1f28b773 100644 --- a/grpc/src/core/lib/iomgr/tcp_posix.cc +++ b/grpc/src/core/lib/iomgr/tcp_posix.cc @@ -45,6 +45,7 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" @@ -54,7 +55,6 @@ #include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" -#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -435,12 +435,12 @@ static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp); static gpr_atm g_uncovered_notifications_pending; static gpr_atm g_backup_poller; /* backup_poller* */ -static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error); -static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error); +static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error); +static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error_handle error); static void tcp_drop_uncovered_then_handle_write(void* arg /* grpc_tcp */, - grpc_error* error); + grpc_error_handle error); -static void done_poller(void* bp, grpc_error* /*error_ignored*/) { +static void done_poller(void* bp, grpc_error_handle /*error_ignored*/) { backup_poller* p = static_cast<backup_poller*>(bp); if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "BACKUP_POLLER:%p destroy", p); @@ -449,7 +449,7 @@ static void done_poller(void* bp, grpc_error* /*error_ignored*/) { gpr_free(p); } -static void run_poller(void* bp, grpc_error* /*error_ignored*/) { +static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) { backup_poller* p = static_cast<backup_poller*>(bp); if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "BACKUP_POLLER:%p run", p); @@ -560,9 +560,11 @@ static void notify_on_write(grpc_tcp* tcp) { grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure); } -static void tcp_drop_uncovered_then_handle_write(void* arg, grpc_error* error) { +static void tcp_drop_uncovered_then_handle_write(void* arg, + grpc_error_handle error) { if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { - gpr_log(GPR_INFO, "TCP:%p got_write: %s", arg, grpc_error_string(error)); + gpr_log(GPR_INFO, "TCP:%p got_write: %s", arg, + grpc_error_std_string(error).c_str()); } drop_uncovered(static_cast<grpc_tcp*>(arg)); tcp_handle_write(arg, error); @@ -604,7 +606,8 @@ static size_t get_target_read_size(grpc_tcp* tcp) { return sz; } -static grpc_error* tcp_annotate_error(grpc_error* src_error, grpc_tcp* tcp) { +static grpc_error_handle tcp_annotate_error(grpc_error_handle src_error, + grpc_tcp* tcp) { return grpc_error_set_str( grpc_error_set_int( grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd), @@ -615,10 +618,10 @@ static grpc_error* tcp_annotate_error(grpc_error* src_error, grpc_tcp* tcp) { grpc_slice_from_copied_string(tcp->peer_string.c_str())); } -static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error); -static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error); +static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error); +static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error_handle error); -static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) { +static void tcp_shutdown(grpc_endpoint* ep, grpc_error_handle why) { grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); ZerocopyDisableAndWaitForRemaining(tcp); grpc_fd_shutdown(tcp->em_fd, why); @@ -678,16 +681,14 @@ static void tcp_destroy(grpc_endpoint* ep) { TCP_UNREF(tcp, "destroy"); } -static void call_read_cb(grpc_tcp* tcp, grpc_error* error) { +static void call_read_cb(grpc_tcp* tcp, grpc_error_handle error) { grpc_closure* cb = tcp->read_cb; if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg); size_t i; - const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "READ %p (peer=%s) error=%s", tcp, - tcp->peer_string.c_str(), str); - + tcp->peer_string.c_str(), grpc_error_std_string(error).c_str()); if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { for (i = 0; i < tcp->incoming_buffer->count; i++) { char* dump = grpc_dump_slice(tcp->incoming_buffer->slices[i], @@ -850,11 +851,11 @@ static void tcp_do_read(grpc_tcp* tcp) { TCP_UNREF(tcp, "read"); } -static void tcp_read_allocation_done(void* tcpp, grpc_error* error) { +static void tcp_read_allocation_done(void* tcpp, grpc_error_handle error) { grpc_tcp* tcp = static_cast<grpc_tcp*>(tcpp); if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { gpr_log(GPR_INFO, "TCP:%p read_allocation_done: %s", tcp, - grpc_error_string(error)); + grpc_error_std_string(error).c_str()); } if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); @@ -887,10 +888,11 @@ static void tcp_continue_read(grpc_tcp* tcp) { tcp_do_read(tcp); } -static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) { +static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) { grpc_tcp* tcp = static_cast<grpc_tcp*>(arg); if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { - gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp, grpc_error_string(error)); + gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp, + grpc_error_std_string(error).c_str()); } if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { @@ -958,7 +960,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, int additional_flags = 0); /** The callback function to be invoked when we get an error on the socket. */ -static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); +static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error_handle error); static TcpZerocopySendRecord* tcp_get_send_zerocopy_record( grpc_tcp* tcp, grpc_slice_buffer* buf); @@ -1213,10 +1215,12 @@ static bool process_errors(grpc_tcp* tcp) { } } -static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { +static void tcp_handle_error(void* arg /* grpc_tcp */, + grpc_error_handle error) { grpc_tcp* tcp = static_cast<grpc_tcp*>(arg); if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { - gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error)); + gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, + grpc_error_std_string(error).c_str()); } if (error != GRPC_ERROR_NONE || @@ -1241,11 +1245,11 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { #else /* GRPC_LINUX_ERRQUEUE */ static TcpZerocopySendRecord* tcp_get_send_zerocopy_record( - grpc_tcp* tcp, grpc_slice_buffer* buf) { + grpc_tcp* /*tcp*/, grpc_slice_buffer* /*buf*/) { return nullptr; } -static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) {} +static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* /*tcp*/) {} static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/, size_t /*sending_length*/, @@ -1257,7 +1261,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/, } static void tcp_handle_error(void* /*arg*/ /* grpc_tcp */, - grpc_error* /*error*/) { + grpc_error_handle /*error*/) { gpr_log(GPR_ERROR, "Error handling is not supported for this platform"); GPR_ASSERT(0); } @@ -1323,7 +1327,7 @@ void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length, // returns true if done, false if pending; if returning true, *error is set static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, - grpc_error** error) { + grpc_error_handle* error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; msg_iovlen_type iov_size; @@ -1391,15 +1395,15 @@ static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp, TcpZerocopySendRecord* record, - uint32_t seq, - const char* /* tag */) { + uint32_t /*seq*/, + const char* /*tag*/) { if (record->Unref()) { tcp->tcp_zerocopy_send_ctx.PutSendRecord(record); } } static bool tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, - grpc_error** error) { + grpc_error_handle* error) { bool done = do_tcp_flush_zerocopy(tcp, record, error); if (done) { // Either we encountered an error, or we successfully sent all the bytes. @@ -1409,7 +1413,7 @@ static bool tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record, return done; } -static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { +static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; msg_iovlen_type iov_size; @@ -1516,7 +1520,8 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { } } -static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { +static void tcp_handle_write(void* arg /* grpc_tcp */, + grpc_error_handle error) { grpc_tcp* tcp = static_cast<grpc_tcp*>(arg); grpc_closure* cb; @@ -1549,8 +1554,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { tcp->write_cb = nullptr; tcp->current_zerocopy_send = nullptr; if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { - const char* str = grpc_error_string(error); - gpr_log(GPR_INFO, "write: %s", str); + gpr_log(GPR_INFO, "write: %s", grpc_error_std_string(error).c_str()); } // No need to take a ref on error since tcp_flush provides a ref. grpc_core::Closure::Run(DEBUG_LOCATION, cb, error); @@ -1562,7 +1566,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, grpc_closure* cb, void* arg) { GPR_TIMER_SCOPE("tcp_write", 0); grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); - grpc_error* error = GRPC_ERROR_NONE; + grpc_error_handle error = GRPC_ERROR_NONE; TcpZerocopySendRecord* zerocopy_send_record = nullptr; if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { @@ -1618,8 +1622,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, notify_on_write(tcp); } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { - const char* str = grpc_error_string(error); - gpr_log(GPR_INFO, "write: %s", str); + gpr_log(GPR_INFO, "write: %s", grpc_error_std_string(error).c_str()); } grpc_core::Closure::Run(DEBUG_LOCATION, cb, error); } @@ -1639,7 +1642,6 @@ static void tcp_add_to_pollset_set(grpc_endpoint* ep, static void tcp_delete_from_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset_set) { grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); - ZerocopyDisableAndWaitForRemaining(tcp); grpc_pollset_set_del_fd(pollset_set, tcp->em_fd); } @@ -1673,10 +1675,7 @@ static bool tcp_can_track_err(grpc_endpoint* ep) { if (getsockname(tcp->fd, &addr, &len) < 0) { return false; } - if (addr.sa_family == AF_INET || addr.sa_family == AF_INET6) { - return true; - } - return false; + return addr.sa_family == AF_INET || addr.sa_family == AF_INET6; } static const grpc_endpoint_vtable vtable = {tcp_read, |