From b43c99de297e2686233cf495625ba1d87cbfe0e4 Mon Sep 17 00:00:00 2001 From: "jiayl@webrtc.org" Date: Fri, 20 Jun 2014 17:11:14 +0000 Subject: Limits the send and receive buffer by bytes, not by packets. The new limit is 16MB for each buffer. Also refactors the code to handle send failure more consistently. BUG=3429 R=juberti@webrtc.org Review URL: https://webrtc-codereview.appspot.com/21559005 git-svn-id: http://webrtc.googlecode.com/svn/trunk@6511 4adac7df-926f-26a2-2b94-8c16560cd09d --- talk/app/webrtc/datachannel.cc | 356 ++++++++++++++++++++--------------------- 1 file changed, 177 insertions(+), 179 deletions(-) (limited to 'talk/app/webrtc/datachannel.cc') diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc index 14caa416b4..af4fb244f5 100644 --- a/talk/app/webrtc/datachannel.cc +++ b/talk/app/webrtc/datachannel.cc @@ -35,13 +35,57 @@ namespace webrtc { -static size_t kMaxQueuedReceivedDataPackets = 100; -static size_t kMaxQueuedSendDataPackets = 100; +static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024; +static size_t kMaxQueuedSendDataBytes = 16 * 1024 * 1024; enum { MSG_CHANNELREADY, }; +DataChannel::PacketQueue::PacketQueue() : byte_count_(0) {} + +DataChannel::PacketQueue::~PacketQueue() { + Clear(); +} + +bool DataChannel::PacketQueue::Empty() const { + return packets_.empty(); +} + +DataBuffer* DataChannel::PacketQueue::Front() { + return packets_.front(); +} + +void DataChannel::PacketQueue::Pop() { + if (packets_.empty()) { + return; + } + + byte_count_ -= packets_.front()->size(); + packets_.pop_front(); +} + +void DataChannel::PacketQueue::Push(DataBuffer* packet) { + byte_count_ += packet->size(); + packets_.push_back(packet); +} + +void DataChannel::PacketQueue::Clear() { + while (!packets_.empty()) { + delete packets_.front(); + packets_.pop_front(); + } + byte_count_ = 0; +} + +void DataChannel::PacketQueue::Swap(PacketQueue* other) { + size_t other_byte_count = other->byte_count_; + other->byte_count_ = byte_count_; + byte_count_ = other_byte_count; + + other->packets_.swap(packets_); +} + talk_base::scoped_refptr DataChannel::Create( DataChannelProviderInterface* provider, cricket::DataChannelType dct, @@ -114,11 +158,7 @@ bool DataChannel::Init(const InternalDataChannelInit& config) { return true; } -DataChannel::~DataChannel() { - ClearQueuedReceivedData(); - ClearQueuedSendData(); - ClearQueuedControlData(); -} +DataChannel::~DataChannel() {} void DataChannel::RegisterObserver(DataChannelObserver* observer) { observer_ = observer; @@ -139,13 +179,7 @@ bool DataChannel::reliable() const { } uint64 DataChannel::buffered_amount() const { - uint64 buffered_amount = 0; - for (std::deque::const_iterator it = queued_send_data_.begin(); - it != queued_send_data_.end(); - ++it) { - buffered_amount += (*it)->size(); - } - return buffered_amount; + return queued_send_data_.byte_count(); } void DataChannel::Close() { @@ -163,87 +197,23 @@ bool DataChannel::Send(const DataBuffer& buffer) { } // If the queue is non-empty, we're waiting for SignalReadyToSend, // so just add to the end of the queue and keep waiting. - if (!queued_send_data_.empty()) { - if (!QueueSendData(buffer)) { - if (data_channel_type_ == cricket::DCT_RTP) { - return false; - } + if (!queued_send_data_.Empty()) { + // Only SCTP DataChannel queues the outgoing data when the transport is + // blocked. + ASSERT(data_channel_type_ == cricket::DCT_SCTP); + if (!QueueSendDataMessage(buffer)) { Close(); } return true; } - cricket::SendDataResult send_result; - if (!InternalSendWithoutQueueing(buffer, &send_result)) { - if (data_channel_type_ == cricket::DCT_RTP) { - return false; - } - if (send_result != cricket::SDR_BLOCK || !QueueSendData(buffer)) { - Close(); - } - } - return true; -} - -void DataChannel::QueueControl(const talk_base::Buffer* buffer) { - queued_control_data_.push(buffer); -} - -bool DataChannel::SendOpenMessage(const talk_base::Buffer* raw_buffer) { - ASSERT(data_channel_type_ == cricket::DCT_SCTP && - was_ever_writable_ && - config_.id >= 0 && - !config_.negotiated); - - talk_base::scoped_ptr buffer(raw_buffer); - - cricket::SendDataParams send_params; - send_params.ssrc = config_.id; - send_params.ordered = true; - send_params.type = cricket::DMT_CONTROL; - - cricket::SendDataResult send_result; - bool retval = provider_->SendData(send_params, *buffer, &send_result); - if (retval) { - LOG(LS_INFO) << "Sent OPEN message on channel " << config_.id; - // Send data as ordered before we receive any mesage from the remote peer - // to make sure the remote peer will not receive any data before it receives - // the OPEN message. - waiting_for_open_ack_ = true; - } else if (send_result == cricket::SDR_BLOCK) { - // Link is congested. Queue for later. - QueueControl(buffer.release()); - } else { - LOG(LS_ERROR) << "Failed to send OPEN message with result " - << send_result << " on channel " << config_.id; + bool success = SendDataMessage(buffer); + if (data_channel_type_ == cricket::DCT_RTP) { + return success; } - return retval; -} - -bool DataChannel::SendOpenAckMessage(const talk_base::Buffer* raw_buffer) { - ASSERT(data_channel_type_ == cricket::DCT_SCTP && - was_ever_writable_ && - config_.id >= 0); - - talk_base::scoped_ptr buffer(raw_buffer); - - cricket::SendDataParams send_params; - send_params.ssrc = config_.id; - send_params.ordered = config_.ordered; - send_params.type = cricket::DMT_CONTROL; - cricket::SendDataResult send_result; - bool retval = provider_->SendData(send_params, *buffer, &send_result); - if (retval) { - LOG(LS_INFO) << "Sent OPEN_ACK message on channel " << config_.id; - } else if (send_result == cricket::SDR_BLOCK) { - // Link is congested. Queue for later. - QueueControl(buffer.release()); - } else { - LOG(LS_ERROR) << "Failed to send OPEN_ACK message with result " - << send_result << " on channel " << config_.id; - } - return retval; + // Always return true for SCTP DataChannel per the spec. + return true; } void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) { @@ -262,6 +232,27 @@ void DataChannel::RemotePeerRequestClose() { DoClose(); } +void DataChannel::SetSctpSid(int sid) { + ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP); + if (config_.id == sid) + return; + + config_.id = sid; + provider_->AddSctpDataStream(sid); +} + +void DataChannel::OnTransportChannelCreated() { + ASSERT(data_channel_type_ == cricket::DCT_SCTP); + if (!connected_to_provider_) { + connected_to_provider_ = provider_->ConnectDataChannel(this); + } + // The sid may have been unassigned when provider_->ConnectDataChannel was + // done. So always add the streams even if connected_to_provider_ is true. + if (config_.id >= 0) { + provider_->AddSctpDataStream(config_.id); + } +} + void DataChannel::SetSendSsrc(uint32 send_ssrc) { ASSERT(data_channel_type_ == cricket::DCT_RTP); if (send_ssrc_set_) { @@ -330,12 +321,18 @@ void DataChannel::OnDataReceived(cricket::DataChannel* channel, if (was_ever_writable_ && observer_) { observer_->OnMessage(*buffer.get()); } else { - if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) { - LOG(LS_ERROR) - << "Queued received data exceeds the max number of packets."; - ClearQueuedReceivedData(); + if (queued_received_data_.byte_count() + payload.length() > + kMaxQueuedReceivedDataBytes) { + LOG(LS_ERROR) << "Queued received data exceeds the max buffer size."; + + queued_received_data_.Clear(); + if (data_channel_type_ != cricket::DCT_RTP) { + Close(); + } + + return; } - queued_received_data_.push(buffer.release()); + queued_received_data_.Push(buffer.release()); } } @@ -350,22 +347,27 @@ void DataChannel::OnChannelReady(bool writable) { was_ever_writable_ = true; if (data_channel_type_ == cricket::DCT_SCTP) { + talk_base::Buffer payload; + if (config_.open_handshake_role == InternalDataChannelInit::kOpener) { - talk_base::Buffer* payload = new talk_base::Buffer; - WriteDataChannelOpenMessage(label_, config_, payload); - SendOpenMessage(payload); + WriteDataChannelOpenMessage(label_, config_, &payload); + SendControlMessage(payload); } else if (config_.open_handshake_role == - InternalDataChannelInit::kAcker) { - talk_base::Buffer* payload = new talk_base::Buffer; - WriteDataChannelOpenAckMessage(payload); - SendOpenAckMessage(payload); + InternalDataChannelInit::kAcker) { + WriteDataChannelOpenAckMessage(&payload); + SendControlMessage(payload); } } UpdateState(); - ASSERT(queued_send_data_.empty()); + ASSERT(queued_send_data_.Empty()); } else if (state_ == kOpen) { - DeliverQueuedSendData(); + // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition + // that the readyState is open. According to the standard, the channel + // should not become open before the OPEN message is sent. + SendQueuedControlMessages(); + + SendQueuedDataMessages(); } } @@ -389,7 +391,7 @@ void DataChannel::UpdateState() { if (was_ever_writable_) { // TODO(jiayl): Do not transition to kOpen if we failed to send the // OPEN message. - DeliverQueuedControlData(); + SendQueuedControlMessages(); SetState(kOpen); // If we have received buffers before the channel got writable. // Deliver them now. @@ -441,75 +443,27 @@ void DataChannel::DeliverQueuedReceivedData() { return; } - while (!queued_received_data_.empty()) { - DataBuffer* buffer = queued_received_data_.front(); + while (!queued_received_data_.Empty()) { + talk_base::scoped_ptr buffer(queued_received_data_.Front()); observer_->OnMessage(*buffer); - queued_received_data_.pop(); - delete buffer; - } -} - -void DataChannel::ClearQueuedReceivedData() { - while (!queued_received_data_.empty()) { - DataBuffer* buffer = queued_received_data_.front(); - queued_received_data_.pop(); - delete buffer; + queued_received_data_.Pop(); } } -void DataChannel::DeliverQueuedSendData() { +void DataChannel::SendQueuedDataMessages() { ASSERT(was_ever_writable_ && state_ == kOpen); - // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition - // that the readyState is open. According to the standard, the channel should - // not become open before the OPEN message is sent. - DeliverQueuedControlData(); - - while (!queued_send_data_.empty()) { - DataBuffer* buffer = queued_send_data_.front(); - cricket::SendDataResult send_result; - if (!InternalSendWithoutQueueing(*buffer, &send_result)) { - LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result " - << send_result; - break; - } - queued_send_data_.pop_front(); - delete buffer; - } -} - -void DataChannel::ClearQueuedControlData() { - while (!queued_control_data_.empty()) { - const talk_base::Buffer *buf = queued_control_data_.front(); - queued_control_data_.pop(); - delete buf; - } -} - -void DataChannel::DeliverQueuedControlData() { - ASSERT(was_ever_writable_); - while (!queued_control_data_.empty()) { - const talk_base::Buffer* buf = queued_control_data_.front(); - queued_control_data_.pop(); - if (config_.open_handshake_role == InternalDataChannelInit::kOpener) { - SendOpenMessage(buf); - } else { - ASSERT(config_.open_handshake_role == InternalDataChannelInit::kAcker); - SendOpenAckMessage(buf); - } - } -} + PacketQueue packet_buffer; + packet_buffer.Swap(&queued_send_data_); -void DataChannel::ClearQueuedSendData() { - while (!queued_send_data_.empty()) { - DataBuffer* buffer = queued_send_data_.front(); - queued_send_data_.pop_front(); - delete buffer; + while (!packet_buffer.Empty()) { + talk_base::scoped_ptr buffer(packet_buffer.Front()); + SendDataMessage(*buffer); + packet_buffer.Pop(); } } -bool DataChannel::InternalSendWithoutQueueing( - const DataBuffer& buffer, cricket::SendDataResult* send_result) { +bool DataChannel::SendDataMessage(const DataBuffer& buffer) { cricket::SendDataParams send_params; if (data_channel_type_ == cricket::DCT_SCTP) { @@ -529,34 +483,78 @@ bool DataChannel::InternalSendWithoutQueueing( } send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT; - return provider_->SendData(send_params, buffer.data, send_result); + cricket::SendDataResult send_result = cricket::SDR_SUCCESS; + bool success = provider_->SendData(send_params, buffer.data, &send_result); + + if (!success && data_channel_type_ == cricket::DCT_SCTP) { + if (send_result != cricket::SDR_BLOCK || !QueueSendDataMessage(buffer)) { + LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, " + << "send_result = " << send_result; + Close(); + } + } + return success; } -bool DataChannel::QueueSendData(const DataBuffer& buffer) { - if (queued_send_data_.size() >= kMaxQueuedSendDataPackets) { +bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { + if (queued_send_data_.byte_count() >= kMaxQueuedSendDataBytes) { LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; return false; } - queued_send_data_.push_back(new DataBuffer(buffer)); + queued_send_data_.Push(new DataBuffer(buffer)); return true; } -void DataChannel::SetSctpSid(int sid) { - ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP); - config_.id = sid; - provider_->AddSctpDataStream(sid); -} +void DataChannel::SendQueuedControlMessages() { + ASSERT(was_ever_writable_); -void DataChannel::OnTransportChannelCreated() { - ASSERT(data_channel_type_ == cricket::DCT_SCTP); - if (!connected_to_provider_) { - connected_to_provider_ = provider_->ConnectDataChannel(this); + PacketQueue control_packets; + control_packets.Swap(&queued_control_data_); + + while (!control_packets.Empty()) { + talk_base::scoped_ptr buf(control_packets.Front()); + SendControlMessage(buf->data); + control_packets.Pop(); } - // The sid may have been unassigned when provider_->ConnectDataChannel was - // done. So always add the streams even if connected_to_provider_ is true. - if (config_.id >= 0) { - provider_->AddSctpDataStream(config_.id); +} + +void DataChannel::QueueControlMessage(const talk_base::Buffer& buffer) { + queued_control_data_.Push(new DataBuffer(buffer, true)); +} + +bool DataChannel::SendControlMessage(const talk_base::Buffer& buffer) { + bool is_open_message = + (config_.open_handshake_role == InternalDataChannelInit::kOpener); + + ASSERT(data_channel_type_ == cricket::DCT_SCTP && + was_ever_writable_ && + config_.id >= 0 && + (!is_open_message || !config_.negotiated)); + + cricket::SendDataParams send_params; + send_params.ssrc = config_.id; + send_params.ordered = config_.ordered || is_open_message; + send_params.type = cricket::DMT_CONTROL; + + cricket::SendDataResult send_result = cricket::SDR_SUCCESS; + bool retval = provider_->SendData(send_params, buffer, &send_result); + if (retval) { + LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id; + + if (is_open_message) { + // Send data as ordered before we receive any message from the remote peer + // to make sure the remote peer will not receive any data before it + // receives the OPEN message. + waiting_for_open_ack_ = true; + } + } else if (send_result == cricket::SDR_BLOCK) { + QueueControlMessage(buffer); + } else { + LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send" + << " the CONTROL message, send_result = " << send_result; + Close(); } + return retval; } } // namespace webrtc -- cgit v1.2.3