diff options
Diffstat (limited to 'unsupported/test/cxx11_non_blocking_thread_pool.cpp')
-rw-r--r-- | unsupported/test/cxx11_non_blocking_thread_pool.cpp | 83 |
1 files changed, 78 insertions, 5 deletions
diff --git a/unsupported/test/cxx11_non_blocking_thread_pool.cpp b/unsupported/test/cxx11_non_blocking_thread_pool.cpp index 5f9bb938b..993ee1789 100644 --- a/unsupported/test/cxx11_non_blocking_thread_pool.cpp +++ b/unsupported/test/cxx11_non_blocking_thread_pool.cpp @@ -11,22 +11,23 @@ #define EIGEN_USE_THREADS #include "main.h" #include "Eigen/CXX11/ThreadPool" +#include "Eigen/CXX11/Tensor" static void test_create_destroy_empty_pool() { // Just create and destroy the pool. This will wind up and tear down worker // threads. Ensure there are no issues in that logic. for (int i = 0; i < 16; ++i) { - NonBlockingThreadPool tp(i); + ThreadPool tp(i); } } -static void test_parallelism() +static void test_parallelism(bool allow_spinning) { // Test we never-ever fail to match available tasks with idle threads. const int kThreads = 16; // code below expects that this is a multiple of 4 - NonBlockingThreadPool tp(kThreads); + ThreadPool tp(kThreads, allow_spinning); VERIFY_IS_EQUAL(tp.NumThreads(), kThreads); VERIFY_IS_EQUAL(tp.CurrentThreadId(), -1); for (int iter = 0; iter < 100; ++iter) { @@ -100,8 +101,80 @@ static void test_parallelism() } } -void test_cxx11_non_blocking_thread_pool() + +static void test_cancel() +{ + ThreadPool tp(2); + + // Schedule a large number of closure that each sleeps for one second. This + // will keep the thread pool busy for much longer than the default test timeout. + for (int i = 0; i < 1000; ++i) { + tp.Schedule([]() { + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + }); + } + + // Cancel the processing of all the closures that are still pending. + tp.Cancel(); +} + +static void test_pool_partitions() { + const int kThreads = 2; + ThreadPool tp(kThreads); + + // Assign each thread to its own partition, so that stealing other work only + // occurs globally when a thread is idle. + std::vector<std::pair<unsigned, unsigned>> steal_partitions(kThreads); + for (int i = 0; i < kThreads; ++i) { + steal_partitions[i] = std::make_pair(i, i + 1); + } + tp.SetStealPartitions(steal_partitions); + + std::atomic<int> running(0); + std::atomic<int> done(0); + std::atomic<int> phase(0); + + // Schedule kThreads tasks and ensure that they all are running. + for (int i = 0; i < kThreads; ++i) { + tp.Schedule([&]() { + const int thread_id = tp.CurrentThreadId(); + VERIFY_GE(thread_id, 0); + VERIFY_LE(thread_id, kThreads - 1); + ++running; + while (phase < 1) { + } + ++done; + }); + } + while (running != kThreads) { + } + // Schedule each closure to only run on thread 'i' and verify that it does. + for (int i = 0; i < kThreads; ++i) { + tp.ScheduleWithHint( + [&, i]() { + ++running; + const int thread_id = tp.CurrentThreadId(); + VERIFY_IS_EQUAL(thread_id, i); + while (phase < 2) { + } + ++done; + }, + i, i + 1); + } + running = 0; + phase = 1; + while (running != kThreads) { + } + running = 0; + phase = 2; +} + + +EIGEN_DECLARE_TEST(cxx11_non_blocking_thread_pool) { CALL_SUBTEST(test_create_destroy_empty_pool()); - CALL_SUBTEST(test_parallelism()); + CALL_SUBTEST(test_parallelism(true)); + CALL_SUBTEST(test_parallelism(false)); + CALL_SUBTEST(test_cancel()); + CALL_SUBTEST(test_pool_partitions()); } |