summaryrefslogtreecommitdiff
path: root/base/threading
diff options
context:
space:
mode:
Diffstat (limited to 'base/threading')
-rw-r--r--base/threading/non_thread_safe.h15
-rw-r--r--base/threading/non_thread_safe_unittest.cc25
-rw-r--r--base/threading/platform_thread.h30
-rw-r--r--base/threading/platform_thread_linux.cc79
-rw-r--r--base/threading/platform_thread_mac.mm5
-rw-r--r--base/threading/platform_thread_posix.cc34
-rw-r--r--base/threading/platform_thread_unittest.cc20
-rw-r--r--base/threading/post_task_and_reply_impl.cc70
-rw-r--r--base/threading/post_task_and_reply_impl.h27
-rw-r--r--base/threading/sequenced_task_runner_handle.cc47
-rw-r--r--base/threading/sequenced_task_runner_handle.h5
-rw-r--r--base/threading/sequenced_worker_pool.cc571
-rw-r--r--base/threading/sequenced_worker_pool.h100
-rw-r--r--base/threading/simple_thread.cc59
-rw-r--r--base/threading/simple_thread.h60
-rw-r--r--base/threading/simple_thread_unittest.cc107
-rw-r--r--base/threading/thread.cc206
-rw-r--r--base/threading/thread.h141
-rw-r--r--base/threading/thread_checker.h22
-rw-r--r--base/threading/thread_checker_impl.cc47
-rw-r--r--base/threading/thread_checker_impl.h36
-rw-r--r--base/threading/thread_checker_unittest.cc268
-rw-r--r--base/threading/thread_local.h93
-rw-r--r--base/threading/thread_local_posix.cc43
-rw-r--r--base/threading/thread_local_storage.cc271
-rw-r--r--base/threading/thread_local_storage.h28
-rw-r--r--base/threading/thread_local_storage_unittest.cc10
-rw-r--r--base/threading/thread_restrictions.cc22
-rw-r--r--base/threading/thread_restrictions.h39
-rw-r--r--base/threading/thread_task_runner_handle.cc46
-rw-r--r--base/threading/thread_task_runner_handle.h12
-rw-r--r--base/threading/thread_unittest.cc348
-rw-r--r--base/threading/worker_pool.cc20
-rw-r--r--base/threading/worker_pool.h8
-rw-r--r--base/threading/worker_pool_posix.cc40
-rw-r--r--base/threading/worker_pool_posix.h7
-rw-r--r--base/threading/worker_pool_posix_unittest.cc6
37 files changed, 2005 insertions, 962 deletions
diff --git a/base/threading/non_thread_safe.h b/base/threading/non_thread_safe.h
index d41c08608c..64ae8e4da9 100644
--- a/base/threading/non_thread_safe.h
+++ b/base/threading/non_thread_safe.h
@@ -10,14 +10,7 @@
// There is a specific macro to do it: NON_EXPORTED_BASE(), defined in
// compiler_specific.h
#include "base/compiler_specific.h"
-
-// See comment at top of thread_checker.h
-#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
-#define ENABLE_NON_THREAD_SAFE 1
-#else
-#define ENABLE_NON_THREAD_SAFE 0
-#endif
-
+#include "base/logging.h"
#include "base/threading/non_thread_safe_impl.h"
namespace base {
@@ -58,13 +51,11 @@ class NonThreadSafeDoNothing {
// to have a base::ThreadChecker as a member, rather than inherit from
// NonThreadSafe. For more details about when to choose one over the other, see
// the documentation for base::ThreadChecker.
-#if ENABLE_NON_THREAD_SAFE
+#if DCHECK_IS_ON()
typedef NonThreadSafeImpl NonThreadSafe;
#else
typedef NonThreadSafeDoNothing NonThreadSafe;
-#endif // ENABLE_NON_THREAD_SAFE
-
-#undef ENABLE_NON_THREAD_SAFE
+#endif // DCHECK_IS_ON()
} // namespace base
diff --git a/base/threading/non_thread_safe_unittest.cc b/base/threading/non_thread_safe_unittest.cc
index d523fc55b1..5752d5f2b3 100644
--- a/base/threading/non_thread_safe_unittest.cc
+++ b/base/threading/non_thread_safe_unittest.cc
@@ -8,6 +8,7 @@
#include "base/logging.h"
#include "base/macros.h"
+#include "base/test/gtest_util.h"
#include "base/threading/simple_thread.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -106,8 +107,6 @@ TEST(NonThreadSafeTest, DetachThenDestructOnDifferentThread) {
delete_on_thread.Join();
}
-#if GTEST_HAS_DEATH_TEST || !ENABLE_NON_THREAD_SAFE
-
void NonThreadSafeClass::MethodOnDifferentThreadImpl() {
std::unique_ptr<NonThreadSafeClass> non_thread_safe_class(
new NonThreadSafeClass);
@@ -120,17 +119,15 @@ void NonThreadSafeClass::MethodOnDifferentThreadImpl() {
call_on_thread.Join();
}
-#if ENABLE_NON_THREAD_SAFE
+#if DCHECK_IS_ON()
TEST(NonThreadSafeDeathTest, MethodNotAllowedOnDifferentThreadInDebug) {
- ASSERT_DEATH({
- NonThreadSafeClass::MethodOnDifferentThreadImpl();
- }, "");
+ ASSERT_DCHECK_DEATH({ NonThreadSafeClass::MethodOnDifferentThreadImpl(); });
}
#else
TEST(NonThreadSafeTest, MethodAllowedOnDifferentThreadInRelease) {
NonThreadSafeClass::MethodOnDifferentThreadImpl();
}
-#endif // ENABLE_NON_THREAD_SAFE
+#endif // DCHECK_IS_ON()
void NonThreadSafeClass::DestructorOnDifferentThreadImpl() {
std::unique_ptr<NonThreadSafeClass> non_thread_safe_class(
@@ -145,21 +142,15 @@ void NonThreadSafeClass::DestructorOnDifferentThreadImpl() {
delete_on_thread.Join();
}
-#if ENABLE_NON_THREAD_SAFE
+#if DCHECK_IS_ON()
TEST(NonThreadSafeDeathTest, DestructorNotAllowedOnDifferentThreadInDebug) {
- ASSERT_DEATH({
- NonThreadSafeClass::DestructorOnDifferentThreadImpl();
- }, "");
+ ASSERT_DCHECK_DEATH(
+ { NonThreadSafeClass::DestructorOnDifferentThreadImpl(); });
}
#else
TEST(NonThreadSafeTest, DestructorAllowedOnDifferentThreadInRelease) {
NonThreadSafeClass::DestructorOnDifferentThreadImpl();
}
-#endif // ENABLE_NON_THREAD_SAFE
-
-#endif // GTEST_HAS_DEATH_TEST || !ENABLE_NON_THREAD_SAFE
-
-// Just in case we ever get lumped together with other compilation units.
-#undef ENABLE_NON_THREAD_SAFE
+#endif // DCHECK_IS_ON()
} // namespace base
diff --git a/base/threading/platform_thread.h b/base/threading/platform_thread.h
index 9b217a9c65..8c0d8e4432 100644
--- a/base/threading/platform_thread.h
+++ b/base/threading/platform_thread.h
@@ -18,6 +18,8 @@
#if defined(OS_WIN)
#include <windows.h>
+#elif defined(OS_MACOSX)
+#include <mach/mach_types.h>
#elif defined(OS_POSIX)
#include <pthread.h>
#include <unistd.h>
@@ -28,6 +30,8 @@ namespace base {
// Used for logging. Always an integer value.
#if defined(OS_WIN)
typedef DWORD PlatformThreadId;
+#elif defined(OS_MACOSX)
+typedef mach_port_t PlatformThreadId;
#elif defined(OS_POSIX)
typedef pid_t PlatformThreadId;
#endif
@@ -59,6 +63,8 @@ class PlatformThreadRef {
return id_ == other.id_;
}
+ bool operator!=(PlatformThreadRef other) const { return id_ != other.id_; }
+
bool is_null() const {
return id_ == 0;
}
@@ -175,6 +181,12 @@ class BASE_EXPORT PlatformThread {
// PlatformThreadHandle.
static bool CreateNonJoinable(size_t stack_size, Delegate* delegate);
+ // CreateNonJoinableWithPriority() does the same thing as CreateNonJoinable()
+ // except the priority of the thread is set based on |priority|.
+ static bool CreateNonJoinableWithPriority(size_t stack_size,
+ Delegate* delegate,
+ ThreadPriority priority);
+
// Joins with a thread created via the Create function. This function blocks
// the caller until the designated thread exits. This will invalidate
// |thread_handle|.
@@ -184,6 +196,10 @@ class BASE_EXPORT PlatformThread {
// and |thread_handle| is invalidated after this call.
static void Detach(PlatformThreadHandle thread_handle);
+ // Returns true if SetCurrentThreadPriority() can be used to increase the
+ // priority of the current thread.
+ static bool CanIncreaseCurrentThreadPriority();
+
// Toggles the current thread's priority at runtime. A thread may not be able
// to raise its priority back up after lowering it if the process does not
// have a proper permission, e.g. CAP_SYS_NICE on Linux. A thread may not be
@@ -195,6 +211,20 @@ class BASE_EXPORT PlatformThread {
static ThreadPriority GetCurrentThreadPriority();
+#if defined(OS_LINUX)
+ // Toggles a specific thread's priority at runtime. This can be used to
+ // change the priority of a thread in a different process and will fail
+ // if the calling process does not have proper permissions. The
+ // SetCurrentThreadPriority() function above is preferred in favor of
+ // security but on platforms where sandboxed processes are not allowed to
+ // change priority this function exists to allow a non-sandboxed process
+ // to change the priority of sandboxed threads for improved performance.
+ // Warning: Don't use this for a main thread because that will change the
+ // whole thread group's (i.e. process) priority.
+ static void SetThreadPriority(PlatformThreadId thread_id,
+ ThreadPriority priority);
+#endif
+
private:
DISALLOW_IMPLICIT_CONSTRUCTORS(PlatformThread);
};
diff --git a/base/threading/platform_thread_linux.cc b/base/threading/platform_thread_linux.cc
index ab7c97ef51..474410f18a 100644
--- a/base/threading/platform_thread_linux.cc
+++ b/base/threading/platform_thread_linux.cc
@@ -8,8 +8,10 @@
#include <sched.h>
#include <stddef.h>
+#include "base/files/file_util.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
+#include "base/strings/string_number_conversions.h"
#include "base/threading/platform_thread_internal_posix.h"
#include "base/threading/thread_id_name_manager.h"
#include "base/tracked_objects.h"
@@ -18,11 +20,68 @@
#if !defined(OS_NACL)
#include <pthread.h>
#include <sys/prctl.h>
+#include <sys/resource.h>
+#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#endif
namespace base {
+namespace {
+#if !defined(OS_NACL)
+const FilePath::CharType kCgroupDirectory[] =
+ FILE_PATH_LITERAL("/sys/fs/cgroup");
+
+FilePath ThreadPriorityToCgroupDirectory(const FilePath& cgroup_filepath,
+ ThreadPriority priority) {
+ switch (priority) {
+ case ThreadPriority::NORMAL:
+ return cgroup_filepath;
+ case ThreadPriority::BACKGROUND:
+ return cgroup_filepath.Append(FILE_PATH_LITERAL("non-urgent"));
+ case ThreadPriority::DISPLAY:
+ case ThreadPriority::REALTIME_AUDIO:
+ return cgroup_filepath.Append(FILE_PATH_LITERAL("urgent"));
+ }
+ NOTREACHED();
+ return FilePath();
+}
+
+void SetThreadCgroup(PlatformThreadId thread_id,
+ const FilePath& cgroup_directory) {
+ FilePath tasks_filepath = cgroup_directory.Append(FILE_PATH_LITERAL("tasks"));
+ std::string tid = IntToString(thread_id);
+ int bytes_written = WriteFile(tasks_filepath, tid.c_str(), tid.size());
+ if (bytes_written != static_cast<int>(tid.size())) {
+ DVLOG(1) << "Failed to add " << tid << " to " << tasks_filepath.value();
+ }
+}
+
+void SetThreadCgroupForThreadPriority(PlatformThreadId thread_id,
+ const FilePath& cgroup_filepath,
+ ThreadPriority priority) {
+ // Append "chrome" suffix.
+ FilePath cgroup_directory = ThreadPriorityToCgroupDirectory(
+ cgroup_filepath.Append(FILE_PATH_LITERAL("chrome")), priority);
+
+ // Silently ignore request if cgroup directory doesn't exist.
+ if (!DirectoryExists(cgroup_directory))
+ return;
+
+ SetThreadCgroup(thread_id, cgroup_directory);
+}
+
+void SetThreadCgroupsForThreadPriority(PlatformThreadId thread_id,
+ ThreadPriority priority) {
+ FilePath cgroup_filepath(kCgroupDirectory);
+ SetThreadCgroupForThreadPriority(
+ thread_id, cgroup_filepath.Append(FILE_PATH_LITERAL("cpuset")), priority);
+ SetThreadCgroupForThreadPriority(
+ thread_id, cgroup_filepath.Append(FILE_PATH_LITERAL("schedtune")),
+ priority);
+}
+#endif
+} // namespace
namespace internal {
@@ -41,6 +100,7 @@ const ThreadPriorityToNiceValuePair kThreadPriorityToNiceValueMap[4] = {
bool SetCurrentThreadPriorityForPlatform(ThreadPriority priority) {
#if !defined(OS_NACL)
+ SetThreadCgroupsForThreadPriority(PlatformThread::CurrentId(), priority);
return priority == ThreadPriority::REALTIME_AUDIO &&
pthread_setschedparam(pthread_self(), SCHED_RR, &kRealTimePrio) == 0;
#else
@@ -90,6 +150,25 @@ void PlatformThread::SetName(const std::string& name) {
#endif // !defined(OS_NACL)
}
+#if !defined(OS_NACL)
+// static
+void PlatformThread::SetThreadPriority(PlatformThreadId thread_id,
+ ThreadPriority priority) {
+ // Changing current main threads' priority is not permitted in favor of
+ // security, this interface is restricted to change only non-main thread
+ // priority.
+ CHECK_NE(thread_id, getpid());
+
+ SetThreadCgroupsForThreadPriority(thread_id, priority);
+
+ const int nice_setting = internal::ThreadPriorityToNiceValue(priority);
+ if (setpriority(PRIO_PROCESS, thread_id, nice_setting)) {
+ DVPLOG(1) << "Failed to set nice value of thread (" << thread_id << ") to "
+ << nice_setting;
+ }
+}
+#endif // !defined(OS_NACL)
+
void InitThreading() {}
void TerminateOnThread() {}
diff --git a/base/threading/platform_thread_mac.mm b/base/threading/platform_thread_mac.mm
index 51f3621af2..e743044ec1 100644
--- a/base/threading/platform_thread_mac.mm
+++ b/base/threading/platform_thread_mac.mm
@@ -162,6 +162,11 @@ void SetPriorityRealtimeAudio(mach_port_t mach_thread_id) {
} // anonymous namespace
// static
+bool PlatformThread::CanIncreaseCurrentThreadPriority() {
+ return true;
+}
+
+// static
void PlatformThread::SetCurrentThreadPriority(ThreadPriority priority) {
// Convert from pthread_t to mach thread identifier.
mach_port_t mach_thread_id =
diff --git a/base/threading/platform_thread_posix.cc b/base/threading/platform_thread_posix.cc
index 2321b3cd49..9a6a2bb999 100644
--- a/base/threading/platform_thread_posix.cc
+++ b/base/threading/platform_thread_posix.cc
@@ -11,9 +11,12 @@
#include <stdint.h>
#include <sys/resource.h>
#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
#include <memory>
+#include "base/debug/activity_tracker.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/threading/platform_thread_internal_posix.h"
@@ -23,8 +26,6 @@
#if defined(OS_LINUX)
#include <sys/syscall.h>
-#elif defined(OS_ANDROID)
-#include <sys/types.h>
#endif
namespace base {
@@ -187,21 +188,32 @@ const char* PlatformThread::GetName() {
bool PlatformThread::CreateWithPriority(size_t stack_size, Delegate* delegate,
PlatformThreadHandle* thread_handle,
ThreadPriority priority) {
- return CreateThread(stack_size, true, // joinable thread
- delegate, thread_handle, priority);
+ return CreateThread(stack_size, true /* joinable thread */, delegate,
+ thread_handle, priority);
}
// static
bool PlatformThread::CreateNonJoinable(size_t stack_size, Delegate* delegate) {
+ return CreateNonJoinableWithPriority(stack_size, delegate,
+ ThreadPriority::NORMAL);
+}
+
+// static
+bool PlatformThread::CreateNonJoinableWithPriority(size_t stack_size,
+ Delegate* delegate,
+ ThreadPriority priority) {
PlatformThreadHandle unused;
bool result = CreateThread(stack_size, false /* non-joinable thread */,
- delegate, &unused, ThreadPriority::NORMAL);
+ delegate, &unused, priority);
return result;
}
// static
void PlatformThread::Join(PlatformThreadHandle thread_handle) {
+ // Record the event that this thread is blocking upon (for hang diagnosis).
+ base::debug::ScopedThreadJoinActivity thread_activity(&thread_handle);
+
// Joining another thread may block the current thread for a long time, since
// the thread referred to by |thread_handle| may still be running long-lived /
// blocking tasks.
@@ -218,6 +230,18 @@ void PlatformThread::Detach(PlatformThreadHandle thread_handle) {
#if !defined(OS_MACOSX)
// static
+bool PlatformThread::CanIncreaseCurrentThreadPriority() {
+#if defined(OS_NACL)
+ return false;
+#else
+ // Only root can raise thread priority on POSIX environment. On Linux, users
+ // who have CAP_SYS_NICE permission also can raise the thread priority, but
+ // libcap.so would be needed to check the capability.
+ return geteuid() == 0;
+#endif // defined(OS_NACL)
+}
+
+// static
void PlatformThread::SetCurrentThreadPriority(ThreadPriority priority) {
#if defined(OS_NACL)
NOTIMPLEMENTED();
diff --git a/base/threading/platform_thread_unittest.cc b/base/threading/platform_thread_unittest.cc
index 2d99ed8750..0febf8ba9b 100644
--- a/base/threading/platform_thread_unittest.cc
+++ b/base/threading/platform_thread_unittest.cc
@@ -12,8 +12,6 @@
#include "testing/gtest/include/gtest/gtest.h"
#if defined(OS_POSIX)
-#include <sys/types.h>
-#include <unistd.h>
#include "base/threading/platform_thread_internal_posix.h"
#elif defined(OS_WIN)
#include <windows.h>
@@ -235,17 +233,6 @@ const ThreadPriority kThreadPriorityTestValues[] = {
ThreadPriority::NORMAL,
ThreadPriority::BACKGROUND};
-bool IsBumpingPriorityAllowed() {
-#if defined(OS_POSIX)
- // Only root can raise thread priority on POSIX environment. On Linux, users
- // who have CAP_SYS_NICE permission also can raise the thread priority, but
- // libcap.so would be needed to check the capability.
- return geteuid() == 0;
-#else
- return true;
-#endif
-}
-
class ThreadPriorityTestThread : public FunctionTestThread {
public:
explicit ThreadPriorityTestThread(ThreadPriority priority)
@@ -273,8 +260,9 @@ class ThreadPriorityTestThread : public FunctionTestThread {
// Test changing a created thread's priority (which has different semantics on
// some platforms).
TEST(PlatformThreadTest, ThreadPriorityCurrentThread) {
- const bool bumping_priority_allowed = IsBumpingPriorityAllowed();
- if (bumping_priority_allowed) {
+ const bool increase_priority_allowed =
+ PlatformThread::CanIncreaseCurrentThreadPriority();
+ if (increase_priority_allowed) {
// Bump the priority in order to verify that new threads are started with
// normal priority.
PlatformThread::SetCurrentThreadPriority(ThreadPriority::DISPLAY);
@@ -282,7 +270,7 @@ TEST(PlatformThreadTest, ThreadPriorityCurrentThread) {
// Toggle each supported priority on the thread and confirm it affects it.
for (size_t i = 0; i < arraysize(kThreadPriorityTestValues); ++i) {
- if (!bumping_priority_allowed &&
+ if (!increase_priority_allowed &&
kThreadPriorityTestValues[i] >
PlatformThread::GetCurrentThreadPriority()) {
continue;
diff --git a/base/threading/post_task_and_reply_impl.cc b/base/threading/post_task_and_reply_impl.cc
index c906866cfb..d16f8bd225 100644
--- a/base/threading/post_task_and_reply_impl.cc
+++ b/base/threading/post_task_and_reply_impl.cc
@@ -4,42 +4,46 @@
#include "base/threading/post_task_and_reply_impl.h"
+#include <utility>
+
#include "base/bind.h"
-#include "base/location.h"
-#include "base/single_thread_task_runner.h"
-#include "base/threading/thread_task_runner_handle.h"
+#include "base/debug/leak_annotations.h"
+#include "base/logging.h"
+#include "base/memory/ref_counted.h"
+#include "base/sequence_checker.h"
+#include "base/sequenced_task_runner.h"
+#include "base/threading/sequenced_task_runner_handle.h"
namespace base {
namespace {
-// This relay class remembers the MessageLoop that it was created on, and
-// ensures that both the |task| and |reply| Closures are deleted on this same
-// thread. Also, |task| is guaranteed to be deleted before |reply| is run or
-// deleted.
+// This relay class remembers the sequence that it was created on, and ensures
+// that both the |task| and |reply| Closures are deleted on this same sequence.
+// Also, |task| is guaranteed to be deleted before |reply| is run or deleted.
//
-// If this is not possible because the originating MessageLoop is no longer
-// available, the the |task| and |reply| Closures are leaked. Leaking is
-// considered preferable to having a thread-safetey violations caused by
-// invoking the Closure destructor on the wrong thread.
+// If RunReplyAndSelfDestruct() doesn't run because the originating execution
+// context is no longer available, then the |task| and |reply| Closures are
+// leaked. Leaking is considered preferable to having a thread-safetey
+// violations caused by invoking the Closure destructor on the wrong sequence.
class PostTaskAndReplyRelay {
public:
PostTaskAndReplyRelay(const tracked_objects::Location& from_here,
- const Closure& task,
- const Closure& reply)
- : from_here_(from_here),
- origin_task_runner_(ThreadTaskRunnerHandle::Get()) {
- task_ = task;
- reply_ = reply;
- }
+ Closure task,
+ Closure reply)
+ : sequence_checker_(),
+ from_here_(from_here),
+ origin_task_runner_(SequencedTaskRunnerHandle::Get()),
+ reply_(std::move(reply)),
+ task_(std::move(task)) {}
~PostTaskAndReplyRelay() {
- DCHECK(origin_task_runner_->BelongsToCurrentThread());
+ DCHECK(sequence_checker_.CalledOnValidSequence());
task_.Reset();
reply_.Reset();
}
- void Run() {
+ void RunTaskAndPostReply() {
task_.Run();
origin_task_runner_->PostTask(
from_here_, Bind(&PostTaskAndReplyRelay::RunReplyAndSelfDestruct,
@@ -48,7 +52,7 @@ class PostTaskAndReplyRelay {
private:
void RunReplyAndSelfDestruct() {
- DCHECK(origin_task_runner_->BelongsToCurrentThread());
+ DCHECK(sequence_checker_.CalledOnValidSequence());
// Force |task_| to be released before |reply_| is to ensure that no one
// accidentally depends on |task_| keeping one of its arguments alive while
@@ -61,8 +65,9 @@ class PostTaskAndReplyRelay {
delete this;
}
- tracked_objects::Location from_here_;
- scoped_refptr<SingleThreadTaskRunner> origin_task_runner_;
+ const SequenceChecker sequence_checker_;
+ const tracked_objects::Location from_here_;
+ const scoped_refptr<SequencedTaskRunner> origin_task_runner_;
Closure reply_;
Closure task_;
};
@@ -73,14 +78,19 @@ namespace internal {
bool PostTaskAndReplyImpl::PostTaskAndReply(
const tracked_objects::Location& from_here,
- const Closure& task,
- const Closure& reply) {
- // TODO(tzik): Use DCHECK here once the crash is gone. http://crbug.com/541319
- CHECK(!task.is_null()) << from_here.ToString();
- CHECK(!reply.is_null()) << from_here.ToString();
+ Closure task,
+ Closure reply) {
+ DCHECK(!task.is_null()) << from_here.ToString();
+ DCHECK(!reply.is_null()) << from_here.ToString();
PostTaskAndReplyRelay* relay =
- new PostTaskAndReplyRelay(from_here, task, reply);
- if (!PostTask(from_here, Bind(&PostTaskAndReplyRelay::Run,
+ new PostTaskAndReplyRelay(from_here, std::move(task), std::move(reply));
+ // PostTaskAndReplyRelay self-destructs after executing |reply|. On the flip
+ // side though, it is intentionally leaked if the |task| doesn't complete
+ // before the origin sequence stops executing tasks. Annotate |relay| as leaky
+ // to avoid having to suppress every callsite which happens to flakily trigger
+ // this race.
+ ANNOTATE_LEAKING_OBJECT_PTR(relay);
+ if (!PostTask(from_here, Bind(&PostTaskAndReplyRelay::RunTaskAndPostReply,
Unretained(relay)))) {
delete relay;
return false;
diff --git a/base/threading/post_task_and_reply_impl.h b/base/threading/post_task_and_reply_impl.h
index d21ab78de8..696b668a4c 100644
--- a/base/threading/post_task_and_reply_impl.h
+++ b/base/threading/post_task_and_reply_impl.h
@@ -8,30 +8,29 @@
#ifndef BASE_THREADING_POST_TASK_AND_REPLY_IMPL_H_
#define BASE_THREADING_POST_TASK_AND_REPLY_IMPL_H_
-#include "base/callback_forward.h"
+#include "base/base_export.h"
+#include "base/callback.h"
#include "base/location.h"
namespace base {
namespace internal {
-// Inherit from this in a class that implements PostTask appropriately
-// for sending to a destination thread.
+// Inherit from this in a class that implements PostTask to send a task to a
+// custom execution context.
//
-// Note that 'reply' will always get posted back to your current
-// MessageLoop.
-//
-// If you're looking for a concrete implementation of
-// PostTaskAndReply, you probably want base::SingleThreadTaskRunner, or you
-// may want base::WorkerPool.
-class PostTaskAndReplyImpl {
+// If you're looking for a concrete implementation of PostTaskAndReply, you
+// probably want base::TaskRunner, or you may want base::WorkerPool.
+class BASE_EXPORT PostTaskAndReplyImpl {
public:
virtual ~PostTaskAndReplyImpl() = default;
- // Implementation for TaskRunner::PostTaskAndReply and
- // WorkerPool::PostTaskAndReply.
+ // Posts |task| by calling PostTask(). On completion, |reply| is posted to the
+ // sequence or thread that called this. Can only be called when
+ // SequencedTaskRunnerHandle::IsSet(). Both |task| and |reply| are guaranteed
+ // to be deleted on the sequence or thread that called this.
bool PostTaskAndReply(const tracked_objects::Location& from_here,
- const Closure& task,
- const Closure& reply);
+ Closure task,
+ Closure reply);
private:
virtual bool PostTask(const tracked_objects::Location& from_here,
diff --git a/base/threading/sequenced_task_runner_handle.cc b/base/threading/sequenced_task_runner_handle.cc
index 88b36a8d64..90f68b33ab 100644
--- a/base/threading/sequenced_task_runner_handle.cc
+++ b/base/threading/sequenced_task_runner_handle.cc
@@ -16,39 +16,56 @@ namespace base {
namespace {
-base::LazyInstance<base::ThreadLocalPointer<SequencedTaskRunnerHandle>>::Leaky
+LazyInstance<ThreadLocalPointer<SequencedTaskRunnerHandle>>::Leaky
lazy_tls_ptr = LAZY_INSTANCE_INITIALIZER;
} // namespace
// static
scoped_refptr<SequencedTaskRunner> SequencedTaskRunnerHandle::Get() {
+ // Return the registered SingleThreadTaskRunner, if any. This must be at the
+ // top so that a SingleThreadTaskRunner has priority over a
+ // SequencedTaskRunner (RLZ registers both on the same thread despite that
+ // being prevented by DCHECKs).
+ // TODO(fdoray): Move this to the bottom once RLZ stops registering a
+ // SingleThreadTaskRunner and a SequencedTaskRunner on the same thread.
+ // https://crbug.com/618530#c14
+ if (ThreadTaskRunnerHandle::IsSet()) {
+ // Various modes of setting SequencedTaskRunnerHandle don't combine.
+ DCHECK(!lazy_tls_ptr.Pointer()->Get());
+ DCHECK(!SequencedWorkerPool::GetSequenceTokenForCurrentThread().IsValid());
+
+ return ThreadTaskRunnerHandle::Get();
+ }
+
// Return the registered SequencedTaskRunner, if any.
const SequencedTaskRunnerHandle* handle = lazy_tls_ptr.Pointer()->Get();
if (handle) {
// Various modes of setting SequencedTaskRunnerHandle don't combine.
- DCHECK(!base::ThreadTaskRunnerHandle::IsSet());
- DCHECK(!SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread());
- return handle->task_runner_;
- }
+ DCHECK(!SequencedWorkerPool::GetSequenceTokenForCurrentThread().IsValid());
- // Return the SequencedTaskRunner obtained from SequencedWorkerPool, if any.
- scoped_refptr<base::SequencedTaskRunner> task_runner =
- SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread();
- if (task_runner) {
- DCHECK(!base::ThreadTaskRunnerHandle::IsSet());
- return task_runner;
+ return handle->task_runner_;
}
- // Return the SingleThreadTaskRunner for the current thread otherwise.
- return base::ThreadTaskRunnerHandle::Get();
+ // If we are on a worker thread for a SequencedBlockingPool that is running a
+ // sequenced task, return a SequencedTaskRunner for it.
+ scoped_refptr<SequencedWorkerPool> pool =
+ SequencedWorkerPool::GetWorkerPoolForCurrentThread();
+ DCHECK(pool);
+ SequencedWorkerPool::SequenceToken sequence_token =
+ SequencedWorkerPool::GetSequenceTokenForCurrentThread();
+ DCHECK(sequence_token.IsValid());
+ scoped_refptr<SequencedTaskRunner> sequenced_task_runner(
+ pool->GetSequencedTaskRunner(sequence_token));
+ DCHECK(sequenced_task_runner->RunsTasksOnCurrentThread());
+ return sequenced_task_runner;
}
// static
bool SequencedTaskRunnerHandle::IsSet() {
return lazy_tls_ptr.Pointer()->Get() ||
- SequencedWorkerPool::GetWorkerPoolForCurrentThread() ||
- base::ThreadTaskRunnerHandle::IsSet();
+ SequencedWorkerPool::GetSequenceTokenForCurrentThread().IsValid() ||
+ ThreadTaskRunnerHandle::IsSet();
}
SequencedTaskRunnerHandle::SequencedTaskRunnerHandle(
diff --git a/base/threading/sequenced_task_runner_handle.h b/base/threading/sequenced_task_runner_handle.h
index e6dec1e9f8..b7f4bae8aa 100644
--- a/base/threading/sequenced_task_runner_handle.h
+++ b/base/threading/sequenced_task_runner_handle.h
@@ -25,8 +25,9 @@ class BASE_EXPORT SequencedTaskRunnerHandle {
// instantiating a SequencedTaskRunnerHandle.
// b) The current thread has a ThreadTaskRunnerHandle (which includes any
// thread that has a MessageLoop associated with it), or
- // c) The current thread is a worker thread belonging to a
- // SequencedWorkerPool.
+ // c) The current thread is a worker thread belonging to a SequencedWorkerPool
+ // *and* is currently running a sequenced task (note: not supporting
+ // unsequenced tasks is intentional: https://crbug.com/618043#c4).
static bool IsSet();
// Binds |task_runner| to the current thread.
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index 57961b5cd5..ce594cd7fb 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -10,6 +10,7 @@
#include <map>
#include <memory>
#include <set>
+#include <unordered_map>
#include <utility>
#include <vector>
@@ -17,6 +18,7 @@
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/critical_closure.h"
+#include "base/debug/dump_without_crashing.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/macros.h"
@@ -25,15 +27,21 @@
#include "base/strings/stringprintf.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
+// Don't enable the redirect to TaskScheduler on Arc++ to avoid pulling a bunch
+// of dependencies. Some code also #ifdef'ed below.
+#if 0
+#include "base/task_scheduler/post_task.h"
+#include "base/task_scheduler/task_scheduler.h"
+#endif
#include "base/threading/platform_thread.h"
+#include "base/threading/sequenced_task_runner_handle.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_restrictions.h"
-#include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h"
-#include "base/trace_event/heap_profiler.h"
#include "base/trace_event/trace_event.h"
#include "base/tracked_objects.h"
+#include "base/tracking_info.h"
#include "build/build_config.h"
#if defined(OS_MACOSX)
@@ -43,13 +51,36 @@
#endif
#if !defined(OS_NACL)
-#include "base/metrics/histogram.h"
+#include "base/metrics/histogram_macros.h"
#endif
namespace base {
namespace {
+// An enum representing the state of all pools. A non-test process should only
+// ever transition from POST_TASK_DISABLED to one of the active states. A test
+// process may transition from one of the active states to POST_TASK_DISABLED
+// when DisableForProcessForTesting() is called.
+//
+// External memory synchronization is required to call a method that reads
+// |g_all_pools_state| after calling a method that modifies it.
+//
+// TODO(gab): Remove this if http://crbug.com/622400 fails (SequencedWorkerPool
+// will be phased out completely otherwise).
+enum class AllPoolsState {
+ POST_TASK_DISABLED,
+ USE_WORKER_POOL,
+ REDIRECTED_TO_TASK_SCHEDULER,
+};
+
+// TODO(fdoray): Change the initial state to POST_TASK_DISABLED. It is initially
+// USE_WORKER_POOL to avoid a revert of the CL that adds
+// debug::DumpWithoutCrashing() in case of waterfall failures.
+AllPoolsState g_all_pools_state = AllPoolsState::USE_WORKER_POOL;
+
+TaskPriority g_max_task_priority = TaskPriority::HIGHEST;
+
struct SequencedTask : public TrackingInfo {
SequencedTask()
: sequence_token_id(0),
@@ -92,6 +123,14 @@ struct SequencedTaskLessThan {
}
};
+// Create a process-wide unique ID to represent this task in trace events. This
+// will be mangled with a Process ID hash to reduce the likelyhood of colliding
+// with MessageLoop pointers on other processes.
+uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) {
+ return (static_cast<uint64_t>(task.trace_id) << 32) |
+ static_cast<uint64_t>(reinterpret_cast<intptr_t>(pool));
+}
+
// SequencedWorkerPoolTaskRunner ---------------------------------------------
// A TaskRunner which posts tasks to a SequencedWorkerPool with a
// fixed ShutdownBehavior.
@@ -142,14 +181,17 @@ bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
return pool_->RunsTasksOnCurrentThread();
}
-// SequencedWorkerPoolSequencedTaskRunner ------------------------------------
+} // namespace
+
+// SequencedWorkerPool::PoolSequencedTaskRunner ------------------------------
// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
// fixed sequence token.
//
// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
-class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
+class SequencedWorkerPool::PoolSequencedTaskRunner
+ : public SequencedTaskRunner {
public:
- SequencedWorkerPoolSequencedTaskRunner(
+ PoolSequencedTaskRunner(
scoped_refptr<SequencedWorkerPool> pool,
SequencedWorkerPool::SequenceToken token,
SequencedWorkerPool::WorkerShutdown shutdown_behavior);
@@ -166,7 +208,7 @@ class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
TimeDelta delay) override;
private:
- ~SequencedWorkerPoolSequencedTaskRunner() override;
+ ~PoolSequencedTaskRunner() override;
const scoped_refptr<SequencedWorkerPool> pool_;
@@ -174,25 +216,25 @@ class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
- DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
+ DISALLOW_COPY_AND_ASSIGN(PoolSequencedTaskRunner);
};
-SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
- scoped_refptr<SequencedWorkerPool> pool,
- SequencedWorkerPool::SequenceToken token,
- SequencedWorkerPool::WorkerShutdown shutdown_behavior)
+SequencedWorkerPool::PoolSequencedTaskRunner::
+ PoolSequencedTaskRunner(
+ scoped_refptr<SequencedWorkerPool> pool,
+ SequencedWorkerPool::SequenceToken token,
+ SequencedWorkerPool::WorkerShutdown shutdown_behavior)
: pool_(std::move(pool)),
token_(token),
shutdown_behavior_(shutdown_behavior) {}
-SequencedWorkerPoolSequencedTaskRunner::
-~SequencedWorkerPoolSequencedTaskRunner() {
-}
+SequencedWorkerPool::PoolSequencedTaskRunner::
+ ~PoolSequencedTaskRunner() = default;
-bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
- const tracked_objects::Location& from_here,
- const Closure& task,
- TimeDelta delay) {
+bool SequencedWorkerPool::PoolSequencedTaskRunner::
+ PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& task,
+ TimeDelta delay) {
if (delay.is_zero()) {
return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
token_, from_here, task, shutdown_behavior_);
@@ -200,29 +242,20 @@ bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
}
-bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
+bool SequencedWorkerPool::PoolSequencedTaskRunner::
+ RunsTasksOnCurrentThread() const {
return pool_->IsRunningSequenceOnCurrentThread(token_);
}
-bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
- const tracked_objects::Location& from_here,
- const Closure& task,
- TimeDelta delay) {
+bool SequencedWorkerPool::PoolSequencedTaskRunner::
+ PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& task,
+ TimeDelta delay) {
// There's no way to run nested tasks, so simply forward to
// PostDelayedTask.
return PostDelayedTask(from_here, task, delay);
}
-// Create a process-wide unique ID to represent this task in trace events. This
-// will be mangled with a Process ID hash to reduce the likelyhood of colliding
-// with MessageLoop pointers on other processes.
-uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) {
- return (static_cast<uint64_t>(task.trace_id) << 32) |
- static_cast<uint64_t>(reinterpret_cast<intptr_t>(pool));
-}
-
-} // namespace
-
// Worker ---------------------------------------------------------------------
class SequencedWorkerPool::Worker : public SimpleThread {
@@ -247,6 +280,14 @@ class SequencedWorkerPool::Worker : public SimpleThread {
is_processing_task_ = true;
task_sequence_token_ = token;
task_shutdown_behavior_ = shutdown_behavior;
+
+ // It is dangerous for tasks with CONTINUE_ON_SHUTDOWN to access a class
+ // that implements a non-leaky base::Singleton because they are generally
+ // destroyed before the process terminates via an AtExitManager
+ // registration. This will trigger a DCHECK to warn of such cases. See the
+ // comment about CONTINUE_ON_SHUTDOWN for more details.
+ ThreadRestrictions::SetSingletonAllowed(task_shutdown_behavior_ !=
+ CONTINUE_ON_SHUTDOWN);
}
// Indicates that the task has finished running.
@@ -292,8 +333,10 @@ class SequencedWorkerPool::Inner {
public:
// Take a raw pointer to |worker| to avoid cycles (since we're owned
// by it).
- Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
+ Inner(SequencedWorkerPool* worker_pool,
+ size_t max_threads,
const std::string& thread_name_prefix,
+ base::TaskPriority task_priority,
TestingObserver* observer);
~Inner();
@@ -316,11 +359,6 @@ class SequencedWorkerPool::Inner {
bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
- bool IsRunningSequence(SequenceToken sequence_token) const;
-
- void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
- WorkerShutdown shutdown_behavior);
-
void CleanupForTesting();
void SignalHasWorkForTesting();
@@ -349,6 +387,25 @@ class SequencedWorkerPool::Inner {
CLEANUP_DONE,
};
+ // Clears ScheduledTasks in |tasks_to_delete| while ensuring that
+ // |this_worker| has the desired task info context during ~ScheduledTask() to
+ // allow sequence-checking.
+ void DeleteWithoutLock(std::vector<SequencedTask>* tasks_to_delete,
+ Worker* this_worker);
+
+ // Helper used by PostTask() to complete the work when redirection is on.
+ // Returns true if the task may run at some point in the future and false if
+ // it will definitely not run.
+ // Coalesce upon resolution of http://crbug.com/622400.
+ bool PostTaskToTaskScheduler(const SequencedTask& sequenced,
+ const TimeDelta& delay);
+
+ // Returns the TaskScheduler TaskRunner for the specified |sequence_token_id|
+ // and |traits|.
+ scoped_refptr<TaskRunner> GetTaskSchedulerTaskRunner(
+ int sequence_token_id,
+ const TaskTraits& traits);
+
// Called from within the lock, this converts the given token name into a
// token ID, creating a new one if necessary.
int LockedGetNamedTokenID(const std::string& name);
@@ -373,7 +430,7 @@ class SequencedWorkerPool::Inner {
// See the implementation for a more detailed description.
GetWorkStatus GetWork(SequencedTask* task,
TimeDelta* wait_time,
- std::vector<Closure>* delete_these_outside_lock);
+ std::vector<SequencedTask>* delete_these_outside_lock);
void HandleCleanup();
@@ -397,7 +454,7 @@ class SequencedWorkerPool::Inner {
// 0 or more. The caller should then call FinishStartingAdditionalThread to
// complete initialization once the lock is released.
//
- // If another thread is not necessary, returne 0;
+ // If another thread is not necessary, return 0;
//
// See the implementedion for more.
int PrepareToStartAdditionalThreadIfHelpful();
@@ -497,6 +554,27 @@ class SequencedWorkerPool::Inner {
TestingObserver* const testing_observer_;
+ // Members below are used for the experimental redirection to TaskScheduler.
+ // TODO(gab): Remove these if http://crbug.com/622400 fails
+ // (SequencedWorkerPool will be phased out completely otherwise).
+
+ // The TaskPriority to be used for SequencedWorkerPool tasks redirected to the
+ // TaskScheduler as an experiment (unused otherwise).
+ const base::TaskPriority task_priority_;
+
+ // A map of SequenceToken IDs to TaskScheduler TaskRunners used to redirect
+ // sequenced tasks to the TaskScheduler.
+ std::unordered_map<int, scoped_refptr<TaskRunner>> sequenced_task_runner_map_;
+
+ // TaskScheduler TaskRunners to redirect unsequenced tasks to the
+ // TaskScheduler. Indexed by TaskShutdownBehavior.
+ scoped_refptr<TaskRunner> unsequenced_task_runners_[3];
+
+ // A dummy TaskRunner obtained from TaskScheduler with the same TaskTraits as
+ // used by this SequencedWorkerPool to query for RunsTasksOnCurrentThread().
+ // Mutable so it can be lazily instantiated from RunsTasksOnCurrentThread().
+ mutable scoped_refptr<TaskRunner> runs_tasks_on_verifier_;
+
DISALLOW_COPY_AND_ASSIGN(Inner);
};
@@ -510,6 +588,7 @@ SequencedWorkerPool::Worker::Worker(
worker_pool_(std::move(worker_pool)),
task_shutdown_behavior_(BLOCK_SHUTDOWN),
is_processing_task_(false) {
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
Start();
}
@@ -517,6 +596,8 @@ SequencedWorkerPool::Worker::~Worker() {
}
void SequencedWorkerPool::Worker::Run() {
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+
#if defined(OS_WIN)
win::ScopedCOMInitializer com_initializer;
#endif
@@ -552,11 +633,11 @@ LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
// Inner definitions ---------------------------------------------------------
-SequencedWorkerPool::Inner::Inner(
- SequencedWorkerPool* worker_pool,
- size_t max_threads,
- const std::string& thread_name_prefix,
- TestingObserver* observer)
+SequencedWorkerPool::Inner::Inner(SequencedWorkerPool* worker_pool,
+ size_t max_threads,
+ const std::string& thread_name_prefix,
+ base::TaskPriority task_priority,
+ TestingObserver* observer)
: worker_pool_(worker_pool),
lock_(),
has_work_cv_(&lock_),
@@ -574,7 +655,13 @@ SequencedWorkerPool::Inner::Inner(
cleanup_state_(CLEANUP_DONE),
cleanup_idlers_(0),
cleanup_cv_(&lock_),
- testing_observer_(observer) {}
+ testing_observer_(observer),
+ task_priority_(static_cast<int>(task_priority) <=
+ static_cast<int>(g_max_task_priority)
+ ? task_priority
+ : g_max_task_priority) {
+ DCHECK_GT(max_threads_, 1U);
+}
SequencedWorkerPool::Inner::~Inner() {
// You must call Shutdown() before destroying the pool.
@@ -611,6 +698,13 @@ bool SequencedWorkerPool::Inner::PostTask(
const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay) {
+ // TODO(fdoray): Uncomment this DCHECK. It is initially commented to avoid a
+ // revert of the CL that adds debug::DumpWithoutCrashing() if it fails on the
+ // waterfall. https://crbug.com/622400
+ // DCHECK_NE(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
+ if (g_all_pools_state == AllPoolsState::POST_TASK_DISABLED)
+ debug::DumpWithoutCrashing();
+
DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
SequencedTask sequenced(from_here);
sequenced.sequence_token_id = sequence_token.id_;
@@ -624,6 +718,7 @@ bool SequencedWorkerPool::Inner::PostTask(
int create_thread_id = 0;
{
AutoLock lock(lock_);
+
if (shutdown_called_) {
// Don't allow a new task to be posted if it doesn't block shutdown.
if (shutdown_behavior != BLOCK_SHUTDOWN)
@@ -659,64 +754,178 @@ bool SequencedWorkerPool::Inner::PostTask(
if (optional_token_name)
sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
- pending_tasks_.insert(sequenced);
- if (shutdown_behavior == BLOCK_SHUTDOWN)
- blocking_shutdown_pending_task_count_++;
+ // See on top of the file why we don't compile this on Arc++.
+#if 0
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ if (!PostTaskToTaskScheduler(sequenced, delay))
+ return false;
+ } else {
+#endif
+ pending_tasks_.insert(sequenced);
- create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
+ if (sequenced.shutdown_behavior == BLOCK_SHUTDOWN)
+ blocking_shutdown_pending_task_count_++;
+
+ create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
+ }
+#if 0
}
+#endif
- // Actually start the additional thread or signal an existing one now that
- // we're outside the lock.
- if (create_thread_id)
- FinishStartingAdditionalThread(create_thread_id);
- else
- SignalHasWork();
+ // Use != REDIRECTED_TO_TASK_SCHEDULER instead of == USE_WORKER_POOL to ensure
+ // correct behavior if a task is posted to a SequencedWorkerPool before
+ // Enable(WithRedirectionToTaskScheduler)ForProcess() in a non-DCHECK build.
+ if (g_all_pools_state != AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ // Actually start the additional thread or signal an existing one outside
+ // the lock.
+ if (create_thread_id)
+ FinishStartingAdditionalThread(create_thread_id);
+ else
+ SignalHasWork();
+ }
+
+#if DCHECK_IS_ON()
+ {
+ AutoLock lock_for_dcheck(lock_);
+ // Some variables are exposed in both modes for convenience but only really
+ // intended for one of them at runtime, confirm exclusive usage here.
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ DCHECK(pending_tasks_.empty());
+ DCHECK_EQ(0, create_thread_id);
+ } else {
+ DCHECK(sequenced_task_runner_map_.empty());
+ }
+ }
+#endif // DCHECK_IS_ON()
return true;
}
-bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
- AutoLock lock(lock_);
- return ContainsKey(threads_, PlatformThread::CurrentId());
+bool SequencedWorkerPool::Inner::PostTaskToTaskScheduler(
+ const SequencedTask& sequenced,
+ const TimeDelta& delay) {
+#if 1
+ NOTREACHED();
+ ALLOW_UNUSED_PARAM(sequenced);
+ ALLOW_UNUSED_PARAM(delay);
+ return false;
+#else
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
+ lock_.AssertAcquired();
+
+ // Confirm that the TaskScheduler's shutdown behaviors use the same
+ // underlying values as SequencedWorkerPool.
+ static_assert(
+ static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) ==
+ static_cast<int>(CONTINUE_ON_SHUTDOWN),
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
+ "CONTINUE_ON_SHUTDOWN.");
+ static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) ==
+ static_cast<int>(SKIP_ON_SHUTDOWN),
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
+ "SKIP_ON_SHUTDOWN.");
+ static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) ==
+ static_cast<int>(BLOCK_SHUTDOWN),
+ "TaskShutdownBehavior and WorkerShutdown enum mismatch for "
+ "BLOCK_SHUTDOWN.");
+
+ const TaskShutdownBehavior task_shutdown_behavior =
+ static_cast<TaskShutdownBehavior>(sequenced.shutdown_behavior);
+ const TaskTraits traits = TaskTraits()
+ .MayBlock()
+ .WithBaseSyncPrimitives()
+ .WithPriority(task_priority_)
+ .WithShutdownBehavior(task_shutdown_behavior);
+ return GetTaskSchedulerTaskRunner(sequenced.sequence_token_id, traits)
+ ->PostDelayedTask(sequenced.posted_from, sequenced.task, delay);
+#endif
}
-bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
- SequenceToken sequence_token) const {
+scoped_refptr<TaskRunner>
+SequencedWorkerPool::Inner::GetTaskSchedulerTaskRunner(
+ int sequence_token_id,
+ const TaskTraits& traits) {
+#if 1
+ NOTREACHED();
+ ALLOW_UNUSED_PARAM(sequence_token_id);
+ ALLOW_UNUSED_PARAM(traits);
+ return scoped_refptr<TaskRunner>();
+#else
+ DCHECK_EQ(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
+ lock_.AssertAcquired();
+
+ static_assert(
+ static_cast<int>(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) == 0,
+ "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN must be equal to 0 to be "
+ "used as an index in |unsequenced_task_runners_|.");
+ static_assert(static_cast<int>(TaskShutdownBehavior::SKIP_ON_SHUTDOWN) == 1,
+ "TaskShutdownBehavior::SKIP_ON_SHUTDOWN must be equal to 1 to "
+ "be used as an index in |unsequenced_task_runners_|.");
+ static_assert(static_cast<int>(TaskShutdownBehavior::BLOCK_SHUTDOWN) == 2,
+ "TaskShutdownBehavior::BLOCK_SHUTDOWN must be equal to 2 to be "
+ "used as an index in |unsequenced_task_runners_|.");
+ static_assert(arraysize(unsequenced_task_runners_) == 3,
+ "The size of |unsequenced_task_runners_| doesn't match the "
+ "number of shutdown behaviors.");
+
+ scoped_refptr<TaskRunner>& task_runner =
+ sequence_token_id ? sequenced_task_runner_map_[sequence_token_id]
+ : unsequenced_task_runners_[static_cast<int>(
+ traits.shutdown_behavior())];
+
+ // TODO(fdoray): DCHECK that all tasks posted to the same sequence have the
+ // same shutdown behavior.
+
+ if (!task_runner) {
+ task_runner = sequence_token_id
+ ? CreateSequencedTaskRunnerWithTraits(traits)
+ : CreateTaskRunnerWithTraits(traits);
+ }
+
+ return task_runner;
+#endif
+}
+
+bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
AutoLock lock(lock_);
- ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
- if (found == threads_.end())
- return false;
- return found->second->is_processing_task() &&
- sequence_token.Equals(found->second->task_sequence_token());
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+#if 0
+ if (!runs_tasks_on_verifier_) {
+ runs_tasks_on_verifier_ = CreateTaskRunnerWithTraits(
+ TaskTraits().MayBlock().WithBaseSyncPrimitives().WithPriority(
+ task_priority_));
+ }
+#endif
+ return runs_tasks_on_verifier_->RunsTasksOnCurrentThread();
+ } else {
+ return ContainsKey(threads_, PlatformThread::CurrentId());
+ }
}
-bool SequencedWorkerPool::Inner::IsRunningSequence(
+bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
SequenceToken sequence_token) const {
DCHECK(sequence_token.IsValid());
- AutoLock lock(lock_);
- return !IsSequenceTokenRunnable(sequence_token.id_);
-}
-void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
- SequenceToken sequence_token,
- WorkerShutdown shutdown_behavior) {
AutoLock lock(lock_);
- ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
- DCHECK(found != threads_.end());
- DCHECK(found->second->is_processing_task());
- DCHECK(!found->second->task_sequence_token().IsValid());
- found->second->set_running_task_info(sequence_token, shutdown_behavior);
- // Mark the sequence token as in use.
- bool success = current_sequences_.insert(sequence_token.id_).second;
- DCHECK(success);
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+ const auto sequenced_task_runner_it =
+ sequenced_task_runner_map_.find(sequence_token.id_);
+ return sequenced_task_runner_it != sequenced_task_runner_map_.end() &&
+ sequenced_task_runner_it->second->RunsTasksOnCurrentThread();
+ } else {
+ ThreadMap::const_iterator found =
+ threads_.find(PlatformThread::CurrentId());
+ return found != threads_.end() && found->second->is_processing_task() &&
+ sequence_token.Equals(found->second->task_sequence_token());
+ }
}
// See https://code.google.com/p/chromium/issues/detail?id=168415
void SequencedWorkerPool::Inner::CleanupForTesting() {
- DCHECK(!RunsTasksOnCurrentThread());
- base::ThreadRestrictions::ScopedAllowWait allow_wait;
+ DCHECK_NE(g_all_pools_state, AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER);
AutoLock lock(lock_);
CHECK_EQ(CLEANUP_DONE, cleanup_state_);
if (shutdown_called_)
@@ -744,8 +953,12 @@ void SequencedWorkerPool::Inner::Shutdown(
if (shutdown_called_)
return;
shutdown_called_ = true;
+
max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
+ if (g_all_pools_state != AllPoolsState::USE_WORKER_POOL)
+ return;
+
// Tickle the threads. This will wake up a waiting one so it will know that
// it can exit, which in turn will wake up any other waiting ones.
SignalHasWork();
@@ -783,6 +996,7 @@ bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
}
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
{
AutoLock lock(lock_);
DCHECK(thread_being_created_);
@@ -801,18 +1015,15 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
// See GetWork for what delete_these_outside_lock is doing.
SequencedTask task;
TimeDelta wait_time;
- std::vector<Closure> delete_these_outside_lock;
+ std::vector<SequencedTask> delete_these_outside_lock;
GetWorkStatus status =
GetWork(&task, &wait_time, &delete_these_outside_lock);
if (status == GET_WORK_FOUND) {
- TRACE_EVENT_WITH_FLOW2(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
- "SequencedWorkerPool::Inner::ThreadLoop",
+ TRACE_TASK_EXECUTION("SequencedWorkerPool::Inner::ThreadLoop", task);
+ TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
+ "SequencedWorkerPool::Inner::PostTask",
TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))),
- TRACE_EVENT_FLAG_FLOW_IN,
- "src_file", task.posted_from.file_name(),
- "src_func", task.posted_from.function_name());
- TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION task_event(
- task.posted_from.file_name());
+ TRACE_EVENT_FLAG_FLOW_IN);
int new_thread_id = WillRunWorkerTask(task);
{
AutoUnlock unlock(lock_);
@@ -821,7 +1032,7 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
// already get a signal for each new task, but it doesn't
// hurt.)
SignalHasWork();
- delete_these_outside_lock.clear();
+ DeleteWithoutLock(&delete_these_outside_lock, this_worker);
// Complete thread creation outside the lock if necessary.
if (new_thread_id)
@@ -838,11 +1049,6 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
task, stopwatch);
- // Update the sequence token in case it has been set from within the
- // task, so it can be removed from the set of currently running
- // sequences in DidRunWorkerTask() below.
- task.sequence_token_id = this_worker->task_sequence_token().id_;
-
// Make sure our task is erased outside the lock for the
// same reason we do this with delete_these_oustide_lock.
// Also, do it before calling reset_running_task_info() so
@@ -857,7 +1063,7 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
switch (status) {
case GET_WORK_WAIT: {
AutoUnlock unlock(lock_);
- delete_these_outside_lock.clear();
+ DeleteWithoutLock(&delete_these_outside_lock, this_worker);
}
break;
case GET_WORK_NOT_FOUND:
@@ -879,7 +1085,7 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
// help this case.
if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) {
AutoUnlock unlock(lock_);
- delete_these_outside_lock.clear();
+ DeleteWithoutLock(&delete_these_outside_lock, this_worker);
break;
}
@@ -887,7 +1093,7 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
// deletion must happen outside of the lock.
if (delete_these_outside_lock.size()) {
AutoUnlock unlock(lock_);
- delete_these_outside_lock.clear();
+ DeleteWithoutLock(&delete_these_outside_lock, this_worker);
// Since the lock has been released, |status| may no longer be
// accurate. It might read GET_WORK_WAIT even if there are tasks
@@ -910,6 +1116,9 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
}
waiting_thread_count_--;
}
+ // |delete_these_outside_lock| should have been cleared via
+ // DeleteWithoutLock() above already.
+ DCHECK(delete_these_outside_lock.empty());
}
} // Release lock_.
@@ -921,7 +1130,22 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
can_shutdown_cv_.Signal();
}
+void SequencedWorkerPool::Inner::DeleteWithoutLock(
+ std::vector<SequencedTask>* tasks_to_delete,
+ Worker* this_worker) {
+ while (!tasks_to_delete->empty()) {
+ const SequencedTask& deleted_task = tasks_to_delete->back();
+ this_worker->set_running_task_info(
+ SequenceToken(deleted_task.sequence_token_id),
+ deleted_task.shutdown_behavior);
+ tasks_to_delete->pop_back();
+ }
+ this_worker->reset_running_task_info();
+}
+
void SequencedWorkerPool::Inner::HandleCleanup() {
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+
lock_.AssertAcquired();
if (cleanup_state_ == CLEANUP_DONE)
return;
@@ -986,7 +1210,9 @@ int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
SequencedTask* task,
TimeDelta* wait_time,
- std::vector<Closure>* delete_these_outside_lock) {
+ std::vector<SequencedTask>* delete_these_outside_lock) {
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+
lock_.AssertAcquired();
// Find the next task with a sequence token that's not currently in use.
@@ -1030,18 +1256,17 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
// shutdown. Delete it and get more work.
//
// Note that we do not want to delete unrunnable tasks. Deleting a task
- // can have side effects (like freeing some objects) and deleting a
- // task that's supposed to run after one that's currently running could
- // cause an obscure crash.
+ // can have side effects (like freeing some objects) and deleting a task
+ // that's supposed to run after one that's currently running could cause
+ // an obscure crash.
//
// We really want to delete these tasks outside the lock in case the
- // closures are holding refs to objects that want to post work from
- // their destructorss (which would deadlock). The closures are
- // internally refcounted, so we just need to keep a copy of them alive
- // until the lock is exited. The calling code can just clear() the
- // vector they passed to us once the lock is exited to make this
- // happen.
- delete_these_outside_lock->push_back(i->task);
+ // closures are holding refs to objects that want to post work from their
+ // destructors (which would deadlock). The closures are internally
+ // refcounted, so we just need to keep a copy of them alive until the lock
+ // is exited. The calling code can just clear() the vector they passed to
+ // us once the lock is exited to make this happen.
+ delete_these_outside_lock->push_back(*i);
pending_tasks_.erase(i++);
continue;
}
@@ -1052,7 +1277,7 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
status = GET_WORK_WAIT;
if (cleanup_state_ == CLEANUP_RUNNING) {
// Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
- delete_these_outside_lock->push_back(i->task);
+ delete_these_outside_lock->push_back(*i);
pending_tasks_.erase(i);
}
break;
@@ -1073,6 +1298,8 @@ SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
}
int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+
lock_.AssertAcquired();
// Mark the task's sequence number as in use.
@@ -1104,6 +1331,8 @@ int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
}
void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+
lock_.AssertAcquired();
if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
@@ -1117,6 +1346,8 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
int sequence_token_id) const {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
lock_.AssertAcquired();
return !sequence_token_id ||
current_sequences_.find(sequence_token_id) ==
@@ -1124,6 +1355,8 @@ bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
}
int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
lock_.AssertAcquired();
// How thread creation works:
//
@@ -1174,6 +1407,8 @@ int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
int thread_number) {
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
+
// Called outside of the lock.
DCHECK_GT(thread_number, 0);
@@ -1183,6 +1418,8 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
}
void SequencedWorkerPool::Inner::SignalHasWork() {
+ DCHECK_NE(AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER, g_all_pools_state);
+
has_work_cv_.Signal();
if (testing_observer_) {
testing_observer_->OnHasWork();
@@ -1190,6 +1427,7 @@ void SequencedWorkerPool::Inner::SignalHasWork() {
}
bool SequencedWorkerPool::Inner::CanShutdown() const {
+ DCHECK_EQ(AllPoolsState::USE_WORKER_POOL, g_all_pools_state);
lock_.AssertAcquired();
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
return !thread_being_created_ &&
@@ -1227,44 +1465,61 @@ SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
}
// static
-scoped_refptr<SequencedTaskRunner>
-SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
- Worker* worker = Worker::GetForCurrentThread();
+void SequencedWorkerPool::EnableForProcess() {
+ // TODO(fdoray): Uncomment this line. It is initially commented to avoid a
+ // revert of the CL that adds debug::DumpWithoutCrashing() in case of
+ // waterfall failures.
+ // DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
+ g_all_pools_state = AllPoolsState::USE_WORKER_POOL;
+}
- // If there is no worker, this thread is not a worker thread. Otherwise, it is
- // currently running a task (sequenced or unsequenced).
- if (!worker)
- return nullptr;
+// static
+void SequencedWorkerPool::EnableWithRedirectionToTaskSchedulerForProcess(
+ TaskPriority max_task_priority) {
+#if 1
+ NOTREACHED();
+ ALLOW_UNUSED_PARAM(max_task_priority);
+#else
+ // TODO(fdoray): Uncomment this line. It is initially commented to avoid a
+ // revert of the CL that adds debug::DumpWithoutCrashing() in case of
+ // waterfall failures.
+ // DCHECK_EQ(AllPoolsState::POST_TASK_DISABLED, g_all_pools_state);
+ DCHECK(TaskScheduler::GetInstance());
+ g_all_pools_state = AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER;
+ g_max_task_priority = max_task_priority;
+#endif
+}
- scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
- SequenceToken sequence_token = worker->task_sequence_token();
- WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
- if (!sequence_token.IsValid()) {
- // Create a new sequence token and bind this thread to it, to make sure that
- // a task posted to the SequencedTaskRunner we are going to return is not
- // immediately going to run on a different thread.
- sequence_token = Inner::GetSequenceToken();
- pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
- shutdown_behavior);
- }
+// static
+void SequencedWorkerPool::DisableForProcessForTesting() {
+ g_all_pools_state = AllPoolsState::POST_TASK_DISABLED;
+}
- DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
- return new SequencedWorkerPoolSequencedTaskRunner(
- std::move(pool), sequence_token, shutdown_behavior);
+// static
+bool SequencedWorkerPool::IsEnabled() {
+ return g_all_pools_state != AllPoolsState::POST_TASK_DISABLED;
}
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
- const std::string& thread_name_prefix)
- : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
- inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
-}
+ const std::string& thread_name_prefix,
+ base::TaskPriority task_priority)
+ : constructor_task_runner_(SequencedTaskRunnerHandle::Get()),
+ inner_(new Inner(this,
+ max_threads,
+ thread_name_prefix,
+ task_priority,
+ NULL)) {}
SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix,
+ base::TaskPriority task_priority,
TestingObserver* observer)
- : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
- inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
-}
+ : constructor_task_runner_(SequencedTaskRunnerHandle::Get()),
+ inner_(new Inner(this,
+ max_threads,
+ thread_name_prefix,
+ task_priority,
+ observer)) {}
SequencedWorkerPool::~SequencedWorkerPool() {}
@@ -1295,7 +1550,7 @@ scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
scoped_refptr<SequencedTaskRunner>
SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
SequenceToken token, WorkerShutdown shutdown_behavior) {
- return new SequencedWorkerPoolSequencedTaskRunner(
+ return new PoolSequencedTaskRunner(
this, token, shutdown_behavior);
}
@@ -1378,18 +1633,19 @@ bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
return inner_->RunsTasksOnCurrentThread();
}
-bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
- SequenceToken sequence_token) const {
- return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
-}
-
-bool SequencedWorkerPool::IsRunningSequence(
- SequenceToken sequence_token) const {
- return inner_->IsRunningSequence(sequence_token);
-}
-
void SequencedWorkerPool::FlushForTesting() {
- inner_->CleanupForTesting();
+ DCHECK(!RunsTasksOnCurrentThread());
+ base::ThreadRestrictions::ScopedAllowWait allow_wait;
+ if (g_all_pools_state == AllPoolsState::REDIRECTED_TO_TASK_SCHEDULER) {
+#if 1
+ NOTREACHED();
+#else
+ // TODO(gab): Remove this if http://crbug.com/622400 fails.
+ TaskScheduler::GetInstance()->FlushForTesting();
+#endif
+ } else {
+ inner_->CleanupForTesting();
+ }
}
void SequencedWorkerPool::SignalHasWorkForTesting() {
@@ -1397,7 +1653,7 @@ void SequencedWorkerPool::SignalHasWorkForTesting() {
}
void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
- DCHECK(constructor_task_runner_->BelongsToCurrentThread());
+ DCHECK(constructor_task_runner_->RunsTasksOnCurrentThread());
inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
}
@@ -1405,4 +1661,9 @@ bool SequencedWorkerPool::IsShutdownInProgress() {
return inner_->IsShutdownInProgress();
}
+bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
+ SequenceToken sequence_token) const {
+ return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
+}
+
} // namespace base
diff --git a/base/threading/sequenced_worker_pool.h b/base/threading/sequenced_worker_pool.h
index cbec39561a..0d42de9138 100644
--- a/base/threading/sequenced_worker_pool.h
+++ b/base/threading/sequenced_worker_pool.h
@@ -13,10 +13,11 @@
#include "base/base_export.h"
#include "base/callback_forward.h"
+#include "base/compiler_specific.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
-#include "base/single_thread_task_runner.h"
#include "base/task_runner.h"
+#include "base/task_scheduler/task_traits.h"
namespace tracked_objects {
class Location;
@@ -24,12 +25,10 @@ class Location;
namespace base {
-class SingleThreadTaskRunner;
+class SequencedTaskRunner;
template <class T> class DeleteHelper;
-class SequencedTaskRunner;
-
// A worker thread pool that enforces ordering between sets of tasks. It also
// allows you to specify what should happen to your tasks on shutdown.
//
@@ -47,8 +46,7 @@ class SequencedTaskRunner;
// destruction will be visible to T2.
//
// Example:
-// SequencedWorkerPool::SequenceToken token =
-// SequencedWorkerPool::GetSequenceToken();
+// SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken();
// pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
// FROM_HERE, base::Bind(...));
// pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
@@ -61,6 +59,10 @@ class SequencedTaskRunner;
// These will be executed in an unspecified order. The order of execution
// between tasks with different sequence tokens is also unspecified.
//
+// You must call EnableForProcess() or
+// EnableWithRedirectionToTaskSchedulerForProcess() before starting to post
+// tasks to a process' SequencedWorkerPools.
+//
// This class may be leaked on shutdown to facilitate fast shutdown. The
// expected usage, however, is to call Shutdown(), which correctly accounts
// for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN
@@ -164,36 +166,65 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
// an unsequenced task, returns an invalid SequenceToken.
static SequenceToken GetSequenceTokenForCurrentThread();
- // Gets a SequencedTaskRunner for the current thread. If the current thread is
- // running an unsequenced task, a new SequenceToken will be generated and set,
- // so that the returned SequencedTaskRunner is guaranteed to run tasks after
- // the current task has finished running.
- static scoped_refptr<SequencedTaskRunner>
- GetSequencedTaskRunnerForCurrentThread();
+ // Returns the SequencedWorkerPool that owns this thread, or null if the
+ // current thread is not a SequencedWorkerPool worker thread.
+ //
+ // Always returns nullptr when SequencedWorkerPool is redirected to
+ // TaskScheduler.
+ //
+ // DEPRECATED. Use SequencedTaskRunnerHandle::Get() instead. Consequentially
+ // the only remaining use case is in sequenced_task_runner_handle.cc to
+ // implement that and will soon be removed along with SequencedWorkerPool:
+ // http://crbug.com/622400.
+ static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread();
// Returns a unique token that can be used to sequence tasks posted to
// PostSequencedWorkerTask(). Valid tokens are always nonzero.
- // TODO(bauerb): Rename this to better differentiate from
- // GetSequenceTokenForCurrentThread().
static SequenceToken GetSequenceToken();
- // Returns the SequencedWorkerPool that owns this thread, or null if the
- // current thread is not a SequencedWorkerPool worker thread.
- static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread();
+ // Enables posting tasks to this process' SequencedWorkerPools. Cannot be
+ // called if already enabled. This is not thread-safe; proper synchronization
+ // is required to use any SequencedWorkerPool method after calling this.
+ static void EnableForProcess();
+
+ // Same as EnableForProcess(), but tasks are redirected to the registered
+ // TaskScheduler. All redirections' TaskPriority will be capped to
+ // |max_task_priority|. There must be a registered TaskScheduler when this is
+ // called.
+ // TODO(gab): Remove this if http://crbug.com/622400 fails
+ // (SequencedWorkerPool will be phased out completely otherwise).
+ static void EnableWithRedirectionToTaskSchedulerForProcess(
+ TaskPriority max_task_priority = TaskPriority::HIGHEST);
+
+ // Disables posting tasks to this process' SequencedWorkerPools. Calling this
+ // while there are active SequencedWorkerPools is not supported. This is not
+ // thread-safe; proper synchronization is required to use any
+ // SequencedWorkerPool method after calling this.
+ static void DisableForProcessForTesting();
+
+ // Returns true if posting tasks to this process' SequencedWorkerPool is
+ // enabled (with or without redirection to TaskScheduler).
+ static bool IsEnabled();
// When constructing a SequencedWorkerPool, there must be a
// ThreadTaskRunnerHandle on the current thread unless you plan to
// deliberately leak it.
- // Pass the maximum number of threads (they will be lazily created as needed)
- // and a prefix for the thread name to aid in debugging.
+ // Constructs a SequencedWorkerPool which will lazily create up to
+ // |max_threads| and a prefix for the thread name to aid in debugging.
+ // |max_threads| must be greater than 1. |task_priority| will be used to hint
+ // base::TaskScheduler for an experiment in which all SequencedWorkerPool
+ // tasks will be redirected to it in processes where a base::TaskScheduler was
+ // instantiated.
SequencedWorkerPool(size_t max_threads,
- const std::string& thread_name_prefix);
+ const std::string& thread_name_prefix,
+ base::TaskPriority task_priority);
// Like above, but with |observer| for testing. Does not take ownership of
// |observer|.
SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix,
+ base::TaskPriority task_priority,
TestingObserver* observer);
// Returns the sequence token associated with the given name. Calling this
@@ -207,7 +238,7 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
// delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
// are posted with BLOCK_SHUTDOWN behavior.
scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
- SequenceToken token);
+ SequenceToken token) WARN_UNUSED_RESULT;
// Returns a SequencedTaskRunner wrapper which posts to this
// SequencedWorkerPool using the given sequence token. Tasks with nonzero
@@ -215,14 +246,14 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
// are posted with the given shutdown behavior.
scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior(
SequenceToken token,
- WorkerShutdown shutdown_behavior);
+ WorkerShutdown shutdown_behavior) WARN_UNUSED_RESULT;
// Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using
// the given shutdown behavior. Tasks with nonzero delay are posted with
// SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the
// given shutdown behavior.
scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior(
- WorkerShutdown shutdown_behavior);
+ WorkerShutdown shutdown_behavior) WARN_UNUSED_RESULT;
// Posts the given task for execution in the worker pool. Tasks posted with
// this function will execute in an unspecified order on a background thread.
@@ -316,23 +347,21 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
TimeDelta delay) override;
bool RunsTasksOnCurrentThread() const override;
- // Returns true if the current thread is processing a task with the given
- // sequence_token.
- bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
-
- // Returns true if any thread is currently processing a task with the given
- // sequence token. Should only be called with a valid sequence token.
- bool IsRunningSequence(SequenceToken sequence_token) const;
-
// Blocks until all pending tasks are complete. This should only be called in
// unit tests when you want to validate something that should have happened.
- // This will not flush delayed tasks; delayed tasks get deleted.
+ // Does not wait for delayed tasks. If redirection to TaskScheduler is
+ // disabled, delayed tasks are deleted. If redirection to TaskScheduler is
+ // enabled, this will wait for all tasks posted to TaskScheduler (not just
+ // tasks posted to this SequencedWorkerPool).
//
// Note that calling this will not prevent other threads from posting work to
// the queue while the calling thread is waiting on Flush(). In this case,
// Flush will return only when there's no more work in the queue. Normally,
// this doesn't come up since in a test, all the work is being posted from
// the main thread.
+ //
+ // TODO(gab): Remove mentions of TaskScheduler in this comment if
+ // http://crbug.com/622400 fails.
void FlushForTesting();
// Spuriously signal that there is work to be done.
@@ -368,9 +397,14 @@ class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
friend class DeleteHelper<SequencedWorkerPool>;
class Inner;
+ class PoolSequencedTaskRunner;
class Worker;
- const scoped_refptr<SingleThreadTaskRunner> constructor_task_runner_;
+ // Returns true if the current thread is processing a task with the given
+ // sequence_token.
+ bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
+
+ const scoped_refptr<SequencedTaskRunner> constructor_task_runner_;
// Avoid pulling in too many headers by putting (almost) everything
// into |inner_|.
diff --git a/base/threading/simple_thread.cc b/base/threading/simple_thread.cc
index 6c64a17d6a..9eb443afab 100644
--- a/base/threading/simple_thread.cc
+++ b/base/threading/simple_thread.cc
@@ -12,62 +12,55 @@
namespace base {
SimpleThread::SimpleThread(const std::string& name_prefix)
- : name_prefix_(name_prefix),
- name_(name_prefix),
- thread_(),
- event_(WaitableEvent::ResetPolicy::MANUAL,
- WaitableEvent::InitialState::NOT_SIGNALED),
- tid_(0),
- joined_(false) {}
+ : SimpleThread(name_prefix, Options()) {}
SimpleThread::SimpleThread(const std::string& name_prefix,
const Options& options)
: name_prefix_(name_prefix),
- name_(name_prefix),
options_(options),
- thread_(),
event_(WaitableEvent::ResetPolicy::MANUAL,
- WaitableEvent::InitialState::NOT_SIGNALED),
- tid_(0),
- joined_(false) {}
+ WaitableEvent::InitialState::NOT_SIGNALED) {}
SimpleThread::~SimpleThread() {
DCHECK(HasBeenStarted()) << "SimpleThread was never started.";
- DCHECK(HasBeenJoined()) << "SimpleThread destroyed without being Join()ed.";
+ DCHECK(!options_.joinable || HasBeenJoined())
+ << "Joinable SimpleThread destroyed without being Join()ed.";
}
void SimpleThread::Start() {
DCHECK(!HasBeenStarted()) << "Tried to Start a thread multiple times.";
- bool success;
- if (options_.priority() == ThreadPriority::NORMAL) {
- success = PlatformThread::Create(options_.stack_size(), this, &thread_);
- } else {
- success = PlatformThread::CreateWithPriority(options_.stack_size(), this,
- &thread_, options_.priority());
- }
+ bool success =
+ options_.joinable
+ ? PlatformThread::CreateWithPriority(options_.stack_size, this,
+ &thread_, options_.priority)
+ : PlatformThread::CreateNonJoinableWithPriority(
+ options_.stack_size, this, options_.priority);
DCHECK(success);
- base::ThreadRestrictions::ScopedAllowWait allow_wait;
+ ThreadRestrictions::ScopedAllowWait allow_wait;
event_.Wait(); // Wait for the thread to complete initialization.
}
void SimpleThread::Join() {
+ DCHECK(options_.joinable) << "A non-joinable thread can't be joined.";
DCHECK(HasBeenStarted()) << "Tried to Join a never-started thread.";
DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times.";
PlatformThread::Join(thread_);
+ thread_ = PlatformThreadHandle();
joined_ = true;
}
bool SimpleThread::HasBeenStarted() {
- base::ThreadRestrictions::ScopedAllowWait allow_wait;
+ ThreadRestrictions::ScopedAllowWait allow_wait;
return event_.IsSignaled();
}
void SimpleThread::ThreadMain() {
tid_ = PlatformThread::CurrentId();
// Construct our full name of the form "name_prefix_/TID".
- name_.push_back('/');
- name_.append(IntToString(tid_));
- PlatformThread::SetName(name_);
+ std::string name(name_prefix_);
+ name.push_back('/');
+ name.append(IntToString(tid_));
+ PlatformThread::SetName(name);
// We've initialized our new thread, signal that we're done to Start().
event_.Signal();
@@ -77,24 +70,26 @@ void SimpleThread::ThreadMain() {
DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate,
const std::string& name_prefix)
- : SimpleThread(name_prefix),
- delegate_(delegate) {
-}
+ : DelegateSimpleThread(delegate, name_prefix, Options()) {}
DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate,
const std::string& name_prefix,
const Options& options)
: SimpleThread(name_prefix, options),
delegate_(delegate) {
+ DCHECK(delegate_);
}
-DelegateSimpleThread::~DelegateSimpleThread() {
-}
+DelegateSimpleThread::~DelegateSimpleThread() = default;
void DelegateSimpleThread::Run() {
DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)";
- delegate_->Run();
- delegate_ = NULL;
+
+ // Non-joinable DelegateSimpleThreads are allowed to be deleted during Run().
+ // Member state must not be accessed after invoking Run().
+ Delegate* delegate = delegate_;
+ delegate_ = nullptr;
+ delegate->Run();
}
DelegateSimpleThreadPool::DelegateSimpleThreadPool(
diff --git a/base/threading/simple_thread.h b/base/threading/simple_thread.h
index 3deeb1018c..f9f5e91045 100644
--- a/base/threading/simple_thread.h
+++ b/base/threading/simple_thread.h
@@ -48,6 +48,7 @@
#include "base/base_export.h"
#include "base/compiler_specific.h"
+#include "base/macros.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/platform_thread.h"
@@ -58,25 +59,26 @@ namespace base {
// virtual Run method, or you can use the DelegateSimpleThread interface.
class BASE_EXPORT SimpleThread : public PlatformThread::Delegate {
public:
- class BASE_EXPORT Options {
+ struct BASE_EXPORT Options {
public:
- Options() : stack_size_(0), priority_(ThreadPriority::NORMAL) {}
- explicit Options(ThreadPriority priority)
- : stack_size_(0), priority_(priority) {}
- ~Options() {}
+ Options() = default;
+ explicit Options(ThreadPriority priority_in) : priority(priority_in) {}
+ ~Options() = default;
- // We use the standard compiler-supplied copy constructor.
+ // Allow copies.
+ Options(const Options& other) = default;
+ Options& operator=(const Options& other) = default;
// A custom stack size, or 0 for the system default.
- void set_stack_size(size_t size) { stack_size_ = size; }
- size_t stack_size() const { return stack_size_; }
-
- // A custom thread priority.
- void set_priority(ThreadPriority priority) { priority_ = priority; }
- ThreadPriority priority() const { return priority_; }
- private:
- size_t stack_size_;
- ThreadPriority priority_;
+ size_t stack_size = 0;
+
+ ThreadPriority priority = ThreadPriority::NORMAL;
+
+ // If false, the underlying thread's PlatformThreadHandle will not be kept
+ // around and as such the SimpleThread instance will not be Join()able and
+ // must not be deleted before Run() is invoked. After that, it's up to
+ // the subclass to determine when it is safe to delete itself.
+ bool joinable = true;
};
// Create a SimpleThread. |options| should be used to manage any specific
@@ -94,19 +96,13 @@ class BASE_EXPORT SimpleThread : public PlatformThread::Delegate {
// Subclasses should override the Run method.
virtual void Run() = 0;
- // Return the thread name prefix, or "unnamed" if none was supplied.
- std::string name_prefix() { return name_prefix_; }
-
- // Return the completed name including TID, only valid after Start().
- std::string name() { return name_; }
-
// Return the thread id, only valid after Start().
PlatformThreadId tid() { return tid_; }
// Return True if Start() has ever been called.
bool HasBeenStarted();
- // Return True if Join() has evern been called.
+ // Return True if Join() has ever been called.
bool HasBeenJoined() { return joined_; }
// Overridden from PlatformThread::Delegate:
@@ -116,18 +112,24 @@ class BASE_EXPORT SimpleThread : public PlatformThread::Delegate {
const std::string name_prefix_;
std::string name_;
const Options options_;
- PlatformThreadHandle thread_; // PlatformThread handle, invalid after Join!
+ PlatformThreadHandle thread_; // PlatformThread handle, reset after Join.
WaitableEvent event_; // Signaled if Start() was ever called.
- PlatformThreadId tid_; // The backing thread's id.
- bool joined_; // True if Join has been called.
+ PlatformThreadId tid_ = kInvalidThreadId; // The backing thread's id.
+ bool joined_ = false; // True if Join has been called.
+
+ DISALLOW_COPY_AND_ASSIGN(SimpleThread);
};
+// A SimpleThread which delegates Run() to its Delegate. Non-joinable
+// DelegateSimpleThread are safe to delete after Run() was invoked, their
+// Delegates are also safe to delete after that point from this class' point of
+// view (although implementations must of course make sure that Run() will not
+// use their Delegate's member state after its deletion).
class BASE_EXPORT DelegateSimpleThread : public SimpleThread {
public:
class BASE_EXPORT Delegate {
public:
- Delegate() { }
- virtual ~Delegate() { }
+ virtual ~Delegate() = default;
virtual void Run() = 0;
};
@@ -142,6 +144,8 @@ class BASE_EXPORT DelegateSimpleThread : public SimpleThread {
private:
Delegate* delegate_;
+
+ DISALLOW_COPY_AND_ASSIGN(DelegateSimpleThread);
};
// DelegateSimpleThreadPool allows you to start up a fixed number of threads,
@@ -186,6 +190,8 @@ class BASE_EXPORT DelegateSimpleThreadPool
std::queue<Delegate*> delegates_;
base::Lock lock_; // Locks delegates_
WaitableEvent dry_; // Not signaled when there is no work to do.
+
+ DISALLOW_COPY_AND_ASSIGN(DelegateSimpleThreadPool);
};
} // namespace base
diff --git a/base/threading/simple_thread_unittest.cc b/base/threading/simple_thread_unittest.cc
index 14dd4591f1..0e52500c52 100644
--- a/base/threading/simple_thread_unittest.cc
+++ b/base/threading/simple_thread_unittest.cc
@@ -2,9 +2,13 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include <memory>
+
#include "base/atomic_sequence_num.h"
+#include "base/memory/ptr_util.h"
#include "base/strings/string_number_conversions.h"
#include "base/synchronization/waitable_event.h"
+#include "base/test/gtest_util.h"
#include "base/threading/simple_thread.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -17,11 +21,49 @@ class SetIntRunner : public DelegateSimpleThread::Delegate {
SetIntRunner(int* ptr, int val) : ptr_(ptr), val_(val) { }
~SetIntRunner() override {}
+ private:
void Run() override { *ptr_ = val_; }
- private:
int* ptr_;
int val_;
+
+ DISALLOW_COPY_AND_ASSIGN(SetIntRunner);
+};
+
+// Signals |started_| when Run() is invoked and waits until |released_| is
+// signaled to return, signaling |done_| before doing so. Useful for tests that
+// care to control Run()'s flow.
+class ControlledRunner : public DelegateSimpleThread::Delegate {
+ public:
+ ControlledRunner()
+ : started_(WaitableEvent::ResetPolicy::MANUAL,
+ WaitableEvent::InitialState::NOT_SIGNALED),
+ released_(WaitableEvent::ResetPolicy::MANUAL,
+ WaitableEvent::InitialState::NOT_SIGNALED),
+ done_(WaitableEvent::ResetPolicy::MANUAL,
+ WaitableEvent::InitialState::NOT_SIGNALED) {}
+
+ ~ControlledRunner() override { ReleaseAndWaitUntilDone(); }
+
+ void WaitUntilStarted() { started_.Wait(); }
+
+ void ReleaseAndWaitUntilDone() {
+ released_.Signal();
+ done_.Wait();
+ }
+
+ private:
+ void Run() override {
+ started_.Signal();
+ released_.Wait();
+ done_.Signal();
+ }
+
+ WaitableEvent started_;
+ WaitableEvent released_;
+ WaitableEvent done_;
+
+ DISALLOW_COPY_AND_ASSIGN(ControlledRunner);
};
class WaitEventRunner : public DelegateSimpleThread::Delegate {
@@ -29,22 +71,28 @@ class WaitEventRunner : public DelegateSimpleThread::Delegate {
explicit WaitEventRunner(WaitableEvent* event) : event_(event) { }
~WaitEventRunner() override {}
+ private:
void Run() override {
EXPECT_FALSE(event_->IsSignaled());
event_->Signal();
EXPECT_TRUE(event_->IsSignaled());
}
- private:
+
WaitableEvent* event_;
+
+ DISALLOW_COPY_AND_ASSIGN(WaitEventRunner);
};
class SeqRunner : public DelegateSimpleThread::Delegate {
public:
explicit SeqRunner(AtomicSequenceNumber* seq) : seq_(seq) { }
- void Run() override { seq_->GetNext(); }
private:
+ void Run() override { seq_->GetNext(); }
+
AtomicSequenceNumber* seq_;
+
+ DISALLOW_COPY_AND_ASSIGN(SeqRunner);
};
// We count up on a sequence number, firing on the event when we've hit our
@@ -56,6 +104,7 @@ class VerifyPoolRunner : public DelegateSimpleThread::Delegate {
int total, WaitableEvent* event)
: seq_(seq), total_(total), event_(event) { }
+ private:
void Run() override {
if (seq_->GetNext() == total_) {
event_->Signal();
@@ -64,10 +113,11 @@ class VerifyPoolRunner : public DelegateSimpleThread::Delegate {
}
}
- private:
AtomicSequenceNumber* seq_;
int total_;
WaitableEvent* event_;
+
+ DISALLOW_COPY_AND_ASSIGN(VerifyPoolRunner);
};
} // namespace
@@ -108,29 +158,44 @@ TEST(SimpleThreadTest, WaitForEvent) {
thread.Join();
}
-TEST(SimpleThreadTest, NamedWithOptions) {
- WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
- WaitableEvent::InitialState::NOT_SIGNALED);
+TEST(SimpleThreadTest, NonJoinableStartAndDieOnJoin) {
+ ControlledRunner runner;
- WaitEventRunner runner(&event);
SimpleThread::Options options;
- DelegateSimpleThread thread(&runner, "event_waiter", options);
- EXPECT_EQ(thread.name_prefix(), "event_waiter");
- EXPECT_FALSE(event.IsSignaled());
+ options.joinable = false;
+ DelegateSimpleThread thread(&runner, "non_joinable", options);
+ EXPECT_FALSE(thread.HasBeenStarted());
thread.Start();
- EXPECT_EQ(thread.name_prefix(), "event_waiter");
- EXPECT_EQ(thread.name(),
- std::string("event_waiter/") + IntToString(thread.tid()));
- event.Wait();
+ EXPECT_TRUE(thread.HasBeenStarted());
- EXPECT_TRUE(event.IsSignaled());
- thread.Join();
+ // Note: this is not quite the same as |thread.HasBeenStarted()| which
+ // represents ThreadMain() getting ready to invoke Run() whereas
+ // |runner.WaitUntilStarted()| ensures Run() was actually invoked.
+ runner.WaitUntilStarted();
+
+ EXPECT_FALSE(thread.HasBeenJoined());
+ EXPECT_DCHECK_DEATH({ thread.Join(); });
+}
+
+TEST(SimpleThreadTest, NonJoinableInactiveDelegateDestructionIsOkay) {
+ std::unique_ptr<ControlledRunner> runner(new ControlledRunner);
+
+ SimpleThread::Options options;
+ options.joinable = false;
+ std::unique_ptr<DelegateSimpleThread> thread(
+ new DelegateSimpleThread(runner.get(), "non_joinable", options));
+
+ thread->Start();
+ runner->WaitUntilStarted();
+
+ // Deleting a non-joinable SimpleThread after Run() was invoked is okay.
+ thread.reset();
- // We keep the name and tid, even after the thread is gone.
- EXPECT_EQ(thread.name_prefix(), "event_waiter");
- EXPECT_EQ(thread.name(),
- std::string("event_waiter/") + IntToString(thread.tid()));
+ runner->WaitUntilStarted();
+ runner->ReleaseAndWaitUntilDone();
+ // It should be safe to destroy a Delegate after its Run() method completed.
+ runner.reset();
}
TEST(SimpleThreadTest, ThreadPool) {
diff --git a/base/threading/thread.cc b/base/threading/thread.cc
index 9cdc6912ea..c30320f0dc 100644
--- a/base/threading/thread.cc
+++ b/base/threading/thread.cc
@@ -5,8 +5,10 @@
#include "base/threading/thread.h"
#include "base/bind.h"
+#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
#include "base/location.h"
+#include "base/logging.h"
#include "base/run_loop.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/thread_id_name_manager.h"
@@ -14,6 +16,10 @@
#include "base/threading/thread_restrictions.h"
#include "build/build_config.h"
+#if defined(OS_POSIX) && !defined(OS_NACL)
+#include "base/files/file_descriptor_watcher_posix.h"
+#endif
+
#if defined(OS_WIN)
#include "base/win/scoped_com_initializer.h"
#endif
@@ -26,53 +32,31 @@ namespace {
// because its Stop method was called. This allows us to catch cases where
// MessageLoop::QuitWhenIdle() is called directly, which is unexpected when
// using a Thread to setup and run a MessageLoop.
-base::LazyInstance<base::ThreadLocalBoolean> lazy_tls_bool =
+base::LazyInstance<base::ThreadLocalBoolean>::Leaky lazy_tls_bool =
LAZY_INSTANCE_INITIALIZER;
} // namespace
-// This is used to trigger the message loop to exit.
-void ThreadQuitHelper() {
- MessageLoop::current()->QuitWhenIdle();
- Thread::SetThreadWasQuitProperly(true);
-}
-
-Thread::Options::Options()
- : message_loop_type(MessageLoop::TYPE_DEFAULT),
- timer_slack(TIMER_SLACK_NONE),
- stack_size(0),
- priority(ThreadPriority::NORMAL) {
-}
+Thread::Options::Options() = default;
-Thread::Options::Options(MessageLoop::Type type,
- size_t size)
- : message_loop_type(type),
- timer_slack(TIMER_SLACK_NONE),
- stack_size(size),
- priority(ThreadPriority::NORMAL) {
-}
+Thread::Options::Options(MessageLoop::Type type, size_t size)
+ : message_loop_type(type), stack_size(size) {}
Thread::Options::Options(const Options& other) = default;
-Thread::Options::~Options() {
-}
+Thread::Options::~Options() = default;
Thread::Thread(const std::string& name)
- :
-#if defined(OS_WIN)
- com_status_(NONE),
-#endif
- stopping_(false),
- running_(false),
- thread_(0),
- id_(kInvalidThreadId),
- id_event_(WaitableEvent::ResetPolicy::MANUAL,
+ : id_event_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED),
- message_loop_(nullptr),
- message_loop_timer_slack_(TIMER_SLACK_NONE),
name_(name),
start_event_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED) {
+ // Only bind the sequence on Start(): the state is constant between
+ // construction and Start() and it's thus valid for Start() to be called on
+ // another sequence as long as every other operation is then performed on that
+ // sequence.
+ owning_sequence_checker_.DetachFromSequence();
}
Thread::~Thread() {
@@ -80,6 +64,8 @@ Thread::~Thread() {
}
bool Thread::Start() {
+ DCHECK(owning_sequence_checker_.CalledOnValidSequence());
+
Options options;
#if defined(OS_WIN)
if (com_status_ == STA)
@@ -89,7 +75,11 @@ bool Thread::Start() {
}
bool Thread::StartWithOptions(const Options& options) {
+ DCHECK(owning_sequence_checker_.CalledOnValidSequence());
DCHECK(!message_loop_);
+ DCHECK(!IsRunning());
+ DCHECK(!stopping_) << "Starting a non-joinable thread a second time? That's "
+ << "not allowed!";
#if defined(OS_WIN)
DCHECK((com_status_ != STA) ||
(options.message_loop_type == MessageLoop::TYPE_UI));
@@ -106,32 +96,41 @@ bool Thread::StartWithOptions(const Options& options) {
type = MessageLoop::TYPE_CUSTOM;
message_loop_timer_slack_ = options.timer_slack;
- std::unique_ptr<MessageLoop> message_loop =
+ std::unique_ptr<MessageLoop> message_loop_owned =
MessageLoop::CreateUnbound(type, options.message_pump_factory);
- message_loop_ = message_loop.get();
+ message_loop_ = message_loop_owned.get();
start_event_.Reset();
- // Hold the thread_lock_ while starting a new thread, so that we can make sure
- // that thread_ is populated before the newly created thread accesses it.
+ // Hold |thread_lock_| while starting the new thread to synchronize with
+ // Stop() while it's not guaranteed to be sequenced (until crbug/629139 is
+ // fixed).
{
AutoLock lock(thread_lock_);
- if (!PlatformThread::CreateWithPriority(options.stack_size, this, &thread_,
- options.priority)) {
+ bool success =
+ options.joinable
+ ? PlatformThread::CreateWithPriority(options.stack_size, this,
+ &thread_, options.priority)
+ : PlatformThread::CreateNonJoinableWithPriority(
+ options.stack_size, this, options.priority);
+ if (!success) {
DLOG(ERROR) << "failed to create thread";
message_loop_ = nullptr;
return false;
}
}
- // The ownership of message_loop is managemed by the newly created thread
+ joinable_ = options.joinable;
+
+ // The ownership of |message_loop_| is managed by the newly created thread
// within the ThreadMain.
- ignore_result(message_loop.release());
+ ignore_result(message_loop_owned.release());
DCHECK(message_loop_);
return true;
}
bool Thread::StartAndWaitForTesting() {
+ DCHECK(owning_sequence_checker_.CalledOnValidSequence());
bool result = Start();
if (!result)
return false;
@@ -140,6 +139,7 @@ bool Thread::StartAndWaitForTesting() {
}
bool Thread::WaitUntilThreadStarted() const {
+ DCHECK(owning_sequence_checker_.CalledOnValidSequence());
if (!message_loop_)
return false;
base::ThreadRestrictions::ScopedAllowWait allow_wait;
@@ -147,37 +147,74 @@ bool Thread::WaitUntilThreadStarted() const {
return true;
}
+void Thread::FlushForTesting() {
+ DCHECK(owning_sequence_checker_.CalledOnValidSequence());
+ if (!message_loop_)
+ return;
+
+ WaitableEvent done(WaitableEvent::ResetPolicy::AUTOMATIC,
+ WaitableEvent::InitialState::NOT_SIGNALED);
+ task_runner()->PostTask(FROM_HERE,
+ Bind(&WaitableEvent::Signal, Unretained(&done)));
+ done.Wait();
+}
+
void Thread::Stop() {
+ DCHECK(joinable_);
+
+ // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
+ // enable this check, until then synchronization with Start() via
+ // |thread_lock_| is required...
+ // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
AutoLock lock(thread_lock_);
- if (thread_.is_null())
- return;
StopSoon();
+ // Can't join if the |thread_| is either already gone or is non-joinable.
+ if (thread_.is_null())
+ return;
+
// Wait for the thread to exit.
//
- // TODO(darin): Unfortunately, we need to keep message_loop_ around until
+ // TODO(darin): Unfortunately, we need to keep |message_loop_| around until
// the thread exits. Some consumers are abusing the API. Make them stop.
//
PlatformThread::Join(thread_);
thread_ = base::PlatformThreadHandle();
- // The thread should nullify message_loop_ on exit.
+ // The thread should nullify |message_loop_| on exit (note: Join() adds an
+ // implicit memory barrier and no lock is thus required for this check).
DCHECK(!message_loop_);
stopping_ = false;
}
void Thread::StopSoon() {
- // We should only be called on the same thread that started us.
-
- DCHECK_NE(GetThreadId(), PlatformThread::CurrentId());
+ // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
+ // enable this check.
+ // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
if (stopping_ || !message_loop_)
return;
stopping_ = true;
- task_runner()->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper));
+
+ if (using_external_message_loop_) {
+ // Setting |stopping_| to true above should have been sufficient for this
+ // thread to be considered "stopped" per it having never set its |running_|
+ // bit by lack of its own ThreadMain.
+ DCHECK(!IsRunning());
+ message_loop_ = nullptr;
+ return;
+ }
+
+ task_runner()->PostTask(
+ FROM_HERE, base::Bind(&Thread::ThreadQuitHelper, Unretained(this)));
+}
+
+void Thread::DetachFromSequence() {
+ DCHECK(owning_sequence_checker_.CalledOnValidSequence());
+ owning_sequence_checker_.DetachFromSequence();
}
PlatformThreadId Thread::GetThreadId() const {
@@ -188,26 +225,36 @@ PlatformThreadId Thread::GetThreadId() const {
}
bool Thread::IsRunning() const {
- // If the thread's already started (i.e. message_loop_ is non-null) and
- // not yet requested to stop (i.e. stopping_ is false) we can just return
- // true. (Note that stopping_ is touched only on the same thread that
- // starts / started the new thread so we need no locking here.)
+ // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
+ // enable this check.
+ // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
+
+ // If the thread's already started (i.e. |message_loop_| is non-null) and not
+ // yet requested to stop (i.e. |stopping_| is false) we can just return true.
+ // (Note that |stopping_| is touched only on the same sequence that starts /
+ // started the new thread so we need no locking here.)
if (message_loop_ && !stopping_)
return true;
- // Otherwise check the running_ flag, which is set to true by the new thread
+ // Otherwise check the |running_| flag, which is set to true by the new thread
// only while it is inside Run().
AutoLock lock(running_lock_);
return running_;
}
-void Thread::Run(MessageLoop*) {
- RunLoop().Run();
+void Thread::Run(RunLoop* run_loop) {
+ // Overridable protected method to be called from our |thread_| only.
+ DCHECK(id_event_.IsSignaled());
+ DCHECK_EQ(id_, PlatformThread::CurrentId());
+
+ run_loop->Run();
}
+// static
void Thread::SetThreadWasQuitProperly(bool flag) {
lazy_tls_bool.Pointer()->Set(flag);
}
+// static
bool Thread::GetThreadWasQuitProperly() {
bool quit_properly = true;
#ifndef NDEBUG
@@ -216,9 +263,27 @@ bool Thread::GetThreadWasQuitProperly() {
return quit_properly;
}
+void Thread::SetMessageLoop(MessageLoop* message_loop) {
+ DCHECK(owning_sequence_checker_.CalledOnValidSequence());
+ DCHECK(message_loop);
+
+ // Setting |message_loop_| should suffice for this thread to be considered
+ // as "running", until Stop() is invoked.
+ DCHECK(!IsRunning());
+ message_loop_ = message_loop;
+ DCHECK(IsRunning());
+
+ using_external_message_loop_ = true;
+}
+
void Thread::ThreadMain() {
// First, make GetThreadId() available to avoid deadlocks. It could be called
// any place in the following thread initialization code.
+ DCHECK(!id_event_.IsSignaled());
+ // Note: this read of |id_| while |id_event_| isn't signaled is exceptionally
+ // okay because ThreadMain has a happens-after relationship with the other
+ // write in StartWithOptions().
+ DCHECK_EQ(kInvalidThreadId, id_);
id_ = PlatformThread::CurrentId();
DCHECK_NE(kInvalidThreadId, id_);
id_event_.Signal();
@@ -226,12 +291,22 @@ void Thread::ThreadMain() {
// Complete the initialization of our Thread object.
PlatformThread::SetName(name_.c_str());
- // Lazily initialize the message_loop so that it can run on this thread.
+ // Lazily initialize the |message_loop| so that it can run on this thread.
DCHECK(message_loop_);
std::unique_ptr<MessageLoop> message_loop(message_loop_);
message_loop_->BindToCurrentThread();
message_loop_->SetTimerSlack(message_loop_timer_slack_);
+#if defined(OS_POSIX) && !defined(OS_NACL)
+ // Allow threads running a MessageLoopForIO to use FileDescriptorWatcher API.
+ std::unique_ptr<FileDescriptorWatcher> file_descriptor_watcher;
+ if (MessageLoopForIO::IsCurrent()) {
+ DCHECK_EQ(message_loop_, MessageLoopForIO::current());
+ file_descriptor_watcher.reset(
+ new FileDescriptorWatcher(MessageLoopForIO::current()));
+ }
+#endif
+
#if defined(OS_WIN)
std::unique_ptr<win::ScopedCOMInitializer> com_initializer;
if (com_status_ != NONE) {
@@ -251,7 +326,9 @@ void Thread::ThreadMain() {
start_event_.Signal();
- Run(message_loop_);
+ RunLoop run_loop;
+ run_loop_ = &run_loop;
+ Run(run_loop_);
{
AutoLock lock(running_lock_);
@@ -266,15 +343,22 @@ void Thread::ThreadMain() {
#endif
if (message_loop->type() != MessageLoop::TYPE_CUSTOM) {
- // Assert that MessageLoop::QuitWhenIdle was called by ThreadQuitHelper.
- // Don't check for custom message pumps, because their shutdown might not
- // allow this.
+ // Assert that RunLoop::QuitWhenIdle was called by ThreadQuitHelper. Don't
+ // check for custom message pumps, because their shutdown might not allow
+ // this.
DCHECK(GetThreadWasQuitProperly());
}
// We can't receive messages anymore.
// (The message loop is destructed at the end of this block)
message_loop_ = nullptr;
+ run_loop_ = nullptr;
+}
+
+void Thread::ThreadQuitHelper() {
+ DCHECK(run_loop_);
+ run_loop_->QuitWhenIdle();
+ SetThreadWasQuitProperly(true);
}
} // namespace base
diff --git a/base/threading/thread.h b/base/threading/thread.h
index c9a77d7323..01f7d8e250 100644
--- a/base/threading/thread.h
+++ b/base/threading/thread.h
@@ -15,7 +15,9 @@
#include "base/macros.h"
#include "base/message_loop/message_loop.h"
#include "base/message_loop/timer_slack.h"
+#include "base/sequence_checker.h"
#include "base/single_thread_task_runner.h"
+#include "base/synchronization/atomic_flag.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/platform_thread.h"
@@ -24,6 +26,7 @@
namespace base {
class MessagePump;
+class RunLoop;
// A simple thread abstraction that establishes a MessageLoop on a new thread.
// The consumer uses the MessageLoop of the thread to cause code to execute on
@@ -38,6 +41,18 @@ class MessagePump;
// (1) Thread::CleanUp()
// (2) MessageLoop::~MessageLoop
// (3.b) MessageLoop::DestructionObserver::WillDestroyCurrentMessageLoop
+//
+// This API is not thread-safe: unless indicated otherwise its methods are only
+// valid from the owning sequence (which is the one from which Start() is
+// invoked -- should it differ from the one on which it was constructed).
+//
+// Sometimes it's useful to kick things off on the initial sequence (e.g.
+// construction, Start(), task_runner()), but to then hand the Thread over to a
+// pool of users for the last one of them to destroy it when done. For that use
+// case, Thread::DetachFromSequence() allows the owning sequence to give up
+// ownership. The caller is then responsible to ensure a happens-after
+// relationship between the DetachFromSequence() call and the next use of that
+// Thread object (including ~Thread()).
class BASE_EXPORT Thread : PlatformThread::Delegate {
public:
struct BASE_EXPORT Options {
@@ -50,10 +65,10 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
// Specifies the type of message loop that will be allocated on the thread.
// This is ignored if message_pump_factory.is_null() is false.
- MessageLoop::Type message_loop_type;
+ MessageLoop::Type message_loop_type = MessageLoop::TYPE_DEFAULT;
// Specifies timer slack for thread message loop.
- TimerSlack timer_slack;
+ TimerSlack timer_slack = TIMER_SLACK_NONE;
// Used to create the MessagePump for the MessageLoop. The callback is Run()
// on the thread. If message_pump_factory.is_null(), then a MessagePump
@@ -64,10 +79,18 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
// Specifies the maximum stack size that the thread is allowed to use.
// This does not necessarily correspond to the thread's initial stack size.
// A value of 0 indicates that the default maximum should be used.
- size_t stack_size;
+ size_t stack_size = 0;
// Specifies the initial thread priority.
- ThreadPriority priority;
+ ThreadPriority priority = ThreadPriority::NORMAL;
+
+ // If false, the thread will not be joined on destruction. This is intended
+ // for threads that want TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN
+ // semantics. Non-joinable threads can't be joined (must be leaked and
+ // can't be destroyed or Stop()'ed).
+ // TODO(gab): allow non-joinable instances to be deleted without causing
+ // user-after-frees (proposal @ https://crbug.com/629139#c14)
+ bool joinable = true;
};
// Constructor.
@@ -125,12 +148,19 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
// carefully for production code.
bool WaitUntilThreadStarted() const;
- // Signals the thread to exit and returns once the thread has exited. After
- // this method returns, the Thread object is completely reset and may be used
- // as if it were newly constructed (i.e., Start may be called again).
+ // Blocks until all tasks previously posted to this thread have been executed.
+ void FlushForTesting();
+
+ // Signals the thread to exit and returns once the thread has exited. The
+ // Thread object is completely reset and may be used as if it were newly
+ // constructed (i.e., Start may be called again). Can only be called if
+ // |joinable_|.
//
// Stop may be called multiple times and is simply ignored if the thread is
- // already stopped.
+ // already stopped or currently stopping.
+ //
+ // Start/Stop are not thread-safe and callers that desire to invoke them from
+ // different threads must ensure mutual exclusion.
//
// NOTE: If you are a consumer of Thread, it is not necessary to call this
// before deleting your Thread objects, as the destructor will do it.
@@ -145,11 +175,17 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
// deadlock on Windows with printer worker thread. In any other case, Stop()
// should be used.
//
- // StopSoon should not be called multiple times as it is risky to do so. It
- // could cause a timing issue in message_loop() access. Call Stop() to reset
- // the thread object once it is known that the thread has quit.
+ // Call Stop() to reset the thread object once it is known that the thread has
+ // quit.
void StopSoon();
+ // Detaches the owning sequence, indicating that the next call to this API
+ // (including ~Thread()) can happen from a different sequence (to which it
+ // will be rebound). This call itself must happen on the current owning
+ // sequence and the caller must ensure the next API call has a happens-after
+ // relationship with this one.
+ void DetachFromSequence();
+
// Returns the message loop for this thread. Use the MessageLoop's
// PostTask methods to execute code on the thread. This only returns
// non-null after a successful call to Start. After Stop has been called,
@@ -158,29 +194,52 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
// NOTE: You must not call this MessageLoop's Quit method directly. Use
// the Thread's Stop method instead.
//
- MessageLoop* message_loop() const { return message_loop_; }
+ // In addition to this Thread's owning sequence, this can also safely be
+ // called from the underlying thread itself.
+ MessageLoop* message_loop() const {
+ // This class doesn't provide synchronization around |message_loop_| and as
+ // such only the owner should access it (and the underlying thread which
+ // never sees it before it's set). In practice, many callers are coming from
+ // unrelated threads but provide their own implicit (e.g. memory barriers
+ // from task posting) or explicit (e.g. locks) synchronization making the
+ // access of |message_loop_| safe... Changing all of those callers is
+ // unfeasible; instead verify that they can reliably see
+ // |message_loop_ != nullptr| without synchronization as a proof that their
+ // external synchronization catches the unsynchronized effects of Start().
+ // TODO(gab): Despite all of the above this test has to be disabled for now
+ // per crbug.com/629139#c6.
+ // DCHECK(owning_sequence_checker_.CalledOnValidSequence() ||
+ // (id_event_.IsSignaled() && id_ == PlatformThread::CurrentId()) ||
+ // message_loop_);
+ return message_loop_;
+ }
// Returns a TaskRunner for this thread. Use the TaskRunner's PostTask
// methods to execute code on the thread. Returns nullptr if the thread is not
// running (e.g. before Start or after Stop have been called). Callers can
// hold on to this even after the thread is gone; in this situation, attempts
// to PostTask() will fail.
+ //
+ // In addition to this Thread's owning sequence, this can also safely be
+ // called from the underlying thread itself.
scoped_refptr<SingleThreadTaskRunner> task_runner() const {
+ // Refer to the DCHECK and comment inside |message_loop()|.
+ DCHECK(owning_sequence_checker_.CalledOnValidSequence() ||
+ (id_event_.IsSignaled() && id_ == PlatformThread::CurrentId()) ||
+ message_loop_);
return message_loop_ ? message_loop_->task_runner() : nullptr;
}
// Returns the name of this thread (for display in debugger too).
const std::string& thread_name() const { return name_; }
- // The native thread handle.
- PlatformThreadHandle thread_handle() { return thread_; }
-
// Returns the thread ID. Should not be called before the first Start*()
// call. Keeps on returning the same ID even after a Stop() call. The next
// Start*() call renews the ID.
//
// WARNING: This function will block if the thread hasn't started yet.
//
+ // This method is thread-safe.
PlatformThreadId GetThreadId() const;
// Returns true if the thread has been started, and not yet stopped.
@@ -190,8 +249,8 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
// Called just prior to starting the message loop
virtual void Init() {}
- // Called to start the message loop
- virtual void Run(MessageLoop* message_loop);
+ // Called to start the run loop
+ virtual void Run(RunLoop* run_loop);
// Called just after the message loop ends
virtual void CleanUp() {}
@@ -199,8 +258,11 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
static void SetThreadWasQuitProperly(bool flag);
static bool GetThreadWasQuitProperly();
- void set_message_loop(MessageLoop* message_loop) {
- message_loop_ = message_loop;
+ // Bind this Thread to an existing MessageLoop instead of starting a new one.
+ void SetMessageLoop(MessageLoop* message_loop);
+
+ bool using_external_message_loop() const {
+ return using_external_message_loop_;
}
private:
@@ -215,19 +277,25 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
// PlatformThread::Delegate methods:
void ThreadMain() override;
+ void ThreadQuitHelper();
+
#if defined(OS_WIN)
// Whether this thread needs to initialize COM, and if so, in what mode.
- ComStatus com_status_;
+ ComStatus com_status_ = NONE;
#endif
+ // Mirrors the Options::joinable field used to start this thread. Verified
+ // on Stop() -- non-joinable threads can't be joined (must be leaked).
+ bool joinable_ = true;
+
// If true, we're in the middle of stopping, and shouldn't access
// |message_loop_|. It may non-nullptr and invalid.
// Should be written on the thread that created this thread. Also read data
// could be wrong on other threads.
- bool stopping_;
+ bool stopping_ = false;
// True while inside of Run().
- bool running_;
+ bool running_ = false;
mutable base::Lock running_lock_; // Protects |running_|.
// The thread's handle.
@@ -235,24 +303,35 @@ class BASE_EXPORT Thread : PlatformThread::Delegate {
mutable base::Lock thread_lock_; // Protects |thread_|.
// The thread's id once it has started.
- PlatformThreadId id_;
- mutable WaitableEvent id_event_; // Protects |id_|.
-
- // The thread's message loop. Valid only while the thread is alive. Set
- // by the created thread.
- MessageLoop* message_loop_;
+ PlatformThreadId id_ = kInvalidThreadId;
+ // Protects |id_| which must only be read while it's signaled.
+ mutable WaitableEvent id_event_;
+
+ // The thread's MessageLoop and RunLoop. Valid only while the thread is alive.
+ // Set by the created thread.
+ MessageLoop* message_loop_ = nullptr;
+ RunLoop* run_loop_ = nullptr;
+
+ // True only if |message_loop_| was externally provided by |SetMessageLoop()|
+ // in which case this Thread has no underlying |thread_| and should merely
+ // drop |message_loop_| on Stop(). In that event, this remains true after
+ // Stop() was invoked so that subclasses can use this state to build their own
+ // cleanup logic as required.
+ bool using_external_message_loop_ = false;
// Stores Options::timer_slack_ until the message loop has been bound to
// a thread.
- TimerSlack message_loop_timer_slack_;
+ TimerSlack message_loop_timer_slack_ = TIMER_SLACK_NONE;
// The name of the thread. Used for debugging purposes.
- std::string name_;
+ const std::string name_;
// Signaled when the created thread gets ready to use the message loop.
mutable WaitableEvent start_event_;
- friend void ThreadQuitHelper();
+ // This class is not thread-safe, use this to verify access from the owning
+ // sequence of the Thread.
+ SequenceChecker owning_sequence_checker_;
DISALLOW_COPY_AND_ASSIGN(Thread);
};
diff --git a/base/threading/thread_checker.h b/base/threading/thread_checker.h
index 1d970f093e..1d4eb1c7b0 100644
--- a/base/threading/thread_checker.h
+++ b/base/threading/thread_checker.h
@@ -8,16 +8,6 @@
#include "base/logging.h"
#include "base/threading/thread_checker_impl.h"
-// Apart from debug builds, we also enable the thread checker in
-// builds with DCHECK_ALWAYS_ON so that trybots and waterfall bots
-// with this define will get the same level of thread checking as
-// debug bots.
-#if DCHECK_IS_ON()
-#define ENABLE_THREAD_CHECKER 1
-#else
-#define ENABLE_THREAD_CHECKER 0
-#endif
-
namespace base {
// Do nothing implementation, for use in release mode.
@@ -63,16 +53,20 @@ class ThreadCheckerDoNothing {
// ThreadChecker thread_checker_;
// }
//
+// Note that, when enabled, CalledOnValidThread() returns false when called from
+// tasks posted to SingleThreadTaskRunners bound to different sequences, even if
+// the tasks happen to run on the same thread (e.g. two independent TaskRunners
+// with ExecutionMode::SINGLE_THREADED on the TaskScheduler that happen to share
+// a thread).
+//
// In Release mode, CalledOnValidThread will always return true.
-#if ENABLE_THREAD_CHECKER
+#if DCHECK_IS_ON()
class ThreadChecker : public ThreadCheckerImpl {
};
#else
class ThreadChecker : public ThreadCheckerDoNothing {
};
-#endif // ENABLE_THREAD_CHECKER
-
-#undef ENABLE_THREAD_CHECKER
+#endif // DCHECK_IS_ON()
} // namespace base
diff --git a/base/threading/thread_checker_impl.cc b/base/threading/thread_checker_impl.cc
index eb87bae772..d5ccbdb943 100644
--- a/base/threading/thread_checker_impl.cc
+++ b/base/threading/thread_checker_impl.cc
@@ -4,31 +4,54 @@
#include "base/threading/thread_checker_impl.h"
+#include "base/threading/thread_task_runner_handle.h"
+
namespace base {
-ThreadCheckerImpl::ThreadCheckerImpl()
- : valid_thread_id_() {
- EnsureThreadIdAssigned();
+ThreadCheckerImpl::ThreadCheckerImpl() {
+ AutoLock auto_lock(lock_);
+ EnsureAssigned();
}
-ThreadCheckerImpl::~ThreadCheckerImpl() {}
+ThreadCheckerImpl::~ThreadCheckerImpl() = default;
bool ThreadCheckerImpl::CalledOnValidThread() const {
- EnsureThreadIdAssigned();
AutoLock auto_lock(lock_);
- return valid_thread_id_ == PlatformThread::CurrentRef();
+ EnsureAssigned();
+
+ // Always return true when called from the task from which this
+ // ThreadCheckerImpl was assigned to a thread.
+ if (task_token_ == TaskToken::GetForCurrentThread())
+ return true;
+
+ // If this ThreadCheckerImpl is bound to a valid SequenceToken, it must be
+ // equal to the current SequenceToken and there must be a registered
+ // ThreadTaskRunnerHandle. Otherwise, the fact that the current task runs on
+ // the thread to which this ThreadCheckerImpl is bound is fortuitous.
+ if (sequence_token_.IsValid() &&
+ (sequence_token_ != SequenceToken::GetForCurrentThread() ||
+ !ThreadTaskRunnerHandle::IsSet())) {
+ return false;
+ }
+
+ return thread_id_ == PlatformThread::CurrentRef();
}
void ThreadCheckerImpl::DetachFromThread() {
AutoLock auto_lock(lock_);
- valid_thread_id_ = PlatformThreadRef();
+ thread_id_ = PlatformThreadRef();
+ task_token_ = TaskToken();
+ sequence_token_ = SequenceToken();
}
-void ThreadCheckerImpl::EnsureThreadIdAssigned() const {
- AutoLock auto_lock(lock_);
- if (valid_thread_id_.is_null()) {
- valid_thread_id_ = PlatformThread::CurrentRef();
- }
+void ThreadCheckerImpl::EnsureAssigned() const {
+ lock_.AssertAcquired();
+ if (!thread_id_.is_null())
+ return;
+
+ thread_id_ = PlatformThread::CurrentRef();
+ task_token_ = TaskToken::GetForCurrentThread();
+ sequence_token_ = SequenceToken::GetForCurrentThread();
}
} // namespace base
diff --git a/base/threading/thread_checker_impl.h b/base/threading/thread_checker_impl.h
index c92e143db0..13193d1299 100644
--- a/base/threading/thread_checker_impl.h
+++ b/base/threading/thread_checker_impl.h
@@ -7,17 +7,18 @@
#include "base/base_export.h"
#include "base/compiler_specific.h"
+#include "base/sequence_token.h"
#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"
namespace base {
-// Real implementation of ThreadChecker, for use in debug mode, or
-// for temporary use in release mode (e.g. to CHECK on a threading issue
-// seen only in the wild).
+// Real implementation of ThreadChecker, for use in debug mode, or for temporary
+// use in release mode (e.g. to CHECK on a threading issue seen only in the
+// wild).
//
-// Note: You should almost always use the ThreadChecker class to get the
-// right version for your build configuration.
+// Note: You should almost always use the ThreadChecker class to get the right
+// version for your build configuration.
class BASE_EXPORT ThreadCheckerImpl {
public:
ThreadCheckerImpl();
@@ -31,12 +32,29 @@ class BASE_EXPORT ThreadCheckerImpl {
void DetachFromThread();
private:
- void EnsureThreadIdAssigned() const;
+ void EnsureAssigned() const;
+ // Members are mutable so that CalledOnValidThread() can set them.
+
+ // Synchronizes access to all members.
mutable base::Lock lock_;
- // This is mutable so that CalledOnValidThread can set it.
- // It's guarded by |lock_|.
- mutable PlatformThreadRef valid_thread_id_;
+
+ // Thread on which CalledOnValidThread() may return true.
+ mutable PlatformThreadRef thread_id_;
+
+ // TaskToken for which CalledOnValidThread() always returns true. This allows
+ // CalledOnValidThread() to return true when called multiple times from the
+ // same task, even if it's not running in a single-threaded context itself
+ // (allowing usage of ThreadChecker/NonThreadSafe objects on the stack in the
+ // scope of one-off tasks). Note: CalledOnValidThread() may return true even
+ // if the current TaskToken is not equal to this.
+ mutable TaskToken task_token_;
+
+ // SequenceToken for which CalledOnValidThread() may return true. Used to
+ // ensure that CalledOnValidThread() doesn't return true for TaskScheduler
+ // tasks that happen to run on the same thread but weren't posted to the same
+ // SingleThreadTaskRunner.
+ mutable SequenceToken sequence_token_;
};
} // namespace base
diff --git a/base/threading/thread_checker_unittest.cc b/base/threading/thread_checker_unittest.cc
index bc5b1e473a..96455e66c7 100644
--- a/base/threading/thread_checker_unittest.cc
+++ b/base/threading/thread_checker_unittest.cc
@@ -2,180 +2,194 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include "base/threading/thread_checker.h"
-
#include <memory>
-#include "base/logging.h"
+#include "base/bind.h"
+#include "base/bind_helpers.h"
#include "base/macros.h"
+#include "base/memory/ref_counted.h"
+#include "base/sequence_token.h"
+#include "base/test/test_simple_task_runner.h"
#include "base/threading/simple_thread.h"
+#include "base/threading/thread_checker_impl.h"
+#include "base/threading/thread_task_runner_handle.h"
#include "testing/gtest/include/gtest/gtest.h"
-// Duplicated from base/threading/thread_checker.h so that we can be
-// good citizens there and undef the macro.
-#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
-#define ENABLE_THREAD_CHECKER 1
-#else
-#define ENABLE_THREAD_CHECKER 0
-#endif
-
namespace base {
-
namespace {
-// Simple class to exercise the basics of ThreadChecker.
-// Both the destructor and DoStuff should verify that they were
-// called on the same thread as the constructor.
-class ThreadCheckerClass : public ThreadChecker {
+// A thread that runs a callback.
+class RunCallbackThread : public SimpleThread {
public:
- ThreadCheckerClass() {}
-
- // Verifies that it was called on the same thread as the constructor.
- void DoStuff() {
- DCHECK(CalledOnValidThread());
- }
-
- void DetachFromThread() {
- ThreadChecker::DetachFromThread();
- }
-
- static void MethodOnDifferentThreadImpl();
- static void DetachThenCallFromDifferentThreadImpl();
+ explicit RunCallbackThread(const Closure& callback)
+ : SimpleThread("RunCallbackThread"), callback_(callback) {}
private:
- DISALLOW_COPY_AND_ASSIGN(ThreadCheckerClass);
-};
+ // SimpleThread:
+ void Run() override { callback_.Run(); }
-// Calls ThreadCheckerClass::DoStuff on another thread.
-class CallDoStuffOnThread : public base::SimpleThread {
- public:
- explicit CallDoStuffOnThread(ThreadCheckerClass* thread_checker_class)
- : SimpleThread("call_do_stuff_on_thread"),
- thread_checker_class_(thread_checker_class) {
- }
+ const Closure callback_;
- void Run() override { thread_checker_class_->DoStuff(); }
+ DISALLOW_COPY_AND_ASSIGN(RunCallbackThread);
+};
- private:
- ThreadCheckerClass* thread_checker_class_;
+// Runs a callback on a new thread synchronously.
+void RunCallbackOnNewThreadSynchronously(const Closure& callback) {
+ RunCallbackThread run_callback_thread(callback);
+ run_callback_thread.Start();
+ run_callback_thread.Join();
+}
- DISALLOW_COPY_AND_ASSIGN(CallDoStuffOnThread);
-};
+void ExpectCalledOnValidThread(ThreadCheckerImpl* thread_checker) {
+ ASSERT_TRUE(thread_checker);
-// Deletes ThreadCheckerClass on a different thread.
-class DeleteThreadCheckerClassOnThread : public base::SimpleThread {
- public:
- explicit DeleteThreadCheckerClassOnThread(
- ThreadCheckerClass* thread_checker_class)
- : SimpleThread("delete_thread_checker_class_on_thread"),
- thread_checker_class_(thread_checker_class) {
- }
+ // This should bind |thread_checker| to the current thread if it wasn't
+ // already bound to a thread.
+ EXPECT_TRUE(thread_checker->CalledOnValidThread());
- void Run() override { thread_checker_class_.reset(); }
+ // Since |thread_checker| is now bound to the current thread, another call to
+ // CalledOnValidThread() should return true.
+ EXPECT_TRUE(thread_checker->CalledOnValidThread());
+}
- private:
- std::unique_ptr<ThreadCheckerClass> thread_checker_class_;
+void ExpectNotCalledOnValidThread(ThreadCheckerImpl* thread_checker) {
+ ASSERT_TRUE(thread_checker);
+ EXPECT_FALSE(thread_checker->CalledOnValidThread());
+}
- DISALLOW_COPY_AND_ASSIGN(DeleteThreadCheckerClassOnThread);
-};
+void ExpectNotCalledOnValidThreadWithSequenceTokenAndThreadTaskRunnerHandle(
+ ThreadCheckerImpl* thread_checker,
+ SequenceToken sequence_token) {
+ ThreadTaskRunnerHandle thread_task_runner_handle(
+ make_scoped_refptr(new TestSimpleTaskRunner));
+ ScopedSetSequenceTokenForCurrentThread
+ scoped_set_sequence_token_for_current_thread(sequence_token);
+ ExpectNotCalledOnValidThread(thread_checker);
+}
} // namespace
-TEST(ThreadCheckerTest, CallsAllowedOnSameThread) {
- std::unique_ptr<ThreadCheckerClass> thread_checker_class(
- new ThreadCheckerClass);
+TEST(ThreadCheckerTest, AllowedSameThreadNoSequenceToken) {
+ ThreadCheckerImpl thread_checker;
+ EXPECT_TRUE(thread_checker.CalledOnValidThread());
+}
- // Verify that DoStuff doesn't assert.
- thread_checker_class->DoStuff();
+TEST(ThreadCheckerTest,
+ AllowedSameThreadAndSequenceDifferentTasksWithThreadTaskRunnerHandle) {
+ ThreadTaskRunnerHandle thread_task_runner_handle(
+ make_scoped_refptr(new TestSimpleTaskRunner));
- // Verify that the destructor doesn't assert.
- thread_checker_class.reset();
-}
+ std::unique_ptr<ThreadCheckerImpl> thread_checker;
+ const SequenceToken sequence_token = SequenceToken::Create();
-TEST(ThreadCheckerTest, DestructorAllowedOnDifferentThread) {
- std::unique_ptr<ThreadCheckerClass> thread_checker_class(
- new ThreadCheckerClass);
+ {
+ ScopedSetSequenceTokenForCurrentThread
+ scoped_set_sequence_token_for_current_thread(sequence_token);
+ thread_checker.reset(new ThreadCheckerImpl);
+ }
- // Verify that the destructor doesn't assert
- // when called on a different thread.
- DeleteThreadCheckerClassOnThread delete_on_thread(
- thread_checker_class.release());
+ {
+ ScopedSetSequenceTokenForCurrentThread
+ scoped_set_sequence_token_for_current_thread(sequence_token);
+ EXPECT_TRUE(thread_checker->CalledOnValidThread());
+ }
+}
- delete_on_thread.Start();
- delete_on_thread.Join();
+TEST(ThreadCheckerTest,
+ AllowedSameThreadSequenceAndTaskNoThreadTaskRunnerHandle) {
+ ScopedSetSequenceTokenForCurrentThread
+ scoped_set_sequence_token_for_current_thread(SequenceToken::Create());
+ ThreadCheckerImpl thread_checker;
+ EXPECT_TRUE(thread_checker.CalledOnValidThread());
}
-TEST(ThreadCheckerTest, DetachFromThread) {
- std::unique_ptr<ThreadCheckerClass> thread_checker_class(
- new ThreadCheckerClass);
+TEST(ThreadCheckerTest,
+ DisallowedSameThreadAndSequenceDifferentTasksNoThreadTaskRunnerHandle) {
+ std::unique_ptr<ThreadCheckerImpl> thread_checker;
- // Verify that DoStuff doesn't assert when called on a different thread after
- // a call to DetachFromThread.
- thread_checker_class->DetachFromThread();
- CallDoStuffOnThread call_on_thread(thread_checker_class.get());
+ {
+ ScopedSetSequenceTokenForCurrentThread
+ scoped_set_sequence_token_for_current_thread(SequenceToken::Create());
+ thread_checker.reset(new ThreadCheckerImpl);
+ }
- call_on_thread.Start();
- call_on_thread.Join();
+ {
+ ScopedSetSequenceTokenForCurrentThread
+ scoped_set_sequence_token_for_current_thread(SequenceToken::Create());
+ EXPECT_FALSE(thread_checker->CalledOnValidThread());
+ }
}
-#if GTEST_HAS_DEATH_TEST || !ENABLE_THREAD_CHECKER
+TEST(ThreadCheckerTest, DisallowedDifferentThreadsNoSequenceToken) {
+ ThreadCheckerImpl thread_checker;
+ RunCallbackOnNewThreadSynchronously(
+ Bind(&ExpectNotCalledOnValidThread, Unretained(&thread_checker)));
+}
-void ThreadCheckerClass::MethodOnDifferentThreadImpl() {
- std::unique_ptr<ThreadCheckerClass> thread_checker_class(
- new ThreadCheckerClass);
+TEST(ThreadCheckerTest, DisallowedDifferentThreadsSameSequence) {
+ ThreadTaskRunnerHandle thread_task_runner_handle(
+ make_scoped_refptr(new TestSimpleTaskRunner));
+ const SequenceToken sequence_token(SequenceToken::Create());
- // DoStuff should assert in debug builds only when called on a
- // different thread.
- CallDoStuffOnThread call_on_thread(thread_checker_class.get());
+ ScopedSetSequenceTokenForCurrentThread
+ scoped_set_sequence_token_for_current_thread(sequence_token);
+ ThreadCheckerImpl thread_checker;
+ EXPECT_TRUE(thread_checker.CalledOnValidThread());
- call_on_thread.Start();
- call_on_thread.Join();
+ RunCallbackOnNewThreadSynchronously(Bind(
+ &ExpectNotCalledOnValidThreadWithSequenceTokenAndThreadTaskRunnerHandle,
+ Unretained(&thread_checker), sequence_token));
}
-#if ENABLE_THREAD_CHECKER
-TEST(ThreadCheckerDeathTest, MethodNotAllowedOnDifferentThreadInDebug) {
- ASSERT_DEATH({
- ThreadCheckerClass::MethodOnDifferentThreadImpl();
- }, "");
-}
-#else
-TEST(ThreadCheckerTest, MethodAllowedOnDifferentThreadInRelease) {
- ThreadCheckerClass::MethodOnDifferentThreadImpl();
-}
-#endif // ENABLE_THREAD_CHECKER
+TEST(ThreadCheckerTest, DisallowedSameThreadDifferentSequence) {
+ std::unique_ptr<ThreadCheckerImpl> thread_checker;
-void ThreadCheckerClass::DetachThenCallFromDifferentThreadImpl() {
- std::unique_ptr<ThreadCheckerClass> thread_checker_class(
- new ThreadCheckerClass);
+ ThreadTaskRunnerHandle thread_task_runner_handle(
+ make_scoped_refptr(new TestSimpleTaskRunner));
- // DoStuff doesn't assert when called on a different thread
- // after a call to DetachFromThread.
- thread_checker_class->DetachFromThread();
- CallDoStuffOnThread call_on_thread(thread_checker_class.get());
+ {
+ ScopedSetSequenceTokenForCurrentThread
+ scoped_set_sequence_token_for_current_thread(SequenceToken::Create());
+ thread_checker.reset(new ThreadCheckerImpl);
+ }
- call_on_thread.Start();
- call_on_thread.Join();
+ {
+ // Different SequenceToken.
+ ScopedSetSequenceTokenForCurrentThread
+ scoped_set_sequence_token_for_current_thread(SequenceToken::Create());
+ EXPECT_FALSE(thread_checker->CalledOnValidThread());
+ }
- // DoStuff should assert in debug builds only after moving to
- // another thread.
- thread_checker_class->DoStuff();
+ // No SequenceToken.
+ EXPECT_FALSE(thread_checker->CalledOnValidThread());
}
-#if ENABLE_THREAD_CHECKER
-TEST(ThreadCheckerDeathTest, DetachFromThreadInDebug) {
- ASSERT_DEATH({
- ThreadCheckerClass::DetachThenCallFromDifferentThreadImpl();
- }, "");
-}
-#else
-TEST(ThreadCheckerTest, DetachFromThreadInRelease) {
- ThreadCheckerClass::DetachThenCallFromDifferentThreadImpl();
+TEST(ThreadCheckerTest, DetachFromThread) {
+ ThreadCheckerImpl thread_checker;
+ thread_checker.DetachFromThread();
+
+ // Verify that CalledOnValidThread() returns true when called on a different
+ // thread after a call to DetachFromThread().
+ RunCallbackOnNewThreadSynchronously(
+ Bind(&ExpectCalledOnValidThread, Unretained(&thread_checker)));
+
+ EXPECT_FALSE(thread_checker.CalledOnValidThread());
}
-#endif // ENABLE_THREAD_CHECKER
-#endif // GTEST_HAS_DEATH_TEST || !ENABLE_THREAD_CHECKER
+TEST(ThreadCheckerTest, DetachFromThreadWithSequenceToken) {
+ ThreadTaskRunnerHandle thread_task_runner_handle(
+ make_scoped_refptr(new TestSimpleTaskRunner));
+ ScopedSetSequenceTokenForCurrentThread
+ scoped_set_sequence_token_for_current_thread(SequenceToken::Create());
+ ThreadCheckerImpl thread_checker;
+ thread_checker.DetachFromThread();
-// Just in case we ever get lumped together with other compilation units.
-#undef ENABLE_THREAD_CHECKER
+ // Verify that CalledOnValidThread() returns true when called on a different
+ // thread after a call to DetachFromThread().
+ RunCallbackOnNewThreadSynchronously(
+ Bind(&ExpectCalledOnValidThread, Unretained(&thread_checker)));
+
+ EXPECT_FALSE(thread_checker.CalledOnValidThread());
+}
} // namespace base
diff --git a/base/threading/thread_local.h b/base/threading/thread_local.h
index f40420cd2f..cad9add3a9 100644
--- a/base/threading/thread_local.h
+++ b/base/threading/thread_local.h
@@ -2,35 +2,34 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-// WARNING: Thread local storage is a bit tricky to get right. Please make
-// sure that this is really the proper solution for what you're trying to
-// achieve. Don't prematurely optimize, most likely you can just use a Lock.
+// WARNING: Thread local storage is a bit tricky to get right. Please make sure
+// that this is really the proper solution for what you're trying to achieve.
+// Don't prematurely optimize, most likely you can just use a Lock.
//
-// These classes implement a wrapper around the platform's TLS storage
-// mechanism. On construction, they will allocate a TLS slot, and free the
-// TLS slot on destruction. No memory management (creation or destruction) is
-// handled. This means for uses of ThreadLocalPointer, you must correctly
-// manage the memory yourself, these classes will not destroy the pointer for
-// you. There are no at-thread-exit actions taken by these classes.
+// These classes implement a wrapper around ThreadLocalStorage::Slot. On
+// construction, they will allocate a TLS slot, and free the TLS slot on
+// destruction. No memory management (creation or destruction) is handled. This
+// means for uses of ThreadLocalPointer, you must correctly manage the memory
+// yourself, these classes will not destroy the pointer for you. There are no
+// at-thread-exit actions taken by these classes.
//
-// ThreadLocalPointer<Type> wraps a Type*. It performs no creation or
-// destruction, so memory management must be handled elsewhere. The first call
-// to Get() on a thread will return NULL. You can update the pointer with a
-// call to Set().
+// ThreadLocalPointer<Type> wraps a Type*. It performs no creation or
+// destruction, so memory management must be handled elsewhere. The first call
+// to Get() on a thread will return NULL. You can update the pointer with a call
+// to Set().
//
-// ThreadLocalBoolean wraps a bool. It will default to false if it has never
+// ThreadLocalBoolean wraps a bool. It will default to false if it has never
// been set otherwise with Set().
//
-// Thread Safety: An instance of ThreadLocalStorage is completely thread safe
-// once it has been created. If you want to dynamically create an instance,
-// you must of course properly deal with safety and race conditions. This
-// means a function-level static initializer is generally inappropiate.
+// Thread Safety: An instance of ThreadLocalStorage is completely thread safe
+// once it has been created. If you want to dynamically create an instance, you
+// must of course properly deal with safety and race conditions. This means a
+// function-level static initializer is generally inappropiate.
//
-// In Android, the system TLS is limited, the implementation is backed with
-// ThreadLocalStorage.
+// In Android, the system TLS is limited.
//
// Example usage:
-// // My class is logically attached to a single thread. We cache a pointer
+// // My class is logically attached to a single thread. We cache a pointer
// // on the thread it was created on, so we can implement current().
// MyClass::MyClass() {
// DCHECK(Singleton<ThreadLocalPointer<MyClass> >::get()->Get() == NULL);
@@ -51,76 +50,42 @@
#ifndef BASE_THREADING_THREAD_LOCAL_H_
#define BASE_THREADING_THREAD_LOCAL_H_
-#include "base/base_export.h"
#include "base/macros.h"
#include "base/threading/thread_local_storage.h"
-#include "build/build_config.h"
-
-#if defined(OS_POSIX)
-#include <pthread.h>
-#endif
namespace base {
-namespace internal {
-
-// Helper functions that abstract the cross-platform APIs. Do not use directly.
-struct BASE_EXPORT ThreadLocalPlatform {
-#if defined(OS_WIN)
- typedef unsigned long SlotType;
-#elif defined(OS_ANDROID)
- typedef ThreadLocalStorage::StaticSlot SlotType;
-#elif defined(OS_POSIX)
- typedef pthread_key_t SlotType;
-#endif
-
- static void AllocateSlot(SlotType* slot);
- static void FreeSlot(SlotType slot);
- static void* GetValueFromSlot(SlotType slot);
- static void SetValueInSlot(SlotType slot, void* value);
-};
-
-} // namespace internal
template <typename Type>
class ThreadLocalPointer {
public:
- ThreadLocalPointer() : slot_() {
- internal::ThreadLocalPlatform::AllocateSlot(&slot_);
- }
-
- ~ThreadLocalPointer() {
- internal::ThreadLocalPlatform::FreeSlot(slot_);
- }
+ ThreadLocalPointer() = default;
+ ~ThreadLocalPointer() = default;
Type* Get() {
- return static_cast<Type*>(
- internal::ThreadLocalPlatform::GetValueFromSlot(slot_));
+ return static_cast<Type*>(slot_.Get());
}
void Set(Type* ptr) {
- internal::ThreadLocalPlatform::SetValueInSlot(
- slot_, const_cast<void*>(static_cast<const void*>(ptr)));
+ slot_.Set(const_cast<void*>(static_cast<const void*>(ptr)));
}
private:
- typedef internal::ThreadLocalPlatform::SlotType SlotType;
-
- SlotType slot_;
+ ThreadLocalStorage::Slot slot_;
DISALLOW_COPY_AND_ASSIGN(ThreadLocalPointer<Type>);
};
class ThreadLocalBoolean {
public:
- ThreadLocalBoolean() {}
- ~ThreadLocalBoolean() {}
+ ThreadLocalBoolean() = default;
+ ~ThreadLocalBoolean() = default;
bool Get() {
- return tlp_.Get() != NULL;
+ return tlp_.Get() != nullptr;
}
void Set(bool val) {
- tlp_.Set(val ? this : NULL);
+ tlp_.Set(val ? this : nullptr);
}
private:
diff --git a/base/threading/thread_local_posix.cc b/base/threading/thread_local_posix.cc
deleted file mode 100644
index 8bc46ad190..0000000000
--- a/base/threading/thread_local_posix.cc
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright (c) 2011 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "base/threading/thread_local.h"
-
-#include <pthread.h>
-
-#include "base/logging.h"
-#include "build/build_config.h"
-
-#if !defined(OS_ANDROID)
-
-namespace base {
-namespace internal {
-
-// static
-void ThreadLocalPlatform::AllocateSlot(SlotType* slot) {
- int error = pthread_key_create(slot, NULL);
- CHECK_EQ(error, 0);
-}
-
-// static
-void ThreadLocalPlatform::FreeSlot(SlotType slot) {
- int error = pthread_key_delete(slot);
- DCHECK_EQ(0, error);
-}
-
-// static
-void* ThreadLocalPlatform::GetValueFromSlot(SlotType slot) {
- return pthread_getspecific(slot);
-}
-
-// static
-void ThreadLocalPlatform::SetValueInSlot(SlotType slot, void* value) {
- int error = pthread_setspecific(slot, value);
- DCHECK_EQ(error, 0);
-}
-
-} // namespace internal
-} // namespace base
-
-#endif // !defined(OS_ANDROID)
diff --git a/base/threading/thread_local_storage.cc b/base/threading/thread_local_storage.cc
index a7eb527888..48c1dd58c2 100644
--- a/base/threading/thread_local_storage.cc
+++ b/base/threading/thread_local_storage.cc
@@ -6,10 +6,57 @@
#include "base/atomicops.h"
#include "base/logging.h"
+#include "base/synchronization/lock.h"
#include "build/build_config.h"
using base::internal::PlatformThreadLocalStorage;
+// Chrome Thread Local Storage (TLS)
+//
+// This TLS system allows Chrome to use a single OS level TLS slot process-wide,
+// and allows us to control the slot limits instead of being at the mercy of the
+// platform. To do this, Chrome TLS replicates an array commonly found in the OS
+// thread metadata.
+//
+// Overview:
+//
+// OS TLS Slots Per-Thread Per-Process Global
+// ...
+// [] Chrome TLS Array Chrome TLS Metadata
+// [] ----------> [][][][][ ][][][][] [][][][][ ][][][][]
+// [] | |
+// ... V V
+// Metadata Version Slot Information
+// Your Data!
+//
+// Using a single OS TLS slot, Chrome TLS allocates an array on demand for the
+// lifetime of each thread that requests Chrome TLS data. Each per-thread TLS
+// array matches the length of the per-process global metadata array.
+//
+// A per-process global TLS metadata array tracks information about each item in
+// the per-thread array:
+// * Status: Tracks if the slot is allocated or free to assign.
+// * Destructor: An optional destructor to call on thread destruction for that
+// specific slot.
+// * Version: Tracks the current version of the TLS slot. Each TLS slot
+// allocation is associated with a unique version number.
+//
+// Most OS TLS APIs guarantee that a newly allocated TLS slot is
+// initialized to 0 for all threads. The Chrome TLS system provides
+// this guarantee by tracking the version for each TLS slot here
+// on each per-thread Chrome TLS array entry. Threads that access
+// a slot with a mismatched version will receive 0 as their value.
+// The metadata version is incremented when the client frees a
+// slot. The per-thread metadata version is updated when a client
+// writes to the slot. This scheme allows for constant time
+// invalidation and avoids the need to iterate through each Chrome
+// TLS array to mark the slot as zero.
+//
+// Just like an OS TLS API, clients of the Chrome TLS are responsible for
+// managing any necessary lifetime of the data in their slots. The only
+// convenience provided is automatic destruction when a thread ends. If a client
+// frees a slot, that client is responsible for destroying the data in the slot.
+
namespace {
// In order to make TLS destructors work, we need to keep around a function
// pointer to the destructor for each slot. We keep this array of pointers in a
@@ -18,37 +65,42 @@ namespace {
// hold a pointer to a per-thread array (table) of slots that we allocate to
// Chromium consumers.
-// g_native_tls_key is the one native TLS that we use. It stores our table.
+// g_native_tls_key is the one native TLS that we use. It stores our table.
base::subtle::Atomic32 g_native_tls_key =
PlatformThreadLocalStorage::TLS_KEY_OUT_OF_INDEXES;
-// g_last_used_tls_key is the high-water-mark of allocated thread local storage.
-// Each allocation is an index into our g_tls_destructors[]. Each such index is
-// assigned to the instance variable slot_ in a ThreadLocalStorage::Slot
-// instance. We reserve the value slot_ == 0 to indicate that the corresponding
-// instance of ThreadLocalStorage::Slot has been freed (i.e., destructor called,
-// etc.). This reserved use of 0 is then stated as the initial value of
-// g_last_used_tls_key, so that the first issued index will be 1.
-base::subtle::Atomic32 g_last_used_tls_key = 0;
+// The maximum number of slots in our thread local storage stack.
+constexpr int kThreadLocalStorageSize = 256;
+constexpr int kInvalidSlotValue = -1;
+
+enum TlsStatus {
+ FREE,
+ IN_USE,
+};
+
+struct TlsMetadata {
+ TlsStatus status;
+ base::ThreadLocalStorage::TLSDestructorFunc destructor;
+ uint32_t version;
+};
-// The maximum number of 'slots' in our thread local storage stack.
-const int kThreadLocalStorageSize = 256;
+struct TlsVectorEntry {
+ void* data;
+ uint32_t version;
+};
+
+// This lock isn't needed until after we've constructed the per-thread TLS
+// vector, so it's safe to use.
+base::Lock* GetTLSMetadataLock() {
+ static auto* lock = new base::Lock();
+ return lock;
+}
+TlsMetadata g_tls_metadata[kThreadLocalStorageSize];
+size_t g_last_assigned_slot = 0;
// The maximum number of times to try to clear slots by calling destructors.
// Use pthread naming convention for clarity.
-const int kMaxDestructorIterations = kThreadLocalStorageSize;
-
-// An array of destructor function pointers for the slots. If a slot has a
-// destructor, it will be stored in its corresponding entry in this array.
-// The elements are volatile to ensure that when the compiler reads the value
-// to potentially call the destructor, it does so once, and that value is tested
-// for null-ness and then used. Yes, that would be a weird de-optimization,
-// but I can imagine some register machines where it was just as easy to
-// re-fetch an array element, and I want to be sure a call to free the key
-// (i.e., null out the destructor entry) that happens on a separate thread can't
-// hurt the racy calls to the destructors on another thread.
-volatile base::ThreadLocalStorage::TLSDestructorFunc
- g_tls_destructors[kThreadLocalStorageSize];
+constexpr int kMaxDestructorIterations = kThreadLocalStorageSize;
// This function is called to initialize our entire Chromium TLS system.
// It may be called very early, and we need to complete most all of the setup
@@ -56,7 +108,7 @@ volatile base::ThreadLocalStorage::TLSDestructorFunc
// recursively depend on this initialization.
// As a result, we use Atomics, and avoid anything (like a singleton) that might
// require memory allocations.
-void** ConstructTlsVector() {
+TlsVectorEntry* ConstructTlsVector() {
PlatformThreadLocalStorage::TLSKey key =
base::subtle::NoBarrier_Load(&g_native_tls_key);
if (key == PlatformThreadLocalStorage::TLS_KEY_OUT_OF_INDEXES) {
@@ -73,8 +125,8 @@ void** ConstructTlsVector() {
key != PlatformThreadLocalStorage::TLS_KEY_OUT_OF_INDEXES);
PlatformThreadLocalStorage::FreeTLS(tmp);
}
- // Atomically test-and-set the tls_key. If the key is
- // TLS_KEY_OUT_OF_INDEXES, go ahead and set it. Otherwise, do nothing, as
+ // Atomically test-and-set the tls_key. If the key is
+ // TLS_KEY_OUT_OF_INDEXES, go ahead and set it. Otherwise, do nothing, as
// another thread already did our dirty work.
if (PlatformThreadLocalStorage::TLS_KEY_OUT_OF_INDEXES !=
static_cast<PlatformThreadLocalStorage::TLSKey>(
@@ -90,39 +142,38 @@ void** ConstructTlsVector() {
}
CHECK(!PlatformThreadLocalStorage::GetTLSValue(key));
- // Some allocators, such as TCMalloc, make use of thread local storage.
- // As a result, any attempt to call new (or malloc) will lazily cause such a
- // system to initialize, which will include registering for a TLS key. If we
- // are not careful here, then that request to create a key will call new back,
- // and we'll have an infinite loop. We avoid that as follows:
- // Use a stack allocated vector, so that we don't have dependence on our
- // allocator until our service is in place. (i.e., don't even call new until
- // after we're setup)
- void* stack_allocated_tls_data[kThreadLocalStorageSize];
+ // Some allocators, such as TCMalloc, make use of thread local storage. As a
+ // result, any attempt to call new (or malloc) will lazily cause such a system
+ // to initialize, which will include registering for a TLS key. If we are not
+ // careful here, then that request to create a key will call new back, and
+ // we'll have an infinite loop. We avoid that as follows: Use a stack
+ // allocated vector, so that we don't have dependence on our allocator until
+ // our service is in place. (i.e., don't even call new until after we're
+ // setup)
+ TlsVectorEntry stack_allocated_tls_data[kThreadLocalStorageSize];
memset(stack_allocated_tls_data, 0, sizeof(stack_allocated_tls_data));
// Ensure that any rentrant calls change the temp version.
PlatformThreadLocalStorage::SetTLSValue(key, stack_allocated_tls_data);
// Allocate an array to store our data.
- void** tls_data = new void*[kThreadLocalStorageSize];
+ TlsVectorEntry* tls_data = new TlsVectorEntry[kThreadLocalStorageSize];
memcpy(tls_data, stack_allocated_tls_data, sizeof(stack_allocated_tls_data));
PlatformThreadLocalStorage::SetTLSValue(key, tls_data);
return tls_data;
}
-void OnThreadExitInternal(void* value) {
- DCHECK(value);
- void** tls_data = static_cast<void**>(value);
- // Some allocators, such as TCMalloc, use TLS. As a result, when a thread
+void OnThreadExitInternal(TlsVectorEntry* tls_data) {
+ DCHECK(tls_data);
+ // Some allocators, such as TCMalloc, use TLS. As a result, when a thread
// terminates, one of the destructor calls we make may be to shut down an
- // allocator. We have to be careful that after we've shutdown all of the
- // known destructors (perchance including an allocator), that we don't call
- // the allocator and cause it to resurrect itself (with no possibly destructor
- // call to follow). We handle this problem as follows:
- // Switch to using a stack allocated vector, so that we don't have dependence
- // on our allocator after we have called all g_tls_destructors. (i.e., don't
- // even call delete[] after we're done with destructors.)
- void* stack_allocated_tls_data[kThreadLocalStorageSize];
+ // allocator. We have to be careful that after we've shutdown all of the known
+ // destructors (perchance including an allocator), that we don't call the
+ // allocator and cause it to resurrect itself (with no possibly destructor
+ // call to follow). We handle this problem as follows: Switch to using a stack
+ // allocated vector, so that we don't have dependence on our allocator after
+ // we have called all g_tls_metadata destructors. (i.e., don't even call
+ // delete[] after we're done with destructors.)
+ TlsVectorEntry stack_allocated_tls_data[kThreadLocalStorageSize];
memcpy(stack_allocated_tls_data, tls_data, sizeof(stack_allocated_tls_data));
// Ensure that any re-entrant calls change the temp version.
PlatformThreadLocalStorage::TLSKey key =
@@ -130,32 +181,38 @@ void OnThreadExitInternal(void* value) {
PlatformThreadLocalStorage::SetTLSValue(key, stack_allocated_tls_data);
delete[] tls_data; // Our last dependence on an allocator.
+ // Snapshot the TLS Metadata so we don't have to lock on every access.
+ TlsMetadata tls_metadata[kThreadLocalStorageSize];
+ {
+ base::AutoLock auto_lock(*GetTLSMetadataLock());
+ memcpy(tls_metadata, g_tls_metadata, sizeof(g_tls_metadata));
+ }
+
int remaining_attempts = kMaxDestructorIterations;
bool need_to_scan_destructors = true;
while (need_to_scan_destructors) {
need_to_scan_destructors = false;
// Try to destroy the first-created-slot (which is slot 1) in our last
- // destructor call. That user was able to function, and define a slot with
+ // destructor call. That user was able to function, and define a slot with
// no other services running, so perhaps it is a basic service (like an
- // allocator) and should also be destroyed last. If we get the order wrong,
- // then we'll itterate several more times, so it is really not that
- // critical (but it might help).
- base::subtle::Atomic32 last_used_tls_key =
- base::subtle::NoBarrier_Load(&g_last_used_tls_key);
- for (int slot = last_used_tls_key; slot > 0; --slot) {
- void* tls_value = stack_allocated_tls_data[slot];
- if (tls_value == NULL)
+ // allocator) and should also be destroyed last. If we get the order wrong,
+ // then we'll iterate several more times, so it is really not that critical
+ // (but it might help).
+ for (int slot = 0; slot < kThreadLocalStorageSize ; ++slot) {
+ void* tls_value = stack_allocated_tls_data[slot].data;
+ if (!tls_value || tls_metadata[slot].status == TlsStatus::FREE ||
+ stack_allocated_tls_data[slot].version != tls_metadata[slot].version)
continue;
base::ThreadLocalStorage::TLSDestructorFunc destructor =
- g_tls_destructors[slot];
- if (destructor == NULL)
+ tls_metadata[slot].destructor;
+ if (!destructor)
continue;
- stack_allocated_tls_data[slot] = NULL; // pre-clear the slot.
+ stack_allocated_tls_data[slot].data = nullptr; // pre-clear the slot.
destructor(tls_value);
- // Any destructor might have called a different service, which then set
- // a different slot to a non-NULL value. Hence we need to check
- // the whole vector again. This is a pthread standard.
+ // Any destructor might have called a different service, which then set a
+ // different slot to a non-null value. Hence we need to check the whole
+ // vector again. This is a pthread standard.
need_to_scan_destructors = true;
}
if (--remaining_attempts <= 0) {
@@ -165,7 +222,7 @@ void OnThreadExitInternal(void* value) {
}
// Remove our stack allocated vector.
- PlatformThreadLocalStorage::SetTLSValue(key, NULL);
+ PlatformThreadLocalStorage::SetTLSValue(key, nullptr);
}
} // namespace
@@ -184,69 +241,107 @@ void PlatformThreadLocalStorage::OnThreadExit() {
// Maybe we have never initialized TLS for this thread.
if (!tls_data)
return;
- OnThreadExitInternal(tls_data);
+ OnThreadExitInternal(static_cast<TlsVectorEntry*>(tls_data));
}
#elif defined(OS_POSIX)
void PlatformThreadLocalStorage::OnThreadExit(void* value) {
- OnThreadExitInternal(value);
+ OnThreadExitInternal(static_cast<TlsVectorEntry*>(value));
}
#endif // defined(OS_WIN)
} // namespace internal
-ThreadLocalStorage::Slot::Slot(TLSDestructorFunc destructor) {
- slot_ = 0;
- base::subtle::Release_Store(&initialized_, 0);
- Initialize(destructor);
-}
-
void ThreadLocalStorage::StaticSlot::Initialize(TLSDestructorFunc destructor) {
PlatformThreadLocalStorage::TLSKey key =
base::subtle::NoBarrier_Load(&g_native_tls_key);
if (key == PlatformThreadLocalStorage::TLS_KEY_OUT_OF_INDEXES ||
- !PlatformThreadLocalStorage::GetTLSValue(key))
+ !PlatformThreadLocalStorage::GetTLSValue(key)) {
ConstructTlsVector();
+ }
// Grab a new slot.
- slot_ = base::subtle::NoBarrier_AtomicIncrement(&g_last_used_tls_key, 1);
- DCHECK_GT(slot_, 0);
+ slot_ = kInvalidSlotValue;
+ version_ = 0;
+ {
+ base::AutoLock auto_lock(*GetTLSMetadataLock());
+ for (int i = 0; i < kThreadLocalStorageSize; ++i) {
+ // Tracking the last assigned slot is an attempt to find the next
+ // available slot within one iteration. Under normal usage, slots remain
+ // in use for the lifetime of the process (otherwise before we reclaimed
+ // slots, we would have run out of slots). This makes it highly likely the
+ // next slot is going to be a free slot.
+ size_t slot_candidate =
+ (g_last_assigned_slot + 1 + i) % kThreadLocalStorageSize;
+ if (g_tls_metadata[slot_candidate].status == TlsStatus::FREE) {
+ g_tls_metadata[slot_candidate].status = TlsStatus::IN_USE;
+ g_tls_metadata[slot_candidate].destructor = destructor;
+ g_last_assigned_slot = slot_candidate;
+ slot_ = slot_candidate;
+ version_ = g_tls_metadata[slot_candidate].version;
+ break;
+ }
+ }
+ }
+ CHECK_NE(slot_, kInvalidSlotValue);
CHECK_LT(slot_, kThreadLocalStorageSize);
// Setup our destructor.
- g_tls_destructors[slot_] = destructor;
base::subtle::Release_Store(&initialized_, 1);
}
void ThreadLocalStorage::StaticSlot::Free() {
- // At this time, we don't reclaim old indices for TLS slots.
- // So all we need to do is wipe the destructor.
- DCHECK_GT(slot_, 0);
+ DCHECK_NE(slot_, kInvalidSlotValue);
DCHECK_LT(slot_, kThreadLocalStorageSize);
- g_tls_destructors[slot_] = NULL;
- slot_ = 0;
+ {
+ base::AutoLock auto_lock(*GetTLSMetadataLock());
+ g_tls_metadata[slot_].status = TlsStatus::FREE;
+ g_tls_metadata[slot_].destructor = nullptr;
+ ++(g_tls_metadata[slot_].version);
+ }
+ slot_ = kInvalidSlotValue;
base::subtle::Release_Store(&initialized_, 0);
}
void* ThreadLocalStorage::StaticSlot::Get() const {
- void** tls_data = static_cast<void**>(
+ TlsVectorEntry* tls_data = static_cast<TlsVectorEntry*>(
PlatformThreadLocalStorage::GetTLSValue(
base::subtle::NoBarrier_Load(&g_native_tls_key)));
if (!tls_data)
tls_data = ConstructTlsVector();
- DCHECK_GT(slot_, 0);
+ DCHECK_NE(slot_, kInvalidSlotValue);
DCHECK_LT(slot_, kThreadLocalStorageSize);
- return tls_data[slot_];
+ // Version mismatches means this slot was previously freed.
+ if (tls_data[slot_].version != version_)
+ return nullptr;
+ return tls_data[slot_].data;
}
void ThreadLocalStorage::StaticSlot::Set(void* value) {
- void** tls_data = static_cast<void**>(
+ TlsVectorEntry* tls_data = static_cast<TlsVectorEntry*>(
PlatformThreadLocalStorage::GetTLSValue(
base::subtle::NoBarrier_Load(&g_native_tls_key)));
if (!tls_data)
tls_data = ConstructTlsVector();
- DCHECK_GT(slot_, 0);
+ DCHECK_NE(slot_, kInvalidSlotValue);
DCHECK_LT(slot_, kThreadLocalStorageSize);
- tls_data[slot_] = value;
+ tls_data[slot_].data = value;
+ tls_data[slot_].version = version_;
+}
+
+ThreadLocalStorage::Slot::Slot(TLSDestructorFunc destructor) {
+ tls_slot_.Initialize(destructor);
+}
+
+ThreadLocalStorage::Slot::~Slot() {
+ tls_slot_.Free();
+}
+
+void* ThreadLocalStorage::Slot::Get() const {
+ return tls_slot_.Get();
+}
+
+void ThreadLocalStorage::Slot::Set(void* value) {
+ tls_slot_.Set(value);
}
} // namespace base
diff --git a/base/threading/thread_local_storage.h b/base/threading/thread_local_storage.h
index 0c7a692a66..5e70410af9 100644
--- a/base/threading/thread_local_storage.h
+++ b/base/threading/thread_local_storage.h
@@ -5,6 +5,8 @@
#ifndef BASE_THREADING_THREAD_LOCAL_STORAGE_H_
#define BASE_THREADING_THREAD_LOCAL_STORAGE_H_
+#include <stdint.h>
+
#include "base/atomicops.h"
#include "base/base_export.h"
#include "base/macros.h"
@@ -20,9 +22,12 @@ namespace base {
namespace internal {
-// WARNING: You should *NOT* be using this class directly.
-// PlatformThreadLocalStorage is low-level abstraction to the OS's TLS
-// interface, you should instead be using ThreadLocalStorage::StaticSlot/Slot.
+// WARNING: You should *NOT* use this class directly.
+// PlatformThreadLocalStorage is a low-level abstraction of the OS's TLS
+// interface. Instead, you should use one of the following:
+// * ThreadLocalBoolean (from thread_local.h) for booleans.
+// * ThreadLocalPointer (from thread_local.h) for pointers.
+// * ThreadLocalStorage::StaticSlot/Slot for more direct control of the slot.
class BASE_EXPORT PlatformThreadLocalStorage {
public:
@@ -89,7 +94,7 @@ class BASE_EXPORT ThreadLocalStorage {
// initialization, as base's LINKER_INITIALIZED requires a constructor and on
// some compilers (notably gcc 4.4) this still ends up needing runtime
// initialization.
- #define TLS_INITIALIZER {false, 0}
+#define TLS_INITIALIZER {false, 0, 0}
// A key representing one value stored in TLS.
// Initialize like
@@ -123,18 +128,25 @@ class BASE_EXPORT ThreadLocalStorage {
// The internals of this struct should be considered private.
base::subtle::Atomic32 initialized_;
int slot_;
+ uint32_t version_;
};
// A convenience wrapper around StaticSlot with a constructor. Can be used
// as a member variable.
- class BASE_EXPORT Slot : public StaticSlot {
+ class BASE_EXPORT Slot {
public:
- // Calls StaticSlot::Initialize().
explicit Slot(TLSDestructorFunc destructor = NULL);
+ ~Slot();
+
+ // Get the thread-local value stored in this slot.
+ // Values are guaranteed to initially be zero.
+ void* Get() const;
+
+ // Set the slot's thread-local value to |value|.
+ void Set(void* value);
private:
- using StaticSlot::initialized_;
- using StaticSlot::slot_;
+ StaticSlot tls_slot_;
DISALLOW_COPY_AND_ASSIGN(Slot);
};
diff --git a/base/threading/thread_local_storage_unittest.cc b/base/threading/thread_local_storage_unittest.cc
index 322524b10e..335252b18e 100644
--- a/base/threading/thread_local_storage_unittest.cc
+++ b/base/threading/thread_local_storage_unittest.cc
@@ -127,4 +127,14 @@ TEST(ThreadLocalStorageTest, MAYBE_TLSDestructors) {
tls_slot.Free(); // Stop doing callbacks to cleanup threads.
}
+TEST(ThreadLocalStorageTest, TLSReclaim) {
+ // Creates and destroys many TLS slots and ensures they all zero-inited.
+ for (int i = 0; i < 1000; ++i) {
+ ThreadLocalStorage::Slot slot(nullptr);
+ EXPECT_EQ(nullptr, slot.Get());
+ slot.Set(reinterpret_cast<void*>(0xBAADF00D));
+ EXPECT_EQ(reinterpret_cast<void*>(0xBAADF00D), slot.Get());
+ }
+}
+
} // namespace base
diff --git a/base/threading/thread_restrictions.cc b/base/threading/thread_restrictions.cc
index 00306c5ae7..8dd7743332 100644
--- a/base/threading/thread_restrictions.cc
+++ b/base/threading/thread_restrictions.cc
@@ -4,7 +4,7 @@
#include "base/threading/thread_restrictions.h"
-#if ENABLE_THREAD_RESTRICTIONS
+#if DCHECK_IS_ON()
#include "base/lazy_instance.h"
#include "base/logging.h"
@@ -35,7 +35,7 @@ bool ThreadRestrictions::SetIOAllowed(bool allowed) {
// static
void ThreadRestrictions::AssertIOAllowed() {
if (g_io_disallowed.Get().Get()) {
- LOG(FATAL) <<
+ NOTREACHED() <<
"Function marked as IO-only was called from a thread that "
"disallows IO! If this thread really should be allowed to "
"make IO calls, adjust the call to "
@@ -54,10 +54,14 @@ bool ThreadRestrictions::SetSingletonAllowed(bool allowed) {
// static
void ThreadRestrictions::AssertSingletonAllowed() {
if (g_singleton_disallowed.Get().Get()) {
- LOG(FATAL) << "LazyInstance/Singleton is not allowed to be used on this "
- << "thread. Most likely it's because this thread is not "
- << "joinable, so AtExitManager may have deleted the object "
- << "on shutdown, leading to a potential shutdown crash.";
+ NOTREACHED() << "LazyInstance/Singleton is not allowed to be used on this "
+ << "thread. Most likely it's because this thread is not "
+ << "joinable (or the current task is running with "
+ << "TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN semantics), so "
+ << "AtExitManager may have deleted the object on shutdown, "
+ << "leading to a potential shutdown crash. If you need to use "
+ << "the object from this context, it'll have to be updated to "
+ << "use Leaky traits.";
}
}
@@ -69,8 +73,8 @@ void ThreadRestrictions::DisallowWaiting() {
// static
void ThreadRestrictions::AssertWaitAllowed() {
if (g_wait_disallowed.Get().Get()) {
- LOG(FATAL) << "Waiting is not allowed to be used on this thread to prevent "
- << "jank and deadlock.";
+ NOTREACHED() << "Waiting is not allowed to be used on this thread to "
+ << "prevent jank and deadlock.";
}
}
@@ -82,4 +86,4 @@ bool ThreadRestrictions::SetWaitAllowed(bool allowed) {
} // namespace base
-#endif // ENABLE_THREAD_RESTRICTIONS
+#endif // DCHECK_IS_ON()
diff --git a/base/threading/thread_restrictions.h b/base/threading/thread_restrictions.h
index 4212a4b6eb..a86dd452b8 100644
--- a/base/threading/thread_restrictions.h
+++ b/base/threading/thread_restrictions.h
@@ -6,15 +6,9 @@
#define BASE_THREADING_THREAD_RESTRICTIONS_H_
#include "base/base_export.h"
+#include "base/logging.h"
#include "base/macros.h"
-// See comment at top of thread_checker.h
-#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
-#define ENABLE_THREAD_RESTRICTIONS 1
-#else
-#define ENABLE_THREAD_RESTRICTIONS 0
-#endif
-
class BrowserProcessImpl;
class HistogramSynchronizer;
class NativeBackendKWallet;
@@ -57,10 +51,10 @@ namespace gpu {
class GpuChannelHost;
}
namespace mojo {
-namespace common {
-class MessagePumpMojo;
-}
class SyncCallRestrictions;
+namespace edk {
+class ScopedIPCSupport;
+}
}
namespace ui {
class CommandBufferClientImpl;
@@ -92,6 +86,10 @@ namespace android {
class JavaHandlerThread;
}
+namespace internal {
+class TaskTracker;
+}
+
class SequencedWorkerPool;
class SimpleThread;
class Thread;
@@ -137,21 +135,7 @@ class BASE_EXPORT ThreadRestrictions {
DISALLOW_COPY_AND_ASSIGN(ScopedAllowIO);
};
- // Constructing a ScopedAllowSingleton temporarily allows accessing for the
- // current thread. Doing this is almost always incorrect.
- class BASE_EXPORT ScopedAllowSingleton {
- public:
- ScopedAllowSingleton() { previous_value_ = SetSingletonAllowed(true); }
- ~ScopedAllowSingleton() { SetSingletonAllowed(previous_value_); }
- private:
- // Whether singleton use is allowed when the ScopedAllowSingleton was
- // constructed.
- bool previous_value_;
-
- DISALLOW_COPY_AND_ASSIGN(ScopedAllowSingleton);
- };
-
-#if ENABLE_THREAD_RESTRICTIONS
+#if DCHECK_IS_ON()
// Set whether the current thread to make IO calls.
// Threads start out in the *allowed* state.
// Returns the previous value.
@@ -197,6 +181,7 @@ class BASE_EXPORT ThreadRestrictions {
friend class content::ScopedAllowWaitForAndroidLayoutTests;
friend class content::ScopedAllowWaitForDebugURL;
friend class ::HistogramSynchronizer;
+ friend class internal::TaskTracker;
friend class ::ScopedAllowWaitForLegacyWebViewApi;
friend class cc::CompletionEvent;
friend class cc::SingleThreadTaskGraphRunner;
@@ -210,8 +195,8 @@ class BASE_EXPORT ThreadRestrictions {
friend class ThreadTestHelper;
friend class PlatformThread;
friend class android::JavaHandlerThread;
- friend class mojo::common::MessagePumpMojo;
friend class mojo::SyncCallRestrictions;
+ friend class mojo::edk::ScopedIPCSupport;
friend class ui::CommandBufferClientImpl;
friend class ui::CommandBufferLocal;
friend class ui::GpuState;
@@ -240,7 +225,7 @@ class BASE_EXPORT ThreadRestrictions {
friend class views::ScreenMus;
// END USAGE THAT NEEDS TO BE FIXED.
-#if ENABLE_THREAD_RESTRICTIONS
+#if DCHECK_IS_ON()
static bool SetWaitAllowed(bool allowed);
#else
static bool SetWaitAllowed(bool) { return true; }
diff --git a/base/threading/thread_task_runner_handle.cc b/base/threading/thread_task_runner_handle.cc
index 190e18ffc6..00deaa4e20 100644
--- a/base/threading/thread_task_runner_handle.cc
+++ b/base/threading/thread_task_runner_handle.cc
@@ -6,8 +6,10 @@
#include <utility>
+#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
+#include "base/memory/ptr_util.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/threading/thread_local.h"
@@ -32,6 +34,50 @@ bool ThreadTaskRunnerHandle::IsSet() {
return !!lazy_tls_ptr.Pointer()->Get();
}
+// static
+ScopedClosureRunner ThreadTaskRunnerHandle::OverrideForTesting(
+ scoped_refptr<SingleThreadTaskRunner> overriding_task_runner) {
+ // OverrideForTesting() is not compatible with a SequencedTaskRunnerHandle
+ // being set (but SequencedTaskRunnerHandle::IsSet() includes
+ // ThreadTaskRunnerHandle::IsSet() so that's discounted as the only valid
+ // excuse for it to be true). Sadly this means that tests that merely need a
+ // SequencedTaskRunnerHandle on their main thread can be forced to use a
+ // ThreadTaskRunnerHandle if they're also using test task runners (that
+ // OverrideForTesting() when running their tasks from said main thread). To
+ // solve this: sequence_task_runner_handle.cc and thread_task_runner_handle.cc
+ // would have to be merged into a single impl file and share TLS state. This
+ // was deemed unecessary for now as most tests should use higher level
+ // constructs and not have to instantiate task runner handles on their own.
+ DCHECK(!SequencedTaskRunnerHandle::IsSet() || IsSet());
+
+ if (!IsSet()) {
+ std::unique_ptr<ThreadTaskRunnerHandle> top_level_ttrh =
+ MakeUnique<ThreadTaskRunnerHandle>(std::move(overriding_task_runner));
+ return ScopedClosureRunner(base::Bind(
+ [](std::unique_ptr<ThreadTaskRunnerHandle>) {},
+ base::Passed(&top_level_ttrh)));
+ }
+
+ ThreadTaskRunnerHandle* ttrh = lazy_tls_ptr.Pointer()->Get();
+ // Swap the two (and below bind |overriding_task_runner|, which is now the
+ // previous one, as the |task_runner_to_restore|).
+ ttrh->task_runner_.swap(overriding_task_runner);
+
+ return ScopedClosureRunner(base::Bind(
+ [](scoped_refptr<SingleThreadTaskRunner> task_runner_to_restore,
+ SingleThreadTaskRunner* expected_task_runner_before_restore) {
+ ThreadTaskRunnerHandle* ttrh = lazy_tls_ptr.Pointer()->Get();
+
+ DCHECK_EQ(expected_task_runner_before_restore, ttrh->task_runner_.get())
+ << "Nested overrides must expire their ScopedClosureRunners "
+ "in LIFO order.";
+
+ ttrh->task_runner_.swap(task_runner_to_restore);
+ },
+ base::Passed(&overriding_task_runner),
+ base::Unretained(ttrh->task_runner_.get())));
+}
+
ThreadTaskRunnerHandle::ThreadTaskRunnerHandle(
scoped_refptr<SingleThreadTaskRunner> task_runner)
: task_runner_(std::move(task_runner)) {
diff --git a/base/threading/thread_task_runner_handle.h b/base/threading/thread_task_runner_handle.h
index c8e58935f0..7ae85e6dcf 100644
--- a/base/threading/thread_task_runner_handle.h
+++ b/base/threading/thread_task_runner_handle.h
@@ -6,6 +6,7 @@
#define BASE_THREADING_THREAD_TASK_RUNNER_HANDLE_H_
#include "base/base_export.h"
+#include "base/callback_helpers.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/single_thread_task_runner.h"
@@ -26,6 +27,17 @@ class BASE_EXPORT ThreadTaskRunnerHandle {
// the current thread.
static bool IsSet();
+ // Overrides ThreadTaskRunnerHandle::Get()'s |task_runner_| to point at
+ // |overriding_task_runner| until the returned ScopedClosureRunner goes out of
+ // scope (instantiates a ThreadTaskRunnerHandle for that scope if |!IsSet()|).
+ // Nested overrides are allowed but callers must ensure the
+ // ScopedClosureRunners expire in LIFO (stack) order. Note: nesting
+ // ThreadTaskRunnerHandles isn't generally desired but it's useful in unit
+ // tests where multiple task runners can share the main thread for simplicity
+ // and determinism.
+ static ScopedClosureRunner OverrideForTesting(
+ scoped_refptr<SingleThreadTaskRunner> overriding_task_runner);
+
// Binds |task_runner| to the current thread. |task_runner| must belong
// to the current thread for this to succeed.
explicit ThreadTaskRunnerHandle(
diff --git a/base/threading/thread_unittest.cc b/base/threading/thread_unittest.cc
index b0fd26521a..af8347432b 100644
--- a/base/threading/thread_unittest.cc
+++ b/base/threading/thread_unittest.cc
@@ -5,13 +5,22 @@
#include "base/threading/thread.h"
#include <stddef.h>
+#include <stdint.h>
#include <vector>
#include "base/bind.h"
-#include "base/location.h"
+#include "base/debug/leak_annotations.h"
+#include "base/macros.h"
+#include "base/memory/ptr_util.h"
+#include "base/message_loop/message_loop.h"
+#include "base/run_loop.h"
#include "base/single_thread_task_runner.h"
#include "base/synchronization/waitable_event.h"
+#include "base/test/gtest_util.h"
+#include "base/third_party/dynamic_annotations/dynamic_annotations.h"
+#include "base/threading/platform_thread.h"
+#include "base/time/time.h"
#include "build/build_config.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "testing/platform_test.h"
@@ -38,8 +47,11 @@ class SleepInsideInitThread : public Thread {
init_called_ = true;
}
bool InitCalled() { return init_called_; }
+
private:
bool init_called_;
+
+ DISALLOW_COPY_AND_ASSIGN(SleepInsideInitThread);
};
enum ThreadEvent {
@@ -76,6 +88,8 @@ class CaptureToEventList : public Thread {
private:
EventList* event_list_;
+
+ DISALLOW_COPY_AND_ASSIGN(CaptureToEventList);
};
// Observer that writes a value into |event_list| when a message loop has been
@@ -96,6 +110,8 @@ class CapturingDestructionObserver
private:
EventList* event_list_;
+
+ DISALLOW_COPY_AND_ASSIGN(CapturingDestructionObserver);
};
// Task that adds a destruction observer to the current message loop.
@@ -115,59 +131,79 @@ void ReturnThreadId(base::Thread* thread,
} // namespace
-TEST_F(ThreadTest, Restart) {
- Thread a("Restart");
- a.Stop();
- EXPECT_FALSE(a.message_loop());
- EXPECT_FALSE(a.IsRunning());
- EXPECT_TRUE(a.Start());
- EXPECT_TRUE(a.message_loop());
- EXPECT_TRUE(a.IsRunning());
- a.Stop();
- EXPECT_FALSE(a.message_loop());
- EXPECT_FALSE(a.IsRunning());
- EXPECT_TRUE(a.Start());
- EXPECT_TRUE(a.message_loop());
- EXPECT_TRUE(a.IsRunning());
- a.Stop();
- EXPECT_FALSE(a.message_loop());
- EXPECT_FALSE(a.IsRunning());
- a.Stop();
- EXPECT_FALSE(a.message_loop());
- EXPECT_FALSE(a.IsRunning());
-}
-
TEST_F(ThreadTest, StartWithOptions_StackSize) {
Thread a("StartWithStackSize");
// Ensure that the thread can work with only 12 kb and still process a
- // message.
+ // message. At the same time, we should scale with the bitness of the system
+ // where 12 kb is definitely not enough.
+ // 12 kb = 3072 Slots on a 32-bit system, so we'll scale based off of that.
Thread::Options options;
-#if defined(ADDRESS_SANITIZER)
- // ASan bloats the stack variables and overflows the 12 kb stack.
- options.stack_size = 24*1024;
+#if defined(ADDRESS_SANITIZER) || !defined(NDEBUG)
+ // ASan bloats the stack variables and overflows the 3072 slot stack. Some
+ // debug builds also grow the stack too much.
+ options.stack_size = 2 * 3072 * sizeof(uintptr_t);
#else
- options.stack_size = 12*1024;
+ options.stack_size = 3072 * sizeof(uintptr_t);
#endif
EXPECT_TRUE(a.StartWithOptions(options));
EXPECT_TRUE(a.message_loop());
EXPECT_TRUE(a.IsRunning());
- bool was_invoked = false;
- a.task_runner()->PostTask(FROM_HERE, base::Bind(&ToggleValue, &was_invoked));
+ base::WaitableEvent event(base::WaitableEvent::ResetPolicy::AUTOMATIC,
+ base::WaitableEvent::InitialState::NOT_SIGNALED);
+ a.task_runner()->PostTask(FROM_HERE, base::Bind(&base::WaitableEvent::Signal,
+ base::Unretained(&event)));
+ event.Wait();
+}
- // wait for the task to run (we could use a kernel event here
- // instead to avoid busy waiting, but this is sufficient for
- // testing purposes).
- for (int i = 100; i >= 0 && !was_invoked; --i) {
- base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(10));
- }
- EXPECT_TRUE(was_invoked);
+// Intentional test-only race for otherwise untestable code, won't fix.
+// https://crbug.com/634383
+#if !defined(THREAD_SANITIZER)
+TEST_F(ThreadTest, StartWithOptions_NonJoinable) {
+ Thread* a = new Thread("StartNonJoinable");
+ // Non-joinable threads have to be leaked for now (see
+ // Thread::Options::joinable for details).
+ ANNOTATE_LEAKING_OBJECT_PTR(a);
+
+ Thread::Options options;
+ options.joinable = false;
+ EXPECT_TRUE(a->StartWithOptions(options));
+ EXPECT_TRUE(a->message_loop());
+ EXPECT_TRUE(a->IsRunning());
+
+ // Without this call this test is racy. The above IsRunning() succeeds because
+ // of an early-return condition while between Start() and StopSoon(), after
+ // invoking StopSoon() below this early-return condition is no longer
+ // satisfied and the real |is_running_| bit has to be checked. It could still
+ // be false if the message loop hasn't started for real in practice. This is
+ // only a requirement for this test because the non-joinable property forces
+ // it to use StopSoon() and not wait for a complete Stop().
+ EXPECT_TRUE(a->WaitUntilThreadStarted());
+
+ // Make the thread block until |block_event| is signaled.
+ base::WaitableEvent block_event(
+ base::WaitableEvent::ResetPolicy::AUTOMATIC,
+ base::WaitableEvent::InitialState::NOT_SIGNALED);
+ a->task_runner()->PostTask(
+ FROM_HERE,
+ base::Bind(&base::WaitableEvent::Wait, base::Unretained(&block_event)));
+
+ a->StopSoon();
+ EXPECT_TRUE(a->IsRunning());
+
+ // Unblock the task and give a bit of extra time to unwind QuitWhenIdle().
+ block_event.Signal();
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20));
+
+ // The thread should now have stopped on its own.
+ EXPECT_FALSE(a->IsRunning());
}
+#endif
-TEST_F(ThreadTest, TwoTasks) {
+TEST_F(ThreadTest, TwoTasksOnJoinableThread) {
bool was_invoked = false;
{
- Thread a("TwoTasks");
+ Thread a("TwoTasksOnJoinableThread");
EXPECT_TRUE(a.Start());
EXPECT_TRUE(a.message_loop());
@@ -184,18 +220,164 @@ TEST_F(ThreadTest, TwoTasks) {
EXPECT_TRUE(was_invoked);
}
+TEST_F(ThreadTest, DestroyWhileRunningIsSafe) {
+ Thread a("DestroyWhileRunningIsSafe");
+ EXPECT_TRUE(a.Start());
+ EXPECT_TRUE(a.WaitUntilThreadStarted());
+}
+
+// TODO(gab): Enable this test when destroying a non-joinable Thread instance
+// is supported (proposal @ https://crbug.com/629139#c14).
+TEST_F(ThreadTest, DISABLED_DestroyWhileRunningNonJoinableIsSafe) {
+ {
+ Thread a("DestroyWhileRunningNonJoinableIsSafe");
+ Thread::Options options;
+ options.joinable = false;
+ EXPECT_TRUE(a.StartWithOptions(options));
+ EXPECT_TRUE(a.WaitUntilThreadStarted());
+ }
+
+ // Attempt to catch use-after-frees from the non-joinable thread in the
+ // scope of this test if any.
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20));
+}
+
TEST_F(ThreadTest, StopSoon) {
Thread a("StopSoon");
EXPECT_TRUE(a.Start());
EXPECT_TRUE(a.message_loop());
EXPECT_TRUE(a.IsRunning());
a.StopSoon();
+ a.Stop();
+ EXPECT_FALSE(a.message_loop());
+ EXPECT_FALSE(a.IsRunning());
+}
+
+TEST_F(ThreadTest, StopTwiceNop) {
+ Thread a("StopTwiceNop");
+ EXPECT_TRUE(a.Start());
+ EXPECT_TRUE(a.message_loop());
+ EXPECT_TRUE(a.IsRunning());
+ a.StopSoon();
+ // Calling StopSoon() a second time should be a nop.
a.StopSoon();
a.Stop();
+ // Same with Stop().
+ a.Stop();
+ EXPECT_FALSE(a.message_loop());
+ EXPECT_FALSE(a.IsRunning());
+ // Calling them when not running should also nop.
+ a.StopSoon();
+ a.Stop();
+}
+
+// TODO(gab): Enable this test in conjunction with re-enabling the sequence
+// check in Thread::Stop() as part of http://crbug.com/629139.
+TEST_F(ThreadTest, DISABLED_StopOnNonOwningThreadIsDeath) {
+ Thread a("StopOnNonOwningThreadDeath");
+ EXPECT_TRUE(a.StartAndWaitForTesting());
+
+ Thread b("NonOwningThread");
+ b.Start();
+ EXPECT_DCHECK_DEATH({
+ // Stopping |a| on |b| isn't allowed.
+ b.task_runner()->PostTask(FROM_HERE,
+ base::Bind(&Thread::Stop, base::Unretained(&a)));
+ // Block here so the DCHECK on |b| always happens in this scope.
+ base::PlatformThread::Sleep(base::TimeDelta::Max());
+ });
+}
+
+TEST_F(ThreadTest, TransferOwnershipAndStop) {
+ std::unique_ptr<Thread> a =
+ base::MakeUnique<Thread>("TransferOwnershipAndStop");
+ EXPECT_TRUE(a->StartAndWaitForTesting());
+ EXPECT_TRUE(a->IsRunning());
+
+ Thread b("TakingOwnershipThread");
+ b.Start();
+
+ base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::NOT_SIGNALED);
+
+ // a->DetachFromSequence() should allow |b| to use |a|'s Thread API.
+ a->DetachFromSequence();
+ b.task_runner()->PostTask(
+ FROM_HERE, base::Bind(
+ [](std::unique_ptr<Thread> thread_to_stop,
+ base::WaitableEvent* event_to_signal) -> void {
+ thread_to_stop->Stop();
+ event_to_signal->Signal();
+ },
+ base::Passed(&a), base::Unretained(&event)));
+
+ event.Wait();
+}
+
+TEST_F(ThreadTest, StartTwice) {
+ Thread a("StartTwice");
+
+ EXPECT_FALSE(a.message_loop());
+ EXPECT_FALSE(a.IsRunning());
+
+ EXPECT_TRUE(a.Start());
+ EXPECT_TRUE(a.message_loop());
+ EXPECT_TRUE(a.IsRunning());
+
+ a.Stop();
+ EXPECT_FALSE(a.message_loop());
+ EXPECT_FALSE(a.IsRunning());
+
+ EXPECT_TRUE(a.Start());
+ EXPECT_TRUE(a.message_loop());
+ EXPECT_TRUE(a.IsRunning());
+
+ a.Stop();
EXPECT_FALSE(a.message_loop());
EXPECT_FALSE(a.IsRunning());
}
+// Intentional test-only race for otherwise untestable code, won't fix.
+// https://crbug.com/634383
+#if !defined(THREAD_SANITIZER)
+TEST_F(ThreadTest, StartTwiceNonJoinableNotAllowed) {
+ LOG(ERROR) << __FUNCTION__;
+ Thread* a = new Thread("StartTwiceNonJoinable");
+ // Non-joinable threads have to be leaked for now (see
+ // Thread::Options::joinable for details).
+ ANNOTATE_LEAKING_OBJECT_PTR(a);
+
+ Thread::Options options;
+ options.joinable = false;
+ EXPECT_TRUE(a->StartWithOptions(options));
+ EXPECT_TRUE(a->message_loop());
+ EXPECT_TRUE(a->IsRunning());
+
+ // Signaled when last task on |a| is processed.
+ base::WaitableEvent last_task_event(
+ base::WaitableEvent::ResetPolicy::AUTOMATIC,
+ base::WaitableEvent::InitialState::NOT_SIGNALED);
+ a->task_runner()->PostTask(FROM_HERE,
+ base::Bind(&base::WaitableEvent::Signal,
+ base::Unretained(&last_task_event)));
+
+ // StopSoon() is non-blocking, Yield() to |a|, wait for last task to be
+ // processed and a little more for QuitWhenIdle() to unwind before considering
+ // the thread "stopped".
+ a->StopSoon();
+ base::PlatformThread::YieldCurrentThread();
+ last_task_event.Wait();
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20));
+
+ // This test assumes that the above was sufficient to let the thread fully
+ // stop.
+ ASSERT_FALSE(a->IsRunning());
+
+ // Restarting it should not be allowed.
+ EXPECT_DCHECK_DEATH(a->Start());
+}
+#endif
+
TEST_F(ThreadTest, ThreadName) {
Thread a("ThreadName");
EXPECT_TRUE(a.Start());
@@ -297,3 +479,91 @@ TEST_F(ThreadTest, MultipleWaitUntilThreadStarted) {
EXPECT_TRUE(a.WaitUntilThreadStarted());
EXPECT_TRUE(a.WaitUntilThreadStarted());
}
+
+TEST_F(ThreadTest, FlushForTesting) {
+ Thread a("FlushForTesting");
+
+ // Flushing a non-running thread should be a no-op.
+ a.FlushForTesting();
+
+ ASSERT_TRUE(a.Start());
+
+ // Flushing a thread with no tasks shouldn't block.
+ a.FlushForTesting();
+
+ constexpr base::TimeDelta kSleepPerTestTask =
+ base::TimeDelta::FromMilliseconds(50);
+ constexpr size_t kNumSleepTasks = 5;
+
+ const base::TimeTicks ticks_before_post = base::TimeTicks::Now();
+
+ for (size_t i = 0; i < kNumSleepTasks; ++i) {
+ a.task_runner()->PostTask(
+ FROM_HERE, base::Bind(&base::PlatformThread::Sleep, kSleepPerTestTask));
+ }
+
+ // All tasks should have executed, as reflected by the elapsed time.
+ a.FlushForTesting();
+ EXPECT_GE(base::TimeTicks::Now() - ticks_before_post,
+ kNumSleepTasks * kSleepPerTestTask);
+
+ a.Stop();
+
+ // Flushing a stopped thread should be a no-op.
+ a.FlushForTesting();
+}
+
+namespace {
+
+// A Thread which uses a MessageLoop on the stack. It won't start a real
+// underlying thread (instead its messages can be processed by a RunLoop on the
+// stack).
+class ExternalMessageLoopThread : public Thread {
+ public:
+ ExternalMessageLoopThread() : Thread("ExternalMessageLoopThread") {}
+
+ ~ExternalMessageLoopThread() override { Stop(); }
+
+ void InstallMessageLoop() { SetMessageLoop(&external_message_loop_); }
+
+ void VerifyUsingExternalMessageLoop(
+ bool expected_using_external_message_loop) {
+ EXPECT_EQ(expected_using_external_message_loop,
+ using_external_message_loop());
+ }
+
+ private:
+ base::MessageLoop external_message_loop_;
+
+ DISALLOW_COPY_AND_ASSIGN(ExternalMessageLoopThread);
+};
+
+} // namespace
+
+TEST_F(ThreadTest, ExternalMessageLoop) {
+ ExternalMessageLoopThread a;
+ EXPECT_FALSE(a.message_loop());
+ EXPECT_FALSE(a.IsRunning());
+ a.VerifyUsingExternalMessageLoop(false);
+
+ a.InstallMessageLoop();
+ EXPECT_TRUE(a.message_loop());
+ EXPECT_TRUE(a.IsRunning());
+ a.VerifyUsingExternalMessageLoop(true);
+
+ bool ran = false;
+ a.task_runner()->PostTask(
+ FROM_HERE, base::Bind([](bool* toggled) { *toggled = true; }, &ran));
+ base::RunLoop().RunUntilIdle();
+ EXPECT_TRUE(ran);
+
+ a.Stop();
+ EXPECT_FALSE(a.message_loop());
+ EXPECT_FALSE(a.IsRunning());
+ a.VerifyUsingExternalMessageLoop(true);
+
+ // Confirm that running any remaining tasks posted from Stop() goes smoothly
+ // (e.g. https://codereview.chromium.org/2135413003/#ps300001 crashed if
+ // StopSoon() posted Thread::ThreadQuitHelper() while |run_loop_| was null).
+ base::RunLoop().RunUntilIdle();
+}
diff --git a/base/threading/worker_pool.cc b/base/threading/worker_pool.cc
index 0b7bf8eca1..d47037d79a 100644
--- a/base/threading/worker_pool.cc
+++ b/base/threading/worker_pool.cc
@@ -4,9 +4,11 @@
#include "base/threading/worker_pool.h"
+#include <utility>
+
#include "base/bind.h"
#include "base/compiler_specific.h"
-#include "base/lazy_instance.h"
+#include "base/debug/leak_annotations.h"
#include "base/macros.h"
#include "base/task_runner.h"
#include "base/threading/post_task_and_reply_impl.h"
@@ -97,27 +99,27 @@ struct TaskRunnerHolder {
scoped_refptr<TaskRunner> taskrunners_[2];
};
-base::LazyInstance<TaskRunnerHolder>::Leaky
- g_taskrunners = LAZY_INSTANCE_INITIALIZER;
-
} // namespace
bool WorkerPool::PostTaskAndReply(const tracked_objects::Location& from_here,
- const Closure& task,
- const Closure& reply,
+ Closure task,
+ Closure reply,
bool task_is_slow) {
// Do not report PostTaskAndReplyRelay leaks in tests. There's nothing we can
// do about them because WorkerPool doesn't have a flushing API.
// http://crbug.com/248513
// http://crbug.com/290897
- return PostTaskAndReplyWorkerPool(task_is_slow).PostTaskAndReply(
- from_here, task, reply);
+ // Note: this annotation does not cover tasks posted through a TaskRunner.
+ ANNOTATE_SCOPED_MEMORY_LEAK;
+ return PostTaskAndReplyWorkerPool(task_is_slow)
+ .PostTaskAndReply(from_here, std::move(task), std::move(reply));
}
// static
const scoped_refptr<TaskRunner>&
WorkerPool::GetTaskRunner(bool tasks_are_slow) {
- return g_taskrunners.Get().taskrunners_[tasks_are_slow];
+ static auto* task_runner_holder = new TaskRunnerHolder();
+ return task_runner_holder->taskrunners_[tasks_are_slow];
}
} // namespace base
diff --git a/base/threading/worker_pool.h b/base/threading/worker_pool.h
index a52a41428b..865948e437 100644
--- a/base/threading/worker_pool.h
+++ b/base/threading/worker_pool.h
@@ -6,11 +6,9 @@
#define BASE_THREADING_WORKER_POOL_H_
#include "base/base_export.h"
-#include "base/callback_forward.h"
+#include "base/callback.h"
#include "base/memory/ref_counted.h"
-class Task;
-
namespace tracked_objects {
class Location;
} // namespace tracked_objects
@@ -40,8 +38,8 @@ class BASE_EXPORT WorkerPool {
// for |task| is a worker thread and you can specify |task_is_slow| just
// like you can for PostTask above.
static bool PostTaskAndReply(const tracked_objects::Location& from_here,
- const Closure& task,
- const Closure& reply,
+ Closure task,
+ Closure reply,
bool task_is_slow);
// Return true if the current thread is one that this WorkerPool runs tasks
diff --git a/base/threading/worker_pool_posix.cc b/base/threading/worker_pool_posix.cc
index 6b4c42f601..0e19a1a0fe 100644
--- a/base/threading/worker_pool_posix.cc
+++ b/base/threading/worker_pool_posix.cc
@@ -30,10 +30,21 @@ base::LazyInstance<ThreadLocalBoolean>::Leaky
const int kIdleSecondsBeforeExit = 10 * 60;
+#if defined(OS_MACOSX)
+// On Mac OS X a background thread's default stack size is 512Kb. We need at
+// least 1MB for compilation tasks in V8, so increase this default.
+const int kStackSize = 1 * 1024 * 1024;
+#else
+const int kStackSize = 0;
+#endif
+
class WorkerPoolImpl {
public:
WorkerPoolImpl();
- ~WorkerPoolImpl();
+
+ // WorkerPoolImpl is only instantiated as a leaky LazyInstance, so the
+ // destructor is never called.
+ ~WorkerPoolImpl() = delete;
void PostTask(const tracked_objects::Location& from_here,
const base::Closure& task,
@@ -47,17 +58,13 @@ WorkerPoolImpl::WorkerPoolImpl()
: pool_(new base::PosixDynamicThreadPool("WorkerPool",
kIdleSecondsBeforeExit)) {}
-WorkerPoolImpl::~WorkerPoolImpl() {
- pool_->Terminate();
-}
-
void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here,
const base::Closure& task,
bool /*task_is_slow*/) {
pool_->PostTask(from_here, task);
}
-base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool =
+base::LazyInstance<WorkerPoolImpl>::Leaky g_lazy_worker_pool =
LAZY_INSTANCE_INITIALIZER;
class WorkerThread : public PlatformThread::Delegate {
@@ -90,7 +97,7 @@ void WorkerThread::ThreadMain() {
tracked_objects::TaskStopwatch stopwatch;
stopwatch.Start();
- pending_task.task.Run();
+ std::move(pending_task.task).Run();
stopwatch.Stop();
tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking(
@@ -121,23 +128,13 @@ PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix,
: name_prefix_(name_prefix),
idle_seconds_before_exit_(idle_seconds_before_exit),
pending_tasks_available_cv_(&lock_),
- num_idle_threads_(0),
- terminated_(false) {}
+ num_idle_threads_(0) {}
PosixDynamicThreadPool::~PosixDynamicThreadPool() {
while (!pending_tasks_.empty())
pending_tasks_.pop();
}
-void PosixDynamicThreadPool::Terminate() {
- {
- AutoLock locked(lock_);
- DCHECK(!terminated_) << "Thread pool is already terminated.";
- terminated_ = true;
- }
- pending_tasks_available_cv_.Broadcast();
-}
-
void PosixDynamicThreadPool::PostTask(
const tracked_objects::Location& from_here,
const base::Closure& task) {
@@ -147,8 +144,6 @@ void PosixDynamicThreadPool::PostTask(
void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) {
AutoLock locked(lock_);
- DCHECK(!terminated_)
- << "This thread pool is already terminated. Do not post new tasks.";
pending_tasks_.push(std::move(*pending_task));
@@ -159,16 +154,13 @@ void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) {
// The new PlatformThread will take ownership of the WorkerThread object,
// which will delete itself on exit.
WorkerThread* worker = new WorkerThread(name_prefix_, this);
- PlatformThread::CreateNonJoinable(0, worker);
+ PlatformThread::CreateNonJoinable(kStackSize, worker);
}
}
PendingTask PosixDynamicThreadPool::WaitForTask() {
AutoLock locked(lock_);
- if (terminated_)
- return PendingTask(FROM_HERE, base::Closure());
-
if (pending_tasks_.empty()) { // No work available, wait for work.
num_idle_threads_++;
if (num_idle_threads_cv_.get())
diff --git a/base/threading/worker_pool_posix.h b/base/threading/worker_pool_posix.h
index 628e2b6420..d65ae8f8cf 100644
--- a/base/threading/worker_pool_posix.h
+++ b/base/threading/worker_pool_posix.h
@@ -38,8 +38,6 @@
#include "base/threading/platform_thread.h"
#include "base/tracked_objects.h"
-class Task;
-
namespace base {
class BASE_EXPORT PosixDynamicThreadPool
@@ -52,10 +50,6 @@ class BASE_EXPORT PosixDynamicThreadPool
PosixDynamicThreadPool(const std::string& name_prefix,
int idle_seconds_before_exit);
- // Indicates that the thread pool is going away. Stops handing out tasks to
- // worker threads. Wakes up all the idle threads to let them exit.
- void Terminate();
-
// Adds |task| to the thread pool.
void PostTask(const tracked_objects::Location& from_here,
const Closure& task);
@@ -85,7 +79,6 @@ class BASE_EXPORT PosixDynamicThreadPool
ConditionVariable pending_tasks_available_cv_;
int num_idle_threads_;
TaskQueue pending_tasks_;
- bool terminated_;
// Only used for tests to ensure correct thread ordering. It will always be
// NULL in non-test code.
std::unique_ptr<ConditionVariable> num_idle_threads_cv_;
diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc
index 6cefeed34e..b4e8b58520 100644
--- a/base/threading/worker_pool_posix_unittest.cc
+++ b/base/threading/worker_pool_posix_unittest.cc
@@ -103,12 +103,6 @@ class PosixDynamicThreadPoolTest : public testing::Test {
peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
}
- void TearDown() override {
- // Wake up the idle threads so they can terminate.
- if (pool_.get())
- pool_->Terminate();
- }
-
void WaitForTasksToStart(int num_tasks) {
base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
while (num_waiting_to_start_ < num_tasks) {