aboutsummaryrefslogtreecommitdiff
path: root/rtc_base/virtual_socket_server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'rtc_base/virtual_socket_server.cc')
-rw-r--r--rtc_base/virtual_socket_server.cc111
1 files changed, 59 insertions, 52 deletions
diff --git a/rtc_base/virtual_socket_server.cc b/rtc_base/virtual_socket_server.cc
index 80d7f3c047..f5e993645e 100644
--- a/rtc_base/virtual_socket_server.cc
+++ b/rtc_base/virtual_socket_server.cc
@@ -19,7 +19,6 @@
#include "absl/algorithm/container.h"
#include "rtc_base/checks.h"
-#include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/fake_clock.h"
#include "rtc_base/logging.h"
#include "rtc_base/physical_socket_server.h"
@@ -164,6 +163,8 @@ int VirtualSocket::Close() {
}
if (SOCK_STREAM == type_) {
+ webrtc::MutexLock lock(&mutex_);
+
// Cancel pending sockets
if (listen_queue_) {
while (!listen_queue_->empty()) {
@@ -173,7 +174,6 @@ int VirtualSocket::Close() {
server_->Disconnect(addr);
listen_queue_->pop_front();
}
- delete listen_queue_;
listen_queue_ = nullptr;
}
// Disconnect stream sockets
@@ -231,6 +231,8 @@ int VirtualSocket::RecvFrom(void* pv,
if (timestamp) {
*timestamp = -1;
}
+
+ webrtc::MutexLock lock(&mutex_);
// If we don't have a packet, then either error or wait for one to arrive.
if (recv_buffer_.empty()) {
if (async_) {
@@ -273,6 +275,7 @@ int VirtualSocket::RecvFrom(void* pv,
}
int VirtualSocket::Listen(int backlog) {
+ webrtc::MutexLock lock(&mutex_);
RTC_DCHECK(SOCK_STREAM == type_);
RTC_DCHECK(CS_CLOSED == state_);
if (local_addr_.IsNil()) {
@@ -280,12 +283,13 @@ int VirtualSocket::Listen(int backlog) {
return -1;
}
RTC_DCHECK(nullptr == listen_queue_);
- listen_queue_ = new ListenQueue;
+ listen_queue_ = std::make_unique<ListenQueue>();
state_ = CS_CONNECTING;
return 0;
}
VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) {
+ webrtc::MutexLock lock(&mutex_);
if (nullptr == listen_queue_) {
error_ = EINVAL;
return nullptr;
@@ -304,7 +308,7 @@ VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) {
delete socket;
continue;
}
- socket->CompleteConnect(remote_addr, false);
+ socket->CompleteConnect(remote_addr);
if (paddr) {
*paddr = remote_addr;
}
@@ -341,47 +345,57 @@ int VirtualSocket::SetOption(Option opt, int value) {
}
void VirtualSocket::OnMessage(Message* pmsg) {
- if (pmsg->message_id == MSG_ID_PACKET) {
- RTC_DCHECK(nullptr != pmsg->pdata);
- Packet* packet = static_cast<Packet*>(pmsg->pdata);
-
- recv_buffer_.push_back(packet);
-
- if (async_) {
- SignalReadEvent(this);
- }
- } else if (pmsg->message_id == MSG_ID_CONNECT) {
- RTC_DCHECK(nullptr != pmsg->pdata);
- MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata);
- if (listen_queue_ != nullptr) {
- listen_queue_->push_back(data->addr);
- if (async_) {
- SignalReadEvent(this);
+ bool signal_read_event = false;
+ bool signal_close_event = false;
+ bool signal_connect_event = false;
+ int error_to_signal = 0;
+ {
+ webrtc::MutexLock lock(&mutex_);
+ if (pmsg->message_id == MSG_ID_PACKET) {
+ RTC_DCHECK(nullptr != pmsg->pdata);
+ Packet* packet = static_cast<Packet*>(pmsg->pdata);
+
+ recv_buffer_.push_back(packet);
+ signal_read_event = async_;
+ } else if (pmsg->message_id == MSG_ID_CONNECT) {
+ RTC_DCHECK(nullptr != pmsg->pdata);
+ MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata);
+ if (listen_queue_ != nullptr) {
+ listen_queue_->push_back(data->addr);
+ signal_read_event = async_;
+ } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) {
+ CompleteConnect(data->addr);
+ signal_connect_event = async_;
+ } else {
+ RTC_LOG(LS_VERBOSE)
+ << "Socket at " << local_addr_.ToString() << " is not listening";
+ server_->Disconnect(data->addr);
}
- } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) {
- CompleteConnect(data->addr, true);
- } else {
- RTC_LOG(LS_VERBOSE) << "Socket at " << local_addr_.ToString()
- << " is not listening";
- server_->Disconnect(data->addr);
- }
- delete data;
- } else if (pmsg->message_id == MSG_ID_DISCONNECT) {
- RTC_DCHECK(SOCK_STREAM == type_);
- if (CS_CLOSED != state_) {
- int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0;
- state_ = CS_CLOSED;
- remote_addr_.Clear();
- if (async_) {
- SignalCloseEvent(this, error);
+ delete data;
+ } else if (pmsg->message_id == MSG_ID_DISCONNECT) {
+ RTC_DCHECK(SOCK_STREAM == type_);
+ if (CS_CLOSED != state_) {
+ error_to_signal = (CS_CONNECTING == state_) ? ECONNREFUSED : 0;
+ state_ = CS_CLOSED;
+ remote_addr_.Clear();
+ signal_close_event = async_;
}
+ } else if (pmsg->message_id == MSG_ID_SIGNALREADEVENT) {
+ signal_read_event = !recv_buffer_.empty();
+ } else {
+ RTC_NOTREACHED();
}
- } else if (pmsg->message_id == MSG_ID_SIGNALREADEVENT) {
- if (!recv_buffer_.empty()) {
- SignalReadEvent(this);
- }
- } else {
- RTC_NOTREACHED();
+ }
+ // Signal events without holding `mutex_`, to avoid recursive locking, as well
+ // as issues with sigslot and lock order.
+ if (signal_read_event) {
+ SignalReadEvent(this);
+ }
+ if (signal_close_event) {
+ SignalCloseEvent(this, error_to_signal);
+ }
+ if (signal_connect_event) {
+ SignalConnectEvent(this);
}
}
@@ -416,14 +430,11 @@ int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) {
return 0;
}
-void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) {
+void VirtualSocket::CompleteConnect(const SocketAddress& addr) {
RTC_DCHECK(CS_CONNECTING == state_);
remote_addr_ = addr;
state_ = CS_CONNECTED;
server_->AddConnection(remote_addr_, local_addr_, this);
- if (async_ && notify) {
- SignalConnectEvent(this);
- }
}
int VirtualSocket::SendUdp(const void* pv,
@@ -475,7 +486,7 @@ void VirtualSocket::OnSocketServerReadyToSend() {
}
void VirtualSocket::SetToBlocked() {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
ready_to_send_ = false;
error_ = EWOULDBLOCK;
}
@@ -525,7 +536,7 @@ int64_t VirtualSocket::UpdateOrderedDelivery(int64_t ts) {
}
size_t VirtualSocket::PurgeNetworkPackets(int64_t cur_time) {
- CritScope cs(&crit_);
+ webrtc::MutexLock lock(&mutex_);
while (!network_.empty() && (network_.front().done_time <= cur_time)) {
RTC_DCHECK(network_size_ >= network_.front().size);
@@ -613,10 +624,6 @@ VirtualSocket* VirtualSocketServer::CreateSocketInternal(int family, int type) {
void VirtualSocketServer::SetMessageQueue(Thread* msg_queue) {
msg_queue_ = msg_queue;
- if (msg_queue_) {
- msg_queue_->SignalQueueDestroyed.connect(
- this, &VirtualSocketServer::OnMessageQueueDestroyed);
- }
}
bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {