summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVova Sharaienko <sharaienko@google.com>2024-01-05 01:34:37 +0000
committerGerrit Code Review <noreply-gerritcodereview@google.com>2024-01-05 01:34:37 +0000
commit4023b61c71826864cf08b9d3840b959c9632d147 (patch)
tree7eb67a4539dae70733394bca72732d8ec44fb3de
parente0e198346020ad949bfb92f4bd4524bc033fec7b (diff)
parentf0e84ad0b0c1e8bf3e0e6aa8414762178567b541 (diff)
downloadStatsD-4023b61c71826864cf08b9d3840b959c9632d147.tar.gz
Merge "[statsd] Added tracking event queue max size seen so far" into main
-rw-r--r--statsd/src/StatsService.cpp3
-rw-r--r--statsd/src/guardrail/StatsdStats.cpp24
-rw-r--r--statsd/src/guardrail/StatsdStats.h57
-rw-r--r--statsd/src/logd/LogEventQueue.cpp5
-rw-r--r--statsd/src/logd/LogEventQueue.h2
-rw-r--r--statsd/src/socket/StatsSocketListener.cpp8
-rw-r--r--statsd/src/socket/StatsSocketListener.h1
-rw-r--r--statsd/src/stats_log.proto7
-rw-r--r--statsd/tests/SocketListener_test.cpp7
-rw-r--r--statsd/tests/guardrail/StatsdStats_test.cpp10
-rw-r--r--statsd/tests/log_event/LogEventQueue_test.cpp122
11 files changed, 202 insertions, 44 deletions
diff --git a/statsd/src/StatsService.cpp b/statsd/src/StatsService.cpp
index 8b2c218c..86159a72 100644
--- a/statsd/src/StatsService.cpp
+++ b/statsd/src/StatsService.cpp
@@ -1515,8 +1515,9 @@ void StatsService::stopReadingLogs() {
// Push this event so that readLogs will process and break out of the loop
// after the stop is requested.
int64_t timeStamp;
+ int32_t newSize;
std::unique_ptr<LogEvent> logEvent = std::make_unique<LogEvent>(/*uid=*/0, /*pid=*/0);
- mEventQueue->push(std::move(logEvent), &timeStamp);
+ mEventQueue->push(std::move(logEvent), timeStamp, newSize);
}
} // namespace statsd
diff --git a/statsd/src/guardrail/StatsdStats.cpp b/statsd/src/guardrail/StatsdStats.cpp
index 939d39fa..d8220c6a 100644
--- a/statsd/src/guardrail/StatsdStats.cpp
+++ b/statsd/src/guardrail/StatsdStats.cpp
@@ -62,6 +62,7 @@ const int FIELD_ID_SHARD_OFFSET = 21;
const int FIELD_ID_STATSD_STATS_ID = 22;
const int FIELD_ID_SUBSCRIPTION_STATS = 23;
const int FIELD_ID_SOCKET_LOSS_STATS = 24;
+const int FIELD_ID_QUEUE_STATS = 25;
const int FIELD_ID_RESTRICTED_METRIC_QUERY_STATS_CALLING_UID = 1;
const int FIELD_ID_RESTRICTED_METRIC_QUERY_STATS_CONFIG_ID = 2;
@@ -93,6 +94,9 @@ const int FIELD_ID_OVERFLOW_COUNT = 1;
const int FIELD_ID_OVERFLOW_MAX_HISTORY = 2;
const int FIELD_ID_OVERFLOW_MIN_HISTORY = 3;
+const int FIELD_ID_QUEUE_MAX_SIZE_OBSERVED = 1;
+const int FIELD_ID_QUEUE_MAX_SIZE_OBSERVED_ELAPSED_NANOS = 2;
+
const int FIELD_ID_CONFIG_STATS_UID = 1;
const int FIELD_ID_CONFIG_STATS_ID = 2;
const int FIELD_ID_CONFIG_STATS_CREATION = 3;
@@ -361,6 +365,15 @@ void StatsdStats::noteEventQueueOverflow(int64_t oldestEventTimestampNs, int32_t
noteAtomDroppedLocked(atomId);
}
+void StatsdStats::noteEventQueueSize(int32_t size, int64_t eventTimestampNs) {
+ lock_guard<std::mutex> lock(mLock);
+
+ if (mEventQueueMaxSizeObserved < size) {
+ mEventQueueMaxSizeObserved = size;
+ mEventQueueMaxSizeObservedElapsedNanos = eventTimestampNs;
+ }
+}
+
void StatsdStats::noteAtomDroppedLocked(int32_t atomId) {
constexpr int kMaxPushedAtomDroppedStatsSize = kMaxPushedAtomId + kMaxNonPlatformPushedAtoms;
if (mPushedAtomDropsStats.size() < kMaxPushedAtomDroppedStatsSize ||
@@ -1029,6 +1042,8 @@ void StatsdStats::resetInternalLocked() {
mOverflowCount = 0;
mMinQueueHistoryNs = kInt64Max;
mMaxQueueHistoryNs = 0;
+ mEventQueueMaxSizeObserved = 0;
+ mEventQueueMaxSizeObservedElapsedNanos = 0;
for (auto& config : mConfigStats) {
config.second->broadcast_sent_time_sec.clear();
config.second->activation_time_sec.clear();
@@ -1428,6 +1443,8 @@ void StatsdStats::dumpStats(int out) const {
dprintf(out, "********EventQueueOverflow stats***********\n");
dprintf(out, "Event queue overflow: %d; MaxHistoryNs: %lld; MinHistoryNs: %lld\n",
mOverflowCount, (long long)mMaxQueueHistoryNs, (long long)mMinQueueHistoryNs);
+ dprintf(out, "Event queue max size: %d; Observed at : %lld\n", mEventQueueMaxSizeObserved,
+ (long long)mEventQueueMaxSizeObservedElapsedNanos);
if (mActivationBroadcastGuardrailStats.size() > 0) {
dprintf(out, "********mActivationBroadcastGuardrail stats***********\n");
@@ -1783,6 +1800,13 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) {
proto.end(token);
}
+ uint64_t queueStatsToken = proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_QUEUE_STATS);
+ proto.write(FIELD_TYPE_INT32 | FIELD_ID_QUEUE_MAX_SIZE_OBSERVED,
+ (int32_t)mEventQueueMaxSizeObserved);
+ proto.write(FIELD_TYPE_INT64 | FIELD_ID_QUEUE_MAX_SIZE_OBSERVED_ELAPSED_NANOS,
+ (long long)mEventQueueMaxSizeObservedElapsedNanos);
+ proto.end(queueStatsToken);
+
for (const auto& restart : mSystemServerRestartSec) {
proto.write(FIELD_TYPE_INT32 | FIELD_ID_SYSTEM_SERVER_RESTART | FIELD_COUNT_REPEATED,
restart);
diff --git a/statsd/src/guardrail/StatsdStats.h b/statsd/src/guardrail/StatsdStats.h
index c7ca8d0c..eadb353c 100644
--- a/statsd/src/guardrail/StatsdStats.h
+++ b/statsd/src/guardrail/StatsdStats.h
@@ -614,6 +614,9 @@ public:
* in the queue */
void noteEventQueueOverflow(int64_t oldestEventTimestampNs, int32_t atomId, bool isSkipped);
+ /* Notes queue max size seen so far and associated timestamp */
+ void noteEventQueueSize(int32_t size, int64_t eventTimestampNs);
+
/**
* Reports that the activation broadcast guardrail was hit for this uid. Namely, the broadcast
* should have been sent, but instead was skipped due to hitting the guardrail.
@@ -922,6 +925,12 @@ private:
// Total number of events that are lost due to queue overflow.
int32_t mOverflowCount = 0;
+ // Max number of events stored into the queue seen so far.
+ int32_t mEventQueueMaxSizeObserved = 0;
+
+ // Event timestamp for associated max size hit.
+ int64_t mEventQueueMaxSizeObservedElapsedNanos = 0;
+
// Timestamps when we detect log loss, and the number of logs lost.
std::list<LogLossStats> mLogLossStats;
@@ -1009,40 +1018,42 @@ private:
*/
StatsdStats::AtomMetricStats& getAtomMetricStats(int64_t metricId);
- FRIEND_TEST(StatsdStatsTest, TestValidConfigAdd);
+ FRIEND_TEST(LogEventQueue_test, TestQueueMaxSize);
+ FRIEND_TEST(SocketParseMessageTest, TestProcessMessage);
+ FRIEND_TEST(StatsLogProcessorTest, InvalidConfigRemoved);
+ FRIEND_TEST(StatsdStatsTest, TestActivationBroadcastGuardrailHit);
+ FRIEND_TEST(StatsdStatsTest, TestAnomalyMonitor);
+ FRIEND_TEST(StatsdStatsTest, TestAtomDroppedStats);
+ FRIEND_TEST(StatsdStatsTest, TestAtomErrorStats);
+ FRIEND_TEST(StatsdStatsTest, TestAtomLog);
+ FRIEND_TEST(StatsdStatsTest, TestAtomLoggedAndDroppedAndSkippedStats);
+ FRIEND_TEST(StatsdStatsTest, TestAtomLoggedAndDroppedStats);
+ FRIEND_TEST(StatsdStatsTest, TestAtomMetricsStats);
+ FRIEND_TEST(StatsdStatsTest, TestAtomSkippedStats);
+ FRIEND_TEST(StatsdStatsTest, TestConfigRemove);
+ FRIEND_TEST(StatsdStatsTest, TestHasHitDimensionGuardrail);
FRIEND_TEST(StatsdStatsTest, TestInvalidConfigAdd);
FRIEND_TEST(StatsdStatsTest, TestInvalidConfigMissingMetricId);
FRIEND_TEST(StatsdStatsTest, TestInvalidConfigOnlyMetricId);
- FRIEND_TEST(StatsdStatsTest, TestConfigRemove);
- FRIEND_TEST(StatsdStatsTest, TestSubStats);
- FRIEND_TEST(StatsdStatsTest, TestAtomLog);
FRIEND_TEST(StatsdStatsTest, TestNonPlatformAtomLog);
- FRIEND_TEST(StatsdStatsTest, TestTimestampThreshold);
- FRIEND_TEST(StatsdStatsTest, TestAnomalyMonitor);
- FRIEND_TEST(StatsdStatsTest, TestSystemServerCrash);
FRIEND_TEST(StatsdStatsTest, TestPullAtomStats);
- FRIEND_TEST(StatsdStatsTest, TestAtomMetricsStats);
- FRIEND_TEST(StatsdStatsTest, TestActivationBroadcastGuardrailHit);
- FRIEND_TEST(StatsdStatsTest, TestAtomErrorStats);
- FRIEND_TEST(StatsdStatsTest, TestAtomSkippedStats);
- FRIEND_TEST(StatsdStatsTest, TestRestrictedMetricsStats);
+ FRIEND_TEST(StatsdStatsTest, TestQueueStats);
FRIEND_TEST(StatsdStatsTest, TestRestrictedMetricsQueryStats);
- FRIEND_TEST(StatsdStatsTest, TestAtomDroppedStats);
- FRIEND_TEST(StatsdStatsTest, TestAtomLoggedAndDroppedStats);
- FRIEND_TEST(StatsdStatsTest, TestAtomLoggedAndDroppedAndSkippedStats);
+ FRIEND_TEST(StatsdStatsTest, TestRestrictedMetricsStats);
FRIEND_TEST(StatsdStatsTest, TestShardOffsetProvider);
- FRIEND_TEST(StatsdStatsTest, TestHasHitDimensionGuardrail);
- FRIEND_TEST(StatsdStatsTest, TestSubscriptionStarted);
- FRIEND_TEST(StatsdStatsTest, TestSubscriptionFlushed);
- FRIEND_TEST(StatsdStatsTest, TestSubscriptionEnded);
+ FRIEND_TEST(StatsdStatsTest, TestSocketLossStats);
+ FRIEND_TEST(StatsdStatsTest, TestSocketLossStatsOverflowCounter);
+ FRIEND_TEST(StatsdStatsTest, TestSubStats);
FRIEND_TEST(StatsdStatsTest, TestSubscriptionAtomPulled);
+ FRIEND_TEST(StatsdStatsTest, TestSubscriptionEnded);
+ FRIEND_TEST(StatsdStatsTest, TestSubscriptionFlushed);
FRIEND_TEST(StatsdStatsTest, TestSubscriptionPullThreadWakeup);
+ FRIEND_TEST(StatsdStatsTest, TestSubscriptionStarted);
FRIEND_TEST(StatsdStatsTest, TestSubscriptionStartedMaxActiveSubscriptions);
FRIEND_TEST(StatsdStatsTest, TestSubscriptionStartedRemoveFinishedSubscription);
- FRIEND_TEST(StatsdStatsTest, TestSocketLossStats);
- FRIEND_TEST(StatsdStatsTest, TestSocketLossStatsOverflowCounter);
-
- FRIEND_TEST(StatsLogProcessorTest, InvalidConfigRemoved);
+ FRIEND_TEST(StatsdStatsTest, TestSystemServerCrash);
+ FRIEND_TEST(StatsdStatsTest, TestTimestampThreshold);
+ FRIEND_TEST(StatsdStatsTest, TestValidConfigAdd);
};
InvalidConfigReason createInvalidConfigReasonWithMatcher(const InvalidConfigReasonEnum reason,
diff --git a/statsd/src/logd/LogEventQueue.cpp b/statsd/src/logd/LogEventQueue.cpp
index 96b7f015..34843e73 100644
--- a/statsd/src/logd/LogEventQueue.cpp
+++ b/statsd/src/logd/LogEventQueue.cpp
@@ -39,7 +39,7 @@ unique_ptr<LogEvent> LogEventQueue::waitPop() {
return item;
}
-bool LogEventQueue::push(unique_ptr<LogEvent> item, int64_t* oldestTimestampNs) {
+bool LogEventQueue::push(unique_ptr<LogEvent> item, int64_t& oldestTimestampNs, int32_t& newSize) {
bool success;
{
std::unique_lock<std::mutex> lock(mMutex);
@@ -48,9 +48,10 @@ bool LogEventQueue::push(unique_ptr<LogEvent> item, int64_t* oldestTimestampNs)
success = true;
} else {
// safe operation as queue must not be empty.
- *oldestTimestampNs = mQueue.front()->GetElapsedTimestampNs();
+ oldestTimestampNs = mQueue.front()->GetElapsedTimestampNs();
success = false;
}
+ newSize = mQueue.size();
}
mCondition.notify_one();
diff --git a/statsd/src/logd/LogEventQueue.h b/statsd/src/logd/LogEventQueue.h
index 53f2adf2..f7cf850d 100644
--- a/statsd/src/logd/LogEventQueue.h
+++ b/statsd/src/logd/LogEventQueue.h
@@ -45,7 +45,7 @@ public:
* Returns false on failure when the queue is full, and output the oldest event timestamp
* in the queue.
*/
- bool push(std::unique_ptr<LogEvent> event, int64_t* oldestTimestampNs);
+ bool push(std::unique_ptr<LogEvent> event, int64_t& oldestTimestampNs, int32_t& newSize);
private:
const size_t mQueueLimit;
diff --git a/statsd/src/socket/StatsSocketListener.cpp b/statsd/src/socket/StatsSocketListener.cpp
index 3f360333..96c7f1f7 100644
--- a/statsd/src/socket/StatsSocketListener.cpp
+++ b/statsd/src/socket/StatsSocketListener.cpp
@@ -148,6 +148,7 @@ void StatsSocketListener::processMessage(const uint8_t* msg, uint32_t len, uint3
const int32_t atomId = logEvent->GetTagId();
const bool isAtomSkipped = logEvent->isParsedHeaderOnly();
+ const int64_t atomTimestamp = logEvent->GetElapsedTimestampNs();
if (atomId == util::STATS_SOCKET_LOSS_REPORTED) {
if (isAtomSkipped) {
@@ -164,9 +165,12 @@ void StatsSocketListener::processMessage(const uint8_t* msg, uint32_t len, uint3
}
}
- int64_t oldestTimestamp;
- if (!queue->push(std::move(logEvent), &oldestTimestamp)) {
+ int64_t oldestTimestamp = 0;
+ int32_t queueSize = 0;
+ if (!queue->push(std::move(logEvent), oldestTimestamp, queueSize)) {
StatsdStats::getInstance().noteEventQueueOverflow(oldestTimestamp, atomId, isAtomSkipped);
+ } else {
+ StatsdStats::getInstance().noteEventQueueSize(queueSize, atomTimestamp);
}
}
diff --git a/statsd/src/socket/StatsSocketListener.h b/statsd/src/socket/StatsSocketListener.h
index 903eff39..1c56ff3a 100644
--- a/statsd/src/socket/StatsSocketListener.h
+++ b/statsd/src/socket/StatsSocketListener.h
@@ -81,6 +81,7 @@ private:
FRIEND_TEST(SocketParseMessageTest, TestProcessMessageFilterCompleteSet);
FRIEND_TEST(SocketParseMessageTest, TestProcessMessageFilterPartialSet);
FRIEND_TEST(SocketParseMessageTest, TestProcessMessageFilterToggle);
+ FRIEND_TEST(LogEventQueue_test, TestQueueMaxSize);
};
} // namespace statsd
diff --git a/statsd/src/stats_log.proto b/statsd/src/stats_log.proto
index b0955467..fdb031a9 100644
--- a/statsd/src/stats_log.proto
+++ b/statsd/src/stats_log.proto
@@ -682,6 +682,13 @@ message StatsdStatsReport {
}
optional SocketLossStats socket_loss_stats = 24;
+
+ message EventQueueStats {
+ optional int32 max_size_observed = 1;
+ optional int64 max_size_observed_elapsed_nanos = 2;
+ }
+
+ optional EventQueueStats event_queue_stats = 25;
}
message AlertTriggerDetails {
diff --git a/statsd/tests/SocketListener_test.cpp b/statsd/tests/SocketListener_test.cpp
index 90e5d3e1..3636508f 100644
--- a/statsd/tests/SocketListener_test.cpp
+++ b/statsd/tests/SocketListener_test.cpp
@@ -87,8 +87,11 @@ INSTANTIATE_TEST_SUITE_P(SocketParseMessageTest, SocketParseMessageTest, testing
SocketParseMessageTest::ToString);
TEST_P(SocketParseMessageTest, TestProcessMessage) {
+ StatsdStats::getInstance().reset();
+
generateAtomLogging(mEventQueue, mLogEventFilter, kEventCount, kAtomId);
+ int64_t lastEventTs = 0;
// check content of the queue
EXPECT_EQ(kEventCount, mEventQueue->mQueue.size());
for (int i = 0; i < kEventCount; i++) {
@@ -96,7 +99,11 @@ TEST_P(SocketParseMessageTest, TestProcessMessage) {
EXPECT_TRUE(logEvent->isValid());
EXPECT_EQ(kAtomId + i, logEvent->GetTagId());
EXPECT_EQ(logEvent->isParsedHeaderOnly(), GetParam());
+ lastEventTs = logEvent->GetElapsedTimestampNs();
}
+
+ EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObserved, kEventCount);
+ EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObservedElapsedNanos, lastEventTs);
}
TEST_P(SocketParseMessageTest, TestProcessMessageEmptySetExplicitSet) {
diff --git a/statsd/tests/guardrail/StatsdStats_test.cpp b/statsd/tests/guardrail/StatsdStats_test.cpp
index 6e61b620..1d56caaf 100644
--- a/statsd/tests/guardrail/StatsdStats_test.cpp
+++ b/statsd/tests/guardrail/StatsdStats_test.cpp
@@ -750,6 +750,16 @@ TEST(StatsdStatsTest, TestAtomDroppedStats) {
EXPECT_FALSE(nonPlatformPushedAtomStats.has_skip_count());
}
+TEST(StatsdStatsTest, TestQueueStats) {
+ StatsdStats stats;
+
+ stats.noteEventQueueSize(100, 1000);
+ StatsdStatsReport report = getStatsdStatsReport(stats, /* reset stats */ true);
+
+ ASSERT_EQ(100, report.event_queue_stats().max_size_observed());
+ ASSERT_EQ(1000, report.event_queue_stats().max_size_observed_elapsed_nanos());
+}
+
TEST(StatsdStatsTest, TestAtomLoggedAndDroppedStats) {
StatsdStats stats;
diff --git a/statsd/tests/log_event/LogEventQueue_test.cpp b/statsd/tests/log_event/LogEventQueue_test.cpp
index a15f95be..05e4f93e 100644
--- a/statsd/tests/log_event/LogEventQueue_test.cpp
+++ b/statsd/tests/log_event/LogEventQueue_test.cpp
@@ -20,6 +20,7 @@
#include <thread>
+#include "socket/StatsSocketListener.h"
#include "stats_event.h"
#include "tests/statsd_test_util.h"
@@ -34,37 +35,44 @@ using std::unique_ptr;
namespace {
-std::unique_ptr<LogEvent> makeLogEvent(uint64_t timestampNs) {
+AStatsEvent* makeStatsEvent(uint64_t timestampNs) {
AStatsEvent* statsEvent = AStatsEvent_obtain();
AStatsEvent_setAtomId(statsEvent, 10);
AStatsEvent_overwriteTimestamp(statsEvent, timestampNs);
+ AStatsEvent_build(statsEvent);
+ return statsEvent;
+}
+std::unique_ptr<LogEvent> makeLogEvent(uint64_t timestampNs) {
+ AStatsEvent* statsEvent = makeStatsEvent(timestampNs);
std::unique_ptr<LogEvent> logEvent = std::make_unique<LogEvent>(/*uid=*/0, /*pid=*/0);
parseStatsEventToLogEvent(statsEvent, logEvent.get());
+ EXPECT_EQ(logEvent->GetElapsedTimestampNs(), timestampNs);
return logEvent;
}
-} // anonymous namespace
+} // anonymous namespace
#ifdef __ANDROID__
TEST(LogEventQueue_test, TestGoodConsumer) {
LogEventQueue queue(50);
- int64_t timeBaseNs = 100;
- std::thread writer([&queue, timeBaseNs] {
+ int64_t eventTimeNs = 100;
+ std::thread writer([&queue, eventTimeNs] {
+ int64_t oldestEventNs = 0;
+ int32_t newSize = 0;
for (int i = 0; i < 100; i++) {
- int64_t oldestEventNs;
- bool success = queue.push(makeLogEvent(timeBaseNs + i * 1000), &oldestEventNs);
+ bool success = queue.push(makeLogEvent(eventTimeNs + i * 1000), oldestEventNs, newSize);
EXPECT_TRUE(success);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
});
- std::thread reader([&queue, timeBaseNs] {
+ std::thread reader([&queue, eventTimeNs] {
for (int i = 0; i < 100; i++) {
auto event = queue.waitPop();
EXPECT_TRUE(event != nullptr);
// All events are in right order.
- EXPECT_EQ(timeBaseNs + i * 1000, event->GetElapsedTimestampNs());
+ EXPECT_EQ(eventTimeNs + i * 1000, event->GetElapsedTimestampNs());
}
});
@@ -74,13 +82,16 @@ TEST(LogEventQueue_test, TestGoodConsumer) {
TEST(LogEventQueue_test, TestSlowConsumer) {
LogEventQueue queue(50);
- int64_t timeBaseNs = 100;
- std::thread writer([&queue, timeBaseNs] {
+ int64_t eventTimeNs = 100;
+ std::thread writer([&queue, eventTimeNs] {
int failure_count = 0;
- int64_t oldestEventNs;
+ int64_t oldestEventNs = 0;
+ int32_t newSize = 0;
for (int i = 0; i < 100; i++) {
- bool success = queue.push(makeLogEvent(timeBaseNs + i * 1000), &oldestEventNs);
- if (!success) failure_count++;
+ bool success = queue.push(makeLogEvent(eventTimeNs + i * 1000), oldestEventNs, newSize);
+ if (!success) {
+ failure_count++;
+ }
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
@@ -92,13 +103,13 @@ TEST(LogEventQueue_test, TestSlowConsumer) {
EXPECT_TRUE(oldestEventNs <= (100 + 5 * 1000));
});
- std::thread reader([&queue, timeBaseNs] {
+ std::thread reader([&queue, eventTimeNs] {
// The consumer quickly processed 5 events, then it got stuck (not reading anymore).
for (int i = 0; i < 5; i++) {
auto event = queue.waitPop();
EXPECT_TRUE(event != nullptr);
// All events are in right order.
- EXPECT_EQ(timeBaseNs + i * 1000, event->GetElapsedTimestampNs());
+ EXPECT_EQ(eventTimeNs + i * 1000, event->GetElapsedTimestampNs());
}
});
@@ -106,6 +117,87 @@ TEST(LogEventQueue_test, TestSlowConsumer) {
writer.join();
}
+TEST(LogEventQueue_test, TestQueueMaxSize) {
+ StatsdStats::getInstance().reset();
+
+ std::shared_ptr<LogEventQueue> queue(std::make_shared<LogEventQueue>(50));
+ std::shared_ptr<LogEventFilter> filter(std::make_shared<LogEventFilter>());
+ filter->setFilteringEnabled(false);
+
+ int64_t eventTimeNs = 100;
+ int64_t oldestEventNs = 0;
+ int32_t newSize = 0;
+ for (int i = 0; i < 30; i++, eventTimeNs++) {
+ auto statsEvent = makeStatsEvent(eventTimeNs);
+ size_t bufferSize;
+ const uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &bufferSize);
+ StatsSocketListener::processMessage(buffer, bufferSize, 0, 0, queue, filter);
+ AStatsEvent_release(statsEvent);
+ EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObserved, i + 1);
+ EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObservedElapsedNanos, eventTimeNs);
+ }
+
+ const int32_t lastMaxSizeObserved = StatsdStats::getInstance().mEventQueueMaxSizeObserved;
+ const int64_t lastMaxSizeElapsedNanos =
+ StatsdStats::getInstance().mEventQueueMaxSizeObservedElapsedNanos;
+
+ // consumer reads the entire queue
+ int64_t nextEventTs = 100;
+ for (int i = 0; i < 30; i++, nextEventTs++) {
+ auto event = queue->waitPop();
+ EXPECT_TRUE(event != nullptr);
+ // All events are in right order.
+ EXPECT_EQ(nextEventTs, event->GetElapsedTimestampNs());
+ }
+
+ // the expectation after queue drained entirely the max count & ts do not update for
+ // smaller values
+ {
+ auto statsEvent = makeStatsEvent(eventTimeNs);
+ size_t bufferSize;
+ const uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &bufferSize);
+ StatsSocketListener::processMessage(buffer, bufferSize, 0, 0, queue, filter);
+ AStatsEvent_release(statsEvent);
+ EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObserved, lastMaxSizeObserved);
+ EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObservedElapsedNanos,
+ lastMaxSizeElapsedNanos);
+ eventTimeNs++;
+ }
+
+ for (int i = 0; i < 1; i++, nextEventTs++) {
+ auto event = queue->waitPop();
+ EXPECT_TRUE(event != nullptr);
+ // All events are in right order.
+ EXPECT_EQ(nextEventTs, event->GetElapsedTimestampNs());
+ }
+
+ // the expectation after queue drained entirely the max count & ts do update for
+ // bigger values
+ // fill up to the the previous max values observed - stats are not changed
+ for (int i = 0; i < lastMaxSizeObserved; i++, eventTimeNs++) {
+ auto statsEvent = makeStatsEvent(eventTimeNs);
+ size_t bufferSize;
+ const uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &bufferSize);
+ StatsSocketListener::processMessage(buffer, bufferSize, 0, 0, queue, filter);
+ AStatsEvent_release(statsEvent);
+ EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObserved, lastMaxSizeObserved);
+ EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObservedElapsedNanos,
+ lastMaxSizeElapsedNanos);
+ }
+
+ // add extra elements to update the stats
+ for (int i = 0; i < 10; i++, eventTimeNs++) {
+ auto statsEvent = makeStatsEvent(eventTimeNs);
+ size_t bufferSize;
+ const uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &bufferSize);
+ StatsSocketListener::processMessage(buffer, bufferSize, 0, 0, queue, filter);
+ AStatsEvent_release(statsEvent);
+ EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObserved,
+ lastMaxSizeObserved + i + 1);
+ EXPECT_EQ(StatsdStats::getInstance().mEventQueueMaxSizeObservedElapsedNanos, eventTimeNs);
+ }
+}
+
#else
GTEST_LOG_(INFO) << "This test does nothing.\n";
#endif