diff options
Diffstat (limited to 'rtc_base/task_utils')
-rw-r--r-- | rtc_base/task_utils/BUILD.gn | 3 | ||||
-rw-r--r-- | rtc_base/task_utils/pending_task_safety_flag.cc | 8 | ||||
-rw-r--r-- | rtc_base/task_utils/pending_task_safety_flag.h | 9 | ||||
-rw-r--r-- | rtc_base/task_utils/repeating_task.cc | 40 | ||||
-rw-r--r-- | rtc_base/task_utils/repeating_task.h | 69 | ||||
-rw-r--r-- | rtc_base/task_utils/repeating_task_unittest.cc | 18 |
6 files changed, 75 insertions, 72 deletions
diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn index 39e4ba1100..64a041908e 100644 --- a/rtc_base/task_utils/BUILD.gn +++ b/rtc_base/task_utils/BUILD.gn @@ -14,6 +14,7 @@ rtc_library("repeating_task") { "repeating_task.h", ] deps = [ + ":pending_task_safety_flag", ":to_queued_task", "..:logging", "..:timeutils", @@ -33,7 +34,7 @@ rtc_library("pending_task_safety_flag") { ] deps = [ "..:checks", - "..:refcount", + "../../api:refcountedbase", "../../api:scoped_refptr", "../../api:sequence_checker", "../system:no_unique_address", diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc index b83d714916..57b3f6ce88 100644 --- a/rtc_base/task_utils/pending_task_safety_flag.cc +++ b/rtc_base/task_utils/pending_task_safety_flag.cc @@ -10,19 +10,17 @@ #include "rtc_base/task_utils/pending_task_safety_flag.h" -#include "rtc_base/ref_counted_object.h" - namespace webrtc { // static rtc::scoped_refptr<PendingTaskSafetyFlag> PendingTaskSafetyFlag::Create() { - return new rtc::RefCountedObject<PendingTaskSafetyFlag>(true); + return new PendingTaskSafetyFlag(true); } rtc::scoped_refptr<PendingTaskSafetyFlag> PendingTaskSafetyFlag::CreateDetached() { rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag( - new rtc::RefCountedObject<PendingTaskSafetyFlag>(true)); + new PendingTaskSafetyFlag(true)); safety_flag->main_sequence_.Detach(); return safety_flag; } @@ -30,7 +28,7 @@ PendingTaskSafetyFlag::CreateDetached() { rtc::scoped_refptr<PendingTaskSafetyFlag> PendingTaskSafetyFlag::CreateDetachedInactive() { rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag( - new rtc::RefCountedObject<PendingTaskSafetyFlag>(false)); + new PendingTaskSafetyFlag(false)); safety_flag->main_sequence_.Detach(); return safety_flag; } diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h index 4864b5de3b..fc1b5bd878 100644 --- a/rtc_base/task_utils/pending_task_safety_flag.h +++ b/rtc_base/task_utils/pending_task_safety_flag.h @@ -11,10 +11,10 @@ #ifndef RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_ #define RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_ +#include "api/ref_counted_base.h" #include "api/scoped_refptr.h" #include "api/sequence_checker.h" #include "rtc_base/checks.h" -#include "rtc_base/ref_count.h" #include "rtc_base/system/no_unique_address.h" namespace webrtc { @@ -55,7 +55,8 @@ namespace webrtc { // my_task_queue_->PostTask(ToQueuedTask(pending_task_safety_flag_, // [this]() { MyMethod(); })); // -class PendingTaskSafetyFlag : public rtc::RefCountInterface { +class PendingTaskSafetyFlag final + : public rtc::RefCountedNonVirtual<PendingTaskSafetyFlag> { public: static rtc::scoped_refptr<PendingTaskSafetyFlag> Create(); @@ -113,7 +114,7 @@ class PendingTaskSafetyFlag : public rtc::RefCountInterface { // This should be used by the class that wants tasks dropped after destruction. // The requirement is that the instance has to be constructed and destructed on // the same thread as the potentially dropped tasks would be running on. -class ScopedTaskSafety { +class ScopedTaskSafety final { public: ScopedTaskSafety() = default; ~ScopedTaskSafety() { flag_->SetNotAlive(); } @@ -128,7 +129,7 @@ class ScopedTaskSafety { // Like ScopedTaskSafety, but allows construction on a different thread than // where the flag will be used. -class ScopedTaskSafetyDetached { +class ScopedTaskSafetyDetached final { public: ScopedTaskSafetyDetached() = default; ~ScopedTaskSafetyDetached() { flag_->SetNotAlive(); } diff --git a/rtc_base/task_utils/repeating_task.cc b/rtc_base/task_utils/repeating_task.cc index 574e6331f1..9636680cb4 100644 --- a/rtc_base/task_utils/repeating_task.cc +++ b/rtc_base/task_utils/repeating_task.cc @@ -12,32 +12,36 @@ #include "absl/memory/memory.h" #include "rtc_base/logging.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/time_utils.h" namespace webrtc { namespace webrtc_repeating_task_impl { -RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue, - TimeDelta first_delay, - Clock* clock) +RepeatingTaskBase::RepeatingTaskBase( + TaskQueueBase* task_queue, + TimeDelta first_delay, + Clock* clock, + rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag) : task_queue_(task_queue), clock_(clock), - next_run_time_(clock_->CurrentTime() + first_delay) {} + next_run_time_(clock_->CurrentTime() + first_delay), + alive_flag_(std::move(alive_flag)) {} RepeatingTaskBase::~RepeatingTaskBase() = default; bool RepeatingTaskBase::Run() { RTC_DCHECK_RUN_ON(task_queue_); // Return true to tell the TaskQueue to destruct this object. - if (next_run_time_.IsPlusInfinity()) + if (!alive_flag_->alive()) return true; TimeDelta delay = RunClosure(); // The closure might have stopped this task, in which case we return true to // destruct this object. - if (next_run_time_.IsPlusInfinity()) + if (!alive_flag_->alive()) return true; RTC_DCHECK(delay.IsFinite()); @@ -53,33 +57,11 @@ bool RepeatingTaskBase::Run() { return false; } -void RepeatingTaskBase::Stop() { - RTC_DCHECK_RUN_ON(task_queue_); - RTC_DCHECK(next_run_time_.IsFinite()); - next_run_time_ = Timestamp::PlusInfinity(); -} - } // namespace webrtc_repeating_task_impl -RepeatingTaskHandle::RepeatingTaskHandle(RepeatingTaskHandle&& other) - : repeating_task_(other.repeating_task_) { - other.repeating_task_ = nullptr; -} - -RepeatingTaskHandle& RepeatingTaskHandle::operator=( - RepeatingTaskHandle&& other) { - repeating_task_ = other.repeating_task_; - other.repeating_task_ = nullptr; - return *this; -} - -RepeatingTaskHandle::RepeatingTaskHandle( - webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task) - : repeating_task_(repeating_task) {} - void RepeatingTaskHandle::Stop() { if (repeating_task_) { - repeating_task_->Stop(); + repeating_task_->SetNotAlive(); repeating_task_ = nullptr; } } diff --git a/rtc_base/task_utils/repeating_task.h b/rtc_base/task_utils/repeating_task.h index 487b7d19d4..d5066fdb5c 100644 --- a/rtc_base/task_utils/repeating_task.h +++ b/rtc_base/task_utils/repeating_task.h @@ -19,22 +19,19 @@ #include "api/task_queue/task_queue_base.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "system_wrappers/include/clock.h" namespace webrtc { - -class RepeatingTaskHandle; - namespace webrtc_repeating_task_impl { class RepeatingTaskBase : public QueuedTask { public: RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay, - Clock* clock); + Clock* clock, + rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag); ~RepeatingTaskBase() override; - void Stop(); - private: virtual TimeDelta RunClosure() = 0; @@ -42,9 +39,10 @@ class RepeatingTaskBase : public QueuedTask { TaskQueueBase* const task_queue_; Clock* const clock_; - // This is always finite, except for the special case where it's PlusInfinity - // to signal that the task should stop. + // This is always finite. Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_); + rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag_ + RTC_GUARDED_BY(task_queue_); }; // The template closure pattern is based on rtc::ClosureTask. @@ -54,8 +52,12 @@ class RepeatingTaskImpl final : public RepeatingTaskBase { RepeatingTaskImpl(TaskQueueBase* task_queue, TimeDelta first_delay, Closure&& closure, - Clock* clock) - : RepeatingTaskBase(task_queue, first_delay, clock), + Clock* clock, + rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag) + : RepeatingTaskBase(task_queue, + first_delay, + clock, + std::move(alive_flag)), closure_(std::forward<Closure>(closure)) { static_assert( std::is_same<TimeDelta, @@ -81,28 +83,27 @@ class RepeatingTaskHandle { public: RepeatingTaskHandle() = default; ~RepeatingTaskHandle() = default; - RepeatingTaskHandle(RepeatingTaskHandle&& other); - RepeatingTaskHandle& operator=(RepeatingTaskHandle&& other); + RepeatingTaskHandle(RepeatingTaskHandle&& other) = default; + RepeatingTaskHandle& operator=(RepeatingTaskHandle&& other) = default; RepeatingTaskHandle(const RepeatingTaskHandle&) = delete; RepeatingTaskHandle& operator=(const RepeatingTaskHandle&) = delete; // Start can be used to start a task that will be reposted with a delay // determined by the return value of the provided closure. The actual task is // owned by the TaskQueue and will live until it has been stopped or the - // TaskQueue is destroyed. Note that this means that trying to stop the - // repeating task after the TaskQueue is destroyed is an error. However, it's - // perfectly fine to destroy the handle while the task is running, since the - // repeated task is owned by the TaskQueue. + // TaskQueue deletes it. It's perfectly fine to destroy the handle while the + // task is running, since the repeated task is owned by the TaskQueue. template <class Closure> static RepeatingTaskHandle Start(TaskQueueBase* task_queue, Closure&& closure, Clock* clock = Clock::GetRealTimeClock()) { - auto repeating_task = std::make_unique< - webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>( - task_queue, TimeDelta::Zero(), std::forward<Closure>(closure), clock); - auto* repeating_task_ptr = repeating_task.get(); - task_queue->PostTask(std::move(repeating_task)); - return RepeatingTaskHandle(repeating_task_ptr); + auto alive_flag = PendingTaskSafetyFlag::CreateDetached(); + task_queue->PostTask( + std::make_unique< + webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>( + task_queue, TimeDelta::Zero(), std::forward<Closure>(closure), + clock, alive_flag)); + return RepeatingTaskHandle(std::move(alive_flag)); } // DelayedStart is equivalent to Start except that the first invocation of the @@ -113,12 +114,14 @@ class RepeatingTaskHandle { TimeDelta first_delay, Closure&& closure, Clock* clock = Clock::GetRealTimeClock()) { - auto repeating_task = std::make_unique< - webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>( - task_queue, first_delay, std::forward<Closure>(closure), clock); - auto* repeating_task_ptr = repeating_task.get(); - task_queue->PostDelayedTask(std::move(repeating_task), first_delay.ms()); - return RepeatingTaskHandle(repeating_task_ptr); + auto alive_flag = PendingTaskSafetyFlag::CreateDetached(); + task_queue->PostDelayedTask( + std::make_unique< + webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>( + task_queue, first_delay, std::forward<Closure>(closure), clock, + alive_flag), + first_delay.ms()); + return RepeatingTaskHandle(std::move(alive_flag)); } // Stops future invocations of the repeating task closure. Can only be called @@ -127,15 +130,15 @@ class RepeatingTaskHandle { // closure itself. void Stop(); - // Returns true if Start() or DelayedStart() was called most recently. Returns - // false initially and if Stop() or PostStop() was called most recently. + // Returns true until Stop() was called. + // Can only be called from the TaskQueue where the task is running. bool Running() const; private: explicit RepeatingTaskHandle( - webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task); - // Owned by the task queue. - webrtc_repeating_task_impl::RepeatingTaskBase* repeating_task_ = nullptr; + rtc::scoped_refptr<PendingTaskSafetyFlag> alive_flag) + : repeating_task_(std::move(alive_flag)) {} + rtc::scoped_refptr<PendingTaskSafetyFlag> repeating_task_; }; } // namespace webrtc diff --git a/rtc_base/task_utils/repeating_task_unittest.cc b/rtc_base/task_utils/repeating_task_unittest.cc index 2fb15d1e5a..b23284f988 100644 --- a/rtc_base/task_utils/repeating_task_unittest.cc +++ b/rtc_base/task_utils/repeating_task_unittest.cc @@ -276,4 +276,22 @@ TEST(RepeatingTaskTest, ClockIntegration) { handle.Stop(); } +TEST(RepeatingTaskTest, CanBeStoppedAfterTaskQueueDeletedTheRepeatingTask) { + std::unique_ptr<QueuedTask> repeating_task; + + MockTaskQueue task_queue; + EXPECT_CALL(task_queue, PostDelayedTask) + .WillOnce([&](std::unique_ptr<QueuedTask> task, uint32_t milliseconds) { + repeating_task = std::move(task); + }); + + RepeatingTaskHandle handle = + RepeatingTaskHandle::DelayedStart(&task_queue, TimeDelta::Millis(100), + [] { return TimeDelta::Millis(100); }); + + // shutdown task queue: delete all pending tasks and run 'regular' task. + repeating_task = nullptr; + handle.Stop(); +} + } // namespace webrtc |