From 18aa2793facbb3d7e252ac32e6bb4d09f08a37ac Mon Sep 17 00:00:00 2001 From: Yang Song Date: Wed, 8 Aug 2018 12:34:25 -0700 Subject: Metrics: Produce and store Metrics in Stats impl. (#1338) Support recording `Metric`s in stats impl. This works as the following: - When a `View` is registered, convert that `View` to a `MetricDescriptor` and register it; - When a `Measurement` along with a `TagContext` are recorded, convert them into a data row consisting of `LabelValue`s and `Point`s; - If `getMetrics()` is called, convert the `MetricDescriptor`s and data rows into `Metric`s. Then flush all data rows. This PR only contains internal support in the impl. None of these changes are user-visible yet. --- .../implcore/stats/MeasureToViewMap.java | 13 +- .../io/opencensus/implcore/stats/MetricMap.java | 197 +++++++++++++++++++++ .../io/opencensus/implcore/stats/MetricUtils.java | 185 +++++++++++++++++++ .../implcore/stats/MutableAggregation.java | 4 + .../opencensus/implcore/stats/MutableViewData.java | 30 +++- 5 files changed, 422 insertions(+), 7 deletions(-) create mode 100644 impl_core/src/main/java/io/opencensus/implcore/stats/MetricMap.java create mode 100644 impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java (limited to 'impl_core/src/main/java/io/opencensus/implcore/stats') diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java index 79e913db..0cd5045f 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java @@ -22,6 +22,7 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import io.opencensus.common.Clock; import io.opencensus.common.Timestamp; +import io.opencensus.metrics.MetricDescriptor; import io.opencensus.stats.Measure; import io.opencensus.stats.Measurement; import io.opencensus.stats.StatsCollectionState; @@ -60,6 +61,9 @@ final class MeasureToViewMap { @GuardedBy("this") private final Map registeredMeasures = Maps.newHashMap(); + @GuardedBy("this") + private final MetricMap metricMap = new MetricMap(); + // Cached set of exported views. It must be set to null whenever a view is registered or // unregistered. @javax.annotation.Nullable private volatile Set exportedViews; @@ -115,7 +119,12 @@ final class MeasureToViewMap { if (registeredMeasure == null) { registeredMeasures.put(measure.getName(), measure); } - mutableMap.put(view.getMeasure().getName(), MutableViewData.create(view, clock.now())); + Timestamp now = clock.now(); + mutableMap.put(view.getMeasure().getName(), MutableViewData.create(view, now, metricMap)); + MetricDescriptor metricDescriptor = MetricUtils.viewToMetricDescriptor(view); + if (metricDescriptor != null) { + metricMap.registerMetricDescriptor(metricDescriptor, now); + } } @javax.annotation.Nullable @@ -165,6 +174,7 @@ final class MeasureToViewMap { mutableViewData.clearStats(); } } + metricMap.clearStats(); } // Resume stats collection for all MutableViewData. @@ -174,5 +184,6 @@ final class MeasureToViewMap { mutableViewData.resumeStatsCollection(now); } } + metricMap.resumeStatsCollection(now); } } diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MetricMap.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricMap.java new file mode 100644 index 00000000..5ae2eed5 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricMap.java @@ -0,0 +1,197 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import io.opencensus.common.Timestamp; +import io.opencensus.metrics.LabelValue; +import io.opencensus.metrics.Metric; +import io.opencensus.metrics.MetricDescriptor; +import io.opencensus.metrics.MetricDescriptor.Type; +import io.opencensus.metrics.Point; +import io.opencensus.metrics.TimeSeriesCumulative; +import io.opencensus.metrics.TimeSeriesGauge; +import io.opencensus.metrics.TimeSeriesList; +import io.opencensus.metrics.TimeSeriesList.TimeSeriesCumulativeList; +import io.opencensus.metrics.TimeSeriesList.TimeSeriesGaugeList; +import io.opencensus.tags.TagValue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import javax.annotation.concurrent.GuardedBy; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +// A class that stores a mapping from MetricDescriptor to lists of MutableMetricRows. +final class MetricMap { + + @GuardedBy("this") + private final Map map = + new HashMap(); + + // Registers a MetricDescriptor, creates an entry in the map. + // This method should only be called from MeasureToMap.registerView(). + synchronized void registerMetricDescriptor( + MetricDescriptor metricDescriptor, Timestamp timestamp) { + if (map.containsKey(metricDescriptor)) { + return; + } + map.put(metricDescriptor, MutableMetricRows.create(metricDescriptor.getType(), timestamp)); + } + + // Records a data point to this MetricMap. + // This method should only be called from CumulativeMutableViewData.record(). + synchronized void record( + MetricDescriptor metricDescriptor, + List tagValues, + MutableAggregation mutableAggregation, + Timestamp now) { + if (!map.containsKey(metricDescriptor)) { + return; + } + map.get(metricDescriptor) + .record(tagValues, mutableAggregation, now, metricDescriptor.getType()); + } + + synchronized void clearStats() { + for (Entry entry : map.entrySet()) { + entry.getValue().map.clear(); + } + } + + synchronized void resumeStatsCollection(Timestamp now) { + for (Entry entry : map.entrySet()) { + entry.getValue().resumeStatsCollection(now); + } + } + + synchronized List toMetrics() { + List metrics = new ArrayList(); + for (Entry entry : map.entrySet()) { + MutableMetricRows mutableMetricRows = entry.getValue(); + if (mutableMetricRows.map.isEmpty()) { + continue; // Skip MetricDescriptor with no data. + } + metrics.add(Metric.create(entry.getKey(), mutableMetricRows.toTimeSeriesList())); + + // Reset the data map once the rows are exported, so that we don't export duplicated Points. + mutableMetricRows.map.clear(); + } + return metrics; + } + + // A class that stores a mapping from lists of label values to lists of points. + // Each MutableMetricRows correspond to one MetricDescriptor. + // Think of this class as a set of mutable time series. + private abstract static class MutableMetricRows { + + /* + * Each entry in this map is a list of rows, for example: + * [v1, v2] -> [1, 5, 10] + * [v1, v3] -> [-5, -8] + * ... + */ + private final Map, List> map = + new LinkedHashMap, List>(); + + // Create MutableMetricRows based on the given type. + private static MutableMetricRows create(Type type, Timestamp timestamp) { + switch (type) { + case GAUGE_INT64: + case GAUGE_DOUBLE: + return createGauge(); + case CUMULATIVE_DISTRIBUTION: + case CUMULATIVE_DOUBLE: + case CUMULATIVE_INT64: + return createCumulative(timestamp); + } + throw new AssertionError(); + } + + private static MutableMetricRows createCumulative(Timestamp timestamp) { + return new MutableMetricRowsCumulative(timestamp); + } + + private static MutableMetricRows createGauge() { + return new MutableMetricRowsGauge(); + } + + private void record( + List tagValues, + MutableAggregation mutableAggregation, + Timestamp timestamp, + Type type) { + List labelValues = MetricUtils.tagValuesToLabelValues(tagValues); + Point point = MetricUtils.mutableAggregationToPoint(mutableAggregation, timestamp, type); + if (!map.containsKey(labelValues)) { + map.put(labelValues, new ArrayList()); + } + map.get(labelValues).add(point); + } + + abstract TimeSeriesList toTimeSeriesList(); + + abstract void resumeStatsCollection(Timestamp now); + + private static final class MutableMetricRowsCumulative extends MutableMetricRows { + + // Only cumulative time series has a start timestamp. + private Timestamp startTime; + + private MutableMetricRowsCumulative(Timestamp startTime) { + this.startTime = startTime; + } + + @Override + TimeSeriesList toTimeSeriesList() { + List timeSeriesCumulatives = new ArrayList(); + for (Entry, List> entry : super.map.entrySet()) { + timeSeriesCumulatives.add( + TimeSeriesCumulative.create(entry.getKey(), entry.getValue(), startTime)); + } + return TimeSeriesCumulativeList.create(timeSeriesCumulatives); + } + + @Override + void resumeStatsCollection(Timestamp now) { + // Reset start time to current time. + this.startTime = now; + } + } + + private static final class MutableMetricRowsGauge extends MutableMetricRows { + + @Override + TimeSeriesList toTimeSeriesList() { + List timeSeriesGauges = new ArrayList(); + for (Entry, List> entry : super.map.entrySet()) { + timeSeriesGauges.add(TimeSeriesGauge.create(entry.getKey(), entry.getValue())); + } + return TimeSeriesGaugeList.create(timeSeriesGauges); + } + + @Override + void resumeStatsCollection(Timestamp now) { + // Do nothing for Gauge stats. + } + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java new file mode 100644 index 00000000..8e770458 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java @@ -0,0 +1,185 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import com.google.common.annotations.VisibleForTesting; +import io.opencensus.common.Function; +import io.opencensus.common.Functions; +import io.opencensus.common.Timestamp; +import io.opencensus.metrics.Distribution; +import io.opencensus.metrics.LabelKey; +import io.opencensus.metrics.LabelValue; +import io.opencensus.metrics.MetricDescriptor; +import io.opencensus.metrics.MetricDescriptor.Type; +import io.opencensus.metrics.Point; +import io.opencensus.metrics.Value; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.Measure; +import io.opencensus.stats.View; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import java.util.ArrayList; +import java.util.List; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +@SuppressWarnings("deprecation") +// Utils to convert Stats data models to Metric data models. +final class MetricUtils { + + @javax.annotation.Nullable + static MetricDescriptor viewToMetricDescriptor(View view) { + if (view.getWindow() instanceof View.AggregationWindow.Interval) { + // Only creates Metric for cumulative stats. + return null; + } + List labelKeys = new ArrayList(); + for (TagKey tagKey : view.getColumns()) { + // TODO: add description + labelKeys.add(LabelKey.create(tagKey.getName(), "")); + } + Measure measure = view.getMeasure(); + return MetricDescriptor.create( + view.getName().asString(), + view.getDescription(), + measure.getUnit(), + getType(measure, view.getAggregation()), + labelKeys); + } + + @VisibleForTesting + static Type getType(Measure measure, Aggregation aggregation) { + return aggregation.match( + Functions.returnConstant( + measure.match( + Functions.returnConstant(Type.CUMULATIVE_DOUBLE), // Sum Double + Functions.returnConstant(Type.CUMULATIVE_INT64), // Sum Int64 + Functions.throwAssertionError())), + Functions.returnConstant(Type.CUMULATIVE_INT64), // Count + Functions.returnConstant(Type.CUMULATIVE_DISTRIBUTION), // Distribution + Functions.returnConstant( + measure.match( + Functions.returnConstant(Type.GAUGE_DOUBLE), // LastValue Double + Functions.returnConstant(Type.GAUGE_INT64), // LastValue Long + Functions.throwAssertionError())), + new Function() { + @Override + public Type apply(Aggregation arg) { + if (arg instanceof Aggregation.Mean) { + return Type.CUMULATIVE_DOUBLE; // Mean + } + throw new AssertionError(); + } + }); + } + + static List tagValuesToLabelValues(List tagValues) { + List labelValues = new ArrayList(); + for (/*@Nullable*/ TagValue tagValue : tagValues) { + labelValues.add(LabelValue.create(tagValue == null ? null : tagValue.asString())); + } + return labelValues; + } + + static Point mutableAggregationToPoint( + MutableAggregation mutableAggregation, final Timestamp timestamp, final Type type) { + return mutableAggregation.match( + new Function() { + @Override + public Point apply(MutableAggregation.MutableSum arg) { + return Point.create(toDoubleOrInt64Value(arg.getSum(), type), timestamp); + } + }, + new Function() { + @Override + public Point apply(MutableAggregation.MutableCount arg) { + return Point.create(Value.longValue(arg.getCount()), timestamp); + } + }, + new Function() { + @Override + public Point apply(MutableAggregation.MutableMean arg) { + return Point.create(toDoubleOrInt64Value(arg.getMean(), type), timestamp); + } + }, + new Function() { + @Override + public Point apply(MutableAggregation.MutableDistribution arg) { + return Point.create(toDistributionValue(arg), timestamp); + } + }, + new Function() { + @Override + public Point apply(MutableAggregation.MutableLastValue arg) { + return Point.create(toDoubleOrInt64Value(arg.getLastValue(), type), timestamp); + } + }); + } + + private static Value toDoubleOrInt64Value(double valueInDouble, Type type) { + switch (type) { + case CUMULATIVE_INT64: + case GAUGE_INT64: + return Value.longValue(Math.round(valueInDouble)); + case CUMULATIVE_DOUBLE: + case GAUGE_DOUBLE: + return Value.doubleValue(valueInDouble); + case CUMULATIVE_DISTRIBUTION: + throw new AssertionError(); + } + throw new AssertionError(); + } + + private static Value toDistributionValue(MutableAggregation.MutableDistribution distribution) { + List buckets = new ArrayList(); + @javax.annotation.Nullable + AggregationData.DistributionData.Exemplar[] exemplars = distribution.getExemplars(); + for (int bucket = 0; bucket < distribution.getBucketCounts().length; bucket++) { + long bucketCount = distribution.getBucketCounts()[bucket]; + @javax.annotation.Nullable AggregationData.DistributionData.Exemplar exemplar = null; + if (exemplars != null) { + exemplar = exemplars[bucket]; + } + + Distribution.Bucket metricBucket; + if (exemplar != null) { + // Bucket with an Exemplar. + metricBucket = + Distribution.Bucket.create( + bucketCount, + Distribution.Exemplar.create( + exemplar.getValue(), exemplar.getTimestamp(), exemplar.getAttachments())); + } else { + // Bucket with no Exemplar. + metricBucket = Distribution.Bucket.create(bucketCount); + } + buckets.add(metricBucket); + } + return Value.distributionValue( + Distribution.create( + distribution.getMean(), + distribution.getCount(), + distribution.getSumOfSquaredDeviations(), + distribution.getBucketBoundaries().getBoundaries(), + buckets)); + } + + private MetricUtils() {} +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java index ddfce618..770ec4b2 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java @@ -387,6 +387,10 @@ abstract class MutableAggregation { return bucketCounts; } + BucketBoundaries getBucketBoundaries() { + return bucketBoundaries; + } + @javax.annotation.Nullable Exemplar[] getExemplars() { return exemplars; diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java index 876415a3..0664681c 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java @@ -31,6 +31,7 @@ import io.opencensus.common.Function; import io.opencensus.common.Functions; import io.opencensus.common.Timestamp; import io.opencensus.implcore.internal.CheckerFrameworkUtils; +import io.opencensus.metrics.MetricDescriptor; import io.opencensus.stats.Aggregation; import io.opencensus.stats.AggregationData; import io.opencensus.stats.StatsCollectionState; @@ -65,12 +66,13 @@ abstract class MutableViewData { * * @param view the {@code View} linked with this {@code MutableViewData}. * @param start the start {@code Timestamp}. + * @param metricMap a reference to {@code MetricMap}. * @return a {@code MutableViewData}. */ - static MutableViewData create(final View view, final Timestamp start) { + static MutableViewData create(final View view, final Timestamp start, final MetricMap metricMap) { return view.getWindow() .match( - new CreateCumulative(view, start), + new CreateCumulative(view, start, metricMap), new CreateInterval(view, start), Functions.throwAssertionError()); } @@ -99,10 +101,22 @@ abstract class MutableViewData { private Timestamp start; private final Map, MutableAggregation> tagValueAggregationMap = Maps.newHashMap(); + private final MetricMap metricMap; - private CumulativeMutableViewData(View view, Timestamp start) { + // Cache a MetricDescriptor to avoid converting View to MetricDescriptor in the future. + private final MetricDescriptor metricDescriptor; + + private CumulativeMutableViewData(View view, Timestamp start, MetricMap metricMap) { super(view); this.start = start; + this.metricMap = metricMap; + MetricDescriptor metricDescriptor = MetricUtils.viewToMetricDescriptor(view); + if (metricDescriptor == null) { + throw new AssertionError( + "Cumulative view should be converted to a non-null MetricDescriptor."); + } else { + this.metricDescriptor = metricDescriptor; + } } @Override @@ -114,7 +128,9 @@ abstract class MutableViewData { tagValueAggregationMap.put( tagValues, createMutableAggregation(super.view.getAggregation())); } - tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp); + MutableAggregation mutableAggregation = tagValueAggregationMap.get(tagValues); + mutableAggregation.add(value, attachments, timestamp); + metricMap.record(metricDescriptor, tagValues, mutableAggregation, timestamp); } @Override @@ -385,15 +401,17 @@ abstract class MutableViewData { implements Function { @Override public MutableViewData apply(View.AggregationWindow.Cumulative arg) { - return new CumulativeMutableViewData(view, start); + return new CumulativeMutableViewData(view, start, metricMap); } private final View view; private final Timestamp start; + private final MetricMap metricMap; - private CreateCumulative(View view, Timestamp start) { + private CreateCumulative(View view, Timestamp start, MetricMap metricMap) { this.view = view; this.start = start; + this.metricMap = metricMap; } } -- cgit v1.2.3