diff options
Diffstat (limited to 'base/threading')
37 files changed, 2005 insertions, 962 deletions
diff --git a/base/threading/non_thread_safe.h b/base/threading/non_thread_safe.h index d41c08608c..64ae8e4da9 100644 --- a/base/threading/non_thread_safe.h +++ b/base/threading/non_thread_safe.h @@ -10,14 +10,7 @@ // There is a specific macro to do it: NON_EXPORTED_BASE(), defined in // compiler_specific.h #include "base/compiler_specific.h" - -// See comment at top of thread_checker.h -#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) -#define ENABLE_NON_THREAD_SAFE 1 -#else -#define ENABLE_NON_THREAD_SAFE 0 -#endif - +#include "base/logging.h" #include "base/threading/non_thread_safe_impl.h" namespace base { @@ -58,13 +51,11 @@ class NonThreadSafeDoNothing { // to have a base::ThreadChecker as a member, rather than inherit from // NonThreadSafe. For more details about when to choose one over the other, see // the documentation for base::ThreadChecker. -#if ENABLE_NON_THREAD_SAFE +#if DCHECK_IS_ON() typedef NonThreadSafeImpl NonThreadSafe; #else typedef NonThreadSafeDoNothing NonThreadSafe; -#endif // ENABLE_NON_THREAD_SAFE - -#undef ENABLE_NON_THREAD_SAFE +#endif // DCHECK_IS_ON() } // namespace base diff --git a/base/threading/non_thread_safe_unittest.cc b/base/threading/non_thread_safe_unittest.cc index d523fc55b1..5752d5f2b3 100644 --- a/base/threading/non_thread_safe_unittest.cc +++ b/base/threading/non_thread_safe_unittest.cc @@ -8,6 +8,7 @@ #include "base/logging.h" #include "base/macros.h" +#include "base/test/gtest_util.h" #include "base/threading/simple_thread.h" #include "testing/gtest/include/gtest/gtest.h" @@ -106,8 +107,6 @@ TEST(NonThreadSafeTest, DetachThenDestructOnDifferentThread) { delete_on_thread.Join(); } -#if GTEST_HAS_DEATH_TEST || !ENABLE_NON_THREAD_SAFE - void NonThreadSafeClass::MethodOnDifferentThreadImpl() { std::unique_ptr<NonThreadSafeClass> non_thread_safe_class( new NonThreadSafeClass); @@ -120,17 +119,15 @@ void NonThreadSafeClass::MethodOnDifferentThreadImpl() { call_on_thread.Join(); } -#if ENABLE_NON_THREAD_SAFE +#if DCHECK_IS_ON() TEST(NonThreadSafeDeathTest, MethodNotAllowedOnDifferentThreadInDebug) { - ASSERT_DEATH({ - NonThreadSafeClass::MethodOnDifferentThreadImpl(); - }, ""); + ASSERT_DCHECK_DEATH({ NonThreadSafeClass::MethodOnDifferentThreadImpl(); }); } #else TEST(NonThreadSafeTest, MethodAllowedOnDifferentThreadInRelease) { NonThreadSafeClass::MethodOnDifferentThreadImpl(); } -#endif // ENABLE_NON_THREAD_SAFE +#endif // DCHECK_IS_ON() void NonThreadSafeClass::DestructorOnDifferentThreadImpl() { std::unique_ptr<NonThreadSafeClass> non_thread_safe_class( @@ -145,21 +142,15 @@ void NonThreadSafeClass::DestructorOnDifferentThreadImpl() { delete_on_thread.Join(); } -#if ENABLE_NON_THREAD_SAFE +#if DCHECK_IS_ON() TEST(NonThreadSafeDeathTest, DestructorNotAllowedOnDifferentThreadInDebug) { - ASSERT_DEATH({ - NonThreadSafeClass::DestructorOnDifferentThreadImpl(); - }, ""); + ASSERT_DCHECK_DEATH( + { NonThreadSafeClass::DestructorOnDifferentThreadImpl(); }); } #else TEST(NonThreadSafeTest, DestructorAllowedOnDifferentThreadInRelease) { NonThreadSafeClass::DestructorOnDifferentThreadImpl(); } -#endif // ENABLE_NON_THREAD_SAFE - -#endif // GTEST_HAS_DEATH_TEST || !ENABLE_NON_THREAD_SAFE - -// Just in case we ever get lumped together with other compilation units. -#undef ENABLE_NON_THREAD_SAFE +#endif // DCHECK_IS_ON() } // namespace base diff --git a/base/threading/platform_thread.h b/base/threading/platform_thread.h index 9b217a9c65..8c0d8e4432 100644 --- a/base/threading/platform_thread.h +++ b/base/threading/platform_thread.h @@ -18,6 +18,8 @@ #if defined(OS_WIN) #include <windows.h> +#elif defined(OS_MACOSX) +#include <mach/mach_types.h> #elif defined(OS_POSIX) #include <pthread.h> #include <unistd.h> @@ -28,6 +30,8 @@ namespace base { // Used for logging. Always an integer value. #if defined(OS_WIN) typedef DWORD PlatformThreadId; +#elif defined(OS_MACOSX) +typedef mach_port_t PlatformThreadId; #elif defined(OS_POSIX) typedef pid_t PlatformThreadId; #endif @@ -59,6 +63,8 @@ class PlatformThreadRef { return id_ == other.id_; } + bool operator!=(PlatformThreadRef other) const { return id_ != other.id_; } + bool is_null() const { return id_ == 0; } @@ -175,6 +181,12 @@ class BASE_EXPORT PlatformThread { // PlatformThreadHandle. static bool CreateNonJoinable(size_t stack_size, Delegate* delegate); + // CreateNonJoinableWithPriority() does the same thing as CreateNonJoinable() + // except the priority of the thread is set based on |priority|. + static bool CreateNonJoinableWithPriority(size_t stack_size, + Delegate* delegate, + ThreadPriority priority); + // Joins with a thread created via the Create function. This function blocks // the caller until the designated thread exits. This will invalidate // |thread_handle|. @@ -184,6 +196,10 @@ class BASE_EXPORT PlatformThread { // and |thread_handle| is invalidated after this call. static void Detach(PlatformThreadHandle thread_handle); + // Returns true if SetCurrentThreadPriority() can be used to increase the + // priority of the current thread. + static bool CanIncreaseCurrentThreadPriority(); + // Toggles the current thread's priority at runtime. A thread may not be able // to raise its priority back up after lowering it if the process does not // have a proper permission, e.g. CAP_SYS_NICE on Linux. A thread may not be @@ -195,6 +211,20 @@ class BASE_EXPORT PlatformThread { static ThreadPriority GetCurrentThreadPriority(); +#if defined(OS_LINUX) + // Toggles a specific thread's priority at runtime. This can be used to + // change the priority of a thread in a different process and will fail + // if the calling process does not have proper permissions. The + // SetCurrentThreadPriority() function above is preferred in favor of + // security but on platforms where sandboxed processes are not allowed to + // change priority this function exists to allow a non-sandboxed process + // to change the priority of sandboxed threads for improved performance. + // Warning: Don't use this for a main thread because that will change the + // whole thread group's (i.e. process) priority. + static void SetThreadPriority(PlatformThreadId thread_id, + ThreadPriority priority); +#endif + private: DISALLOW_IMPLICIT_CONSTRUCTORS(PlatformThread); }; diff --git a/base/threading/platform_thread_linux.cc b/base/threading/platform_thread_linux.cc index ab7c97ef51..474410f18a 100644 --- a/base/threading/platform_thread_linux.cc +++ b/base/threading/platform_thread_linux.cc @@ -8,8 +8,10 @@ #include <sched.h> #include <stddef.h> +#include "base/files/file_util.h" #include "base/lazy_instance.h" #include "base/logging.h" +#include "base/strings/string_number_conversions.h" #include "base/threading/platform_thread_internal_posix.h" #include "base/threading/thread_id_name_manager.h" #include "base/tracked_objects.h" @@ -18,11 +20,68 @@ #if !defined(OS_NACL) #include <pthread.h> #include <sys/prctl.h> +#include <sys/resource.h> +#include <sys/time.h> #include <sys/types.h> #include <unistd.h> #endif namespace base { +namespace { +#if !defined(OS_NACL) +const FilePath::CharType kCgroupDirectory[] = + FILE_PATH_LITERAL("/sys/fs/cgroup"); + +FilePath ThreadPriorityToCgroupDirectory(const FilePath& cgroup_filepath, + ThreadPriority priority) { + switch (priority) { + case ThreadPriority::NORMAL: + return cgroup_filepath; + case ThreadPriority::BACKGROUND: + return cgroup_filepath.Append(FILE_PATH_LITERAL("non-urgent")); + case ThreadPriority::DISPLAY: + case ThreadPriority::REALTIME_AUDIO: + return cgroup_filepath.Append(FILE_PATH_LITERAL("urgent")); + } + NOTREACHED(); + return FilePath(); +} + +void SetThreadCgroup(PlatformThreadId thread_id, + const FilePath& cgroup_directory) { + FilePath tasks_filepath = cgroup_directory.Append(FILE_PATH_LITERAL("tasks")); + std::string tid = IntToString(thread_id); + int bytes_written = WriteFile(tasks_filepath, tid.c_str(), tid.size()); + if (bytes_written != static_cast<int>(tid.size())) { + DVLOG(1) << "Failed to add " << tid << " to " << tasks_filepath.value(); + } +} + +void SetThreadCgroupForThreadPriority(PlatformThreadId thread_id, + const FilePath& cgroup_filepath, + ThreadPriority priority) { + // Append "chrome" suffix. + FilePath cgroup_directory = ThreadPriorityToCgroupDirectory( + cgroup_filepath.Append(FILE_PATH_LITERAL("chrome")), priority); + + // Silently ignore request if cgroup directory doesn't exist. + if (!DirectoryExists(cgroup_directory)) + return; + + SetThreadCgroup(thread_id, cgroup_directory); +} + +void SetThreadCgroupsForThreadPriority(PlatformThreadId thread_id, + ThreadPriority priority) { + FilePath cgroup_filepath(kCgroupDirectory); + SetThreadCgroupForThreadPriority( + thread_id, cgroup_filepath.Append(FILE_PATH_LITERAL("cpuset")), priority); + SetThreadCgroupForThreadPriority( + thread_id, cgroup_filepath.Append(FILE_PATH_LITERAL("schedtune")), + priority); +} +#endif +} // namespace namespace internal { @@ -41,6 +100,7 @@ const ThreadPriorityToNiceValuePair kThreadPriorityToNiceValueMap[4] = { bool SetCurrentThreadPriorityForPlatform(ThreadPriority priority) { #if !defined(OS_NACL) + SetThreadCgroupsForThreadPriority(PlatformThread::CurrentId(), priority); return priority == ThreadPriority::REALTIME_AUDIO && pthread_setschedparam(pthread_self(), SCHED_RR, &kRealTimePrio) == 0; #else @@ -90,6 +150,25 @@ void PlatformThread::SetName(const std::string& name) { #endif // !defined(OS_NACL) } +#if !defined(OS_NACL) +// static +void PlatformThread::SetThreadPriority(PlatformThreadId thread_id, + ThreadPriority priority) { + // Changing current main threads' priority is not permitted in favor of + // security, this interface is restricted to change only non-main thread + // priority. + CHECK_NE(thread_id, getpid()); + + SetThreadCgroupsForThreadPriority(thread_id, priority); + + const int nice_setting = internal::ThreadPriorityToNiceValue(priority); + if (setpriority(PRIO_PROCESS, thread_id, nice_setting)) { + DVPLOG(1) << "Failed to set nice value of thread (" << thread_id << ") to " + << nice_setting; + } +} +#endif // !defined(OS_NACL) + void InitThreading() {} void TerminateOnThread() {} diff --git a/base/threading/platform_thread_mac.mm b/base/threading/platform_thread_mac.mm index 51f3621af2..e743044ec1 100644 --- a/base/threading/platform_thread_mac.mm +++ b/base/threading/platform_thread_mac.mm @@ -162,6 +162,11 @@ void SetPriorityRealtimeAudio(mach_port_t mach_thread_id) { } // anonymous namespace // static +bool PlatformThread::CanIncreaseCurrentThreadPriority() { + return true; +} + +// static void PlatformThread::SetCurrentThreadPriority(ThreadPriority priority) { // Convert from pthread_t to mach thread identifier. mach_port_t mach_thread_id = diff --git a/base/threading/platform_thread_posix.cc b/base/threading/platform_thread_posix.cc index 2321b3cd49..9a6a2bb999 100644 --- a/base/threading/platform_thread_posix.cc +++ b/base/threading/platform_thread_posix.cc @@ -11,9 +11,12 @@ #include <stdint.h> #include <sys/resource.h> #include <sys/time.h> +#include <sys/types.h> +#include <unistd.h> #include <memory> +#include "base/debug/activity_tracker.h" #include "base/lazy_instance.h" #include "base/logging.h" #include "base/threading/platform_thread_internal_posix.h" @@ -23,8 +26,6 @@ #if defined(OS_LINUX) #include <sys/syscall.h> -#elif defined(OS_ANDROID) -#include <sys/types.h> #endif namespace base { @@ -187,21 +188,32 @@ const char* PlatformThread::GetName() { bool PlatformThread::CreateWithPriority(size_t stack_size, Delegate* delegate, PlatformThreadHandle* thread_handle, ThreadPriority priority) { - return CreateThread(stack_size, true, // joinable thread - delegate, thread_handle, priority); + return CreateThread(stack_size, true /* joinable thread */, delegate, + thread_handle, priority); } // static bool PlatformThread::CreateNonJoinable(size_t stack_size, Delegate* delegate) { + return CreateNonJoinableWithPriority(stack_size, delegate, + ThreadPriority::NORMAL); +} + +// static +bool PlatformThread::CreateNonJoinableWithPriority(size_t stack_size, + Delegate* delegate, + ThreadPriority priority) { PlatformThreadHandle unused; bool result = CreateThread(stack_size, false /* non-joinable thread */, - delegate, &unused, ThreadPriority::NORMAL); + delegate, &unused, priority); return result; } // static void PlatformThread::Join(PlatformThreadHandle thread_handle) { + // Record the event that this thread is blocking upon (for hang diagnosis). + base::debug::ScopedThreadJoinActivity thread_activity(&thread_handle); + // Joining another thread may block the current thread for a long time, since // the thread referred to by |thread_handle| may still be running long-lived / // blocking tasks. @@ -218,6 +230,18 @@ void PlatformThread::Detach(PlatformThreadHandle thread_handle) { #if !defined(OS_MACOSX) // static +bool PlatformThread::CanIncreaseCurrentThreadPriority() { +#if defined(OS_NACL) + return false; +#else + // Only root can raise thread priority on POSIX environment. On Linux, users + // who have CAP_SYS_NICE permission also can raise the thread priority, but + // libcap.so would be needed to check the capability. + return geteuid() == 0; +#endif // defined(OS_NACL) +} + +// static void PlatformThread::SetCurrentThreadPriority(ThreadPriority priority) { #if defined(OS_NACL) NOTIMPLEMENTED(); diff --git a/base/threading/platform_thread_unittest.cc b/base/threading/platform_thread_unittest.cc index 2d99ed8750..0febf8ba9b 100644 --- a/base/threading/platform_thread_unittest.cc +++ b/base/threading/platform_thread_unittest.cc @@ -12,8 +12,6 @@ #include "testing/gtest/include/gtest/gtest.h" #if defined(OS_POSIX) -#include <sys/types.h> -#include <unistd.h> #include "base/threading/platform_thread_internal_posix.h" #elif defined(OS_WIN) #include <windows.h> @@ -235,17 +233,6 @@ const ThreadPriority kThreadPriorityTestValues[] = { ThreadPriority::NORMAL, ThreadPriority::BACKGROUND}; -bool IsBumpingPriorityAllowed() { -#if defined(OS_POSIX) - // Only root can raise thread priority on POSIX environment. On Linux, users - // who have CAP_SYS_NICE permission also can raise the thread priority, but - // libcap.so would be needed to check the capability. - return geteuid() == 0; -#else - return true; -#endif -} - class ThreadPriorityTestThread : public FunctionTestThread { public: explicit ThreadPriorityTestThread(ThreadPriority priority) @@ -273,8 +260,9 @@ class ThreadPriorityTestThread : public FunctionTestThread { // Test changing a created thread's priority (which has different semantics on // some platforms). TEST(PlatformThreadTest, ThreadPriorityCurrentThread) { - const bool bumping_priority_allowed = IsBumpingPriorityAllowed(); - if (bumping_priority_allowed) { + const bool increase_priority_allowed = + PlatformThread::CanIncreaseCurrentThreadPriority(); + if (increase_priority_allowed) { // Bump the priority in order to verify that new threads are started with // normal priority. PlatformThread::SetCurrentThreadPriority(ThreadPriority::DISPLAY); @@ -282,7 +270,7 @@ TEST(PlatformThreadTest, ThreadPriorityCurrentThread) { // Toggle each supported priority on the thread and confirm it affects it. for (size_t i = 0; i < arraysize(kThreadPriorityTestValues); ++i) { - if (!bumping_priority_allowed && + if (!increase_priority_allowed && kThreadPriorityTestValues[i] > PlatformThread::GetCurrentThreadPriority()) { continue; diff --git a/base/threading/post_task_and_reply_impl.cc b/base/threading/post_task_and_reply_impl.cc index c906866cfb..d16f8bd225 100644 --- a/base/threading/post_task_and_reply_impl.cc +++ b/base/threading/post_task_and_reply_impl.cc @@ -4,42 +4,46 @@ #include "base/threading/post_task_and_reply_impl.h" +#include <utility> + #include "base/bind.h" -#include "base/location.h" -#include "base/single_thread_task_runner.h" -#include "base/threading/thread_task_runner_handle.h" +#include "base/debug/leak_annotations.h" +#include "base/logging.h" +#include "base/memory/ref_counted.h" +#include "base/sequence_checker.h" +#include "base/sequenced_task_runner.h" +#include "base/threading/sequenced_task_runner_handle.h" namespace base { namespace { -// This relay class remembers the MessageLoop that it was created on, and -// ensures that both the |task| and |reply| Closures are deleted on this same -// thread. Also, |task| is guaranteed to be deleted before |reply| is run or -// deleted. +// This relay class remembers the sequence that it was created on, and ensures +// that both the |task| and |reply| Closures are deleted on this same sequence. +// Also, |task| is guaranteed to be deleted before |reply| is run or deleted. // -// If this is not possible because the originating MessageLoop is no longer -// available, the the |task| and |reply| Closures are leaked. Leaking is -// considered preferable to having a thread-safetey violations caused by -// invoking the Closure destructor on the wrong thread. +// If RunReplyAndSelfDestruct() doesn't run because the originating execution +// context is no longer available, then the |task| and |reply| Closures are +// leaked. Leaking is considered preferable to having a thread-safetey +// violations caused by invoking the Closure destructor on the wrong sequence. class PostTaskAndReplyRelay { public: PostTaskAndReplyRelay(const tracked_objects::Location& from_here, - const Closure& task, - const Closure& reply) - : from_here_(from_here), - origin_task_runner_(ThreadTaskRunnerHandle::Get()) { - task_ = task; - reply_ = reply; - } + Closure task, + Closure reply) + : sequence_checker_(), + from_here_(from_here), + origin_task_runner_(SequencedTaskRunnerHandle::Get()), + reply_(std::move(reply)), + task_(std::move(task)) {} ~PostTaskAndReplyRelay() { - DCHECK(origin_task_runner_->BelongsToCurrentThread()); + DCHECK(sequence_checker_.CalledOnValidSequence()); task_.Reset(); reply_.Reset(); } - void Run() { + void RunTaskAndPostReply() { task_.Run(); origin_task_runner_->PostTask( from_here_, Bind(&PostTaskAndReplyRelay::RunReplyAndSelfDestruct, @@ -48,7 +52,7 @@ class PostTaskAndReplyRelay { private: void RunReplyAndSelfDestruct() { - DCHECK(origin_task_runner_->BelongsToCurrentThread()); + DCHECK(sequence_checker_.CalledOnValidSequence()); // Force |task_| to be released before |reply_| is to ensure that no one // accidentally depends on |task_| keeping one of its arguments alive while @@ -61,8 +65,9 @@ class PostTaskAndReplyRelay { delete this; } - tracked_objects::Location from_here_; - scoped_refptr<SingleThreadTaskRunner> origin_task_runner_; + const SequenceChecker sequence_checker_; + const tracked_objects::Location from_here_; + const scoped_refptr<SequencedTaskRunner> origin_task_runner_; Closure reply_; Closure task_; }; @@ -73,14 +78,19 @@ namespace internal { bool PostTaskAndReplyImpl::PostTaskAndReply( const tracked_objects::Location& from_here, - const Closure& task, - const Closure& reply) { - // TODO(tzik): Use DCHECK here once the crash is gone. http://crbug.com/541319 - CHECK(!task.is_null()) << from_here.ToString(); - CHECK(!reply.is_null()) << from_here.ToString(); + Closure task, + Closure reply) { + DCHECK(!task.is_null()) << from_here.ToString(); + DCHECK(!reply.is_null()) << from_here.ToString(); PostTaskAndReplyRelay* relay = - new PostTaskAndReplyRelay(from_here, task, reply); - if (!PostTask(from_here, Bind(&PostTaskAndReplyRelay::Run, + new PostTaskAndReplyRelay(from_here, std::move(task), std::move(reply)); + // PostTaskAndReplyRelay self-destructs after executing |reply|. On the flip + // side though, it is intentionally leaked if the |task| doesn't complete + // before the origin sequence stops executing tasks. Annotate |relay| as leaky + // to avoid having to suppress every callsite which happens to flakily trigger + // this race. + ANNOTATE_LEAKING_OBJECT_PTR(relay); + if (!PostTask(from_here, Bind(&PostTaskAndReplyRelay::RunTaskAndPostReply, Unretained(relay)))) { delete relay; return false; diff --git a/base/threading/post_task_and_reply_impl.h b/base/threading/post_task_and_reply_impl.h index d21ab78de8..696b668a4c 100644 --- a/base/threading/post_task_and_reply_impl.h +++ b/base/threading/post_task_and_reply_impl.h @@ -8,30 +8,29 @@ #ifndef BASE_THREADING_POST_TASK_AND_REPLY_IMPL_H_ #define BASE_THREADING_POST_TASK_AND_REPLY_IMPL_H_ -#include "base/callback_forward.h" +#include "base/base_export.h" +#include "base/callback.h" #include "base/location.h" namespace base { namespace internal { -// Inherit from this in a class that implements PostTask appropriately -// for sending to a destination thread. +// Inherit from this in a class that implements PostTask to send a task to a +// custom execution context. // -// Note that 'reply' will always get posted back to your current -// MessageLoop. -// -// If you're looking for a concrete implementation of -// PostTaskAndReply, you probably want base::SingleThreadTaskRunner, or you -// may want base::WorkerPool. -class PostTaskAndReplyImpl { +// If you're looking for a concrete implementation of PostTaskAndReply, you +// probably want base::TaskRunner, or you may want base::WorkerPool. +class BASE_EXPORT PostTaskAndReplyImpl { public: virtual ~PostTaskAndReplyImpl() = default; - // Implementation for TaskRunner::PostTaskAndReply and - // WorkerPool::PostTaskAndReply. + // Posts |task| by calling PostTask(). On completion, |reply| is posted to the + // sequence or thread that called this. Can only be called when + // SequencedTaskRunnerHandle::IsSet(). Both |task| and |reply| are guaranteed + // to be deleted on the sequence or thread that called this. bool PostTaskAndReply(const tracked_objects::Location& from_here, - const Closure& task, - const Closure& reply); + Closure task, + Closure reply); private: virtual bool PostTask(const tracked_objects::Location& from_here, diff --git a/base/threading/sequenced_task_runner_handle.cc b/base/threading/sequenced_task_runner_handle.cc index 88b36a8d64..90f68b33ab 100644 --- a/base/threading/sequenced_task_runner_handle.cc +++ b/base/threading/sequenced_task_runner_handle.cc @@ -16,39 +16,56 @@ namespace base { namespace { -base::LazyInstance<base::ThreadLocalPointer<SequencedTaskRunnerHandle>>::Leaky +LazyInstance<ThreadLocalPointer<SequencedTaskRunnerHandle>>::Leaky lazy_tls_ptr = LAZY_INSTANCE_INITIALIZER; } // namespace // static scoped_refptr<SequencedTaskRunner> SequencedTaskRunnerHandle::Get() { + // Return the registered SingleThreadTaskRunner, if any. This must be at the + // top so that a SingleThreadTaskRunner has priority over a + // SequencedTaskRunner (RLZ registers both on the same thread despite that + // being prevented by DCHECKs). + // TODO(fdoray): Move this to the bottom once RLZ stops registering a + // SingleThreadTaskRunner and a SequencedTaskRunner on the same thread. + // https://crbug.com/618530#c14 + if (ThreadTaskRunnerHandle::IsSet()) { + // Various modes of setting SequencedTaskRunnerHandle don't combine. + DCHECK(!lazy_tls_ptr.Pointer()->Get()); + DCHECK(!SequencedWorkerPool::GetSequenceTokenForCurrentThread().IsValid()); + + return ThreadTaskRunnerHandle::Get(); + } + // Return the registered SequencedTaskRunner, if any. const SequencedTaskRunnerHandle* handle = lazy_tls_ptr.Pointer()->Get(); if (handle) { // Various modes of setting SequencedTaskRunnerHandle don't combine. - DCHECK(!base::ThreadTaskRunnerHandle::IsSet()); - DCHECK(!SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread()); - return handle->task_runner_; - } + DCHECK(!SequencedWorkerPool::GetSequenceTokenForCurrentThread().IsValid()); - // Return the SequencedTaskRunner obtained from SequencedWorkerPool, if any. - scoped_refptr<base::SequencedTaskRunner> task_runner = - SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread(); - if (task_runner) { - DCHECK(!base::ThreadTaskRunnerHandle::IsSet()); - return task_runner; + return handle->task_runner_; } - // Return the SingleThreadTaskRunner for the current thread otherwise. - return base::ThreadTaskRunnerHandle::Get(); + // If we are on a worker thread for a SequencedBlockingPool that is running a + // sequenced task, return a SequencedTaskRunner for it. + scoped_refptr<SequencedWorkerPool> pool = + SequencedWorkerPool::GetWorkerPoolForCurrentThread(); + DCHECK(pool); + SequencedWorkerPool::SequenceToken sequence_token = + SequencedWorkerPool::GetSequenceTokenForCurrentThread(); + DCHECK(sequence_token.IsValid()); + scoped_refptr<SequencedTaskRunner> sequenced_task_runner( + pool->GetSequencedTaskRunner(sequence_token)); + DCHECK(sequenced_task_runner->RunsTasksOnCurrentThread()); + return sequenced_task_runner; } // static bool SequencedTaskRunnerHandle::IsSet() { return lazy_tls_ptr.Pointer()->Get() || - SequencedWorkerPool::GetWorkerPoolForCurrentThread() || - base::ThreadTaskRunnerHandle::IsSet(); + SequencedWorkerPool::GetSequenceTokenForCurrentThread().IsValid() || + ThreadTaskRunnerHandle::IsSet(); } SequencedTaskRunnerHandle::SequencedTaskRunnerHandle( diff --git a/base/threading/sequenced_task_runner_handle.h b/base/threading/sequenced_task_runner_handle.h index e6dec1e9f8..b7f4bae8aa 100644 --- a/base/threading/sequenced_task_runner_handle.h +++ b/base/threading/sequenced_task_runner_handle.h @@ -25,8 +25,9 @@ class BASE_EXPORT SequencedTaskRunnerHandle { // instantiating a SequencedTaskRunnerHandle. // b) The current thread has a ThreadTaskRunnerHandle (which includes any // thread that has a MessageLoop associated with it), or - // c) The current thread is a worker thread belonging to a - // SequencedWorkerPool. + // c) The current thread is a worker thread belonging to a SequencedWorkerPool + // *and* is currently running a sequenced task (note: not supporting + // unsequenced tasks is intentional: https://crbug.com/618043#c4). static bool IsSet(); // Binds |task_runner| to the current thread. diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc index 57961b5cd5..ce594cd7fb 100644 --- a/base/threading/sequenced_worker_pool.cc +++ b/base/threading/sequenced_worker_pool.cc @@ -10,6 +10,7 @@ #include <map> #include <memory> #include <set> +#include <unordered_map> #include <utility> #include <vector> @@ -17,6 +18,7 @@ #include "base/callback.h" #include "base/compiler_specific.h" #include "base/critical_closure.h" +#include "base/debug/dump_without_crashing.h" #include "base/lazy_instance.h" #include "base/logging.h" #include "base/macros.h" @@ -25,15 +27,21 @@ #include "base/strings/stringprintf.h" #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" +// Don't enable the redirect to TaskScheduler on Arc++ to avoid pulling a bunch +// of dependencies. Some code also #ifdef'ed below. +#if 0 +#include "base/task_scheduler/post_task.h" +#include "base/task_scheduler/task_scheduler.h" +#endif #include "base/threading/platform_thread.h" +#include "base/threading/sequenced_task_runner_handle.h" #include "base/threading/simple_thread.h" #include "base/threading/thread_local.h" #include "base/threading/thread_restrictions.h" -#include "base/threading/thread_task_runner_handle.h" #include "base/time/time.h" -#include "base/trace_event/heap_profiler.h" #include "base/trace_event/trace_event.h" #include "base/tracked_objects.h" +#include "base/tracking_info.h" #include "build/build_config.h" #if defined(OS_MACOSX) @@ -43,13 +51,36 @@ #endif #if !defined(OS_NACL) -#include "base/metrics/histogram.h" +#include "base/metrics/histogram_macros.h" #endif namespace base { namespace { +// An enum representing the state of all pools. A non-test process should only +// ever transition from POST_TASK_DISABLED to one of the active states. A test +// process may transition from one of the active states to POST_TASK_DISABLED +// when DisableForProcessForTesting() is called. +// +// External memory synchronization is required to call a method that reads +// |g_all_pools_state| after calling a method that modifies it. +// +// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool +// will be phased out completely otherwise). +enum class AllPoolsState { + POST_TASK_DISABLED, + USE_WORKER_POOL, + REDIRECTED_TO_TASK_SCHEDULER, +}; + +// TODO(fdoray): Change the initial state to POST_TASK_DISABLED. It is initially +// USE_WORKER_POOL to avoid a revert of the CL that adds +// debug::DumpWithoutCrashing() in case of waterfall failures. +AllPoolsState g_all_pools_state = AllPoolsState::USE_WORKER_POOL; + +TaskPriority g_max_task_priority = TaskPriority::HIGHEST; + struct SequencedTask : public TrackingInfo { SequencedTask() : sequence_token_id(0), @@ -92,6 +123,14 @@ struct SequencedTaskLessThan { } }; +// Create a process-wide unique ID to represent this task in trace events. This +// will be mangled with a Process ID hash to reduce the likelyhood of colliding +// with MessageLoop pointers on other processes. +uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) { + return (static_cast<uint64_t>(task.trace_id) << 32) | + static_cast<uint64_t>(reinterpret_cast<intptr_t>(pool)); +} + // SequencedWorkerPoolTaskRunner --------------------------------------------- // A TaskRunner which posts tasks to a SequencedWorkerPool with a // fixed ShutdownBehavior. @@ -142,14 +181,17 @@ bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { return pool_->RunsTasksOnCurrentThread(); } -// SequencedWorkerPoolSequencedTaskRunner ------------------------------------ +} // namespace + +// SequencedWorkerPool::PoolSequencedTaskRunner ------------------------------ // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a // fixed sequence token. // // Note that this class is RefCountedThreadSafe (inherited from TaskRunner). -class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner { +class SequencedWorkerPool::PoolSequencedTaskRunner + : public SequencedTaskRunner { public: - SequencedWorkerPoolSequencedTaskRunner( + PoolSequencedTaskRunner( scoped_refptr<SequencedWorkerPool> pool, SequencedWorkerPool::SequenceToken token, SequencedWorkerPool::WorkerShutdown shutdown_behavior); @@ -166,7 +208,7 @@ class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner { TimeDelta delay) override; private: - ~SequencedWorkerPoolSequencedTaskRunner() override; + ~PoolSequencedTaskRunner() override; const scoped_refptr<SequencedWorkerPool> pool_; @@ -174,25 +216,25 @@ class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner { const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; - DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner); + DISALLOW_COPY_AND_ASSIGN(PoolSequencedTaskRunner); }; -SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner( - scoped_refptr<SequencedWorkerPool> pool, - SequencedWorkerPool::SequenceToken token, - SequencedWorkerPool::WorkerShutdown shutdown_behavior) +SequencedWorkerPool::PoolSequencedTaskRunner:: + PoolSequencedTaskRunner( + scoped_refptr<SequencedWorkerPool> pool, + SequencedWorkerPool::SequenceToken token, + SequencedWorkerPool::WorkerShutdown shutdown_behavior) : pool_(std::move(pool)), token_(token), shutdown_behavior_(shutdown_behavior) {} -SequencedWorkerPoolSequencedTaskRunner:: -~SequencedWorkerPoolSequencedTaskRunner() { -} +SequencedWorkerPool::PoolSequencedTaskRunner:: + ~PoolSequencedTaskRunner() = default; -bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask( - const tracked_objects::Location& from_here, - const Closure& task, - TimeDelta delay) { +bool SequencedWorkerPool::PoolSequencedTaskRunner:: + PostDelayedTask(const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { if (delay.is_zero()) { return pool_->PostSequencedWorkerTaskWithShutdownBehavior( token_, from_here, task, shutdown_behavior_); @@ -200,29 +242,20 @@ bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask( return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay); } -bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const { +bool SequencedWorkerPool::PoolSequencedTaskRunner:: + RunsTasksOnCurrentThread() const { return pool_->IsRunningSequenceOnCurrentThread(token_); } -bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask( - const tracked_objects::Location& from_here, - const Closure& task, - TimeDelta delay) { +bool SequencedWorkerPool::PoolSequencedTaskRunner:: + PostNonNestableDelayedTask(const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { // There's no way to run nested tasks, so simply forward to // PostDelayedTask. return PostDelayedTask(from_here, task, delay); } -// Create a process-wide unique ID to represent this task in trace events. This -// will be mangled with a Process ID hash to reduce the likelyhood of colliding -// with MessageLoop pointers on other processes. -uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) { - return (static_cast<uint64_t>(task.trace_id) << 32) | - static_cast<uint64_t>(reinterpret_cast<intptr_t>(pool)); -} - -} // namespace - // Worker --------------------------------------------------------------------- class SequencedWorkerPool::Worker : public SimpleThread { @@ -247,6 +280,14 @@ class SequencedWorkerPool::Worker : public SimpleThread { is_processing_task_ = true; task_sequence_token_ = token; task_shutdown_behavior_ = shutdown_behavior; + + // It is dangerous for tasks with CONTINUE_ON_SHUTDOWN to access a class + // that implements a non-leaky base::Singleton because they are generally + // destroyed before the process terminates via an AtExitManager + // registration. This will trigger a DCHECK to warn of such cases. See the + // comment about CONTINUE_ON_SHUTDOWN for more details. + ThreadRestrictions::SetSingletonAllowed(task_shutdown_behavior_ != + CONTINUE_ON_SHUTDOWN); } // Indicates that the task has finished running. @@ -292,8 +333,10 @@ class SequencedWorkerPool::Inner { public: // Take a raw pointer to |worker| to avoid cycles (since we're owned // by it). - Inner(SequencedWorkerPool* worker_pool, size_t max_threads, + Inner(SequencedWorkerPool* worker_pool, + size_t max_threads, const std::string& thread_name_prefix, + base::TaskPriority task_priority, TestingObserver* observer); ~Inner(); @@ -316,11 +359,6 @@ class SequencedWorkerPool::Inner { bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; - bool IsRunningSequence(SequenceToken sequence_token) const; - - void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token, - WorkerShutdown shutdown_behavior); - void CleanupForTesting(); void SignalHasWorkForTesting(); @@ -349,6 +387,25 @@ class SequencedWorkerPool::Inner { CLEANUP_DONE, }; + // Clears ScheduledTasks in |tasks_to_delete| while ensuring that + // |this_worker| has the desired task info context during ~ScheduledTask() to + // allow sequence-checking. + void DeleteWithoutLock(std::vector<SequencedTask>* tasks_to_delete, + Worker* this_worker); + + // Helper used by PostTask() to complete the work when redirection is on. + // Returns true if the task may run at some point in the future and false if + // it will definitely not run. + // Coalesce upon resolution of http://crbug.com/622400. + bool PostTaskToTaskScheduler(const SequencedTask& sequenced, + const TimeDelta& delay); + + // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id| + // and |traits|. + scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner( + int sequence_token_id, + const TaskTraits& traits); + // Called from within the lock, this converts the given token name into a // token ID, creating a new one if necessary. int LockedGetNamedTokenID(const std::string& name); @@ -373,7 +430,7 @@ class SequencedWorkerPool::Inner { // See the implementation for a more detailed description. GetWorkStatus GetWork(SequencedTask* task, TimeDelta* wait_time, - std::vector<Closure>* delete_these_outside_lock); + std::vector<SequencedTask>* delete_these_outside_lock); void HandleCleanup(); @@ -397,7 +454,7 @@ class SequencedWorkerPool::Inner { // 0 or more. The caller should then call FinishStartingAdditionalThread to // complete initialization once the lock is released. // - // If another thread is not necessary, returne 0; + // If another thread is not necessary, return 0; // // See the implementedion for more. int PrepareToStartAdditionalThreadIfHelpful(); @@ -497,6 +554,27 @@ class SequencedWorkerPool::Inner { TestingObserver* const testing_observer_; + // Members below are used for the experimental redirection to TaskScheduler. + // TODO(gab): Remove these if http://crbug.com/622400 fails + // (SequencedWorkerPool will be phased out completely otherwise). + + // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the + // TaskScheduler as an experiment (unused otherwise). + const base::TaskPriority task_priority_; + + // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect + // sequenced tasks to the TaskScheduler. + std::unordered_map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_; + + // TaskScheduler TaskRunners to redirect unsequenced tasks to the + // TaskScheduler. Indexed by TaskShutdownBehavior. + scoped_refptr<TaskRunner> unsequenced_task_runners_[3]; + + // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as + // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread(). + // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread(). + mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_; + DISALLOW_COPY_AND_ASSIGN(Inner); }; @@ -510,6 +588,7 @@ SequencedWorkerPool::Worker::Worker( worker_pool_(std::move(worker_pool)), task_shutdown_behavior_(BLOCK_SHUTDOWN), is_processing_task_(false) { + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); Start(); } @@ -517,6 +596,8 @@ SequencedWorkerPool::Worker::~Worker() { } void SequencedWorkerPool::Worker::Run() { + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); + #if defined(OS_WIN) win::ScopedCOMInitializer com_initializer; #endif @@ -552,11 +633,11 @@ LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky // Inner definitions --------------------------------------------------------- -SequencedWorkerPool::Inner::Inner( - SequencedWorkerPool* worker_pool, - size_t max_threads, - const std::string& thread_name_prefix, - TestingObserver* observer) +SequencedWorkerPool::Inner::Inner(SequencedWorkerPool* worker_pool, + size_t max_threads, + const std::string& thread_name_prefix, + base::TaskPriority task_priority, + TestingObserver* observer) : worker_pool_(worker_pool), lock_(), has_work_cv_(&lock_), @@ -574,7 +655,13 @@ SequencedWorkerPool::Inner::Inner( cleanup_state_(CLEANUP_DONE), cleanup_idlers_(0), cleanup_cv_(&lock_), - testing_observer_(observer) {} + testing_observer_(observer), + task_priority_(static_cast<int>(task_priority) <= + static_cast<int>(g_max_task_priority) + ? task_priority + : g_max_task_priority) { + DCHECK_GT(max_threads_, 1U); +} SequencedWorkerPool::Inner::~Inner() { // You must call Shutdown() before destroying the pool. @@ -611,6 +698,13 @@ bool SequencedWorkerPool::Inner::PostTask( const tracked_objects::Location& from_here, const Closure& task, TimeDelta delay) { + // TODO(fdoray): Uncomment this DCHECK. It is initially commented to avoid a + // revert of the CL that adds debug::DumpWithoutCrashing() if it fails on the + // waterfall. https://crbug.com/622400 + // DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); + if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED) + debug::DumpWithoutCrashing(); + DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN); SequencedTask sequenced(from_here); sequenced.sequence_token_id = sequence_token.id_; @@ -624,6 +718,7 @@ bool SequencedWorkerPool::Inner::PostTask( int create_thread_id = 0; { AutoLock lock(lock_); + if (shutdown_called_) { // Don't allow a new task to be posted if it doesn't block shutdown. if (shutdown_behavior != BLOCK_SHUTDOWN) @@ -659,64 +754,178 @@ bool SequencedWorkerPool::Inner::PostTask( if (optional_token_name) sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); - pending_tasks_.insert(sequenced); - if (shutdown_behavior == BLOCK_SHUTDOWN) - blocking_shutdown_pending_task_count_++; + // See on top of the file why we don't compile this on Arc++. +#if 0 + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { + if (!PostTaskToTaskScheduler(sequenced, delay)) + return false; + } else { +#endif + pending_tasks_.insert(sequenced); - create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); + if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN) + blocking_shutdown_pending_task_count_++; + + create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); + } +#if 0 } +#endif - // Actually start the additional thread or signal an existing one now that - // we're outside the lock. - if (create_thread_id) - FinishStartingAdditionalThread(create_thread_id); - else - SignalHasWork(); + // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure + // correct behavior if a task is posted to a SequencedWorkerPool before + // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build. + if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { + // Actually start the additional thread or signal an existing one outside + // the lock. + if (create_thread_id) + FinishStartingAdditionalThread(create_thread_id); + else + SignalHasWork(); + } + +#if DCHECK_IS_ON() + { + AutoLock lock_for_dcheck(lock_); + // Some variables are exposed in both modes for convenience but only really + // intended for one of them at runtime, confirm exclusive usage here. + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { + DCHECK(pending_tasks_.empty()); + DCHECK_EQ(0, create_thread_id); + } else { + DCHECK(sequenced_task_runner_map_.empty()); + } + } +#endif // DCHECK_IS_ON() return true; } -bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { - AutoLock lock(lock_); - return ContainsKey(threads_, PlatformThread::CurrentId()); +bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler( + const SequencedTask& sequenced, + const TimeDelta& delay) { +#if 1 + NOTREACHED(); + ALLOW_UNUSED_PARAM(sequenced); + ALLOW_UNUSED_PARAM(delay); + return false; +#else + DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); + + lock_.AssertAcquired(); + + // Confirm that the TaskScheduler's shutdown behaviors use the same + // underlying values as SequencedWorkerPool. + static_assert( + static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == + static_cast<int>(CONTINUE_ON_SHUTDOWN), + "TaskShutdownBehavior and WorkerShutdown enum mismatch for " + "CONTINUE_ON_SHUTDOWN."); + static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == + static_cast<int>(SKIP_ON_SHUTDOWN), + "TaskShutdownBehavior and WorkerShutdown enum mismatch for " + "SKIP_ON_SHUTDOWN."); + static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == + static_cast<int>(BLOCK_SHUTDOWN), + "TaskShutdownBehavior and WorkerShutdown enum mismatch for " + "BLOCK_SHUTDOWN."); + + const TaskShutdownBehavior task_shutdown_behavior = + static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior); + const TaskTraits traits = TaskTraits() + .MayBlock() + .WithBaseSyncPrimitives() + .WithPriority(task_priority_) + .WithShutdownBehavior(task_shutdown_behavior); + return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits) + ->PostDelayedTask(sequenced.posted_from, sequenced.task, delay); +#endif } -bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( - SequenceToken sequence_token) const { +scoped_refptr<TaskRunner> +SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner( + int sequence_token_id, + const TaskTraits& traits) { +#if 1 + NOTREACHED(); + ALLOW_UNUSED_PARAM(sequence_token_id); + ALLOW_UNUSED_PARAM(traits); + return scoped_refptr<TaskRunner>(); +#else + DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); + + lock_.AssertAcquired(); + + static_assert( + static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 0, + "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN must be equal to 0 to be " + "used as an index in |unsequenced_task_runners_|."); + static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 1, + "TaskShutdownBehavior::SKIP_ON_SHUTDOWN must be equal to 1 to " + "be used as an index in |unsequenced_task_runners_|."); + static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == 2, + "TaskShutdownBehavior::BLOCK_SHUTDOWN must be equal to 2 to be " + "used as an index in |unsequenced_task_runners_|."); + static_assert(arraysize(unsequenced_task_runners_) == 3, + "The size of |unsequenced_task_runners_| doesn't match the " + "number of shutdown behaviors."); + + scoped_refptr<TaskRunner>& task_runner = + sequence_token_id ? sequenced_task_runner_map_[sequence_token_id] + : unsequenced_task_runners_[static_cast<int>( + traits.shutdown_behavior())]; + + // TODO(fdoray): DCHECK that all tasks posted to the same sequence have the + // same shutdown behavior. + + if (!task_runner) { + task_runner = sequence_token_id + ? CreateSequencedTaskRunnerWithTraits(traits) + : CreateTaskRunnerWithTraits(traits); + } + + return task_runner; +#endif +} + +bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { AutoLock lock(lock_); - ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); - if (found == threads_.end()) - return false; - return found->second->is_processing_task() && - sequence_token.Equals(found->second->task_sequence_token()); + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { +#if 0 + if (!runs_tasks_on_verifier_) { + runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits( + TaskTraits().MayBlock().WithBaseSyncPrimitives().WithPriority( + task_priority_)); + } +#endif + return runs_tasks_on_verifier_->RunsTasksOnCurrentThread(); + } else { + return ContainsKey(threads_, PlatformThread::CurrentId()); + } } -bool SequencedWorkerPool::Inner::IsRunningSequence( +bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( SequenceToken sequence_token) const { DCHECK(sequence_token.IsValid()); - AutoLock lock(lock_); - return !IsSequenceTokenRunnable(sequence_token.id_); -} -void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread( - SequenceToken sequence_token, - WorkerShutdown shutdown_behavior) { AutoLock lock(lock_); - ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); - DCHECK(found != threads_.end()); - DCHECK(found->second->is_processing_task()); - DCHECK(!found->second->task_sequence_token().IsValid()); - found->second->set_running_task_info(sequence_token, shutdown_behavior); - // Mark the sequence token as in use. - bool success = current_sequences_.insert(sequence_token.id_).second; - DCHECK(success); + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { + const auto sequenced_task_runner_it = + sequenced_task_runner_map_.find(sequence_token.id_); + return sequenced_task_runner_it != sequenced_task_runner_map_.end() && + sequenced_task_runner_it->second->RunsTasksOnCurrentThread(); + } else { + ThreadMap::const_iterator found = + threads_.find(PlatformThread::CurrentId()); + return found != threads_.end() && found->second->is_processing_task() && + sequence_token.Equals(found->second->task_sequence_token()); + } } // See https://code.google.com/p/chromium/issues/detail?id=168415 void SequencedWorkerPool::Inner::CleanupForTesting() { - DCHECK(!RunsTasksOnCurrentThread()); - base::ThreadRestrictions::ScopedAllowWait allow_wait; + DCHECK_NE(g_all_pools_state, AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER); AutoLock lock(lock_); CHECK_EQ(CLEANUP_DONE, cleanup_state_); if (shutdown_called_) @@ -744,8 +953,12 @@ void SequencedWorkerPool::Inner::Shutdown( if (shutdown_called_) return; shutdown_called_ = true; + max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; + if (g_all_pools_state != AllPoolsState::USE_WORKER_POOL) + return; + // Tickle the threads. This will wake up a waiting one so it will know that // it can exit, which in turn will wake up any other waiting ones. SignalHasWork(); @@ -783,6 +996,7 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() { } void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); { AutoLock lock(lock_); DCHECK(thread_being_created_); @@ -801,18 +1015,15 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { // See GetWork for what delete_these_outside_lock is doing. SequencedTask task; TimeDelta wait_time; - std::vector<Closure> delete_these_outside_lock; + std::vector<SequencedTask> delete_these_outside_lock; GetWorkStatus status = GetWork(&task, &wait_time, &delete_these_outside_lock); if (status == GET_WORK_FOUND) { - TRACE_EVENT_WITH_FLOW2(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), - "SequencedWorkerPool::Inner::ThreadLoop", + TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task); + TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), + "SequencedWorkerPool::Inner::PostTask", TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))), - TRACE_EVENT_FLAG_FLOW_IN, - "src_file", task.posted_from.file_name(), - "src_func", task.posted_from.function_name()); - TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION task_event( - task.posted_from.file_name()); + TRACE_EVENT_FLAG_FLOW_IN); int new_thread_id = WillRunWorkerTask(task); { AutoUnlock unlock(lock_); @@ -821,7 +1032,7 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { // already get a signal for each new task, but it doesn't // hurt.) SignalHasWork(); - delete_these_outside_lock.clear(); + DeleteWithoutLock(&delete_these_outside_lock, this_worker); // Complete thread creation outside the lock if necessary. if (new_thread_id) @@ -838,11 +1049,6 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking( task, stopwatch); - // Update the sequence token in case it has been set from within the - // task, so it can be removed from the set of currently running - // sequences in DidRunWorkerTask() below. - task.sequence_token_id = this_worker->task_sequence_token().id_; - // Make sure our task is erased outside the lock for the // same reason we do this with delete_these_oustide_lock. // Also, do it before calling reset_running_task_info() so @@ -857,7 +1063,7 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { switch (status) { case GET_WORK_WAIT: { AutoUnlock unlock(lock_); - delete_these_outside_lock.clear(); + DeleteWithoutLock(&delete_these_outside_lock, this_worker); } break; case GET_WORK_NOT_FOUND: @@ -879,7 +1085,7 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { // help this case. if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) { AutoUnlock unlock(lock_); - delete_these_outside_lock.clear(); + DeleteWithoutLock(&delete_these_outside_lock, this_worker); break; } @@ -887,7 +1093,7 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { // deletion must happen outside of the lock. if (delete_these_outside_lock.size()) { AutoUnlock unlock(lock_); - delete_these_outside_lock.clear(); + DeleteWithoutLock(&delete_these_outside_lock, this_worker); // Since the lock has been released, |status| may no longer be // accurate. It might read GET_WORK_WAIT even if there are tasks @@ -910,6 +1116,9 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { } waiting_thread_count_--; } + // |delete_these_outside_lock| should have been cleared via + // DeleteWithoutLock() above already. + DCHECK(delete_these_outside_lock.empty()); } } // Release lock_. @@ -921,7 +1130,22 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { can_shutdown_cv_.Signal(); } +void SequencedWorkerPool::Inner::DeleteWithoutLock( + std::vector<SequencedTask>* tasks_to_delete, + Worker* this_worker) { + while (!tasks_to_delete->empty()) { + const SequencedTask& deleted_task = tasks_to_delete->back(); + this_worker->set_running_task_info( + SequenceToken(deleted_task.sequence_token_id), + deleted_task.shutdown_behavior); + tasks_to_delete->pop_back(); + } + this_worker->reset_running_task_info(); +} + void SequencedWorkerPool::Inner::HandleCleanup() { + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); + lock_.AssertAcquired(); if (cleanup_state_ == CLEANUP_DONE) return; @@ -986,7 +1210,9 @@ int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( SequencedTask* task, TimeDelta* wait_time, - std::vector<Closure>* delete_these_outside_lock) { + std::vector<SequencedTask>* delete_these_outside_lock) { + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); + lock_.AssertAcquired(); // Find the next task with a sequence token that's not currently in use. @@ -1030,18 +1256,17 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( // shutdown. Delete it and get more work. // // Note that we do not want to delete unrunnable tasks. Deleting a task - // can have side effects (like freeing some objects) and deleting a - // task that's supposed to run after one that's currently running could - // cause an obscure crash. + // can have side effects (like freeing some objects) and deleting a task + // that's supposed to run after one that's currently running could cause + // an obscure crash. // // We really want to delete these tasks outside the lock in case the - // closures are holding refs to objects that want to post work from - // their destructorss (which would deadlock). The closures are - // internally refcounted, so we just need to keep a copy of them alive - // until the lock is exited. The calling code can just clear() the - // vector they passed to us once the lock is exited to make this - // happen. - delete_these_outside_lock->push_back(i->task); + // closures are holding refs to objects that want to post work from their + // destructors (which would deadlock). The closures are internally + // refcounted, so we just need to keep a copy of them alive until the lock + // is exited. The calling code can just clear() the vector they passed to + // us once the lock is exited to make this happen. + delete_these_outside_lock->push_back(*i); pending_tasks_.erase(i++); continue; } @@ -1052,7 +1277,7 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( status = GET_WORK_WAIT; if (cleanup_state_ == CLEANUP_RUNNING) { // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. - delete_these_outside_lock->push_back(i->task); + delete_these_outside_lock->push_back(*i); pending_tasks_.erase(i); } break; @@ -1073,6 +1298,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( } int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); + lock_.AssertAcquired(); // Mark the task's sequence number as in use. @@ -1104,6 +1331,8 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { } void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); + lock_.AssertAcquired(); if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { @@ -1117,6 +1346,8 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( int sequence_token_id) const { + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); + lock_.AssertAcquired(); return !sequence_token_id || current_sequences_.find(sequence_token_id) == @@ -1124,6 +1355,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( } int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); + lock_.AssertAcquired(); // How thread creation works: // @@ -1174,6 +1407,8 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( int thread_number) { + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); + // Called outside of the lock. DCHECK_GT(thread_number, 0); @@ -1183,6 +1418,8 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( } void SequencedWorkerPool::Inner::SignalHasWork() { + DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state); + has_work_cv_.Signal(); if (testing_observer_) { testing_observer_->OnHasWork(); @@ -1190,6 +1427,7 @@ void SequencedWorkerPool::Inner::SignalHasWork() { } bool SequencedWorkerPool::Inner::CanShutdown() const { + DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state); lock_.AssertAcquired(); // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. return !thread_being_created_ && @@ -1227,44 +1465,61 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() { } // static -scoped_refptr<SequencedTaskRunner> -SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() { - Worker* worker = Worker::GetForCurrentThread(); +void SequencedWorkerPool::EnableForProcess() { + // TODO(fdoray): Uncomment this line. It is initially commented to avoid a + // revert of the CL that adds debug::DumpWithoutCrashing() in case of + // waterfall failures. + // DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); + g_all_pools_state = AllPoolsState::USE_WORKER_POOL; +} - // If there is no worker, this thread is not a worker thread. Otherwise, it is - // currently running a task (sequenced or unsequenced). - if (!worker) - return nullptr; +// static +void SequencedWorkerPool::EnableWithRedirectionToTaskSchedulerForProcess( + TaskPriority max_task_priority) { +#if 1 + NOTREACHED(); + ALLOW_UNUSED_PARAM(max_task_priority); +#else + // TODO(fdoray): Uncomment this line. It is initially commented to avoid a + // revert of the CL that adds debug::DumpWithoutCrashing() in case of + // waterfall failures. + // DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state); + DCHECK(TaskScheduler::GetInstance()); + g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER; + g_max_task_priority = max_task_priority; +#endif +} - scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool(); - SequenceToken sequence_token = worker->task_sequence_token(); - WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior(); - if (!sequence_token.IsValid()) { - // Create a new sequence token and bind this thread to it, to make sure that - // a task posted to the SequencedTaskRunner we are going to return is not - // immediately going to run on a different thread. - sequence_token = Inner::GetSequenceToken(); - pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token, - shutdown_behavior); - } +// static +void SequencedWorkerPool::DisableForProcessForTesting() { + g_all_pools_state = AllPoolsState::POST_TASK_DISABLED; +} - DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token)); - return new SequencedWorkerPoolSequencedTaskRunner( - std::move(pool), sequence_token, shutdown_behavior); +// static +bool SequencedWorkerPool::IsEnabled() { + return g_all_pools_state != AllPoolsState::POST_TASK_DISABLED; } SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, - const std::string& thread_name_prefix) - : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), - inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { -} + const std::string& thread_name_prefix, + base::TaskPriority task_priority) + : constructor_task_runner_(SequencedTaskRunnerHandle::Get()), + inner_(new Inner(this, + max_threads, + thread_name_prefix, + task_priority, + NULL)) {} SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, const std::string& thread_name_prefix, + base::TaskPriority task_priority, TestingObserver* observer) - : constructor_task_runner_(ThreadTaskRunnerHandle::Get()), - inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { -} + : constructor_task_runner_(SequencedTaskRunnerHandle::Get()), + inner_(new Inner(this, + max_threads, + thread_name_prefix, + task_priority, + observer)) {} SequencedWorkerPool::~SequencedWorkerPool() {} @@ -1295,7 +1550,7 @@ scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( SequenceToken token, WorkerShutdown shutdown_behavior) { - return new SequencedWorkerPoolSequencedTaskRunner( + return new PoolSequencedTaskRunner( this, token, shutdown_behavior); } @@ -1378,18 +1633,19 @@ bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { return inner_->RunsTasksOnCurrentThread(); } -bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( - SequenceToken sequence_token) const { - return inner_->IsRunningSequenceOnCurrentThread(sequence_token); -} - -bool SequencedWorkerPool::IsRunningSequence( - SequenceToken sequence_token) const { - return inner_->IsRunningSequence(sequence_token); -} - void SequencedWorkerPool::FlushForTesting() { - inner_->CleanupForTesting(); + DCHECK(!RunsTasksOnCurrentThread()); + base::ThreadRestrictions::ScopedAllowWait allow_wait; + if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) { +#if 1 + NOTREACHED(); +#else + // TODO(gab): Remove this if http://crbug.com/622400 fails. + TaskScheduler::GetInstance()->FlushForTesting(); +#endif + } else { + inner_->CleanupForTesting(); + } } void SequencedWorkerPool::SignalHasWorkForTesting() { @@ -1397,7 +1653,7 @@ void SequencedWorkerPool::SignalHasWorkForTesting() { } void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { - DCHECK(constructor_task_runner_->BelongsToCurrentThread()); + DCHECK(constructor_task_runner_->RunsTasksOnCurrentThread()); inner_->Shutdown(max_new_blocking_tasks_after_shutdown); } @@ -1405,4 +1661,9 @@ bool SequencedWorkerPool::IsShutdownInProgress() { return inner_->IsShutdownInProgress(); } +bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( + SequenceToken sequence_token) const { + return inner_->IsRunningSequenceOnCurrentThread(sequence_token); +} + } // namespace base diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h index cbec39561a..0d42de9138 100644 --- a/base/threading/sequenced_worker_pool.h +++ b/base/threading/sequenced_worker_pool.h @@ -13,10 +13,11 @@ #include "base/base_export.h" #include "base/callback_forward.h" +#include "base/compiler_specific.h" #include "base/macros.h" #include "base/memory/ref_counted.h" -#include "base/single_thread_task_runner.h" #include "base/task_runner.h" +#include "base/task_scheduler/task_traits.h" namespace tracked_objects { class Location; @@ -24,12 +25,10 @@ class Location; namespace base { -class SingleThreadTaskRunner; +class SequencedTaskRunner; template <class T> class DeleteHelper; -class SequencedTaskRunner; - // A worker thread pool that enforces ordering between sets of tasks. It also // allows you to specify what should happen to your tasks on shutdown. // @@ -47,8 +46,7 @@ class SequencedTaskRunner; // destruction will be visible to T2. // // Example: -// SequencedWorkerPool::SequenceToken token = -// SequencedWorkerPool::GetSequenceToken(); +// SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken(); // pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, // FROM_HERE, base::Bind(...)); // pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, @@ -61,6 +59,10 @@ class SequencedTaskRunner; // These will be executed in an unspecified order. The order of execution // between tasks with different sequence tokens is also unspecified. // +// You must call EnableForProcess() or +// EnableWithRedirectionToTaskSchedulerForProcess() before starting to post +// tasks to a process' SequencedWorkerPools. +// // This class may be leaked on shutdown to facilitate fast shutdown. The // expected usage, however, is to call Shutdown(), which correctly accounts // for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN @@ -164,36 +166,65 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { // an unsequenced task, returns an invalid SequenceToken. static SequenceToken GetSequenceTokenForCurrentThread(); - // Gets a SequencedTaskRunner for the current thread. If the current thread is - // running an unsequenced task, a new SequenceToken will be generated and set, - // so that the returned SequencedTaskRunner is guaranteed to run tasks after - // the current task has finished running. - static scoped_refptr<SequencedTaskRunner> - GetSequencedTaskRunnerForCurrentThread(); + // Returns the SequencedWorkerPool that owns this thread, or null if the + // current thread is not a SequencedWorkerPool worker thread. + // + // Always returns nullptr when SequencedWorkerPool is redirected to + // TaskScheduler. + // + // DEPRECATED. Use SequencedTaskRunnerHandle::Get() instead. Consequentially + // the only remaining use case is in sequenced_task_runner_handle.cc to + // implement that and will soon be removed along with SequencedWorkerPool: + // http://crbug.com/622400. + static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread(); // Returns a unique token that can be used to sequence tasks posted to // PostSequencedWorkerTask(). Valid tokens are always nonzero. - // TODO(bauerb): Rename this to better differentiate from - // GetSequenceTokenForCurrentThread(). static SequenceToken GetSequenceToken(); - // Returns the SequencedWorkerPool that owns this thread, or null if the - // current thread is not a SequencedWorkerPool worker thread. - static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread(); + // Enables posting tasks to this process' SequencedWorkerPools. Cannot be + // called if already enabled. This is not thread-safe; proper synchronization + // is required to use any SequencedWorkerPool method after calling this. + static void EnableForProcess(); + + // Same as EnableForProcess(), but tasks are redirected to the registered + // TaskScheduler. All redirections' TaskPriority will be capped to + // |max_task_priority|. There must be a registered TaskScheduler when this is + // called. + // TODO(gab): Remove this if http://crbug.com/622400 fails + // (SequencedWorkerPool will be phased out completely otherwise). + static void EnableWithRedirectionToTaskSchedulerForProcess( + TaskPriority max_task_priority = TaskPriority::HIGHEST); + + // Disables posting tasks to this process' SequencedWorkerPools. Calling this + // while there are active SequencedWorkerPools is not supported. This is not + // thread-safe; proper synchronization is required to use any + // SequencedWorkerPool method after calling this. + static void DisableForProcessForTesting(); + + // Returns true if posting tasks to this process' SequencedWorkerPool is + // enabled (with or without redirection to TaskScheduler). + static bool IsEnabled(); // When constructing a SequencedWorkerPool, there must be a // ThreadTaskRunnerHandle on the current thread unless you plan to // deliberately leak it. - // Pass the maximum number of threads (they will be lazily created as needed) - // and a prefix for the thread name to aid in debugging. + // Constructs a SequencedWorkerPool which will lazily create up to + // |max_threads| and a prefix for the thread name to aid in debugging. + // |max_threads| must be greater than 1. |task_priority| will be used to hint + // base::TaskScheduler for an experiment in which all SequencedWorkerPool + // tasks will be redirected to it in processes where a base::TaskScheduler was + // instantiated. SequencedWorkerPool(size_t max_threads, - const std::string& thread_name_prefix); + const std::string& thread_name_prefix, + base::TaskPriority task_priority); // Like above, but with |observer| for testing. Does not take ownership of // |observer|. SequencedWorkerPool(size_t max_threads, const std::string& thread_name_prefix, + base::TaskPriority task_priority, TestingObserver* observer); // Returns the sequence token associated with the given name. Calling this @@ -207,7 +238,7 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay // are posted with BLOCK_SHUTDOWN behavior. scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner( - SequenceToken token); + SequenceToken token) WARN_UNUSED_RESULT; // Returns a SequencedTaskRunner wrapper which posts to this // SequencedWorkerPool using the given sequence token. Tasks with nonzero @@ -215,14 +246,14 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { // are posted with the given shutdown behavior. scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior( SequenceToken token, - WorkerShutdown shutdown_behavior); + WorkerShutdown shutdown_behavior) WARN_UNUSED_RESULT; // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using // the given shutdown behavior. Tasks with nonzero delay are posted with // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the // given shutdown behavior. scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior( - WorkerShutdown shutdown_behavior); + WorkerShutdown shutdown_behavior) WARN_UNUSED_RESULT; // Posts the given task for execution in the worker pool. Tasks posted with // this function will execute in an unspecified order on a background thread. @@ -316,23 +347,21 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { TimeDelta delay) override; bool RunsTasksOnCurrentThread() const override; - // Returns true if the current thread is processing a task with the given - // sequence_token. - bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; - - // Returns true if any thread is currently processing a task with the given - // sequence token. Should only be called with a valid sequence token. - bool IsRunningSequence(SequenceToken sequence_token) const; - // Blocks until all pending tasks are complete. This should only be called in // unit tests when you want to validate something that should have happened. - // This will not flush delayed tasks; delayed tasks get deleted. + // Does not wait for delayed tasks. If redirection to TaskScheduler is + // disabled, delayed tasks are deleted. If redirection to TaskScheduler is + // enabled, this will wait for all tasks posted to TaskScheduler (not just + // tasks posted to this SequencedWorkerPool). // // Note that calling this will not prevent other threads from posting work to // the queue while the calling thread is waiting on Flush(). In this case, // Flush will return only when there's no more work in the queue. Normally, // this doesn't come up since in a test, all the work is being posted from // the main thread. + // + // TODO(gab): Remove mentions of TaskScheduler in this comment if + // http://crbug.com/622400 fails. void FlushForTesting(); // Spuriously signal that there is work to be done. @@ -368,9 +397,14 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner { friend class DeleteHelper<SequencedWorkerPool>; class Inner; + class PoolSequencedTaskRunner; class Worker; - const scoped_refptr<SingleThreadTaskRunner> constructor_task_runner_; + // Returns true if the current thread is processing a task with the given + // sequence_token. + bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; + + const scoped_refptr<SequencedTaskRunner> constructor_task_runner_; // Avoid pulling in too many headers by putting (almost) everything // into |inner_|. diff --git a/base/threading/simple_thread.cc b/base/threading/simple_thread.cc index 6c64a17d6a..9eb443afab 100644 --- a/base/threading/simple_thread.cc +++ b/base/threading/simple_thread.cc @@ -12,62 +12,55 @@ namespace base { SimpleThread::SimpleThread(const std::string& name_prefix) - : name_prefix_(name_prefix), - name_(name_prefix), - thread_(), - event_(WaitableEvent::ResetPolicy::MANUAL, - WaitableEvent::InitialState::NOT_SIGNALED), - tid_(0), - joined_(false) {} + : SimpleThread(name_prefix, Options()) {} SimpleThread::SimpleThread(const std::string& name_prefix, const Options& options) : name_prefix_(name_prefix), - name_(name_prefix), options_(options), - thread_(), event_(WaitableEvent::ResetPolicy::MANUAL, - WaitableEvent::InitialState::NOT_SIGNALED), - tid_(0), - joined_(false) {} + WaitableEvent::InitialState::NOT_SIGNALED) {} SimpleThread::~SimpleThread() { DCHECK(HasBeenStarted()) << "SimpleThread was never started."; - DCHECK(HasBeenJoined()) << "SimpleThread destroyed without being Join()ed."; + DCHECK(!options_.joinable || HasBeenJoined()) + << "Joinable SimpleThread destroyed without being Join()ed."; } void SimpleThread::Start() { DCHECK(!HasBeenStarted()) << "Tried to Start a thread multiple times."; - bool success; - if (options_.priority() == ThreadPriority::NORMAL) { - success = PlatformThread::Create(options_.stack_size(), this, &thread_); - } else { - success = PlatformThread::CreateWithPriority(options_.stack_size(), this, - &thread_, options_.priority()); - } + bool success = + options_.joinable + ? PlatformThread::CreateWithPriority(options_.stack_size, this, + &thread_, options_.priority) + : PlatformThread::CreateNonJoinableWithPriority( + options_.stack_size, this, options_.priority); DCHECK(success); - base::ThreadRestrictions::ScopedAllowWait allow_wait; + ThreadRestrictions::ScopedAllowWait allow_wait; event_.Wait(); // Wait for the thread to complete initialization. } void SimpleThread::Join() { + DCHECK(options_.joinable) << "A non-joinable thread can't be joined."; DCHECK(HasBeenStarted()) << "Tried to Join a never-started thread."; DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times."; PlatformThread::Join(thread_); + thread_ = PlatformThreadHandle(); joined_ = true; } bool SimpleThread::HasBeenStarted() { - base::ThreadRestrictions::ScopedAllowWait allow_wait; + ThreadRestrictions::ScopedAllowWait allow_wait; return event_.IsSignaled(); } void SimpleThread::ThreadMain() { tid_ = PlatformThread::CurrentId(); // Construct our full name of the form "name_prefix_/TID". - name_.push_back('/'); - name_.append(IntToString(tid_)); - PlatformThread::SetName(name_); + std::string name(name_prefix_); + name.push_back('/'); + name.append(IntToString(tid_)); + PlatformThread::SetName(name); // We've initialized our new thread, signal that we're done to Start(). event_.Signal(); @@ -77,24 +70,26 @@ void SimpleThread::ThreadMain() { DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate, const std::string& name_prefix) - : SimpleThread(name_prefix), - delegate_(delegate) { -} + : DelegateSimpleThread(delegate, name_prefix, Options()) {} DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate, const std::string& name_prefix, const Options& options) : SimpleThread(name_prefix, options), delegate_(delegate) { + DCHECK(delegate_); } -DelegateSimpleThread::~DelegateSimpleThread() { -} +DelegateSimpleThread::~DelegateSimpleThread() = default; void DelegateSimpleThread::Run() { DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)"; - delegate_->Run(); - delegate_ = NULL; + + // Non-joinable DelegateSimpleThreads are allowed to be deleted during Run(). + // Member state must not be accessed after invoking Run(). + Delegate* delegate = delegate_; + delegate_ = nullptr; + delegate->Run(); } DelegateSimpleThreadPool::DelegateSimpleThreadPool( diff --git a/base/threading/simple_thread.h b/base/threading/simple_thread.h index 3deeb1018c..f9f5e91045 100644 --- a/base/threading/simple_thread.h +++ b/base/threading/simple_thread.h @@ -48,6 +48,7 @@ #include "base/base_export.h" #include "base/compiler_specific.h" +#include "base/macros.h" #include "base/synchronization/lock.h" #include "base/synchronization/waitable_event.h" #include "base/threading/platform_thread.h" @@ -58,25 +59,26 @@ namespace base { // virtual Run method, or you can use the DelegateSimpleThread interface. class BASE_EXPORT SimpleThread : public PlatformThread::Delegate { public: - class BASE_EXPORT Options { + struct BASE_EXPORT Options { public: - Options() : stack_size_(0), priority_(ThreadPriority::NORMAL) {} - explicit Options(ThreadPriority priority) - : stack_size_(0), priority_(priority) {} - ~Options() {} + Options() = default; + explicit Options(ThreadPriority priority_in) : priority(priority_in) {} + ~Options() = default; - // We use the standard compiler-supplied copy constructor. + // Allow copies. + Options(const Options& other) = default; + Options& operator=(const Options& other) = default; // A custom stack size, or 0 for the system default. - void set_stack_size(size_t size) { stack_size_ = size; } - size_t stack_size() const { return stack_size_; } - - // A custom thread priority. - void set_priority(ThreadPriority priority) { priority_ = priority; } - ThreadPriority priority() const { return priority_; } - private: - size_t stack_size_; - ThreadPriority priority_; + size_t stack_size = 0; + + ThreadPriority priority = ThreadPriority::NORMAL; + + // If false, the underlying thread's PlatformThreadHandle will not be kept + // around and as such the SimpleThread instance will not be Join()able and + // must not be deleted before Run() is invoked. After that, it's up to + // the subclass to determine when it is safe to delete itself. + bool joinable = true; }; // Create a SimpleThread. |options| should be used to manage any specific @@ -94,19 +96,13 @@ class BASE_EXPORT SimpleThread : public PlatformThread::Delegate { // Subclasses should override the Run method. virtual void Run() = 0; - // Return the thread name prefix, or "unnamed" if none was supplied. - std::string name_prefix() { return name_prefix_; } - - // Return the completed name including TID, only valid after Start(). - std::string name() { return name_; } - // Return the thread id, only valid after Start(). PlatformThreadId tid() { return tid_; } // Return True if Start() has ever been called. bool HasBeenStarted(); - // Return True if Join() has evern been called. + // Return True if Join() has ever been called. bool HasBeenJoined() { return joined_; } // Overridden from PlatformThread::Delegate: @@ -116,18 +112,24 @@ class BASE_EXPORT SimpleThread : public PlatformThread::Delegate { const std::string name_prefix_; std::string name_; const Options options_; - PlatformThreadHandle thread_; // PlatformThread handle, invalid after Join! + PlatformThreadHandle thread_; // PlatformThread handle, reset after Join. WaitableEvent event_; // Signaled if Start() was ever called. - PlatformThreadId tid_; // The backing thread's id. - bool joined_; // True if Join has been called. + PlatformThreadId tid_ = kInvalidThreadId; // The backing thread's id. + bool joined_ = false; // True if Join has been called. + + DISALLOW_COPY_AND_ASSIGN(SimpleThread); }; +// A SimpleThread which delegates Run() to its Delegate. Non-joinable +// DelegateSimpleThread are safe to delete after Run() was invoked, their +// Delegates are also safe to delete after that point from this class' point of +// view (although implementations must of course make sure that Run() will not +// use their Delegate's member state after its deletion). class BASE_EXPORT DelegateSimpleThread : public SimpleThread { public: class BASE_EXPORT Delegate { public: - Delegate() { } - virtual ~Delegate() { } + virtual ~Delegate() = default; virtual void Run() = 0; }; @@ -142,6 +144,8 @@ class BASE_EXPORT DelegateSimpleThread : public SimpleThread { private: Delegate* delegate_; + + DISALLOW_COPY_AND_ASSIGN(DelegateSimpleThread); }; // DelegateSimpleThreadPool allows you to start up a fixed number of threads, @@ -186,6 +190,8 @@ class BASE_EXPORT DelegateSimpleThreadPool std::queue<Delegate*> delegates_; base::Lock lock_; // Locks delegates_ WaitableEvent dry_; // Not signaled when there is no work to do. + + DISALLOW_COPY_AND_ASSIGN(DelegateSimpleThreadPool); }; } // namespace base diff --git a/base/threading/simple_thread_unittest.cc b/base/threading/simple_thread_unittest.cc index 14dd4591f1..0e52500c52 100644 --- a/base/threading/simple_thread_unittest.cc +++ b/base/threading/simple_thread_unittest.cc @@ -2,9 +2,13 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include <memory> + #include "base/atomic_sequence_num.h" +#include "base/memory/ptr_util.h" #include "base/strings/string_number_conversions.h" #include "base/synchronization/waitable_event.h" +#include "base/test/gtest_util.h" #include "base/threading/simple_thread.h" #include "testing/gtest/include/gtest/gtest.h" @@ -17,11 +21,49 @@ class SetIntRunner : public DelegateSimpleThread::Delegate { SetIntRunner(int* ptr, int val) : ptr_(ptr), val_(val) { } ~SetIntRunner() override {} + private: void Run() override { *ptr_ = val_; } - private: int* ptr_; int val_; + + DISALLOW_COPY_AND_ASSIGN(SetIntRunner); +}; + +// Signals |started_| when Run() is invoked and waits until |released_| is +// signaled to return, signaling |done_| before doing so. Useful for tests that +// care to control Run()'s flow. +class ControlledRunner : public DelegateSimpleThread::Delegate { + public: + ControlledRunner() + : started_(WaitableEvent::ResetPolicy::MANUAL, + WaitableEvent::InitialState::NOT_SIGNALED), + released_(WaitableEvent::ResetPolicy::MANUAL, + WaitableEvent::InitialState::NOT_SIGNALED), + done_(WaitableEvent::ResetPolicy::MANUAL, + WaitableEvent::InitialState::NOT_SIGNALED) {} + + ~ControlledRunner() override { ReleaseAndWaitUntilDone(); } + + void WaitUntilStarted() { started_.Wait(); } + + void ReleaseAndWaitUntilDone() { + released_.Signal(); + done_.Wait(); + } + + private: + void Run() override { + started_.Signal(); + released_.Wait(); + done_.Signal(); + } + + WaitableEvent started_; + WaitableEvent released_; + WaitableEvent done_; + + DISALLOW_COPY_AND_ASSIGN(ControlledRunner); }; class WaitEventRunner : public DelegateSimpleThread::Delegate { @@ -29,22 +71,28 @@ class WaitEventRunner : public DelegateSimpleThread::Delegate { explicit WaitEventRunner(WaitableEvent* event) : event_(event) { } ~WaitEventRunner() override {} + private: void Run() override { EXPECT_FALSE(event_->IsSignaled()); event_->Signal(); EXPECT_TRUE(event_->IsSignaled()); } - private: + WaitableEvent* event_; + + DISALLOW_COPY_AND_ASSIGN(WaitEventRunner); }; class SeqRunner : public DelegateSimpleThread::Delegate { public: explicit SeqRunner(AtomicSequenceNumber* seq) : seq_(seq) { } - void Run() override { seq_->GetNext(); } private: + void Run() override { seq_->GetNext(); } + AtomicSequenceNumber* seq_; + + DISALLOW_COPY_AND_ASSIGN(SeqRunner); }; // We count up on a sequence number, firing on the event when we've hit our @@ -56,6 +104,7 @@ class VerifyPoolRunner : public DelegateSimpleThread::Delegate { int total, WaitableEvent* event) : seq_(seq), total_(total), event_(event) { } + private: void Run() override { if (seq_->GetNext() == total_) { event_->Signal(); @@ -64,10 +113,11 @@ class VerifyPoolRunner : public DelegateSimpleThread::Delegate { } } - private: AtomicSequenceNumber* seq_; int total_; WaitableEvent* event_; + + DISALLOW_COPY_AND_ASSIGN(VerifyPoolRunner); }; } // namespace @@ -108,29 +158,44 @@ TEST(SimpleThreadTest, WaitForEvent) { thread.Join(); } -TEST(SimpleThreadTest, NamedWithOptions) { - WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL, - WaitableEvent::InitialState::NOT_SIGNALED); +TEST(SimpleThreadTest, NonJoinableStartAndDieOnJoin) { + ControlledRunner runner; - WaitEventRunner runner(&event); SimpleThread::Options options; - DelegateSimpleThread thread(&runner, "event_waiter", options); - EXPECT_EQ(thread.name_prefix(), "event_waiter"); - EXPECT_FALSE(event.IsSignaled()); + options.joinable = false; + DelegateSimpleThread thread(&runner, "non_joinable", options); + EXPECT_FALSE(thread.HasBeenStarted()); thread.Start(); - EXPECT_EQ(thread.name_prefix(), "event_waiter"); - EXPECT_EQ(thread.name(), - std::string("event_waiter/") + IntToString(thread.tid())); - event.Wait(); + EXPECT_TRUE(thread.HasBeenStarted()); - EXPECT_TRUE(event.IsSignaled()); - thread.Join(); + // Note: this is not quite the same as |thread.HasBeenStarted()| which + // represents ThreadMain() getting ready to invoke Run() whereas + // |runner.WaitUntilStarted()| ensures Run() was actually invoked. + runner.WaitUntilStarted(); + + EXPECT_FALSE(thread.HasBeenJoined()); + EXPECT_DCHECK_DEATH({ thread.Join(); }); +} + +TEST(SimpleThreadTest, NonJoinableInactiveDelegateDestructionIsOkay) { + std::unique_ptr<ControlledRunner> runner(new ControlledRunner); + + SimpleThread::Options options; + options.joinable = false; + std::unique_ptr<DelegateSimpleThread> thread( + new DelegateSimpleThread(runner.get(), "non_joinable", options)); + + thread->Start(); + runner->WaitUntilStarted(); + + // Deleting a non-joinable SimpleThread after Run() was invoked is okay. + thread.reset(); - // We keep the name and tid, even after the thread is gone. - EXPECT_EQ(thread.name_prefix(), "event_waiter"); - EXPECT_EQ(thread.name(), - std::string("event_waiter/") + IntToString(thread.tid())); + runner->WaitUntilStarted(); + runner->ReleaseAndWaitUntilDone(); + // It should be safe to destroy a Delegate after its Run() method completed. + runner.reset(); } TEST(SimpleThreadTest, ThreadPool) { diff --git a/base/threading/thread.cc b/base/threading/thread.cc index 9cdc6912ea..c30320f0dc 100644 --- a/base/threading/thread.cc +++ b/base/threading/thread.cc @@ -5,8 +5,10 @@ #include "base/threading/thread.h" #include "base/bind.h" +#include "base/bind_helpers.h" #include "base/lazy_instance.h" #include "base/location.h" +#include "base/logging.h" #include "base/run_loop.h" #include "base/synchronization/waitable_event.h" #include "base/threading/thread_id_name_manager.h" @@ -14,6 +16,10 @@ #include "base/threading/thread_restrictions.h" #include "build/build_config.h" +#if defined(OS_POSIX) && !defined(OS_NACL) +#include "base/files/file_descriptor_watcher_posix.h" +#endif + #if defined(OS_WIN) #include "base/win/scoped_com_initializer.h" #endif @@ -26,53 +32,31 @@ namespace { // because its Stop method was called. This allows us to catch cases where // MessageLoop::QuitWhenIdle() is called directly, which is unexpected when // using a Thread to setup and run a MessageLoop. -base::LazyInstance<base::ThreadLocalBoolean> lazy_tls_bool = +base::LazyInstance<base::ThreadLocalBoolean>::Leaky lazy_tls_bool = LAZY_INSTANCE_INITIALIZER; } // namespace -// This is used to trigger the message loop to exit. -void ThreadQuitHelper() { - MessageLoop::current()->QuitWhenIdle(); - Thread::SetThreadWasQuitProperly(true); -} - -Thread::Options::Options() - : message_loop_type(MessageLoop::TYPE_DEFAULT), - timer_slack(TIMER_SLACK_NONE), - stack_size(0), - priority(ThreadPriority::NORMAL) { -} +Thread::Options::Options() = default; -Thread::Options::Options(MessageLoop::Type type, - size_t size) - : message_loop_type(type), - timer_slack(TIMER_SLACK_NONE), - stack_size(size), - priority(ThreadPriority::NORMAL) { -} +Thread::Options::Options(MessageLoop::Type type, size_t size) + : message_loop_type(type), stack_size(size) {} Thread::Options::Options(const Options& other) = default; -Thread::Options::~Options() { -} +Thread::Options::~Options() = default; Thread::Thread(const std::string& name) - : -#if defined(OS_WIN) - com_status_(NONE), -#endif - stopping_(false), - running_(false), - thread_(0), - id_(kInvalidThreadId), - id_event_(WaitableEvent::ResetPolicy::MANUAL, + : id_event_(WaitableEvent::ResetPolicy::MANUAL, WaitableEvent::InitialState::NOT_SIGNALED), - message_loop_(nullptr), - message_loop_timer_slack_(TIMER_SLACK_NONE), name_(name), start_event_(WaitableEvent::ResetPolicy::MANUAL, WaitableEvent::InitialState::NOT_SIGNALED) { + // Only bind the sequence on Start(): the state is constant between + // construction and Start() and it's thus valid for Start() to be called on + // another sequence as long as every other operation is then performed on that + // sequence. + owning_sequence_checker_.DetachFromSequence(); } Thread::~Thread() { @@ -80,6 +64,8 @@ Thread::~Thread() { } bool Thread::Start() { + DCHECK(owning_sequence_checker_.CalledOnValidSequence()); + Options options; #if defined(OS_WIN) if (com_status_ == STA) @@ -89,7 +75,11 @@ bool Thread::Start() { } bool Thread::StartWithOptions(const Options& options) { + DCHECK(owning_sequence_checker_.CalledOnValidSequence()); DCHECK(!message_loop_); + DCHECK(!IsRunning()); + DCHECK(!stopping_) << "Starting a non-joinable thread a second time? That's " + << "not allowed!"; #if defined(OS_WIN) DCHECK((com_status_ != STA) || (options.message_loop_type == MessageLoop::TYPE_UI)); @@ -106,32 +96,41 @@ bool Thread::StartWithOptions(const Options& options) { type = MessageLoop::TYPE_CUSTOM; message_loop_timer_slack_ = options.timer_slack; - std::unique_ptr<MessageLoop> message_loop = + std::unique_ptr<MessageLoop> message_loop_owned = MessageLoop::CreateUnbound(type, options.message_pump_factory); - message_loop_ = message_loop.get(); + message_loop_ = message_loop_owned.get(); start_event_.Reset(); - // Hold the thread_lock_ while starting a new thread, so that we can make sure - // that thread_ is populated before the newly created thread accesses it. + // Hold |thread_lock_| while starting the new thread to synchronize with + // Stop() while it's not guaranteed to be sequenced (until crbug/629139 is + // fixed). { AutoLock lock(thread_lock_); - if (!PlatformThread::CreateWithPriority(options.stack_size, this, &thread_, - options.priority)) { + bool success = + options.joinable + ? PlatformThread::CreateWithPriority(options.stack_size, this, + &thread_, options.priority) + : PlatformThread::CreateNonJoinableWithPriority( + options.stack_size, this, options.priority); + if (!success) { DLOG(ERROR) << "failed to create thread"; message_loop_ = nullptr; return false; } } - // The ownership of message_loop is managemed by the newly created thread + joinable_ = options.joinable; + + // The ownership of |message_loop_| is managed by the newly created thread // within the ThreadMain. - ignore_result(message_loop.release()); + ignore_result(message_loop_owned.release()); DCHECK(message_loop_); return true; } bool Thread::StartAndWaitForTesting() { + DCHECK(owning_sequence_checker_.CalledOnValidSequence()); bool result = Start(); if (!result) return false; @@ -140,6 +139,7 @@ bool Thread::StartAndWaitForTesting() { } bool Thread::WaitUntilThreadStarted() const { + DCHECK(owning_sequence_checker_.CalledOnValidSequence()); if (!message_loop_) return false; base::ThreadRestrictions::ScopedAllowWait allow_wait; @@ -147,37 +147,74 @@ bool Thread::WaitUntilThreadStarted() const { return true; } +void Thread::FlushForTesting() { + DCHECK(owning_sequence_checker_.CalledOnValidSequence()); + if (!message_loop_) + return; + + WaitableEvent done(WaitableEvent::ResetPolicy::AUTOMATIC, + WaitableEvent::InitialState::NOT_SIGNALED); + task_runner()->PostTask(FROM_HERE, + Bind(&WaitableEvent::Signal, Unretained(&done))); + done.Wait(); +} + void Thread::Stop() { + DCHECK(joinable_); + + // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and + // enable this check, until then synchronization with Start() via + // |thread_lock_| is required... + // DCHECK(owning_sequence_checker_.CalledOnValidSequence()); AutoLock lock(thread_lock_); - if (thread_.is_null()) - return; StopSoon(); + // Can't join if the |thread_| is either already gone or is non-joinable. + if (thread_.is_null()) + return; + // Wait for the thread to exit. // - // TODO(darin): Unfortunately, we need to keep message_loop_ around until + // TODO(darin): Unfortunately, we need to keep |message_loop_| around until // the thread exits. Some consumers are abusing the API. Make them stop. // PlatformThread::Join(thread_); thread_ = base::PlatformThreadHandle(); - // The thread should nullify message_loop_ on exit. + // The thread should nullify |message_loop_| on exit (note: Join() adds an + // implicit memory barrier and no lock is thus required for this check). DCHECK(!message_loop_); stopping_ = false; } void Thread::StopSoon() { - // We should only be called on the same thread that started us. - - DCHECK_NE(GetThreadId(), PlatformThread::CurrentId()); + // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and + // enable this check. + // DCHECK(owning_sequence_checker_.CalledOnValidSequence()); if (stopping_ || !message_loop_) return; stopping_ = true; - task_runner()->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper)); + + if (using_external_message_loop_) { + // Setting |stopping_| to true above should have been sufficient for this + // thread to be considered "stopped" per it having never set its |running_| + // bit by lack of its own ThreadMain. + DCHECK(!IsRunning()); + message_loop_ = nullptr; + return; + } + + task_runner()->PostTask( + FROM_HERE, base::Bind(&Thread::ThreadQuitHelper, Unretained(this))); +} + +void Thread::DetachFromSequence() { + DCHECK(owning_sequence_checker_.CalledOnValidSequence()); + owning_sequence_checker_.DetachFromSequence(); } PlatformThreadId Thread::GetThreadId() const { @@ -188,26 +225,36 @@ PlatformThreadId Thread::GetThreadId() const { } bool Thread::IsRunning() const { - // If the thread's already started (i.e. message_loop_ is non-null) and - // not yet requested to stop (i.e. stopping_ is false) we can just return - // true. (Note that stopping_ is touched only on the same thread that - // starts / started the new thread so we need no locking here.) + // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and + // enable this check. + // DCHECK(owning_sequence_checker_.CalledOnValidSequence()); + + // If the thread's already started (i.e. |message_loop_| is non-null) and not + // yet requested to stop (i.e. |stopping_| is false) we can just return true. + // (Note that |stopping_| is touched only on the same sequence that starts / + // started the new thread so we need no locking here.) if (message_loop_ && !stopping_) return true; - // Otherwise check the running_ flag, which is set to true by the new thread + // Otherwise check the |running_| flag, which is set to true by the new thread // only while it is inside Run(). AutoLock lock(running_lock_); return running_; } -void Thread::Run(MessageLoop*) { - RunLoop().Run(); +void Thread::Run(RunLoop* run_loop) { + // Overridable protected method to be called from our |thread_| only. + DCHECK(id_event_.IsSignaled()); + DCHECK_EQ(id_, PlatformThread::CurrentId()); + + run_loop->Run(); } +// static void Thread::SetThreadWasQuitProperly(bool flag) { lazy_tls_bool.Pointer()->Set(flag); } +// static bool Thread::GetThreadWasQuitProperly() { bool quit_properly = true; #ifndef NDEBUG @@ -216,9 +263,27 @@ bool Thread::GetThreadWasQuitProperly() { return quit_properly; } +void Thread::SetMessageLoop(MessageLoop* message_loop) { + DCHECK(owning_sequence_checker_.CalledOnValidSequence()); + DCHECK(message_loop); + + // Setting |message_loop_| should suffice for this thread to be considered + // as "running", until Stop() is invoked. + DCHECK(!IsRunning()); + message_loop_ = message_loop; + DCHECK(IsRunning()); + + using_external_message_loop_ = true; +} + void Thread::ThreadMain() { // First, make GetThreadId() available to avoid deadlocks. It could be called // any place in the following thread initialization code. + DCHECK(!id_event_.IsSignaled()); + // Note: this read of |id_| while |id_event_| isn't signaled is exceptionally + // okay because ThreadMain has a happens-after relationship with the other + // write in StartWithOptions(). + DCHECK_EQ(kInvalidThreadId, id_); id_ = PlatformThread::CurrentId(); DCHECK_NE(kInvalidThreadId, id_); id_event_.Signal(); @@ -226,12 +291,22 @@ void Thread::ThreadMain() { // Complete the initialization of our Thread object. PlatformThread::SetName(name_.c_str()); - // Lazily initialize the message_loop so that it can run on this thread. + // Lazily initialize the |message_loop| so that it can run on this thread. DCHECK(message_loop_); std::unique_ptr<MessageLoop> message_loop(message_loop_); message_loop_->BindToCurrentThread(); message_loop_->SetTimerSlack(message_loop_timer_slack_); +#if defined(OS_POSIX) && !defined(OS_NACL) + // Allow threads running a MessageLoopForIO to use FileDescriptorWatcher API. + std::unique_ptr<FileDescriptorWatcher> file_descriptor_watcher; + if (MessageLoopForIO::IsCurrent()) { + DCHECK_EQ(message_loop_, MessageLoopForIO::current()); + file_descriptor_watcher.reset( + new FileDescriptorWatcher(MessageLoopForIO::current())); + } +#endif + #if defined(OS_WIN) std::unique_ptr<win::ScopedCOMInitializer> com_initializer; if (com_status_ != NONE) { @@ -251,7 +326,9 @@ void Thread::ThreadMain() { start_event_.Signal(); - Run(message_loop_); + RunLoop run_loop; + run_loop_ = &run_loop; + Run(run_loop_); { AutoLock lock(running_lock_); @@ -266,15 +343,22 @@ void Thread::ThreadMain() { #endif if (message_loop->type() != MessageLoop::TYPE_CUSTOM) { - // Assert that MessageLoop::QuitWhenIdle was called by ThreadQuitHelper. - // Don't check for custom message pumps, because their shutdown might not - // allow this. + // Assert that RunLoop::QuitWhenIdle was called by ThreadQuitHelper. Don't + // check for custom message pumps, because their shutdown might not allow + // this. DCHECK(GetThreadWasQuitProperly()); } // We can't receive messages anymore. // (The message loop is destructed at the end of this block) message_loop_ = nullptr; + run_loop_ = nullptr; +} + +void Thread::ThreadQuitHelper() { + DCHECK(run_loop_); + run_loop_->QuitWhenIdle(); + SetThreadWasQuitProperly(true); } } // namespace base diff --git a/base/threading/thread.h b/base/threading/thread.h index c9a77d7323..01f7d8e250 100644 --- a/base/threading/thread.h +++ b/base/threading/thread.h @@ -15,7 +15,9 @@ #include "base/macros.h" #include "base/message_loop/message_loop.h" #include "base/message_loop/timer_slack.h" +#include "base/sequence_checker.h" #include "base/single_thread_task_runner.h" +#include "base/synchronization/atomic_flag.h" #include "base/synchronization/lock.h" #include "base/synchronization/waitable_event.h" #include "base/threading/platform_thread.h" @@ -24,6 +26,7 @@ namespace base { class MessagePump; +class RunLoop; // A simple thread abstraction that establishes a MessageLoop on a new thread. // The consumer uses the MessageLoop of the thread to cause code to execute on @@ -38,6 +41,18 @@ class MessagePump; // (1) Thread::CleanUp() // (2) MessageLoop::~MessageLoop // (3.b) MessageLoop::DestructionObserver::WillDestroyCurrentMessageLoop +// +// This API is not thread-safe: unless indicated otherwise its methods are only +// valid from the owning sequence (which is the one from which Start() is +// invoked -- should it differ from the one on which it was constructed). +// +// Sometimes it's useful to kick things off on the initial sequence (e.g. +// construction, Start(), task_runner()), but to then hand the Thread over to a +// pool of users for the last one of them to destroy it when done. For that use +// case, Thread::DetachFromSequence() allows the owning sequence to give up +// ownership. The caller is then responsible to ensure a happens-after +// relationship between the DetachFromSequence() call and the next use of that +// Thread object (including ~Thread()). class BASE_EXPORT Thread : PlatformThread::Delegate { public: struct BASE_EXPORT Options { @@ -50,10 +65,10 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { // Specifies the type of message loop that will be allocated on the thread. // This is ignored if message_pump_factory.is_null() is false. - MessageLoop::Type message_loop_type; + MessageLoop::Type message_loop_type = MessageLoop::TYPE_DEFAULT; // Specifies timer slack for thread message loop. - TimerSlack timer_slack; + TimerSlack timer_slack = TIMER_SLACK_NONE; // Used to create the MessagePump for the MessageLoop. The callback is Run() // on the thread. If message_pump_factory.is_null(), then a MessagePump @@ -64,10 +79,18 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { // Specifies the maximum stack size that the thread is allowed to use. // This does not necessarily correspond to the thread's initial stack size. // A value of 0 indicates that the default maximum should be used. - size_t stack_size; + size_t stack_size = 0; // Specifies the initial thread priority. - ThreadPriority priority; + ThreadPriority priority = ThreadPriority::NORMAL; + + // If false, the thread will not be joined on destruction. This is intended + // for threads that want TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN + // semantics. Non-joinable threads can't be joined (must be leaked and + // can't be destroyed or Stop()'ed). + // TODO(gab): allow non-joinable instances to be deleted without causing + // user-after-frees (proposal @ https://crbug.com/629139#c14) + bool joinable = true; }; // Constructor. @@ -125,12 +148,19 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { // carefully for production code. bool WaitUntilThreadStarted() const; - // Signals the thread to exit and returns once the thread has exited. After - // this method returns, the Thread object is completely reset and may be used - // as if it were newly constructed (i.e., Start may be called again). + // Blocks until all tasks previously posted to this thread have been executed. + void FlushForTesting(); + + // Signals the thread to exit and returns once the thread has exited. The + // Thread object is completely reset and may be used as if it were newly + // constructed (i.e., Start may be called again). Can only be called if + // |joinable_|. // // Stop may be called multiple times and is simply ignored if the thread is - // already stopped. + // already stopped or currently stopping. + // + // Start/Stop are not thread-safe and callers that desire to invoke them from + // different threads must ensure mutual exclusion. // // NOTE: If you are a consumer of Thread, it is not necessary to call this // before deleting your Thread objects, as the destructor will do it. @@ -145,11 +175,17 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { // deadlock on Windows with printer worker thread. In any other case, Stop() // should be used. // - // StopSoon should not be called multiple times as it is risky to do so. It - // could cause a timing issue in message_loop() access. Call Stop() to reset - // the thread object once it is known that the thread has quit. + // Call Stop() to reset the thread object once it is known that the thread has + // quit. void StopSoon(); + // Detaches the owning sequence, indicating that the next call to this API + // (including ~Thread()) can happen from a different sequence (to which it + // will be rebound). This call itself must happen on the current owning + // sequence and the caller must ensure the next API call has a happens-after + // relationship with this one. + void DetachFromSequence(); + // Returns the message loop for this thread. Use the MessageLoop's // PostTask methods to execute code on the thread. This only returns // non-null after a successful call to Start. After Stop has been called, @@ -158,29 +194,52 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { // NOTE: You must not call this MessageLoop's Quit method directly. Use // the Thread's Stop method instead. // - MessageLoop* message_loop() const { return message_loop_; } + // In addition to this Thread's owning sequence, this can also safely be + // called from the underlying thread itself. + MessageLoop* message_loop() const { + // This class doesn't provide synchronization around |message_loop_| and as + // such only the owner should access it (and the underlying thread which + // never sees it before it's set). In practice, many callers are coming from + // unrelated threads but provide their own implicit (e.g. memory barriers + // from task posting) or explicit (e.g. locks) synchronization making the + // access of |message_loop_| safe... Changing all of those callers is + // unfeasible; instead verify that they can reliably see + // |message_loop_ != nullptr| without synchronization as a proof that their + // external synchronization catches the unsynchronized effects of Start(). + // TODO(gab): Despite all of the above this test has to be disabled for now + // per crbug.com/629139#c6. + // DCHECK(owning_sequence_checker_.CalledOnValidSequence() || + // (id_event_.IsSignaled() && id_ == PlatformThread::CurrentId()) || + // message_loop_); + return message_loop_; + } // Returns a TaskRunner for this thread. Use the TaskRunner's PostTask // methods to execute code on the thread. Returns nullptr if the thread is not // running (e.g. before Start or after Stop have been called). Callers can // hold on to this even after the thread is gone; in this situation, attempts // to PostTask() will fail. + // + // In addition to this Thread's owning sequence, this can also safely be + // called from the underlying thread itself. scoped_refptr<SingleThreadTaskRunner> task_runner() const { + // Refer to the DCHECK and comment inside |message_loop()|. + DCHECK(owning_sequence_checker_.CalledOnValidSequence() || + (id_event_.IsSignaled() && id_ == PlatformThread::CurrentId()) || + message_loop_); return message_loop_ ? message_loop_->task_runner() : nullptr; } // Returns the name of this thread (for display in debugger too). const std::string& thread_name() const { return name_; } - // The native thread handle. - PlatformThreadHandle thread_handle() { return thread_; } - // Returns the thread ID. Should not be called before the first Start*() // call. Keeps on returning the same ID even after a Stop() call. The next // Start*() call renews the ID. // // WARNING: This function will block if the thread hasn't started yet. // + // This method is thread-safe. PlatformThreadId GetThreadId() const; // Returns true if the thread has been started, and not yet stopped. @@ -190,8 +249,8 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { // Called just prior to starting the message loop virtual void Init() {} - // Called to start the message loop - virtual void Run(MessageLoop* message_loop); + // Called to start the run loop + virtual void Run(RunLoop* run_loop); // Called just after the message loop ends virtual void CleanUp() {} @@ -199,8 +258,11 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { static void SetThreadWasQuitProperly(bool flag); static bool GetThreadWasQuitProperly(); - void set_message_loop(MessageLoop* message_loop) { - message_loop_ = message_loop; + // Bind this Thread to an existing MessageLoop instead of starting a new one. + void SetMessageLoop(MessageLoop* message_loop); + + bool using_external_message_loop() const { + return using_external_message_loop_; } private: @@ -215,19 +277,25 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { // PlatformThread::Delegate methods: void ThreadMain() override; + void ThreadQuitHelper(); + #if defined(OS_WIN) // Whether this thread needs to initialize COM, and if so, in what mode. - ComStatus com_status_; + ComStatus com_status_ = NONE; #endif + // Mirrors the Options::joinable field used to start this thread. Verified + // on Stop() -- non-joinable threads can't be joined (must be leaked). + bool joinable_ = true; + // If true, we're in the middle of stopping, and shouldn't access // |message_loop_|. It may non-nullptr and invalid. // Should be written on the thread that created this thread. Also read data // could be wrong on other threads. - bool stopping_; + bool stopping_ = false; // True while inside of Run(). - bool running_; + bool running_ = false; mutable base::Lock running_lock_; // Protects |running_|. // The thread's handle. @@ -235,24 +303,35 @@ class BASE_EXPORT Thread : PlatformThread::Delegate { mutable base::Lock thread_lock_; // Protects |thread_|. // The thread's id once it has started. - PlatformThreadId id_; - mutable WaitableEvent id_event_; // Protects |id_|. - - // The thread's message loop. Valid only while the thread is alive. Set - // by the created thread. - MessageLoop* message_loop_; + PlatformThreadId id_ = kInvalidThreadId; + // Protects |id_| which must only be read while it's signaled. + mutable WaitableEvent id_event_; + + // The thread's MessageLoop and RunLoop. Valid only while the thread is alive. + // Set by the created thread. + MessageLoop* message_loop_ = nullptr; + RunLoop* run_loop_ = nullptr; + + // True only if |message_loop_| was externally provided by |SetMessageLoop()| + // in which case this Thread has no underlying |thread_| and should merely + // drop |message_loop_| on Stop(). In that event, this remains true after + // Stop() was invoked so that subclasses can use this state to build their own + // cleanup logic as required. + bool using_external_message_loop_ = false; // Stores Options::timer_slack_ until the message loop has been bound to // a thread. - TimerSlack message_loop_timer_slack_; + TimerSlack message_loop_timer_slack_ = TIMER_SLACK_NONE; // The name of the thread. Used for debugging purposes. - std::string name_; + const std::string name_; // Signaled when the created thread gets ready to use the message loop. mutable WaitableEvent start_event_; - friend void ThreadQuitHelper(); + // This class is not thread-safe, use this to verify access from the owning + // sequence of the Thread. + SequenceChecker owning_sequence_checker_; DISALLOW_COPY_AND_ASSIGN(Thread); }; diff --git a/base/threading/thread_checker.h b/base/threading/thread_checker.h index 1d970f093e..1d4eb1c7b0 100644 --- a/base/threading/thread_checker.h +++ b/base/threading/thread_checker.h @@ -8,16 +8,6 @@ #include "base/logging.h" #include "base/threading/thread_checker_impl.h" -// Apart from debug builds, we also enable the thread checker in -// builds with DCHECK_ALWAYS_ON so that trybots and waterfall bots -// with this define will get the same level of thread checking as -// debug bots. -#if DCHECK_IS_ON() -#define ENABLE_THREAD_CHECKER 1 -#else -#define ENABLE_THREAD_CHECKER 0 -#endif - namespace base { // Do nothing implementation, for use in release mode. @@ -63,16 +53,20 @@ class ThreadCheckerDoNothing { // ThreadChecker thread_checker_; // } // +// Note that, when enabled, CalledOnValidThread() returns false when called from +// tasks posted to SingleThreadTaskRunners bound to different sequences, even if +// the tasks happen to run on the same thread (e.g. two independent TaskRunners +// with ExecutionMode::SINGLE_THREADED on the TaskScheduler that happen to share +// a thread). +// // In Release mode, CalledOnValidThread will always return true. -#if ENABLE_THREAD_CHECKER +#if DCHECK_IS_ON() class ThreadChecker : public ThreadCheckerImpl { }; #else class ThreadChecker : public ThreadCheckerDoNothing { }; -#endif // ENABLE_THREAD_CHECKER - -#undef ENABLE_THREAD_CHECKER +#endif // DCHECK_IS_ON() } // namespace base diff --git a/base/threading/thread_checker_impl.cc b/base/threading/thread_checker_impl.cc index eb87bae772..d5ccbdb943 100644 --- a/base/threading/thread_checker_impl.cc +++ b/base/threading/thread_checker_impl.cc @@ -4,31 +4,54 @@ #include "base/threading/thread_checker_impl.h" +#include "base/threading/thread_task_runner_handle.h" + namespace base { -ThreadCheckerImpl::ThreadCheckerImpl() - : valid_thread_id_() { - EnsureThreadIdAssigned(); +ThreadCheckerImpl::ThreadCheckerImpl() { + AutoLock auto_lock(lock_); + EnsureAssigned(); } -ThreadCheckerImpl::~ThreadCheckerImpl() {} +ThreadCheckerImpl::~ThreadCheckerImpl() = default; bool ThreadCheckerImpl::CalledOnValidThread() const { - EnsureThreadIdAssigned(); AutoLock auto_lock(lock_); - return valid_thread_id_ == PlatformThread::CurrentRef(); + EnsureAssigned(); + + // Always return true when called from the task from which this + // ThreadCheckerImpl was assigned to a thread. + if (task_token_ == TaskToken::GetForCurrentThread()) + return true; + + // If this ThreadCheckerImpl is bound to a valid SequenceToken, it must be + // equal to the current SequenceToken and there must be a registered + // ThreadTaskRunnerHandle. Otherwise, the fact that the current task runs on + // the thread to which this ThreadCheckerImpl is bound is fortuitous. + if (sequence_token_.IsValid() && + (sequence_token_ != SequenceToken::GetForCurrentThread() || + !ThreadTaskRunnerHandle::IsSet())) { + return false; + } + + return thread_id_ == PlatformThread::CurrentRef(); } void ThreadCheckerImpl::DetachFromThread() { AutoLock auto_lock(lock_); - valid_thread_id_ = PlatformThreadRef(); + thread_id_ = PlatformThreadRef(); + task_token_ = TaskToken(); + sequence_token_ = SequenceToken(); } -void ThreadCheckerImpl::EnsureThreadIdAssigned() const { - AutoLock auto_lock(lock_); - if (valid_thread_id_.is_null()) { - valid_thread_id_ = PlatformThread::CurrentRef(); - } +void ThreadCheckerImpl::EnsureAssigned() const { + lock_.AssertAcquired(); + if (!thread_id_.is_null()) + return; + + thread_id_ = PlatformThread::CurrentRef(); + task_token_ = TaskToken::GetForCurrentThread(); + sequence_token_ = SequenceToken::GetForCurrentThread(); } } // namespace base diff --git a/base/threading/thread_checker_impl.h b/base/threading/thread_checker_impl.h index c92e143db0..13193d1299 100644 --- a/base/threading/thread_checker_impl.h +++ b/base/threading/thread_checker_impl.h @@ -7,17 +7,18 @@ #include "base/base_export.h" #include "base/compiler_specific.h" +#include "base/sequence_token.h" #include "base/synchronization/lock.h" #include "base/threading/platform_thread.h" namespace base { -// Real implementation of ThreadChecker, for use in debug mode, or -// for temporary use in release mode (e.g. to CHECK on a threading issue -// seen only in the wild). +// Real implementation of ThreadChecker, for use in debug mode, or for temporary +// use in release mode (e.g. to CHECK on a threading issue seen only in the +// wild). // -// Note: You should almost always use the ThreadChecker class to get the -// right version for your build configuration. +// Note: You should almost always use the ThreadChecker class to get the right +// version for your build configuration. class BASE_EXPORT ThreadCheckerImpl { public: ThreadCheckerImpl(); @@ -31,12 +32,29 @@ class BASE_EXPORT ThreadCheckerImpl { void DetachFromThread(); private: - void EnsureThreadIdAssigned() const; + void EnsureAssigned() const; + // Members are mutable so that CalledOnValidThread() can set them. + + // Synchronizes access to all members. mutable base::Lock lock_; - // This is mutable so that CalledOnValidThread can set it. - // It's guarded by |lock_|. - mutable PlatformThreadRef valid_thread_id_; + + // Thread on which CalledOnValidThread() may return true. + mutable PlatformThreadRef thread_id_; + + // TaskToken for which CalledOnValidThread() always returns true. This allows + // CalledOnValidThread() to return true when called multiple times from the + // same task, even if it's not running in a single-threaded context itself + // (allowing usage of ThreadChecker/NonThreadSafe objects on the stack in the + // scope of one-off tasks). Note: CalledOnValidThread() may return true even + // if the current TaskToken is not equal to this. + mutable TaskToken task_token_; + + // SequenceToken for which CalledOnValidThread() may return true. Used to + // ensure that CalledOnValidThread() doesn't return true for TaskScheduler + // tasks that happen to run on the same thread but weren't posted to the same + // SingleThreadTaskRunner. + mutable SequenceToken sequence_token_; }; } // namespace base diff --git a/base/threading/thread_checker_unittest.cc b/base/threading/thread_checker_unittest.cc index bc5b1e473a..96455e66c7 100644 --- a/base/threading/thread_checker_unittest.cc +++ b/base/threading/thread_checker_unittest.cc @@ -2,180 +2,194 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "base/threading/thread_checker.h" - #include <memory> -#include "base/logging.h" +#include "base/bind.h" +#include "base/bind_helpers.h" #include "base/macros.h" +#include "base/memory/ref_counted.h" +#include "base/sequence_token.h" +#include "base/test/test_simple_task_runner.h" #include "base/threading/simple_thread.h" +#include "base/threading/thread_checker_impl.h" +#include "base/threading/thread_task_runner_handle.h" #include "testing/gtest/include/gtest/gtest.h" -// Duplicated from base/threading/thread_checker.h so that we can be -// good citizens there and undef the macro. -#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) -#define ENABLE_THREAD_CHECKER 1 -#else -#define ENABLE_THREAD_CHECKER 0 -#endif - namespace base { - namespace { -// Simple class to exercise the basics of ThreadChecker. -// Both the destructor and DoStuff should verify that they were -// called on the same thread as the constructor. -class ThreadCheckerClass : public ThreadChecker { +// A thread that runs a callback. +class RunCallbackThread : public SimpleThread { public: - ThreadCheckerClass() {} - - // Verifies that it was called on the same thread as the constructor. - void DoStuff() { - DCHECK(CalledOnValidThread()); - } - - void DetachFromThread() { - ThreadChecker::DetachFromThread(); - } - - static void MethodOnDifferentThreadImpl(); - static void DetachThenCallFromDifferentThreadImpl(); + explicit RunCallbackThread(const Closure& callback) + : SimpleThread("RunCallbackThread"), callback_(callback) {} private: - DISALLOW_COPY_AND_ASSIGN(ThreadCheckerClass); -}; + // SimpleThread: + void Run() override { callback_.Run(); } -// Calls ThreadCheckerClass::DoStuff on another thread. -class CallDoStuffOnThread : public base::SimpleThread { - public: - explicit CallDoStuffOnThread(ThreadCheckerClass* thread_checker_class) - : SimpleThread("call_do_stuff_on_thread"), - thread_checker_class_(thread_checker_class) { - } + const Closure callback_; - void Run() override { thread_checker_class_->DoStuff(); } + DISALLOW_COPY_AND_ASSIGN(RunCallbackThread); +}; - private: - ThreadCheckerClass* thread_checker_class_; +// Runs a callback on a new thread synchronously. +void RunCallbackOnNewThreadSynchronously(const Closure& callback) { + RunCallbackThread run_callback_thread(callback); + run_callback_thread.Start(); + run_callback_thread.Join(); +} - DISALLOW_COPY_AND_ASSIGN(CallDoStuffOnThread); -}; +void ExpectCalledOnValidThread(ThreadCheckerImpl* thread_checker) { + ASSERT_TRUE(thread_checker); -// Deletes ThreadCheckerClass on a different thread. -class DeleteThreadCheckerClassOnThread : public base::SimpleThread { - public: - explicit DeleteThreadCheckerClassOnThread( - ThreadCheckerClass* thread_checker_class) - : SimpleThread("delete_thread_checker_class_on_thread"), - thread_checker_class_(thread_checker_class) { - } + // This should bind |thread_checker| to the current thread if it wasn't + // already bound to a thread. + EXPECT_TRUE(thread_checker->CalledOnValidThread()); - void Run() override { thread_checker_class_.reset(); } + // Since |thread_checker| is now bound to the current thread, another call to + // CalledOnValidThread() should return true. + EXPECT_TRUE(thread_checker->CalledOnValidThread()); +} - private: - std::unique_ptr<ThreadCheckerClass> thread_checker_class_; +void ExpectNotCalledOnValidThread(ThreadCheckerImpl* thread_checker) { + ASSERT_TRUE(thread_checker); + EXPECT_FALSE(thread_checker->CalledOnValidThread()); +} - DISALLOW_COPY_AND_ASSIGN(DeleteThreadCheckerClassOnThread); -}; +void ExpectNotCalledOnValidThreadWithSequenceTokenAndThreadTaskRunnerHandle( + ThreadCheckerImpl* thread_checker, + SequenceToken sequence_token) { + ThreadTaskRunnerHandle thread_task_runner_handle( + make_scoped_refptr(new TestSimpleTaskRunner)); + ScopedSetSequenceTokenForCurrentThread + scoped_set_sequence_token_for_current_thread(sequence_token); + ExpectNotCalledOnValidThread(thread_checker); +} } // namespace -TEST(ThreadCheckerTest, CallsAllowedOnSameThread) { - std::unique_ptr<ThreadCheckerClass> thread_checker_class( - new ThreadCheckerClass); +TEST(ThreadCheckerTest, AllowedSameThreadNoSequenceToken) { + ThreadCheckerImpl thread_checker; + EXPECT_TRUE(thread_checker.CalledOnValidThread()); +} - // Verify that DoStuff doesn't assert. - thread_checker_class->DoStuff(); +TEST(ThreadCheckerTest, + AllowedSameThreadAndSequenceDifferentTasksWithThreadTaskRunnerHandle) { + ThreadTaskRunnerHandle thread_task_runner_handle( + make_scoped_refptr(new TestSimpleTaskRunner)); - // Verify that the destructor doesn't assert. - thread_checker_class.reset(); -} + std::unique_ptr<ThreadCheckerImpl> thread_checker; + const SequenceToken sequence_token = SequenceToken::Create(); -TEST(ThreadCheckerTest, DestructorAllowedOnDifferentThread) { - std::unique_ptr<ThreadCheckerClass> thread_checker_class( - new ThreadCheckerClass); + { + ScopedSetSequenceTokenForCurrentThread + scoped_set_sequence_token_for_current_thread(sequence_token); + thread_checker.reset(new ThreadCheckerImpl); + } - // Verify that the destructor doesn't assert - // when called on a different thread. - DeleteThreadCheckerClassOnThread delete_on_thread( - thread_checker_class.release()); + { + ScopedSetSequenceTokenForCurrentThread + scoped_set_sequence_token_for_current_thread(sequence_token); + EXPECT_TRUE(thread_checker->CalledOnValidThread()); + } +} - delete_on_thread.Start(); - delete_on_thread.Join(); +TEST(ThreadCheckerTest, + AllowedSameThreadSequenceAndTaskNoThreadTaskRunnerHandle) { + ScopedSetSequenceTokenForCurrentThread + scoped_set_sequence_token_for_current_thread(SequenceToken::Create()); + ThreadCheckerImpl thread_checker; + EXPECT_TRUE(thread_checker.CalledOnValidThread()); } -TEST(ThreadCheckerTest, DetachFromThread) { - std::unique_ptr<ThreadCheckerClass> thread_checker_class( - new ThreadCheckerClass); +TEST(ThreadCheckerTest, + DisallowedSameThreadAndSequenceDifferentTasksNoThreadTaskRunnerHandle) { + std::unique_ptr<ThreadCheckerImpl> thread_checker; - // Verify that DoStuff doesn't assert when called on a different thread after - // a call to DetachFromThread. - thread_checker_class->DetachFromThread(); - CallDoStuffOnThread call_on_thread(thread_checker_class.get()); + { + ScopedSetSequenceTokenForCurrentThread + scoped_set_sequence_token_for_current_thread(SequenceToken::Create()); + thread_checker.reset(new ThreadCheckerImpl); + } - call_on_thread.Start(); - call_on_thread.Join(); + { + ScopedSetSequenceTokenForCurrentThread + scoped_set_sequence_token_for_current_thread(SequenceToken::Create()); + EXPECT_FALSE(thread_checker->CalledOnValidThread()); + } } -#if GTEST_HAS_DEATH_TEST || !ENABLE_THREAD_CHECKER +TEST(ThreadCheckerTest, DisallowedDifferentThreadsNoSequenceToken) { + ThreadCheckerImpl thread_checker; + RunCallbackOnNewThreadSynchronously( + Bind(&ExpectNotCalledOnValidThread, Unretained(&thread_checker))); +} -void ThreadCheckerClass::MethodOnDifferentThreadImpl() { - std::unique_ptr<ThreadCheckerClass> thread_checker_class( - new ThreadCheckerClass); +TEST(ThreadCheckerTest, DisallowedDifferentThreadsSameSequence) { + ThreadTaskRunnerHandle thread_task_runner_handle( + make_scoped_refptr(new TestSimpleTaskRunner)); + const SequenceToken sequence_token(SequenceToken::Create()); - // DoStuff should assert in debug builds only when called on a - // different thread. - CallDoStuffOnThread call_on_thread(thread_checker_class.get()); + ScopedSetSequenceTokenForCurrentThread + scoped_set_sequence_token_for_current_thread(sequence_token); + ThreadCheckerImpl thread_checker; + EXPECT_TRUE(thread_checker.CalledOnValidThread()); - call_on_thread.Start(); - call_on_thread.Join(); + RunCallbackOnNewThreadSynchronously(Bind( + &ExpectNotCalledOnValidThreadWithSequenceTokenAndThreadTaskRunnerHandle, + Unretained(&thread_checker), sequence_token)); } -#if ENABLE_THREAD_CHECKER -TEST(ThreadCheckerDeathTest, MethodNotAllowedOnDifferentThreadInDebug) { - ASSERT_DEATH({ - ThreadCheckerClass::MethodOnDifferentThreadImpl(); - }, ""); -} -#else -TEST(ThreadCheckerTest, MethodAllowedOnDifferentThreadInRelease) { - ThreadCheckerClass::MethodOnDifferentThreadImpl(); -} -#endif // ENABLE_THREAD_CHECKER +TEST(ThreadCheckerTest, DisallowedSameThreadDifferentSequence) { + std::unique_ptr<ThreadCheckerImpl> thread_checker; -void ThreadCheckerClass::DetachThenCallFromDifferentThreadImpl() { - std::unique_ptr<ThreadCheckerClass> thread_checker_class( - new ThreadCheckerClass); + ThreadTaskRunnerHandle thread_task_runner_handle( + make_scoped_refptr(new TestSimpleTaskRunner)); - // DoStuff doesn't assert when called on a different thread - // after a call to DetachFromThread. - thread_checker_class->DetachFromThread(); - CallDoStuffOnThread call_on_thread(thread_checker_class.get()); + { + ScopedSetSequenceTokenForCurrentThread + scoped_set_sequence_token_for_current_thread(SequenceToken::Create()); + thread_checker.reset(new ThreadCheckerImpl); + } - call_on_thread.Start(); - call_on_thread.Join(); + { + // Different SequenceToken. + ScopedSetSequenceTokenForCurrentThread + scoped_set_sequence_token_for_current_thread(SequenceToken::Create()); + EXPECT_FALSE(thread_checker->CalledOnValidThread()); + } - // DoStuff should assert in debug builds only after moving to - // another thread. - thread_checker_class->DoStuff(); + // No SequenceToken. + EXPECT_FALSE(thread_checker->CalledOnValidThread()); } -#if ENABLE_THREAD_CHECKER -TEST(ThreadCheckerDeathTest, DetachFromThreadInDebug) { - ASSERT_DEATH({ - ThreadCheckerClass::DetachThenCallFromDifferentThreadImpl(); - }, ""); -} -#else -TEST(ThreadCheckerTest, DetachFromThreadInRelease) { - ThreadCheckerClass::DetachThenCallFromDifferentThreadImpl(); +TEST(ThreadCheckerTest, DetachFromThread) { + ThreadCheckerImpl thread_checker; + thread_checker.DetachFromThread(); + + // Verify that CalledOnValidThread() returns true when called on a different + // thread after a call to DetachFromThread(). + RunCallbackOnNewThreadSynchronously( + Bind(&ExpectCalledOnValidThread, Unretained(&thread_checker))); + + EXPECT_FALSE(thread_checker.CalledOnValidThread()); } -#endif // ENABLE_THREAD_CHECKER -#endif // GTEST_HAS_DEATH_TEST || !ENABLE_THREAD_CHECKER +TEST(ThreadCheckerTest, DetachFromThreadWithSequenceToken) { + ThreadTaskRunnerHandle thread_task_runner_handle( + make_scoped_refptr(new TestSimpleTaskRunner)); + ScopedSetSequenceTokenForCurrentThread + scoped_set_sequence_token_for_current_thread(SequenceToken::Create()); + ThreadCheckerImpl thread_checker; + thread_checker.DetachFromThread(); -// Just in case we ever get lumped together with other compilation units. -#undef ENABLE_THREAD_CHECKER + // Verify that CalledOnValidThread() returns true when called on a different + // thread after a call to DetachFromThread(). + RunCallbackOnNewThreadSynchronously( + Bind(&ExpectCalledOnValidThread, Unretained(&thread_checker))); + + EXPECT_FALSE(thread_checker.CalledOnValidThread()); +} } // namespace base diff --git a/base/threading/thread_local.h b/base/threading/thread_local.h index f40420cd2f..cad9add3a9 100644 --- a/base/threading/thread_local.h +++ b/base/threading/thread_local.h @@ -2,35 +2,34 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -// WARNING: Thread local storage is a bit tricky to get right. Please make -// sure that this is really the proper solution for what you're trying to -// achieve. Don't prematurely optimize, most likely you can just use a Lock. +// WARNING: Thread local storage is a bit tricky to get right. Please make sure +// that this is really the proper solution for what you're trying to achieve. +// Don't prematurely optimize, most likely you can just use a Lock. // -// These classes implement a wrapper around the platform's TLS storage -// mechanism. On construction, they will allocate a TLS slot, and free the -// TLS slot on destruction. No memory management (creation or destruction) is -// handled. This means for uses of ThreadLocalPointer, you must correctly -// manage the memory yourself, these classes will not destroy the pointer for -// you. There are no at-thread-exit actions taken by these classes. +// These classes implement a wrapper around ThreadLocalStorage::Slot. On +// construction, they will allocate a TLS slot, and free the TLS slot on +// destruction. No memory management (creation or destruction) is handled. This +// means for uses of ThreadLocalPointer, you must correctly manage the memory +// yourself, these classes will not destroy the pointer for you. There are no +// at-thread-exit actions taken by these classes. // -// ThreadLocalPointer<Type> wraps a Type*. It performs no creation or -// destruction, so memory management must be handled elsewhere. The first call -// to Get() on a thread will return NULL. You can update the pointer with a -// call to Set(). +// ThreadLocalPointer<Type> wraps a Type*. It performs no creation or +// destruction, so memory management must be handled elsewhere. The first call +// to Get() on a thread will return NULL. You can update the pointer with a call +// to Set(). // -// ThreadLocalBoolean wraps a bool. It will default to false if it has never +// ThreadLocalBoolean wraps a bool. It will default to false if it has never // been set otherwise with Set(). // -// Thread Safety: An instance of ThreadLocalStorage is completely thread safe -// once it has been created. If you want to dynamically create an instance, -// you must of course properly deal with safety and race conditions. This -// means a function-level static initializer is generally inappropiate. +// Thread Safety: An instance of ThreadLocalStorage is completely thread safe +// once it has been created. If you want to dynamically create an instance, you +// must of course properly deal with safety and race conditions. This means a +// function-level static initializer is generally inappropiate. // -// In Android, the system TLS is limited, the implementation is backed with -// ThreadLocalStorage. +// In Android, the system TLS is limited. // // Example usage: -// // My class is logically attached to a single thread. We cache a pointer +// // My class is logically attached to a single thread. We cache a pointer // // on the thread it was created on, so we can implement current(). // MyClass::MyClass() { // DCHECK(Singleton<ThreadLocalPointer<MyClass> >::get()->Get() == NULL); @@ -51,76 +50,42 @@ #ifndef BASE_THREADING_THREAD_LOCAL_H_ #define BASE_THREADING_THREAD_LOCAL_H_ -#include "base/base_export.h" #include "base/macros.h" #include "base/threading/thread_local_storage.h" -#include "build/build_config.h" - -#if defined(OS_POSIX) -#include <pthread.h> -#endif namespace base { -namespace internal { - -// Helper functions that abstract the cross-platform APIs. Do not use directly. -struct BASE_EXPORT ThreadLocalPlatform { -#if defined(OS_WIN) - typedef unsigned long SlotType; -#elif defined(OS_ANDROID) - typedef ThreadLocalStorage::StaticSlot SlotType; -#elif defined(OS_POSIX) - typedef pthread_key_t SlotType; -#endif - - static void AllocateSlot(SlotType* slot); - static void FreeSlot(SlotType slot); - static void* GetValueFromSlot(SlotType slot); - static void SetValueInSlot(SlotType slot, void* value); -}; - -} // namespace internal template <typename Type> class ThreadLocalPointer { public: - ThreadLocalPointer() : slot_() { - internal::ThreadLocalPlatform::AllocateSlot(&slot_); - } - - ~ThreadLocalPointer() { - internal::ThreadLocalPlatform::FreeSlot(slot_); - } + ThreadLocalPointer() = default; + ~ThreadLocalPointer() = default; Type* Get() { - return static_cast<Type*>( - internal::ThreadLocalPlatform::GetValueFromSlot(slot_)); + return static_cast<Type*>(slot_.Get()); } void Set(Type* ptr) { - internal::ThreadLocalPlatform::SetValueInSlot( - slot_, const_cast<void*>(static_cast<const void*>(ptr))); + slot_.Set(const_cast<void*>(static_cast<const void*>(ptr))); } private: - typedef internal::ThreadLocalPlatform::SlotType SlotType; - - SlotType slot_; + ThreadLocalStorage::Slot slot_; DISALLOW_COPY_AND_ASSIGN(ThreadLocalPointer<Type>); }; class ThreadLocalBoolean { public: - ThreadLocalBoolean() {} - ~ThreadLocalBoolean() {} + ThreadLocalBoolean() = default; + ~ThreadLocalBoolean() = default; bool Get() { - return tlp_.Get() != NULL; + return tlp_.Get() != nullptr; } void Set(bool val) { - tlp_.Set(val ? this : NULL); + tlp_.Set(val ? this : nullptr); } private: diff --git a/base/threading/thread_local_posix.cc b/base/threading/thread_local_posix.cc deleted file mode 100644 index 8bc46ad190..0000000000 --- a/base/threading/thread_local_posix.cc +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "base/threading/thread_local.h" - -#include <pthread.h> - -#include "base/logging.h" -#include "build/build_config.h" - -#if !defined(OS_ANDROID) - -namespace base { -namespace internal { - -// static -void ThreadLocalPlatform::AllocateSlot(SlotType* slot) { - int error = pthread_key_create(slot, NULL); - CHECK_EQ(error, 0); -} - -// static -void ThreadLocalPlatform::FreeSlot(SlotType slot) { - int error = pthread_key_delete(slot); - DCHECK_EQ(0, error); -} - -// static -void* ThreadLocalPlatform::GetValueFromSlot(SlotType slot) { - return pthread_getspecific(slot); -} - -// static -void ThreadLocalPlatform::SetValueInSlot(SlotType slot, void* value) { - int error = pthread_setspecific(slot, value); - DCHECK_EQ(error, 0); -} - -} // namespace internal -} // namespace base - -#endif // !defined(OS_ANDROID) diff --git a/base/threading/thread_local_storage.cc b/base/threading/thread_local_storage.cc index a7eb527888..48c1dd58c2 100644 --- a/base/threading/thread_local_storage.cc +++ b/base/threading/thread_local_storage.cc @@ -6,10 +6,57 @@ #include "base/atomicops.h" #include "base/logging.h" +#include "base/synchronization/lock.h" #include "build/build_config.h" using base::internal::PlatformThreadLocalStorage; +// Chrome Thread Local Storage (TLS) +// +// This TLS system allows Chrome to use a single OS level TLS slot process-wide, +// and allows us to control the slot limits instead of being at the mercy of the +// platform. To do this, Chrome TLS replicates an array commonly found in the OS +// thread metadata. +// +// Overview: +// +// OS TLS Slots Per-Thread Per-Process Global +// ... +// [] Chrome TLS Array Chrome TLS Metadata +// [] ----------> [][][][][ ][][][][] [][][][][ ][][][][] +// [] | | +// ... V V +// Metadata Version Slot Information +// Your Data! +// +// Using a single OS TLS slot, Chrome TLS allocates an array on demand for the +// lifetime of each thread that requests Chrome TLS data. Each per-thread TLS +// array matches the length of the per-process global metadata array. +// +// A per-process global TLS metadata array tracks information about each item in +// the per-thread array: +// * Status: Tracks if the slot is allocated or free to assign. +// * Destructor: An optional destructor to call on thread destruction for that +// specific slot. +// * Version: Tracks the current version of the TLS slot. Each TLS slot +// allocation is associated with a unique version number. +// +// Most OS TLS APIs guarantee that a newly allocated TLS slot is +// initialized to 0 for all threads. The Chrome TLS system provides +// this guarantee by tracking the version for each TLS slot here +// on each per-thread Chrome TLS array entry. Threads that access +// a slot with a mismatched version will receive 0 as their value. +// The metadata version is incremented when the client frees a +// slot. The per-thread metadata version is updated when a client +// writes to the slot. This scheme allows for constant time +// invalidation and avoids the need to iterate through each Chrome +// TLS array to mark the slot as zero. +// +// Just like an OS TLS API, clients of the Chrome TLS are responsible for +// managing any necessary lifetime of the data in their slots. The only +// convenience provided is automatic destruction when a thread ends. If a client +// frees a slot, that client is responsible for destroying the data in the slot. + namespace { // In order to make TLS destructors work, we need to keep around a function // pointer to the destructor for each slot. We keep this array of pointers in a @@ -18,37 +65,42 @@ namespace { // hold a pointer to a per-thread array (table) of slots that we allocate to // Chromium consumers. -// g_native_tls_key is the one native TLS that we use. It stores our table. +// g_native_tls_key is the one native TLS that we use. It stores our table. base::subtle::Atomic32 g_native_tls_key = PlatformThreadLocalStorage::TLS_KEY_OUT_OF_INDEXES; -// g_last_used_tls_key is the high-water-mark of allocated thread local storage. -// Each allocation is an index into our g_tls_destructors[]. Each such index is -// assigned to the instance variable slot_ in a ThreadLocalStorage::Slot -// instance. We reserve the value slot_ == 0 to indicate that the corresponding -// instance of ThreadLocalStorage::Slot has been freed (i.e., destructor called, -// etc.). This reserved use of 0 is then stated as the initial value of -// g_last_used_tls_key, so that the first issued index will be 1. -base::subtle::Atomic32 g_last_used_tls_key = 0; +// The maximum number of slots in our thread local storage stack. +constexpr int kThreadLocalStorageSize = 256; +constexpr int kInvalidSlotValue = -1; + +enum TlsStatus { + FREE, + IN_USE, +}; + +struct TlsMetadata { + TlsStatus status; + base::ThreadLocalStorage::TLSDestructorFunc destructor; + uint32_t version; +}; -// The maximum number of 'slots' in our thread local storage stack. -const int kThreadLocalStorageSize = 256; +struct TlsVectorEntry { + void* data; + uint32_t version; +}; + +// This lock isn't needed until after we've constructed the per-thread TLS +// vector, so it's safe to use. +base::Lock* GetTLSMetadataLock() { + static auto* lock = new base::Lock(); + return lock; +} +TlsMetadata g_tls_metadata[kThreadLocalStorageSize]; +size_t g_last_assigned_slot = 0; // The maximum number of times to try to clear slots by calling destructors. // Use pthread naming convention for clarity. -const int kMaxDestructorIterations = kThreadLocalStorageSize; - -// An array of destructor function pointers for the slots. If a slot has a -// destructor, it will be stored in its corresponding entry in this array. -// The elements are volatile to ensure that when the compiler reads the value -// to potentially call the destructor, it does so once, and that value is tested -// for null-ness and then used. Yes, that would be a weird de-optimization, -// but I can imagine some register machines where it was just as easy to -// re-fetch an array element, and I want to be sure a call to free the key -// (i.e., null out the destructor entry) that happens on a separate thread can't -// hurt the racy calls to the destructors on another thread. -volatile base::ThreadLocalStorage::TLSDestructorFunc - g_tls_destructors[kThreadLocalStorageSize]; +constexpr int kMaxDestructorIterations = kThreadLocalStorageSize; // This function is called to initialize our entire Chromium TLS system. // It may be called very early, and we need to complete most all of the setup @@ -56,7 +108,7 @@ volatile base::ThreadLocalStorage::TLSDestructorFunc // recursively depend on this initialization. // As a result, we use Atomics, and avoid anything (like a singleton) that might // require memory allocations. -void** ConstructTlsVector() { +TlsVectorEntry* ConstructTlsVector() { PlatformThreadLocalStorage::TLSKey key = base::subtle::NoBarrier_Load(&g_native_tls_key); if (key == PlatformThreadLocalStorage::TLS_KEY_OUT_OF_INDEXES) { @@ -73,8 +125,8 @@ void** ConstructTlsVector() { key != PlatformThreadLocalStorage::TLS_KEY_OUT_OF_INDEXES); PlatformThreadLocalStorage::FreeTLS(tmp); } - // Atomically test-and-set the tls_key. If the key is - // TLS_KEY_OUT_OF_INDEXES, go ahead and set it. Otherwise, do nothing, as + // Atomically test-and-set the tls_key. If the key is + // TLS_KEY_OUT_OF_INDEXES, go ahead and set it. Otherwise, do nothing, as // another thread already did our dirty work. if (PlatformThreadLocalStorage::TLS_KEY_OUT_OF_INDEXES != static_cast<PlatformThreadLocalStorage::TLSKey>( @@ -90,39 +142,38 @@ void** ConstructTlsVector() { } CHECK(!PlatformThreadLocalStorage::GetTLSValue(key)); - // Some allocators, such as TCMalloc, make use of thread local storage. - // As a result, any attempt to call new (or malloc) will lazily cause such a - // system to initialize, which will include registering for a TLS key. If we - // are not careful here, then that request to create a key will call new back, - // and we'll have an infinite loop. We avoid that as follows: - // Use a stack allocated vector, so that we don't have dependence on our - // allocator until our service is in place. (i.e., don't even call new until - // after we're setup) - void* stack_allocated_tls_data[kThreadLocalStorageSize]; + // Some allocators, such as TCMalloc, make use of thread local storage. As a + // result, any attempt to call new (or malloc) will lazily cause such a system + // to initialize, which will include registering for a TLS key. If we are not + // careful here, then that request to create a key will call new back, and + // we'll have an infinite loop. We avoid that as follows: Use a stack + // allocated vector, so that we don't have dependence on our allocator until + // our service is in place. (i.e., don't even call new until after we're + // setup) + TlsVectorEntry stack_allocated_tls_data[kThreadLocalStorageSize]; memset(stack_allocated_tls_data, 0, sizeof(stack_allocated_tls_data)); // Ensure that any rentrant calls change the temp version. PlatformThreadLocalStorage::SetTLSValue(key, stack_allocated_tls_data); // Allocate an array to store our data. - void** tls_data = new void*[kThreadLocalStorageSize]; + TlsVectorEntry* tls_data = new TlsVectorEntry[kThreadLocalStorageSize]; memcpy(tls_data, stack_allocated_tls_data, sizeof(stack_allocated_tls_data)); PlatformThreadLocalStorage::SetTLSValue(key, tls_data); return tls_data; } -void OnThreadExitInternal(void* value) { - DCHECK(value); - void** tls_data = static_cast<void**>(value); - // Some allocators, such as TCMalloc, use TLS. As a result, when a thread +void OnThreadExitInternal(TlsVectorEntry* tls_data) { + DCHECK(tls_data); + // Some allocators, such as TCMalloc, use TLS. As a result, when a thread // terminates, one of the destructor calls we make may be to shut down an - // allocator. We have to be careful that after we've shutdown all of the - // known destructors (perchance including an allocator), that we don't call - // the allocator and cause it to resurrect itself (with no possibly destructor - // call to follow). We handle this problem as follows: - // Switch to using a stack allocated vector, so that we don't have dependence - // on our allocator after we have called all g_tls_destructors. (i.e., don't - // even call delete[] after we're done with destructors.) - void* stack_allocated_tls_data[kThreadLocalStorageSize]; + // allocator. We have to be careful that after we've shutdown all of the known + // destructors (perchance including an allocator), that we don't call the + // allocator and cause it to resurrect itself (with no possibly destructor + // call to follow). We handle this problem as follows: Switch to using a stack + // allocated vector, so that we don't have dependence on our allocator after + // we have called all g_tls_metadata destructors. (i.e., don't even call + // delete[] after we're done with destructors.) + TlsVectorEntry stack_allocated_tls_data[kThreadLocalStorageSize]; memcpy(stack_allocated_tls_data, tls_data, sizeof(stack_allocated_tls_data)); // Ensure that any re-entrant calls change the temp version. PlatformThreadLocalStorage::TLSKey key = @@ -130,32 +181,38 @@ void OnThreadExitInternal(void* value) { PlatformThreadLocalStorage::SetTLSValue(key, stack_allocated_tls_data); delete[] tls_data; // Our last dependence on an allocator. + // Snapshot the TLS Metadata so we don't have to lock on every access. + TlsMetadata tls_metadata[kThreadLocalStorageSize]; + { + base::AutoLock auto_lock(*GetTLSMetadataLock()); + memcpy(tls_metadata, g_tls_metadata, sizeof(g_tls_metadata)); + } + int remaining_attempts = kMaxDestructorIterations; bool need_to_scan_destructors = true; while (need_to_scan_destructors) { need_to_scan_destructors = false; // Try to destroy the first-created-slot (which is slot 1) in our last - // destructor call. That user was able to function, and define a slot with + // destructor call. That user was able to function, and define a slot with // no other services running, so perhaps it is a basic service (like an - // allocator) and should also be destroyed last. If we get the order wrong, - // then we'll itterate several more times, so it is really not that - // critical (but it might help). - base::subtle::Atomic32 last_used_tls_key = - base::subtle::NoBarrier_Load(&g_last_used_tls_key); - for (int slot = last_used_tls_key; slot > 0; --slot) { - void* tls_value = stack_allocated_tls_data[slot]; - if (tls_value == NULL) + // allocator) and should also be destroyed last. If we get the order wrong, + // then we'll iterate several more times, so it is really not that critical + // (but it might help). + for (int slot = 0; slot < kThreadLocalStorageSize ; ++slot) { + void* tls_value = stack_allocated_tls_data[slot].data; + if (!tls_value || tls_metadata[slot].status == TlsStatus::FREE || + stack_allocated_tls_data[slot].version != tls_metadata[slot].version) continue; base::ThreadLocalStorage::TLSDestructorFunc destructor = - g_tls_destructors[slot]; - if (destructor == NULL) + tls_metadata[slot].destructor; + if (!destructor) continue; - stack_allocated_tls_data[slot] = NULL; // pre-clear the slot. + stack_allocated_tls_data[slot].data = nullptr; // pre-clear the slot. destructor(tls_value); - // Any destructor might have called a different service, which then set - // a different slot to a non-NULL value. Hence we need to check - // the whole vector again. This is a pthread standard. + // Any destructor might have called a different service, which then set a + // different slot to a non-null value. Hence we need to check the whole + // vector again. This is a pthread standard. need_to_scan_destructors = true; } if (--remaining_attempts <= 0) { @@ -165,7 +222,7 @@ void OnThreadExitInternal(void* value) { } // Remove our stack allocated vector. - PlatformThreadLocalStorage::SetTLSValue(key, NULL); + PlatformThreadLocalStorage::SetTLSValue(key, nullptr); } } // namespace @@ -184,69 +241,107 @@ void PlatformThreadLocalStorage::OnThreadExit() { // Maybe we have never initialized TLS for this thread. if (!tls_data) return; - OnThreadExitInternal(tls_data); + OnThreadExitInternal(static_cast<TlsVectorEntry*>(tls_data)); } #elif defined(OS_POSIX) void PlatformThreadLocalStorage::OnThreadExit(void* value) { - OnThreadExitInternal(value); + OnThreadExitInternal(static_cast<TlsVectorEntry*>(value)); } #endif // defined(OS_WIN) } // namespace internal -ThreadLocalStorage::Slot::Slot(TLSDestructorFunc destructor) { - slot_ = 0; - base::subtle::Release_Store(&initialized_, 0); - Initialize(destructor); -} - void ThreadLocalStorage::StaticSlot::Initialize(TLSDestructorFunc destructor) { PlatformThreadLocalStorage::TLSKey key = base::subtle::NoBarrier_Load(&g_native_tls_key); if (key == PlatformThreadLocalStorage::TLS_KEY_OUT_OF_INDEXES || - !PlatformThreadLocalStorage::GetTLSValue(key)) + !PlatformThreadLocalStorage::GetTLSValue(key)) { ConstructTlsVector(); + } // Grab a new slot. - slot_ = base::subtle::NoBarrier_AtomicIncrement(&g_last_used_tls_key, 1); - DCHECK_GT(slot_, 0); + slot_ = kInvalidSlotValue; + version_ = 0; + { + base::AutoLock auto_lock(*GetTLSMetadataLock()); + for (int i = 0; i < kThreadLocalStorageSize; ++i) { + // Tracking the last assigned slot is an attempt to find the next + // available slot within one iteration. Under normal usage, slots remain + // in use for the lifetime of the process (otherwise before we reclaimed + // slots, we would have run out of slots). This makes it highly likely the + // next slot is going to be a free slot. + size_t slot_candidate = + (g_last_assigned_slot + 1 + i) % kThreadLocalStorageSize; + if (g_tls_metadata[slot_candidate].status == TlsStatus::FREE) { + g_tls_metadata[slot_candidate].status = TlsStatus::IN_USE; + g_tls_metadata[slot_candidate].destructor = destructor; + g_last_assigned_slot = slot_candidate; + slot_ = slot_candidate; + version_ = g_tls_metadata[slot_candidate].version; + break; + } + } + } + CHECK_NE(slot_, kInvalidSlotValue); CHECK_LT(slot_, kThreadLocalStorageSize); // Setup our destructor. - g_tls_destructors[slot_] = destructor; base::subtle::Release_Store(&initialized_, 1); } void ThreadLocalStorage::StaticSlot::Free() { - // At this time, we don't reclaim old indices for TLS slots. - // So all we need to do is wipe the destructor. - DCHECK_GT(slot_, 0); + DCHECK_NE(slot_, kInvalidSlotValue); DCHECK_LT(slot_, kThreadLocalStorageSize); - g_tls_destructors[slot_] = NULL; - slot_ = 0; + { + base::AutoLock auto_lock(*GetTLSMetadataLock()); + g_tls_metadata[slot_].status = TlsStatus::FREE; + g_tls_metadata[slot_].destructor = nullptr; + ++(g_tls_metadata[slot_].version); + } + slot_ = kInvalidSlotValue; base::subtle::Release_Store(&initialized_, 0); } void* ThreadLocalStorage::StaticSlot::Get() const { - void** tls_data = static_cast<void**>( + TlsVectorEntry* tls_data = static_cast<TlsVectorEntry*>( PlatformThreadLocalStorage::GetTLSValue( base::subtle::NoBarrier_Load(&g_native_tls_key))); if (!tls_data) tls_data = ConstructTlsVector(); - DCHECK_GT(slot_, 0); + DCHECK_NE(slot_, kInvalidSlotValue); DCHECK_LT(slot_, kThreadLocalStorageSize); - return tls_data[slot_]; + // Version mismatches means this slot was previously freed. + if (tls_data[slot_].version != version_) + return nullptr; + return tls_data[slot_].data; } void ThreadLocalStorage::StaticSlot::Set(void* value) { - void** tls_data = static_cast<void**>( + TlsVectorEntry* tls_data = static_cast<TlsVectorEntry*>( PlatformThreadLocalStorage::GetTLSValue( base::subtle::NoBarrier_Load(&g_native_tls_key))); if (!tls_data) tls_data = ConstructTlsVector(); - DCHECK_GT(slot_, 0); + DCHECK_NE(slot_, kInvalidSlotValue); DCHECK_LT(slot_, kThreadLocalStorageSize); - tls_data[slot_] = value; + tls_data[slot_].data = value; + tls_data[slot_].version = version_; +} + +ThreadLocalStorage::Slot::Slot(TLSDestructorFunc destructor) { + tls_slot_.Initialize(destructor); +} + +ThreadLocalStorage::Slot::~Slot() { + tls_slot_.Free(); +} + +void* ThreadLocalStorage::Slot::Get() const { + return tls_slot_.Get(); +} + +void ThreadLocalStorage::Slot::Set(void* value) { + tls_slot_.Set(value); } } // namespace base diff --git a/base/threading/thread_local_storage.h b/base/threading/thread_local_storage.h index 0c7a692a66..5e70410af9 100644 --- a/base/threading/thread_local_storage.h +++ b/base/threading/thread_local_storage.h @@ -5,6 +5,8 @@ #ifndef BASE_THREADING_THREAD_LOCAL_STORAGE_H_ #define BASE_THREADING_THREAD_LOCAL_STORAGE_H_ +#include <stdint.h> + #include "base/atomicops.h" #include "base/base_export.h" #include "base/macros.h" @@ -20,9 +22,12 @@ namespace base { namespace internal { -// WARNING: You should *NOT* be using this class directly. -// PlatformThreadLocalStorage is low-level abstraction to the OS's TLS -// interface, you should instead be using ThreadLocalStorage::StaticSlot/Slot. +// WARNING: You should *NOT* use this class directly. +// PlatformThreadLocalStorage is a low-level abstraction of the OS's TLS +// interface. Instead, you should use one of the following: +// * ThreadLocalBoolean (from thread_local.h) for booleans. +// * ThreadLocalPointer (from thread_local.h) for pointers. +// * ThreadLocalStorage::StaticSlot/Slot for more direct control of the slot. class BASE_EXPORT PlatformThreadLocalStorage { public: @@ -89,7 +94,7 @@ class BASE_EXPORT ThreadLocalStorage { // initialization, as base's LINKER_INITIALIZED requires a constructor and on // some compilers (notably gcc 4.4) this still ends up needing runtime // initialization. - #define TLS_INITIALIZER {false, 0} +#define TLS_INITIALIZER {false, 0, 0} // A key representing one value stored in TLS. // Initialize like @@ -123,18 +128,25 @@ class BASE_EXPORT ThreadLocalStorage { // The internals of this struct should be considered private. base::subtle::Atomic32 initialized_; int slot_; + uint32_t version_; }; // A convenience wrapper around StaticSlot with a constructor. Can be used // as a member variable. - class BASE_EXPORT Slot : public StaticSlot { + class BASE_EXPORT Slot { public: - // Calls StaticSlot::Initialize(). explicit Slot(TLSDestructorFunc destructor = NULL); + ~Slot(); + + // Get the thread-local value stored in this slot. + // Values are guaranteed to initially be zero. + void* Get() const; + + // Set the slot's thread-local value to |value|. + void Set(void* value); private: - using StaticSlot::initialized_; - using StaticSlot::slot_; + StaticSlot tls_slot_; DISALLOW_COPY_AND_ASSIGN(Slot); }; diff --git a/base/threading/thread_local_storage_unittest.cc b/base/threading/thread_local_storage_unittest.cc index 322524b10e..335252b18e 100644 --- a/base/threading/thread_local_storage_unittest.cc +++ b/base/threading/thread_local_storage_unittest.cc @@ -127,4 +127,14 @@ TEST(ThreadLocalStorageTest, MAYBE_TLSDestructors) { tls_slot.Free(); // Stop doing callbacks to cleanup threads. } +TEST(ThreadLocalStorageTest, TLSReclaim) { + // Creates and destroys many TLS slots and ensures they all zero-inited. + for (int i = 0; i < 1000; ++i) { + ThreadLocalStorage::Slot slot(nullptr); + EXPECT_EQ(nullptr, slot.Get()); + slot.Set(reinterpret_cast<void*>(0xBAADF00D)); + EXPECT_EQ(reinterpret_cast<void*>(0xBAADF00D), slot.Get()); + } +} + } // namespace base diff --git a/base/threading/thread_restrictions.cc b/base/threading/thread_restrictions.cc index 00306c5ae7..8dd7743332 100644 --- a/base/threading/thread_restrictions.cc +++ b/base/threading/thread_restrictions.cc @@ -4,7 +4,7 @@ #include "base/threading/thread_restrictions.h" -#if ENABLE_THREAD_RESTRICTIONS +#if DCHECK_IS_ON() #include "base/lazy_instance.h" #include "base/logging.h" @@ -35,7 +35,7 @@ bool ThreadRestrictions::SetIOAllowed(bool allowed) { // static void ThreadRestrictions::AssertIOAllowed() { if (g_io_disallowed.Get().Get()) { - LOG(FATAL) << + NOTREACHED() << "Function marked as IO-only was called from a thread that " "disallows IO! If this thread really should be allowed to " "make IO calls, adjust the call to " @@ -54,10 +54,14 @@ bool ThreadRestrictions::SetSingletonAllowed(bool allowed) { // static void ThreadRestrictions::AssertSingletonAllowed() { if (g_singleton_disallowed.Get().Get()) { - LOG(FATAL) << "LazyInstance/Singleton is not allowed to be used on this " - << "thread. Most likely it's because this thread is not " - << "joinable, so AtExitManager may have deleted the object " - << "on shutdown, leading to a potential shutdown crash."; + NOTREACHED() << "LazyInstance/Singleton is not allowed to be used on this " + << "thread. Most likely it's because this thread is not " + << "joinable (or the current task is running with " + << "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN semantics), so " + << "AtExitManager may have deleted the object on shutdown, " + << "leading to a potential shutdown crash. If you need to use " + << "the object from this context, it'll have to be updated to " + << "use Leaky traits."; } } @@ -69,8 +73,8 @@ void ThreadRestrictions::DisallowWaiting() { // static void ThreadRestrictions::AssertWaitAllowed() { if (g_wait_disallowed.Get().Get()) { - LOG(FATAL) << "Waiting is not allowed to be used on this thread to prevent " - << "jank and deadlock."; + NOTREACHED() << "Waiting is not allowed to be used on this thread to " + << "prevent jank and deadlock."; } } @@ -82,4 +86,4 @@ bool ThreadRestrictions::SetWaitAllowed(bool allowed) { } // namespace base -#endif // ENABLE_THREAD_RESTRICTIONS +#endif // DCHECK_IS_ON() diff --git a/base/threading/thread_restrictions.h b/base/threading/thread_restrictions.h index 4212a4b6eb..a86dd452b8 100644 --- a/base/threading/thread_restrictions.h +++ b/base/threading/thread_restrictions.h @@ -6,15 +6,9 @@ #define BASE_THREADING_THREAD_RESTRICTIONS_H_ #include "base/base_export.h" +#include "base/logging.h" #include "base/macros.h" -// See comment at top of thread_checker.h -#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) -#define ENABLE_THREAD_RESTRICTIONS 1 -#else -#define ENABLE_THREAD_RESTRICTIONS 0 -#endif - class BrowserProcessImpl; class HistogramSynchronizer; class NativeBackendKWallet; @@ -57,10 +51,10 @@ namespace gpu { class GpuChannelHost; } namespace mojo { -namespace common { -class MessagePumpMojo; -} class SyncCallRestrictions; +namespace edk { +class ScopedIPCSupport; +} } namespace ui { class CommandBufferClientImpl; @@ -92,6 +86,10 @@ namespace android { class JavaHandlerThread; } +namespace internal { +class TaskTracker; +} + class SequencedWorkerPool; class SimpleThread; class Thread; @@ -137,21 +135,7 @@ class BASE_EXPORT ThreadRestrictions { DISALLOW_COPY_AND_ASSIGN(ScopedAllowIO); }; - // Constructing a ScopedAllowSingleton temporarily allows accessing for the - // current thread. Doing this is almost always incorrect. - class BASE_EXPORT ScopedAllowSingleton { - public: - ScopedAllowSingleton() { previous_value_ = SetSingletonAllowed(true); } - ~ScopedAllowSingleton() { SetSingletonAllowed(previous_value_); } - private: - // Whether singleton use is allowed when the ScopedAllowSingleton was - // constructed. - bool previous_value_; - - DISALLOW_COPY_AND_ASSIGN(ScopedAllowSingleton); - }; - -#if ENABLE_THREAD_RESTRICTIONS +#if DCHECK_IS_ON() // Set whether the current thread to make IO calls. // Threads start out in the *allowed* state. // Returns the previous value. @@ -197,6 +181,7 @@ class BASE_EXPORT ThreadRestrictions { friend class content::ScopedAllowWaitForAndroidLayoutTests; friend class content::ScopedAllowWaitForDebugURL; friend class ::HistogramSynchronizer; + friend class internal::TaskTracker; friend class ::ScopedAllowWaitForLegacyWebViewApi; friend class cc::CompletionEvent; friend class cc::SingleThreadTaskGraphRunner; @@ -210,8 +195,8 @@ class BASE_EXPORT ThreadRestrictions { friend class ThreadTestHelper; friend class PlatformThread; friend class android::JavaHandlerThread; - friend class mojo::common::MessagePumpMojo; friend class mojo::SyncCallRestrictions; + friend class mojo::edk::ScopedIPCSupport; friend class ui::CommandBufferClientImpl; friend class ui::CommandBufferLocal; friend class ui::GpuState; @@ -240,7 +225,7 @@ class BASE_EXPORT ThreadRestrictions { friend class views::ScreenMus; // END USAGE THAT NEEDS TO BE FIXED. -#if ENABLE_THREAD_RESTRICTIONS +#if DCHECK_IS_ON() static bool SetWaitAllowed(bool allowed); #else static bool SetWaitAllowed(bool) { return true; } diff --git a/base/threading/thread_task_runner_handle.cc b/base/threading/thread_task_runner_handle.cc index 190e18ffc6..00deaa4e20 100644 --- a/base/threading/thread_task_runner_handle.cc +++ b/base/threading/thread_task_runner_handle.cc @@ -6,8 +6,10 @@ #include <utility> +#include "base/bind.h" #include "base/lazy_instance.h" #include "base/logging.h" +#include "base/memory/ptr_util.h" #include "base/threading/sequenced_task_runner_handle.h" #include "base/threading/thread_local.h" @@ -32,6 +34,50 @@ bool ThreadTaskRunnerHandle::IsSet() { return !!lazy_tls_ptr.Pointer()->Get(); } +// static +ScopedClosureRunner ThreadTaskRunnerHandle::OverrideForTesting( + scoped_refptr<SingleThreadTaskRunner> overriding_task_runner) { + // OverrideForTesting() is not compatible with a SequencedTaskRunnerHandle + // being set (but SequencedTaskRunnerHandle::IsSet() includes + // ThreadTaskRunnerHandle::IsSet() so that's discounted as the only valid + // excuse for it to be true). Sadly this means that tests that merely need a + // SequencedTaskRunnerHandle on their main thread can be forced to use a + // ThreadTaskRunnerHandle if they're also using test task runners (that + // OverrideForTesting() when running their tasks from said main thread). To + // solve this: sequence_task_runner_handle.cc and thread_task_runner_handle.cc + // would have to be merged into a single impl file and share TLS state. This + // was deemed unecessary for now as most tests should use higher level + // constructs and not have to instantiate task runner handles on their own. + DCHECK(!SequencedTaskRunnerHandle::IsSet() || IsSet()); + + if (!IsSet()) { + std::unique_ptr<ThreadTaskRunnerHandle> top_level_ttrh = + MakeUnique<ThreadTaskRunnerHandle>(std::move(overriding_task_runner)); + return ScopedClosureRunner(base::Bind( + [](std::unique_ptr<ThreadTaskRunnerHandle>) {}, + base::Passed(&top_level_ttrh))); + } + + ThreadTaskRunnerHandle* ttrh = lazy_tls_ptr.Pointer()->Get(); + // Swap the two (and below bind |overriding_task_runner|, which is now the + // previous one, as the |task_runner_to_restore|). + ttrh->task_runner_.swap(overriding_task_runner); + + return ScopedClosureRunner(base::Bind( + [](scoped_refptr<SingleThreadTaskRunner> task_runner_to_restore, + SingleThreadTaskRunner* expected_task_runner_before_restore) { + ThreadTaskRunnerHandle* ttrh = lazy_tls_ptr.Pointer()->Get(); + + DCHECK_EQ(expected_task_runner_before_restore, ttrh->task_runner_.get()) + << "Nested overrides must expire their ScopedClosureRunners " + "in LIFO order."; + + ttrh->task_runner_.swap(task_runner_to_restore); + }, + base::Passed(&overriding_task_runner), + base::Unretained(ttrh->task_runner_.get()))); +} + ThreadTaskRunnerHandle::ThreadTaskRunnerHandle( scoped_refptr<SingleThreadTaskRunner> task_runner) : task_runner_(std::move(task_runner)) { diff --git a/base/threading/thread_task_runner_handle.h b/base/threading/thread_task_runner_handle.h index c8e58935f0..7ae85e6dcf 100644 --- a/base/threading/thread_task_runner_handle.h +++ b/base/threading/thread_task_runner_handle.h @@ -6,6 +6,7 @@ #define BASE_THREADING_THREAD_TASK_RUNNER_HANDLE_H_ #include "base/base_export.h" +#include "base/callback_helpers.h" #include "base/macros.h" #include "base/memory/ref_counted.h" #include "base/single_thread_task_runner.h" @@ -26,6 +27,17 @@ class BASE_EXPORT ThreadTaskRunnerHandle { // the current thread. static bool IsSet(); + // Overrides ThreadTaskRunnerHandle::Get()'s |task_runner_| to point at + // |overriding_task_runner| until the returned ScopedClosureRunner goes out of + // scope (instantiates a ThreadTaskRunnerHandle for that scope if |!IsSet()|). + // Nested overrides are allowed but callers must ensure the + // ScopedClosureRunners expire in LIFO (stack) order. Note: nesting + // ThreadTaskRunnerHandles isn't generally desired but it's useful in unit + // tests where multiple task runners can share the main thread for simplicity + // and determinism. + static ScopedClosureRunner OverrideForTesting( + scoped_refptr<SingleThreadTaskRunner> overriding_task_runner); + // Binds |task_runner| to the current thread. |task_runner| must belong // to the current thread for this to succeed. explicit ThreadTaskRunnerHandle( diff --git a/base/threading/thread_unittest.cc b/base/threading/thread_unittest.cc index b0fd26521a..af8347432b 100644 --- a/base/threading/thread_unittest.cc +++ b/base/threading/thread_unittest.cc @@ -5,13 +5,22 @@ #include "base/threading/thread.h" #include <stddef.h> +#include <stdint.h> #include <vector> #include "base/bind.h" -#include "base/location.h" +#include "base/debug/leak_annotations.h" +#include "base/macros.h" +#include "base/memory/ptr_util.h" +#include "base/message_loop/message_loop.h" +#include "base/run_loop.h" #include "base/single_thread_task_runner.h" #include "base/synchronization/waitable_event.h" +#include "base/test/gtest_util.h" +#include "base/third_party/dynamic_annotations/dynamic_annotations.h" +#include "base/threading/platform_thread.h" +#include "base/time/time.h" #include "build/build_config.h" #include "testing/gtest/include/gtest/gtest.h" #include "testing/platform_test.h" @@ -38,8 +47,11 @@ class SleepInsideInitThread : public Thread { init_called_ = true; } bool InitCalled() { return init_called_; } + private: bool init_called_; + + DISALLOW_COPY_AND_ASSIGN(SleepInsideInitThread); }; enum ThreadEvent { @@ -76,6 +88,8 @@ class CaptureToEventList : public Thread { private: EventList* event_list_; + + DISALLOW_COPY_AND_ASSIGN(CaptureToEventList); }; // Observer that writes a value into |event_list| when a message loop has been @@ -96,6 +110,8 @@ class CapturingDestructionObserver private: EventList* event_list_; + + DISALLOW_COPY_AND_ASSIGN(CapturingDestructionObserver); }; // Task that adds a destruction observer to the current message loop. @@ -115,59 +131,79 @@ void ReturnThreadId(base::Thread* thread, } // namespace -TEST_F(ThreadTest, Restart) { - Thread a("Restart"); - a.Stop(); - EXPECT_FALSE(a.message_loop()); - EXPECT_FALSE(a.IsRunning()); - EXPECT_TRUE(a.Start()); - EXPECT_TRUE(a.message_loop()); - EXPECT_TRUE(a.IsRunning()); - a.Stop(); - EXPECT_FALSE(a.message_loop()); - EXPECT_FALSE(a.IsRunning()); - EXPECT_TRUE(a.Start()); - EXPECT_TRUE(a.message_loop()); - EXPECT_TRUE(a.IsRunning()); - a.Stop(); - EXPECT_FALSE(a.message_loop()); - EXPECT_FALSE(a.IsRunning()); - a.Stop(); - EXPECT_FALSE(a.message_loop()); - EXPECT_FALSE(a.IsRunning()); -} - TEST_F(ThreadTest, StartWithOptions_StackSize) { Thread a("StartWithStackSize"); // Ensure that the thread can work with only 12 kb and still process a - // message. + // message. At the same time, we should scale with the bitness of the system + // where 12 kb is definitely not enough. + // 12 kb = 3072 Slots on a 32-bit system, so we'll scale based off of that. Thread::Options options; -#if defined(ADDRESS_SANITIZER) - // ASan bloats the stack variables and overflows the 12 kb stack. - options.stack_size = 24*1024; +#if defined(ADDRESS_SANITIZER) || !defined(NDEBUG) + // ASan bloats the stack variables and overflows the 3072 slot stack. Some + // debug builds also grow the stack too much. + options.stack_size = 2 * 3072 * sizeof(uintptr_t); #else - options.stack_size = 12*1024; + options.stack_size = 3072 * sizeof(uintptr_t); #endif EXPECT_TRUE(a.StartWithOptions(options)); EXPECT_TRUE(a.message_loop()); EXPECT_TRUE(a.IsRunning()); - bool was_invoked = false; - a.task_runner()->PostTask(FROM_HERE, base::Bind(&ToggleValue, &was_invoked)); + base::WaitableEvent event(base::WaitableEvent::ResetPolicy::AUTOMATIC, + base::WaitableEvent::InitialState::NOT_SIGNALED); + a.task_runner()->PostTask(FROM_HERE, base::Bind(&base::WaitableEvent::Signal, + base::Unretained(&event))); + event.Wait(); +} - // wait for the task to run (we could use a kernel event here - // instead to avoid busy waiting, but this is sufficient for - // testing purposes). - for (int i = 100; i >= 0 && !was_invoked; --i) { - base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(10)); - } - EXPECT_TRUE(was_invoked); +// Intentional test-only race for otherwise untestable code, won't fix. +// https://crbug.com/634383 +#if !defined(THREAD_SANITIZER) +TEST_F(ThreadTest, StartWithOptions_NonJoinable) { + Thread* a = new Thread("StartNonJoinable"); + // Non-joinable threads have to be leaked for now (see + // Thread::Options::joinable for details). + ANNOTATE_LEAKING_OBJECT_PTR(a); + + Thread::Options options; + options.joinable = false; + EXPECT_TRUE(a->StartWithOptions(options)); + EXPECT_TRUE(a->message_loop()); + EXPECT_TRUE(a->IsRunning()); + + // Without this call this test is racy. The above IsRunning() succeeds because + // of an early-return condition while between Start() and StopSoon(), after + // invoking StopSoon() below this early-return condition is no longer + // satisfied and the real |is_running_| bit has to be checked. It could still + // be false if the message loop hasn't started for real in practice. This is + // only a requirement for this test because the non-joinable property forces + // it to use StopSoon() and not wait for a complete Stop(). + EXPECT_TRUE(a->WaitUntilThreadStarted()); + + // Make the thread block until |block_event| is signaled. + base::WaitableEvent block_event( + base::WaitableEvent::ResetPolicy::AUTOMATIC, + base::WaitableEvent::InitialState::NOT_SIGNALED); + a->task_runner()->PostTask( + FROM_HERE, + base::Bind(&base::WaitableEvent::Wait, base::Unretained(&block_event))); + + a->StopSoon(); + EXPECT_TRUE(a->IsRunning()); + + // Unblock the task and give a bit of extra time to unwind QuitWhenIdle(). + block_event.Signal(); + base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20)); + + // The thread should now have stopped on its own. + EXPECT_FALSE(a->IsRunning()); } +#endif -TEST_F(ThreadTest, TwoTasks) { +TEST_F(ThreadTest, TwoTasksOnJoinableThread) { bool was_invoked = false; { - Thread a("TwoTasks"); + Thread a("TwoTasksOnJoinableThread"); EXPECT_TRUE(a.Start()); EXPECT_TRUE(a.message_loop()); @@ -184,18 +220,164 @@ TEST_F(ThreadTest, TwoTasks) { EXPECT_TRUE(was_invoked); } +TEST_F(ThreadTest, DestroyWhileRunningIsSafe) { + Thread a("DestroyWhileRunningIsSafe"); + EXPECT_TRUE(a.Start()); + EXPECT_TRUE(a.WaitUntilThreadStarted()); +} + +// TODO(gab): Enable this test when destroying a non-joinable Thread instance +// is supported (proposal @ https://crbug.com/629139#c14). +TEST_F(ThreadTest, DISABLED_DestroyWhileRunningNonJoinableIsSafe) { + { + Thread a("DestroyWhileRunningNonJoinableIsSafe"); + Thread::Options options; + options.joinable = false; + EXPECT_TRUE(a.StartWithOptions(options)); + EXPECT_TRUE(a.WaitUntilThreadStarted()); + } + + // Attempt to catch use-after-frees from the non-joinable thread in the + // scope of this test if any. + base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20)); +} + TEST_F(ThreadTest, StopSoon) { Thread a("StopSoon"); EXPECT_TRUE(a.Start()); EXPECT_TRUE(a.message_loop()); EXPECT_TRUE(a.IsRunning()); a.StopSoon(); + a.Stop(); + EXPECT_FALSE(a.message_loop()); + EXPECT_FALSE(a.IsRunning()); +} + +TEST_F(ThreadTest, StopTwiceNop) { + Thread a("StopTwiceNop"); + EXPECT_TRUE(a.Start()); + EXPECT_TRUE(a.message_loop()); + EXPECT_TRUE(a.IsRunning()); + a.StopSoon(); + // Calling StopSoon() a second time should be a nop. a.StopSoon(); a.Stop(); + // Same with Stop(). + a.Stop(); + EXPECT_FALSE(a.message_loop()); + EXPECT_FALSE(a.IsRunning()); + // Calling them when not running should also nop. + a.StopSoon(); + a.Stop(); +} + +// TODO(gab): Enable this test in conjunction with re-enabling the sequence +// check in Thread::Stop() as part of http://crbug.com/629139. +TEST_F(ThreadTest, DISABLED_StopOnNonOwningThreadIsDeath) { + Thread a("StopOnNonOwningThreadDeath"); + EXPECT_TRUE(a.StartAndWaitForTesting()); + + Thread b("NonOwningThread"); + b.Start(); + EXPECT_DCHECK_DEATH({ + // Stopping |a| on |b| isn't allowed. + b.task_runner()->PostTask(FROM_HERE, + base::Bind(&Thread::Stop, base::Unretained(&a))); + // Block here so the DCHECK on |b| always happens in this scope. + base::PlatformThread::Sleep(base::TimeDelta::Max()); + }); +} + +TEST_F(ThreadTest, TransferOwnershipAndStop) { + std::unique_ptr<Thread> a = + base::MakeUnique<Thread>("TransferOwnershipAndStop"); + EXPECT_TRUE(a->StartAndWaitForTesting()); + EXPECT_TRUE(a->IsRunning()); + + Thread b("TakingOwnershipThread"); + b.Start(); + + base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL, + base::WaitableEvent::InitialState::NOT_SIGNALED); + + // a->DetachFromSequence() should allow |b| to use |a|'s Thread API. + a->DetachFromSequence(); + b.task_runner()->PostTask( + FROM_HERE, base::Bind( + [](std::unique_ptr<Thread> thread_to_stop, + base::WaitableEvent* event_to_signal) -> void { + thread_to_stop->Stop(); + event_to_signal->Signal(); + }, + base::Passed(&a), base::Unretained(&event))); + + event.Wait(); +} + +TEST_F(ThreadTest, StartTwice) { + Thread a("StartTwice"); + + EXPECT_FALSE(a.message_loop()); + EXPECT_FALSE(a.IsRunning()); + + EXPECT_TRUE(a.Start()); + EXPECT_TRUE(a.message_loop()); + EXPECT_TRUE(a.IsRunning()); + + a.Stop(); + EXPECT_FALSE(a.message_loop()); + EXPECT_FALSE(a.IsRunning()); + + EXPECT_TRUE(a.Start()); + EXPECT_TRUE(a.message_loop()); + EXPECT_TRUE(a.IsRunning()); + + a.Stop(); EXPECT_FALSE(a.message_loop()); EXPECT_FALSE(a.IsRunning()); } +// Intentional test-only race for otherwise untestable code, won't fix. +// https://crbug.com/634383 +#if !defined(THREAD_SANITIZER) +TEST_F(ThreadTest, StartTwiceNonJoinableNotAllowed) { + LOG(ERROR) << __FUNCTION__; + Thread* a = new Thread("StartTwiceNonJoinable"); + // Non-joinable threads have to be leaked for now (see + // Thread::Options::joinable for details). + ANNOTATE_LEAKING_OBJECT_PTR(a); + + Thread::Options options; + options.joinable = false; + EXPECT_TRUE(a->StartWithOptions(options)); + EXPECT_TRUE(a->message_loop()); + EXPECT_TRUE(a->IsRunning()); + + // Signaled when last task on |a| is processed. + base::WaitableEvent last_task_event( + base::WaitableEvent::ResetPolicy::AUTOMATIC, + base::WaitableEvent::InitialState::NOT_SIGNALED); + a->task_runner()->PostTask(FROM_HERE, + base::Bind(&base::WaitableEvent::Signal, + base::Unretained(&last_task_event))); + + // StopSoon() is non-blocking, Yield() to |a|, wait for last task to be + // processed and a little more for QuitWhenIdle() to unwind before considering + // the thread "stopped". + a->StopSoon(); + base::PlatformThread::YieldCurrentThread(); + last_task_event.Wait(); + base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20)); + + // This test assumes that the above was sufficient to let the thread fully + // stop. + ASSERT_FALSE(a->IsRunning()); + + // Restarting it should not be allowed. + EXPECT_DCHECK_DEATH(a->Start()); +} +#endif + TEST_F(ThreadTest, ThreadName) { Thread a("ThreadName"); EXPECT_TRUE(a.Start()); @@ -297,3 +479,91 @@ TEST_F(ThreadTest, MultipleWaitUntilThreadStarted) { EXPECT_TRUE(a.WaitUntilThreadStarted()); EXPECT_TRUE(a.WaitUntilThreadStarted()); } + +TEST_F(ThreadTest, FlushForTesting) { + Thread a("FlushForTesting"); + + // Flushing a non-running thread should be a no-op. + a.FlushForTesting(); + + ASSERT_TRUE(a.Start()); + + // Flushing a thread with no tasks shouldn't block. + a.FlushForTesting(); + + constexpr base::TimeDelta kSleepPerTestTask = + base::TimeDelta::FromMilliseconds(50); + constexpr size_t kNumSleepTasks = 5; + + const base::TimeTicks ticks_before_post = base::TimeTicks::Now(); + + for (size_t i = 0; i < kNumSleepTasks; ++i) { + a.task_runner()->PostTask( + FROM_HERE, base::Bind(&base::PlatformThread::Sleep, kSleepPerTestTask)); + } + + // All tasks should have executed, as reflected by the elapsed time. + a.FlushForTesting(); + EXPECT_GE(base::TimeTicks::Now() - ticks_before_post, + kNumSleepTasks * kSleepPerTestTask); + + a.Stop(); + + // Flushing a stopped thread should be a no-op. + a.FlushForTesting(); +} + +namespace { + +// A Thread which uses a MessageLoop on the stack. It won't start a real +// underlying thread (instead its messages can be processed by a RunLoop on the +// stack). +class ExternalMessageLoopThread : public Thread { + public: + ExternalMessageLoopThread() : Thread("ExternalMessageLoopThread") {} + + ~ExternalMessageLoopThread() override { Stop(); } + + void InstallMessageLoop() { SetMessageLoop(&external_message_loop_); } + + void VerifyUsingExternalMessageLoop( + bool expected_using_external_message_loop) { + EXPECT_EQ(expected_using_external_message_loop, + using_external_message_loop()); + } + + private: + base::MessageLoop external_message_loop_; + + DISALLOW_COPY_AND_ASSIGN(ExternalMessageLoopThread); +}; + +} // namespace + +TEST_F(ThreadTest, ExternalMessageLoop) { + ExternalMessageLoopThread a; + EXPECT_FALSE(a.message_loop()); + EXPECT_FALSE(a.IsRunning()); + a.VerifyUsingExternalMessageLoop(false); + + a.InstallMessageLoop(); + EXPECT_TRUE(a.message_loop()); + EXPECT_TRUE(a.IsRunning()); + a.VerifyUsingExternalMessageLoop(true); + + bool ran = false; + a.task_runner()->PostTask( + FROM_HERE, base::Bind([](bool* toggled) { *toggled = true; }, &ran)); + base::RunLoop().RunUntilIdle(); + EXPECT_TRUE(ran); + + a.Stop(); + EXPECT_FALSE(a.message_loop()); + EXPECT_FALSE(a.IsRunning()); + a.VerifyUsingExternalMessageLoop(true); + + // Confirm that running any remaining tasks posted from Stop() goes smoothly + // (e.g. https://codereview.chromium.org/2135413003/#ps300001 crashed if + // StopSoon() posted Thread::ThreadQuitHelper() while |run_loop_| was null). + base::RunLoop().RunUntilIdle(); +} diff --git a/base/threading/worker_pool.cc b/base/threading/worker_pool.cc index 0b7bf8eca1..d47037d79a 100644 --- a/base/threading/worker_pool.cc +++ b/base/threading/worker_pool.cc @@ -4,9 +4,11 @@ #include "base/threading/worker_pool.h" +#include <utility> + #include "base/bind.h" #include "base/compiler_specific.h" -#include "base/lazy_instance.h" +#include "base/debug/leak_annotations.h" #include "base/macros.h" #include "base/task_runner.h" #include "base/threading/post_task_and_reply_impl.h" @@ -97,27 +99,27 @@ struct TaskRunnerHolder { scoped_refptr<TaskRunner> taskrunners_[2]; }; -base::LazyInstance<TaskRunnerHolder>::Leaky - g_taskrunners = LAZY_INSTANCE_INITIALIZER; - } // namespace bool WorkerPool::PostTaskAndReply(const tracked_objects::Location& from_here, - const Closure& task, - const Closure& reply, + Closure task, + Closure reply, bool task_is_slow) { // Do not report PostTaskAndReplyRelay leaks in tests. There's nothing we can // do about them because WorkerPool doesn't have a flushing API. // http://crbug.com/248513 // http://crbug.com/290897 - return PostTaskAndReplyWorkerPool(task_is_slow).PostTaskAndReply( - from_here, task, reply); + // Note: this annotation does not cover tasks posted through a TaskRunner. + ANNOTATE_SCOPED_MEMORY_LEAK; + return PostTaskAndReplyWorkerPool(task_is_slow) + .PostTaskAndReply(from_here, std::move(task), std::move(reply)); } // static const scoped_refptr<TaskRunner>& WorkerPool::GetTaskRunner(bool tasks_are_slow) { - return g_taskrunners.Get().taskrunners_[tasks_are_slow]; + static auto* task_runner_holder = new TaskRunnerHolder(); + return task_runner_holder->taskrunners_[tasks_are_slow]; } } // namespace base diff --git a/base/threading/worker_pool.h b/base/threading/worker_pool.h index a52a41428b..865948e437 100644 --- a/base/threading/worker_pool.h +++ b/base/threading/worker_pool.h @@ -6,11 +6,9 @@ #define BASE_THREADING_WORKER_POOL_H_ #include "base/base_export.h" -#include "base/callback_forward.h" +#include "base/callback.h" #include "base/memory/ref_counted.h" -class Task; - namespace tracked_objects { class Location; } // namespace tracked_objects @@ -40,8 +38,8 @@ class BASE_EXPORT WorkerPool { // for |task| is a worker thread and you can specify |task_is_slow| just // like you can for PostTask above. static bool PostTaskAndReply(const tracked_objects::Location& from_here, - const Closure& task, - const Closure& reply, + Closure task, + Closure reply, bool task_is_slow); // Return true if the current thread is one that this WorkerPool runs tasks diff --git a/base/threading/worker_pool_posix.cc b/base/threading/worker_pool_posix.cc index 6b4c42f601..0e19a1a0fe 100644 --- a/base/threading/worker_pool_posix.cc +++ b/base/threading/worker_pool_posix.cc @@ -30,10 +30,21 @@ base::LazyInstance<ThreadLocalBoolean>::Leaky const int kIdleSecondsBeforeExit = 10 * 60; +#if defined(OS_MACOSX) +// On Mac OS X a background thread's default stack size is 512Kb. We need at +// least 1MB for compilation tasks in V8, so increase this default. +const int kStackSize = 1 * 1024 * 1024; +#else +const int kStackSize = 0; +#endif + class WorkerPoolImpl { public: WorkerPoolImpl(); - ~WorkerPoolImpl(); + + // WorkerPoolImpl is only instantiated as a leaky LazyInstance, so the + // destructor is never called. + ~WorkerPoolImpl() = delete; void PostTask(const tracked_objects::Location& from_here, const base::Closure& task, @@ -47,17 +58,13 @@ WorkerPoolImpl::WorkerPoolImpl() : pool_(new base::PosixDynamicThreadPool("WorkerPool", kIdleSecondsBeforeExit)) {} -WorkerPoolImpl::~WorkerPoolImpl() { - pool_->Terminate(); -} - void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, const base::Closure& task, bool /*task_is_slow*/) { pool_->PostTask(from_here, task); } -base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool = +base::LazyInstance<WorkerPoolImpl>::Leaky g_lazy_worker_pool = LAZY_INSTANCE_INITIALIZER; class WorkerThread : public PlatformThread::Delegate { @@ -90,7 +97,7 @@ void WorkerThread::ThreadMain() { tracked_objects::TaskStopwatch stopwatch; stopwatch.Start(); - pending_task.task.Run(); + std::move(pending_task.task).Run(); stopwatch.Stop(); tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking( @@ -121,23 +128,13 @@ PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix, : name_prefix_(name_prefix), idle_seconds_before_exit_(idle_seconds_before_exit), pending_tasks_available_cv_(&lock_), - num_idle_threads_(0), - terminated_(false) {} + num_idle_threads_(0) {} PosixDynamicThreadPool::~PosixDynamicThreadPool() { while (!pending_tasks_.empty()) pending_tasks_.pop(); } -void PosixDynamicThreadPool::Terminate() { - { - AutoLock locked(lock_); - DCHECK(!terminated_) << "Thread pool is already terminated."; - terminated_ = true; - } - pending_tasks_available_cv_.Broadcast(); -} - void PosixDynamicThreadPool::PostTask( const tracked_objects::Location& from_here, const base::Closure& task) { @@ -147,8 +144,6 @@ void PosixDynamicThreadPool::PostTask( void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { AutoLock locked(lock_); - DCHECK(!terminated_) - << "This thread pool is already terminated. Do not post new tasks."; pending_tasks_.push(std::move(*pending_task)); @@ -159,16 +154,13 @@ void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { // The new PlatformThread will take ownership of the WorkerThread object, // which will delete itself on exit. WorkerThread* worker = new WorkerThread(name_prefix_, this); - PlatformThread::CreateNonJoinable(0, worker); + PlatformThread::CreateNonJoinable(kStackSize, worker); } } PendingTask PosixDynamicThreadPool::WaitForTask() { AutoLock locked(lock_); - if (terminated_) - return PendingTask(FROM_HERE, base::Closure()); - if (pending_tasks_.empty()) { // No work available, wait for work. num_idle_threads_++; if (num_idle_threads_cv_.get()) diff --git a/base/threading/worker_pool_posix.h b/base/threading/worker_pool_posix.h index 628e2b6420..d65ae8f8cf 100644 --- a/base/threading/worker_pool_posix.h +++ b/base/threading/worker_pool_posix.h @@ -38,8 +38,6 @@ #include "base/threading/platform_thread.h" #include "base/tracked_objects.h" -class Task; - namespace base { class BASE_EXPORT PosixDynamicThreadPool @@ -52,10 +50,6 @@ class BASE_EXPORT PosixDynamicThreadPool PosixDynamicThreadPool(const std::string& name_prefix, int idle_seconds_before_exit); - // Indicates that the thread pool is going away. Stops handing out tasks to - // worker threads. Wakes up all the idle threads to let them exit. - void Terminate(); - // Adds |task| to the thread pool. void PostTask(const tracked_objects::Location& from_here, const Closure& task); @@ -85,7 +79,6 @@ class BASE_EXPORT PosixDynamicThreadPool ConditionVariable pending_tasks_available_cv_; int num_idle_threads_; TaskQueue pending_tasks_; - bool terminated_; // Only used for tests to ensure correct thread ordering. It will always be // NULL in non-test code. std::unique_ptr<ConditionVariable> num_idle_threads_cv_; diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc index 6cefeed34e..b4e8b58520 100644 --- a/base/threading/worker_pool_posix_unittest.cc +++ b/base/threading/worker_pool_posix_unittest.cc @@ -103,12 +103,6 @@ class PosixDynamicThreadPoolTest : public testing::Test { peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock())); } - void TearDown() override { - // Wake up the idle threads so they can terminate. - if (pool_.get()) - pool_->Terminate(); - } - void WaitForTasksToStart(int num_tasks) { base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_); while (num_waiting_to_start_ < num_tasks) { |