aboutsummaryrefslogtreecommitdiff
path: root/media
diff options
context:
space:
mode:
authorFlorent Castelli <orphis@webrtc.org>2022-08-22 17:46:39 +0000
committerWebRTC LUCI CQ <webrtc-scoped@luci-project-accounts.iam.gserviceaccount.com>2022-08-23 14:32:28 +0000
commitdbc2ba20266c2b603b5808d6720ef0a6dbad02da (patch)
treedd2e4b1991109f1b79fef214048b9eda778cf87d /media
parent64c70a260ec6891e3fa435f16624d3a36a7985e9 (diff)
downloadwebrtc-dbc2ba20266c2b603b5808d6720ef0a6dbad02da.tar.gz
dcsctp: Track open channels accurately
In rare cases, it is possible to queue a call to SendData from the signaling thread on a channel being closed or already closed in the network thread. By keeping track of currently open streams, we avoid sending messages with a stream id of channels that the other side already considers closed and has already reused for a new channel. This caused rare messages to be delivered on the wrong data channel if a message was quickly sent, channel closed and a new one reopened. Bug: webrtc:14277 Change-Id: If35fed8d12d5d2c18cdc6601085d8b632c37a0ba Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272624 Commit-Queue: Florent Castelli <orphis@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37880}
Diffstat (limited to 'media')
-rw-r--r--media/sctp/dcsctp_transport.cc80
-rw-r--r--media/sctp/dcsctp_transport.h8
-rw-r--r--media/sctp/dcsctp_transport_unittest.cc62
3 files changed, 129 insertions, 21 deletions
diff --git a/media/sctp/dcsctp_transport.cc b/media/sctp/dcsctp_transport.cc
index c8e6da4f00..062360d251 100644
--- a/media/sctp/dcsctp_transport.cc
+++ b/media/sctp/dcsctp_transport.cc
@@ -218,16 +218,17 @@ bool DcSctpTransport::Start(int local_sctp_port,
}
bool DcSctpTransport::OpenStream(int sid) {
+ RTC_DCHECK_RUN_ON(network_thread_);
RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ").";
- if (!socket_) {
- RTC_LOG(LS_ERROR) << debug_name_ << "->OpenStream(sid=" << sid
- << "): Transport is not started.";
- return false;
- }
+
+ StreamState stream_state;
+ stream_states_.insert_or_assign(dcsctp::StreamID(static_cast<uint16_t>(sid)),
+ stream_state);
return true;
}
bool DcSctpTransport::ResetStream(int sid) {
+ RTC_DCHECK_RUN_ON(network_thread_);
RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ").";
if (!socket_) {
RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
@@ -237,14 +238,21 @@ bool DcSctpTransport::ResetStream(int sid) {
dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast<uint16_t>(sid))};
- StreamClosingState& closing_state = closing_states_[streams[0]];
- if (closing_state.closure_initiated || closing_state.incoming_reset_done ||
- closing_state.outgoing_reset_done) {
+ auto it = stream_states_.find(streams[0]);
+ if (it == stream_states_.end()) {
+ RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
+ << "): Stream is not open.";
+ return false;
+ }
+
+ StreamState& stream_state = it->second;
+ if (stream_state.closure_initiated || stream_state.incoming_reset_done ||
+ stream_state.outgoing_reset_done) {
// The closing procedure was already initiated by the remote, don't do
// anything.
return false;
}
- closing_state.closure_initiated = true;
+ stream_state.closure_initiated = true;
socket_->ResetStreams(streams);
return true;
}
@@ -265,6 +273,30 @@ bool DcSctpTransport::SendData(int sid,
return false;
}
+ // It is possible for a message to be sent from the signaling thread at the
+ // same time a data-channel is closing, but before the signaling thread is
+ // aware of it. So we need to keep track of currently active data channels and
+ // skip sending messages for the ones that are not open or closing.
+ // The sending errors are not impacting the data channel API contract as
+ // it is allowed to discard queued messages when the channel is closing.
+ auto stream_state =
+ stream_states_.find(dcsctp::StreamID(static_cast<uint16_t>(sid)));
+ if (stream_state == stream_states_.end()) {
+ RTC_LOG(LS_VERBOSE) << "Skipping message on non-open stream with sid: "
+ << sid;
+ *result = cricket::SDR_ERROR;
+ return false;
+ }
+
+ if (stream_state->second.closure_initiated ||
+ stream_state->second.incoming_reset_done ||
+ stream_state->second.outgoing_reset_done) {
+ RTC_LOG(LS_VERBOSE) << "Skipping message on closing stream with sid: "
+ << sid;
+ *result = cricket::SDR_ERROR;
+ return false;
+ }
+
auto max_message_size = socket_->options().max_message_size;
if (max_message_size > 0 && payload.size() > max_message_size) {
RTC_LOG(LS_WARNING) << debug_name_
@@ -519,16 +551,23 @@ void DcSctpTransport::OnStreamsResetPerformed(
RTC_LOG(LS_INFO) << debug_name_
<< "->OnStreamsResetPerformed(...): Outgoing stream reset"
<< ", sid=" << stream_id.value();
- StreamClosingState& closing_state = closing_states_[stream_id];
- closing_state.outgoing_reset_done = true;
- if (closing_state.incoming_reset_done) {
+ auto it = stream_states_.find(stream_id);
+ if (it == stream_states_.end()) {
+ // Ignoring an outgoing stream reset for a closed stream
+ return;
+ }
+
+ StreamState& stream_state = it->second;
+ stream_state.outgoing_reset_done = true;
+
+ if (stream_state.incoming_reset_done) {
// When the close was not initiated locally, we can signal the end of the
// data channel close procedure when the remote ACKs the reset.
if (data_channel_sink_) {
data_channel_sink_->OnChannelClosed(stream_id.value());
}
- closing_states_.erase(stream_id);
+ stream_states_.erase(stream_id);
}
}
}
@@ -540,10 +579,15 @@ void DcSctpTransport::OnIncomingStreamsReset(
RTC_LOG(LS_INFO) << debug_name_
<< "->OnIncomingStreamsReset(...): Incoming stream reset"
<< ", sid=" << stream_id.value();
- StreamClosingState& closing_state = closing_states_[stream_id];
- closing_state.incoming_reset_done = true;
- if (!closing_state.closure_initiated) {
+ auto it = stream_states_.find(stream_id);
+ if (it == stream_states_.end())
+ return;
+
+ StreamState& stream_state = it->second;
+ stream_state.incoming_reset_done = true;
+
+ if (!stream_state.closure_initiated) {
// When receiving an incoming stream reset event for a non local close
// procedure, the transport needs to reset the stream in the other
// direction too.
@@ -554,13 +598,13 @@ void DcSctpTransport::OnIncomingStreamsReset(
}
}
- if (closing_state.outgoing_reset_done) {
+ if (stream_state.outgoing_reset_done) {
// The close procedure that was initiated locally is complete when we
// receive and incoming reset event.
if (data_channel_sink_) {
data_channel_sink_->OnChannelClosed(stream_id.value());
}
- closing_states_.erase(stream_id);
+ stream_states_.erase(stream_id);
}
}
}
diff --git a/media/sctp/dcsctp_transport.h b/media/sctp/dcsctp_transport.h
index 1f71db87c4..f86ac5a23a 100644
--- a/media/sctp/dcsctp_transport.h
+++ b/media/sctp/dcsctp_transport.h
@@ -114,10 +114,10 @@ class DcSctpTransport : public cricket::SctpTransportInternal,
std::string debug_name_ = "DcSctpTransport";
rtc::CopyOnWriteBuffer receive_buffer_;
- // Used to keep track of the closing state of the data channel.
+ // Used to keep track of the state of data channels.
// Reset needs to happen both ways before signaling the transport
// is closed.
- struct StreamClosingState {
+ struct StreamState {
// True when the local connection has initiated the reset.
// If a connection receives a reset for a stream that isn't
// already being reset locally, it needs to fire the signal
@@ -129,7 +129,9 @@ class DcSctpTransport : public cricket::SctpTransportInternal,
bool outgoing_reset_done = false;
};
- flat_map<dcsctp::StreamID, StreamClosingState> closing_states_;
+ // Map of all currently open or closing data channels
+ flat_map<dcsctp::StreamID, StreamState> stream_states_
+ RTC_GUARDED_BY(network_thread_);
bool ready_to_send_data_ = false;
std::function<void()> on_connected_callback_ RTC_GUARDED_BY(network_thread_);
DataChannelSink* data_channel_sink_ RTC_GUARDED_BY(network_thread_) = nullptr;
diff --git a/media/sctp/dcsctp_transport_unittest.cc b/media/sctp/dcsctp_transport_unittest.cc
index 270b06a63f..08dc2ec0b6 100644
--- a/media/sctp/dcsctp_transport_unittest.cc
+++ b/media/sctp/dcsctp_transport_unittest.cc
@@ -18,6 +18,7 @@
#include "p2p/base/fake_packet_transport.h"
#include "test/gtest.h"
+using ::testing::_;
using ::testing::ByMove;
using ::testing::DoAll;
using ::testing::ElementsAre;
@@ -25,6 +26,7 @@ using ::testing::InSequence;
using ::testing::Invoke;
using ::testing::NiceMock;
using ::testing::Return;
+using ::testing::ReturnPointee;
namespace webrtc {
@@ -112,6 +114,7 @@ TEST(DcSctpTransportTest, CloseSequence) {
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
peer_b.sctp_transport_->Start(5000, 5000, 256 * 1024);
peer_a.sctp_transport_->OpenStream(1);
+ peer_b.sctp_transport_->OpenStream(1);
peer_a.sctp_transport_->ResetStream(1);
// Simulate the callbacks from the stream resets
@@ -153,6 +156,7 @@ TEST(DcSctpTransportTest, CloseSequenceSimultaneous) {
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
peer_b.sctp_transport_->Start(5000, 5000, 256 * 1024);
peer_a.sctp_transport_->OpenStream(1);
+ peer_b.sctp_transport_->OpenStream(1);
peer_a.sctp_transport_->ResetStream(1);
peer_b.sctp_transport_->ResetStream(1);
@@ -168,4 +172,62 @@ TEST(DcSctpTransportTest, CloseSequenceSimultaneous) {
->OnIncomingStreamsReset(streams);
}
+TEST(DcSctpTransportTest, DiscardMessageClosedChannel) {
+ rtc::AutoThread main_thread;
+ Peer peer_a;
+
+ EXPECT_CALL(*peer_a.socket_, Send(_, _)).Times(0);
+
+ peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
+
+ cricket::SendDataResult result;
+ SendDataParams params;
+ rtc::CopyOnWriteBuffer payload;
+ bool send_data_return =
+ peer_a.sctp_transport_->SendData(1, params, payload, &result);
+ EXPECT_FALSE(send_data_return);
+ EXPECT_EQ(cricket::SDR_ERROR, result);
+}
+
+TEST(DcSctpTransportTest, DiscardMessageClosingChannel) {
+ rtc::AutoThread main_thread;
+ Peer peer_a;
+
+ EXPECT_CALL(*peer_a.socket_, Send(_, _)).Times(0);
+
+ peer_a.sctp_transport_->OpenStream(1);
+ peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
+ peer_a.sctp_transport_->ResetStream(1);
+
+ cricket::SendDataResult result;
+ SendDataParams params;
+ rtc::CopyOnWriteBuffer payload;
+
+ bool send_data_return =
+ peer_a.sctp_transport_->SendData(1, params, payload, &result);
+ EXPECT_FALSE(send_data_return);
+ EXPECT_EQ(cricket::SDR_ERROR, result);
+}
+
+TEST(DcSctpTransportTest, SendDataOpenChannel) {
+ rtc::AutoThread main_thread;
+ Peer peer_a;
+ dcsctp::DcSctpOptions options;
+
+ EXPECT_CALL(*peer_a.socket_, Send(_, _)).Times(1);
+ EXPECT_CALL(*peer_a.socket_, options()).WillOnce(ReturnPointee(&options));
+
+ peer_a.sctp_transport_->OpenStream(1);
+ peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
+
+ cricket::SendDataResult result;
+ SendDataParams params;
+ rtc::CopyOnWriteBuffer payload;
+
+ bool send_data_return =
+ peer_a.sctp_transport_->SendData(1, params, payload, &result);
+ EXPECT_TRUE(send_data_return);
+ EXPECT_EQ(cricket::SDR_SUCCESS, result);
+}
+
} // namespace webrtc