// Copyright (C) 2017 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #pragma once #include "base/Compiler.h" #include "base/ConditionVariable.h" #include "base/Lock.h" #include "base/FunctorThread.h" #include #include #include // // WorkerThread encapsulates an asynchronous processing queue for objects // of type Item. It manages queue memory, runs processing function in a separate // thread and allows the processing function to stop it at any moment. // // Expected usage of the class: // // - Define an object to store all data for processing: // struct WorkItem { int number; }; // // - Create a WorkerThread with processing function: // WorkerThread worker([](WorkItem&& item) { // std::cout << item.number; // return item.number // ? WorkerProcessingResult::Continue // : WorkerProcessingResult::Stop; // }); // // - Start the worker and send some data for asynchronous processing // worker.start(); // worker.enqueue({1}); // worker.enqueue({2}); // worker.enqueue({}); // <--- this item will stop processing. // worker.join(); // // WorkerThread<>'s all methods are thread-safe, with an expectation that the // work could be added from any number of threads at once. // // Note: destructor calls join() implicitly - it's better to send some // end-of-work marker before trying to destroy a worker thread. // namespace android { namespace base { // Return values for a worker thread's processing function. enum class WorkerProcessingResult { Continue, Stop }; template class WorkerThread { DISALLOW_COPY_AND_ASSIGN(WorkerThread); public: using Result = WorkerProcessingResult; // A function that's called for each enqueued item in a separate thread. using Processor = std::function; WorkerThread(Processor&& processor) : mProcessor(std::move(processor)), mThread([this]() { worker(); }) { mQueue.reserve(10); } ~WorkerThread() { join(); } // Starts the worker thread. bool start() { mStarted = true; if (!mThread.start()) { mFinished = true; return false; } return true; } bool isStarted() const { return mStarted; } // Waits for all enqueue()'d items to finish. void waitQueuedItems() { if (!mStarted || mFinished) return; SyncPoint sync; enqueueImpl(&sync); base::AutoLock lock(sync.lock); sync.cv.wait(&lock, [&sync] { return sync.signaled; }); } // Waits for worker thread to complete. void join() { mThread.wait(); } // Moves the |item| into internal queue for processing. void enqueue(Item&& item) { enqueueImpl(std::move(item)); } private: struct SyncPoint { bool signaled = false; base::ConditionVariable cv; base::Lock lock; }; struct Command { Command(Item&& it) : hasItem(true), workItem(std::move(it)) {} Command(SyncPoint* sp) : hasItem(false), syncPoint(sp) {} Command(Command&& other) : hasItem(other.hasItem) { if (hasItem) { new (&workItem) Item(std::move(other.workItem)); } else { syncPoint = other.syncPoint; } } ~Command() { if (hasItem) { workItem.~Item(); } } bool hasItem; union { SyncPoint* syncPoint; Item workItem; }; }; template void enqueueImpl(T&& x) { base::AutoLock lock(mLock); bool signal = mQueue.empty(); mQueue.emplace_back(Command(std::move(x))); if (signal) { mCv.signalAndUnlock(&lock); } } void worker() { std::vector todo; todo.reserve(10); for (;;) { { base::AutoLock lock(mLock); while (mQueue.empty()) { mCv.wait(&lock); } todo.swap(mQueue); } for (Command& item : todo) { if (item.hasItem) { // Normal work item if (mProcessor(std::move(item.workItem)) == Result::Stop) { return; } } else { // Sync point base::AutoLock lock(item.syncPoint->lock); item.syncPoint->signaled = true; item.syncPoint->cv.signalAndUnlock(&lock); } } todo.clear(); } mFinished = true; } Processor mProcessor; base::FunctorThread mThread; std::vector mQueue; base::Lock mLock; base::ConditionVariable mCv; bool mStarted = false; bool mFinished = false; }; } // namespace base } // namespace android