// path: root/statsd/src/metrics/ValueMetricProducer.h
// blob: 1c974b52f1703eb0e2f4b2f2f8af93c47bf23faa
/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <gtest/gtest_prod.h>

#include <optional>

#include "FieldValue.h"
#include "HashableDimensionKey.h"
#include "MetricProducer.h"
#include "anomaly/AnomalyTracker.h"
#include "condition/ConditionTimer.h"
#include "condition/ConditionTracker.h"
#include "external/PullDataReceiver.h"
#include "external/StatsPullerManager.h"
#include "matchers/EventMatcherWizard.h"
#include "src/statsd_config.pb.h"
#include "stats_log_util.h"
#include "stats_util.h"

namespace android {
namespace os {
namespace statsd {

/**
 * Data recorded for one finished bucket of a value metric.
 *
 * AggregatedValue is the type of each stored aggregate.
 *
 * All scalar members are zero-initialized so that a default-constructed bucket never exposes
 * indeterminate values for a field the producer did not set. The struct remains an aggregate
 * (C++14 and later), so brace initialization in member order keeps working.
 */
template <typename AggregatedValue>
struct PastBucket {
    // Start and end timestamps of the bucket, in nanoseconds.
    int64_t mBucketStartNs = 0;
    int64_t mBucketEndNs = 0;

    // Per-aggregate data. NOTE(review): presumably parallel vectors, i.e. entry i of each
    // describes the same aggregated value field -- confirm against the code filling them.
    std::vector<int> aggIndex;
    std::vector<AggregatedValue> aggregates;
    std::vector<int> sampleSizes;

    /**
     * If the metric has no condition, then this field is just wasted.
     * When we tune statsd memory usage in the future, this is a candidate to optimize.
     */
    int64_t mConditionTrueNs = 0;

    /**
     * Correction that needs to be applied to mConditionTrueNs before the normalization
     * calculation is performed on the user (read: server) side. Applied only to ValueMetrics
     * with pulled atoms.
     */
    int64_t mConditionCorrectionNs = 0;
};

// Aggregates values within buckets.
//
// There are different events that might complete a bucket
// - a condition change
// - an app upgrade
// - an alarm set to the end of the bucket
//
// Template parameters:
// - AggregatedValue: type of the aggregate kept per value field (Interval::aggregate) and stored
//   in PastBucket<AggregatedValue>::aggregates.
// - DimExtras: extra per-dimension state kept in DimensionsInWhatInfo::dimExtras and handed to
//   aggregateFields().
//
// The class is abstract: its constructor is protected and several hooks below are pure virtual,
// so it can only be used through a subclass that implements them.
template <typename AggregatedValue, typename DimExtras>
class ValueMetricProducer : public MetricProducer, public virtual PullDataReceiver {
public:
    // The *Options structs below bundle the constructor arguments. Several of them hold
    // references rather than copies, so the referenced objects must stay alive for as long as
    // the options struct is in use.

    // Pull-related constructor arguments.
    struct PullOptions {
        const int pullAtomId;
        const sp<StatsPullerManager>& pullerManager;
    };

    // Bucket timing constructor arguments. The two optional fields may be left unset.
    struct BucketOptions {
        const int64_t timeBaseNs;
        const int64_t startTimeNs;
        const int64_t bucketSizeNs;
        const int64_t minBucketSizeNs;
        const optional<int64_t> conditionCorrectionThresholdNs;
        const optional<bool> splitBucketForAppUpgrade;
    };

    // Constructor arguments describing the "what" matcher, its dimensions and value fields.
    struct WhatOptions {
        const bool containsAnyPositionInDimensionsInWhat;
        const bool shouldUseNestedDimensions;
        const int whatMatcherIndex;
        const sp<EventMatcherWizard>& matcherWizard;
        const FieldMatcher& dimensionsInWhat;
        const vector<Matcher>& fieldMatchers;
    };

    // Condition-related constructor arguments.
    struct ConditionOptions {
        const int conditionIndex;
        const ConditionLinks& conditionLinks;
        const vector<ConditionState>& initialConditionCache;
        const sp<ConditionWizard>& conditionWizard;
    };

    // State-slicing constructor arguments.
    struct StateOptions {
        const StateLinks& stateLinks;
        const vector<int>& slicedStateAtoms;
        const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap;
    };

