aboutsummaryrefslogtreecommitdiff
path: root/rtc_base/task_utils
diff options
context:
space:
mode:
Diffstat (limited to 'rtc_base/task_utils')
-rw-r--r--rtc_base/task_utils/BUILD.gn3
-rw-r--r--rtc_base/task_utils/pending_task_safety_flag.cc8
-rw-r--r--rtc_base/task_utils/pending_task_safety_flag.h9
-rw-r--r--rtc_base/task_utils/repeating_task.cc40
-rw-r--r--rtc_base/task_utils/repeating_task.h69
-rw-r--r--rtc_base/task_utils/repeating_task_unittest.cc18
6 files changed, 75 insertions, 72 deletions
diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn
index 39e4ba1100..64a041908e 100644
--- a/rtc_base/task_utils/BUILD.gn
+++ b/rtc_base/task_utils/BUILD.gn
@@ -14,6 +14,7 @@ rtc_library("repeating_task") {
"repeating_task.h",
]
deps = [
+ ":pending_task_safety_flag",
":to_queued_task",
"..:logging",
"..:timeutils",
@@ -33,7 +34,7 @@ rtc_library("pending_task_safety_flag") {
]
deps = [
"..:checks",
- "..:refcount",
+ "../../api:refcountedbase",
"../../api:scoped_refptr",
"../../api:sequence_checker",
"../system:no_unique_address",
diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc
index b83d714916..57b3f6ce88 100644
--- a/rtc_base/task_utils/pending_task_safety_flag.cc
+++ b/rtc_base/task_utils/pending_task_safety_flag.cc
@@ -10,19 +10,17 @@
#include "rtc_base/task_utils/pending_task_safety_flag.h"
-#include "rtc_base/ref_counted_object.h"
-
namespace webrtc {
// static
rtc::scoped_refptr<PendingTaskSafetyFlag> PendingTaskSafetyFlag::Create() {
- return new rtc::RefCountedObject<PendingTaskSafetyFlag>(true);
+ return new PendingTaskSafetyFlag(true);
}
rtc::scoped_refptr<PendingTaskSafetyFlag>
PendingTaskSafetyFlag::CreateDetached() {
rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag(
- new rtc::RefCountedObject<PendingTaskSafetyFlag>(true));
+ new PendingTaskSafetyFlag(true));
safety_flag->main_sequence_.Detach();
return safety_flag;
}
@@ -30,7 +28,7 @@ PendingTaskSafetyFlag::CreateDetached() {
rtc::scoped_refptr<PendingTaskSafetyFlag>
PendingTaskSafetyFlag::CreateDetachedInactive() {
rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag(
- new rtc::RefCountedObject<PendingTaskSafetyFlag>(false));
+ new PendingTaskSafetyFlag(false));
safety_flag->main_sequence_.Detach();
return safety_flag;
}
diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h
index 4864b5de3b..fc1b5bd878 100644
--- a/rtc_base/task_utils/pending_task_safety_flag.h
+++ b/rtc_base/task_utils/pending_task_safety_flag.h
@@ -11,10 +11,10 @@
#ifndef RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
#define RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
+#include "api/ref_counted_base.h"
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "rtc_base/checks.h"
-#include "rtc_base/ref_count.h"
#include "rtc_base/system/no_unique_address.h"
namespace webrtc {
@@ -55,7 +55,8 @@ namespace webrtc {
// my_task_queue_->PostTask(ToQueuedTask(pending_task_safety_flag_,
// [this]() { MyMethod(); }));
//
-class PendingTaskSafetyFlag : public rtc::RefCountInterface {
+class PendingTaskSafetyFlag final
+ : public rtc::RefCountedNonVirtual<PendingTaskSafetyFlag> {
public:
static rtc::scoped_refptr<PendingTaskSafetyFlag> Create();
@@ -113,7 +114,7 @@ class PendingTaskSafetyFlag : public rtc::RefCountInterface {
// This should be used by the class that wants tasks dropped after destruction.
// The requirement is that the instance has to be constructed and destructed on
// the same thread as the potentially dropped tasks would be running on.
-class ScopedTaskSafety {
+class ScopedTaskSafety final {
public:
ScopedTaskSafety() = default;
~ScopedTaskSafety() { flag_->SetNotAlive(); }
@@ -128,7 +129,7 @@ class ScopedTaskSafety {
// Like ScopedTaskSafety, but allows construction on a different thread than
// where the flag will be used.
-class ScopedTaskSafetyDetached {
+class ScopedTaskSafetyDetached final {
public:
ScopedTaskSafetyDetached() = default;
~ScopedTaskSafetyDetached() { flag_->SetNotAlive(); }
diff --git a/rtc_base/task_utils/repeating_task.cc b/rtc_base/task_utils/repeating_task.cc
index 574e6331f1..9636680cb4 100644
--- a/rtc_base/task_utils/repeating_task.cc
+++ b/rtc_base/task_utils/repeating_task.cc
@@ -12,32 +12,36 @@
#include "absl/memory/memory.h"
#include "rtc_base/logging.h"
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
namespace webrtc {
namespace webrtc_repeating_task_impl {
-RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue,
- TimeDelta first_delay,
- Clock* clock)
+RepeatingTaskBase::RepeatingTaskBase(
+ TaskQueueBase* task_queue,
+ TimeDelta first_delay,
+ Clock* clock,
+ rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag)
: task_queue_(task_queue),
clock_(clock),
- next_run_time_(clock_->CurrentTime() + first_delay) {}
+ next_run_time_(clock_->CurrentTime() + first_delay),
+ alive_flag_(std::move(alive_flag)) {}
RepeatingTaskBase::~RepeatingTaskBase() = default;
bool RepeatingTaskBase::Run() {
RTC_DCHECK_RUN_ON(task_queue_);
// Return true to tell the TaskQueue to destruct this object.
- if (next_run_time_.IsPlusInfinity())
+ if (!alive_flag_->alive())
return true;
TimeDelta delay = RunClosure();
// The closure might have stopped this task, in which case we return true to
// destruct this object.
- if (next_run_time_.IsPlusInfinity())
+ if (!alive_flag_->alive())
return true;
RTC_DCHECK(delay.IsFinite());
@@ -53,33 +57,11 @@ bool RepeatingTaskBase::Run() {
return false;
}
-void RepeatingTaskBase::Stop() {
- RTC_DCHECK_RUN_ON(task_queue_);
- RTC_DCHECK(next_run_time_.IsFinite());
- next_run_time_ = Timestamp::PlusInfinity();
-}
-
} // namespace webrtc_repeating_task_impl
-RepeatingTaskHandle::RepeatingTaskHandle(RepeatingTaskHandle&& other)
- : repeating_task_(other.repeating_task_) {
- other.repeating_task_ = nullptr;
-}
-
-RepeatingTaskHandle& RepeatingTaskHandle::operator=(
- RepeatingTaskHandle&& other) {
- repeating_task_ = other.repeating_task_;
- other.repeating_task_ = nullptr;
- return *this;
-}
-
-RepeatingTaskHandle::RepeatingTaskHandle(
- webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task)
- : repeating_task_(repeating_task) {}
-
void RepeatingTaskHandle::Stop() {
if (repeating_task_) {
- repeating_task_->Stop();
+ repeating_task_->SetNotAlive();
repeating_task_ = nullptr;
}
}
diff --git a/rtc_base/task_utils/repeating_task.h b/rtc_base/task_utils/repeating_task.h
index 487b7d19d4..d5066fdb5c 100644
--- a/rtc_base/task_utils/repeating_task.h
+++ b/rtc_base/task_utils/repeating_task.h
@@ -19,22 +19,19 @@
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
-
-class RepeatingTaskHandle;
-
namespace webrtc_repeating_task_impl {
class RepeatingTaskBase : public QueuedTask {
public:
RepeatingTaskBase(TaskQueueBase* task_queue,
TimeDelta first_delay,
- Clock* clock);
+ Clock* clock,
+ rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag);
~RepeatingTaskBase() override;
- void Stop();
-
private:
virtual TimeDelta RunClosure() = 0;
@@ -42,9 +39,10 @@ class RepeatingTaskBase : public QueuedTask {
TaskQueueBase* const task_queue_;
Clock* const clock_;
- // This is always finite, except for the special case where it's PlusInfinity
- // to signal that the task should stop.
+ // This is always finite.
Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_);
+ rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag_
+ RTC_GUARDED_BY(task_queue_);
};
// The template closure pattern is based on rtc::ClosureTask.
@@ -54,8 +52,12 @@ class RepeatingTaskImpl final : public RepeatingTaskBase {
RepeatingTaskImpl(TaskQueueBase* task_queue,
TimeDelta first_delay,
Closure&& closure,
- Clock* clock)
- : RepeatingTaskBase(task_queue, first_delay, clock),
+ Clock* clock,
+ rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag)
+ : RepeatingTaskBase(task_queue,
+ first_delay,
+ clock,
+ std::move(alive_flag)),
closure_(std::forward<Closure>(closure)) {
static_assert(
std::is_same<TimeDelta,
@@ -81,28 +83,27 @@ class RepeatingTaskHandle {
public:
RepeatingTaskHandle() = default;
~RepeatingTaskHandle() = default;
- RepeatingTaskHandle(RepeatingTaskHandle&& other);
- RepeatingTaskHandle& operator=(RepeatingTaskHandle&& other);
+ RepeatingTaskHandle(RepeatingTaskHandle&& other) = default;
+ RepeatingTaskHandle& operator=(RepeatingTaskHandle&& other) = default;
RepeatingTaskHandle(const RepeatingTaskHandle&) = delete;
RepeatingTaskHandle& operator=(const RepeatingTaskHandle&) = delete;
// Start can be used to start a task that will be reposted with a delay
// determined by the return value of the provided closure. The actual task is
// owned by the TaskQueue and will live until it has been stopped or the
- // TaskQueue is destroyed. Note that this means that trying to stop the
- // repeating task after the TaskQueue is destroyed is an error. However, it's
- // perfectly fine to destroy the handle while the task is running, since the
- // repeated task is owned by the TaskQueue.
+ // TaskQueue deletes it. It's perfectly fine to destroy the handle while the
+ // task is running, since the repeated task is owned by the TaskQueue.
template <class Closure>
static RepeatingTaskHandle Start(TaskQueueBase* task_queue,
Closure&& closure,
Clock* clock = Clock::GetRealTimeClock()) {
- auto repeating_task = std::make_unique<
- webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>(
- task_queue, TimeDelta::Zero(), std::forward<Closure>(closure), clock);
- auto* repeating_task_ptr = repeating_task.get();
- task_queue->PostTask(std::move(repeating_task));
- return RepeatingTaskHandle(repeating_task_ptr);
+ auto alive_flag = PendingTaskSafetyFlag::CreateDetached();
+ task_queue->PostTask(
+ std::make_unique<
+ webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>(
+ task_queue, TimeDelta::Zero(), std::forward<Closure>(closure),
+ clock, alive_flag));
+ return RepeatingTaskHandle(std::move(alive_flag));
}
// DelayedStart is equivalent to Start except that the first invocation of the
@@ -113,12 +114,14 @@ class RepeatingTaskHandle {
TimeDelta first_delay,
Closure&& closure,
Clock* clock = Clock::GetRealTimeClock()) {
- auto repeating_task = std::make_unique<
- webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>(
- task_queue, first_delay, std::forward<Closure>(closure), clock);
- auto* repeating_task_ptr = repeating_task.get();
- task_queue->PostDelayedTask(std::move(repeating_task), first_delay.ms());
- return RepeatingTaskHandle(repeating_task_ptr);
+ auto alive_flag = PendingTaskSafetyFlag::CreateDetached();
+ task_queue->PostDelayedTask(
+ std::make_unique<
+ webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>(
+ task_queue, first_delay, std::forward<Closure>(closure), clock,
+ alive_flag),
+ first_delay.ms());
+ return RepeatingTaskHandle(std::move(alive_flag));
}
// Stops future invocations of the repeating task closure. Can only be called
@@ -127,15 +130,15 @@ class RepeatingTaskHandle {
// closure itself.
void Stop();
- // Returns true if Start() or DelayedStart() was called most recently. Returns
- // false initially and if Stop() or PostStop() was called most recently.
+ // Returns true until Stop() was called.
+ // Can only be called from the TaskQueue where the task is running.
bool Running() const;
private:
explicit RepeatingTaskHandle(
- webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task);
- // Owned by the task queue.
- webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task_ = nullptr;
+ rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag)
+ : repeating_task_(std::move(alive_flag)) {}
+ rtc::scoped_refptr<PendingTaskSafetyFlag> repeating_task_;
};
} // namespace webrtc
diff --git a/rtc_base/task_utils/repeating_task_unittest.cc b/rtc_base/task_utils/repeating_task_unittest.cc
index 2fb15d1e5a..b23284f988 100644
--- a/rtc_base/task_utils/repeating_task_unittest.cc
+++ b/rtc_base/task_utils/repeating_task_unittest.cc
@@ -276,4 +276,22 @@ TEST(RepeatingTaskTest, ClockIntegration) {
handle.Stop();
}
+TEST(RepeatingTaskTest, CanBeStoppedAfterTaskQueueDeletedTheRepeatingTask) {
+ std::unique_ptr<QueuedTask> repeating_task;
+
+ MockTaskQueue task_queue;
+ EXPECT_CALL(task_queue, PostDelayedTask)
+ .WillOnce([&](std::unique_ptr<QueuedTask> task, uint32_t milliseconds) {
+ repeating_task = std::move(task);
+ });
+
+ RepeatingTaskHandle handle =
+ RepeatingTaskHandle::DelayedStart(&task_queue, TimeDelta::Millis(100),
+ [] { return TimeDelta::Millis(100); });
+
+ // shutdown task queue: delete all pending tasks and run 'regular' task.
+ repeating_task = nullptr;
+ handle.Stop();
+}
+
} // namespace webrtc