diff options
Diffstat (limited to 'impl_core/src/main/java/io/opencensus/implcore/stats')
13 files changed, 2198 insertions, 0 deletions
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java b/impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java new file mode 100644 index 00000000..172db539 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java @@ -0,0 +1,95 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Maps; +import io.opencensus.common.Duration; +import io.opencensus.common.Timestamp; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.Measure; +import io.opencensus.tags.TagValue; +import java.util.List; +import java.util.Map; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +/** The bucket with aggregated {@code MeasureValue}s used for {@code IntervalViewData}. */ +final class IntervalBucket { + + private static final Duration ZERO = Duration.create(0, 0); + + private final Timestamp start; + private final Duration duration; + private final Aggregation aggregation; + private final Measure measure; + private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap = + Maps.newHashMap(); + + IntervalBucket(Timestamp start, Duration duration, Aggregation aggregation, Measure measure) { + this.start = checkNotNull(start, "Start"); + this.duration = checkNotNull(duration, "Duration"); + checkArgument(duration.compareTo(ZERO) > 0, "Duration must be positive"); + this.aggregation = checkNotNull(aggregation, "Aggregation"); + this.measure = checkNotNull(measure, "measure"); + } + + Map<List</*@Nullable*/ TagValue>, MutableAggregation> getTagValueAggregationMap() { + return tagValueAggregationMap; + } + + Timestamp getStart() { + return start; + } + + // Puts a new value into the internal MutableAggregations, based on the TagValues. + void record( + List</*@Nullable*/ TagValue> tagValues, + double value, + Map<String, String> attachments, + Timestamp timestamp) { + if (!tagValueAggregationMap.containsKey(tagValues)) { + tagValueAggregationMap.put( + tagValues, RecordUtils.createMutableAggregation(aggregation, measure)); + } + tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp); + } + + /* + * Returns how much fraction of duration has passed in this IntervalBucket. For example, if this + * bucket starts at 10s and has a duration of 20s, and now is 15s, then getFraction() should + * return (15 - 10) / 20 = 0.25. + * + * This IntervalBucket must be current, i.e. the current timestamp must be within + * [this.start, this.start + this.duration). + */ + double getFraction(Timestamp now) { + Duration elapsedTime = now.subtractTimestamp(start); + checkArgument( + elapsedTime.compareTo(ZERO) >= 0 && elapsedTime.compareTo(duration) < 0, + "This bucket must be current."); + return ((double) elapsedTime.toMillis()) / duration.toMillis(); + } + + void clearStats() { + tagValueAggregationMap.clear(); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java new file mode 100644 index 00000000..ee51796c --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java @@ -0,0 +1,66 @@ +/* + * Copyright 2016-17, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import io.opencensus.stats.Measure.MeasureDouble; +import io.opencensus.stats.Measure.MeasureLong; +import io.opencensus.stats.MeasureMap; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.unsafe.ContextUtils; + +/** Implementation of {@link MeasureMap}. */ +final class MeasureMapImpl extends MeasureMap { + private final StatsManager statsManager; + private final MeasureMapInternal.Builder builder = MeasureMapInternal.builder(); + + static MeasureMapImpl create(StatsManager statsManager) { + return new MeasureMapImpl(statsManager); + } + + private MeasureMapImpl(StatsManager statsManager) { + this.statsManager = statsManager; + } + + @Override + public MeasureMapImpl put(MeasureDouble measure, double value) { + builder.put(measure, value); + return this; + } + + @Override + public MeasureMapImpl put(MeasureLong measure, long value) { + builder.put(measure, value); + return this; + } + + @Override + public MeasureMap putAttachment(String key, String value) { + builder.putAttachment(key, value); + return this; + } + + @Override + public void record() { + // Use the context key directly, to avoid depending on the tags implementation. + record(ContextUtils.TAG_CONTEXT_KEY.get()); + } + + @Override + public void record(TagContext tags) { + statsManager.record(tags, builder.build()); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java new file mode 100644 index 00000000..d867b342 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java @@ -0,0 +1,138 @@ +/* + * Copyright 2016-17, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import io.opencensus.stats.Measure; +import io.opencensus.stats.Measure.MeasureDouble; +import io.opencensus.stats.Measure.MeasureLong; +import io.opencensus.stats.Measurement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; + +// TODO(songya): consider combining MeasureMapImpl and this class. +/** A map from {@link Measure}'s to measured values. */ +final class MeasureMapInternal { + + /** Returns a {@link Builder} for the {@link MeasureMapInternal} class. */ + static Builder builder() { + return new Builder(); + } + + /** + * Returns an {@link Iterator} over the measure/value mappings in this {@link MeasureMapInternal}. + * The {@code Iterator} does not support {@link Iterator#remove()}. + */ + Iterator<Measurement> iterator() { + return new MeasureMapInternalIterator(); + } + + // Returns the contextual information associated with an example value. + Map<String, String> getAttachments() { + return attachments; + } + + private final ArrayList<Measurement> measurements; + private final Map<String, String> attachments; + + private MeasureMapInternal(ArrayList<Measurement> measurements, Map<String, String> attachments) { + this.measurements = measurements; + this.attachments = Collections.unmodifiableMap(new HashMap<String, String>(attachments)); + } + + /** Builder for the {@link MeasureMapInternal} class. */ + static class Builder { + /** + * Associates the {@link MeasureDouble} with the given value. Subsequent updates to the same + * {@link MeasureDouble} will overwrite the previous value. + * + * @param measure the {@link MeasureDouble} + * @param value the value to be associated with {@code measure} + * @return this + */ + Builder put(MeasureDouble measure, double value) { + measurements.add(Measurement.MeasurementDouble.create(measure, value)); + return this; + } + + /** + * Associates the {@link MeasureLong} with the given value. Subsequent updates to the same + * {@link MeasureLong} will overwrite the previous value. + * + * @param measure the {@link MeasureLong} + * @param value the value to be associated with {@code measure} + * @return this + */ + Builder put(MeasureLong measure, long value) { + measurements.add(Measurement.MeasurementLong.create(measure, value)); + return this; + } + + Builder putAttachment(String key, String value) { + this.attachments.put(key, value); + return this; + } + + /** Constructs a {@link MeasureMapInternal} from the current measurements. */ + MeasureMapInternal build() { + // Note: this makes adding measurements quadratic but is fastest for the sizes of + // MeasureMapInternals that we should see. We may want to go to a strategy of sort/eliminate + // for larger MeasureMapInternals. + for (int i = measurements.size() - 1; i >= 0; i--) { + for (int j = i - 1; j >= 0; j--) { + if (measurements.get(i).getMeasure() == measurements.get(j).getMeasure()) { + measurements.remove(j); + j--; + } + } + } + return new MeasureMapInternal(measurements, attachments); + } + + private final ArrayList<Measurement> measurements = new ArrayList<Measurement>(); + private final Map<String, String> attachments = new HashMap<String, String>(); + + private Builder() {} + } + + // Provides an unmodifiable Iterator over this instance's measurements. + private final class MeasureMapInternalIterator implements Iterator<Measurement> { + @Override + public boolean hasNext() { + return position < length; + } + + @Override + public Measurement next() { + if (position >= measurements.size()) { + throw new NoSuchElementException(); + } + return measurements.get(position++); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private final int length = measurements.size(); + private int position = 0; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java new file mode 100644 index 00000000..5da0cad8 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java @@ -0,0 +1,194 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import io.opencensus.common.Clock; +import io.opencensus.common.Timestamp; +import io.opencensus.implcore.internal.CurrentState.State; +import io.opencensus.metrics.export.Metric; +import io.opencensus.stats.Measure; +import io.opencensus.stats.Measurement; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.tags.TagContext; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import javax.annotation.concurrent.GuardedBy; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +/** A class that stores a singleton map from {@code MeasureName}s to {@link MutableViewData}s. */ +@SuppressWarnings("deprecation") +final class MeasureToViewMap { + + /* + * A synchronized singleton map that stores the one-to-many mapping from Measures + * to MutableViewDatas. + */ + @GuardedBy("this") + private final Multimap<String, MutableViewData> mutableMap = + HashMultimap.<String, MutableViewData>create(); + + @GuardedBy("this") + private final Map<View.Name, View> registeredViews = new HashMap<View.Name, View>(); + + // TODO(songya): consider adding a Measure.Name class + @GuardedBy("this") + private final Map<String, Measure> registeredMeasures = Maps.newHashMap(); + + // Cached set of exported views. It must be set to null whenever a view is registered or + // unregistered. + @javax.annotation.Nullable private volatile Set<View> exportedViews; + + /** Returns a {@link ViewData} corresponding to the given {@link View.Name}. */ + @javax.annotation.Nullable + synchronized ViewData getView(View.Name viewName, Clock clock, State state) { + MutableViewData view = getMutableViewData(viewName); + return view == null ? null : view.toViewData(clock.now(), state); + } + + Set<View> getExportedViews() { + Set<View> views = exportedViews; + if (views == null) { + synchronized (this) { + exportedViews = views = filterExportedViews(registeredViews.values()); + } + } + return views; + } + + // Returns the subset of the given views that should be exported + private static Set<View> filterExportedViews(Collection<View> allViews) { + Set<View> views = Sets.newHashSet(); + for (View view : allViews) { + if (view.getWindow() instanceof View.AggregationWindow.Cumulative) { + views.add(view); + } + } + return Collections.unmodifiableSet(views); + } + + /** Enable stats collection for the given {@link View}. */ + synchronized void registerView(View view, Clock clock) { + exportedViews = null; + View existing = registeredViews.get(view.getName()); + if (existing != null) { + if (existing.equals(view)) { + // Ignore views that are already registered. + return; + } else { + throw new IllegalArgumentException( + "A different view with the same name is already registered: " + existing); + } + } + Measure measure = view.getMeasure(); + Measure registeredMeasure = registeredMeasures.get(measure.getName()); + if (registeredMeasure != null && !registeredMeasure.equals(measure)) { + throw new IllegalArgumentException( + "A different measure with the same name is already registered: " + registeredMeasure); + } + registeredViews.put(view.getName(), view); + if (registeredMeasure == null) { + registeredMeasures.put(measure.getName(), measure); + } + Timestamp now = clock.now(); + mutableMap.put(view.getMeasure().getName(), MutableViewData.create(view, now)); + } + + @javax.annotation.Nullable + private synchronized MutableViewData getMutableViewData(View.Name viewName) { + View view = registeredViews.get(viewName); + if (view == null) { + return null; + } + Collection<MutableViewData> views = mutableMap.get(view.getMeasure().getName()); + for (MutableViewData viewData : views) { + if (viewData.getView().getName().equals(viewName)) { + return viewData; + } + } + throw new AssertionError( + "Internal error: Not recording stats for view: \"" + + viewName + + "\" registeredViews=" + + registeredViews + + ", mutableMap=" + + mutableMap); + } + + // Records stats with a set of tags. + synchronized void record(TagContext tags, MeasureMapInternal stats, Timestamp timestamp) { + Iterator<Measurement> iterator = stats.iterator(); + Map<String, String> attachments = stats.getAttachments(); + while (iterator.hasNext()) { + Measurement measurement = iterator.next(); + Measure measure = measurement.getMeasure(); + if (!measure.equals(registeredMeasures.get(measure.getName()))) { + // unregistered measures will be ignored. + continue; + } + Collection<MutableViewData> viewDataCollection = mutableMap.get(measure.getName()); + for (MutableViewData viewData : viewDataCollection) { + viewData.record( + tags, RecordUtils.getDoubleValueFromMeasurement(measurement), timestamp, attachments); + } + } + } + + synchronized List<Metric> getMetrics(Clock clock, State state) { + List<Metric> metrics = new ArrayList<Metric>(); + Timestamp now = clock.now(); + for (Entry<String, MutableViewData> entry : mutableMap.entries()) { + Metric metric = entry.getValue().toMetric(now, state); + if (metric != null) { + metrics.add(metric); + } + } + return metrics; + } + + // Clear stats for all the current MutableViewData + synchronized void clearStats() { + for (Entry<String, Collection<MutableViewData>> entry : mutableMap.asMap().entrySet()) { + for (MutableViewData mutableViewData : entry.getValue()) { + mutableViewData.clearStats(); + } + } + } + + // Resume stats collection for all MutableViewData. + synchronized void resumeStatsCollection(Timestamp now) { + for (Entry<String, Collection<MutableViewData>> entry : mutableMap.asMap().entrySet()) { + for (MutableViewData mutableViewData : entry.getValue()) { + mutableViewData.resumeStatsCollection(now); + } + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java new file mode 100644 index 00000000..7bf92572 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java @@ -0,0 +1,38 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import io.opencensus.metrics.export.Metric; +import io.opencensus.metrics.export.MetricProducer; +import java.util.Collection; +import javax.annotation.concurrent.ThreadSafe; + +/** Implementation of {@link MetricProducer}. */ +@ThreadSafe +final class MetricProducerImpl extends MetricProducer { + + private final StatsManager statsManager; + + MetricProducerImpl(StatsManager statsManager) { + this.statsManager = statsManager; + } + + @Override + public Collection<Metric> getMetrics() { + return statsManager.getMetrics(); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java new file mode 100644 index 00000000..0dfb1d26 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java @@ -0,0 +1,118 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import com.google.common.annotations.VisibleForTesting; +import io.opencensus.common.Function; +import io.opencensus.common.Functions; +import io.opencensus.metrics.LabelKey; +import io.opencensus.metrics.LabelValue; +import io.opencensus.metrics.export.MetricDescriptor; +import io.opencensus.metrics.export.MetricDescriptor.Type; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.Measure; +import io.opencensus.stats.View; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import java.util.ArrayList; +import java.util.List; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +@SuppressWarnings("deprecation") +// Utils to convert Stats data models to Metric data models. +final class MetricUtils { + + @javax.annotation.Nullable + static MetricDescriptor viewToMetricDescriptor(View view) { + if (view.getWindow() instanceof View.AggregationWindow.Interval) { + // Only creates Metric for cumulative stats. + return null; + } + List<LabelKey> labelKeys = new ArrayList<LabelKey>(); + for (TagKey tagKey : view.getColumns()) { + // TODO: add description + labelKeys.add(LabelKey.create(tagKey.getName(), "")); + } + Measure measure = view.getMeasure(); + return MetricDescriptor.create( + view.getName().asString(), + view.getDescription(), + measure.getUnit(), + getType(measure, view.getAggregation()), + labelKeys); + } + + @VisibleForTesting + static Type getType(Measure measure, Aggregation aggregation) { + return aggregation.match( + Functions.returnConstant( + measure.match( + TYPE_CUMULATIVE_DOUBLE_FUNCTION, // Sum Double + TYPE_CUMULATIVE_INT64_FUNCTION, // Sum Int64 + TYPE_UNRECOGNIZED_FUNCTION)), + TYPE_CUMULATIVE_INT64_FUNCTION, // Count + TYPE_CUMULATIVE_DISTRIBUTION_FUNCTION, // Distribution + Functions.returnConstant( + measure.match( + TYPE_GAUGE_DOUBLE_FUNCTION, // LastValue Double + TYPE_GAUGE_INT64_FUNCTION, // LastValue Long + TYPE_UNRECOGNIZED_FUNCTION)), + AGGREGATION_TYPE_DEFAULT_FUNCTION); + } + + static List<LabelValue> tagValuesToLabelValues(List</*@Nullable*/ TagValue> tagValues) { + List<LabelValue> labelValues = new ArrayList<LabelValue>(); + for (/*@Nullable*/ TagValue tagValue : tagValues) { + labelValues.add(LabelValue.create(tagValue == null ? null : tagValue.asString())); + } + return labelValues; + } + + private static final Function<Object, Type> TYPE_CUMULATIVE_DOUBLE_FUNCTION = + Functions.returnConstant(Type.CUMULATIVE_DOUBLE); + + private static final Function<Object, Type> TYPE_CUMULATIVE_INT64_FUNCTION = + Functions.returnConstant(Type.CUMULATIVE_INT64); + + private static final Function<Object, Type> TYPE_CUMULATIVE_DISTRIBUTION_FUNCTION = + Functions.returnConstant(Type.CUMULATIVE_DISTRIBUTION); + + private static final Function<Object, Type> TYPE_GAUGE_DOUBLE_FUNCTION = + Functions.returnConstant(Type.GAUGE_DOUBLE); + + private static final Function<Object, Type> TYPE_GAUGE_INT64_FUNCTION = + Functions.returnConstant(Type.GAUGE_INT64); + + private static final Function<Object, Type> TYPE_UNRECOGNIZED_FUNCTION = + Functions.<Type>throwAssertionError(); + + private static final Function<Aggregation, Type> AGGREGATION_TYPE_DEFAULT_FUNCTION = + new Function<Aggregation, Type>() { + @Override + public Type apply(Aggregation arg) { + if (arg instanceof Aggregation.Mean) { + return Type.CUMULATIVE_DOUBLE; // Mean + } + throw new AssertionError(); + } + }; + + private MetricUtils() {} +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java new file mode 100644 index 00000000..6e2bff1c --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java @@ -0,0 +1,556 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import io.opencensus.common.Timestamp; +import io.opencensus.metrics.export.Distribution; +import io.opencensus.metrics.export.Distribution.BucketOptions; +import io.opencensus.metrics.export.Point; +import io.opencensus.metrics.export.Value; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.AggregationData.DistributionData; +import io.opencensus.stats.AggregationData.DistributionData.Exemplar; +import io.opencensus.stats.BucketBoundaries; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** Mutable version of {@link Aggregation} that supports adding values. */ +abstract class MutableAggregation { + + private MutableAggregation() {} + + // Tolerance for double comparison. + private static final double TOLERANCE = 1e-6; + + /** + * Put a new value into the MutableAggregation. + * + * @param value new value to be added to population + * @param attachments the contextual information on an {@link Exemplar} + * @param timestamp the timestamp when the value is recorded + */ + abstract void add(double value, Map<String, String> attachments, Timestamp timestamp); + + // TODO(songya): remove this method once interval stats is completely removed. + /** + * Combine the internal values of this MutableAggregation and value of the given + * MutableAggregation, with the given fraction. Then set the internal value of this + * MutableAggregation to the combined value. + * + * @param other the other {@code MutableAggregation}. The type of this and other {@code + * MutableAggregation} must match. + * @param fraction the fraction that the value in other {@code MutableAggregation} should + * contribute. Must be within [0.0, 1.0]. + */ + abstract void combine(MutableAggregation other, double fraction); + + abstract AggregationData toAggregationData(); + + abstract Point toPoint(Timestamp timestamp); + + /** Calculate sum of doubles on aggregated {@code MeasureValue}s. */ + static class MutableSumDouble extends MutableAggregation { + + private double sum = 0.0; + + private MutableSumDouble() {} + + /** + * Construct a {@code MutableSumDouble}. + * + * @return an empty {@code MutableSumDouble}. + */ + static MutableSumDouble create() { + return new MutableSumDouble(); + } + + @Override + void add(double value, Map<String, String> attachments, Timestamp timestamp) { + sum += value; + } + + @Override + void combine(MutableAggregation other, double fraction) { + checkArgument(other instanceof MutableSumDouble, "MutableSumDouble expected."); + this.sum += fraction * ((MutableSumDouble) other).sum; + } + + @Override + AggregationData toAggregationData() { + return AggregationData.SumDataDouble.create(sum); + } + + @Override + Point toPoint(Timestamp timestamp) { + return Point.create(Value.doubleValue(sum), timestamp); + } + + @VisibleForTesting + double getSum() { + return sum; + } + } + + /** Calculate sum of longs on aggregated {@code MeasureValue}s. */ + static final class MutableSumLong extends MutableSumDouble { + private MutableSumLong() { + super(); + } + + /** + * Construct a {@code MutableSumLong}. + * + * @return an empty {@code MutableSumLong}. + */ + static MutableSumLong create() { + return new MutableSumLong(); + } + + @Override + AggregationData toAggregationData() { + return AggregationData.SumDataLong.create(Math.round(getSum())); + } + + @Override + Point toPoint(Timestamp timestamp) { + return Point.create(Value.longValue(Math.round(getSum())), timestamp); + } + } + + /** Calculate count on aggregated {@code MeasureValue}s. */ + static final class MutableCount extends MutableAggregation { + + private long count = 0; + + private MutableCount() {} + + /** + * Construct a {@code MutableCount}. + * + * @return an empty {@code MutableCount}. + */ + static MutableCount create() { + return new MutableCount(); + } + + @Override + void add(double value, Map<String, String> attachments, Timestamp timestamp) { + count++; + } + + @Override + void combine(MutableAggregation other, double fraction) { + checkArgument(other instanceof MutableCount, "MutableCount expected."); + this.count += Math.round(fraction * ((MutableCount) other).getCount()); + } + + @Override + AggregationData toAggregationData() { + return AggregationData.CountData.create(count); + } + + @Override + Point toPoint(Timestamp timestamp) { + return Point.create(Value.longValue(count), timestamp); + } + + /** + * Returns the aggregated count. + * + * @return the aggregated count. + */ + long getCount() { + return count; + } + } + + /** Calculate mean on aggregated {@code MeasureValue}s. */ + static final class MutableMean extends MutableAggregation { + + private double sum = 0.0; + private long count = 0; + + private MutableMean() {} + + /** + * Construct a {@code MutableMean}. + * + * @return an empty {@code MutableMean}. + */ + static MutableMean create() { + return new MutableMean(); + } + + @Override + void add(double value, Map<String, String> attachments, Timestamp timestamp) { + count++; + sum += value; + } + + @Override + void combine(MutableAggregation other, double fraction) { + checkArgument(other instanceof MutableMean, "MutableMean expected."); + MutableMean mutableMean = (MutableMean) other; + this.count += Math.round(mutableMean.count * fraction); + this.sum += mutableMean.sum * fraction; + } + + @SuppressWarnings("deprecation") + @Override + AggregationData toAggregationData() { + return AggregationData.MeanData.create(getMean(), count); + } + + @Override + Point toPoint(Timestamp timestamp) { + return Point.create(Value.doubleValue(getMean()), timestamp); + } + + /** + * Returns the aggregated mean. + * + * @return the aggregated mean. + */ + double getMean() { + return count == 0 ? 0 : sum / count; + } + + /** + * Returns the aggregated count. + * + * @return the aggregated count. + */ + long getCount() { + return count; + } + + @VisibleForTesting + double getSum() { + return sum; + } + } + + /** Calculate distribution stats on aggregated {@code MeasureValue}s. */ + static final class MutableDistribution extends MutableAggregation { + + private double sum = 0.0; + private double mean = 0.0; + private long count = 0; + private double sumOfSquaredDeviations = 0.0; + + // Initial "impossible" values, that will get reset as soon as first value is added. + private double min = Double.POSITIVE_INFINITY; + private double max = Double.NEGATIVE_INFINITY; + + private final BucketBoundaries bucketBoundaries; + private final long[] bucketCounts; + + // If there's a histogram (i.e bucket boundaries are not empty) in this MutableDistribution, + // exemplars will have the same size to bucketCounts; otherwise exemplars are null. + // Only the newest exemplar will be kept at each index. + @javax.annotation.Nullable private final Exemplar[] exemplars; + + private MutableDistribution(BucketBoundaries bucketBoundaries) { + this.bucketBoundaries = bucketBoundaries; + int buckets = bucketBoundaries.getBoundaries().size() + 1; + this.bucketCounts = new long[buckets]; + // In the implementation, each histogram bucket can have up to one exemplar, and the exemplar + // array is guaranteed to be in ascending order. + // If there's no histogram, don't record exemplars. + this.exemplars = bucketBoundaries.getBoundaries().isEmpty() ? null : new Exemplar[buckets]; + } + + /** + * Construct a {@code MutableDistribution}. + * + * @return an empty {@code MutableDistribution}. + */ + static MutableDistribution create(BucketBoundaries bucketBoundaries) { + checkNotNull(bucketBoundaries, "bucketBoundaries should not be null."); + return new MutableDistribution(bucketBoundaries); + } + + @Override + void add(double value, Map<String, String> attachments, Timestamp timestamp) { + sum += value; + count++; + + /* + * Update the sum of squared deviations from the mean with the given value. For values + * x_i this is Sum[i=1..n]((x_i - mean)^2) + * + * Computed using Welfords method (see + * https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance, or Knuth, "The Art of + * Computer Programming", Vol. 2, page 323, 3rd edition) + */ + double deltaFromMean = value - mean; + mean += deltaFromMean / count; + double deltaFromMean2 = value - mean; + sumOfSquaredDeviations += deltaFromMean * deltaFromMean2; + + if (value < min) { + min = value; + } + if (value > max) { + max = value; + } + + int bucket = 0; + for (; bucket < bucketBoundaries.getBoundaries().size(); bucket++) { + if (value < bucketBoundaries.getBoundaries().get(bucket)) { + break; + } + } + bucketCounts[bucket]++; + + // No implicit recording for exemplars - if there are no attachments (contextual information), + // don't record exemplars. + if (!attachments.isEmpty() && exemplars != null) { + exemplars[bucket] = Exemplar.create(value, timestamp, attachments); + } + } + + // We don't compute fractional MutableDistribution, it's either whole or none. + @Override + void combine(MutableAggregation other, double fraction) { + checkArgument(other instanceof MutableDistribution, "MutableDistribution expected."); + if (Math.abs(1.0 - fraction) > TOLERANCE) { + return; + } + + MutableDistribution mutableDistribution = (MutableDistribution) other; + checkArgument( + this.bucketBoundaries.equals(mutableDistribution.bucketBoundaries), + "Bucket boundaries should match."); + + // Algorithm for calculating the combination of sum of squared deviations: + // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm. + if (this.count + mutableDistribution.count > 0) { + double delta = mutableDistribution.mean - this.mean; + this.sumOfSquaredDeviations = + this.sumOfSquaredDeviations + + mutableDistribution.sumOfSquaredDeviations + + Math.pow(delta, 2) + * this.count + * mutableDistribution.count + / (this.count + mutableDistribution.count); + } + + this.count += mutableDistribution.count; + this.sum += mutableDistribution.sum; + this.mean = this.sum / this.count; + + if (mutableDistribution.min < this.min) { + this.min = mutableDistribution.min; + } + if (mutableDistribution.max > this.max) { + this.max = mutableDistribution.max; + } + + long[] bucketCounts = mutableDistribution.getBucketCounts(); + for (int i = 0; i < bucketCounts.length; i++) { + this.bucketCounts[i] += bucketCounts[i]; + } + + Exemplar[] otherExemplars = mutableDistribution.getExemplars(); + if (exemplars != null && otherExemplars != null) { + for (int i = 0; i < otherExemplars.length; i++) { + Exemplar exemplar = otherExemplars[i]; + // Assume other is always newer than this, because we combined interval buckets in time + // order. + // If there's a newer exemplar, overwrite current value. + if (exemplar != null) { + this.exemplars[i] = exemplar; + } + } + } + } + + @Override + AggregationData toAggregationData() { + List<Long> boxedBucketCounts = new ArrayList<Long>(); + for (long bucketCount : bucketCounts) { + boxedBucketCounts.add(bucketCount); + } + List<Exemplar> exemplarList = new ArrayList<Exemplar>(); + if (exemplars != null) { + for (Exemplar exemplar : exemplars) { + if (exemplar != null) { + exemplarList.add(exemplar); + } + } + } + return DistributionData.create( + mean, count, min, max, sumOfSquaredDeviations, boxedBucketCounts, exemplarList); + } + + @Override + Point toPoint(Timestamp timestamp) { + List<Distribution.Bucket> buckets = new ArrayList<Distribution.Bucket>(); + for (int bucket = 0; bucket < bucketCounts.length; bucket++) { + long bucketCount = bucketCounts[bucket]; + @javax.annotation.Nullable AggregationData.DistributionData.Exemplar exemplar = null; + if (exemplars != null) { + exemplar = exemplars[bucket]; + } + + Distribution.Bucket metricBucket; + if (exemplar != null) { + // Bucket with an Exemplar. + metricBucket = + Distribution.Bucket.create( + bucketCount, + Distribution.Exemplar.create( + exemplar.getValue(), exemplar.getTimestamp(), exemplar.getAttachments())); + } else { + // Bucket with no Exemplar. + metricBucket = Distribution.Bucket.create(bucketCount); + } + buckets.add(metricBucket); + } + + // TODO(mayurkale): Drop the first bucket when converting to metrics. + // Reason: In Stats API, bucket bounds begin with -infinity (first bucket is (-infinity, 0)). + BucketOptions bucketOptions = BucketOptions.explicitOptions(bucketBoundaries.getBoundaries()); + + return Point.create( + Value.distributionValue( + Distribution.create( + count, mean * count, sumOfSquaredDeviations, bucketOptions, buckets)), + timestamp); + } + + double getMean() { + return mean; + } + + long getCount() { + return count; + } + + double getMin() { + return min; + } + + double getMax() { + return max; + } + + // Returns the aggregated sum of squared deviations. + double getSumOfSquaredDeviations() { + return sumOfSquaredDeviations; + } + + long[] getBucketCounts() { + return bucketCounts; + } + + BucketBoundaries getBucketBoundaries() { + return bucketBoundaries; + } + + @javax.annotation.Nullable + Exemplar[] getExemplars() { + return exemplars; + } + } + + /** Calculate double last value on aggregated {@code MeasureValue}s. */ + static class MutableLastValueDouble extends MutableAggregation { + + // Initial value that will get reset as soon as first value is added. + private double lastValue = Double.NaN; + // TODO(songya): remove this once interval stats is completely removed. + private boolean initialized = false; + + private MutableLastValueDouble() {} + + /** + * Construct a {@code MutableLastValueDouble}. + * + * @return an empty {@code MutableLastValueDouble}. + */ + static MutableLastValueDouble create() { + return new MutableLastValueDouble(); + } + + @Override + void add(double value, Map<String, String> attachments, Timestamp timestamp) { + lastValue = value; + // TODO(songya): remove this once interval stats is completely removed. + if (!initialized) { + initialized = true; + } + } + + @Override + void combine(MutableAggregation other, double fraction) { + checkArgument(other instanceof MutableLastValueDouble, "MutableLastValueDouble expected."); + MutableLastValueDouble otherValue = (MutableLastValueDouble) other; + // Assume other is always newer than this, because we combined interval buckets in time order. + // If there's a newer value, overwrite current value. + this.lastValue = otherValue.initialized ? otherValue.getLastValue() : this.lastValue; + } + + @Override + AggregationData toAggregationData() { + return AggregationData.LastValueDataDouble.create(lastValue); + } + + @Override + Point toPoint(Timestamp timestamp) { + return Point.create(Value.doubleValue(lastValue), timestamp); + } + + @VisibleForTesting + double getLastValue() { + return lastValue; + } + } + + /** Calculate last long value on aggregated {@code MeasureValue}s. */ + static final class MutableLastValueLong extends MutableLastValueDouble { + private MutableLastValueLong() { + super(); + } + + /** + * Construct a {@code MutableLastValueLong}. + * + * @return an empty {@code MutableLastValueLong}. + */ + static MutableLastValueLong create() { + return new MutableLastValueLong(); + } + + @Override + AggregationData toAggregationData() { + return AggregationData.LastValueDataLong.create(Math.round(getLastValue())); + } + + @Override + Point toPoint(Timestamp timestamp) { + return Point.create(Value.longValue(Math.round(getLastValue())), timestamp); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java new file mode 100644 index 00000000..928675e9 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java @@ -0,0 +1,464 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.opencensus.implcore.stats.RecordUtils.createAggregationMap; +import static io.opencensus.implcore.stats.RecordUtils.createMutableAggregation; +import static io.opencensus.implcore.stats.RecordUtils.getTagMap; +import static io.opencensus.implcore.stats.RecordUtils.getTagValues; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import io.opencensus.common.Duration; +import io.opencensus.common.Function; +import io.opencensus.common.Functions; +import io.opencensus.common.Timestamp; +import io.opencensus.implcore.internal.CheckerFrameworkUtils; +import io.opencensus.implcore.internal.CurrentState.State; +import io.opencensus.metrics.LabelValue; +import io.opencensus.metrics.export.Metric; +import io.opencensus.metrics.export.MetricDescriptor; +import io.opencensus.metrics.export.MetricDescriptor.Type; +import io.opencensus.metrics.export.Point; +import io.opencensus.metrics.export.TimeSeries; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.Measure; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagValue; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +/** A mutable version of {@link ViewData}, used for recording stats and start/end time. */ +@SuppressWarnings("deprecation") +abstract class MutableViewData { + + @VisibleForTesting static final Timestamp ZERO_TIMESTAMP = Timestamp.create(0, 0); + + private final View view; + + private MutableViewData(View view) { + this.view = view; + } + + /** + * Constructs a new {@link MutableViewData}. + * + * @param view the {@code View} linked with this {@code MutableViewData}. + * @param start the start {@code Timestamp}. + * @return a {@code MutableViewData}. + */ + static MutableViewData create(final View view, final Timestamp start) { + return view.getWindow() + .match( + new CreateCumulative(view, start), + new CreateInterval(view, start), + Functions.<MutableViewData>throwAssertionError()); + } + + /** The {@link View} associated with this {@link ViewData}. */ + View getView() { + return view; + } + + @javax.annotation.Nullable + abstract Metric toMetric(Timestamp now, State state); + + /** Record stats with the given tags. */ + abstract void record( + TagContext context, double value, Timestamp timestamp, Map<String, String> attachments); + + /** Convert this {@link MutableViewData} to {@link ViewData}. */ + abstract ViewData toViewData(Timestamp now, State state); + + // Clear recorded stats. + abstract void clearStats(); + + // Resume stats collection, and reset Start Timestamp (for CumulativeMutableViewData), or refresh + // bucket list (for InternalMutableViewData). + abstract void resumeStatsCollection(Timestamp now); + + private static final class CumulativeMutableViewData extends MutableViewData { + + private Timestamp start; + private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap = + Maps.newHashMap(); + // Cache a MetricDescriptor to avoid converting View to MetricDescriptor in the future. + private final MetricDescriptor metricDescriptor; + + private CumulativeMutableViewData(View view, Timestamp start) { + super(view); + this.start = start; + MetricDescriptor metricDescriptor = MetricUtils.viewToMetricDescriptor(view); + if (metricDescriptor == null) { + throw new AssertionError( + "Cumulative view should be converted to a non-null MetricDescriptor."); + } else { + this.metricDescriptor = metricDescriptor; + } + } + + @javax.annotation.Nullable + @Override + Metric toMetric(Timestamp now, State state) { + if (state == State.DISABLED) { + return null; + } + Type type = metricDescriptor.getType(); + @javax.annotation.Nullable + Timestamp startTime = type == Type.GAUGE_INT64 || type == Type.GAUGE_DOUBLE ? null : start; + List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>(); + for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry : + tagValueAggregationMap.entrySet()) { + List<LabelValue> labelValues = MetricUtils.tagValuesToLabelValues(entry.getKey()); + Point point = entry.getValue().toPoint(now); + timeSeriesList.add(TimeSeries.createWithOnePoint(labelValues, point, startTime)); + } + return Metric.create(metricDescriptor, timeSeriesList); + } + + @Override + void record( + TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) { + List</*@Nullable*/ TagValue> tagValues = + getTagValues(getTagMap(context), super.view.getColumns()); + if (!tagValueAggregationMap.containsKey(tagValues)) { + tagValueAggregationMap.put( + tagValues, + createMutableAggregation(super.view.getAggregation(), super.getView().getMeasure())); + } + tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp); + } + + @Override + ViewData toViewData(Timestamp now, State state) { + if (state == State.ENABLED) { + return ViewData.create( + super.view, + createAggregationMap(tagValueAggregationMap, super.view.getMeasure()), + ViewData.AggregationWindowData.CumulativeData.create(start, now)); + } else { + // If Stats state is DISABLED, return an empty ViewData. + return ViewData.create( + super.view, + Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(), + ViewData.AggregationWindowData.CumulativeData.create(ZERO_TIMESTAMP, ZERO_TIMESTAMP)); + } + } + + @Override + void clearStats() { + tagValueAggregationMap.clear(); + } + + @Override + void resumeStatsCollection(Timestamp now) { + start = now; + } + } + + /* + * For each IntervalView, we always keep a queue of N + 1 buckets (by default N is 4). + * Each bucket has a duration which is interval duration / N. + * Ideally: + * 1. the buckets should always be up-to-date, + * 2. current time should always be within the latest bucket, currently recorded stats should fall + * into the latest bucket, + * 3. there are always N buckets before the current one, which holds the stats in the past + * interval duration. + * + * When getView() is called, we will extract and combine the stats from the current and past + * buckets (part of the stats from the oldest bucket could have expired). + * + * However, in reality, we couldn't track the status of buckets all the time (keep monitoring and + * updating the bucket queue will be expensive). When we call record() or getView(), some or all + * of the buckets might be outdated, and we will need to "pad" new buckets to the queue and remove + * outdated ones. After refreshing buckets, the bucket queue will able to maintain the three + * invariants in the ideal situation. + * + * For example: + * 1. We have an IntervalView which has a duration of 8 seconds, we register this view at 10s. + * 2. Initially there will be 5 buckets: [2.0, 4.0), [4.0, 6.0), ..., [10.0, 12.0). + * 3. If users don't call record() or getView(), bucket queue will remain as it is, and some + * buckets could expire. + * 4. Suppose record() is called at 15s, now we need to refresh the bucket queue. We need to add + * two new buckets [12.0, 14.0) and [14.0, 16.0), and remove two expired buckets [2.0, 4.0) + * and [4.0, 6.0) + * 5. Suppose record() is called again at 30s, all the current buckets should have expired. We add + * 5 new buckets [22.0, 24.0) ... [30.0, 32.0) and remove all the previous buckets. + * 6. Suppose users call getView() at 35s, again we need to add two new buckets and remove two + * expired one, so that bucket queue is up-to-date. Now we combine stats from all buckets and + * return the combined IntervalViewData. + */ + private static final class IntervalMutableViewData extends MutableViewData { + + // TODO(songya): allow customizable bucket size in the future. + private static final int N = 4; // IntervalView has N + 1 buckets + + private final ArrayDeque<IntervalBucket> buckets = new ArrayDeque<IntervalBucket>(); + + private final Duration totalDuration; // Duration of the whole interval. + private final Duration bucketDuration; // Duration of a single bucket (totalDuration / N) + + private IntervalMutableViewData(View view, Timestamp start) { + super(view); + Duration totalDuration = ((View.AggregationWindow.Interval) view.getWindow()).getDuration(); + this.totalDuration = totalDuration; + this.bucketDuration = Duration.fromMillis(totalDuration.toMillis() / N); + + // When initializing. add N empty buckets prior to the start timestamp of this + // IntervalMutableViewData, so that the last bucket will be the current one in effect. + shiftBucketList(N + 1, start); + } + + @javax.annotation.Nullable + @Override + Metric toMetric(Timestamp now, State state) { + return null; + } + + @Override + void record( + TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) { + List</*@Nullable*/ TagValue> tagValues = + getTagValues(getTagMap(context), super.view.getColumns()); + refreshBucketList(timestamp); + // It is always the last bucket that does the recording. + CheckerFrameworkUtils.castNonNull(buckets.peekLast()) + .record(tagValues, value, attachments, timestamp); + } + + @Override + ViewData toViewData(Timestamp now, State state) { + refreshBucketList(now); + if (state == State.ENABLED) { + return ViewData.create( + super.view, + combineBucketsAndGetAggregationMap(now), + ViewData.AggregationWindowData.IntervalData.create(now)); + } else { + // If Stats state is DISABLED, return an empty ViewData. + return ViewData.create( + super.view, + Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(), + ViewData.AggregationWindowData.IntervalData.create(ZERO_TIMESTAMP)); + } + } + + @Override + void clearStats() { + for (IntervalBucket bucket : buckets) { + bucket.clearStats(); + } + } + + @Override + void resumeStatsCollection(Timestamp now) { + // Refresh bucket list to be ready for stats recording, so that if record() is called right + // after stats state is turned back on, record() will be faster. + refreshBucketList(now); + } + + // Add new buckets and remove expired buckets by comparing the current timestamp with + // timestamp of the last bucket. + private void refreshBucketList(Timestamp now) { + if (buckets.size() != N + 1) { + throw new AssertionError("Bucket list must have exactly " + (N + 1) + " buckets."); + } + Timestamp startOfLastBucket = + CheckerFrameworkUtils.castNonNull(buckets.peekLast()).getStart(); + // TODO(songya): decide what to do when time goes backwards + checkArgument( + now.compareTo(startOfLastBucket) >= 0, + "Current time must be within or after the last bucket."); + long elapsedTimeMillis = now.subtractTimestamp(startOfLastBucket).toMillis(); + long numOfPadBuckets = elapsedTimeMillis / bucketDuration.toMillis(); + + shiftBucketList(numOfPadBuckets, now); + } + + // Add specified number of new buckets, and remove expired buckets + private void shiftBucketList(long numOfPadBuckets, Timestamp now) { + Timestamp startOfNewBucket; + + if (!buckets.isEmpty()) { + startOfNewBucket = + CheckerFrameworkUtils.castNonNull(buckets.peekLast()) + .getStart() + .addDuration(bucketDuration); + } else { + // Initialize bucket list. Should only enter this block once. + startOfNewBucket = subtractDuration(now, totalDuration); + } + + if (numOfPadBuckets > N + 1) { + // All current buckets expired, need to add N + 1 new buckets. The start time of the latest + // bucket will be current time. + startOfNewBucket = subtractDuration(now, totalDuration); + numOfPadBuckets = N + 1; + } + + for (int i = 0; i < numOfPadBuckets; i++) { + buckets.add( + new IntervalBucket( + startOfNewBucket, + bucketDuration, + super.view.getAggregation(), + super.view.getMeasure())); + startOfNewBucket = startOfNewBucket.addDuration(bucketDuration); + } + + // removed expired buckets + while (buckets.size() > N + 1) { + buckets.pollFirst(); + } + } + + // Combine stats within each bucket, aggregate stats by tag values, and return the mapping from + // tag values to aggregation data. + private Map<List</*@Nullable*/ TagValue>, AggregationData> combineBucketsAndGetAggregationMap( + Timestamp now) { + // Need to maintain the order of inserted MutableAggregations (inserted based on time order). + Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap = + LinkedHashMultimap.create(); + + ArrayDeque<IntervalBucket> shallowCopy = new ArrayDeque<IntervalBucket>(buckets); + + Aggregation aggregation = super.view.getAggregation(); + Measure measure = super.view.getMeasure(); + putBucketsIntoMultiMap(shallowCopy, multimap, aggregation, measure, now); + Map<List</*@Nullable*/ TagValue>, MutableAggregation> singleMap = + aggregateOnEachTagValueList(multimap, aggregation, measure); + return createAggregationMap(singleMap, super.getView().getMeasure()); + } + + // Put stats within each bucket to a multimap. Each tag value list (map key) could have multiple + // mutable aggregations (map value) from different buckets. + private static void putBucketsIntoMultiMap( + ArrayDeque<IntervalBucket> buckets, + Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap, + Aggregation aggregation, + Measure measure, + Timestamp now) { + // Put fractional stats of the head (oldest) bucket. + IntervalBucket head = CheckerFrameworkUtils.castNonNull(buckets.peekFirst()); + IntervalBucket tail = CheckerFrameworkUtils.castNonNull(buckets.peekLast()); + double fractionTail = tail.getFraction(now); + // TODO(songya): decide what to do when time goes backwards + checkArgument( + 0.0 <= fractionTail && fractionTail <= 1.0, + "Fraction " + fractionTail + " should be within [0.0, 1.0]."); + double fractionHead = 1.0 - fractionTail; + putFractionalMutableAggregationsToMultiMap( + head.getTagValueAggregationMap(), multimap, aggregation, measure, fractionHead); + + // Put whole data of other buckets. + boolean shouldSkipFirst = true; + for (IntervalBucket bucket : buckets) { + if (shouldSkipFirst) { + shouldSkipFirst = false; + continue; // skip the first bucket + } + for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry : + bucket.getTagValueAggregationMap().entrySet()) { + multimap.put(entry.getKey(), entry.getValue()); + } + } + } + + // Put stats within one bucket into multimap, multiplied by a given fraction. + private static <T> void putFractionalMutableAggregationsToMultiMap( + Map<T, MutableAggregation> mutableAggrMap, + Multimap<T, MutableAggregation> multimap, + Aggregation aggregation, + Measure measure, + double fraction) { + for (Entry<T, MutableAggregation> entry : mutableAggrMap.entrySet()) { + // Initially empty MutableAggregations. + MutableAggregation fractionalMutableAgg = createMutableAggregation(aggregation, measure); + fractionalMutableAgg.combine(entry.getValue(), fraction); + multimap.put(entry.getKey(), fractionalMutableAgg); + } + } + + // For each tag value list (key of AggregationMap), combine mutable aggregations into one + // mutable aggregation, thus convert the multimap into a single map. + private static <T> Map<T, MutableAggregation> aggregateOnEachTagValueList( + Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure) { + Map<T, MutableAggregation> map = Maps.newHashMap(); + for (T tagValues : multimap.keySet()) { + // Initially empty MutableAggregations. + MutableAggregation combinedAggregation = createMutableAggregation(aggregation, measure); + for (MutableAggregation mutableAggregation : multimap.get(tagValues)) { + combinedAggregation.combine(mutableAggregation, 1.0); + } + map.put(tagValues, combinedAggregation); + } + return map; + } + + // Subtract a Duration from a Timestamp, and return a new Timestamp. + private static Timestamp subtractDuration(Timestamp timestamp, Duration duration) { + return timestamp.addDuration(Duration.create(-duration.getSeconds(), -duration.getNanos())); + } + } + + private static final class CreateCumulative + implements Function<View.AggregationWindow.Cumulative, MutableViewData> { + @Override + public MutableViewData apply(View.AggregationWindow.Cumulative arg) { + return new CumulativeMutableViewData(view, start); + } + + private final View view; + private final Timestamp start; + + private CreateCumulative(View view, Timestamp start) { + this.view = view; + this.start = start; + } + } + + private static final class CreateInterval + implements Function<View.AggregationWindow.Interval, MutableViewData> { + @Override + public MutableViewData apply(View.AggregationWindow.Interval arg) { + return new IntervalMutableViewData(view, start); + } + + private final View view; + private final Timestamp start; + + private CreateInterval(View view, Timestamp start) { + this.view = view; + this.start = start; + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java b/impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java new file mode 100644 index 00000000..fbb593f5 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java @@ -0,0 +1,241 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import io.opencensus.common.Function; +import io.opencensus.common.Functions; +import io.opencensus.implcore.stats.MutableAggregation.MutableCount; +import io.opencensus.implcore.stats.MutableAggregation.MutableDistribution; +import io.opencensus.implcore.stats.MutableAggregation.MutableLastValueDouble; +import io.opencensus.implcore.stats.MutableAggregation.MutableLastValueLong; +import io.opencensus.implcore.stats.MutableAggregation.MutableMean; +import io.opencensus.implcore.stats.MutableAggregation.MutableSumDouble; +import io.opencensus.implcore.stats.MutableAggregation.MutableSumLong; +import io.opencensus.implcore.tags.TagContextImpl; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.Aggregation.Count; +import io.opencensus.stats.Aggregation.Distribution; +import io.opencensus.stats.Aggregation.LastValue; +import io.opencensus.stats.Aggregation.Sum; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.Measure; +import io.opencensus.stats.Measure.MeasureDouble; +import io.opencensus.stats.Measure.MeasureLong; +import io.opencensus.stats.Measurement; +import io.opencensus.stats.Measurement.MeasurementDouble; +import io.opencensus.stats.Measurement.MeasurementLong; +import io.opencensus.tags.InternalUtils; +import io.opencensus.tags.Tag; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +@SuppressWarnings("deprecation") +/* Common static utilities for stats recording. */ +final class RecordUtils { + + @javax.annotation.Nullable @VisibleForTesting static final TagValue UNKNOWN_TAG_VALUE = null; + + static Map<TagKey, TagValue> getTagMap(TagContext ctx) { + if (ctx instanceof TagContextImpl) { + return ((TagContextImpl) ctx).getTags(); + } else { + Map<TagKey, TagValue> tags = Maps.newHashMap(); + for (Iterator<Tag> i = InternalUtils.getTags(ctx); i.hasNext(); ) { + Tag tag = i.next(); + tags.put(tag.getKey(), tag.getValue()); + } + return tags; + } + } + + @VisibleForTesting + static List</*@Nullable*/ TagValue> getTagValues( + Map<? extends TagKey, ? extends TagValue> tags, List<? extends TagKey> columns) { + List</*@Nullable*/ TagValue> tagValues = new ArrayList</*@Nullable*/ TagValue>(columns.size()); + // Record all the measures in a "Greedy" way. + // Every view aggregates every measure. This is similar to doing a GROUPBY view’s keys. + for (int i = 0; i < columns.size(); ++i) { + TagKey tagKey = columns.get(i); + if (!tags.containsKey(tagKey)) { + // replace not found key values by null. + tagValues.add(UNKNOWN_TAG_VALUE); + } else { + tagValues.add(tags.get(tagKey)); + } + } + return tagValues; + } + + /** + * Create an empty {@link MutableAggregation} based on the given {@link Aggregation}. + * + * @param aggregation {@code Aggregation}. + * @return an empty {@code MutableAggregation}. + */ + @VisibleForTesting + static MutableAggregation createMutableAggregation( + Aggregation aggregation, final Measure measure) { + return aggregation.match( + new Function<Sum, MutableAggregation>() { + @Override + public MutableAggregation apply(Sum arg) { + return measure.match( + CreateMutableSumDouble.INSTANCE, + CreateMutableSumLong.INSTANCE, + Functions.<MutableAggregation>throwAssertionError()); + } + }, + CreateMutableCount.INSTANCE, + CreateMutableDistribution.INSTANCE, + new Function<LastValue, MutableAggregation>() { + @Override + public MutableAggregation apply(LastValue arg) { + return measure.match( + CreateMutableLastValueDouble.INSTANCE, + CreateMutableLastValueLong.INSTANCE, + Functions.<MutableAggregation>throwAssertionError()); + } + }, + AggregationDefaultFunction.INSTANCE); + } + + // Covert a mapping from TagValues to MutableAggregation, to a mapping from TagValues to + // AggregationData. + static <T> Map<T, AggregationData> createAggregationMap( + Map<T, MutableAggregation> tagValueAggregationMap, Measure measure) { + Map<T, AggregationData> map = Maps.newHashMap(); + for (Entry<T, MutableAggregation> entry : tagValueAggregationMap.entrySet()) { + map.put(entry.getKey(), entry.getValue().toAggregationData()); + } + return map; + } + + static double getDoubleValueFromMeasurement(Measurement measurement) { + return measurement.match( + GET_VALUE_FROM_MEASUREMENT_DOUBLE, + GET_VALUE_FROM_MEASUREMENT_LONG, + Functions.<Double>throwAssertionError()); + } + + // static inner Function classes + + private static final Function<MeasurementDouble, Double> GET_VALUE_FROM_MEASUREMENT_DOUBLE = + new Function<MeasurementDouble, Double>() { + @Override + public Double apply(MeasurementDouble arg) { + return arg.getValue(); + } + }; + + private static final Function<MeasurementLong, Double> GET_VALUE_FROM_MEASUREMENT_LONG = + new Function<MeasurementLong, Double>() { + @Override + public Double apply(MeasurementLong arg) { + // TODO: consider checking truncation here. + return (double) arg.getValue(); + } + }; + + private static final class CreateMutableSumDouble + implements Function<MeasureDouble, MutableAggregation> { + @Override + public MutableAggregation apply(MeasureDouble arg) { + return MutableSumDouble.create(); + } + + private static final CreateMutableSumDouble INSTANCE = new CreateMutableSumDouble(); + } + + private static final class CreateMutableSumLong + implements Function<MeasureLong, MutableAggregation> { + @Override + public MutableAggregation apply(MeasureLong arg) { + return MutableSumLong.create(); + } + + private static final CreateMutableSumLong INSTANCE = new CreateMutableSumLong(); + } + + private static final class CreateMutableCount implements Function<Count, MutableAggregation> { + @Override + public MutableAggregation apply(Count arg) { + return MutableCount.create(); + } + + private static final CreateMutableCount INSTANCE = new CreateMutableCount(); + } + + // TODO(songya): remove this once Mean aggregation is completely removed. Before that + // we need to continue supporting Mean, since it could still be used by users and some + // deprecated RPC views. + private static final class AggregationDefaultFunction + implements Function<Aggregation, MutableAggregation> { + @Override + public MutableAggregation apply(Aggregation arg) { + if (arg instanceof Aggregation.Mean) { + return MutableMean.create(); + } + throw new IllegalArgumentException("Unknown Aggregation."); + } + + private static final AggregationDefaultFunction INSTANCE = new AggregationDefaultFunction(); + } + + private static final class CreateMutableDistribution + implements Function<Distribution, MutableAggregation> { + @Override + public MutableAggregation apply(Distribution arg) { + return MutableDistribution.create(arg.getBucketBoundaries()); + } + + private static final CreateMutableDistribution INSTANCE = new CreateMutableDistribution(); + } + + private static final class CreateMutableLastValueDouble + implements Function<MeasureDouble, MutableAggregation> { + @Override + public MutableAggregation apply(MeasureDouble arg) { + return MutableLastValueDouble.create(); + } + + private static final CreateMutableLastValueDouble INSTANCE = new CreateMutableLastValueDouble(); + } + + private static final class CreateMutableLastValueLong + implements Function<MeasureLong, MutableAggregation> { + @Override + public MutableAggregation apply(MeasureLong arg) { + return MutableLastValueLong.create(); + } + + private static final CreateMutableLastValueLong INSTANCE = new CreateMutableLastValueLong(); + } + + private RecordUtils() {} +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java new file mode 100644 index 00000000..741b399b --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java @@ -0,0 +1,92 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import com.google.common.base.Preconditions; +import io.opencensus.common.Clock; +import io.opencensus.implcore.internal.CurrentState; +import io.opencensus.implcore.internal.CurrentState.State; +import io.opencensus.implcore.internal.EventQueue; +import io.opencensus.metrics.Metrics; +import io.opencensus.metrics.export.MetricProducer; +import io.opencensus.stats.StatsCollectionState; +import io.opencensus.stats.StatsComponent; + +/** Base implementation of {@link StatsComponent}. */ +public class StatsComponentImplBase extends StatsComponent { + private static final State DEFAULT_STATE = State.ENABLED; + + // The State shared between the StatsComponent, StatsRecorder and ViewManager. + private final CurrentState currentState = new CurrentState(DEFAULT_STATE); + + private final ViewManagerImpl viewManager; + private final StatsRecorderImpl statsRecorder; + + /** + * Creates a new {@code StatsComponentImplBase}. + * + * @param queue the queue implementation. + * @param clock the clock to use when recording stats. + */ + public StatsComponentImplBase(EventQueue queue, Clock clock) { + StatsManager statsManager = new StatsManager(queue, clock, currentState); + this.viewManager = new ViewManagerImpl(statsManager); + this.statsRecorder = new StatsRecorderImpl(statsManager); + + // Create a new MetricProducerImpl and register it to MetricProducerManager when + // StatsComponentImplBase is initialized. + MetricProducer metricProducer = new MetricProducerImpl(statsManager); + Metrics.getExportComponent().getMetricProducerManager().add(metricProducer); + } + + @Override + public ViewManagerImpl getViewManager() { + return viewManager; + } + + @Override + public StatsRecorderImpl getStatsRecorder() { + return statsRecorder; + } + + @Override + public StatsCollectionState getState() { + return stateToStatsState(currentState.get()); + } + + @Override + @SuppressWarnings("deprecation") + public synchronized void setState(StatsCollectionState newState) { + boolean stateChanged = + currentState.set(statsStateToState(Preconditions.checkNotNull(newState, "newState"))); + if (stateChanged) { + if (newState == StatsCollectionState.DISABLED) { + viewManager.clearStats(); + } else { + viewManager.resumeStatsCollection(); + } + } + } + + private static State statsStateToState(StatsCollectionState statsCollectionState) { + return statsCollectionState == StatsCollectionState.ENABLED ? State.ENABLED : State.DISABLED; + } + + private static StatsCollectionState stateToStatsState(State state) { + return state == State.ENABLED ? StatsCollectionState.ENABLED : StatsCollectionState.DISABLED; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java new file mode 100644 index 00000000..17e99d46 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java @@ -0,0 +1,104 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.opencensus.common.Clock; +import io.opencensus.implcore.internal.CurrentState; +import io.opencensus.implcore.internal.CurrentState.State; +import io.opencensus.implcore.internal.EventQueue; +import io.opencensus.metrics.export.Metric; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.tags.TagContext; +import java.util.Collection; +import java.util.Set; +import javax.annotation.Nullable; + +/** Object that stores all views and stats. */ +final class StatsManager { + + private final EventQueue queue; + + // clock used throughout the stats implementation + private final Clock clock; + + private final CurrentState state; + private final MeasureToViewMap measureToViewMap = new MeasureToViewMap(); + + StatsManager(EventQueue queue, Clock clock, CurrentState state) { + checkNotNull(queue, "EventQueue"); + checkNotNull(clock, "Clock"); + checkNotNull(state, "state"); + this.queue = queue; + this.clock = clock; + this.state = state; + } + + void registerView(View view) { + measureToViewMap.registerView(view, clock); + } + + @Nullable + ViewData getView(View.Name viewName) { + return measureToViewMap.getView(viewName, clock, state.getInternal()); + } + + Set<View> getExportedViews() { + return measureToViewMap.getExportedViews(); + } + + void record(TagContext tags, MeasureMapInternal measurementValues) { + // TODO(songya): consider exposing No-op MeasureMap and use it when stats state is DISABLED, so + // that we don't need to create actual MeasureMapImpl. + if (state.getInternal() == State.ENABLED) { + queue.enqueue(new StatsEvent(this, tags, measurementValues)); + } + } + + Collection<Metric> getMetrics() { + return measureToViewMap.getMetrics(clock, state.getInternal()); + } + + void clearStats() { + measureToViewMap.clearStats(); + } + + void resumeStatsCollection() { + measureToViewMap.resumeStatsCollection(clock.now()); + } + + // An EventQueue entry that records the stats from one call to StatsManager.record(...). + private static final class StatsEvent implements EventQueue.Entry { + private final TagContext tags; + private final MeasureMapInternal stats; + private final StatsManager statsManager; + + StatsEvent(StatsManager statsManager, TagContext tags, MeasureMapInternal stats) { + this.statsManager = statsManager; + this.tags = tags; + this.stats = stats; + } + + @Override + public void process() { + // Add Timestamp to value after it went through the DisruptorQueue. + statsManager.measureToViewMap.record(tags, stats, statsManager.clock.now()); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java new file mode 100644 index 00000000..f9ebea41 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.opencensus.stats.StatsRecorder; + +/** Implementation of {@link StatsRecorder}. */ +public final class StatsRecorderImpl extends StatsRecorder { + private final StatsManager statsManager; + + StatsRecorderImpl(StatsManager statsManager) { + checkNotNull(statsManager, "StatsManager"); + this.statsManager = statsManager; + } + + @Override + public MeasureMapImpl newMeasureMap() { + return MeasureMapImpl.create(statsManager); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java new file mode 100644 index 00000000..20ea97f8 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java @@ -0,0 +1,56 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.stats.ViewManager; +import java.util.Set; +import javax.annotation.Nullable; + +/** Implementation of {@link ViewManager}. */ +public final class ViewManagerImpl extends ViewManager { + private final StatsManager statsManager; + + ViewManagerImpl(StatsManager statsManager) { + this.statsManager = statsManager; + } + + @Override + public void registerView(View view) { + statsManager.registerView(view); + } + + @Override + @Nullable + public ViewData getView(View.Name viewName) { + return statsManager.getView(viewName); + } + + @Override + public Set<View> getAllExportedViews() { + return statsManager.getExportedViews(); + } + + void clearStats() { + statsManager.clearStats(); + } + + void resumeStatsCollection() { + statsManager.resumeStatsCollection(); + } +} |