    // Activation / deactivation maps passed to the constructor.
    struct ActivationOptions {
        const std::unordered_map<int, std::shared_ptr<Activation>>& eventActivationMap;
        const std::unordered_map<int, std::vector<std::shared_ptr<Activation>>>&
                eventDeactivationMap;
    };

    // Soft and hard limits on the number of dimensions; see mDimensionSoftLimit,
    // mDimensionHardLimit and hitGuardRailLocked().
    struct GuardrailOptions {
        const size_t dimensionSoftLimit;
        const size_t dimensionHardLimit;
    };

    virtual ~ValueMetricProducer();

    // Process data pulled on bucket boundary.
    // The default implementation does nothing; subclasses that pull data override it.
    virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
                              PullResult pullResult, int64_t originalPullTimeNs) override {
    }

    // Determine if metric needs to pull
    // The default implementation always answers false.
    virtual bool isPullNeeded() const override {
        return false;
    }

    // ValueMetric needs special logic if it's a pulled atom.
    void onStatsdInitCompleted(const int64_t& eventTimeNs) override;

    // Handles a state change for the given atom and primary key, from oldState to newState.
    void onStateChanged(int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey,
                        const FieldValue& oldState, const FieldValue& newState) override;

protected:
    // Protected: only subclasses construct a ValueMetricProducer. Arguments are grouped into the
    // *Options structs declared above.
    ValueMetricProducer(const int64_t& metricId, const ConfigKey& key, const uint64_t protoHash,
                        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
                        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
                        const StateOptions& stateOptions,
                        const ActivationOptions& activationOptions,
                        const GuardrailOptions& guardrailOptions);

    // Handles one matched log event for the given dimension key.
    void onMatchedLogEventInternalLocked(
            const size_t matcherIndex, const MetricDimensionKey& eventKey,
            const ConditionKey& conditionKey, bool condition, const LogEvent& event,
            const std::map<int, HashableDimensionKey>& statePrimaryKeys) override;

    // Determine whether or not a LogEvent can be skipped.
    virtual inline bool canSkipLogEventLocked(
            const MetricDimensionKey& eventKey, bool condition, int64_t eventTimeNs,
            const std::map<int, HashableDimensionKey>& statePrimaryKeys) const = 0;

    void notifyAppUpgradeInternalLocked(const int64_t eventTimeNs) override;

    // Writes the metric's report into protoOutput.
    void onDumpReportLocked(const int64_t dumpTimeNs, const bool includeCurrentPartialBucket,
                            const bool eraseData, const DumpLatency dumpLatency,
                            std::set<string>* strSet,
                            android::util::ProtoOutputStream* protoOutput) override;

    // Proto field ids supplied by the subclass through getDumpProtoFields().
    // NOTE(review): presumably consumed by onDumpReportLocked() -- confirm in the implementation.
    struct DumpProtoFields {
        const int metricTypeFieldId;
        const int bucketNumFieldId;
        const int startBucketMsFieldId;
        const int endBucketMsFieldId;
        const int conditionTrueNsFieldId;
        const optional<int> conditionCorrectionNsFieldId;
    };

    virtual DumpProtoFields getDumpProtoFields() const = 0;

    void clearPastBucketsLocked(const int64_t dumpTimeNs) override;

    // ValueMetricProducer internal interface to handle active state change.
    void onActiveStateChangedLocked(const int64_t eventTimeNs, const bool isActive) override;

    // Subclass hook for active state changes. The default implementation does nothing.
    virtual void onActiveStateChangedInternalLocked(const int64_t eventTimeNs,
                                                    const bool isActive) {
    }

    // ValueMetricProducer internal interface to handle condition change.
    void onConditionChangedLocked(const bool condition, const int64_t eventTimeNs) override;

    // Only called when mIsActive, the event is NOT too late, and after pulling.
    // The default implementation does nothing.
    virtual void onConditionChangedInternalLocked(const ConditionState oldCondition,
                                                  const ConditionState newCondition,
                                                  const int64_t eventTimeNs) {
    }

    // Internal interface to handle sliced condition change.
    void onSlicedConditionMayChangeLocked(bool overallCondition, const int64_t eventTime) override;

    void dumpStatesLocked(FILE* out, bool verbose) const override;

    // Returns a string form of the given aggregate.
    virtual std::string aggregatedValueToString(const AggregatedValue& aggregate) const = 0;

