aboutsummaryrefslogtreecommitdiff
path: root/p2p/client/basic_port_allocator.cc
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/client/basic_port_allocator.cc')
-rw-r--r--p2p/client/basic_port_allocator.cc160
1 files changed, 80 insertions, 80 deletions
diff --git a/p2p/client/basic_port_allocator.cc b/p2p/client/basic_port_allocator.cc
index 7e1f970fad..1d38a4c19f 100644
--- a/p2p/client/basic_port_allocator.cc
+++ b/p2p/client/basic_port_allocator.cc
@@ -12,12 +12,14 @@
#include <algorithm>
#include <functional>
+#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "absl/algorithm/container.h"
+#include "absl/memory/memory.h"
#include "p2p/base/basic_packet_socket_factory.h"
#include "p2p/base/port.h"
#include "p2p/base/stun_port.h"
@@ -27,6 +29,8 @@
#include "rtc_base/checks.h"
#include "rtc_base/helpers.h"
#include "rtc_base/logging.h"
+#include "rtc_base/task_utils/to_queued_task.h"
+#include "rtc_base/trace_event.h"
#include "system_wrappers/include/field_trial.h"
#include "system_wrappers/include/metrics.h"
@@ -35,15 +39,6 @@ using rtc::CreateRandomId;
namespace cricket {
namespace {
-enum {
- MSG_CONFIG_START,
- MSG_CONFIG_READY,
- MSG_ALLOCATE,
- MSG_ALLOCATION_PHASE,
- MSG_SEQUENCEOBJECTS_CREATED,
- MSG_CONFIG_STOP,
-};
-
const int PHASE_UDP = 0;
const int PHASE_RELAY = 1;
const int PHASE_TCP = 2;
@@ -268,16 +263,18 @@ BasicPortAllocatorSession::BasicPortAllocatorSession(
network_manager_started_(false),
allocation_sequences_created_(false),
turn_port_prune_policy_(allocator->turn_port_prune_policy()) {
+ TRACE_EVENT0("webrtc",
+ "BasicPortAllocatorSession::BasicPortAllocatorSession");
allocator_->network_manager()->SignalNetworksChanged.connect(
this, &BasicPortAllocatorSession::OnNetworksChanged);
allocator_->network_manager()->StartUpdating();
}
BasicPortAllocatorSession::~BasicPortAllocatorSession() {
+ TRACE_EVENT0("webrtc",
+ "BasicPortAllocatorSession::~BasicPortAllocatorSession");
RTC_DCHECK_RUN_ON(network_thread_);
allocator_->network_manager()->StopUpdating();
- if (network_thread_ != NULL)
- network_thread_->Clear(this);
for (uint32_t i = 0; i < sequences_.size(); ++i) {
// AllocationSequence should clear it's map entry for turn ports before
@@ -289,8 +286,7 @@ BasicPortAllocatorSession::~BasicPortAllocatorSession() {
for (it = ports_.begin(); it != ports_.end(); it++)
delete it->port();
- for (uint32_t i = 0; i < configs_.size(); ++i)
- delete configs_[i];
+ configs_.clear();
for (uint32_t i = 0; i < sequences_.size(); ++i)
delete sequences_[i];
@@ -370,7 +366,8 @@ void BasicPortAllocatorSession::StartGettingPorts() {
socket_factory_ = owned_socket_factory_.get();
}
- network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_START);
+ network_thread_->PostTask(webrtc::ToQueuedTask(
+ network_safety_, [this] { GetPortConfigurations(); }));
RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy "
<< turn_port_prune_policy_;
@@ -386,11 +383,12 @@ void BasicPortAllocatorSession::StopGettingPorts() {
void BasicPortAllocatorSession::ClearGettingPorts() {
RTC_DCHECK_RUN_ON(network_thread_);
- network_thread_->Clear(this, MSG_ALLOCATE);
+ ++allocation_epoch_;
for (uint32_t i = 0; i < sequences_.size(); ++i) {
sequences_[i]->Stop();
}
- network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_STOP);
+ network_thread_->PostTask(
+ webrtc::ToQueuedTask(network_safety_, [this] { OnConfigStop(); }));
state_ = SessionState::CLEARED;
}
@@ -489,8 +487,10 @@ void BasicPortAllocatorSession::GetCandidateStatsFromReadyPorts(
for (auto* port : ports) {
auto candidates = port->Candidates();
for (const auto& candidate : candidates) {
- CandidateStats candidate_stats(allocator_->SanitizeCandidate(candidate));
- port->GetStunStats(&candidate_stats.stun_stats);
+ absl::optional<StunStats> stun_stats;
+ port->GetStunStats(&stun_stats);
+ CandidateStats candidate_stats(allocator_->SanitizeCandidate(candidate),
+ std::move(stun_stats));
candidate_stats_list->push_back(std::move(candidate_stats));
}
}
@@ -574,28 +574,6 @@ bool BasicPortAllocatorSession::CandidatesAllocationDone() const {
ports_, [](const PortData& port) { return port.inprogress(); });
}
-void BasicPortAllocatorSession::OnMessage(rtc::Message* message) {
- switch (message->message_id) {
- case MSG_CONFIG_START:
- GetPortConfigurations();
- break;
- case MSG_CONFIG_READY:
- OnConfigReady(static_cast<PortConfiguration*>(message->pdata));
- break;
- case MSG_ALLOCATE:
- OnAllocate();
- break;
- case MSG_SEQUENCEOBJECTS_CREATED:
- OnAllocationSequenceObjectsCreated();
- break;
- case MSG_CONFIG_STOP:
- OnConfigStop();
- break;
- default:
- RTC_NOTREACHED();
- }
-}
-
void BasicPortAllocatorSession::UpdateIceParametersInternal() {
RTC_DCHECK_RUN_ON(network_thread_);
for (PortData& port : ports_) {
@@ -607,26 +585,35 @@ void BasicPortAllocatorSession::UpdateIceParametersInternal() {
void BasicPortAllocatorSession::GetPortConfigurations() {
RTC_DCHECK_RUN_ON(network_thread_);
- PortConfiguration* config =
- new PortConfiguration(allocator_->stun_servers(), username(), password());
+ auto config = std::make_unique<PortConfiguration>(allocator_->stun_servers(),
+ username(), password());
for (const RelayServerConfig& turn_server : allocator_->turn_servers()) {
config->AddRelay(turn_server);
}
- ConfigReady(config);
+ ConfigReady(std::move(config));
}
void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) {
RTC_DCHECK_RUN_ON(network_thread_);
- network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_READY, config);
+ ConfigReady(absl::WrapUnique(config));
+}
+
+void BasicPortAllocatorSession::ConfigReady(
+ std::unique_ptr<PortConfiguration> config) {
+ RTC_DCHECK_RUN_ON(network_thread_);
+ network_thread_->PostTask(webrtc::ToQueuedTask(
+ network_safety_, [this, config = std::move(config)]() mutable {
+ OnConfigReady(std::move(config));
+ }));
}
// Adds a configuration to the list.
-void BasicPortAllocatorSession::OnConfigReady(PortConfiguration* config) {
+void BasicPortAllocatorSession::OnConfigReady(
+ std::unique_ptr<PortConfiguration> config) {
RTC_DCHECK_RUN_ON(network_thread_);
- if (config) {
- configs_.push_back(config);
- }
+ if (config)
+ configs_.push_back(std::move(config));
AllocatePorts();
}
@@ -664,11 +651,16 @@ void BasicPortAllocatorSession::OnConfigStop() {
void BasicPortAllocatorSession::AllocatePorts() {
RTC_DCHECK_RUN_ON(network_thread_);
- network_thread_->Post(RTC_FROM_HERE, this, MSG_ALLOCATE);
+ network_thread_->PostTask(webrtc::ToQueuedTask(
+ network_safety_, [this, allocation_epoch = allocation_epoch_] {
+ OnAllocate(allocation_epoch);
+ }));
}
-void BasicPortAllocatorSession::OnAllocate() {
+void BasicPortAllocatorSession::OnAllocate(int allocation_epoch) {
RTC_DCHECK_RUN_ON(network_thread_);
+ if (allocation_epoch != allocation_epoch_)
+ return;
if (network_manager_started_ && !IsStopped()) {
bool disable_equivalent_phases = true;
@@ -774,7 +766,8 @@ void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) {
done_signal_needed = true;
} else {
RTC_LOG(LS_INFO) << "Allocate ports on " << networks.size() << " networks";
- PortConfiguration* config = configs_.empty() ? nullptr : configs_.back();
+ PortConfiguration* config =
+ configs_.empty() ? nullptr : configs_.back().get();
for (uint32_t i = 0; i < networks.size(); ++i) {
uint32_t sequence_flags = flags();
if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) {
@@ -814,9 +807,11 @@ void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) {
}
AllocationSequence* sequence =
- new AllocationSequence(this, networks[i], config, sequence_flags);
- sequence->SignalPortAllocationComplete.connect(
- this, &BasicPortAllocatorSession::OnPortAllocationComplete);
+ new AllocationSequence(this, networks[i], config, sequence_flags,
+ [this, safety_flag = network_safety_.flag()] {
+ if (safety_flag->alive())
+ OnPortAllocationComplete();
+ });
sequence->Init();
sequence->Start();
sequences_.push_back(sequence);
@@ -824,7 +819,8 @@ void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) {
}
}
if (done_signal_needed) {
- network_thread_->Post(RTC_FROM_HERE, this, MSG_SEQUENCEOBJECTS_CREATED);
+ network_thread_->PostTask(webrtc::ToQueuedTask(
+ network_safety_, [this] { OnAllocationSequenceObjectsCreated(); }));
}
}
@@ -1128,8 +1124,7 @@ bool BasicPortAllocatorSession::CandidatePairable(const Candidate& c,
!host_candidates_disabled);
}
-void BasicPortAllocatorSession::OnPortAllocationComplete(
- AllocationSequence* seq) {
+void BasicPortAllocatorSession::OnPortAllocationComplete() {
RTC_DCHECK_RUN_ON(network_thread_);
// Send candidate allocation complete signal if all ports are done.
MaybeSignalCandidatesAllocationDone();
@@ -1220,10 +1215,12 @@ void BasicPortAllocatorSession::PrunePortsAndRemoveCandidates(
// AllocationSequence
-AllocationSequence::AllocationSequence(BasicPortAllocatorSession* session,
- rtc::Network* network,
- PortConfiguration* config,
- uint32_t flags)
+AllocationSequence::AllocationSequence(
+ BasicPortAllocatorSession* session,
+ rtc::Network* network,
+ PortConfiguration* config,
+ uint32_t flags,
+ std::function<void()> port_allocation_complete_callback)
: session_(session),
network_(network),
config_(config),
@@ -1231,7 +1228,9 @@ AllocationSequence::AllocationSequence(BasicPortAllocatorSession* session,
flags_(flags),
udp_socket_(),
udp_port_(NULL),
- phase_(0) {}
+ phase_(0),
+ port_allocation_complete_callback_(
+ std::move(port_allocation_complete_callback)) {}
void AllocationSequence::Init() {
if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) {
@@ -1248,6 +1247,7 @@ void AllocationSequence::Init() {
}
void AllocationSequence::Clear() {
+ TRACE_EVENT0("webrtc", "AllocationSequence::Clear");
udp_port_ = NULL;
relay_ports_.clear();
}
@@ -1259,10 +1259,6 @@ void AllocationSequence::OnNetworkFailed() {
Stop();
}
-AllocationSequence::~AllocationSequence() {
- session_->network_thread()->Clear(this);
-}
-
void AllocationSequence::DisableEquivalentPhases(rtc::Network* network,
PortConfiguration* config,
uint32_t* flags) {
@@ -1337,7 +1333,9 @@ void AllocationSequence::DisableEquivalentPhases(rtc::Network* network,
void AllocationSequence::Start() {
state_ = kRunning;
- session_->network_thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATION_PHASE);
+
+ session_->network_thread()->PostTask(webrtc::ToQueuedTask(
+ safety_, [this, epoch = epoch_] { Process(epoch); }));
// Take a snapshot of the best IP, so that when DisableEquivalentPhases is
// called next time, we enable all phases if the best IP has since changed.
previous_best_ip_ = network_->GetBestIP();
@@ -1347,16 +1345,18 @@ void AllocationSequence::Stop() {
// If the port is completed, don't set it to stopped.
if (state_ == kRunning) {
state_ = kStopped;
- session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
+ // Cause further Process calls in the previous epoch to be ignored.
+ ++epoch_;
}
}
-void AllocationSequence::OnMessage(rtc::Message* msg) {
+void AllocationSequence::Process(int epoch) {
RTC_DCHECK(rtc::Thread::Current() == session_->network_thread());
- RTC_DCHECK(msg->message_id == MSG_ALLOCATION_PHASE);
-
const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"};
+ if (epoch != epoch_)
+ return;
+
// Perform all of the phases in the current step.
RTC_LOG(LS_INFO) << network_->ToString()
<< ": Allocation Phase=" << PHASE_NAMES[phase_];
@@ -1382,14 +1382,16 @@ void AllocationSequence::OnMessage(rtc::Message* msg) {
if (state() == kRunning) {
++phase_;
- session_->network_thread()->PostDelayed(RTC_FROM_HERE,
- session_->allocator()->step_delay(),
- this, MSG_ALLOCATION_PHASE);
+ session_->network_thread()->PostDelayedTask(
+ webrtc::ToQueuedTask(safety_,
+ [this, epoch = epoch_] { Process(epoch); }),
+ session_->allocator()->step_delay());
} else {
- // If all phases in AllocationSequence are completed, no allocation
- // steps needed further. Canceling pending signal.
- session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
- SignalPortAllocationComplete(this);
+ // No allocation steps needed further if all phases in AllocationSequence
+ // are completed. Cause further Process calls in the previous epoch to be
+ // ignored.
+ ++epoch_;
+ port_allocation_complete_callback_();
}
}
@@ -1657,8 +1659,6 @@ PortConfiguration::PortConfiguration(const ServerAddresses& stun_servers,
webrtc::field_trial::IsDisabled("WebRTC-UseTurnServerAsStunServer");
}
-PortConfiguration::~PortConfiguration() = default;
-
ServerAddresses PortConfiguration::StunServers() {
if (!stun_address.IsNil() &&
stun_servers.find(stun_address) == stun_servers.end()) {