diff options
author | Per Kjellander <perkj@webrtc.org> | 2022-11-08 12:48:52 +0100 |
---|---|---|
committer | WebRTC LUCI CQ <webrtc-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2022-11-08 12:51:49 +0000 |
commit | fdcfefa70861e545f9e073fafd5a654c239bda1a (patch) | |
tree | 4c16ea5ec16f3a6e074ba087b2f1d12d44d63964 /rtc_base | |
parent | 119fb1910a218a2edd0c542b39c65a47ed9f2e48 (diff) | |
download | webrtc-fdcfefa70861e545f9e073fafd5a654c239bda1a.tar.gz |
Add experiment to use ::recvmsg to receive packets on posix systems
Using ::recvmsg ensure packet timestamp can then be read directly when reading the buffer
instead of a separate system call and should also work on Ios/Mac.
The same experiment field trial flag will be "WebRTC-SCM-Timestamp/enabled/" and is also planned to be used for fixing webrtc:14066
Bug: webrtc:5773, webrtc:14066
Change-Id: I8a3749e87c686aa18fcee947472c1b602a0f63c8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/279280
Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Jonas Oreland <jonaso@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38585}
Diffstat (limited to 'rtc_base')
-rw-r--r-- | rtc_base/BUILD.gn | 2 | ||||
-rw-r--r-- | rtc_base/physical_socket_server.cc | 118 | ||||
-rw-r--r-- | rtc_base/physical_socket_server.h | 6 | ||||
-rw-r--r-- | rtc_base/physical_socket_server_unittest.cc | 14 | ||||
-rw-r--r-- | rtc_base/socket_unittest.cc | 12 |
5 files changed, 132 insertions, 20 deletions
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index b171b162bf..c5b9024cc1 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -961,6 +961,7 @@ rtc_library("threading") { "../api/task_queue", "../api/task_queue:pending_task_safety_flag", "../api/units:time_delta", + "../system_wrappers:field_trial", "synchronization:mutex", "system:no_unique_address", "system:rtc_export", @@ -1498,6 +1499,7 @@ if (rtc_include_tests) { ":timeutils", "../api/units:time_delta", "../system_wrappers", + "../test:field_trial", "../test:fileutils", "../test:test_main", "../test:test_support", diff --git a/rtc_base/physical_socket_server.cc b/rtc_base/physical_socket_server.cc index 7c01815d30..b7d69140e0 100644 --- a/rtc_base/physical_socket_server.cc +++ b/rtc_base/physical_socket_server.cc @@ -52,6 +52,7 @@ #include "rtc_base/null_socket_server.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/time_utils.h" +#include "system_wrappers/include/field_trial.h" #if defined(WEBRTC_LINUX) #include <linux/sockios.h> @@ -118,6 +119,12 @@ class ScopedSetTrue { private: bool* value_; }; + +// Returns true if the the client is in the experiment to get timestamps +// from the socket implementation. +bool IsScmTimeStampExperimentEnabled() { + return webrtc::field_trial::IsEnabled("WebRTC-SCM-Timestamp"); +} } // namespace namespace rtc { @@ -127,7 +134,8 @@ PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s) s_(s), error_(0), state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED), - resolver_(nullptr) { + resolver_(nullptr), + read_scm_timestamp_experiment_(IsScmTimeStampExperimentEnabled()) { if (s_ != INVALID_SOCKET) { SetEnabledEvents(DE_READ | DE_WRITE); @@ -395,7 +403,7 @@ int PhysicalSocket::SendTo(const void* buffer, int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { int received = - ::recv(s_, static_cast<char*>(buffer), static_cast<int>(length), 0); + DoReadFromSocket(buffer, length, /*out_addr*/ nullptr, timestamp); if ((received == 0) && (length != 0)) { // Note: on graceful shutdown, recv can return 0. In this case, we // pretend it is blocking, and then signal close, so that simplifying @@ -407,9 +415,7 @@ int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { SetError(EWOULDBLOCK); return SOCKET_ERROR; } - if (timestamp) { - *timestamp = GetSocketRecvTimestamp(s_); - } + UpdateLastError(); int error = GetError(); bool success = (received >= 0) || IsBlockingError(error); @@ -426,17 +432,8 @@ int PhysicalSocket::RecvFrom(void* buffer, size_t length, SocketAddress* out_addr, int64_t* timestamp) { - sockaddr_storage addr_storage; - socklen_t addr_len = sizeof(addr_storage); - sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); - int received = ::recvfrom(s_, static_cast<char*>(buffer), - static_cast<int>(length), 0, addr, &addr_len); - if (timestamp) { - *timestamp = GetSocketRecvTimestamp(s_); - } + int received = DoReadFromSocket(buffer, length, out_addr, timestamp); UpdateLastError(); - if ((received >= 0) && (out_addr != nullptr)) - SocketAddressFromSockAddrStorage(addr_storage, out_addr); int error = GetError(); bool success = (received >= 0) || IsBlockingError(error); if (udp_ || success) { @@ -448,6 +445,84 @@ int PhysicalSocket::RecvFrom(void* buffer, return received; } +int PhysicalSocket::DoReadFromSocket(void* buffer, + size_t length, + SocketAddress* out_addr, + int64_t* timestamp) { + sockaddr_storage addr_storage; + socklen_t addr_len = sizeof(addr_storage); + sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); + +#if defined(WEBRTC_POSIX) + int received = 0; + if (read_scm_timestamp_experiment_) { + iovec iov = {.iov_base = buffer, .iov_len = length}; + msghdr msg = {.msg_iov = &iov, .msg_iovlen = 1}; + if (out_addr) { + out_addr->Clear(); + msg.msg_name = addr; + msg.msg_namelen = addr_len; + } + char control[CMSG_SPACE(sizeof(struct timeval))] = {}; + if (timestamp) { + *timestamp = -1; + msg.msg_control = &control; + msg.msg_controllen = sizeof(control); + } + received = ::recvmsg(s_, &msg, 0); + if (received <= 0) { + // An error occured or shut down. + return received; + } + if (timestamp) { + struct cmsghdr* cmsg; + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level != SOL_SOCKET) + continue; + if (cmsg->cmsg_type == SCM_TIMESTAMP) { + timeval* ts = reinterpret_cast<timeval*>(CMSG_DATA(cmsg)); + *timestamp = + rtc::kNumMicrosecsPerSec * static_cast<int64_t>(ts->tv_sec) + + static_cast<int64_t>(ts->tv_usec); + break; + } + } + } + if (out_addr) { + SocketAddressFromSockAddrStorage(addr_storage, out_addr); + } + } else { // !read_scm_timestamp_experiment_ + if (out_addr) { + received = ::recvfrom(s_, static_cast<char*>(buffer), + static_cast<int>(length), 0, addr, &addr_len); + SocketAddressFromSockAddrStorage(addr_storage, out_addr); + } else { + received = + ::recv(s_, static_cast<char*>(buffer), static_cast<int>(length), 0); + } + if (timestamp) { + *timestamp = GetSocketRecvTimestamp(s_); + } + } + return received; + +#else + int received = 0; + if (out_addr) { + received = ::recvfrom(s_, static_cast<char*>(buffer), + static_cast<int>(length), 0, addr, &addr_len); + SocketAddressFromSockAddrStorage(addr_storage, out_addr); + } else { + received = + ::recv(s_, static_cast<char*>(buffer), static_cast<int>(length), 0); + } + if (timestamp) { + *timestamp = -1; + } + return received; +#endif +} + int PhysicalSocket::Listen(int backlog) { int err = ::listen(s_, backlog); UpdateLastError(); @@ -643,7 +718,16 @@ bool SocketDispatcher::Initialize() { ioctlsocket(s_, FIONBIO, &argp); #elif defined(WEBRTC_POSIX) fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK); + if (IsScmTimeStampExperimentEnabled()) { + int value = 1; + // Attempt to get receive packet timestamp from the socket. + if (::setsockopt(s_, SOL_SOCKET, SO_TIMESTAMP, &value, sizeof(value)) != + 0) { + RTC_DLOG(LS_ERROR) << "::setsockopt failed. errno: " << LAST_SYSTEM_ERROR; + } + } #endif + #if defined(WEBRTC_IOS) // iOS may kill sockets when the app is moved to the background // (specifically, if the app doesn't use the "voip" UIBackgroundMode). When @@ -651,7 +735,9 @@ bool SocketDispatcher::Initialize() { // default will terminate the process, which we don't want. By specifying // this socket option, SIGPIPE will be disabled for the socket. int value = 1; - ::setsockopt(s_, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value)); + if (::setsockopt(s_, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value)) != 0) { + RTC_DLOG(LS_ERROR) << "::setsockopt failed. errno: " << LAST_SYSTEM_ERROR; + } #endif ss_->Add(this); return true; diff --git a/rtc_base/physical_socket_server.h b/rtc_base/physical_socket_server.h index f97271f422..5a3acbf84f 100644 --- a/rtc_base/physical_socket_server.h +++ b/rtc_base/physical_socket_server.h @@ -190,6 +190,11 @@ class PhysicalSocket : public Socket, public sigslot::has_slots<> { const struct sockaddr* dest_addr, socklen_t addrlen); + int DoReadFromSocket(void* buffer, + size_t length, + SocketAddress* out_addr, + int64_t* timestamp); + void OnResolveResult(AsyncResolverInterface* resolver); void UpdateLastError(); @@ -216,6 +221,7 @@ class PhysicalSocket : public Socket, public sigslot::has_slots<> { #endif private: + const bool read_scm_timestamp_experiment_; uint8_t enabled_events_ = 0; }; diff --git a/rtc_base/physical_socket_server_unittest.cc b/rtc_base/physical_socket_server_unittest.cc index 3da777a235..5d1ab5a4fd 100644 --- a/rtc_base/physical_socket_server_unittest.cc +++ b/rtc_base/physical_socket_server_unittest.cc @@ -23,6 +23,7 @@ #include "rtc_base/socket_unittest.h" #include "rtc_base/test_utils.h" #include "rtc_base/thread.h" +#include "test/field_trial.h" #include "test/gtest.h" namespace rtc { @@ -460,8 +461,9 @@ TEST_F(PhysicalSocketTest, TestGetSetOptionsIPv6) { #if defined(WEBRTC_POSIX) -// We don't get recv timestamps on Mac. #if !defined(WEBRTC_MAC) +// We don't get recv timestamps on Mac without the experiment +// WebRTC-SCM-Timestamp TEST_F(PhysicalSocketTest, TestSocketRecvTimestampIPv4) { MAYBE_SKIP_IPV4; SocketTest::TestSocketRecvTimestampIPv4(); @@ -472,6 +474,16 @@ TEST_F(PhysicalSocketTest, TestSocketRecvTimestampIPv6) { } #endif +TEST_F(PhysicalSocketTest, TestSocketRecvTimestampIPv4ScmExperiment) { + MAYBE_SKIP_IPV4; + webrtc::test::ScopedFieldTrials trial("WebRTC-SCM-Timestamp/Enabled/"); + SocketTest::TestSocketRecvTimestampIPv4(); +} + +TEST_F(PhysicalSocketTest, TestSocketRecvTimestampIPv6ScmExperiment) { + webrtc::test::ScopedFieldTrials trial("WebRTC-SCM-Timestamp/Enabled/"); + SocketTest::TestSocketRecvTimestampIPv6(); +} // Verify that if the socket was unable to be bound to a real network interface // (not loopback), Bind will return an error. TEST_F(PhysicalSocketTest, diff --git a/rtc_base/socket_unittest.cc b/rtc_base/socket_unittest.cc index ea6407ae6e..8e76d343f6 100644 --- a/rtc_base/socket_unittest.cc +++ b/rtc_base/socket_unittest.cc @@ -1070,25 +1070,31 @@ void SocketTest::GetSetOptionsInternal(const IPAddress& loopback) { } void SocketTest::SocketRecvTimestamp(const IPAddress& loopback) { + StreamSink sink; std::unique_ptr<Socket> socket( socket_factory_->CreateSocket(loopback.family(), SOCK_DGRAM)); EXPECT_EQ(0, socket->Bind(SocketAddress(loopback, 0))); SocketAddress address = socket->GetLocalAddress(); + sink.Monitor(socket.get()); int64_t send_time_1 = TimeMicros(); socket->SendTo("foo", 3, address); + int64_t recv_timestamp_1; + // Wait until data is available. + EXPECT_TRUE_WAIT(sink.Check(socket.get(), SSE_READ), kTimeout); char buffer[3]; - socket->RecvFrom(buffer, 3, nullptr, &recv_timestamp_1); - EXPECT_GT(recv_timestamp_1, -1); + ASSERT_GT(socket->RecvFrom(buffer, 3, nullptr, &recv_timestamp_1), 0); const int64_t kTimeBetweenPacketsMs = 100; Thread::SleepMs(kTimeBetweenPacketsMs); int64_t send_time_2 = TimeMicros(); socket->SendTo("bar", 3, address); + // Wait until data is available. + EXPECT_TRUE_WAIT(sink.Check(socket.get(), SSE_READ), kTimeout); int64_t recv_timestamp_2; - socket->RecvFrom(buffer, 3, nullptr, &recv_timestamp_2); + ASSERT_GT(socket->RecvFrom(buffer, 3, nullptr, &recv_timestamp_2), 0); int64_t system_time_diff = send_time_2 - send_time_1; int64_t recv_timestamp_diff = recv_timestamp_2 - recv_timestamp_1; |