diff options
Diffstat (limited to 'rtc_base/virtual_socket_server.cc')
-rw-r--r-- | rtc_base/virtual_socket_server.cc | 111 |
1 files changed, 59 insertions, 52 deletions
diff --git a/rtc_base/virtual_socket_server.cc b/rtc_base/virtual_socket_server.cc index 80d7f3c047..f5e993645e 100644 --- a/rtc_base/virtual_socket_server.cc +++ b/rtc_base/virtual_socket_server.cc @@ -19,7 +19,6 @@ #include "absl/algorithm/container.h" #include "rtc_base/checks.h" -#include "rtc_base/deprecated/recursive_critical_section.h" #include "rtc_base/fake_clock.h" #include "rtc_base/logging.h" #include "rtc_base/physical_socket_server.h" @@ -164,6 +163,8 @@ int VirtualSocket::Close() { } if (SOCK_STREAM == type_) { + webrtc::MutexLock lock(&mutex_); + // Cancel pending sockets if (listen_queue_) { while (!listen_queue_->empty()) { @@ -173,7 +174,6 @@ int VirtualSocket::Close() { server_->Disconnect(addr); listen_queue_->pop_front(); } - delete listen_queue_; listen_queue_ = nullptr; } // Disconnect stream sockets @@ -231,6 +231,8 @@ int VirtualSocket::RecvFrom(void* pv, if (timestamp) { *timestamp = -1; } + + webrtc::MutexLock lock(&mutex_); // If we don't have a packet, then either error or wait for one to arrive. if (recv_buffer_.empty()) { if (async_) { @@ -273,6 +275,7 @@ int VirtualSocket::RecvFrom(void* pv, } int VirtualSocket::Listen(int backlog) { + webrtc::MutexLock lock(&mutex_); RTC_DCHECK(SOCK_STREAM == type_); RTC_DCHECK(CS_CLOSED == state_); if (local_addr_.IsNil()) { @@ -280,12 +283,13 @@ int VirtualSocket::Listen(int backlog) { return -1; } RTC_DCHECK(nullptr == listen_queue_); - listen_queue_ = new ListenQueue; + listen_queue_ = std::make_unique<ListenQueue>(); state_ = CS_CONNECTING; return 0; } VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) { + webrtc::MutexLock lock(&mutex_); if (nullptr == listen_queue_) { error_ = EINVAL; return nullptr; @@ -304,7 +308,7 @@ VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) { delete socket; continue; } - socket->CompleteConnect(remote_addr, false); + socket->CompleteConnect(remote_addr); if (paddr) { *paddr = remote_addr; } @@ -341,47 +345,57 @@ int VirtualSocket::SetOption(Option opt, int value) { } void VirtualSocket::OnMessage(Message* pmsg) { - if (pmsg->message_id == MSG_ID_PACKET) { - RTC_DCHECK(nullptr != pmsg->pdata); - Packet* packet = static_cast<Packet*>(pmsg->pdata); - - recv_buffer_.push_back(packet); - - if (async_) { - SignalReadEvent(this); - } - } else if (pmsg->message_id == MSG_ID_CONNECT) { - RTC_DCHECK(nullptr != pmsg->pdata); - MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata); - if (listen_queue_ != nullptr) { - listen_queue_->push_back(data->addr); - if (async_) { - SignalReadEvent(this); + bool signal_read_event = false; + bool signal_close_event = false; + bool signal_connect_event = false; + int error_to_signal = 0; + { + webrtc::MutexLock lock(&mutex_); + if (pmsg->message_id == MSG_ID_PACKET) { + RTC_DCHECK(nullptr != pmsg->pdata); + Packet* packet = static_cast<Packet*>(pmsg->pdata); + + recv_buffer_.push_back(packet); + signal_read_event = async_; + } else if (pmsg->message_id == MSG_ID_CONNECT) { + RTC_DCHECK(nullptr != pmsg->pdata); + MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata); + if (listen_queue_ != nullptr) { + listen_queue_->push_back(data->addr); + signal_read_event = async_; + } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { + CompleteConnect(data->addr); + signal_connect_event = async_; + } else { + RTC_LOG(LS_VERBOSE) + << "Socket at " << local_addr_.ToString() << " is not listening"; + server_->Disconnect(data->addr); } - } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { - CompleteConnect(data->addr, true); - } else { - RTC_LOG(LS_VERBOSE) << "Socket at " << local_addr_.ToString() - << " is not listening"; - server_->Disconnect(data->addr); - } - delete data; - } else if (pmsg->message_id == MSG_ID_DISCONNECT) { - RTC_DCHECK(SOCK_STREAM == type_); - if (CS_CLOSED != state_) { - int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; - state_ = CS_CLOSED; - remote_addr_.Clear(); - if (async_) { - SignalCloseEvent(this, error); + delete data; + } else if (pmsg->message_id == MSG_ID_DISCONNECT) { + RTC_DCHECK(SOCK_STREAM == type_); + if (CS_CLOSED != state_) { + error_to_signal = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; + state_ = CS_CLOSED; + remote_addr_.Clear(); + signal_close_event = async_; } + } else if (pmsg->message_id == MSG_ID_SIGNALREADEVENT) { + signal_read_event = !recv_buffer_.empty(); + } else { + RTC_NOTREACHED(); } - } else if (pmsg->message_id == MSG_ID_SIGNALREADEVENT) { - if (!recv_buffer_.empty()) { - SignalReadEvent(this); - } - } else { - RTC_NOTREACHED(); + } + // Signal events without holding `mutex_`, to avoid recursive locking, as well + // as issues with sigslot and lock order. + if (signal_read_event) { + SignalReadEvent(this); + } + if (signal_close_event) { + SignalCloseEvent(this, error_to_signal); + } + if (signal_connect_event) { + SignalConnectEvent(this); } } @@ -416,14 +430,11 @@ int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) { return 0; } -void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) { +void VirtualSocket::CompleteConnect(const SocketAddress& addr) { RTC_DCHECK(CS_CONNECTING == state_); remote_addr_ = addr; state_ = CS_CONNECTED; server_->AddConnection(remote_addr_, local_addr_, this); - if (async_ && notify) { - SignalConnectEvent(this); - } } int VirtualSocket::SendUdp(const void* pv, @@ -475,7 +486,7 @@ void VirtualSocket::OnSocketServerReadyToSend() { } void VirtualSocket::SetToBlocked() { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); ready_to_send_ = false; error_ = EWOULDBLOCK; } @@ -525,7 +536,7 @@ int64_t VirtualSocket::UpdateOrderedDelivery(int64_t ts) { } size_t VirtualSocket::PurgeNetworkPackets(int64_t cur_time) { - CritScope cs(&crit_); + webrtc::MutexLock lock(&mutex_); while (!network_.empty() && (network_.front().done_time <= cur_time)) { RTC_DCHECK(network_size_ >= network_.front().size); @@ -613,10 +624,6 @@ VirtualSocket* VirtualSocketServer::CreateSocketInternal(int family, int type) { void VirtualSocketServer::SetMessageQueue(Thread* msg_queue) { msg_queue_ = msg_queue; - if (msg_queue_) { - msg_queue_->SignalQueueDestroyed.connect( - this, &VirtualSocketServer::OnMessageQueueDestroyed); - } } bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { |