diff options
author | Vova Sharaienko <sharaienko@google.com> | 2024-01-05 01:34:37 +0000 |
---|---|---|
committer | Gerrit Code Review <noreply-gerritcodereview@google.com> | 2024-01-05 01:34:37 +0000 |
commit | 4023b61c71826864cf08b9d3840b959c9632d147 (patch) | |
tree | 7eb67a4539dae70733394bca72732d8ec44fb3de | |
parent | e0e198346020ad949bfb92f4bd4524bc033fec7b (diff) | |
parent | f0e84ad0b0c1e8bf3e0e6aa8414762178567b541 (diff) | |
download | StatsD-4023b61c71826864cf08b9d3840b959c9632d147.tar.gz |
Merge "[statsd] Added tracking event queue max size seen so far" into main
-rw-r--r-- | statsd/src/StatsService.cpp | 3 | ||||
-rw-r--r-- | statsd/src/guardrail/StatsdStats.cpp | 24 | ||||
-rw-r--r-- | statsd/src/guardrail/StatsdStats.h | 57 | ||||
-rw-r--r-- | statsd/src/logd/LogEventQueue.cpp | 5 | ||||
-rw-r--r-- | statsd/src/logd/LogEventQueue.h | 2 | ||||
-rw-r--r-- | statsd/src/socket/StatsSocketListener.cpp | 8 | ||||
-rw-r--r-- | statsd/src/socket/StatsSocketListener.h | 1 | ||||
-rw-r--r-- | statsd/src/stats_log.proto | 7 | ||||
-rw-r--r-- | statsd/tests/SocketListener_test.cpp | 7 | ||||
-rw-r--r-- | statsd/tests/guardrail/StatsdStats_test.cpp | 10 | ||||
-rw-r--r-- | statsd/tests/log_event/LogEventQueue_test.cpp | 122 |
11 files changed, 202 insertions, 44 deletions
diff --git a/statsd/src/StatsService.cpp b/statsd/src/StatsService.cpp index 8b2c218c..86159a72 100644 --- a/statsd/src/StatsService.cpp +++ b/statsd/src/StatsService.cpp @@ -1515,8 +1515,9 @@ void StatsService::stopReadingLogs() { // Push this event so that readLogs will process and break out of the loop // after the stop is requested. int64_t timeStamp; + int32_t newSize; std::unique_ptr<LogEvent> logEvent = std::make_unique<LogEvent>(/*uid=*/0, /*pid=*/0); - mEventQueue->push(std::move(logEvent), &timeStamp); + mEventQueue->push(std::move(logEvent), timeStamp, newSize); } } // namespace statsd diff --git a/statsd/src/guardrail/StatsdStats.cpp b/statsd/src/guardrail/StatsdStats.cpp index 939d39fa..d8220c6a 100644 --- a/statsd/src/guardrail/StatsdStats.cpp +++ b/statsd/src/guardrail/StatsdStats.cpp @@ -62,6 +62,7 @@ const int FIELD_ID_SHARD_OFFSET = 21; const int FIELD_ID_STATSD_STATS_ID = 22; const int FIELD_ID_SUBSCRIPTION_STATS = 23; const int FIELD_ID_SOCKET_LOSS_STATS = 24; +const int FIELD_ID_QUEUE_STATS = 25; const int FIELD_ID_RESTRICTED_METRIC_QUERY_STATS_CALLING_UID = 1; const int FIELD_ID_RESTRICTED_METRIC_QUERY_STATS_CONFIG_ID = 2; @@ -93,6 +94,9 @@ const int FIELD_ID_OVERFLOW_COUNT = 1; const int FIELD_ID_OVERFLOW_MAX_HISTORY = 2; const int FIELD_ID_OVERFLOW_MIN_HISTORY = 3; +const int FIELD_ID_QUEUE_MAX_SIZE_OBSERVED = 1; +const int FIELD_ID_QUEUE_MAX_SIZE_OBSERVED_ELAPSED_NANOS = 2; + const int FIELD_ID_CONFIG_STATS_UID = 1; const int FIELD_ID_CONFIG_STATS_ID = 2; const int FIELD_ID_CONFIG_STATS_CREATION = 3; @@ -361,6 +365,15 @@ void StatsdStats::noteEventQueueOverflow(int64_t oldestEventTimestampNs, int32_t noteAtomDroppedLocked(atomId); } +void StatsdStats::noteEventQueueSize(int32_t size, int64_t eventTimestampNs) { + lock_guard<std::mutex> lock(mLock); + + if (mEventQueueMaxSizeObserved < size) { + mEventQueueMaxSizeObserved = size; + mEventQueueMaxSizeObservedElapsedNanos = eventTimestampNs; + } +} + void StatsdStats::noteAtomDroppedLocked(int32_t atomId) { constexpr int kMaxPushedAtomDroppedStatsSize = kMaxPushedAtomId + kMaxNonPlatformPushedAtoms; if (mPushedAtomDropsStats.size() < kMaxPushedAtomDroppedStatsSize || @@ -1029,6 +1042,8 @@ void StatsdStats::resetInternalLocked() { mOverflowCount = 0; mMinQueueHistoryNs = kInt64Max; mMaxQueueHistoryNs = 0; + mEventQueueMaxSizeObserved = 0; + mEventQueueMaxSizeObservedElapsedNanos = 0; for (auto& config : mConfigStats) { config.second->broadcast_sent_time_sec.clear(); config.second->activation_time_sec.clear(); @@ -1428,6 +1443,8 @@ void StatsdStats::dumpStats(int out) const { dprintf(out, "********EventQueueOverflow stats***********\n"); dprintf(out, "Event queue overflow: %d; MaxHistoryNs: %lld; MinHistoryNs: %lld\n", mOverflowCount, (long long)mMaxQueueHistoryNs, (long long)mMinQueueHistoryNs); + dprintf(out, "Event queue max size: %d; Observed at : %lld\n", mEventQueueMaxSizeObserved, + (long long)mEventQueueMaxSizeObservedElapsedNanos); if (mActivationBroadcastGuardrailStats.size() > 0) { dprintf(out, "********mActivationBroadcastGuardrail stats***********\n"); @@ -1783,6 +1800,13 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) { proto.end(token); } + uint64_t queueStatsToken = proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_QUEUE_STATS); + proto.write(FIELD_TYPE_INT32 | FIELD_ID_QUEUE_MAX_SIZE_OBSERVED, + (int32_t)mEventQueueMaxSizeObserved); + proto.write(FIELD_TYPE_INT64 | FIELD_ID_QUEUE_MAX_SIZE_OBSERVED_ELAPSED_NANOS, + (long long)mEventQueueMaxSizeObservedElapsedNanos); + proto.end(queueStatsToken); + for (const auto& restart : mSystemServerRestartSec) { proto.write(FIELD_TYPE_INT32 | FIELD_ID_SYSTEM_SERVER_RESTART | FIELD_COUNT_REPEATED, restart); diff --git a/statsd/src/guardrail/StatsdStats.h b/statsd/src/guardrail/StatsdStats.h index c7ca8d0c..eadb353c 100644 --- a/statsd/src/guardrail/StatsdStats.h +++ b/statsd/src/guardrail/StatsdStats.h @@ -614,6 +614,9 @@ public: * in the queue */ void noteEventQueueOverflow(int64_t oldestEventTimestampNs, int32_t atomId, bool isSkipped); + /* Notes queue max size seen so far and associated timestamp */ + void noteEventQueueSize(int32_t size, int64_t eventTimestampNs); + /** * Reports that the activation broadcast guardrail was hit for this uid. Namely, the broadcast * should have been sent, but instead was skipped due to hitting the guardrail. @@ -922,6 +925,12 @@ private: // Total number of events that are lost due to queue overflow. int32_t mOverflowCount = 0; + // Max number of events stored into the queue seen so far. + int32_t mEventQueueMaxSizeObserved = 0; + + // Event timestamp for associated max size hit. + int64_t mEventQueueMaxSizeObservedElapsedNanos = 0; + // Timestamps when we detect log loss, and the number of logs lost. std::list<LogLossStats> mLogLossStats; @@ -1009,40 +1018,42 @@ private: */ StatsdStats::AtomMetricStats& getAtomMetricStats(int64_t metricId); - FRIEND_TEST(StatsdStatsTest, TestValidConfigAdd); + FRIEND_TEST(LogEventQueue_test, TestQueueMaxSize); + FRIEND_TEST(SocketParseMessageTest, TestProcessMessage); + FRIEND_TEST(StatsLogProcessorTest, InvalidConfigRemoved); + FRIEND_TEST(StatsdStatsTest, TestActivationBroadcastGuardrailHit); + FRIEND_TEST(StatsdStatsTest, TestAnomalyMonitor); + FRIEND_TEST(StatsdStatsTest, TestAtomDroppedStats); + FRIEND_TEST(StatsdStatsTest, TestAtomErrorStats); + FRIEND_TEST(StatsdStatsTest, TestAtomLog); + FRIEND_TEST(StatsdStatsTest, TestAtomLoggedAndDroppedAndSkippedStats); + FRIEND_TEST(StatsdStatsTest, TestAtomLoggedAndDroppedStats); + FRIEND_TEST(StatsdStatsTest, TestAtomMetricsStats); + FRIEND_TEST(StatsdStatsTest, TestAtomSkippedStats); + FRIEND_TEST(StatsdStatsTest, TestConfigRemove); + FRIEND_TEST(StatsdStatsTest, TestHasHitDimensionGuardrail); FRIEND_TEST(StatsdStatsTest, TestInvalidConfigAdd); FRIEND_TEST(StatsdStatsTest, TestInvalidConfigMissingMetricId); FRIEND_TEST(StatsdStatsTest, TestInvalidConfigOnlyMetricId); - FRIEND_TEST(StatsdStatsTest, TestConfigRemove); - FRIEND_TEST(StatsdStatsTest, TestSubStats); - FRIEND_TEST(StatsdStatsTest, TestAtomLog); FRIEND_TEST(StatsdStatsTest, TestNonPlatformAtomLog); - FRIEND_TEST(StatsdStatsTest, TestTimestampThreshold); - FRIEND_TEST(StatsdStatsTest, TestAnomalyMonitor); - FRIEND_TEST(StatsdStatsTest, TestSystemServerCrash); FRIEND_TEST(StatsdStatsTest, TestPullAtomStats); - FRIEND_TEST(StatsdStatsTest, TestAtomMetricsStats); - FRIEND_TEST(StatsdStatsTest, TestActivationBroadcastGuardrailHit); - FRIEND_TEST(StatsdStatsTest, TestAtomErrorStats); - FRIEND_TEST(StatsdStatsTest, TestAtomSkippedStats); - FRIEND_TEST(StatsdStatsTest, TestRestrictedMetricsStats); + FRIEND_TEST(StatsdStatsTest, TestQueueStats); FRIEND_TEST(StatsdStatsTest, TestRestrictedMetricsQueryStats); - FRIEND_TEST(StatsdStatsTest, TestAtomDroppedStats); - FRIEND_TEST(StatsdStatsTest, TestAtomLoggedAndDroppedStats); - FRIEND_TEST(StatsdStatsTest, TestAtomLoggedAndDroppedAndSkippedStats); + FRIEND_TEST(StatsdStatsTest, TestRestrictedMetricsStats); FRIEND_TEST(StatsdStatsTest, TestShardOffsetProvider); - FRIEND_TEST(StatsdStatsTest, TestHasHitDimensionGuardrail); - FRIEND_TEST(StatsdStatsTest, TestSubscriptionStarted); - FRIEND_TEST(StatsdStatsTest, TestSubscriptionFlushed); - FRIEND_TEST(StatsdStatsTest, TestSubscriptionEnded); + FRIEND_TEST(StatsdStatsTest, TestSocketLossStats); + FRIEND_TEST(StatsdStatsTest, TestSocketLossStatsOverflowCounter); + FRIEND_TEST(StatsdStatsTest, TestSubStats); FRIEND_TEST(StatsdStatsTest, TestSubscriptionAtomPulled); + FRIEND_TEST(StatsdStatsTest, TestSubscriptionEnded); + FRIEND_TEST(StatsdStatsTest, TestSubscriptionFlushed); FRIEND_TEST(StatsdStatsTest, TestSubscriptionPullThreadWakeup); + FRIEND_TEST(StatsdStatsTest, TestSubscriptionStarted); FRIEND_TEST(StatsdStatsTest, TestSubscriptionStartedMaxActiveSubscriptions); FRIEND_TEST(StatsdStatsTest, TestSubscriptionStartedRemoveFinishedSubscription); - FRIEND_TEST(StatsdStatsTest, TestSocketLossStats); - FRIEND_TEST(StatsdStatsTest, TestSocketLossStatsOverflowCounter); - - FRIEND_TEST(StatsLogProcessorTest, InvalidConfigRemoved); + FRIEND_TEST(StatsdStatsTest, TestSystemServerCrash); + FRIEND_TEST(StatsdStatsTest, TestTimestampThreshold); + FRIEND_TEST(StatsdStatsTest, TestValidConfigAdd); }; InvalidConfigReason createInvalidConfigReasonWithMatcher(const InvalidConfigReasonEnum reason, diff --git a/statsd/src/logd/LogEventQueue.cpp b/statsd/src/logd/LogEventQueue.cpp index 96b7f015..34843e73 100644 --- a/statsd/src/logd/LogEventQueue.cpp +++ b/statsd/src/logd/LogEventQueue.cpp @@ -39,7 +39,7 @@ unique_ptr<LogEvent> LogEventQueue::waitPop() { return item; } -bool LogEventQueue::push(unique_ptr<LogEvent> item, int64_t* oldestTimestampNs) { +bool LogEventQueue::push(unique_ptr<LogEvent> item, int64_t& oldestTimestampNs, int32_t& newSize) { bool success; { std::unique_lock<std::mutex> lock(mMutex); @@ -48,9 +48,10 @@ bool LogEventQueue::push(unique_ptr<LogEvent> item, int64_t* oldestTimestampNs) success = true; } else { // safe operation as queue must not be empty. - *oldestTimestampNs = mQueue.front()->GetElapsedTimestampNs(); + oldestTimestampNs = mQueue.front()->GetElapsedTimestampNs(); success = false; } + newSize = mQueue.size(); } mCondition.notify_one(); diff --git a/statsd/src/logd/LogEventQueue.h b/statsd/src/logd/LogEventQueue.h index 53f2adf2..f7cf850d 100644 --- a/statsd/src/logd/LogEventQueue.h +++ b/statsd/src/logd/LogEventQueue.h @@ -45,7 +45,7 @@ public: * Returns false on failure when the queue is full, and output the oldest event timestamp * in the queue. */ - bool push(std::unique_ptr<LogEvent> event, int64_t* oldestTimestampNs); + bool push(std::unique_ptr<LogEvent> event, int64_t& oldestTimestampNs, int32_t& newSize); private: const size_t mQueueLimit; diff --git a/statsd/src/socket/StatsSocketListener.cpp b/statsd/src/socket/StatsSocketListener.cpp index 3f360333..96c7f1f7 100644 --- a/statsd/src/socket/StatsSocketListener.cpp +++ b/statsd/src/socket/StatsSocketListener.cpp @@ -148,6 +148,7 @@ void StatsSocketListener::processMessage(const uint8_t* msg, uint32_t len, uint3 const int32_t atomId = logEvent->GetTagId(); const bool isAtomSkipped = logEvent->isParsedHeaderOnly(); + const int64_t atomTimestamp = logEvent->GetElapsedTimestampNs(); if (atomId == util::STATS_SOCKET_LOSS_REPORTED) { if (isAtomSkipped) { @@ -164,9 +165,12 @@ void StatsSocketListener::processMessage(const uint8_t* msg, uint32_t len, uint3 } } - int64_t oldestTimestamp; - if (!queue->push(std::move(logEvent), &oldestTimestamp)) { + int64_t oldestTimestamp = 0; + int32_t queueSize = 0; + if (!queue->push(std::move(logEvent), oldestTimestamp, queueSize)) { StatsdStats::getInstance().noteEventQueueOverflow(oldestTimestamp, atomId, isAtomSkipped); + } else { + StatsdStats::getInstance().noteEventQueueSize(queueSize, atomTimestamp); } } diff --git a/statsd/src/socket/StatsSocketListener.h b/statsd/src/socket/StatsSocketListener.h index 903eff39..1c56ff3a 100644 --- a/statsd/src/socket/StatsSocketListener.h +++ b/statsd/src/socket/StatsSocketListener.h @@ -81,6 +81,7 @@ private: FRIEND_TEST(SocketParseMessageTest, TestProcessMessageFilterCompleteSet); FRIEND_TEST(SocketParseMessageTest, TestProcessMessageFilterPartialSet); FRIEND_TEST(SocketParseMessageTest, TestProcessMessageFilterToggle); + FRIEND_TEST(LogEventQueue_test, TestQueueMaxSize); }; } // namespace statsd diff --git a/statsd/src/stats_log.proto b/statsd/src/stats_log.proto index b0955467..fdb031a9 100644 --- a/statsd/src/stats_log.proto +++ b/statsd/src/stats_log.proto @@ -682,6 +682,13 @@ message StatsdStatsReport { } optional SocketLossStats socket_loss_stats = 24; + + message EventQueueStats { + optional int32 max_size_observed = 1; + optional int64 max_size_observed_elapsed_nanos = 2; + } + + optional EventQueueStats event_queue_stats = 25; } message AlertTriggerDetails { diff --git a/statsd/tests/SocketListener_test.cpp b/statsd/tests/SocketListener_test.cpp index 90e5d3e1..3636508f 100644 --- a/statsd/tests/SocketListener_test.cpp +++ b/statsd/tests/SocketListener_test.cpp @@ -87,8 +87,11 @@ INSTANTIATE_TEST_SUITE_P(SocketParseMessageTest, SocketParseMessageTest, testing SocketParseMessageTest::ToString); TEST_P(SocketParseMessageTest, TestProcessMessage) { + StatsdStats::getInstance().reset(); + generateAtomLogging(mEventQueue, mLogEventFilter, kEventCount, kAtomId); + int64_t lastEventTs = 0; // check content of the queue EXPECT_EQ(kEventCount, mEventQueue->mQueue.size()); for (int i = 0; i < kEventCount; i++) { @@ -96,7 +99,11 @@ TEST_P(SocketParseMessageTest, TestProcessMessage) { EXPECT_TRUE(logEvent->isValid()); EXPECT_EQ(kAtomId + i, logEvent->GetTagId()); EXPECT_EQ(logEvent->isParsedHeaderOnly(), GetParam()); + lastEventTs = logEvent->GetElapsedTimestampNs(); } + + EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObserved, kEventCount); + EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObservedElapsedNanos, lastEventTs); } TEST_P(SocketParseMessageTest, TestProcessMessageEmptySetExplicitSet) { diff --git a/statsd/tests/guardrail/StatsdStats_test.cpp b/statsd/tests/guardrail/StatsdStats_test.cpp index 6e61b620..1d56caaf 100644 --- a/statsd/tests/guardrail/StatsdStats_test.cpp +++ b/statsd/tests/guardrail/StatsdStats_test.cpp @@ -750,6 +750,16 @@ TEST(StatsdStatsTest, TestAtomDroppedStats) { EXPECT_FALSE(nonPlatformPushedAtomStats.has_skip_count()); } +TEST(StatsdStatsTest, TestQueueStats) { + StatsdStats stats; + + stats.noteEventQueueSize(100, 1000); + StatsdStatsReport report = getStatsdStatsReport(stats, /* reset stats */ true); + + ASSERT_EQ(100, report.event_queue_stats().max_size_observed()); + ASSERT_EQ(1000, report.event_queue_stats().max_size_observed_elapsed_nanos()); +} + TEST(StatsdStatsTest, TestAtomLoggedAndDroppedStats) { StatsdStats stats; diff --git a/statsd/tests/log_event/LogEventQueue_test.cpp b/statsd/tests/log_event/LogEventQueue_test.cpp index a15f95be..05e4f93e 100644 --- a/statsd/tests/log_event/LogEventQueue_test.cpp +++ b/statsd/tests/log_event/LogEventQueue_test.cpp @@ -20,6 +20,7 @@ #include <thread> +#include "socket/StatsSocketListener.h" #include "stats_event.h" #include "tests/statsd_test_util.h" @@ -34,37 +35,44 @@ using std::unique_ptr; namespace { -std::unique_ptr<LogEvent> makeLogEvent(uint64_t timestampNs) { +AStatsEvent* makeStatsEvent(uint64_t timestampNs) { AStatsEvent* statsEvent = AStatsEvent_obtain(); AStatsEvent_setAtomId(statsEvent, 10); AStatsEvent_overwriteTimestamp(statsEvent, timestampNs); + AStatsEvent_build(statsEvent); + return statsEvent; +} +std::unique_ptr<LogEvent> makeLogEvent(uint64_t timestampNs) { + AStatsEvent* statsEvent = makeStatsEvent(timestampNs); std::unique_ptr<LogEvent> logEvent = std::make_unique<LogEvent>(/*uid=*/0, /*pid=*/0); parseStatsEventToLogEvent(statsEvent, logEvent.get()); + EXPECT_EQ(logEvent->GetElapsedTimestampNs(), timestampNs); return logEvent; } -} // anonymous namespace +} // anonymous namespace #ifdef __ANDROID__ TEST(LogEventQueue_test, TestGoodConsumer) { LogEventQueue queue(50); - int64_t timeBaseNs = 100; - std::thread writer([&queue, timeBaseNs] { + int64_t eventTimeNs = 100; + std::thread writer([&queue, eventTimeNs] { + int64_t oldestEventNs = 0; + int32_t newSize = 0; for (int i = 0; i < 100; i++) { - int64_t oldestEventNs; - bool success = queue.push(makeLogEvent(timeBaseNs + i * 1000), &oldestEventNs); + bool success = queue.push(makeLogEvent(eventTimeNs + i * 1000), oldestEventNs, newSize); EXPECT_TRUE(success); std::this_thread::sleep_for(std::chrono::milliseconds(1)); } }); - std::thread reader([&queue, timeBaseNs] { + std::thread reader([&queue, eventTimeNs] { for (int i = 0; i < 100; i++) { auto event = queue.waitPop(); EXPECT_TRUE(event != nullptr); // All events are in right order. - EXPECT_EQ(timeBaseNs + i * 1000, event->GetElapsedTimestampNs()); + EXPECT_EQ(eventTimeNs + i * 1000, event->GetElapsedTimestampNs()); } }); @@ -74,13 +82,16 @@ TEST(LogEventQueue_test, TestGoodConsumer) { TEST(LogEventQueue_test, TestSlowConsumer) { LogEventQueue queue(50); - int64_t timeBaseNs = 100; - std::thread writer([&queue, timeBaseNs] { + int64_t eventTimeNs = 100; + std::thread writer([&queue, eventTimeNs] { int failure_count = 0; - int64_t oldestEventNs; + int64_t oldestEventNs = 0; + int32_t newSize = 0; for (int i = 0; i < 100; i++) { - bool success = queue.push(makeLogEvent(timeBaseNs + i * 1000), &oldestEventNs); - if (!success) failure_count++; + bool success = queue.push(makeLogEvent(eventTimeNs + i * 1000), oldestEventNs, newSize); + if (!success) { + failure_count++; + } std::this_thread::sleep_for(std::chrono::milliseconds(1)); } @@ -92,13 +103,13 @@ TEST(LogEventQueue_test, TestSlowConsumer) { EXPECT_TRUE(oldestEventNs <= (100 + 5 * 1000)); }); - std::thread reader([&queue, timeBaseNs] { + std::thread reader([&queue, eventTimeNs] { // The consumer quickly processed 5 events, then it got stuck (not reading anymore). for (int i = 0; i < 5; i++) { auto event = queue.waitPop(); EXPECT_TRUE(event != nullptr); // All events are in right order. - EXPECT_EQ(timeBaseNs + i * 1000, event->GetElapsedTimestampNs()); + EXPECT_EQ(eventTimeNs + i * 1000, event->GetElapsedTimestampNs()); } }); @@ -106,6 +117,87 @@ TEST(LogEventQueue_test, TestSlowConsumer) { writer.join(); } +TEST(LogEventQueue_test, TestQueueMaxSize) { + StatsdStats::getInstance().reset(); + + std::shared_ptr<LogEventQueue> queue(std::make_shared<LogEventQueue>(50)); + std::shared_ptr<LogEventFilter> filter(std::make_shared<LogEventFilter>()); + filter->setFilteringEnabled(false); + + int64_t eventTimeNs = 100; + int64_t oldestEventNs = 0; + int32_t newSize = 0; + for (int i = 0; i < 30; i++, eventTimeNs++) { + auto statsEvent = makeStatsEvent(eventTimeNs); + size_t bufferSize; + const uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &bufferSize); + StatsSocketListener::processMessage(buffer, bufferSize, 0, 0, queue, filter); + AStatsEvent_release(statsEvent); + EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObserved, i + 1); + EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObservedElapsedNanos, eventTimeNs); + } + + const int32_t lastMaxSizeObserved = StatsdStats::getInstance().mEventQueueMaxSizeObserved; + const int64_t lastMaxSizeElapsedNanos = + StatsdStats::getInstance().mEventQueueMaxSizeObservedElapsedNanos; + + // consumer reads the entire queue + int64_t nextEventTs = 100; + for (int i = 0; i < 30; i++, nextEventTs++) { + auto event = queue->waitPop(); + EXPECT_TRUE(event != nullptr); + // All events are in right order. + EXPECT_EQ(nextEventTs, event->GetElapsedTimestampNs()); + } + + // the expectation after queue drained entirely the max count & ts do not update for + // smaller values + { + auto statsEvent = makeStatsEvent(eventTimeNs); + size_t bufferSize; + const uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &bufferSize); + StatsSocketListener::processMessage(buffer, bufferSize, 0, 0, queue, filter); + AStatsEvent_release(statsEvent); + EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObserved, lastMaxSizeObserved); + EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObservedElapsedNanos, + lastMaxSizeElapsedNanos); + eventTimeNs++; + } + + for (int i = 0; i < 1; i++, nextEventTs++) { + auto event = queue->waitPop(); + EXPECT_TRUE(event != nullptr); + // All events are in right order. + EXPECT_EQ(nextEventTs, event->GetElapsedTimestampNs()); + } + + // the expectation after queue drained entirely the max count & ts do update for + // bigger values + // fill up to the the previous max values observed - stats are not changed + for (int i = 0; i < lastMaxSizeObserved; i++, eventTimeNs++) { + auto statsEvent = makeStatsEvent(eventTimeNs); + size_t bufferSize; + const uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &bufferSize); + StatsSocketListener::processMessage(buffer, bufferSize, 0, 0, queue, filter); + AStatsEvent_release(statsEvent); + EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObserved, lastMaxSizeObserved); + EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObservedElapsedNanos, + lastMaxSizeElapsedNanos); + } + + // add extra elements to update the stats + for (int i = 0; i < 10; i++, eventTimeNs++) { + auto statsEvent = makeStatsEvent(eventTimeNs); + size_t bufferSize; + const uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &bufferSize); + StatsSocketListener::processMessage(buffer, bufferSize, 0, 0, queue, filter); + AStatsEvent_release(statsEvent); + EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObserved, + lastMaxSizeObserved + i + 1); + EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObservedElapsedNanos, eventTimeNs); + } +} + #else GTEST_LOG_(INFO) << "This test does nothing.\n"; #endif |