summaryrefslogtreecommitdiff
path: root/grpc/src/core/lib/iomgr/tcp_posix.cc
diff options
context:
space:
mode:
Diffstat (limited to 'grpc/src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r--grpc/src/core/lib/iomgr/tcp_posix.cc83
1 files changed, 41 insertions, 42 deletions
diff --git a/grpc/src/core/lib/iomgr/tcp_posix.cc b/grpc/src/core/lib/iomgr/tcp_posix.cc
index 7813e044..1f28b773 100644
--- a/grpc/src/core/lib/iomgr/tcp_posix.cc
+++ b/grpc/src/core/lib/iomgr/tcp_posix.cc
@@ -45,6 +45,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
@@ -54,7 +55,6 @@
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
-#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -435,12 +435,12 @@ static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp);
static gpr_atm g_uncovered_notifications_pending;
static gpr_atm g_backup_poller; /* backup_poller* */
-static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error);
-static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error);
+static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error);
+static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error_handle error);
static void tcp_drop_uncovered_then_handle_write(void* arg /* grpc_tcp */,
- grpc_error* error);
+ grpc_error_handle error);
-static void done_poller(void* bp, grpc_error* /*error_ignored*/) {
+static void done_poller(void* bp, grpc_error_handle /*error_ignored*/) {
backup_poller* p = static_cast<backup_poller*>(bp);
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "BACKUP_POLLER:%p destroy", p);
@@ -449,7 +449,7 @@ static void done_poller(void* bp, grpc_error* /*error_ignored*/) {
gpr_free(p);
}
-static void run_poller(void* bp, grpc_error* /*error_ignored*/) {
+static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) {
backup_poller* p = static_cast<backup_poller*>(bp);
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "BACKUP_POLLER:%p run", p);
@@ -560,9 +560,11 @@ static void notify_on_write(grpc_tcp* tcp) {
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure);
}
-static void tcp_drop_uncovered_then_handle_write(void* arg, grpc_error* error) {
+static void tcp_drop_uncovered_then_handle_write(void* arg,
+ grpc_error_handle error) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
- gpr_log(GPR_INFO, "TCP:%p got_write: %s", arg, grpc_error_string(error));
+ gpr_log(GPR_INFO, "TCP:%p got_write: %s", arg,
+ grpc_error_std_string(error).c_str());
}
drop_uncovered(static_cast<grpc_tcp*>(arg));
tcp_handle_write(arg, error);
@@ -604,7 +606,8 @@ static size_t get_target_read_size(grpc_tcp* tcp) {
return sz;
}
-static grpc_error* tcp_annotate_error(grpc_error* src_error, grpc_tcp* tcp) {
+static grpc_error_handle tcp_annotate_error(grpc_error_handle src_error,
+ grpc_tcp* tcp) {
return grpc_error_set_str(
grpc_error_set_int(
grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd),
@@ -615,10 +618,10 @@ static grpc_error* tcp_annotate_error(grpc_error* src_error, grpc_tcp* tcp) {
grpc_slice_from_copied_string(tcp->peer_string.c_str()));
}
-static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error);
-static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error);
+static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error);
+static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error_handle error);
-static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) {
+static void tcp_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
ZerocopyDisableAndWaitForRemaining(tcp);
grpc_fd_shutdown(tcp->em_fd, why);
@@ -678,16 +681,14 @@ static void tcp_destroy(grpc_endpoint* ep) {
TCP_UNREF(tcp, "destroy");
}
-static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
+static void call_read_cb(grpc_tcp* tcp, grpc_error_handle error) {
grpc_closure* cb = tcp->read_cb;
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
size_t i;
- const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "READ %p (peer=%s) error=%s", tcp,
- tcp->peer_string.c_str(), str);
-
+ tcp->peer_string.c_str(), grpc_error_std_string(error).c_str());
if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
for (i = 0; i < tcp->incoming_buffer->count; i++) {
char* dump = grpc_dump_slice(tcp->incoming_buffer->slices[i],
@@ -850,11 +851,11 @@ static void tcp_do_read(grpc_tcp* tcp) {
TCP_UNREF(tcp, "read");
}
-static void tcp_read_allocation_done(void* tcpp, grpc_error* error) {
+static void tcp_read_allocation_done(void* tcpp, grpc_error_handle error) {
grpc_tcp* tcp = static_cast<grpc_tcp*>(tcpp);
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP:%p read_allocation_done: %s", tcp,
- grpc_error_string(error));
+ grpc_error_std_string(error).c_str());
}
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
@@ -887,10 +888,11 @@ static void tcp_continue_read(grpc_tcp* tcp) {
tcp_do_read(tcp);
}
-static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) {
+static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error) {
grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
- gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp, grpc_error_string(error));
+ gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp,
+ grpc_error_std_string(error).c_str());
}
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
@@ -958,7 +960,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
int additional_flags = 0);
/** The callback function to be invoked when we get an error on the socket. */
-static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error);
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error_handle error);
static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
grpc_tcp* tcp, grpc_slice_buffer* buf);
@@ -1213,10 +1215,12 @@ static bool process_errors(grpc_tcp* tcp) {
}
}
-static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
+static void tcp_handle_error(void* arg /* grpc_tcp */,
+ grpc_error_handle error) {
grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
- gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error));
+ gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp,
+ grpc_error_std_string(error).c_str());
}
if (error != GRPC_ERROR_NONE ||
@@ -1241,11 +1245,11 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
#else /* GRPC_LINUX_ERRQUEUE */
static TcpZerocopySendRecord* tcp_get_send_zerocopy_record(
- grpc_tcp* tcp, grpc_slice_buffer* buf) {
+ grpc_tcp* /*tcp*/, grpc_slice_buffer* /*buf*/) {
return nullptr;
}
-static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) {}
+static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* /*tcp*/) {}
static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/,
size_t /*sending_length*/,
@@ -1257,7 +1261,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/,
}
static void tcp_handle_error(void* /*arg*/ /* grpc_tcp */,
- grpc_error* /*error*/) {
+ grpc_error_handle /*error*/) {
gpr_log(GPR_ERROR, "Error handling is not supported for this platform");
GPR_ASSERT(0);
}
@@ -1323,7 +1327,7 @@ void TcpZerocopySendRecord::UpdateOffsetForBytesSent(size_t sending_length,
// returns true if done, false if pending; if returning true, *error is set
static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
- grpc_error** error) {
+ grpc_error_handle* error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
msg_iovlen_type iov_size;
@@ -1391,15 +1395,15 @@ static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
static void UnrefMaybePutZerocopySendRecord(grpc_tcp* tcp,
TcpZerocopySendRecord* record,
- uint32_t seq,
- const char* /* tag */) {
+ uint32_t /*seq*/,
+ const char* /*tag*/) {
if (record->Unref()) {
tcp->tcp_zerocopy_send_ctx.PutSendRecord(record);
}
}
static bool tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
- grpc_error** error) {
+ grpc_error_handle* error) {
bool done = do_tcp_flush_zerocopy(tcp, record, error);
if (done) {
// Either we encountered an error, or we successfully sent all the bytes.
@@ -1409,7 +1413,7 @@ static bool tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
return done;
}
-static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
+static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
msg_iovlen_type iov_size;
@@ -1516,7 +1520,8 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
}
}
-static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
+static void tcp_handle_write(void* arg /* grpc_tcp */,
+ grpc_error_handle error) {
grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
grpc_closure* cb;
@@ -1549,8 +1554,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
tcp->write_cb = nullptr;
tcp->current_zerocopy_send = nullptr;
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
- const char* str = grpc_error_string(error);
- gpr_log(GPR_INFO, "write: %s", str);
+ gpr_log(GPR_INFO, "write: %s", grpc_error_std_string(error).c_str());
}
// No need to take a ref on error since tcp_flush provides a ref.
grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
@@ -1562,7 +1566,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
grpc_closure* cb, void* arg) {
GPR_TIMER_SCOPE("tcp_write", 0);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
TcpZerocopySendRecord* zerocopy_send_record = nullptr;
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
@@ -1618,8 +1622,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
notify_on_write(tcp);
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
- const char* str = grpc_error_string(error);
- gpr_log(GPR_INFO, "write: %s", str);
+ gpr_log(GPR_INFO, "write: %s", grpc_error_std_string(error).c_str());
}
grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
}
@@ -1639,7 +1642,6 @@ static void tcp_add_to_pollset_set(grpc_endpoint* ep,
static void tcp_delete_from_pollset_set(grpc_endpoint* ep,
grpc_pollset_set* pollset_set) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
- ZerocopyDisableAndWaitForRemaining(tcp);
grpc_pollset_set_del_fd(pollset_set, tcp->em_fd);
}
@@ -1673,10 +1675,7 @@ static bool tcp_can_track_err(grpc_endpoint* ep) {
if (getsockname(tcp->fd, &addr, &len) < 0) {
return false;
}
- if (addr.sa_family == AF_INET || addr.sa_family == AF_INET6) {
- return true;
- }
- return false;
+ return addr.sa_family == AF_INET || addr.sa_family == AF_INET6;
}
static const grpc_endpoint_vtable vtable = {tcp_read,