diff options
Diffstat (limited to 'unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h')
-rw-r--r-- | unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h | 354 |
1 files changed, 242 insertions, 112 deletions
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h index 069680a11..e524b535a 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h @@ -12,67 +12,6 @@ namespace Eigen { -// Use the SimpleThreadPool by default. We'll switch to the new non blocking -// thread pool later. -#ifndef EIGEN_USE_SIMPLE_THREAD_POOL -template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>; -typedef NonBlockingThreadPool ThreadPool; -#else -template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>; -typedef SimpleThreadPool ThreadPool; -#endif - - -// Barrier is an object that allows one or more threads to wait until -// Notify has been called a specified number of times. -class Barrier { - public: - Barrier(unsigned int count) : state_(count << 1), notified_(false) { - eigen_assert(((count << 1) >> 1) == count); - } - ~Barrier() { - eigen_assert((state_>>1) == 0); - } - - void Notify() { - unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2; - if (v != 1) { - eigen_assert(((v + 2) & ~1) != 0); - return; // either count has not dropped to 0, or waiter is not waiting - } - std::unique_lock<std::mutex> l(mu_); - eigen_assert(!notified_); - notified_ = true; - cv_.notify_all(); - } - - void Wait() { - unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel); - if ((v >> 1) == 0) return; - std::unique_lock<std::mutex> l(mu_); - while (!notified_) { - cv_.wait(l); - } - } - - private: - std::mutex mu_; - std::condition_variable cv_; - std::atomic<unsigned int> state_; // low bit is waiter flag - bool notified_; -}; - - -// Notification is an object that allows a user to to wait for another -// thread to signal a notification that an event has occurred. -// -// Multiple threads can wait on the same Notification object, -// but only one caller must call Notify() on the object. -struct Notification : Barrier { - Notification() : Barrier(1) {}; -}; - - // Runs an arbitrary function and then calls Notify() on the passed in // Notification. template <typename Function, typename... Args> struct FunctionWrapperWithNotification @@ -102,22 +41,75 @@ static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) { } } +// An abstract interface to a device specific memory allocator. +class Allocator { + public: + virtual ~Allocator() {} + virtual void* allocate(size_t num_bytes) const = 0; + virtual void deallocate(void* buffer) const = 0; +}; // Build a thread pool device on top the an existing pool of threads. struct ThreadPoolDevice { // The ownership of the thread pool remains with the caller. - ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores) : pool_(pool), num_threads_(num_cores) { } + ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr) + : pool_(pool), num_threads_(num_cores), allocator_(allocator) { } EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const { - return internal::aligned_malloc(num_bytes); + return allocator_ ? allocator_->allocate(num_bytes) + : internal::aligned_malloc(num_bytes); } EIGEN_STRONG_INLINE void deallocate(void* buffer) const { - internal::aligned_free(buffer); + if (allocator_) { + allocator_->deallocate(buffer); + } else { + internal::aligned_free(buffer); + } + } + + EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const { + return allocate(num_bytes); + } + + EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const { + deallocate(buffer); + } + + template<typename Type> + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const { + return data; } EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const { +#ifdef __ANDROID__ ::memcpy(dst, src, n); +#else + // TODO(rmlarsen): Align blocks on cache lines. + // We have observed that going beyond 4 threads usually just wastes + // CPU cycles due to the threads competing for memory bandwidth, so we + // statically schedule at most 4 block copies here. + const size_t kMinBlockSize = 32768; + const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4); + if (n <= kMinBlockSize || num_threads < 2) { + ::memcpy(dst, src, n); + } else { + const char* src_ptr = static_cast<const char*>(src); + char* dst_ptr = static_cast<char*>(dst); + const size_t blocksize = (n + (num_threads - 1)) / num_threads; + Barrier barrier(static_cast<int>(num_threads - 1)); + // Launch the last 3 blocks on worker threads. + for (size_t i = 1; i < num_threads; ++i) { + enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] { + ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize, + numext::mini(blocksize, n - (i * blocksize))); + }); + } + // Launch the first block on the main thread. + ::memcpy(dst_ptr, src_ptr, blocksize); + barrier.Wait(); + } +#endif } EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); @@ -134,6 +126,12 @@ struct ThreadPoolDevice { return num_threads_; } + // Number of theads available in the underlying thread pool. This number can + // be different from the value returned by numThreads(). + EIGEN_STRONG_INLINE int numThreadsInPool() const { + return pool_->NumThreads(); + } + EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const { return l1CacheSize(); } @@ -149,23 +147,31 @@ struct ThreadPoolDevice { } template <class Function, class... Args> - EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const { + EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, + Args&&... args) const { Notification* n = new Notification(); - pool_->Schedule(std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, f, args...)); + pool_->Schedule( + std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, + std::move(f), args...)); return n; } template <class Function, class... Args> - EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, - Function&& f, + EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f, Args&&... args) const { - pool_->Schedule(std::bind( - &FunctionWrapperWithBarrier<Function, Args...>::run, b, f, args...)); + pool_->Schedule( + std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b, + std::move(f), args...)); } template <class Function, class... Args> - EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const { - pool_->Schedule(std::bind(f, args...)); + EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, + Args&&... args) const { + if (sizeof...(args) > 0) { + pool_->Schedule(std::bind(std::move(f), args...)); + } else { + pool_->Schedule(std::move(f)); + } } // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if @@ -174,45 +180,193 @@ struct ThreadPoolDevice { return pool_->CurrentThreadId(); } - // parallelFor executes f with [0, n) arguments in parallel and waits for - // completion. F accepts a half-open interval [first, last). - // Block size is choosen based on the iteration cost and resulting parallel + // WARNING: This function is synchronous and will block the calling thread. + // + // Synchronous parallelFor executes f with [0, n) arguments in parallel and + // waits for completion. F accepts a half-open interval [first, last). Block + // size is chosen based on the iteration cost and resulting parallel // efficiency. If block_align is not nullptr, it is called to round up the // block size. void parallelFor(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align, std::function<void(Index, Index)> f) const { - typedef TensorCostModel<ThreadPoolDevice> CostModel; + if (EIGEN_PREDICT_FALSE(n <= 0)){ + return; + // Compute small problems directly in the caller thread. + } else if (n == 1 || numThreads() == 1 || + CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) { + f(0, n); + return; + } + + // Compute block size and total count of blocks. + ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align); + + // Recursively divide size into halves until we reach block_size. + // Division code rounds mid to block_size, so we are guaranteed to get + // block_count leaves that do actual computations. + Barrier barrier(static_cast<unsigned int>(block.count)); + std::function<void(Index, Index)> handleRange; + handleRange = [=, &handleRange, &barrier, &f](Index firstIdx, + Index lastIdx) { + while (lastIdx - firstIdx > block.size) { + // Split into halves and schedule the second half on a different thread. + const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size; + pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); }); + lastIdx = midIdx; + } + // Single block or less, execute directly. + f(firstIdx, lastIdx); + barrier.Notify(); + }; + + if (block.count <= numThreads()) { + // Avoid a thread hop by running the root of the tree and one block on the + // main thread. + handleRange(0, n); + } else { + // Execute the root in the thread pool to avoid running work on more than + // numThreads() threads. + pool_->Schedule([=, &handleRange]() { handleRange(0, n); }); + } + + barrier.Wait(); + } + + // Convenience wrapper for parallelFor that does not align blocks. + void parallelFor(Index n, const TensorOpCost& cost, + std::function<void(Index, Index)> f) const { + parallelFor(n, cost, nullptr, std::move(f)); + } + + // WARNING: This function is asynchronous and will not block the calling thread. + // + // Asynchronous parallelFor executes f with [0, n) arguments in parallel + // without waiting for completion. When the last block finished, it will call + // 'done' callback. F accepts a half-open interval [first, last). Block size + // is chosen based on the iteration cost and resulting parallel efficiency. If + // block_align is not nullptr, it is called to round up the block size. + void parallelForAsync(Index n, const TensorOpCost& cost, + std::function<Index(Index)> block_align, + std::function<void(Index, Index)> f, + std::function<void()> done) const { + // Compute small problems directly in the caller thread. if (n <= 1 || numThreads() == 1 || CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) { f(0, n); + done(); return; } - // Calculate block size based on (1) the iteration cost and (2) parallel - // efficiency. We want blocks to be not too small to mitigate - // parallelization overheads; not too large to mitigate tail - // effect and potential load imbalance and we also want number - // of blocks to be evenly dividable across threads. + // Compute block size and total count of blocks. + ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align); + + ParallelForAsyncContext* const ctx = + new ParallelForAsyncContext(block.count, std::move(f), std::move(done)); + + // Recursively divide size into halves until we reach block_size. + // Division code rounds mid to block_size, so we are guaranteed to get + // block_count leaves that do actual computations. + ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) { + while (lastIdx - firstIdx > block.size) { + // Split into halves and schedule the second half on a different thread. + const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size; + pool_->Schedule( + [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); }); + lastIdx = midIdx; + } + + // Single block or less, execute directly. + ctx->f(firstIdx, lastIdx); + + // Delete async context if it was the last block. + if (ctx->count.fetch_sub(1) == 1) delete ctx; + }; + + if (block.count <= numThreads()) { + // Avoid a thread hop by running the root of the tree and one block on the + // main thread. + ctx->handle_range(0, n); + } else { + // Execute the root in the thread pool to avoid running work on more than + // numThreads() threads. + pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); }); + } + } + + // Convenience wrapper for parallelForAsync that does not align blocks. + void parallelForAsync(Index n, const TensorOpCost& cost, + std::function<void(Index, Index)> f, + std::function<void()> done) const { + parallelForAsync(n, cost, nullptr, std::move(f), std::move(done)); + } + + // Thread pool accessor. + ThreadPoolInterface* getPool() const { return pool_; } + + // Allocator accessor. + Allocator* allocator() const { return allocator_; } + + private: + typedef TensorCostModel<ThreadPoolDevice> CostModel; + + // For parallelForAsync we must keep passed in closures on the heap, and + // delete them only after `done` callback finished. + struct ParallelForAsyncContext { + ParallelForAsyncContext(Index block_count, + std::function<void(Index, Index)> block_f, + std::function<void()> done_callback) + : count(block_count), + f(std::move(block_f)), + done(std::move(done_callback)) {} + ~ParallelForAsyncContext() { done(); } + + std::atomic<Index> count; + std::function<void(Index, Index)> f; + std::function<void()> done; + + std::function<void(Index, Index)> handle_range; + }; + + struct ParallelForBlock { + Index size; // block size + Index count; // number of blocks + }; + + // Calculates block size based on (1) the iteration cost and (2) parallel + // efficiency. We want blocks to be not too small to mitigate parallelization + // overheads; not too large to mitigate tail effect and potential load + // imbalance and we also want number of blocks to be evenly dividable across + // threads. + ParallelForBlock CalculateParallelForBlock( + const Index n, const TensorOpCost& cost, + std::function<Index(Index)> block_align) const { + const double block_size_f = 1.0 / CostModel::taskSize(1, cost); + const Index max_oversharding_factor = 4; + Index block_size = numext::mini( + n, numext::maxi<Index>( + divup<Index>(n, max_oversharding_factor * numThreads()), + block_size_f)); + const Index max_block_size = numext::mini(n, 2 * block_size); - double block_size_f = 1.0 / CostModel::taskSize(1, cost); - Index block_size = numext::mini(n, numext::maxi<Index>(1, block_size_f)); - const Index max_block_size = - numext::mini(n, numext::maxi<Index>(1, 2 * block_size_f)); if (block_align) { Index new_block_size = block_align(block_size); eigen_assert(new_block_size >= block_size); block_size = numext::mini(n, new_block_size); } + Index block_count = divup(n, block_size); + // Calculate parallel efficiency as fraction of total CPU time used for // computations: double max_efficiency = static_cast<double>(block_count) / (divup<int>(block_count, numThreads()) * numThreads()); + // Now try to increase block size up to max_block_size as long as it // doesn't decrease parallel efficiency. - for (Index prev_block_count = block_count; prev_block_count > 1;) { + for (Index prev_block_count = block_count; + max_efficiency < 1.0 && prev_block_count > 1;) { // This is the next block size that divides size into a smaller number // of blocks than the current block_size. Index coarser_block_size = divup(n, prev_block_count - 1); @@ -241,36 +395,12 @@ struct ThreadPoolDevice { } } - // Recursively divide size into halves until we reach block_size. - // Division code rounds mid to block_size, so we are guaranteed to get - // block_count leaves that do actual computations. - Barrier barrier(static_cast<unsigned int>(block_count)); - std::function<void(Index, Index)> handleRange; - handleRange = [=, &handleRange, &barrier, &f](Index first, Index last) { - if (last - first <= block_size) { - // Single block or less, execute directly. - f(first, last); - barrier.Notify(); - return; - } - // Split into halves and submit to the pool. - Index mid = first + divup((last - first) / 2, block_size) * block_size; - pool_->Schedule([=, &handleRange]() { handleRange(mid, last); }); - pool_->Schedule([=, &handleRange]() { handleRange(first, mid); }); - }; - handleRange(0, n); - barrier.Wait(); - } - - // Convenience wrapper for parallelFor that does not align blocks. - void parallelFor(Index n, const TensorOpCost& cost, - std::function<void(Index, Index)> f) const { - parallelFor(n, cost, nullptr, std::move(f)); + return {block_size, block_count}; } - private: ThreadPoolInterface* pool_; int num_threads_; + Allocator* allocator_; }; |