summaryrefslogtreecommitdiff
path: root/lib/libstatspull/stats_pull_atom_callback.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/libstatspull/stats_pull_atom_callback.cpp')
-rw-r--r--lib/libstatspull/stats_pull_atom_callback.cpp121
1 files changed, 116 insertions, 5 deletions
diff --git a/lib/libstatspull/stats_pull_atom_callback.cpp b/lib/libstatspull/stats_pull_atom_callback.cpp
index 8a1c89b5..8c72944f 100644
--- a/lib/libstatspull/stats_pull_atom_callback.cpp
+++ b/lib/libstatspull/stats_pull_atom_callback.cpp
@@ -25,6 +25,7 @@
#include <stats_pull_atom_callback.h>
#include <map>
+#include <queue>
#include <thread>
#include <vector>
@@ -179,6 +180,10 @@ public:
}
std::shared_ptr<IStatsd> getStatsService() {
+ // There are host unit tests which are using libstatspull
+ // Since we do not have statsd on host - the getStatsService() is no-op and
+ // should return nullptr
+#ifdef __ANDROID__
std::lock_guard<std::mutex> lock(mStatsdMutex);
if (!mStatsd) {
// Fetch statsd
@@ -188,6 +193,7 @@ public:
AIBinder_linkToDeath(binder.get(), mDeathRecipient.get(), this);
}
}
+#endif // __ANDROID__
return mStatsd;
}
@@ -259,6 +265,114 @@ void unregisterStatsPullAtomCallbackBlocking(int32_t atomTag,
statsService->unregisterNativePullAtomCallback(atomTag);
}
+class CallbackOperationsHandler {
+ struct Cmd {
+ enum Type { CMD_REGISTER, CMD_UNREGISTER };
+
+ Type type;
+ int atomTag;
+ std::shared_ptr<StatsPullAtomCallbackInternal> callback;
+ };
+
+public:
+ ~CallbackOperationsHandler() {
+ for (auto& workThread : mWorkThreads) {
+ if (workThread.joinable()) {
+ mCondition.notify_one();
+ workThread.join();
+ }
+ }
+ }
+
+ static CallbackOperationsHandler& getInstance() {
+ static CallbackOperationsHandler handler;
+ return handler;
+ }
+
+ void registerCallback(int atomTag, std::shared_ptr<StatsPullAtomCallbackInternal> callback) {
+ auto registerCmd = std::make_unique<Cmd>();
+ registerCmd->type = Cmd::CMD_REGISTER;
+ registerCmd->atomTag = atomTag;
+ registerCmd->callback = std::move(callback);
+ pushToQueue(std::move(registerCmd));
+
+ std::thread registerThread(&CallbackOperationsHandler::processCommands, this,
+ statsProvider);
+ mWorkThreads.push_back(std::move(registerThread));
+ }
+
+ void unregisterCallback(int atomTag) {
+ auto unregisterCmd = std::make_unique<Cmd>();
+ unregisterCmd->type = Cmd::CMD_UNREGISTER;
+ unregisterCmd->atomTag = atomTag;
+ pushToQueue(std::move(unregisterCmd));
+
+ std::thread unregisterThread(&CallbackOperationsHandler::processCommands, this,
+ statsProvider);
+ mWorkThreads.push_back(std::move(unregisterThread));
+ }
+
+private:
+ std::vector<std::thread> mWorkThreads;
+
+ std::condition_variable mCondition;
+ std::mutex mMutex;
+ std::queue<std::unique_ptr<Cmd>> mCmdQueue;
+
+ CallbackOperationsHandler() {
+ }
+
+ void pushToQueue(std::unique_ptr<Cmd> cmd) {
+ {
+ std::unique_lock<std::mutex> lock(mMutex);
+ mCmdQueue.push(std::move(cmd));
+ }
+ mCondition.notify_one();
+ }
+
+ void processCommands(std::shared_ptr<StatsdProvider> statsProvider) {
+ /**
+ * First trying to obtain stats service instance
+ * This is a blocking call, which waits on service readiness
+ */
+ const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
+
+ /**
+ * To guarantee sequential commands processing we need to lock mutex queue
+ */
+ std::unique_lock<std::mutex> lock(mMutex);
+ /**
+ * This should never really block in practice, since the command was already queued
+ * from the main thread by registerCallback or unregisterCallback.
+ * We are putting command to the queue, and only after a worker thread is created,
+ * which will pop a single command from a queue and will be terminated after processing.
+ * It makes producer/consumer as 1:1 match
+ */
+ if (mCmdQueue.empty()) {
+ mCondition.wait(lock, [this] { return !this->mCmdQueue.empty(); });
+ }
+
+ std::unique_ptr<Cmd> cmd = std::move(mCmdQueue.front());
+ mCmdQueue.pop();
+
+ if (!statsService) {
+ // Statsd not available - dropping command request
+ return;
+ }
+
+ switch (cmd->type) {
+ case Cmd::CMD_REGISTER: {
+ registerStatsPullAtomCallbackBlocking(cmd->atomTag, statsProvider, cmd->callback);
+ break;
+ }
+ case Cmd::CMD_UNREGISTER: {
+ unregisterStatsPullAtomCallbackBlocking(cmd->atomTag, statsProvider);
+ break;
+ }
+ }
+ }
+};
+
void AStatsManager_setPullAtomCallback(int32_t atom_tag, AStatsManager_PullAtomMetadata* metadata,
AStatsManager_PullAtomCallback callback, void* cookie) {
int64_t coolDownMillis =
@@ -280,9 +394,7 @@ void AStatsManager_setPullAtomCallback(int32_t atom_tag, AStatsManager_PullAtomM
pullers[atom_tag] = callbackBinder;
}
- std::thread registerThread(registerStatsPullAtomCallbackBlocking, atom_tag, statsProvider,
- callbackBinder);
- registerThread.detach();
+ CallbackOperationsHandler::getInstance().registerCallback(atom_tag, callbackBinder);
}
void AStatsManager_clearPullAtomCallback(int32_t atom_tag) {
@@ -293,6 +405,5 @@ void AStatsManager_clearPullAtomCallback(int32_t atom_tag) {
pullers.erase(atom_tag);
}
- std::thread unregisterThread(unregisterStatsPullAtomCallbackBlocking, atom_tag, statsProvider);
- unregisterThread.detach();
+ CallbackOperationsHandler::getInstance().unregisterCallback(atom_tag);
}