summaryrefslogtreecommitdiff
path: root/base/threading/sequenced_worker_pool_unittest.cc
diff options
context:
space:
mode:
Diffstat (limited to 'base/threading/sequenced_worker_pool_unittest.cc')
-rw-r--r--base/threading/sequenced_worker_pool_unittest.cc1032
1 files changed, 0 insertions, 1032 deletions
diff --git a/base/threading/sequenced_worker_pool_unittest.cc b/base/threading/sequenced_worker_pool_unittest.cc
deleted file mode 100644
index 05989a5487..0000000000
--- a/base/threading/sequenced_worker_pool_unittest.cc
+++ /dev/null
@@ -1,1032 +0,0 @@
-// Copyright (c) 2012 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "base/threading/sequenced_worker_pool.h"
-
-#include <algorithm>
-
-#include "base/bind.h"
-#include "base/compiler_specific.h"
-#include "base/memory/ref_counted.h"
-#include "base/memory/scoped_ptr.h"
-#include "base/message_loop/message_loop.h"
-#include "base/synchronization/condition_variable.h"
-#include "base/synchronization/lock.h"
-#include "base/test/sequenced_task_runner_test_template.h"
-#include "base/test/sequenced_worker_pool_owner.h"
-#include "base/test/task_runner_test_template.h"
-#include "base/test/test_timeouts.h"
-#include "base/threading/platform_thread.h"
-#include "base/time/time.h"
-#include "base/tracked_objects.h"
-#include "testing/gtest/include/gtest/gtest.h"
-
-namespace base {
-
-// IMPORTANT NOTE:
-//
-// Many of these tests have failure modes where they'll hang forever. These
-// tests should not be flaky, and hanging indicates a type of failure. Do not
-// mark as flaky if they're hanging, it's likely an actual bug.
-
-namespace {
-
-const size_t kNumWorkerThreads = 3;
-
-// Allows a number of threads to all be blocked on the same event, and
-// provides a way to unblock a certain number of them.
-class ThreadBlocker {
- public:
- ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}
-
- void Block() {
- {
- base::AutoLock lock(lock_);
- while (unblock_counter_ == 0)
- cond_var_.Wait();
- unblock_counter_--;
- }
- cond_var_.Signal();
- }
-
- void Unblock(size_t count) {
- {
- base::AutoLock lock(lock_);
- DCHECK_EQ(unblock_counter_, 0u);
- unblock_counter_ = count;
- }
- cond_var_.Signal();
- }
-
- private:
- base::Lock lock_;
- base::ConditionVariable cond_var_;
-
- size_t unblock_counter_;
-};
-
-class DestructionDeadlockChecker
- : public base::RefCountedThreadSafe<DestructionDeadlockChecker> {
- public:
- DestructionDeadlockChecker(const scoped_refptr<SequencedWorkerPool>& pool)
- : pool_(pool) {}
-
- protected:
- virtual ~DestructionDeadlockChecker() {
- // This method should not deadlock.
- pool_->RunsTasksOnCurrentThread();
- }
-
- private:
- scoped_refptr<SequencedWorkerPool> pool_;
- friend class base::RefCountedThreadSafe<DestructionDeadlockChecker>;
-};
-
-class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
- public:
- TestTracker()
- : lock_(),
- cond_var_(&lock_),
- started_events_(0) {
- }
-
- // Each of these tasks appends the argument to the complete sequence vector
- // so calling code can see what order they finished in.
- void FastTask(int id) {
- SignalWorkerDone(id);
- }
-
- void SlowTask(int id) {
- base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
- SignalWorkerDone(id);
- }
-
- void BlockTask(int id, ThreadBlocker* blocker) {
- // Note that this task has started and signal anybody waiting for that
- // to happen.
- {
- base::AutoLock lock(lock_);
- started_events_++;
- }
- cond_var_.Signal();
-
- blocker->Block();
- SignalWorkerDone(id);
- }
-
- void PostAdditionalTasks(
- int id, SequencedWorkerPool* pool,
- bool expected_return_value) {
- Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
- EXPECT_EQ(expected_return_value,
- pool->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE, fast_task,
- SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
- EXPECT_EQ(expected_return_value,
- pool->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE, fast_task,
- SequencedWorkerPool::SKIP_ON_SHUTDOWN));
- pool->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE, fast_task,
- SequencedWorkerPool::BLOCK_SHUTDOWN);
- SignalWorkerDone(id);
- }
-
- // This task posts itself back onto the SequencedWorkerPool before it
- // finishes running. Each instance of the task maintains a strong reference
- // to a DestructionDeadlockChecker. The DestructionDeadlockChecker is only
- // destroyed when the task is destroyed without being run, which only happens
- // during destruction of the SequencedWorkerPool.
- void PostRepostingTask(
- const scoped_refptr<SequencedWorkerPool>& pool,
- const scoped_refptr<DestructionDeadlockChecker>& checker) {
- Closure reposting_task =
- base::Bind(&TestTracker::PostRepostingTask, this, pool, checker);
- pool->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE, reposting_task, SequencedWorkerPool::SKIP_ON_SHUTDOWN);
- }
-
- // This task reposts itself back onto the SequencedWorkerPool before it
- // finishes running.
- void PostRepostingBlockingTask(
- const scoped_refptr<SequencedWorkerPool>& pool,
- const SequencedWorkerPool::SequenceToken& token) {
- Closure reposting_task =
- base::Bind(&TestTracker::PostRepostingBlockingTask, this, pool, token);
- pool->PostSequencedWorkerTaskWithShutdownBehavior(token,
- FROM_HERE, reposting_task, SequencedWorkerPool::BLOCK_SHUTDOWN);
- }
-
- void PostBlockingTaskThenUnblockThreads(
- const scoped_refptr<SequencedWorkerPool>& pool,
- ThreadBlocker* blocker,
- size_t threads_to_wake) {
- Closure arbitrary_task = base::Bind(&TestTracker::FastTask, this, 0);
- pool->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE, arbitrary_task, SequencedWorkerPool::BLOCK_SHUTDOWN);
- blocker->Unblock(threads_to_wake);
- }
-
- // Waits until the given number of tasks have started executing.
- void WaitUntilTasksBlocked(size_t count) {
- {
- base::AutoLock lock(lock_);
- while (started_events_ < count)
- cond_var_.Wait();
- }
- cond_var_.Signal();
- }
-
- // Blocks the current thread until at least the given number of tasks are in
- // the completed vector, and then returns a copy.
- std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
- std::vector<int> ret;
- {
- base::AutoLock lock(lock_);
- while (complete_sequence_.size() < num_tasks)
- cond_var_.Wait();
- ret = complete_sequence_;
- }
- cond_var_.Signal();
- return ret;
- }
-
- size_t GetTasksCompletedCount() {
- base::AutoLock lock(lock_);
- return complete_sequence_.size();
- }
-
- void ClearCompleteSequence() {
- base::AutoLock lock(lock_);
- complete_sequence_.clear();
- started_events_ = 0;
- }
-
- private:
- friend class base::RefCountedThreadSafe<TestTracker>;
- ~TestTracker() {}
-
- void SignalWorkerDone(int id) {
- {
- base::AutoLock lock(lock_);
- complete_sequence_.push_back(id);
- }
- cond_var_.Signal();
- }
-
- // Protects the complete_sequence.
- base::Lock lock_;
-
- base::ConditionVariable cond_var_;
-
- // Protected by lock_.
- std::vector<int> complete_sequence_;
-
- // Counter of the number of "block" workers that have started.
- size_t started_events_;
-};
-
-class SequencedWorkerPoolTest : public testing::Test {
- public:
- SequencedWorkerPoolTest()
- : tracker_(new TestTracker) {
- ResetPool();
- }
-
- void TearDown() override { pool()->Shutdown(); }
-
- const scoped_refptr<SequencedWorkerPool>& pool() {
- return pool_owner_->pool();
- }
- TestTracker* tracker() { return tracker_.get(); }
-
- // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
- // down, and creates a new instance.
- void ResetPool() {
- pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
- }
-
- void SetWillWaitForShutdownCallback(const Closure& callback) {
- pool_owner_->SetWillWaitForShutdownCallback(callback);
- }
-
- // Ensures that the given number of worker threads is created by adding
- // tasks and waiting until they complete. Worker thread creation is
- // serialized, can happen on background threads asynchronously, and doesn't
- // happen any more at shutdown. This means that if a test posts a bunch of
- // tasks and calls shutdown, fewer workers will be created than the test may
- // expect.
- //
- // This function ensures that this condition can't happen so tests can make
- // assumptions about the number of workers active. See the comment in
- // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
- // details.
- //
- // It will post tasks to the queue with id -1. It also assumes this is the
- // first thing called in a test since it will clear the complete_sequence_.
- void EnsureAllWorkersCreated() {
- // Create a bunch of threads, all waiting. This will cause that may
- // workers to be created.
- ThreadBlocker blocker;
- for (size_t i = 0; i < kNumWorkerThreads; i++) {
- pool()->PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::BlockTask,
- tracker(), -1, &blocker));
- }
- tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
-
- // Now wake them up and wait until they're done.
- blocker.Unblock(kNumWorkerThreads);
- tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
-
- // Clean up the task IDs we added.
- tracker()->ClearCompleteSequence();
- }
-
- int has_work_call_count() const {
- return pool_owner_->has_work_call_count();
- }
-
- private:
- MessageLoop message_loop_;
- scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
- const scoped_refptr<TestTracker> tracker_;
-};
-
-// Checks that the given number of entries are in the tasks to complete of
-// the given tracker, and then signals the given event the given number of
-// times. This is used to wake up blocked background threads before blocking
-// on shutdown.
-void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
- size_t expected_tasks_to_complete,
- ThreadBlocker* blocker,
- size_t threads_to_awake) {
- EXPECT_EQ(
- expected_tasks_to_complete,
- tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
-
- blocker->Unblock(threads_to_awake);
-}
-
-class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
- public:
- explicit DeletionHelper(
- const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
- : deleted_flag_(deleted_flag) {
- }
-
- private:
- friend class base::RefCountedThreadSafe<DeletionHelper>;
- virtual ~DeletionHelper() { deleted_flag_->data = true; }
-
- const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;
- DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
-};
-
-void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool,
- const scoped_refptr<DeletionHelper>& helper) {
- ADD_FAILURE() << "Should never run";
-}
-
-// Tests that delayed tasks are deleted upon shutdown of the pool.
-TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
- // Post something to verify the pool is started up.
- EXPECT_TRUE(pool()->PostTask(
- FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));
-
- scoped_refptr<base::RefCountedData<bool> > deleted_flag(
- new base::RefCountedData<bool>(false));
-
- base::Time posted_at(base::Time::Now());
- // Post something that shouldn't run.
- EXPECT_TRUE(pool()->PostDelayedTask(
- FROM_HERE,
- base::Bind(&HoldPoolReference,
- pool(),
- make_scoped_refptr(new DeletionHelper(deleted_flag))),
- TestTimeouts::action_timeout()));
-
- std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
- ASSERT_EQ(1u, completion_sequence.size());
- ASSERT_EQ(1, completion_sequence[0]);
-
- pool()->Shutdown();
- // Shutdown is asynchronous, so use ResetPool() to block until the pool is
- // fully destroyed (and thus shut down).
- ResetPool();
-
- // Verify that we didn't block until the task was due.
- ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());
-
- // Verify that the deferred task has not only not run, but has also been
- // destroyed.
- ASSERT_TRUE(deleted_flag->data);
-}
-
-// Tests that same-named tokens have the same ID.
-TEST_F(SequencedWorkerPoolTest, NamedTokens) {
- const std::string name1("hello");
- SequencedWorkerPool::SequenceToken token1 =
- pool()->GetNamedSequenceToken(name1);
-
- SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
-
- const std::string name3("goodbye");
- SequencedWorkerPool::SequenceToken token3 =
- pool()->GetNamedSequenceToken(name3);
-
- // All 3 tokens should be different.
- EXPECT_FALSE(token1.Equals(token2));
- EXPECT_FALSE(token1.Equals(token3));
- EXPECT_FALSE(token2.Equals(token3));
-
- // Requesting the same name again should give the same value.
- SequencedWorkerPool::SequenceToken token1again =
- pool()->GetNamedSequenceToken(name1);
- EXPECT_TRUE(token1.Equals(token1again));
-
- SequencedWorkerPool::SequenceToken token3again =
- pool()->GetNamedSequenceToken(name3);
- EXPECT_TRUE(token3.Equals(token3again));
-}
-
-// Tests that posting a bunch of tasks (many more than the number of worker
-// threads) runs them all.
-TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
- pool()->PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::SlowTask, tracker(), 0));
-
- const size_t kNumTasks = 20;
- for (size_t i = 1; i < kNumTasks; i++) {
- pool()->PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), i));
- }
-
- std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
- EXPECT_EQ(kNumTasks, result.size());
-}
-
-// Tests that posting a bunch of tasks (many more than the number of
-// worker threads) to two pools simultaneously runs them all twice.
-// This test is meant to shake out any concurrency issues between
-// pools (like histograms).
-TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
- SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
- SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");
-
- base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0);
- pool1.pool()->PostWorkerTask(FROM_HERE, slow_task);
- pool2.pool()->PostWorkerTask(FROM_HERE, slow_task);
-
- const size_t kNumTasks = 20;
- for (size_t i = 1; i < kNumTasks; i++) {
- base::Closure fast_task =
- base::Bind(&TestTracker::FastTask, tracker(), i);
- pool1.pool()->PostWorkerTask(FROM_HERE, fast_task);
- pool2.pool()->PostWorkerTask(FROM_HERE, fast_task);
- }
-
- std::vector<int> result =
- tracker()->WaitUntilTasksComplete(2*kNumTasks);
- EXPECT_EQ(2 * kNumTasks, result.size());
-
- pool2.pool()->Shutdown();
- pool1.pool()->Shutdown();
-}
-
-// Test that tasks with the same sequence token are executed in order but don't
-// affect other tasks.
-TEST_F(SequencedWorkerPoolTest, Sequence) {
- // Fill all the worker threads except one.
- const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
- ThreadBlocker background_blocker;
- for (size_t i = 0; i < kNumBackgroundTasks; i++) {
- pool()->PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::BlockTask,
- tracker(), i, &background_blocker));
- }
- tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
-
- // Create two tasks with the same sequence token, one that will block on the
- // event, and one which will just complete quickly when it's run. Since there
- // is one worker thread free, the first task will start and then block, and
- // the second task should be waiting.
- ThreadBlocker blocker;
- SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
- pool()->PostSequencedWorkerTask(
- token1, FROM_HERE,
- base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
- pool()->PostSequencedWorkerTask(
- token1, FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), 101));
- EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
-
- // Create another two tasks as above with a different token. These will be
- // blocked since there are no slots to run.
- SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
- pool()->PostSequencedWorkerTask(
- token2, FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), 200));
- pool()->PostSequencedWorkerTask(
- token2, FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), 201));
- EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
-
- // Let one background task complete. This should then let both tasks of
- // token2 run to completion in order. The second task of token1 should still
- // be blocked.
- background_blocker.Unblock(1);
- std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
- ASSERT_EQ(3u, result.size());
- EXPECT_EQ(200, result[1]);
- EXPECT_EQ(201, result[2]);
-
- // Finish the rest of the background tasks. This should leave some workers
- // free with the second token1 task still blocked on the first.
- background_blocker.Unblock(kNumBackgroundTasks - 1);
- EXPECT_EQ(kNumBackgroundTasks + 2,
- tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
-
- // Allow the first task of token1 to complete. This should run the second.
- blocker.Unblock(1);
- result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
- ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
- EXPECT_EQ(100, result[result.size() - 2]);
- EXPECT_EQ(101, result[result.size() - 1]);
-}
-
-// Tests that any tasks posted after Shutdown are ignored.
-// Disabled for flakiness. See http://crbug.com/166451.
-TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
- // Start tasks to take all the threads and block them.
- EnsureAllWorkersCreated();
- ThreadBlocker blocker;
- for (size_t i = 0; i < kNumWorkerThreads; i++) {
- pool()->PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::BlockTask,
- tracker(), i, &blocker));
- }
- tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
-
- SetWillWaitForShutdownCallback(
- base::Bind(&EnsureTasksToCompleteCountAndUnblock,
- scoped_refptr<TestTracker>(tracker()), 0,
- &blocker, kNumWorkerThreads));
-
- // Shutdown the worker pool. This should discard all non-blocking tasks.
- const int kMaxNewBlockingTasksAfterShutdown = 100;
- pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
-
- int old_has_work_call_count = has_work_call_count();
-
- std::vector<int> result =
- tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
-
- // The kNumWorkerThread items should have completed, in no particular order.
- ASSERT_EQ(kNumWorkerThreads, result.size());
- for (size_t i = 0; i < kNumWorkerThreads; i++) {
- EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
- result.end());
- }
-
- // No further tasks, regardless of shutdown mode, should be allowed.
- EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), 100),
- SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
- EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), 101),
- SequencedWorkerPool::SKIP_ON_SHUTDOWN));
- EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), 102),
- SequencedWorkerPool::BLOCK_SHUTDOWN));
-
- ASSERT_EQ(old_has_work_call_count, has_work_call_count());
-}
-
-TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
- // Test that <n> new blocking tasks are allowed provided they're posted
- // by a running tasks.
- EnsureAllWorkersCreated();
- ThreadBlocker blocker;
-
- // Start tasks to take all the threads and block them.
- const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
- for (int i = 0; i < kNumBlockTasks; ++i) {
- EXPECT_TRUE(pool()->PostWorkerTask(
- FROM_HERE,
- base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
- }
- tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
-
- // Queue up shutdown blocking tasks behind those which will attempt to post
- // additional tasks when run, PostAdditionalTasks attemtps to post 3
- // new FastTasks, one for each shutdown_behavior.
- const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
- for (int i = 0; i < kNumQueuedTasks; ++i) {
- EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE,
- base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(),
- false),
- SequencedWorkerPool::BLOCK_SHUTDOWN));
- }
-
- // Setup to open the floodgates from within Shutdown().
- SetWillWaitForShutdownCallback(
- base::Bind(&EnsureTasksToCompleteCountAndUnblock,
- scoped_refptr<TestTracker>(tracker()),
- 0, &blocker, kNumBlockTasks));
-
- // Allow half of the additional blocking tasks thru.
- const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
- pool()->Shutdown(kNumNewBlockingTasksToAllow);
-
- // Ensure that the correct number of tasks actually got run.
- tracker()->WaitUntilTasksComplete(static_cast<size_t>(
- kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));
-
- // Clean up the task IDs we added and go home.
- tracker()->ClearCompleteSequence();
-}
-
-// Tests that blocking tasks can still be posted during shutdown, as long as
-// the task is not being posted within the context of a running task.
-TEST_F(SequencedWorkerPoolTest,
- AllowsBlockingTasksDuringShutdownOutsideOfRunningTask) {
- EnsureAllWorkersCreated();
- ThreadBlocker blocker;
-
- // Start tasks to take all the threads and block them.
- const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
- for (int i = 0; i < kNumBlockTasks; ++i) {
- EXPECT_TRUE(pool()->PostWorkerTask(
- FROM_HERE,
- base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
- }
- tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
-
- // Setup to open the floodgates from within Shutdown().
- SetWillWaitForShutdownCallback(
- base::Bind(&TestTracker::PostBlockingTaskThenUnblockThreads,
- scoped_refptr<TestTracker>(tracker()), pool(), &blocker,
- kNumWorkerThreads));
- pool()->Shutdown(kNumWorkerThreads + 1);
-
- // Ensure that the correct number of tasks actually got run.
- tracker()->WaitUntilTasksComplete(static_cast<size_t>(kNumWorkerThreads + 1));
- tracker()->ClearCompleteSequence();
-}
-
-// Tests that unrun tasks are discarded properly according to their shutdown
-// mode.
-TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
- // Start tasks to take all the threads and block them.
- EnsureAllWorkersCreated();
- ThreadBlocker blocker;
- for (size_t i = 0; i < kNumWorkerThreads; i++) {
- pool()->PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::BlockTask,
- tracker(), i, &blocker));
- }
- tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
-
- // Create some tasks with different shutdown modes.
- pool()->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), 100),
- SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
- pool()->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), 101),
- SequencedWorkerPool::SKIP_ON_SHUTDOWN);
- pool()->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), 102),
- SequencedWorkerPool::BLOCK_SHUTDOWN);
-
- // Shutdown the worker pool. This should discard all non-blocking tasks.
- SetWillWaitForShutdownCallback(
- base::Bind(&EnsureTasksToCompleteCountAndUnblock,
- scoped_refptr<TestTracker>(tracker()), 0,
- &blocker, kNumWorkerThreads));
- pool()->Shutdown();
-
- std::vector<int> result =
- tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);
-
- // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
- // one, in no particular order.
- ASSERT_EQ(kNumWorkerThreads + 1, result.size());
- for (size_t i = 0; i < kNumWorkerThreads; i++) {
- EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
- result.end());
- }
- EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
-}
-
-// Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
-TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
- scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
- SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
- scoped_refptr<SequencedTaskRunner> sequenced_runner(
- pool()->GetSequencedTaskRunnerWithShutdownBehavior(
- pool()->GetSequenceToken(),
- SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
- EnsureAllWorkersCreated();
- ThreadBlocker blocker;
- pool()->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE,
- base::Bind(&TestTracker::BlockTask,
- tracker(), 0, &blocker),
- SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
- runner->PostTask(
- FROM_HERE,
- base::Bind(&TestTracker::BlockTask,
- tracker(), 1, &blocker));
- sequenced_runner->PostTask(
- FROM_HERE,
- base::Bind(&TestTracker::BlockTask,
- tracker(), 2, &blocker));
-
- tracker()->WaitUntilTasksBlocked(3);
-
- // This should not block. If this test hangs, it means it failed.
- pool()->Shutdown();
-
- // The task should not have completed yet.
- EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
-
- // Posting more tasks should fail.
- EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
- SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
- EXPECT_FALSE(runner->PostTask(
- FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
- EXPECT_FALSE(sequenced_runner->PostTask(
- FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
-
- // Continue the background thread and make sure the tasks can complete.
- blocker.Unblock(3);
- std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
- EXPECT_EQ(3u, result.size());
-}
-
-// Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
-// until they stop, but tasks not yet started do not.
-TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
- // Start tasks to take all the threads and block them.
- EnsureAllWorkersCreated();
- ThreadBlocker blocker;
-
- // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
- // return until these tasks have completed.
- for (size_t i = 0; i < kNumWorkerThreads; i++) {
- pool()->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE,
- base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
- SequencedWorkerPool::SKIP_ON_SHUTDOWN);
- }
- tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
-
- // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
- // executed once Shutdown() has been called.
- pool()->PostWorkerTaskWithShutdownBehavior(
- FROM_HERE,
- base::Bind(&TestTracker::BlockTask,
- tracker(), 0, &blocker),
- SequencedWorkerPool::SKIP_ON_SHUTDOWN);
-
- // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
- // been started block shutdown.
- SetWillWaitForShutdownCallback(
- base::Bind(&EnsureTasksToCompleteCountAndUnblock,
- scoped_refptr<TestTracker>(tracker()), 0,
- &blocker, kNumWorkerThreads));
-
- // No tasks should have completed yet.
- EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
-
- // This should not block. If this test hangs, it means it failed.
- pool()->Shutdown();
-
- // Shutdown should not return until all of the tasks have completed.
- std::vector<int> result =
- tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
-
- // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
- // allowed to complete. No additional non-blocking tasks should have been
- // started.
- ASSERT_EQ(kNumWorkerThreads, result.size());
- for (size_t i = 0; i < kNumWorkerThreads; i++) {
- EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
- result.end());
- }
-}
-
-// Ensure all worker threads are created, and then trigger a spurious
-// work signal. This shouldn't cause any other work signals to be
-// triggered. This is a regression test for http://crbug.com/117469.
-TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
- EnsureAllWorkersCreated();
- int old_has_work_call_count = has_work_call_count();
- pool()->SignalHasWorkForTesting();
- // This is inherently racy, but can only produce false positives.
- base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
- EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count());
-}
-
-void IsRunningOnCurrentThreadTask(
- SequencedWorkerPool::SequenceToken test_positive_token,
- SequencedWorkerPool::SequenceToken test_negative_token,
- SequencedWorkerPool* pool,
- SequencedWorkerPool* unused_pool) {
- EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
- EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
- EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));
- EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
- EXPECT_FALSE(
- unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
- EXPECT_FALSE(
- unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
-}
-
-// Verify correctness of the IsRunningSequenceOnCurrentThread method.
-TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
- SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
- SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
- SequencedWorkerPool::SequenceToken unsequenced_token;
-
- scoped_refptr<SequencedWorkerPool> unused_pool =
- new SequencedWorkerPool(2, "unused_pool");
-
- EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
- EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
- EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
- EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
- EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
- EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
- EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
- EXPECT_FALSE(
- unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));
-
- pool()->PostSequencedWorkerTask(
- token1, FROM_HERE,
- base::Bind(&IsRunningOnCurrentThreadTask,
- token1, token2, pool(), unused_pool));
- pool()->PostSequencedWorkerTask(
- token2, FROM_HERE,
- base::Bind(&IsRunningOnCurrentThreadTask,
- token2, unsequenced_token, pool(), unused_pool));
- pool()->PostWorkerTask(
- FROM_HERE,
- base::Bind(&IsRunningOnCurrentThreadTask,
- unsequenced_token, token1, pool(), unused_pool));
- pool()->Shutdown();
- unused_pool->Shutdown();
-}
-
-// Checks that tasks are destroyed in the right context during shutdown. If a
-// task is destroyed while SequencedWorkerPool's global lock is held,
-// SequencedWorkerPool might deadlock.
-TEST_F(SequencedWorkerPoolTest, AvoidsDeadlockOnShutdown) {
- for (int i = 0; i < 4; ++i) {
- scoped_refptr<DestructionDeadlockChecker> checker(
- new DestructionDeadlockChecker(pool()));
- tracker()->PostRepostingTask(pool(), checker);
- }
-
- // Shutting down the pool should destroy the DestructionDeadlockCheckers,
- // which in turn should not deadlock in their destructors.
- pool()->Shutdown();
-}
-
-// Similar to the test AvoidsDeadlockOnShutdown, but there are now also
-// sequenced, blocking tasks in the queue during shutdown.
-TEST_F(SequencedWorkerPoolTest,
- AvoidsDeadlockOnShutdownWithSequencedBlockingTasks) {
- const std::string sequence_token_name("name");
- for (int i = 0; i < 4; ++i) {
- scoped_refptr<DestructionDeadlockChecker> checker(
- new DestructionDeadlockChecker(pool()));
- tracker()->PostRepostingTask(pool(), checker);
-
- SequencedWorkerPool::SequenceToken token1 =
- pool()->GetNamedSequenceToken(sequence_token_name);
- tracker()->PostRepostingBlockingTask(pool(), token1);
- }
-
- // Shutting down the pool should destroy the DestructionDeadlockCheckers,
- // which in turn should not deadlock in their destructors.
- pool()->Shutdown();
-}
-
-// Verify that FlushForTesting works as intended.
-TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
- // Should be fine to call on a new instance.
- pool()->FlushForTesting();
-
- // Queue up a bunch of work, including a long delayed task and
- // a task that produces additional tasks as an artifact.
- pool()->PostDelayedWorkerTask(
- FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), 0),
- TimeDelta::FromMinutes(5));
- pool()->PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::SlowTask, tracker(), 0));
- const size_t kNumFastTasks = 20;
- for (size_t i = 0; i < kNumFastTasks; i++) {
- pool()->PostWorkerTask(FROM_HERE,
- base::Bind(&TestTracker::FastTask, tracker(), 0));
- }
- pool()->PostWorkerTask(
- FROM_HERE,
- base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
- true));
-
- // We expect all except the delayed task to have been run. We verify all
- // closures have been deleted by looking at the refcount of the
- // tracker.
- EXPECT_FALSE(tracker()->HasOneRef());
- pool()->FlushForTesting();
- EXPECT_TRUE(tracker()->HasOneRef());
- EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());
-
- // Should be fine to call on an idle instance with all threads created, and
- // spamming the method shouldn't deadlock or confuse the class.
- pool()->FlushForTesting();
- pool()->FlushForTesting();
-
- // Should be fine to call after shutdown too.
- pool()->Shutdown();
- pool()->FlushForTesting();
-}
-
-TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
- MessageLoop loop;
- scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
- scoped_refptr<SequencedTaskRunner> task_runner =
- pool->GetSequencedTaskRunnerWithShutdownBehavior(
- pool->GetSequenceToken(),
- base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
-
- // Upon test exit, should shut down without hanging.
- pool->Shutdown();
-}
-
-class SequencedWorkerPoolTaskRunnerTestDelegate {
- public:
- SequencedWorkerPoolTaskRunnerTestDelegate() {}
-
- ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
-
- void StartTaskRunner() {
- pool_owner_.reset(
- new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
- }
-
- scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
- return pool_owner_->pool();
- }
-
- void StopTaskRunner() {
- // Make sure all tasks are run before shutting down. Delayed tasks are
- // not run, they're simply deleted.
- pool_owner_->pool()->FlushForTesting();
- pool_owner_->pool()->Shutdown();
- // Don't reset |pool_owner_| here, as the test may still hold a
- // reference to the pool.
- }
-
- private:
- MessageLoop message_loop_;
- scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
-};
-
-INSTANTIATE_TYPED_TEST_CASE_P(
- SequencedWorkerPool, TaskRunnerTest,
- SequencedWorkerPoolTaskRunnerTestDelegate);
-
-class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
- public:
- SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
-
- ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
- }
-
- void StartTaskRunner() {
- pool_owner_.reset(
- new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
- task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
- SequencedWorkerPool::BLOCK_SHUTDOWN);
- }
-
- scoped_refptr<TaskRunner> GetTaskRunner() {
- return task_runner_;
- }
-
- void StopTaskRunner() {
- // Make sure all tasks are run before shutting down. Delayed tasks are
- // not run, they're simply deleted.
- pool_owner_->pool()->FlushForTesting();
- pool_owner_->pool()->Shutdown();
- // Don't reset |pool_owner_| here, as the test may still hold a
- // reference to the pool.
- }
-
- private:
- MessageLoop message_loop_;
- scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
- scoped_refptr<TaskRunner> task_runner_;
-};
-
-INSTANTIATE_TYPED_TEST_CASE_P(
- SequencedWorkerPoolTaskRunner, TaskRunnerTest,
- SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
-
-class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
- public:
- SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
-
- ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
- }
-
- void StartTaskRunner() {
- pool_owner_.reset(new SequencedWorkerPoolOwner(
- 10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
- task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
- pool_owner_->pool()->GetSequenceToken());
- }
-
- scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
- return task_runner_;
- }
-
- void StopTaskRunner() {
- // Make sure all tasks are run before shutting down. Delayed tasks are
- // not run, they're simply deleted.
- pool_owner_->pool()->FlushForTesting();
- pool_owner_->pool()->Shutdown();
- // Don't reset |pool_owner_| here, as the test may still hold a
- // reference to the pool.
- }
-
- private:
- MessageLoop message_loop_;
- scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
- scoped_refptr<SequencedTaskRunner> task_runner_;
-};
-
-INSTANTIATE_TYPED_TEST_CASE_P(
- SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
- SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
-
-INSTANTIATE_TYPED_TEST_CASE_P(
- SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
- SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
-
-} // namespace
-
-} // namespace base