aboutsummaryrefslogtreecommitdiff
path: root/talk/app/webrtc/datachannel.cc
diff options
context:
space:
mode:
authorjiayl@webrtc.org <jiayl@webrtc.org@4adac7df-926f-26a2-2b94-8c16560cd09d>2014-06-20 17:11:14 +0000
committerjiayl@webrtc.org <jiayl@webrtc.org@4adac7df-926f-26a2-2b94-8c16560cd09d>2014-06-20 17:11:14 +0000
commitb43c99de297e2686233cf495625ba1d87cbfe0e4 (patch)
treeea26fcce732d15d1b8d4103812b137cebf33e6ec /talk/app/webrtc/datachannel.cc
parentdb397e5c6c387ffb108f71059cb993e25c47a6fc (diff)
downloadwebrtc-b43c99de297e2686233cf495625ba1d87cbfe0e4.tar.gz
Limits the send and receive buffer by bytes, not by packets.
The new limit is 16MB for each buffer. Also refactors the code to handle send failure more consistently. BUG=3429 R=juberti@webrtc.org Review URL: https://webrtc-codereview.appspot.com/21559005 git-svn-id: http://webrtc.googlecode.com/svn/trunk@6511 4adac7df-926f-26a2-2b94-8c16560cd09d
Diffstat (limited to 'talk/app/webrtc/datachannel.cc')
-rw-r--r--talk/app/webrtc/datachannel.cc356
1 files changed, 177 insertions, 179 deletions
diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc
index 14caa416b4..af4fb244f5 100644
--- a/talk/app/webrtc/datachannel.cc
+++ b/talk/app/webrtc/datachannel.cc
@@ -35,13 +35,57 @@
namespace webrtc {
-static size_t kMaxQueuedReceivedDataPackets = 100;
-static size_t kMaxQueuedSendDataPackets = 100;
+static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;
+static size_t kMaxQueuedSendDataBytes = 16 * 1024 * 1024;
enum {
MSG_CHANNELREADY,
};
+DataChannel::PacketQueue::PacketQueue() : byte_count_(0) {}
+
+DataChannel::PacketQueue::~PacketQueue() {
+ Clear();
+}
+
+bool DataChannel::PacketQueue::Empty() const {
+ return packets_.empty();
+}
+
+DataBuffer* DataChannel::PacketQueue::Front() {
+ return packets_.front();
+}
+
+void DataChannel::PacketQueue::Pop() {
+ if (packets_.empty()) {
+ return;
+ }
+
+ byte_count_ -= packets_.front()->size();
+ packets_.pop_front();
+}
+
+void DataChannel::PacketQueue::Push(DataBuffer* packet) {
+ byte_count_ += packet->size();
+ packets_.push_back(packet);
+}
+
+void DataChannel::PacketQueue::Clear() {
+ while (!packets_.empty()) {
+ delete packets_.front();
+ packets_.pop_front();
+ }
+ byte_count_ = 0;
+}
+
+void DataChannel::PacketQueue::Swap(PacketQueue* other) {
+ size_t other_byte_count = other->byte_count_;
+ other->byte_count_ = byte_count_;
+ byte_count_ = other_byte_count;
+
+ other->packets_.swap(packets_);
+}
+
talk_base::scoped_refptr<DataChannel> DataChannel::Create(
DataChannelProviderInterface* provider,
cricket::DataChannelType dct,
@@ -114,11 +158,7 @@ bool DataChannel::Init(const InternalDataChannelInit& config) {
return true;
}
-DataChannel::~DataChannel() {
- ClearQueuedReceivedData();
- ClearQueuedSendData();
- ClearQueuedControlData();
-}
+DataChannel::~DataChannel() {}
void DataChannel::RegisterObserver(DataChannelObserver* observer) {
observer_ = observer;
@@ -139,13 +179,7 @@ bool DataChannel::reliable() const {
}
uint64 DataChannel::buffered_amount() const {
- uint64 buffered_amount = 0;
- for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
- it != queued_send_data_.end();
- ++it) {
- buffered_amount += (*it)->size();
- }
- return buffered_amount;
+ return queued_send_data_.byte_count();
}
void DataChannel::Close() {
@@ -163,87 +197,23 @@ bool DataChannel::Send(const DataBuffer& buffer) {
}
// If the queue is non-empty, we're waiting for SignalReadyToSend,
// so just add to the end of the queue and keep waiting.
- if (!queued_send_data_.empty()) {
- if (!QueueSendData(buffer)) {
- if (data_channel_type_ == cricket::DCT_RTP) {
- return false;
- }
+ if (!queued_send_data_.Empty()) {
+ // Only SCTP DataChannel queues the outgoing data when the transport is
+ // blocked.
+ ASSERT(data_channel_type_ == cricket::DCT_SCTP);
+ if (!QueueSendDataMessage(buffer)) {
Close();
}
return true;
}
- cricket::SendDataResult send_result;
- if (!InternalSendWithoutQueueing(buffer, &send_result)) {
- if (data_channel_type_ == cricket::DCT_RTP) {
- return false;
- }
- if (send_result != cricket::SDR_BLOCK || !QueueSendData(buffer)) {
- Close();
- }
- }
- return true;
-}
-
-void DataChannel::QueueControl(const talk_base::Buffer* buffer) {
- queued_control_data_.push(buffer);
-}
-
-bool DataChannel::SendOpenMessage(const talk_base::Buffer* raw_buffer) {
- ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
- was_ever_writable_ &&
- config_.id >= 0 &&
- !config_.negotiated);
-
- talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
-
- cricket::SendDataParams send_params;
- send_params.ssrc = config_.id;
- send_params.ordered = true;
- send_params.type = cricket::DMT_CONTROL;
-
- cricket::SendDataResult send_result;
- bool retval = provider_->SendData(send_params, *buffer, &send_result);
- if (retval) {
- LOG(LS_INFO) << "Sent OPEN message on channel " << config_.id;
- // Send data as ordered before we receive any mesage from the remote peer
- // to make sure the remote peer will not receive any data before it receives
- // the OPEN message.
- waiting_for_open_ack_ = true;
- } else if (send_result == cricket::SDR_BLOCK) {
- // Link is congested. Queue for later.
- QueueControl(buffer.release());
- } else {
- LOG(LS_ERROR) << "Failed to send OPEN message with result "
- << send_result << " on channel " << config_.id;
+ bool success = SendDataMessage(buffer);
+ if (data_channel_type_ == cricket::DCT_RTP) {
+ return success;
}
- return retval;
-}
-
-bool DataChannel::SendOpenAckMessage(const talk_base::Buffer* raw_buffer) {
- ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
- was_ever_writable_ &&
- config_.id >= 0);
-
- talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
-
- cricket::SendDataParams send_params;
- send_params.ssrc = config_.id;
- send_params.ordered = config_.ordered;
- send_params.type = cricket::DMT_CONTROL;
- cricket::SendDataResult send_result;
- bool retval = provider_->SendData(send_params, *buffer, &send_result);
- if (retval) {
- LOG(LS_INFO) << "Sent OPEN_ACK message on channel " << config_.id;
- } else if (send_result == cricket::SDR_BLOCK) {
- // Link is congested. Queue for later.
- QueueControl(buffer.release());
- } else {
- LOG(LS_ERROR) << "Failed to send OPEN_ACK message with result "
- << send_result << " on channel " << config_.id;
- }
- return retval;
+ // Always return true for SCTP DataChannel per the spec.
+ return true;
}
void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
@@ -262,6 +232,27 @@ void DataChannel::RemotePeerRequestClose() {
DoClose();
}
+void DataChannel::SetSctpSid(int sid) {
+ ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
+ if (config_.id == sid)
+ return;
+
+ config_.id = sid;
+ provider_->AddSctpDataStream(sid);
+}
+
+void DataChannel::OnTransportChannelCreated() {
+ ASSERT(data_channel_type_ == cricket::DCT_SCTP);
+ if (!connected_to_provider_) {
+ connected_to_provider_ = provider_->ConnectDataChannel(this);
+ }
+ // The sid may have been unassigned when provider_->ConnectDataChannel was
+ // done. So always add the streams even if connected_to_provider_ is true.
+ if (config_.id >= 0) {
+ provider_->AddSctpDataStream(config_.id);
+ }
+}
+
void DataChannel::SetSendSsrc(uint32 send_ssrc) {
ASSERT(data_channel_type_ == cricket::DCT_RTP);
if (send_ssrc_set_) {
@@ -330,12 +321,18 @@ void DataChannel::OnDataReceived(cricket::DataChannel* channel,
if (was_ever_writable_ && observer_) {
observer_->OnMessage(*buffer.get());
} else {
- if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
- LOG(LS_ERROR)
- << "Queued received data exceeds the max number of packets.";
- ClearQueuedReceivedData();
+ if (queued_received_data_.byte_count() + payload.length() >
+ kMaxQueuedReceivedDataBytes) {
+ LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
+
+ queued_received_data_.Clear();
+ if (data_channel_type_ != cricket::DCT_RTP) {
+ Close();
+ }
+
+ return;
}
- queued_received_data_.push(buffer.release());
+ queued_received_data_.Push(buffer.release());
}
}
@@ -350,22 +347,27 @@ void DataChannel::OnChannelReady(bool writable) {
was_ever_writable_ = true;
if (data_channel_type_ == cricket::DCT_SCTP) {
+ talk_base::Buffer payload;
+
if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
- talk_base::Buffer* payload = new talk_base::Buffer;
- WriteDataChannelOpenMessage(label_, config_, payload);
- SendOpenMessage(payload);
+ WriteDataChannelOpenMessage(label_, config_, &payload);
+ SendControlMessage(payload);
} else if (config_.open_handshake_role ==
- InternalDataChannelInit::kAcker) {
- talk_base::Buffer* payload = new talk_base::Buffer;
- WriteDataChannelOpenAckMessage(payload);
- SendOpenAckMessage(payload);
+ InternalDataChannelInit::kAcker) {
+ WriteDataChannelOpenAckMessage(&payload);
+ SendControlMessage(payload);
}
}
UpdateState();
- ASSERT(queued_send_data_.empty());
+ ASSERT(queued_send_data_.Empty());
} else if (state_ == kOpen) {
- DeliverQueuedSendData();
+ // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
+ // that the readyState is open. According to the standard, the channel
+ // should not become open before the OPEN message is sent.
+ SendQueuedControlMessages();
+
+ SendQueuedDataMessages();
}
}
@@ -389,7 +391,7 @@ void DataChannel::UpdateState() {
if (was_ever_writable_) {
// TODO(jiayl): Do not transition to kOpen if we failed to send the
// OPEN message.
- DeliverQueuedControlData();
+ SendQueuedControlMessages();
SetState(kOpen);
// If we have received buffers before the channel got writable.
// Deliver them now.
@@ -441,75 +443,27 @@ void DataChannel::DeliverQueuedReceivedData() {
return;
}
- while (!queued_received_data_.empty()) {
- DataBuffer* buffer = queued_received_data_.front();
+ while (!queued_received_data_.Empty()) {
+ talk_base::scoped_ptr<DataBuffer> buffer(queued_received_data_.Front());
observer_->OnMessage(*buffer);
- queued_received_data_.pop();
- delete buffer;
- }
-}
-
-void DataChannel::ClearQueuedReceivedData() {
- while (!queued_received_data_.empty()) {
- DataBuffer* buffer = queued_received_data_.front();
- queued_received_data_.pop();
- delete buffer;
+ queued_received_data_.Pop();
}
}
-void DataChannel::DeliverQueuedSendData() {
+void DataChannel::SendQueuedDataMessages() {
ASSERT(was_ever_writable_ && state_ == kOpen);
- // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
- // that the readyState is open. According to the standard, the channel should
- // not become open before the OPEN message is sent.
- DeliverQueuedControlData();
-
- while (!queued_send_data_.empty()) {
- DataBuffer* buffer = queued_send_data_.front();
- cricket::SendDataResult send_result;
- if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
- LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result "
- << send_result;
- break;
- }
- queued_send_data_.pop_front();
- delete buffer;
- }
-}
-
-void DataChannel::ClearQueuedControlData() {
- while (!queued_control_data_.empty()) {
- const talk_base::Buffer *buf = queued_control_data_.front();
- queued_control_data_.pop();
- delete buf;
- }
-}
-
-void DataChannel::DeliverQueuedControlData() {
- ASSERT(was_ever_writable_);
- while (!queued_control_data_.empty()) {
- const talk_base::Buffer* buf = queued_control_data_.front();
- queued_control_data_.pop();
- if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
- SendOpenMessage(buf);
- } else {
- ASSERT(config_.open_handshake_role == InternalDataChannelInit::kAcker);
- SendOpenAckMessage(buf);
- }
- }
-}
+ PacketQueue packet_buffer;
+ packet_buffer.Swap(&queued_send_data_);
-void DataChannel::ClearQueuedSendData() {
- while (!queued_send_data_.empty()) {
- DataBuffer* buffer = queued_send_data_.front();
- queued_send_data_.pop_front();
- delete buffer;
+ while (!packet_buffer.Empty()) {
+ talk_base::scoped_ptr<DataBuffer> buffer(packet_buffer.Front());
+ SendDataMessage(*buffer);
+ packet_buffer.Pop();
}
}
-bool DataChannel::InternalSendWithoutQueueing(
- const DataBuffer& buffer, cricket::SendDataResult* send_result) {
+bool DataChannel::SendDataMessage(const DataBuffer& buffer) {
cricket::SendDataParams send_params;
if (data_channel_type_ == cricket::DCT_SCTP) {
@@ -529,34 +483,78 @@ bool DataChannel::InternalSendWithoutQueueing(
}
send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
- return provider_->SendData(send_params, buffer.data, send_result);
+ cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
+ bool success = provider_->SendData(send_params, buffer.data, &send_result);
+
+ if (!success && data_channel_type_ == cricket::DCT_SCTP) {
+ if (send_result != cricket::SDR_BLOCK || !QueueSendDataMessage(buffer)) {
+ LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
+ << "send_result = " << send_result;
+ Close();
+ }
+ }
+ return success;
}
-bool DataChannel::QueueSendData(const DataBuffer& buffer) {
- if (queued_send_data_.size() >= kMaxQueuedSendDataPackets) {
+bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
+ if (queued_send_data_.byte_count() >= kMaxQueuedSendDataBytes) {
LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
return false;
}
- queued_send_data_.push_back(new DataBuffer(buffer));
+ queued_send_data_.Push(new DataBuffer(buffer));
return true;
}
-void DataChannel::SetSctpSid(int sid) {
- ASSERT(config_.id < 0 && sid >= 0 && data_channel_type_ == cricket::DCT_SCTP);
- config_.id = sid;
- provider_->AddSctpDataStream(sid);
-}
+void DataChannel::SendQueuedControlMessages() {
+ ASSERT(was_ever_writable_);
-void DataChannel::OnTransportChannelCreated() {
- ASSERT(data_channel_type_ == cricket::DCT_SCTP);
- if (!connected_to_provider_) {
- connected_to_provider_ = provider_->ConnectDataChannel(this);
+ PacketQueue control_packets;
+ control_packets.Swap(&queued_control_data_);
+
+ while (!control_packets.Empty()) {
+ talk_base::scoped_ptr<DataBuffer> buf(control_packets.Front());
+ SendControlMessage(buf->data);
+ control_packets.Pop();
}
- // The sid may have been unassigned when provider_->ConnectDataChannel was
- // done. So always add the streams even if connected_to_provider_ is true.
- if (config_.id >= 0) {
- provider_->AddSctpDataStream(config_.id);
+}
+
+void DataChannel::QueueControlMessage(const talk_base::Buffer& buffer) {
+ queued_control_data_.Push(new DataBuffer(buffer, true));
+}
+
+bool DataChannel::SendControlMessage(const talk_base::Buffer& buffer) {
+ bool is_open_message =
+ (config_.open_handshake_role == InternalDataChannelInit::kOpener);
+
+ ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
+ was_ever_writable_ &&
+ config_.id >= 0 &&
+ (!is_open_message || !config_.negotiated));
+
+ cricket::SendDataParams send_params;
+ send_params.ssrc = config_.id;
+ send_params.ordered = config_.ordered || is_open_message;
+ send_params.type = cricket::DMT_CONTROL;
+
+ cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
+ bool retval = provider_->SendData(send_params, buffer, &send_result);
+ if (retval) {
+ LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id;
+
+ if (is_open_message) {
+ // Send data as ordered before we receive any message from the remote peer
+ // to make sure the remote peer will not receive any data before it
+ // receives the OPEN message.
+ waiting_for_open_ack_ = true;
+ }
+ } else if (send_result == cricket::SDR_BLOCK) {
+ QueueControlMessage(buffer);
+ } else {
+ LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
+ << " the CONTROL message, send_result = " << send_result;
+ Close();
}
+ return retval;
}
} // namespace webrtc