diff options
Diffstat (limited to 'webrtc/p2p/base/turnport.cc')
-rw-r--r-- | webrtc/p2p/base/turnport.cc | 256 |
1 files changed, 197 insertions, 59 deletions
diff --git a/webrtc/p2p/base/turnport.cc b/webrtc/p2p/base/turnport.cc index 3fdcac5f31..5ed93dd1d8 100644 --- a/webrtc/p2p/base/turnport.cc +++ b/webrtc/p2p/base/turnport.cc @@ -38,6 +38,8 @@ static const size_t TURN_CHANNEL_HEADER_SIZE = 4U; // STUN_ERROR_ALLOCATION_MISMATCH error per rfc5766. static const size_t MAX_ALLOCATE_MISMATCH_RETRIES = 2; +static const int TURN_SUCCESS_RESULT_CODE = 0; + inline bool IsTurnChannelData(uint16_t msg_type) { return ((msg_type & 0xC000) == 0x4000); // MSB are 0b01 } @@ -137,11 +139,19 @@ class TurnEntry : public sigslot::has_slots<> { TurnPort* port() { return port_; } int channel_id() const { return channel_id_; } + // For testing only. + void set_channel_id(int channel_id) { channel_id_ = channel_id; } + const rtc::SocketAddress& address() const { return ext_addr_; } BindState state() const { return state_; } + uint32_t destruction_timestamp() { return destruction_timestamp_; } + void set_destruction_timestamp(uint32_t destruction_timestamp) { + destruction_timestamp_ = destruction_timestamp; + } + // Helper methods to send permission and channel bind requests. - void SendCreatePermissionRequest(); + void SendCreatePermissionRequest(int delay); void SendChannelBindRequest(int delay); // Sends a packet to the given destination address. // This will wrap the packet in STUN if necessary. @@ -150,8 +160,10 @@ class TurnEntry : public sigslot::has_slots<> { void OnCreatePermissionSuccess(); void OnCreatePermissionError(StunMessage* response, int code); + void OnCreatePermissionTimeout(); void OnChannelBindSuccess(); void OnChannelBindError(StunMessage* response, int code); + void OnChannelBindTimeout(); // Signal sent when TurnEntry is destroyed. sigslot::signal1<TurnEntry*> SignalDestroyed; @@ -160,6 +172,11 @@ class TurnEntry : public sigslot::has_slots<> { int channel_id_; rtc::SocketAddress ext_addr_; BindState state_; + // A non-zero value indicates that this entry is scheduled to be destroyed. + // It is also used as an ID of the event scheduling. When the destruction + // event actually fires, the TurnEntry will be destroyed only if the + // timestamp here matches the one in the firing event. + uint32_t destruction_timestamp_ = 0; }; TurnPort::TurnPort(rtc::Thread* thread, @@ -239,7 +256,7 @@ TurnPort::~TurnPort() { } while (!entries_.empty()) { - DestroyEntry(entries_.front()->address()); + DestroyEntry(entries_.front()); } if (resolver_) { resolver_->Destroy(false); @@ -267,7 +284,7 @@ void TurnPort::PrepareAddress() { server_address_.address.SetPort(TURN_DEFAULT_PORT); } - if (server_address_.address.IsUnresolved()) { + if (server_address_.address.IsUnresolvedIP()) { ResolveTurnAddress(server_address_.address); } else { // If protocol family of server address doesn't match with local, return. @@ -334,6 +351,8 @@ bool TurnPort::CreateTurnClientSocket() { socket_->SignalReadyToSend.connect(this, &TurnPort::OnReadyToSend); + socket_->SignalSentPacket.connect(this, &TurnPort::OnSentPacket); + // TCP port is ready to send stun requests after the socket is connected, // while UDP port is ready to do so once the socket is created. if (server_address_.proto == PROTO_TCP) { @@ -380,7 +399,7 @@ void TurnPort::OnSocketConnect(rtc::AsyncPacketSocket* socket) { } state_ = STATE_CONNECTED; // It is ready to send stun requests. - if (server_address_.address.IsUnresolved()) { + if (server_address_.address.IsUnresolvedIP()) { server_address_.address = socket_->GetRemoteAddress(); } @@ -392,11 +411,7 @@ void TurnPort::OnSocketConnect(rtc::AsyncPacketSocket* socket) { void TurnPort::OnSocketClose(rtc::AsyncPacketSocket* socket, int error) { LOG_J(LS_WARNING, this) << "Connection with server failed, error=" << error; ASSERT(socket == socket_); - if (!ready()) { - OnAllocateError(); - } - request_manager_.Clear(); - state_ = STATE_DISCONNECTED; + Close(); } void TurnPort::OnAllocateMismatch() { @@ -425,7 +440,7 @@ void TurnPort::OnAllocateMismatch() { Connection* TurnPort::CreateConnection(const Candidate& address, CandidateOrigin origin) { // TURN-UDP can only connect to UDP candidates. - if (address.protocol() != UDP_PROTOCOL_NAME) { + if (!SupportsProtocol(address.protocol())) { return NULL; } @@ -438,7 +453,7 @@ Connection* TurnPort::CreateConnection(const Candidate& address, } // Create an entry, if needed, so we can get our permissions set up correctly. - CreateEntry(address.address()); + CreateOrRefreshEntry(address.address()); // A TURN port will have two candiates, STUN and TURN. STUN may not // present in all cases. If present stun candidate will be added first @@ -454,6 +469,15 @@ Connection* TurnPort::CreateConnection(const Candidate& address, return NULL; } +bool TurnPort::DestroyConnection(const rtc::SocketAddress& address) { + Connection* conn = GetConnection(address); + if (conn != nullptr) { + conn->Destroy(); + return true; + } + return false; +} + int TurnPort::SetOption(rtc::Socket::Option opt, int value) { if (!socket_) { // If socket is not created yet, these options will be applied during socket @@ -560,6 +584,11 @@ void TurnPort::OnReadPacket( } } +void TurnPort::OnSentPacket(rtc::AsyncPacketSocket* socket, + const rtc::SentPacket& sent_packet) { + PortInterface::SignalSentPacket(sent_packet); +} + void TurnPort::OnReadyToSend(rtc::AsyncPacketSocket* socket) { if (ready()) { Port::OnReadyToSend(); @@ -602,6 +631,8 @@ void TurnPort::ResolveTurnAddress(const rtc::SocketAddress& address) { if (resolver_) return; + LOG_J(LS_INFO, this) << "Starting TURN host lookup for " + << address.ToSensitiveString(); resolver_ = socket_factory()->CreateAsyncResolver(); resolver_->SignalDone.connect(this, &TurnPort::OnResolveResult); resolver_->Start(address); @@ -686,35 +717,59 @@ void TurnPort::OnAllocateError() { // We will send SignalPortError asynchronously as this can be sent during // port initialization. This way it will not be blocking other port // creation. - thread()->Post(this, MSG_ERROR); + thread()->Post(this, MSG_ALLOCATE_ERROR); } -void TurnPort::OnMessage(rtc::Message* message) { - if (message->message_id == MSG_ERROR) { - SignalPortError(this); - return; - } else if (message->message_id == MSG_ALLOCATE_MISMATCH) { - OnAllocateMismatch(); - return; - } else if (message->message_id == MSG_TRY_ALTERNATE_SERVER) { - if (server_address().proto == PROTO_UDP) { - // Send another allocate request to alternate server, with the received - // realm and nonce values. - SendRequest(new TurnAllocateRequest(this), 0); - } else { - // Since it's TCP, we have to delete the connected socket and reconnect - // with the alternate server. PrepareAddress will send stun binding once - // the new socket is connected. - ASSERT(server_address().proto == PROTO_TCP); - ASSERT(!SharedSocket()); - delete socket_; - socket_ = NULL; - PrepareAddress(); - } - return; +void TurnPort::OnTurnRefreshError() { + // Need to Close the port asynchronously because otherwise, the refresh + // request may be deleted twice: once at the end of the message processing + // and the other in Close(). + thread()->Post(this, MSG_REFRESH_ERROR); +} + +void TurnPort::Close() { + if (!ready()) { + OnAllocateError(); + } + request_manager_.Clear(); + // Stop the port from creating new connections. + state_ = STATE_DISCONNECTED; + // Delete all existing connections; stop sending data. + for (auto kv : connections()) { + kv.second->Destroy(); } +} - Port::OnMessage(message); +void TurnPort::OnMessage(rtc::Message* message) { + switch (message->message_id) { + case MSG_ALLOCATE_ERROR: + SignalPortError(this); + break; + case MSG_ALLOCATE_MISMATCH: + OnAllocateMismatch(); + break; + case MSG_REFRESH_ERROR: + Close(); + break; + case MSG_TRY_ALTERNATE_SERVER: + if (server_address().proto == PROTO_UDP) { + // Send another allocate request to alternate server, with the received + // realm and nonce values. + SendRequest(new TurnAllocateRequest(this), 0); + } else { + // Since it's TCP, we have to delete the connected socket and reconnect + // with the alternate server. PrepareAddress will send stun binding once + // the new socket is connected. + ASSERT(server_address().proto == PROTO_TCP); + ASSERT(!SharedSocket()); + delete socket_; + socket_ = NULL; + PrepareAddress(); + } + break; + default: + Port::OnMessage(message); + } } void TurnPort::OnAllocateRequestTimeout() { @@ -898,24 +953,73 @@ TurnEntry* TurnPort::FindEntry(int channel_id) const { return (it != entries_.end()) ? *it : NULL; } -TurnEntry* TurnPort::CreateEntry(const rtc::SocketAddress& addr) { - ASSERT(FindEntry(addr) == NULL); - TurnEntry* entry = new TurnEntry(this, next_channel_number_++, addr); - entries_.push_back(entry); - return entry; +bool TurnPort::EntryExists(TurnEntry* e) { + auto it = std::find(entries_.begin(), entries_.end(), e); + return it != entries_.end(); } -void TurnPort::DestroyEntry(const rtc::SocketAddress& addr) { +void TurnPort::CreateOrRefreshEntry(const rtc::SocketAddress& addr) { TurnEntry* entry = FindEntry(addr); + if (entry == nullptr) { + entry = new TurnEntry(this, next_channel_number_++, addr); + entries_.push_back(entry); + } else { + // The channel binding request for the entry will be refreshed automatically + // until the entry is destroyed. + CancelEntryDestruction(entry); + } +} + +void TurnPort::DestroyEntry(TurnEntry* entry) { ASSERT(entry != NULL); entry->SignalDestroyed(entry); entries_.remove(entry); delete entry; } +void TurnPort::DestroyEntryIfNotCancelled(TurnEntry* entry, + uint32_t timestamp) { + if (!EntryExists(entry)) { + return; + } + bool cancelled = timestamp != entry->destruction_timestamp(); + if (!cancelled) { + DestroyEntry(entry); + } +} + void TurnPort::OnConnectionDestroyed(Connection* conn) { - // Destroying TurnEntry for the connection, which is already destroyed. - DestroyEntry(conn->remote_candidate().address()); + // Schedule an event to destroy TurnEntry for the connection, which is + // already destroyed. + const rtc::SocketAddress& remote_address = conn->remote_candidate().address(); + TurnEntry* entry = FindEntry(remote_address); + ASSERT(entry != NULL); + ScheduleEntryDestruction(entry); +} + +void TurnPort::ScheduleEntryDestruction(TurnEntry* entry) { + ASSERT(entry->destruction_timestamp() == 0); + uint32_t timestamp = rtc::Time(); + entry->set_destruction_timestamp(timestamp); + invoker_.AsyncInvokeDelayed<void>( + thread(), + rtc::Bind(&TurnPort::DestroyEntryIfNotCancelled, this, entry, timestamp), + TURN_PERMISSION_TIMEOUT); +} + +void TurnPort::CancelEntryDestruction(TurnEntry* entry) { + ASSERT(entry->destruction_timestamp() != 0); + entry->set_destruction_timestamp(0); +} + +bool TurnPort::SetEntryChannelId(const rtc::SocketAddress& address, + int channel_id) { + TurnEntry* entry = FindEntry(address); + if (!entry) { + return false; + } + entry->set_channel_id(channel_id); + return true; } TurnAllocateRequest::TurnAllocateRequest(TurnPort* port) @@ -1131,16 +1235,12 @@ void TurnRefreshRequest::OnResponse(StunMessage* response) { // Schedule a refresh based on the returned lifetime value. port_->ScheduleRefresh(lifetime_attr->value()); + port_->SignalTurnRefreshResult(port_, TURN_SUCCESS_RESULT_CODE); } void TurnRefreshRequest::OnErrorResponse(StunMessage* response) { const StunErrorCodeAttribute* error_code = response->GetErrorCode(); - LOG_J(LS_INFO, port_) << "Received TURN refresh error response" - << ", id=" << rtc::hex_encode(id()) - << ", code=" << error_code->code() - << ", rtt=" << Elapsed(); - if (error_code->code() == STUN_ERROR_STALE_NONCE) { if (port_->UpdateNonce(response)) { // Send RefreshRequest immediately. @@ -1151,11 +1251,14 @@ void TurnRefreshRequest::OnErrorResponse(StunMessage* response) { << ", id=" << rtc::hex_encode(id()) << ", code=" << error_code->code() << ", rtt=" << Elapsed(); + port_->OnTurnRefreshError(); + port_->SignalTurnRefreshResult(port_, error_code->code()); } } void TurnRefreshRequest::OnTimeout() { LOG_J(LS_WARNING, port_) << "TURN refresh timeout " << rtc::hex_encode(id()); + port_->OnTurnRefreshError(); } TurnCreatePermissionRequest::TurnCreatePermissionRequest( @@ -1208,6 +1311,9 @@ void TurnCreatePermissionRequest::OnErrorResponse(StunMessage* response) { void TurnCreatePermissionRequest::OnTimeout() { LOG_J(LS_WARNING, port_) << "TURN create permission timeout " << rtc::hex_encode(id()); + if (entry_) { + entry_->OnCreatePermissionTimeout(); + } } void TurnCreatePermissionRequest::OnEntryDestroyed(TurnEntry* entry) { @@ -1275,6 +1381,9 @@ void TurnChannelBindRequest::OnErrorResponse(StunMessage* response) { void TurnChannelBindRequest::OnTimeout() { LOG_J(LS_WARNING, port_) << "TURN channel bind timeout " << rtc::hex_encode(id()); + if (entry_) { + entry_->OnChannelBindTimeout(); + } } void TurnChannelBindRequest::OnEntryDestroyed(TurnEntry* entry) { @@ -1289,12 +1398,12 @@ TurnEntry::TurnEntry(TurnPort* port, int channel_id, ext_addr_(ext_addr), state_(STATE_UNBOUND) { // Creating permission for |ext_addr_|. - SendCreatePermissionRequest(); + SendCreatePermissionRequest(0); } -void TurnEntry::SendCreatePermissionRequest() { - port_->SendRequest(new TurnCreatePermissionRequest( - port_, this, ext_addr_), 0); +void TurnEntry::SendCreatePermissionRequest(int delay) { + port_->SendRequest(new TurnCreatePermissionRequest(port_, this, ext_addr_), + delay); } void TurnEntry::SendChannelBindRequest(int delay) { @@ -1335,21 +1444,43 @@ void TurnEntry::OnCreatePermissionSuccess() { LOG_J(LS_INFO, port_) << "Create permission for " << ext_addr_.ToSensitiveString() << " succeeded"; - // For success result code will be 0. - port_->SignalCreatePermissionResult(port_, ext_addr_, 0); + port_->SignalCreatePermissionResult(port_, ext_addr_, + TURN_SUCCESS_RESULT_CODE); + + // If |state_| is STATE_BOUND, the permission will be refreshed + // by ChannelBindRequest. + if (state_ != STATE_BOUND) { + // Refresh the permission request about 1 minute before the permission + // times out. + int delay = TURN_PERMISSION_TIMEOUT - 60000; + SendCreatePermissionRequest(delay); + LOG_J(LS_INFO, port_) << "Scheduled create-permission-request in " + << delay << "ms."; + } } void TurnEntry::OnCreatePermissionError(StunMessage* response, int code) { if (code == STUN_ERROR_STALE_NONCE) { if (port_->UpdateNonce(response)) { - SendCreatePermissionRequest(); + SendCreatePermissionRequest(0); } } else { + port_->DestroyConnection(ext_addr_); // Send signal with error code. port_->SignalCreatePermissionResult(port_, ext_addr_, code); + Connection* c = port_->GetConnection(ext_addr_); + if (c) { + LOG_J(LS_ERROR, c) << "Received TURN CreatePermission error response, " + << "code=" << code << "; killing connection."; + c->FailAndDestroy(); + } } } +void TurnEntry::OnCreatePermissionTimeout() { + port_->DestroyConnection(ext_addr_); +} + void TurnEntry::OnChannelBindSuccess() { LOG_J(LS_INFO, port_) << "Channel bind for " << ext_addr_.ToSensitiveString() << " succeeded"; @@ -1358,14 +1489,21 @@ void TurnEntry::OnChannelBindSuccess() { } void TurnEntry::OnChannelBindError(StunMessage* response, int code) { - // TODO(mallinath) - Implement handling of error response for channel - // bind request as per http://tools.ietf.org/html/rfc5766#section-11.3 + // If the channel bind fails due to errors other than STATE_NONCE, + // we just destroy the connection and rely on ICE restart to re-establish + // the connection. if (code == STUN_ERROR_STALE_NONCE) { if (port_->UpdateNonce(response)) { // Send channel bind request with fresh nonce. SendChannelBindRequest(0); } + } else { + state_ = STATE_UNBOUND; + port_->DestroyConnection(ext_addr_); } } - +void TurnEntry::OnChannelBindTimeout() { + state_ = STATE_UNBOUND; + port_->DestroyConnection(ext_addr_); +} } // namespace cricket |