    // For pulled metrics, this method should only be called if a pull has been done. Else we will
    // not have complete data for the bucket.
    void flushIfNeededLocked(const int64_t& eventTime) override;

    // For pulled metrics, this method should only be called if a pull has been done. Else we will
    // not have complete data for the bucket.
    void flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                  const int64_t& nextBucketStartTimeNs) override;

    void dropDataLocked(const int64_t dropTimeNs) override;

    // Calculate how many buckets are present between the current bucket and eventTimeNs.
    int64_t calcBucketsForwardCount(const int64_t eventTimeNs) const;

    // Mark the data as invalid.
    virtual void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason);

    // Skips the current bucket without notifying StatsdStats of the skipped bucket.
    // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that
    // causes the bucket to be invalidated will not notify StatsdStats.
    void skipCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason);

    // Handles a config update for this metric. Returns an InvalidConfigReason on failure and
    // an empty optional otherwise -- NOTE(review): inferred from the return type, confirm.
    optional<InvalidConfigReason> onConfigUpdatedLocked(
            const StatsdConfig& config, const int configIndex, const int metricIndex,
            const std::vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
            const std::unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
            const std::unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
            const sp<EventMatcherWizard>& matcherWizard,
            const std::vector<sp<ConditionTracker>>& allConditionTrackers,
            const std::unordered_map<int64_t, int>& conditionTrackerMap,
            const sp<ConditionWizard>& wizard,
            const std::unordered_map<int64_t, int>& metricToActivationMap,
            std::unordered_map<int, std::vector<int>>& trackerToMetricMap,
            std::unordered_map<int, std::vector<int>>& conditionToMetricMap,
            std::unordered_map<int, std::vector<int>>& activationAtomTrackerToMetricMap,
            std::unordered_map<int, std::vector<int>>& deactivationAtomTrackerToMetricMap,
            std::vector<int>& metricsWithActivation) override;

    // The three accessors below let a subclass read its own metric entry (at configIndex) out of
    // the StatsdConfig: the optional condition id, the "what" atom matcher id, and the
    // condition links.
    virtual optional<int64_t> getConditionIdForMetric(const StatsdConfig& config,
                                                      const int configIndex) const = 0;

    virtual int64_t getWhatAtomMatcherIdForMetric(const StatsdConfig& config,
                                                  const int configIndex) const = 0;

    virtual ConditionLinks getConditionLinksForMetric(const StatsdConfig& config,
                                                      const int configIndex) const = 0;

    // Index of the "what" atom matcher.
    int mWhatMatcherIndex;

    sp<EventMatcherWizard> mEventMatcherWizard;

    const sp<StatsPullerManager> mPullerManager;

    // Value fields for matching.
    const std::vector<Matcher> mFieldMatchers;

    // Set of dimension keys.
    // NOTE(review): judging by the name, these are the dimension keys that have been matched;
    // how and when the set is filled and cleared is not visible in this header -- confirm.
    std::set<HashableDimensionKey> mMatchedMetricDimensionKeys;

    // Holds the atom id, primary key pair from a state change.
    // Only used for pulled metrics.
    // TODO(b/185796114): can be passed as function arguments instead.
    pair<int32_t, HashableDimensionKey> mStateChangePrimaryKey;

    // Atom Id for pulled data. -1 if this is not pulled.
    const int mPullAtomId;

    // Tracks the value information of one value field.
    struct Interval {
        // Index in multi value aggregation.
        int aggIndex;

        // Current aggregation, depending on the aggregation type.
        AggregatedValue aggregate;

        // Number of samples collected.
        int sampleSize = 0;

        // True once at least one sample has been collected.
        inline bool hasValue() const {
            return sampleSize > 0;
        }
    };

    // Internal state of an ongoing aggregation bucket.
    struct CurrentBucket {
        // If the `MetricDimensionKey` state key is the current state key, then
        // the condition timer will be updated later (e.g. condition/state/active
        // state change) with the correct condition and time.
        // Starts with no intervals and a timer built from ConditionTimer(false, 0).
        CurrentBucket() : intervals(), conditionTimer(ConditionTimer(false, 0)) {
        }
        // Value information for each value field of the metric.
        std::vector<Interval> intervals;
        // Tracks how long the condition is true.
        ConditionTimer conditionTimer;
    };

    // Tracks the internal state in the ongoing aggregation bucket for each DimensionsInWhat
    // key and StateValuesKey pair.
    std::unordered_map<MetricDimensionKey, CurrentBucket> mCurrentSlicedBucket;

    // State key and any extra information for a specific DimensionsInWhat key.
    struct DimensionsInWhatInfo {
        // Records stateKey as the current state but leaves hasCurrentState false.
        // Note: the constructor is not explicit, so a HashableDimensionKey converts implicitly.
        DimensionsInWhatInfo(const HashableDimensionKey& stateKey)
            : dimExtras(), currentState(stateKey), hasCurrentState(false) {
        }

        DimExtras dimExtras;

        // Whether new data is seen in the bucket.
        // TODO, this could be per base in the dim extras.
        bool seenNewData = false;

        // Last seen state value(s).
        HashableDimensionKey currentState;
        // Whether this dimensions in what key has a current state key.
        bool hasCurrentState;
    };

    // Tracks current state key and other information for each DimensionsInWhat key.
    std::unordered_map<HashableDimensionKey, DimensionsInWhatInfo> mDimInfos;

    // Save the past buckets and we can clear when the StatsLogReport is dumped.
    std::unordered_map<MetricDimensionKey, std::vector<PastBucket<AggregatedValue>>> mPastBuckets;

    const int64_t mMinBucketSizeNs;

    // Util function to check whether the specified dimension hits the guardrail.
    bool hitGuardRailLocked(const MetricDimensionKey& newKey);

    bool hasReachedGuardRailLimit() const;

    // Subclass hook for pulling and matching events at timestampNs.
    // The default implementation does nothing.
    virtual void pullAndMatchEventsLocked(const int64_t timestampNs) {
    }

    virtual bool multipleBucketsSkipped(const int64_t numBucketsForward) const = 0;

    // Builds a PastBucket ending at bucketEndTime from the given intervals. The intervals are
    // passed by non-const reference, so an implementation may modify them.
    virtual PastBucket<AggregatedValue> buildPartialBucket(int64_t bucketEndTime,
                                                           std::vector<Interval>& intervals) = 0;

    virtual void closeCurrentBucket(const int64_t eventTimeNs, const int64_t nextBucketStartTimeNs);

    virtual void initNextSlicedBucket(int64_t nextBucketStartTimeNs);

    // Updates the condition timers in the current sliced bucket when there is a
    // condition change or an active state change.
    void updateCurrentSlicedBucketConditionTimers(bool newCondition, int64_t eventTimeNs);

    // Writes one aggregate of a past bucket, with its index and sample size, to protoOutput.
    virtual void writePastBucketAggregateToProto(const int aggIndex,
                                                 const AggregatedValue& aggregate,
                                                 const int sampleSize,
                                                 ProtoOutputStream* const protoOutput) const = 0;

    // sizeof one PastBucket<AggregatedValue> object. This is the size of the struct itself and
    // does not include heap storage owned by its vectors.
    static const size_t kBucketSize = sizeof(PastBucket<AggregatedValue>{});

    const size_t mDimensionSoftLimit;

    const size_t mDimensionHardLimit;

    // This is to track whether or not the bucket is skipped for any of the reasons listed in
    // BucketDropReason, many of which make the bucket potentially invalid.
    bool mCurrentBucketIsSkipped;

    ConditionTimer mConditionTimer;

    /** Stores condition correction threshold from the ValueMetric configuration */
    optional<int64_t> mConditionCorrectionThresholdNs;

    // An event is late when its timestamp precedes the start of the current bucket
    // (mCurrentBucketStartTimeNs, which is declared outside this class).
    inline bool isEventLateLocked(const int64_t eventTimeNs) const {
        return eventTimeNs < mCurrentBucketStartTimeNs;
    }

    // Returns true if any of the intervals have seen new data.
    // This should return true unless there is an error parsing the value fields from the event.
    virtual bool aggregateFields(const int64_t eventTimeNs, const MetricDimensionKey& eventKey,
                                 const LogEvent& event, std::vector<Interval>& intervals,
                                 DimExtras& dimExtras) = 0;

    // If this is a pulled metric
    // (mPullAtomId is -1 for metrics that are not pulled).
    inline bool isPulled() const {
        return mPullAtomId != -1;
    }

private:
};  // ValueMetricProducer

}  // namespace statsd
}  // namespace os
}  // namespace android