diff options
Diffstat (limited to 'lib/libstatspull/stats_pull_atom_callback.cpp')
-rw-r--r-- | lib/libstatspull/stats_pull_atom_callback.cpp | 121 |
1 files changed, 116 insertions, 5 deletions
diff --git a/lib/libstatspull/stats_pull_atom_callback.cpp b/lib/libstatspull/stats_pull_atom_callback.cpp index 8a1c89b5..8c72944f 100644 --- a/lib/libstatspull/stats_pull_atom_callback.cpp +++ b/lib/libstatspull/stats_pull_atom_callback.cpp @@ -25,6 +25,7 @@ #include <stats_pull_atom_callback.h> #include <map> +#include <queue> #include <thread> #include <vector> @@ -179,6 +180,10 @@ public: } std::shared_ptr<IStatsd> getStatsService() { + // There are host unit tests which are using libstatspull + // Since we do not have statsd on host - the getStatsService() is no-op and + // should return nullptr +#ifdef __ANDROID__ std::lock_guard<std::mutex> lock(mStatsdMutex); if (!mStatsd) { // Fetch statsd @@ -188,6 +193,7 @@ public: AIBinder_linkToDeath(binder.get(), mDeathRecipient.get(), this); } } +#endif // __ANDROID__ return mStatsd; } @@ -259,6 +265,114 @@ void unregisterStatsPullAtomCallbackBlocking(int32_t atomTag, statsService->unregisterNativePullAtomCallback(atomTag); } +class CallbackOperationsHandler { + struct Cmd { + enum Type { CMD_REGISTER, CMD_UNREGISTER }; + + Type type; + int atomTag; + std::shared_ptr<StatsPullAtomCallbackInternal> callback; + }; + +public: + ~CallbackOperationsHandler() { + for (auto& workThread : mWorkThreads) { + if (workThread.joinable()) { + mCondition.notify_one(); + workThread.join(); + } + } + } + + static CallbackOperationsHandler& getInstance() { + static CallbackOperationsHandler handler; + return handler; + } + + void registerCallback(int atomTag, std::shared_ptr<StatsPullAtomCallbackInternal> callback) { + auto registerCmd = std::make_unique<Cmd>(); + registerCmd->type = Cmd::CMD_REGISTER; + registerCmd->atomTag = atomTag; + registerCmd->callback = std::move(callback); + pushToQueue(std::move(registerCmd)); + + std::thread registerThread(&CallbackOperationsHandler::processCommands, this, + statsProvider); + mWorkThreads.push_back(std::move(registerThread)); + } + + void unregisterCallback(int atomTag) { + auto unregisterCmd = std::make_unique<Cmd>(); + unregisterCmd->type = Cmd::CMD_UNREGISTER; + unregisterCmd->atomTag = atomTag; + pushToQueue(std::move(unregisterCmd)); + + std::thread unregisterThread(&CallbackOperationsHandler::processCommands, this, + statsProvider); + mWorkThreads.push_back(std::move(unregisterThread)); + } + +private: + std::vector<std::thread> mWorkThreads; + + std::condition_variable mCondition; + std::mutex mMutex; + std::queue<std::unique_ptr<Cmd>> mCmdQueue; + + CallbackOperationsHandler() { + } + + void pushToQueue(std::unique_ptr<Cmd> cmd) { + { + std::unique_lock<std::mutex> lock(mMutex); + mCmdQueue.push(std::move(cmd)); + } + mCondition.notify_one(); + } + + void processCommands(std::shared_ptr<StatsdProvider> statsProvider) { + /** + * First trying to obtain stats service instance + * This is a blocking call, which waits on service readiness + */ + const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService(); + + /** + * To guarantee sequential commands processing we need to lock mutex queue + */ + std::unique_lock<std::mutex> lock(mMutex); + /** + * This should never really block in practice, since the command was already queued + * from the main thread by registerCallback or unregisterCallback. + * We are putting command to the queue, and only after a worker thread is created, + * which will pop a single command from a queue and will be terminated after processing. + * It makes producer/consumer as 1:1 match + */ + if (mCmdQueue.empty()) { + mCondition.wait(lock, [this] { return !this->mCmdQueue.empty(); }); + } + + std::unique_ptr<Cmd> cmd = std::move(mCmdQueue.front()); + mCmdQueue.pop(); + + if (!statsService) { + // Statsd not available - dropping command request + return; + } + + switch (cmd->type) { + case Cmd::CMD_REGISTER: { + registerStatsPullAtomCallbackBlocking(cmd->atomTag, statsProvider, cmd->callback); + break; + } + case Cmd::CMD_UNREGISTER: { + unregisterStatsPullAtomCallbackBlocking(cmd->atomTag, statsProvider); + break; + } + } + } +}; + void AStatsManager_setPullAtomCallback(int32_t atom_tag, AStatsManager_PullAtomMetadata* metadata, AStatsManager_PullAtomCallback callback, void* cookie) { int64_t coolDownMillis = @@ -280,9 +394,7 @@ void AStatsManager_setPullAtomCallback(int32_t atom_tag, AStatsManager_PullAtomM pullers[atom_tag] = callbackBinder; } - std::thread registerThread(registerStatsPullAtomCallbackBlocking, atom_tag, statsProvider, - callbackBinder); - registerThread.detach(); + CallbackOperationsHandler::getInstance().registerCallback(atom_tag, callbackBinder); } void AStatsManager_clearPullAtomCallback(int32_t atom_tag) { @@ -293,6 +405,5 @@ void AStatsManager_clearPullAtomCallback(int32_t atom_tag) { pullers.erase(atom_tag); } - std::thread unregisterThread(unregisterStatsPullAtomCallbackBlocking, atom_tag, statsProvider); - unregisterThread.detach(); + CallbackOperationsHandler::getInstance().unregisterCallback(atom_tag); } |