diff options
Diffstat (limited to 'pc')
-rw-r--r-- | pc/data_channel.cc | 152 | ||||
-rw-r--r-- | pc/data_channel.h | 2 |
2 files changed, 106 insertions, 48 deletions
diff --git a/pc/data_channel.cc b/pc/data_channel.cc index ca6b6145cb..fcf38f9574 100644 --- a/pc/data_channel.cc +++ b/pc/data_channel.cc @@ -371,19 +371,21 @@ bool DataChannel::Send(const DataBuffer& buffer) { // If the queue is non-empty, we're waiting for SignalReadyToSend, // so just add to the end of the queue and keep waiting. - if (!queued_send_data_.Empty()) { - // Only SCTP DataChannel queues the outgoing data when the transport is - // blocked. - RTC_DCHECK(IsSctpLike(data_channel_type_)); - if (!QueueSendDataMessage(buffer)) { - RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to queue " - "additional data."; - // https://w3c.github.io/webrtc-pc/#dom-rtcdatachannel-send step 5 - // Note that the spec doesn't explicitly say to close in this situation. - CloseAbruptlyWithError(RTCError(RTCErrorType::RESOURCE_EXHAUSTED, - "Unable to queue data for sending")); + // Only SCTP DataChannel queues the outgoing data when the transport is + // blocked. + if (IsSctpLike(data_channel_type_)) { + if (!queued_send_data_.Empty()) { + if (!QueueSendDataMessage(buffer)) { + RTC_LOG(LS_ERROR) + << "Closing the DataChannel due to a failure to queue " + "additional data."; + // https://w3c.github.io/webrtc-pc/#dom-rtcdatachannel-send step 5 + // Note that the spec doesn't explicitly say to close in this situation. + CloseAbruptlyWithError(RTCError(RTCErrorType::RESOURCE_EXHAUSTED, + "Unable to queue data for sending")); + } + return true; } - return true; } bool success = SendDataMessage(buffer, true); @@ -397,7 +399,7 @@ bool DataChannel::Send(const DataBuffer& buffer) { void DataChannel::SetReceiveSsrc(uint32_t receive_ssrc) { RTC_DCHECK_RUN_ON(signaling_thread_); - RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP); + RTC_DCHECK_EQ(data_channel_type_, cricket::DCT_RTP); if (receive_ssrc_set_) { return; @@ -408,9 +410,13 @@ void DataChannel::SetReceiveSsrc(uint32_t receive_ssrc) { } void DataChannel::SetSctpSid(int sid) { + RTC_DCHECK_RUN_ON(signaling_thread_); RTC_DCHECK_LT(config_.id, 0); RTC_DCHECK_GE(sid, 0); RTC_DCHECK(IsSctpLike(data_channel_type_)); + RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck); + RTC_DCHECK_EQ(state_, kConnecting); + if (config_.id == sid) { return; } @@ -573,8 +579,11 @@ void DataChannel::OnChannelReady(bool writable) { return; } - SendQueuedControlMessages(); - SendQueuedDataMessages(); + if (IsSctpLike(data_channel_type_)) { + SendQueuedControlMessages(); + SendQueuedDataMessages(); + } + UpdateState(); } @@ -590,9 +599,12 @@ void DataChannel::CloseAbruptlyWithError(RTCError error) { } // Closing abruptly means any queued data gets thrown away. - queued_send_data_.Clear(); buffered_amount_ = 0; - queued_control_data_.Clear(); + + if (IsSctpLike(data_channel_type_)) { + queued_send_data_.Clear(); + queued_control_data_.Clear(); + } // Still go to "kClosing" before "kClosed", since observers may be expecting // that. @@ -614,22 +626,28 @@ void DataChannel::UpdateState() { // all conditions required for each state transition here for // clarity. OnChannelReady(true) will send any queued data and then invoke // UpdateState(). + if (data_channel_type_ == cricket::DCT_RTP) { + UpdateRtpState(); + } else { + UpdateSctpLikeState(); + } +} + +void DataChannel::UpdateRtpState() { + RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK_EQ(data_channel_type_, cricket::DCT_RTP); + + // UpdateState determines what to do from a few state variables. Include + // all conditions required for each state transition here for + // clarity. OnChannelReady(true) will send any queued data and then invoke + // UpdateState(). switch (state_) { case kConnecting: { if (send_ssrc_set_ == receive_ssrc_set_) { - if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) { + if (!connected_to_provider_) { connected_to_provider_ = provider_->ConnectDataChannel(this); } if (connected_to_provider_) { - if (handshake_state_ == kHandshakeShouldSendOpen) { - rtc::CopyOnWriteBuffer payload; - WriteDataChannelOpenMessage(label_, config_, &payload); - SendControlMessage(payload); - } else if (handshake_state_ == kHandshakeShouldSendAck) { - rtc::CopyOnWriteBuffer payload; - WriteDataChannelOpenAckMessage(&payload); - SendControlMessage(payload); - } if (writable_ && (handshake_state_ == kHandshakeReady || handshake_state_ == kHandshakeWaitingForAck)) { SetState(kOpen); @@ -645,28 +663,62 @@ void DataChannel::UpdateState() { break; } case kClosing: { + // For RTP data channels, we can go to "closed" after we finish + // sending data and the send/recv SSRCs are unset. + if (connected_to_provider_) { + DisconnectFromProvider(); + } + if (!send_ssrc_set_ && !receive_ssrc_set_) { + SetState(kClosed); + } + break; + } + case kClosed: + break; + } +} + +void DataChannel::UpdateSctpLikeState() { + RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK(IsSctpLike(data_channel_type_)); + + switch (state_) { + case kConnecting: { + if (connected_to_provider_) { + if (handshake_state_ == kHandshakeShouldSendOpen) { + rtc::CopyOnWriteBuffer payload; + WriteDataChannelOpenMessage(label_, config_, &payload); + SendControlMessage(payload); + } else if (handshake_state_ == kHandshakeShouldSendAck) { + rtc::CopyOnWriteBuffer payload; + WriteDataChannelOpenAckMessage(&payload); + SendControlMessage(payload); + } + if (writable_ && (handshake_state_ == kHandshakeReady || + handshake_state_ == kHandshakeWaitingForAck)) { + SetState(kOpen); + // If we have received buffers before the channel got writable. + // Deliver them now. + DeliverQueuedReceivedData(); + } + } + break; + } + case kOpen: { + break; + } + case kClosing: { // Wait for all queued data to be sent before beginning the closing // procedure. if (queued_send_data_.Empty() && queued_control_data_.Empty()) { - if (data_channel_type_ == cricket::DCT_RTP) { - // For RTP data channels, we can go to "closed" after we finish - // sending data and the send/recv SSRCs are unset. - if (connected_to_provider_) { - DisconnectFromProvider(); - } - if (!send_ssrc_set_ && !receive_ssrc_set_) { - SetState(kClosed); - } - } else { - // For SCTP data channels, we need to wait for the closing procedure - // to complete; after calling RemoveSctpDataStream, - // OnClosingProcedureComplete will end up called asynchronously - // afterwards. - if (connected_to_provider_ && !started_closing_procedure_ && - config_.id >= 0) { - started_closing_procedure_ = true; - provider_->RemoveSctpDataStream(config_.id); - } + // For SCTP data channels, we need to wait for the closing procedure + // to complete; after calling RemoveSctpDataStream, + // OnClosingProcedureComplete will end up called asynchronously + // afterwards. + if (connected_to_provider_ && !started_closing_procedure_ && + config_.id >= 0) { + started_closing_procedure_ = true; + provider_->RemoveSctpDataStream(config_.id); } } break; @@ -718,6 +770,7 @@ void DataChannel::DeliverQueuedReceivedData() { void DataChannel::SendQueuedDataMessages() { RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK(IsSctpLike(data_channel_type_)); if (queued_send_data_.Empty()) { return; } @@ -796,6 +849,7 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer, bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK(IsSctpLike(data_channel_type_)); size_t start_buffered_amount = queued_send_data_.byte_count(); if (start_buffered_amount + buffer.size() > kMaxQueuedSendDataBytes) { RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; @@ -807,6 +861,7 @@ bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { void DataChannel::SendQueuedControlMessages() { RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK(IsSctpLike(data_channel_type_)); PacketQueue control_packets; control_packets.Swap(&queued_control_data_); @@ -818,16 +873,17 @@ void DataChannel::SendQueuedControlMessages() { void DataChannel::QueueControlMessage(const rtc::CopyOnWriteBuffer& buffer) { RTC_DCHECK_RUN_ON(signaling_thread_); + RTC_DCHECK(IsSctpLike(data_channel_type_)); queued_control_data_.PushBack(std::make_unique<DataBuffer>(buffer, true)); } bool DataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { RTC_DCHECK_RUN_ON(signaling_thread_); - bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen; - RTC_DCHECK(IsSctpLike(data_channel_type_)); RTC_DCHECK(writable_); RTC_DCHECK_GE(config_.id, 0); + + bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen; RTC_DCHECK(!is_open_message || !config_.negotiated); cricket::SendDataParams send_params; diff --git a/pc/data_channel.h b/pc/data_channel.h index 09b6692f02..f6c5f819e6 100644 --- a/pc/data_channel.h +++ b/pc/data_channel.h @@ -296,6 +296,8 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { bool Init(); void UpdateState(); + void UpdateRtpState(); + void UpdateSctpLikeState(); void SetState(DataState state); void DisconnectFromProvider(); |