aboutsummaryrefslogtreecommitdiff
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/buffer_list.cc134
-rw-r--r--src/core/lib/iomgr/buffer_list.h96
-rw-r--r--src/core/lib/iomgr/endpoint.cc4
-rw-r--r--src/core/lib/iomgr/endpoint.h8
-rw-r--r--src/core/lib/iomgr/endpoint_cfstream.cc2
-rw-r--r--src/core/lib/iomgr/endpoint_pair_posix.cc4
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc72
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc3
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc114
-rw-r--r--src/core/lib/iomgr/ev_posix.cc70
-rw-r--r--src/core/lib/iomgr/ev_posix.h10
-rw-r--r--src/core/lib/iomgr/exec_ctx.cc6
-rw-r--r--src/core/lib/iomgr/exec_ctx.h2
-rw-r--r--src/core/lib/iomgr/fork_posix.cc13
-rw-r--r--src/core/lib/iomgr/internal_errqueue.cc36
-rw-r--r--src/core/lib/iomgr/internal_errqueue.h83
-rw-r--r--src/core/lib/iomgr/port.h17
-rw-r--r--src/core/lib/iomgr/resource_quota.cc78
-rw-r--r--src/core/lib/iomgr/resource_quota.h16
-rw-r--r--src/core/lib/iomgr/socket_mutator.cc2
-rw-r--r--src/core/lib/iomgr/socket_mutator.h2
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_custom.cc2
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc312
-rw-r--r--src/core/lib/iomgr/tcp_posix.h3
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc4
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.cc2
-rw-r--r--src/core/lib/iomgr/tcp_windows.cc2
-rw-r--r--src/core/lib/iomgr/timer.h5
-rw-r--r--src/core/lib/iomgr/udp_server.cc2
30 files changed, 1056 insertions, 50 deletions
diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc
new file mode 100644
index 0000000000..6ada23db1c
--- /dev/null
+++ b/src/core/lib/iomgr/buffer_list.cc
@@ -0,0 +1,134 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/buffer_list.h"
+#include "src/core/lib/iomgr/port.h"
+
+#include <grpc/support/log.h>
+
+#ifdef GRPC_LINUX_ERRQUEUE
+#include <time.h>
+
+#include "src/core/lib/gprpp/memory.h"
+
+namespace grpc_core {
+void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no,
+ void* arg) {
+ GPR_DEBUG_ASSERT(head != nullptr);
+ TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg);
+ /* Store the current time as the sendmsg time. */
+ new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME);
+ if (*head == nullptr) {
+ *head = new_elem;
+ return;
+ }
+ /* Append at the end. */
+ TracedBuffer* ptr = *head;
+ while (ptr->next_ != nullptr) {
+ ptr = ptr->next_;
+ }
+ ptr->next_ = new_elem;
+}
+
+namespace {
+/** Fills gpr_timespec gts based on values from timespec ts */
+void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) {
+ gts->tv_sec = ts->tv_sec;
+ gts->tv_nsec = static_cast<int32_t>(ts->tv_nsec);
+ gts->clock_type = GPR_CLOCK_REALTIME;
+}
+
+/** The saved callback function that will be invoked when we get all the
+ * timestamps that we are going to get for a TracedBuffer. */
+void (*timestamps_callback)(void*, grpc_core::Timestamps*,
+ grpc_error* shutdown_err);
+} /* namespace */
+
+void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
+ struct sock_extended_err* serr,
+ struct scm_timestamping* tss) {
+ GPR_DEBUG_ASSERT(head != nullptr);
+ TracedBuffer* elem = *head;
+ TracedBuffer* next = nullptr;
+ while (elem != nullptr) {
+ /* The byte number refers to the sequence number of the last byte which this
+ * timestamp relates to. */
+ if (serr->ee_data >= elem->seq_no_) {
+ switch (serr->ee_info) {
+ case SCM_TSTAMP_SCHED:
+ fill_gpr_from_timestamp(&(elem->ts_.scheduled_time), &(tss->ts[0]));
+ elem = elem->next_;
+ break;
+ case SCM_TSTAMP_SND:
+ fill_gpr_from_timestamp(&(elem->ts_.sent_time), &(tss->ts[0]));
+ elem = elem->next_;
+ break;
+ case SCM_TSTAMP_ACK:
+ fill_gpr_from_timestamp(&(elem->ts_.acked_time), &(tss->ts[0]));
+ /* Got all timestamps. Do the callback and free this TracedBuffer.
+ * The thing below can be passed by value if we don't want the
+ * restriction on the lifetime. */
+ timestamps_callback(elem->arg_, &(elem->ts_), GRPC_ERROR_NONE);
+ next = elem->next_;
+ Delete<TracedBuffer>(elem);
+ *head = elem = next;
+ break;
+ default:
+ abort();
+ }
+ } else {
+ break;
+ }
+ }
+}
+
+void TracedBuffer::Shutdown(TracedBuffer** head, grpc_error* shutdown_err) {
+ GPR_DEBUG_ASSERT(head != nullptr);
+ TracedBuffer* elem = *head;
+ while (elem != nullptr) {
+ if (timestamps_callback) {
+ timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
+ }
+ auto* next = elem->next_;
+ Delete<TracedBuffer>(elem);
+ elem = next;
+ }
+ *head = nullptr;
+ GRPC_ERROR_UNREF(shutdown_err);
+}
+
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+ grpc_core::Timestamps*,
+ grpc_error* error)) {
+ timestamps_callback = fn;
+}
+} /* namespace grpc_core */
+
+#else /* GRPC_LINUX_ERRQUEUE */
+
+namespace grpc_core {
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+ grpc_core::Timestamps*,
+ grpc_error* error)) {
+ gpr_log(GPR_DEBUG, "Timestamps callback is not enabled for this platform");
+}
+} /* namespace grpc_core */
+
+#endif /* GRPC_LINUX_ERRQUEUE */
diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h
new file mode 100644
index 0000000000..cbbf50a657
--- /dev/null
+++ b/src/core/lib/iomgr/buffer_list.h
@@ -0,0 +1,96 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H
+#define GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include <grpc/support/time.h>
+
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/internal_errqueue.h"
+
+namespace grpc_core {
+struct Timestamps {
+ /* TODO(yashykt): This would also need to store OPTSTAT once support is added
+ */
+ gpr_timespec sendmsg_time;
+ gpr_timespec scheduled_time;
+ gpr_timespec sent_time;
+ gpr_timespec acked_time;
+};
+
+/** TracedBuffer is a class to keep track of timestamps for a specific buffer in
+ * the TCP layer. We are only tracking timestamps for Linux kernels and hence
+ * this class would only be used by Linux platforms. For all other platforms,
+ * TracedBuffer would be an empty class.
+ *
+ * The timestamps collected are according to grpc_core::Timestamps declared
+ * above.
+ *
+ * A TracedBuffer list is kept track of using the head element of the list. If
+ * the head element of the list is nullptr, then the list is empty.
+ */
+#ifdef GRPC_LINUX_ERRQUEUE
+class TracedBuffer {
+ public:
+ /** Add a new entry in the TracedBuffer list pointed to by head. Also saves
+ * sendmsg_time with the current timestamp. */
+ static void AddNewEntry(grpc_core::TracedBuffer** head, uint32_t seq_no,
+ void* arg);
+
+ /** Processes a received timestamp based on sock_extended_err and
+ * scm_timestamping structures. It will invoke the timestamps callback if the
+ * timestamp type is SCM_TSTAMP_ACK. */
+ static void ProcessTimestamp(grpc_core::TracedBuffer** head,
+ struct sock_extended_err* serr,
+ struct scm_timestamping* tss);
+
+ /** Cleans the list by calling the callback for each traced buffer in the list
+ * with timestamps that it has. */
+ static void Shutdown(grpc_core::TracedBuffer** head,
+ grpc_error* shutdown_err);
+
+ private:
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
+
+ TracedBuffer(int seq_no, void* arg)
+ : seq_no_(seq_no), arg_(arg), next_(nullptr) {}
+
+ uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
+ void* arg_; /* The arg to pass to timestamps_callback */
+ grpc_core::Timestamps ts_; /* The timestamps corresponding to this buffer */
+ grpc_core::TracedBuffer* next_; /* The next TracedBuffer in the list */
+};
+#else /* GRPC_LINUX_ERRQUEUE */
+class TracedBuffer {};
+#endif /* GRPC_LINUX_ERRQUEUE */
+
+/** Sets the callback function to call when timestamps for a write are
+ * collected. The callback does not own a reference to error. */
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+ grpc_core::Timestamps*,
+ grpc_error* error));
+
+}; /* namespace grpc_core */
+
+#endif /* GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H */
diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc
index 92e7930111..44fb47e19d 100644
--- a/src/core/lib/iomgr/endpoint.cc
+++ b/src/core/lib/iomgr/endpoint.cc
@@ -28,8 +28,8 @@ void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
- ep->vtable->write(ep, slices, cb);
+ grpc_closure* cb, void* arg) {
+ ep->vtable->write(ep, slices, cb, arg);
}
void grpc_endpoint_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index 15db1649fa..1f590a80ca 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -33,10 +33,12 @@
typedef struct grpc_endpoint grpc_endpoint;
typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
+class Timestamps;
struct grpc_endpoint_vtable {
void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb);
- void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb);
+ void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb,
+ void* arg);
void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset);
void (*add_to_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
void (*delete_from_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
@@ -70,9 +72,11 @@ int grpc_endpoint_get_fd(grpc_endpoint* ep);
\a slices may be mutated at will by the endpoint until cb is called.
No guarantee is made to the content of slices after a write EXCEPT that
it is a valid slice buffer.
+ \a arg is platform specific. It is currently only used by TCP on linux
+ platforms as an argument that would be forwarded to the timestamps callback.
*/
void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb);
+ grpc_closure* cb, void* arg);
/* Causes any pending and future read/write callbacks to run immediately with
success==0 */
diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc
index c3bc0cc8fd..df2cf508c8 100644
--- a/src/core/lib/iomgr/endpoint_cfstream.cc
+++ b/src/core/lib/iomgr/endpoint_cfstream.cc
@@ -268,7 +268,7 @@ static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc
index 5c5c246f99..3afbfd7254 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.cc
+++ b/src/core/lib/iomgr/endpoint_pair_posix.cc
@@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
grpc_core::ExecCtx exec_ctx;
gpr_asprintf(&final_name, "%s:client", name);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args,
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, true), args,
"socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args,
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, true), args,
"socketpair-client");
gpr_free(final_name);
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 66e0f1fd6d..aa5016bd8f 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -131,6 +131,13 @@ static void epoll_set_shutdown() {
* Fd Declarations
*/
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+struct grpc_fork_fd_list {
+ grpc_fd* fd;
+ grpc_fd* next;
+ grpc_fd* prev;
+};
+
struct grpc_fd {
int fd;
@@ -141,6 +148,9 @@ struct grpc_fd {
struct grpc_fd* freelist_next;
grpc_iomgr_object iomgr_object;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
};
static void fd_global_init(void);
@@ -256,6 +266,10 @@ static bool append_error(grpc_error** composite, grpc_error* error,
static grpc_fd* fd_freelist = nullptr;
static gpr_mu fd_freelist_mu;
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+static grpc_fd* fork_fd_list_head = nullptr;
+static gpr_mu fork_fd_list_mu;
+
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
static void fd_global_shutdown(void) {
@@ -269,6 +283,38 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
+static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ fd->fork_fd_list =
+ static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->next = fork_fd_list_head;
+ fd->fork_fd_list->prev = nullptr;
+ if (fork_fd_list_head != nullptr) {
+ fork_fd_list_head->fork_fd_list->prev = fd;
+ }
+ fork_fd_list_head = fd;
+ gpr_mu_unlock(&fork_fd_list_mu);
+ }
+}
+
+static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ if (fork_fd_list_head == fd) {
+ fork_fd_list_head = fd->fork_fd_list->next;
+ }
+ if (fd->fork_fd_list->prev != nullptr) {
+ fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
+ }
+ if (fd->fork_fd_list->next != nullptr) {
+ fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
+ }
+ gpr_free(fd->fork_fd_list);
+ gpr_mu_unlock(&fork_fd_list_mu);
+ }
+}
+
static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
@@ -295,6 +341,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
char* fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
+ fork_fd_list_add_grpc_fd(new_fd);
#ifndef NDEBUG
if (grpc_trace_fd_refcount.enabled()) {
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
@@ -361,6 +408,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error));
grpc_iomgr_unregister_object(&fd->iomgr_object);
+ fork_fd_list_remove_grpc_fd(fd);
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
fd->error_closure->DestroyEvent();
@@ -1190,6 +1238,10 @@ static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
epoll_set_shutdown();
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_destroy(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
+ }
}
static const grpc_event_engine_vtable vtable = {
@@ -1227,6 +1279,21 @@ static const grpc_event_engine_vtable vtable = {
shutdown_engine,
};
+/* Called by the child process's post-fork handler to close open fds, including
+ * the global epoll fd. This allows gRPC to shutdown in the child process
+ * without interfering with connections or RPCs ongoing in the parent. */
+static void reset_event_manager_on_fork() {
+ gpr_mu_lock(&fork_fd_list_mu);
+ while (fork_fd_list_head != nullptr) {
+ close(fork_fd_list_head->fd);
+ fork_fd_list_head->fd = -1;
+ fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
+ }
+ gpr_mu_unlock(&fork_fd_list_mu);
+ shutdown_engine();
+ grpc_init_epoll1_linux(true);
+}
+
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
* support is available */
@@ -1248,6 +1315,11 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
return nullptr;
}
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_init(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(
+ reset_event_manager_on_fork);
+ }
return &vtable;
}
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 96eae30345..b082634af1 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -46,6 +46,7 @@
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/is_epollexclusive_available.h"
@@ -735,7 +736,7 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
GPR_TIMER_SCOPE("kick_one_worker", 0);
pollable* p = specific_worker->pollable_obj;
- grpc_core::mu_guard lock(&p->mu);
+ grpc_core::MutexLock lock(&p->mu);
GPR_ASSERT(specific_worker != nullptr);
if (specific_worker->kicked) {
if (grpc_polling_trace.enabled()) {
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index fb4c71ef71..16562538a6 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -60,6 +60,19 @@ typedef struct grpc_fd_watcher {
grpc_fd* fd;
} grpc_fd_watcher;
+typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
+
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+struct grpc_fork_fd_list {
+ /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
+ set to nullptr. */
+ grpc_fd* fd;
+ grpc_cached_wakeup_fd* cached_wakeup_fd;
+
+ grpc_fork_fd_list* next;
+ grpc_fork_fd_list* prev;
+};
+
struct grpc_fd {
int fd;
/* refst format:
@@ -108,8 +121,18 @@ struct grpc_fd {
grpc_closure* on_done_closure;
grpc_iomgr_object iomgr_object;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
};
+/* True when GRPC_ENABLE_FORK_SUPPORT=1. We do not support fork with poll-cv */
+static bool track_fds_for_fork = false;
+
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+static grpc_fork_fd_list* fork_fd_list_head = nullptr;
+static gpr_mu fork_fd_list_mu;
+
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
or writability interest changes, the pollset can be kicked to pick up that
@@ -156,6 +179,9 @@ static void fd_unref(grpc_fd* fd);
typedef struct grpc_cached_wakeup_fd {
grpc_wakeup_fd fd;
struct grpc_cached_wakeup_fd* next;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
} grpc_cached_wakeup_fd;
struct grpc_pollset_worker {
@@ -281,9 +307,61 @@ poll_hash_table poll_cache;
grpc_cv_fd_table g_cvfds;
/*******************************************************************************
- * fd_posix.c
+ * functions to track opened fds. No-ops unless track_fds_for_fork is true.
*/
+static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
+ if (track_fds_for_fork) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ if (fork_fd_list_head == node) {
+ fork_fd_list_head = node->next;
+ }
+ if (node->prev != nullptr) {
+ node->prev->next = node->next;
+ }
+ if (node->next != nullptr) {
+ node->next->prev = node->prev;
+ }
+ gpr_free(node);
+ gpr_mu_unlock(&fork_fd_list_mu);
+ }
+}
+
+static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ node->next = fork_fd_list_head;
+ node->prev = nullptr;
+ if (fork_fd_list_head != nullptr) {
+ fork_fd_list_head->prev = node;
+ }
+ fork_fd_list_head = node;
+ gpr_mu_unlock(&fork_fd_list_mu);
+}
+
+static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
+ if (track_fds_for_fork) {
+ fd->fork_fd_list =
+ static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->fd = fd;
+ fd->fork_fd_list->cached_wakeup_fd = nullptr;
+ fork_fd_list_add_node(fd->fork_fd_list);
+ }
+}
+
+static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
+ if (track_fds_for_fork) {
+ fd->fork_fd_list =
+ static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->cached_wakeup_fd = fd;
+ fd->fork_fd_list->fd = nullptr;
+ fork_fd_list_add_node(fd->fork_fd_list);
+ }
+}
+
+ /*******************************************************************************
+ * fd_posix.c
+ */
+
#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
@@ -319,6 +397,7 @@ static void unref_by(grpc_fd* fd, int n) {
if (old == n) {
gpr_mu_destroy(&fd->mu);
grpc_iomgr_unregister_object(&fd->iomgr_object);
+ fork_fd_list_remove_node(fd->fork_fd_list);
if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
gpr_free(fd);
} else {
@@ -347,6 +426,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
gpr_asprintf(&name2, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&r->iomgr_object, name2);
gpr_free(name2);
+ fork_fd_list_add_grpc_fd(r);
return r;
}
@@ -822,6 +902,7 @@ static void pollset_destroy(grpc_pollset* pollset) {
GPR_ASSERT(!pollset_has_workers(pollset));
while (pollset->local_wakeup_cache) {
grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
+ fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
gpr_free(pollset->local_wakeup_cache);
pollset->local_wakeup_cache = next;
@@ -895,6 +976,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
gpr_malloc(sizeof(*worker.wakeup_fd)));
error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+ fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
if (error != GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
return error;
@@ -1705,6 +1787,10 @@ static void shutdown_engine(void) {
if (grpc_cv_wakeup_fds_enabled()) {
global_cv_fd_table_shutdown();
}
+ if (track_fds_for_fork) {
+ gpr_mu_destroy(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
+ }
}
static const grpc_event_engine_vtable vtable = {
@@ -1742,6 +1828,26 @@ static const grpc_event_engine_vtable vtable = {
shutdown_engine,
};
+/* Called by the child process's post-fork handler to close open fds, including
+ * worker wakeup fds. This allows gRPC to shutdown in the child process without
+ * interfering with connections or RPCs ongoing in the parent. */
+static void reset_event_manager_on_fork() {
+ gpr_mu_lock(&fork_fd_list_mu);
+ while (fork_fd_list_head != nullptr) {
+ if (fork_fd_list_head->fd != nullptr) {
+ close(fork_fd_list_head->fd->fd);
+ fork_fd_list_head->fd->fd = -1;
+ } else {
+ close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
+ fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
+ close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
+ fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
+ }
+ fork_fd_list_head = fork_fd_list_head->next;
+ }
+ gpr_mu_unlock(&fork_fd_list_mu);
+}
+
const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
if (!grpc_has_wakeup_fd()) {
gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
@@ -1750,6 +1856,12 @@ const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
return nullptr;
}
+ if (grpc_core::Fork::Enabled()) {
+ track_fds_for_fork = true;
+ gpr_mu_init(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(
+ reset_event_manager_on_fork);
+ }
return &vtable;
}
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 0e45fc42ca..d4377e2d50 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -59,7 +59,14 @@ grpc_core::DebugOnlyTraceFlag grpc_polling_api_trace(false, "polling_api");
/** Default poll() function - a pointer so that it can be overridden by some
* tests */
+#ifndef GPR_AIX
grpc_poll_function_type grpc_poll_function = poll;
+#else
+int aix_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
+ return poll(fds, nfds, timeout);
+}
+grpc_poll_function_type grpc_poll_function = aix_poll;
+#endif
grpc_wakeup_fd grpc_global_wakeup_fd;
@@ -101,10 +108,28 @@ const grpc_event_engine_vtable* init_non_polling(bool explicit_request) {
}
} // namespace
-static const event_engine_factory g_factories[] = {
+#define ENGINE_HEAD_CUSTOM "head_custom"
+#define ENGINE_TAIL_CUSTOM "tail_custom"
+
+// The global array of event-engine factories. Each entry is a pair with a name
+// and an event-engine generator function (nullptr if there is no generator
+// registered for this name). The middle entries are the engines predefined by
+// open-source gRPC. The head entries represent an opportunity for specific
+// high-priority custom pollers to be added by the initializer plugins of
+// custom-built gRPC libraries. The tail entries represent the same, but for
+// low-priority custom pollers. The actual poller selected is either the first
+// available one in the list if no specific poller is requested, or the first
+// specific poller that is requested by name in the GRPC_POLL_STRATEGY
+// environment variable if that variable is set (which should be a
+// comma-separated list of one or more event engine names)
+static event_engine_factory g_factories[] = {
+ {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
+ {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
{"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux},
{"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix},
{"poll-cv", grpc_init_poll_cv_posix}, {"none", init_non_polling},
+ {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
+ {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
};
static void add(const char* beg, const char* end, char*** ss, size_t* ns) {
@@ -138,7 +163,7 @@ static bool is(const char* want, const char* have) {
static void try_engine(const char* engine) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
- if (is(engine, g_factories[i].name)) {
+ if (g_factories[i].factory != nullptr && is(engine, g_factories[i].name)) {
if ((g_event_engine = g_factories[i].factory(
0 == strcmp(engine, g_factories[i].name)))) {
g_poll_strategy_name = g_factories[i].name;
@@ -149,14 +174,32 @@ static void try_engine(const char* engine) {
}
}
-/* This should be used for testing purposes ONLY */
-void grpc_set_event_engine_test_only(
- const grpc_event_engine_vtable* ev_engine) {
- g_event_engine = ev_engine;
-}
+/* Call this before calling grpc_event_engine_init() */
+void grpc_register_event_engine_factory(const char* name,
+ event_engine_factory_fn factory,
+ bool add_at_head) {
+ const char* custom_match =
+ add_at_head ? ENGINE_HEAD_CUSTOM : ENGINE_TAIL_CUSTOM;
-const grpc_event_engine_vtable* grpc_get_event_engine_test_only() {
- return g_event_engine;
+ // Overwrite an existing registration if already registered
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
+ if (0 == strcmp(name, g_factories[i].name)) {
+ g_factories[i].factory = factory;
+ return;
+ }
+ }
+
+ // Otherwise fill in an available custom slot
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
+ if (0 == strcmp(g_factories[i].name, custom_match)) {
+ g_factories[i].name = name;
+ g_factories[i].factory = factory;
+ return;
+ }
+ }
+
+ // Otherwise fail
+ GPR_ASSERT(false);
}
/* Call this only after calling grpc_event_engine_init() */
@@ -194,14 +237,19 @@ void grpc_event_engine_shutdown(void) {
}
bool grpc_event_engine_can_track_errors(void) {
+/* Only track errors if platform supports errqueue. */
+#ifdef GRPC_LINUX_ERRQUEUE
return g_event_engine->can_track_err;
+#else
+ return false;
+#endif /* GRPC_LINUX_ERRQUEUE */
}
grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
- GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err);
- return g_event_engine->fd_create(fd, name, track_err);
+ return g_event_engine->fd_create(fd, name,
+ track_err && g_event_engine->can_track_err);
}
int grpc_fd_wrapped_fd(grpc_fd* fd) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 8d0bcc0710..b8fb8f534b 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -82,6 +82,11 @@ typedef struct grpc_event_engine_vtable {
void (*shutdown_engine)(void);
} grpc_event_engine_vtable;
+/* register a new event engine factory */
+void grpc_register_event_engine_factory(
+ const char* name, const grpc_event_engine_vtable* (*factory)(bool),
+ bool add_at_head);
+
void grpc_event_engine_init(void);
void grpc_event_engine_shutdown(void);
@@ -173,9 +178,4 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
typedef int (*grpc_poll_function_type)(struct pollfd*, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
-/* WARNING: The following two functions should be used for testing purposes
- * ONLY */
-void grpc_set_event_engine_test_only(const grpc_event_engine_vtable*);
-const grpc_event_engine_vtable* grpc_get_event_engine_test_only();
-
#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index 5d5c355ff9..d68fa0714b 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -109,6 +109,12 @@ grpc_closure_scheduler* grpc_schedule_on_exec_ctx = &exec_ctx_scheduler;
namespace grpc_core {
GPR_TLS_CLASS_DEF(ExecCtx::exec_ctx_);
+// WARNING: for testing purposes only!
+void ExecCtx::TestOnlyGlobalInit(gpr_timespec new_val) {
+ g_start_time = new_val;
+ gpr_tls_init(&exec_ctx_);
+}
+
void ExecCtx::GlobalInit(void) {
g_start_time = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_tls_init(&exec_ctx_);
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 8ddab0d381..f3528d527a 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -192,6 +192,8 @@ class ExecCtx {
now_is_valid_ = true;
}
+ static void TestOnlyGlobalInit(gpr_timespec new_val);
+
/** Global initialization for ExecCtx. Called by iomgr. */
static void GlobalInit(void);
diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc
index b37384b8db..e957bad73d 100644
--- a/src/core/lib/iomgr/fork_posix.cc
+++ b/src/core/lib/iomgr/fork_posix.cc
@@ -25,6 +25,7 @@
#include <string.h>
#include <grpc/fork.h>
+#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/core/lib/gpr/env.h"
@@ -34,7 +35,6 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/surface/init.h"
/*
* NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
@@ -58,6 +58,12 @@ void grpc_prefork() {
"environment variable GRPC_ENABLE_FORK_SUPPORT=1");
return;
}
+ if (strcmp(grpc_get_poll_strategy_name(), "epoll1") != 0 &&
+ strcmp(grpc_get_poll_strategy_name(), "poll") != 0) {
+ gpr_log(GPR_ERROR,
+ "Fork support is only compatible with the epoll1 and poll polling "
+ "strategies");
+ }
if (!grpc_core::Fork::BlockExecCtx()) {
gpr_log(GPR_INFO,
"Other threads are currently calling into gRPC, skipping fork() "
@@ -84,6 +90,11 @@ void grpc_postfork_child() {
if (!skipped_handler) {
grpc_core::Fork::AllowExecCtx();
grpc_core::ExecCtx exec_ctx;
+ grpc_core::Fork::child_postfork_func reset_polling_engine =
+ grpc_core::Fork::GetResetChildPollingEngineFunc();
+ if (reset_polling_engine != nullptr) {
+ reset_polling_engine();
+ }
grpc_timer_manager_set_threading(true);
grpc_executor_set_threading(true);
}
diff --git a/src/core/lib/iomgr/internal_errqueue.cc b/src/core/lib/iomgr/internal_errqueue.cc
new file mode 100644
index 0000000000..99c22e9055
--- /dev/null
+++ b/src/core/lib/iomgr/internal_errqueue.cc
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include "src/core/lib/iomgr/internal_errqueue.h"
+
+#ifdef GRPC_POSIX_SOCKET_TCP
+
+bool kernel_supports_errqueue() {
+#ifdef LINUX_VERSION_CODE
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0)
+ return true;
+#endif /* LINUX_VERSION_CODE <= KERNEL_VERSION(4, 0, 0) */
+#endif /* LINUX_VERSION_CODE */
+ return false;
+}
+
+#endif /* GRPC_POSIX_SOCKET_TCP */
diff --git a/src/core/lib/iomgr/internal_errqueue.h b/src/core/lib/iomgr/internal_errqueue.h
new file mode 100644
index 0000000000..9d122808f9
--- /dev/null
+++ b/src/core/lib/iomgr/internal_errqueue.h
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* This file contains constants defined in <linux/errqueue.h> and
+ * <linux/net_tstamp.h> so as to allow collecting network timestamps in the
+ * kernel. This file allows tcp_posix.cc to compile on platforms that do not
+ * have <linux/errqueue.h> and <linux/net_tstamp.h>.
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H
+#define GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_POSIX_SOCKET_TCP
+
+#include <sys/types.h>
+#include <time.h>
+
+#ifdef GRPC_LINUX_ERRQUEUE
+#include <linux/errqueue.h>
+#include <linux/net_tstamp.h>
+#include <sys/socket.h>
+#endif /* GRPC_LINUX_ERRQUEUE */
+
+namespace grpc_core {
+
+#ifdef GRPC_LINUX_ERRQUEUE
+
+/* Redefining scm_timestamping in the same way that <linux/errqueue.h> defines
+ * it, so that code compiles on systems that don't have it. */
+struct scm_timestamping {
+ struct timespec ts[3];
+};
+/* Also redefine timestamp types */
+/* The timestamp type for when the driver passed skb to NIC, or HW. */
+constexpr int SCM_TSTAMP_SND = 0;
+/* The timestamp type for when data entered the packet scheduler. */
+constexpr int SCM_TSTAMP_SCHED = 1;
+/* The timestamp type for when data acknowledged by peer. */
+constexpr int SCM_TSTAMP_ACK = 2;
+/* Redefine required constants from <linux/net_tstamp.h> */
+constexpr uint32_t SOF_TIMESTAMPING_TX_SOFTWARE = 1u << 1;
+constexpr uint32_t SOF_TIMESTAMPING_SOFTWARE = 1u << 4;
+constexpr uint32_t SOF_TIMESTAMPING_OPT_ID = 1u << 7;
+constexpr uint32_t SOF_TIMESTAMPING_TX_SCHED = 1u << 8;
+constexpr uint32_t SOF_TIMESTAMPING_TX_ACK = 1u << 9;
+constexpr uint32_t SOF_TIMESTAMPING_OPT_TSONLY = 1u << 11;
+
+constexpr uint32_t kTimestampingSocketOptions = SOF_TIMESTAMPING_SOFTWARE |
+ SOF_TIMESTAMPING_OPT_ID |
+ SOF_TIMESTAMPING_OPT_TSONLY;
+constexpr uint32_t kTimestampingRecordingOptions =
+ SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE |
+ SOF_TIMESTAMPING_TX_ACK;
+#endif /* GRPC_LINUX_ERRQUEUE */
+
+/* Returns true if kernel is capable of supporting errqueue and timestamping.
+ * Currently allowing only linux kernels above 4.0.0
+ */
+bool kernel_supports_errqueue();
+} // namespace grpc_core
+
+#endif /* GRPC_POSIX_SOCKET_TCP */
+
+#endif /* GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index 1d0ecff802..abf96662f5 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -60,6 +60,11 @@
#define GRPC_HAVE_IP_PKTINFO 1
#define GRPC_HAVE_MSG_NOSIGNAL 1
#define GRPC_HAVE_UNIX_SOCKET 1
+#ifdef LINUX_VERSION_CODE
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0)
+#define GRPC_LINUX_ERRQUEUE 1
+#endif /* LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) */
+#endif /* LINUX_VERSION_CODE */
#define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_HOST_NAME_MAX 1
@@ -140,6 +145,18 @@
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
+#elif defined(GPR_SOLARIS)
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
+#elif defined(GPR_AIX)
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
#elif defined(GPR_NACL)
#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc
index 539bc120ce..b6fc7579f7 100644
--- a/src/core/lib/iomgr/resource_quota.cc
+++ b/src/core/lib/iomgr/resource_quota.cc
@@ -96,6 +96,9 @@ struct grpc_resource_user {
list, false otherwise */
bool added_to_free_pool;
+ /* The number of threads currently allocated to this resource user */
+ gpr_atm num_threads_allocated;
+
/* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
*/
grpc_closure* reclaimers[2];
@@ -135,12 +138,33 @@ struct grpc_resource_quota {
gpr_atm last_size;
+ /* Mutex to protect max_threads and num_threads_allocated */
+ /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
+ * and avoid having this mutex; but in that case, each invocation of the
+ * function grpc_resource_user_allocate_threads() would have had to do at
+ * least two atomic loads (for max_threads and num_threads_allocated) followed
+ * by a CAS (on num_threads_allocated).
+ * Moreover, we expect grpc_resource_user_allocate_threads() to be often
+ * called concurrently thereby increasing the chances of failing the CAS
+ * operation. This additional complexity is not worth the tiny perf gain we
+ * may (or may not) have by using atomics */
+ gpr_mu thread_count_mu;
+
+ /* Max number of threads allowed */
+ int max_threads;
+
+ /* Number of threads currently allocated via this resource_quota object */
+ int num_threads_allocated;
+
/* Has rq_step been scheduled to occur? */
bool step_scheduled;
+
/* Are we currently reclaiming memory */
bool reclaiming;
+
/* Closure around rq_step */
grpc_closure rq_step_closure;
+
/* Closure around rq_reclamation_done */
grpc_closure rq_reclamation_done_closure;
@@ -524,6 +548,11 @@ static void ru_shutdown(void* ru, grpc_error* error) {
static void ru_destroy(void* ru, grpc_error* error) {
grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
+ // Free all the remaining thread quota
+ grpc_resource_user_free_threads(resource_user,
+ static_cast<int>(gpr_atm_no_barrier_load(
+ &resource_user->num_threads_allocated)));
+
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, static_cast<grpc_rulist>(i));
}
@@ -594,6 +623,9 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) {
resource_quota->free_pool = INT64_MAX;
resource_quota->size = INT64_MAX;
gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
+ gpr_mu_init(&resource_quota->thread_count_mu);
+ resource_quota->max_threads = INT_MAX;
+ resource_quota->num_threads_allocated = 0;
resource_quota->step_scheduled = false;
resource_quota->reclaiming = false;
gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
@@ -616,6 +648,8 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) {
void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
if (gpr_unref(&resource_quota->refs)) {
+ // No outstanding thread quota
+ GPR_ASSERT(resource_quota->num_threads_allocated == 0);
GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
gpr_free(resource_quota->name);
gpr_free(resource_quota);
@@ -647,6 +681,15 @@ double grpc_resource_quota_get_memory_pressure(
}
/* Public API */
+void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
+ int new_max_threads) {
+ GPR_ASSERT(new_max_threads >= 0);
+ gpr_mu_lock(&resource_quota->thread_count_mu);
+ resource_quota->max_threads = new_max_threads;
+ gpr_mu_unlock(&resource_quota->thread_count_mu);
+}
+
+/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
size_t size) {
grpc_core::ExecCtx exec_ctx;
@@ -731,6 +774,7 @@ grpc_resource_user* grpc_resource_user_create(
grpc_closure_list_init(&resource_user->on_allocated);
resource_user->allocating = false;
resource_user->added_to_free_pool = false;
+ gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
resource_user->reclaimers[0] = nullptr;
resource_user->reclaimers[1] = nullptr;
resource_user->new_reclaimers[0] = nullptr;
@@ -785,6 +829,40 @@ void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
}
}
+bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
+ int thread_count) {
+ GPR_ASSERT(thread_count >= 0);
+ bool is_success = false;
+ gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
+ grpc_resource_quota* rq = resource_user->resource_quota;
+ if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
+ rq->num_threads_allocated += thread_count;
+ gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
+ thread_count);
+ is_success = true;
+ }
+ gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
+ return is_success;
+}
+
+void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
+ int thread_count) {
+ GPR_ASSERT(thread_count >= 0);
+ gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
+ grpc_resource_quota* rq = resource_user->resource_quota;
+ rq->num_threads_allocated -= thread_count;
+ int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
+ &resource_user->num_threads_allocated, -thread_count));
+ if (old_count < thread_count || rq->num_threads_allocated < 0) {
+ gpr_log(GPR_ERROR,
+ "Releasing more threads (%d) than currently allocated (rq threads: "
+ "%d, ru threads: %d)",
+ thread_count, rq->num_threads_allocated + thread_count, old_count);
+ abort();
+ }
+ gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
+}
+
void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
grpc_closure* optional_on_done) {
gpr_mu_lock(&resource_user->mu);
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index 937daf8728..7b0ed7417a 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -93,6 +93,22 @@ void grpc_resource_user_ref(grpc_resource_user* resource_user);
void grpc_resource_user_unref(grpc_resource_user* resource_user);
void grpc_resource_user_shutdown(grpc_resource_user* resource_user);
+/* Attempts to get quota from the resource_user to create 'thread_count' number
+ * of threads. Returns true if successful (i.e the caller is now free to create
+ * 'thread_count' number of threads) or false if quota is not available */
+bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
+ int thread_count);
+/* Releases 'thread_count' worth of quota back to the resource user. The quota
+ * should have been previously obtained successfully by calling
+ * grpc_resource_user_allocate_threads().
+ *
+ * Note: There need not be an exact one-to-one correspondence between
+ * grpc_resource_user_allocate_threads() and grpc_resource_user_free_threads()
+ * calls. The only requirement is that the number of threads allocated should
+ * all be eventually released */
+void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
+ int thread_count);
+
/* Allocate from the resource user (and its quota).
If optional_on_done is NULL, then allocate immediately. This may push the
quota over-limit, at which point reclamation will kick in.
diff --git a/src/core/lib/iomgr/socket_mutator.cc b/src/core/lib/iomgr/socket_mutator.cc
index b9b8eaf4ad..a448c9f61c 100644
--- a/src/core/lib/iomgr/socket_mutator.cc
+++ b/src/core/lib/iomgr/socket_mutator.cc
@@ -57,7 +57,7 @@ int grpc_socket_mutator_compare(grpc_socket_mutator* a,
void grpc_socket_mutator_unref(grpc_socket_mutator* mutator) {
if (gpr_unref(&mutator->refcount)) {
- mutator->vtable->destory(mutator);
+ mutator->vtable->destroy(mutator);
}
}
diff --git a/src/core/lib/iomgr/socket_mutator.h b/src/core/lib/iomgr/socket_mutator.h
index 6c7781c51d..8742a3ba61 100644
--- a/src/core/lib/iomgr/socket_mutator.h
+++ b/src/core/lib/iomgr/socket_mutator.h
@@ -33,7 +33,7 @@ typedef struct {
/** Compare socket mutator \a a and \a b */
int (*compare)(grpc_socket_mutator* a, grpc_socket_mutator* b);
/** Destroys the socket mutator instance */
- void (*destory)(grpc_socket_mutator* mutator);
+ void (*destroy)(grpc_socket_mutator* mutator);
} grpc_socket_mutator_vtable;
/** The Socket Mutator interface allows changes on socket options */
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 296ee74311..9c989b7dfe 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -279,7 +279,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
}
addr_str = grpc_sockaddr_to_uri(mapped_addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
- *fdobj = grpc_fd_create(fd, name, false);
+ *fdobj = grpc_fd_create(fd, name, true);
gpr_free(name);
gpr_free(addr_str);
return GRPC_ERROR_NONE;
diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc
index 990e8d632b..e02a1898f2 100644
--- a/src/core/lib/iomgr/tcp_custom.cc
+++ b/src/core/lib/iomgr/tcp_custom.cc
@@ -221,7 +221,7 @@ static void custom_write_callback(grpc_custom_socket* socket,
}
static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index b53ffbf01c..ac1e919acb 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -27,7 +27,9 @@
#include <errno.h>
#include <limits.h>
+#include <netinet/in.h>
#include <stdbool.h>
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
@@ -46,6 +48,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/profiling/timers.h"
@@ -97,17 +100,42 @@ struct grpc_tcp {
grpc_closure read_done_closure;
grpc_closure write_done_closure;
+ grpc_closure error_closure;
char* peer_string;
grpc_resource_user* resource_user;
grpc_resource_user_slice_allocator slice_allocator;
+
+ grpc_core::TracedBuffer* tb_head; /* List of traced buffers */
+ gpr_mu tb_mu; /* Lock for access to list of traced buffers */
+
+ /* grpc_endpoint_write takes an argument which if non-null means that the
+ * transport layer wants the TCP layer to collect timestamps for this write.
+ * This arg is forwarded to the timestamps callback function when the ACK
+ * timestamp is received from the kernel. This arg is a (void *) which allows
+ * users of this API to pass in a pointer to any kind of structure. This
+ * structure could actually be a tag or any book-keeping object that the user
+ * can use to distinguish between different traced writes. The only
+ * requirement from the TCP endpoint layer is that this arg should be non-null
+ * if the user wants timestamps for the write. */
+ void* outgoing_buffer_arg;
+ /* A counter which starts at 0. It is initialized the first time the socket
+ * options for collecting timestamps are set, and is incremented with each
+ * byte sent. */
+ int bytes_counter;
+ bool socket_ts_enabled; /* True if timestamping options are set on the socket
+ */
+ gpr_atm
+ stop_error_notification; /* Set to 1 if we do not want to be notified on
+ errors anymore */
};
struct backup_poller {
gpr_mu* pollset_mu;
grpc_closure run_poller;
};
+
} // namespace
#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1))
@@ -302,6 +330,7 @@ static void tcp_free(grpc_tcp* tcp) {
grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
grpc_resource_user_unref(tcp->resource_user);
gpr_free(tcp->peer_string);
+ gpr_mu_destroy(&tcp->tb_mu);
gpr_free(tcp);
}
@@ -347,6 +376,10 @@ static void tcp_destroy(grpc_endpoint* ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
+ if (grpc_event_engine_can_track_errors()) {
+ gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
+ grpc_fd_set_error(tcp->em_fd);
+ }
TCP_UNREF(tcp, "destroy");
}
@@ -513,6 +546,235 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
}
}
+/* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
+ * of bytes sent. */
+ssize_t tcp_send(int fd, const struct msghdr* msg) {
+ GPR_TIMER_SCOPE("sendmsg", 1);
+ ssize_t sent_length;
+ do {
+ /* TODO(klempner): Cork if this is a partial write */
+ GRPC_STATS_INC_SYSCALL_WRITE();
+ sent_length = sendmsg(fd, msg, SENDMSG_FLAGS);
+ } while (sent_length < 0 && errno == EINTR);
+ return sent_length;
+}
+
+/** This is to be called if outgoing_buffer_arg is not null. On linux platforms,
+ * this will call sendmsg with socket options set to collect timestamps inside
+ * the kernel. On return, sent_length is set to the return value of the sendmsg
+ * call. Returns false if setting the socket options failed. This is not
+ * implemented for non-linux platforms currently, and crashes out.
+ */
+static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
+ size_t sending_length,
+ ssize_t* sent_length, grpc_error** error);
+
+/** The callback function to be invoked when we get an error on the socket. */
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error);
+
+#ifdef GRPC_LINUX_ERRQUEUE
+static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
+ size_t sending_length,
+ ssize_t* sent_length,
+ grpc_error** error) {
+ if (!tcp->socket_ts_enabled) {
+ uint32_t opt = grpc_core::kTimestampingSocketOptions;
+ if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING,
+ static_cast<void*>(&opt), sizeof(opt)) != 0) {
+ *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp);
+ grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket.");
+ }
+ return false;
+ }
+ tcp->bytes_counter = -1;
+ tcp->socket_ts_enabled = true;
+ }
+ /* Set control message to indicate that you want timestamps. */
+ union {
+ char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))];
+ struct cmsghdr align;
+ } u;
+ cmsghdr* cmsg = reinterpret_cast<cmsghdr*>(u.cmsg_buf);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SO_TIMESTAMPING;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t));
+ *reinterpret_cast<int*>(CMSG_DATA(cmsg)) =
+ grpc_core::kTimestampingRecordingOptions;
+ msg->msg_control = u.cmsg_buf;
+ msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t));
+
+ /* If there was an error on sendmsg the logic in tcp_flush will handle it. */
+ ssize_t length = tcp_send(tcp->fd, msg);
+ *sent_length = length;
+ /* Only save timestamps if all the bytes were taken by sendmsg. */
+ if (sending_length == static_cast<size_t>(length)) {
+ gpr_mu_lock(&tcp->tb_mu);
+ grpc_core::TracedBuffer::AddNewEntry(
+ &tcp->tb_head, static_cast<int>(tcp->bytes_counter + length),
+ tcp->outgoing_buffer_arg);
+ gpr_mu_unlock(&tcp->tb_mu);
+ tcp->outgoing_buffer_arg = nullptr;
+ }
+ return true;
+}
+
+/** Reads \a cmsg to derive timestamps from the control messages. If a valid
+ * timestamp is found, the traced buffer list is updated with this timestamp.
+ * The caller of this function should be looping on the control messages found
+ * in \a msg. \a cmsg should point to the control message that the caller wants
+ * processed.
+ * On return, a pointer to a control message is returned. On the next iteration,
+ * CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg. */
+struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
+ struct cmsghdr* cmsg) {
+ auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
+ if (next_cmsg == nullptr) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Received timestamp without extended error");
+ }
+ return cmsg;
+ }
+
+ if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
+ !(next_cmsg->cmsg_type == IP_RECVERR ||
+ next_cmsg->cmsg_type == IPV6_RECVERR)) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Unexpected control message");
+ }
+ return cmsg;
+ }
+
+ auto tss =
+ reinterpret_cast<struct grpc_core::scm_timestamping*>(CMSG_DATA(cmsg));
+ auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg));
+ if (serr->ee_errno != ENOMSG ||
+ serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
+ gpr_log(GPR_ERROR, "Unexpected control message");
+ return cmsg;
+ }
+ /* The error handling can potentially be done on another thread so we need
+ * to protect the traced buffer list. A lock free list might be better. Using
+ * a simple mutex for now. */
+ gpr_mu_lock(&tcp->tb_mu);
+ grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, tss);
+ gpr_mu_unlock(&tcp->tb_mu);
+ return next_cmsg;
+}
+
+/** For linux platforms, reads the socket's error queue and processes error
+ * messages from the queue. Returns true if all the errors processed were
+ * timestamps. Returns false if any of the errors were not timestamps. For
+ * non-linux platforms, error processing is not used/enabled currently.
+ */
+static bool process_errors(grpc_tcp* tcp) {
+ while (true) {
+ struct iovec iov;
+ iov.iov_base = nullptr;
+ iov.iov_len = 0;
+ struct msghdr msg;
+ msg.msg_name = nullptr;
+ msg.msg_namelen = 0;
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 0;
+ msg.msg_flags = 0;
+
+ union {
+ char rbuf[1024 /*CMSG_SPACE(sizeof(scm_timestamping)) +
+ CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in))*/];
+ struct cmsghdr align;
+ } aligned_buf;
+ memset(&aligned_buf, 0, sizeof(aligned_buf));
+
+ msg.msg_control = aligned_buf.rbuf;
+ msg.msg_controllen = sizeof(aligned_buf.rbuf);
+
+ int r, saved_errno;
+ do {
+ r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE);
+ saved_errno = errno;
+ } while (r < 0 && saved_errno == EINTR);
+
+ if (r == -1 && saved_errno == EAGAIN) {
+ return true; /* No more errors to process */
+ }
+ if (r == -1) {
+ return false;
+ }
+ if (grpc_tcp_trace.enabled()) {
+ if ((msg.msg_flags & MSG_CTRUNC) == 1) {
+ gpr_log(GPR_INFO, "Error message was truncated.");
+ }
+ }
+
+ if (msg.msg_controllen == 0) {
+ /* There was no control message found. It was probably spurious. */
+ return true;
+ }
+ for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len;
+ cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ if (cmsg->cmsg_level != SOL_SOCKET ||
+ cmsg->cmsg_type != SCM_TIMESTAMPING) {
+ /* Got a control message that is not a timestamp. Don't know how to
+ * handle this. */
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "unknown control message cmsg_level:%d cmsg_type:%d",
+ cmsg->cmsg_level, cmsg->cmsg_type);
+ }
+ return false;
+ }
+ process_timestamp(tcp, &msg, cmsg);
+ }
+ }
+}
+
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
+ grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error));
+ }
+
+ if (error != GRPC_ERROR_NONE ||
+ static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) {
+ /* We aren't going to register to hear on error anymore, so it is safe to
+ * unref. */
+ grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, GRPC_ERROR_REF(error));
+ TCP_UNREF(tcp, "error-tracking");
+ return;
+ }
+
+ /* We are still interested in collecting timestamps, so let's try reading
+ * them. */
+ if (!process_errors(tcp)) {
+ /* This was not a timestamps error. This was an actual error. Set the
+ * read and write closures to be ready.
+ */
+ grpc_fd_set_readable(tcp->em_fd);
+ grpc_fd_set_writable(tcp->em_fd);
+ }
+ GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
+}
+
+#else /* GRPC_LINUX_ERRQUEUE */
+static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
+ size_t sending_length,
+ ssize_t* sent_length,
+ grpc_error** error) {
+ gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform");
+ GPR_ASSERT(0);
+ return false;
+}
+
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
+ gpr_log(GPR_ERROR, "Error handling is not supported for this platform");
+ GPR_ASSERT(0);
+}
+#endif /* GRPC_LINUX_ERRQUEUE */
+
/* returns true if done, false if pending; if returning true, *error is set */
#if defined(IOV_MAX) && IOV_MAX < 1000
#define MAX_WRITE_IOVEC IOV_MAX
@@ -557,19 +819,20 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iov_size;
- msg.msg_control = nullptr;
- msg.msg_controllen = 0;
msg.msg_flags = 0;
+ if (tcp->outgoing_buffer_arg != nullptr) {
+ if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
+ error))
+ return true; /* something went wrong with timestamps */
+ } else {
+ msg.msg_control = nullptr;
+ msg.msg_controllen = 0;
- GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
- GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
+ GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
+ GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
- GPR_TIMER_SCOPE("sendmsg", 1);
- do {
- /* TODO(klempner): Cork if this is a partial write */
- GRPC_STATS_INC_SYSCALL_WRITE();
- sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
- } while (sent_length < 0 && errno == EINTR);
+ sent_length = tcp_send(tcp->fd, &msg);
+ }
if (sent_length < 0) {
if (errno == EAGAIN) {
@@ -593,6 +856,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
}
GPR_ASSERT(tcp->outgoing_byte_idx == 0);
+ tcp->bytes_counter += sent_length;
trailing = sending_length - static_cast<size_t>(sent_length);
while (trailing > 0) {
size_t slice_length;
@@ -607,7 +871,6 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
trailing -= slice_length;
}
}
-
if (outgoing_slice_idx == tcp->outgoing_buffer->count) {
*error = GRPC_ERROR_NONE;
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
@@ -640,14 +903,13 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
}
-
GRPC_CLOSURE_SCHED(cb, error);
TCP_UNREF(tcp, "write");
}
}
static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
GPR_TIMER_SCOPE("tcp_write", 0);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
grpc_error* error = GRPC_ERROR_NONE;
@@ -675,6 +937,10 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
}
tcp->outgoing_buffer = buf;
tcp->outgoing_byte_idx = 0;
+ tcp->outgoing_buffer_arg = arg;
+ if (arg) {
+ GPR_ASSERT(grpc_event_engine_can_track_errors());
+ }
if (!tcp_flush(tcp, &error)) {
TCP_REF(tcp, "write");
@@ -792,6 +1058,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->bytes_read_this_round = 0;
/* Will be set to false by the very first endpoint read function */
tcp->is_first_read = true;
+ tcp->bytes_counter = -1;
+ tcp->socket_ts_enabled = false;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
@@ -803,6 +1071,19 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
grpc_resource_quota_unref_internal(resource_quota);
+ gpr_mu_init(&tcp->tb_mu);
+ tcp->tb_head = nullptr;
+ /* Start being notified on errors if event engine can track errors. */
+ if (grpc_event_engine_can_track_errors()) {
+ /* Grab a ref to tcp so that we can safely access the tcp struct when
+ * processing errors. We unref when we no longer want to track errors
+ * separately. */
+ TCP_REF(tcp, "error-tracking");
+ gpr_atm_rel_store(&tcp->stop_error_notification, 0);
+ GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
+ }
return &tcp->base;
}
@@ -821,6 +1102,11 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
tcp->release_fd = fd;
tcp->release_fd_cb = done;
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
+ if (grpc_event_engine_can_track_errors()) {
+ /* Stop errors notification. */
+ gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
+ grpc_fd_set_error(tcp->em_fd);
+ }
TCP_UNREF(tcp, "destroy");
}
diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h
index af89bd24db..eff825cb92 100644
--- a/src/core/lib/iomgr/tcp_posix.h
+++ b/src/core/lib/iomgr/tcp_posix.h
@@ -31,7 +31,10 @@
#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
+
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/ev_posix.h"
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index 8ddf684fea..824db07fbf 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) {
gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
- grpc_fd* fdobj = grpc_fd_create(fd, name, false);
+ grpc_fd* fdobj = grpc_fd_create(fd, name, true);
read_notifier_pollset =
sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
@@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
listener->sibling = sp;
sp->server = listener->server;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name, false);
+ sp->emfd = grpc_fd_create(fd, name, true);
memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = listener->port_index;
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index b9f8145572..9595c028ce 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd,
s->tail = sp;
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name, false);
+ sp->emfd = grpc_fd_create(fd, name, true);
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = port_index;
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index b3cb442f18..64c4a56ae9 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -296,7 +296,7 @@ static void on_write(void* tcpp, grpc_error* error) {
/* Initiates a write. */
static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_winsocket* socket = tcp->socket;
grpc_winsocket_callback_info* info = &socket->write_info;
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index 7f534476df..17e933b865 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -61,10 +61,11 @@ typedef struct grpc_timer_vtable {
/* Initialize *timer. When expired or canceled, closure will be called with
error set to indicate if it expired (GRPC_ERROR_NONE) or was canceled
- (GRPC_ERROR_CANCELLED). timer_cb is guaranteed to be called exactly once, and
+ (GRPC_ERROR_CANCELLED). *closure is guaranteed to be called exactly once, and
application code should check the error to determine how it was invoked. The
application callback is also responsible for maintaining information about
- when to free up any user-level state. */
+ when to free up any user-level state. Behavior is undefined for a deadline of
+ GRPC_MILLIS_INF_FUTURE. */
void grpc_timer_init(grpc_timer* timer, grpc_millis deadline,
grpc_closure* closure);
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index bdb2d0e764..3dd7cab855 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd,
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
gpr_free(addr_str);
- emfd_ = grpc_fd_create(fd, name, false);
+ emfd_ = grpc_fd_create(fd, name, true);
memcpy(&addr_, addr, sizeof(grpc_resolved_address));
GPR_ASSERT(emfd_);
gpr_free(name);