diff options
author | Florent Castelli <orphis@webrtc.org> | 2022-08-22 17:46:39 +0000 |
---|---|---|
committer | WebRTC LUCI CQ <webrtc-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2022-08-23 14:32:28 +0000 |
commit | dbc2ba20266c2b603b5808d6720ef0a6dbad02da (patch) | |
tree | dd2e4b1991109f1b79fef214048b9eda778cf87d /media | |
parent | 64c70a260ec6891e3fa435f16624d3a36a7985e9 (diff) | |
download | webrtc-dbc2ba20266c2b603b5808d6720ef0a6dbad02da.tar.gz |
dcsctp: Track open channels accurately
In rare cases, it is possible to queue a call to SendData from the signaling
thread on a channel being closed or already closed in the network thread.
By keeping track of currently open streams, we avoid sending messages
with a stream id of channels that the other side already considers closed
and has already reused for a new channel.
This caused rare messages to be delivered on the wrong data channel if
a message was quickly sent, channel closed and a new one reopened.
Bug: webrtc:14277
Change-Id: If35fed8d12d5d2c18cdc6601085d8b632c37a0ba
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272624
Commit-Queue: Florent Castelli <orphis@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37880}
Diffstat (limited to 'media')
-rw-r--r-- | media/sctp/dcsctp_transport.cc | 80 | ||||
-rw-r--r-- | media/sctp/dcsctp_transport.h | 8 | ||||
-rw-r--r-- | media/sctp/dcsctp_transport_unittest.cc | 62 |
3 files changed, 129 insertions, 21 deletions
diff --git a/media/sctp/dcsctp_transport.cc b/media/sctp/dcsctp_transport.cc index c8e6da4f00..062360d251 100644 --- a/media/sctp/dcsctp_transport.cc +++ b/media/sctp/dcsctp_transport.cc @@ -218,16 +218,17 @@ bool DcSctpTransport::Start(int local_sctp_port, } bool DcSctpTransport::OpenStream(int sid) { + RTC_DCHECK_RUN_ON(network_thread_); RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ")."; - if (!socket_) { - RTC_LOG(LS_ERROR) << debug_name_ << "->OpenStream(sid=" << sid - << "): Transport is not started."; - return false; - } + + StreamState stream_state; + stream_states_.insert_or_assign(dcsctp::StreamID(static_cast<uint16_t>(sid)), + stream_state); return true; } bool DcSctpTransport::ResetStream(int sid) { + RTC_DCHECK_RUN_ON(network_thread_); RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ")."; if (!socket_) { RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid @@ -237,14 +238,21 @@ bool DcSctpTransport::ResetStream(int sid) { dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast<uint16_t>(sid))}; - StreamClosingState& closing_state = closing_states_[streams[0]]; - if (closing_state.closure_initiated || closing_state.incoming_reset_done || - closing_state.outgoing_reset_done) { + auto it = stream_states_.find(streams[0]); + if (it == stream_states_.end()) { + RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid + << "): Stream is not open."; + return false; + } + + StreamState& stream_state = it->second; + if (stream_state.closure_initiated || stream_state.incoming_reset_done || + stream_state.outgoing_reset_done) { // The closing procedure was already initiated by the remote, don't do // anything. return false; } - closing_state.closure_initiated = true; + stream_state.closure_initiated = true; socket_->ResetStreams(streams); return true; } @@ -265,6 +273,30 @@ bool DcSctpTransport::SendData(int sid, return false; } + // It is possible for a message to be sent from the signaling thread at the + // same time a data-channel is closing, but before the signaling thread is + // aware of it. So we need to keep track of currently active data channels and + // skip sending messages for the ones that are not open or closing. + // The sending errors are not impacting the data channel API contract as + // it is allowed to discard queued messages when the channel is closing. + auto stream_state = + stream_states_.find(dcsctp::StreamID(static_cast<uint16_t>(sid))); + if (stream_state == stream_states_.end()) { + RTC_LOG(LS_VERBOSE) << "Skipping message on non-open stream with sid: " + << sid; + *result = cricket::SDR_ERROR; + return false; + } + + if (stream_state->second.closure_initiated || + stream_state->second.incoming_reset_done || + stream_state->second.outgoing_reset_done) { + RTC_LOG(LS_VERBOSE) << "Skipping message on closing stream with sid: " + << sid; + *result = cricket::SDR_ERROR; + return false; + } + auto max_message_size = socket_->options().max_message_size; if (max_message_size > 0 && payload.size() > max_message_size) { RTC_LOG(LS_WARNING) << debug_name_ @@ -519,16 +551,23 @@ void DcSctpTransport::OnStreamsResetPerformed( RTC_LOG(LS_INFO) << debug_name_ << "->OnStreamsResetPerformed(...): Outgoing stream reset" << ", sid=" << stream_id.value(); - StreamClosingState& closing_state = closing_states_[stream_id]; - closing_state.outgoing_reset_done = true; - if (closing_state.incoming_reset_done) { + auto it = stream_states_.find(stream_id); + if (it == stream_states_.end()) { + // Ignoring an outgoing stream reset for a closed stream + return; + } + + StreamState& stream_state = it->second; + stream_state.outgoing_reset_done = true; + + if (stream_state.incoming_reset_done) { // When the close was not initiated locally, we can signal the end of the // data channel close procedure when the remote ACKs the reset. if (data_channel_sink_) { data_channel_sink_->OnChannelClosed(stream_id.value()); } - closing_states_.erase(stream_id); + stream_states_.erase(stream_id); } } } @@ -540,10 +579,15 @@ void DcSctpTransport::OnIncomingStreamsReset( RTC_LOG(LS_INFO) << debug_name_ << "->OnIncomingStreamsReset(...): Incoming stream reset" << ", sid=" << stream_id.value(); - StreamClosingState& closing_state = closing_states_[stream_id]; - closing_state.incoming_reset_done = true; - if (!closing_state.closure_initiated) { + auto it = stream_states_.find(stream_id); + if (it == stream_states_.end()) + return; + + StreamState& stream_state = it->second; + stream_state.incoming_reset_done = true; + + if (!stream_state.closure_initiated) { // When receiving an incoming stream reset event for a non local close // procedure, the transport needs to reset the stream in the other // direction too. @@ -554,13 +598,13 @@ void DcSctpTransport::OnIncomingStreamsReset( } } - if (closing_state.outgoing_reset_done) { + if (stream_state.outgoing_reset_done) { // The close procedure that was initiated locally is complete when we // receive and incoming reset event. if (data_channel_sink_) { data_channel_sink_->OnChannelClosed(stream_id.value()); } - closing_states_.erase(stream_id); + stream_states_.erase(stream_id); } } } diff --git a/media/sctp/dcsctp_transport.h b/media/sctp/dcsctp_transport.h index 1f71db87c4..f86ac5a23a 100644 --- a/media/sctp/dcsctp_transport.h +++ b/media/sctp/dcsctp_transport.h @@ -114,10 +114,10 @@ class DcSctpTransport : public cricket::SctpTransportInternal, std::string debug_name_ = "DcSctpTransport"; rtc::CopyOnWriteBuffer receive_buffer_; - // Used to keep track of the closing state of the data channel. + // Used to keep track of the state of data channels. // Reset needs to happen both ways before signaling the transport // is closed. - struct StreamClosingState { + struct StreamState { // True when the local connection has initiated the reset. // If a connection receives a reset for a stream that isn't // already being reset locally, it needs to fire the signal @@ -129,7 +129,9 @@ class DcSctpTransport : public cricket::SctpTransportInternal, bool outgoing_reset_done = false; }; - flat_map<dcsctp::StreamID, StreamClosingState> closing_states_; + // Map of all currently open or closing data channels + flat_map<dcsctp::StreamID, StreamState> stream_states_ + RTC_GUARDED_BY(network_thread_); bool ready_to_send_data_ = false; std::function<void()> on_connected_callback_ RTC_GUARDED_BY(network_thread_); DataChannelSink* data_channel_sink_ RTC_GUARDED_BY(network_thread_) = nullptr; diff --git a/media/sctp/dcsctp_transport_unittest.cc b/media/sctp/dcsctp_transport_unittest.cc index 270b06a63f..08dc2ec0b6 100644 --- a/media/sctp/dcsctp_transport_unittest.cc +++ b/media/sctp/dcsctp_transport_unittest.cc @@ -18,6 +18,7 @@ #include "p2p/base/fake_packet_transport.h" #include "test/gtest.h" +using ::testing::_; using ::testing::ByMove; using ::testing::DoAll; using ::testing::ElementsAre; @@ -25,6 +26,7 @@ using ::testing::InSequence; using ::testing::Invoke; using ::testing::NiceMock; using ::testing::Return; +using ::testing::ReturnPointee; namespace webrtc { @@ -112,6 +114,7 @@ TEST(DcSctpTransportTest, CloseSequence) { peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); peer_b.sctp_transport_->Start(5000, 5000, 256 * 1024); peer_a.sctp_transport_->OpenStream(1); + peer_b.sctp_transport_->OpenStream(1); peer_a.sctp_transport_->ResetStream(1); // Simulate the callbacks from the stream resets @@ -153,6 +156,7 @@ TEST(DcSctpTransportTest, CloseSequenceSimultaneous) { peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); peer_b.sctp_transport_->Start(5000, 5000, 256 * 1024); peer_a.sctp_transport_->OpenStream(1); + peer_b.sctp_transport_->OpenStream(1); peer_a.sctp_transport_->ResetStream(1); peer_b.sctp_transport_->ResetStream(1); @@ -168,4 +172,62 @@ TEST(DcSctpTransportTest, CloseSequenceSimultaneous) { ->OnIncomingStreamsReset(streams); } +TEST(DcSctpTransportTest, DiscardMessageClosedChannel) { + rtc::AutoThread main_thread; + Peer peer_a; + + EXPECT_CALL(*peer_a.socket_, Send(_, _)).Times(0); + + peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); + + cricket::SendDataResult result; + SendDataParams params; + rtc::CopyOnWriteBuffer payload; + bool send_data_return = + peer_a.sctp_transport_->SendData(1, params, payload, &result); + EXPECT_FALSE(send_data_return); + EXPECT_EQ(cricket::SDR_ERROR, result); +} + +TEST(DcSctpTransportTest, DiscardMessageClosingChannel) { + rtc::AutoThread main_thread; + Peer peer_a; + + EXPECT_CALL(*peer_a.socket_, Send(_, _)).Times(0); + + peer_a.sctp_transport_->OpenStream(1); + peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); + peer_a.sctp_transport_->ResetStream(1); + + cricket::SendDataResult result; + SendDataParams params; + rtc::CopyOnWriteBuffer payload; + + bool send_data_return = + peer_a.sctp_transport_->SendData(1, params, payload, &result); + EXPECT_FALSE(send_data_return); + EXPECT_EQ(cricket::SDR_ERROR, result); +} + +TEST(DcSctpTransportTest, SendDataOpenChannel) { + rtc::AutoThread main_thread; + Peer peer_a; + dcsctp::DcSctpOptions options; + + EXPECT_CALL(*peer_a.socket_, Send(_, _)).Times(1); + EXPECT_CALL(*peer_a.socket_, options()).WillOnce(ReturnPointee(&options)); + + peer_a.sctp_transport_->OpenStream(1); + peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); + + cricket::SendDataResult result; + SendDataParams params; + rtc::CopyOnWriteBuffer payload; + + bool send_data_return = + peer_a.sctp_transport_->SendData(1, params, payload, &result); + EXPECT_TRUE(send_data_return); + EXPECT_EQ(cricket::SDR_SUCCESS, result); +} + } // namespace webrtc |