diff options
Diffstat (limited to 'p2p/client/basic_port_allocator.cc')
-rw-r--r-- | p2p/client/basic_port_allocator.cc | 160 |
1 files changed, 80 insertions, 80 deletions
diff --git a/p2p/client/basic_port_allocator.cc b/p2p/client/basic_port_allocator.cc index 7e1f970fad..1d38a4c19f 100644 --- a/p2p/client/basic_port_allocator.cc +++ b/p2p/client/basic_port_allocator.cc @@ -12,12 +12,14 @@ #include <algorithm> #include <functional> +#include <memory> #include <set> #include <string> #include <utility> #include <vector> #include "absl/algorithm/container.h" +#include "absl/memory/memory.h" #include "p2p/base/basic_packet_socket_factory.h" #include "p2p/base/port.h" #include "p2p/base/stun_port.h" @@ -27,6 +29,8 @@ #include "rtc_base/checks.h" #include "rtc_base/helpers.h" #include "rtc_base/logging.h" +#include "rtc_base/task_utils/to_queued_task.h" +#include "rtc_base/trace_event.h" #include "system_wrappers/include/field_trial.h" #include "system_wrappers/include/metrics.h" @@ -35,15 +39,6 @@ using rtc::CreateRandomId; namespace cricket { namespace { -enum { - MSG_CONFIG_START, - MSG_CONFIG_READY, - MSG_ALLOCATE, - MSG_ALLOCATION_PHASE, - MSG_SEQUENCEOBJECTS_CREATED, - MSG_CONFIG_STOP, -}; - const int PHASE_UDP = 0; const int PHASE_RELAY = 1; const int PHASE_TCP = 2; @@ -268,16 +263,18 @@ BasicPortAllocatorSession::BasicPortAllocatorSession( network_manager_started_(false), allocation_sequences_created_(false), turn_port_prune_policy_(allocator->turn_port_prune_policy()) { + TRACE_EVENT0("webrtc", + "BasicPortAllocatorSession::BasicPortAllocatorSession"); allocator_->network_manager()->SignalNetworksChanged.connect( this, &BasicPortAllocatorSession::OnNetworksChanged); allocator_->network_manager()->StartUpdating(); } BasicPortAllocatorSession::~BasicPortAllocatorSession() { + TRACE_EVENT0("webrtc", + "BasicPortAllocatorSession::~BasicPortAllocatorSession"); RTC_DCHECK_RUN_ON(network_thread_); allocator_->network_manager()->StopUpdating(); - if (network_thread_ != NULL) - network_thread_->Clear(this); for (uint32_t i = 0; i < sequences_.size(); ++i) { // AllocationSequence should clear it's map entry for turn ports before @@ -289,8 +286,7 @@ BasicPortAllocatorSession::~BasicPortAllocatorSession() { for (it = ports_.begin(); it != ports_.end(); it++) delete it->port(); - for (uint32_t i = 0; i < configs_.size(); ++i) - delete configs_[i]; + configs_.clear(); for (uint32_t i = 0; i < sequences_.size(); ++i) delete sequences_[i]; @@ -370,7 +366,8 @@ void BasicPortAllocatorSession::StartGettingPorts() { socket_factory_ = owned_socket_factory_.get(); } - network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_START); + network_thread_->PostTask(webrtc::ToQueuedTask( + network_safety_, [this] { GetPortConfigurations(); })); RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy " << turn_port_prune_policy_; @@ -386,11 +383,12 @@ void BasicPortAllocatorSession::StopGettingPorts() { void BasicPortAllocatorSession::ClearGettingPorts() { RTC_DCHECK_RUN_ON(network_thread_); - network_thread_->Clear(this, MSG_ALLOCATE); + ++allocation_epoch_; for (uint32_t i = 0; i < sequences_.size(); ++i) { sequences_[i]->Stop(); } - network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_STOP); + network_thread_->PostTask( + webrtc::ToQueuedTask(network_safety_, [this] { OnConfigStop(); })); state_ = SessionState::CLEARED; } @@ -489,8 +487,10 @@ void BasicPortAllocatorSession::GetCandidateStatsFromReadyPorts( for (auto* port : ports) { auto candidates = port->Candidates(); for (const auto& candidate : candidates) { - CandidateStats candidate_stats(allocator_->SanitizeCandidate(candidate)); - port->GetStunStats(&candidate_stats.stun_stats); + absl::optional<StunStats> stun_stats; + port->GetStunStats(&stun_stats); + CandidateStats candidate_stats(allocator_->SanitizeCandidate(candidate), + std::move(stun_stats)); candidate_stats_list->push_back(std::move(candidate_stats)); } } @@ -574,28 +574,6 @@ bool BasicPortAllocatorSession::CandidatesAllocationDone() const { ports_, [](const PortData& port) { return port.inprogress(); }); } -void BasicPortAllocatorSession::OnMessage(rtc::Message* message) { - switch (message->message_id) { - case MSG_CONFIG_START: - GetPortConfigurations(); - break; - case MSG_CONFIG_READY: - OnConfigReady(static_cast<PortConfiguration*>(message->pdata)); - break; - case MSG_ALLOCATE: - OnAllocate(); - break; - case MSG_SEQUENCEOBJECTS_CREATED: - OnAllocationSequenceObjectsCreated(); - break; - case MSG_CONFIG_STOP: - OnConfigStop(); - break; - default: - RTC_NOTREACHED(); - } -} - void BasicPortAllocatorSession::UpdateIceParametersInternal() { RTC_DCHECK_RUN_ON(network_thread_); for (PortData& port : ports_) { @@ -607,26 +585,35 @@ void BasicPortAllocatorSession::UpdateIceParametersInternal() { void BasicPortAllocatorSession::GetPortConfigurations() { RTC_DCHECK_RUN_ON(network_thread_); - PortConfiguration* config = - new PortConfiguration(allocator_->stun_servers(), username(), password()); + auto config = std::make_unique<PortConfiguration>(allocator_->stun_servers(), + username(), password()); for (const RelayServerConfig& turn_server : allocator_->turn_servers()) { config->AddRelay(turn_server); } - ConfigReady(config); + ConfigReady(std::move(config)); } void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) { RTC_DCHECK_RUN_ON(network_thread_); - network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_READY, config); + ConfigReady(absl::WrapUnique(config)); +} + +void BasicPortAllocatorSession::ConfigReady( + std::unique_ptr<PortConfiguration> config) { + RTC_DCHECK_RUN_ON(network_thread_); + network_thread_->PostTask(webrtc::ToQueuedTask( + network_safety_, [this, config = std::move(config)]() mutable { + OnConfigReady(std::move(config)); + })); } // Adds a configuration to the list. -void BasicPortAllocatorSession::OnConfigReady(PortConfiguration* config) { +void BasicPortAllocatorSession::OnConfigReady( + std::unique_ptr<PortConfiguration> config) { RTC_DCHECK_RUN_ON(network_thread_); - if (config) { - configs_.push_back(config); - } + if (config) + configs_.push_back(std::move(config)); AllocatePorts(); } @@ -664,11 +651,16 @@ void BasicPortAllocatorSession::OnConfigStop() { void BasicPortAllocatorSession::AllocatePorts() { RTC_DCHECK_RUN_ON(network_thread_); - network_thread_->Post(RTC_FROM_HERE, this, MSG_ALLOCATE); + network_thread_->PostTask(webrtc::ToQueuedTask( + network_safety_, [this, allocation_epoch = allocation_epoch_] { + OnAllocate(allocation_epoch); + })); } -void BasicPortAllocatorSession::OnAllocate() { +void BasicPortAllocatorSession::OnAllocate(int allocation_epoch) { RTC_DCHECK_RUN_ON(network_thread_); + if (allocation_epoch != allocation_epoch_) + return; if (network_manager_started_ && !IsStopped()) { bool disable_equivalent_phases = true; @@ -774,7 +766,8 @@ void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) { done_signal_needed = true; } else { RTC_LOG(LS_INFO) << "Allocate ports on " << networks.size() << " networks"; - PortConfiguration* config = configs_.empty() ? nullptr : configs_.back(); + PortConfiguration* config = + configs_.empty() ? nullptr : configs_.back().get(); for (uint32_t i = 0; i < networks.size(); ++i) { uint32_t sequence_flags = flags(); if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) { @@ -814,9 +807,11 @@ void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) { } AllocationSequence* sequence = - new AllocationSequence(this, networks[i], config, sequence_flags); - sequence->SignalPortAllocationComplete.connect( - this, &BasicPortAllocatorSession::OnPortAllocationComplete); + new AllocationSequence(this, networks[i], config, sequence_flags, + [this, safety_flag = network_safety_.flag()] { + if (safety_flag->alive()) + OnPortAllocationComplete(); + }); sequence->Init(); sequence->Start(); sequences_.push_back(sequence); @@ -824,7 +819,8 @@ void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) { } } if (done_signal_needed) { - network_thread_->Post(RTC_FROM_HERE, this, MSG_SEQUENCEOBJECTS_CREATED); + network_thread_->PostTask(webrtc::ToQueuedTask( + network_safety_, [this] { OnAllocationSequenceObjectsCreated(); })); } } @@ -1128,8 +1124,7 @@ bool BasicPortAllocatorSession::CandidatePairable(const Candidate& c, !host_candidates_disabled); } -void BasicPortAllocatorSession::OnPortAllocationComplete( - AllocationSequence* seq) { +void BasicPortAllocatorSession::OnPortAllocationComplete() { RTC_DCHECK_RUN_ON(network_thread_); // Send candidate allocation complete signal if all ports are done. MaybeSignalCandidatesAllocationDone(); @@ -1220,10 +1215,12 @@ void BasicPortAllocatorSession::PrunePortsAndRemoveCandidates( // AllocationSequence -AllocationSequence::AllocationSequence(BasicPortAllocatorSession* session, - rtc::Network* network, - PortConfiguration* config, - uint32_t flags) +AllocationSequence::AllocationSequence( + BasicPortAllocatorSession* session, + rtc::Network* network, + PortConfiguration* config, + uint32_t flags, + std::function<void()> port_allocation_complete_callback) : session_(session), network_(network), config_(config), @@ -1231,7 +1228,9 @@ AllocationSequence::AllocationSequence(BasicPortAllocatorSession* session, flags_(flags), udp_socket_(), udp_port_(NULL), - phase_(0) {} + phase_(0), + port_allocation_complete_callback_( + std::move(port_allocation_complete_callback)) {} void AllocationSequence::Init() { if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) { @@ -1248,6 +1247,7 @@ void AllocationSequence::Init() { } void AllocationSequence::Clear() { + TRACE_EVENT0("webrtc", "AllocationSequence::Clear"); udp_port_ = NULL; relay_ports_.clear(); } @@ -1259,10 +1259,6 @@ void AllocationSequence::OnNetworkFailed() { Stop(); } -AllocationSequence::~AllocationSequence() { - session_->network_thread()->Clear(this); -} - void AllocationSequence::DisableEquivalentPhases(rtc::Network* network, PortConfiguration* config, uint32_t* flags) { @@ -1337,7 +1333,9 @@ void AllocationSequence::DisableEquivalentPhases(rtc::Network* network, void AllocationSequence::Start() { state_ = kRunning; - session_->network_thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATION_PHASE); + + session_->network_thread()->PostTask(webrtc::ToQueuedTask( + safety_, [this, epoch = epoch_] { Process(epoch); })); // Take a snapshot of the best IP, so that when DisableEquivalentPhases is // called next time, we enable all phases if the best IP has since changed. previous_best_ip_ = network_->GetBestIP(); @@ -1347,16 +1345,18 @@ void AllocationSequence::Stop() { // If the port is completed, don't set it to stopped. if (state_ == kRunning) { state_ = kStopped; - session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE); + // Cause further Process calls in the previous epoch to be ignored. + ++epoch_; } } -void AllocationSequence::OnMessage(rtc::Message* msg) { +void AllocationSequence::Process(int epoch) { RTC_DCHECK(rtc::Thread::Current() == session_->network_thread()); - RTC_DCHECK(msg->message_id == MSG_ALLOCATION_PHASE); - const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"}; + if (epoch != epoch_) + return; + // Perform all of the phases in the current step. RTC_LOG(LS_INFO) << network_->ToString() << ": Allocation Phase=" << PHASE_NAMES[phase_]; @@ -1382,14 +1382,16 @@ void AllocationSequence::OnMessage(rtc::Message* msg) { if (state() == kRunning) { ++phase_; - session_->network_thread()->PostDelayed(RTC_FROM_HERE, - session_->allocator()->step_delay(), - this, MSG_ALLOCATION_PHASE); + session_->network_thread()->PostDelayedTask( + webrtc::ToQueuedTask(safety_, + [this, epoch = epoch_] { Process(epoch); }), + session_->allocator()->step_delay()); } else { - // If all phases in AllocationSequence are completed, no allocation - // steps needed further. Canceling pending signal. - session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE); - SignalPortAllocationComplete(this); + // No allocation steps needed further if all phases in AllocationSequence + // are completed. Cause further Process calls in the previous epoch to be + // ignored. + ++epoch_; + port_allocation_complete_callback_(); } } @@ -1657,8 +1659,6 @@ PortConfiguration::PortConfiguration(const ServerAddresses& stun_servers, webrtc::field_trial::IsDisabled("WebRTC-UseTurnServerAsStunServer"); } -PortConfiguration::~PortConfiguration() = default; - ServerAddresses PortConfiguration::StunServers() { if (!stun_address.IsNil() && stun_servers.find(stun_address) == stun_servers.end()) { |