aboutsummaryrefslogtreecommitdiff
path: root/impl_core/src/main/java/io/opencensus/implcore/stats
diff options
context:
space:
mode:
Diffstat (limited to 'impl_core/src/main/java/io/opencensus/implcore/stats')
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java95
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java66
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java138
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java194
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java38
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java118
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java556
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java464
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java241
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java92
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java104
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java36
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java56
13 files changed, 2198 insertions, 0 deletions
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java b/impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java
new file mode 100644
index 00000000..172db539
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Maps;
+import io.opencensus.common.Duration;
+import io.opencensus.common.Timestamp;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.Measure;
+import io.opencensus.tags.TagValue;
+import java.util.List;
+import java.util.Map;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+/** The bucket with aggregated {@code MeasureValue}s used for {@code IntervalViewData}. */
+final class IntervalBucket {
+
+ private static final Duration ZERO = Duration.create(0, 0);
+
+ private final Timestamp start;
+ private final Duration duration;
+ private final Aggregation aggregation;
+ private final Measure measure;
+ private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap =
+ Maps.newHashMap();
+
+ IntervalBucket(Timestamp start, Duration duration, Aggregation aggregation, Measure measure) {
+ this.start = checkNotNull(start, "Start");
+ this.duration = checkNotNull(duration, "Duration");
+ checkArgument(duration.compareTo(ZERO) > 0, "Duration must be positive");
+ this.aggregation = checkNotNull(aggregation, "Aggregation");
+ this.measure = checkNotNull(measure, "measure");
+ }
+
+ Map<List</*@Nullable*/ TagValue>, MutableAggregation> getTagValueAggregationMap() {
+ return tagValueAggregationMap;
+ }
+
+ Timestamp getStart() {
+ return start;
+ }
+
+ // Puts a new value into the internal MutableAggregations, based on the TagValues.
+ void record(
+ List</*@Nullable*/ TagValue> tagValues,
+ double value,
+ Map<String, String> attachments,
+ Timestamp timestamp) {
+ if (!tagValueAggregationMap.containsKey(tagValues)) {
+ tagValueAggregationMap.put(
+ tagValues, RecordUtils.createMutableAggregation(aggregation, measure));
+ }
+ tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp);
+ }
+
+ /*
+ * Returns how much fraction of duration has passed in this IntervalBucket. For example, if this
+ * bucket starts at 10s and has a duration of 20s, and now is 15s, then getFraction() should
+ * return (15 - 10) / 20 = 0.25.
+ *
+ * This IntervalBucket must be current, i.e. the current timestamp must be within
+ * [this.start, this.start + this.duration).
+ */
+ double getFraction(Timestamp now) {
+ Duration elapsedTime = now.subtractTimestamp(start);
+ checkArgument(
+ elapsedTime.compareTo(ZERO) >= 0 && elapsedTime.compareTo(duration) < 0,
+ "This bucket must be current.");
+ return ((double) elapsedTime.toMillis()) / duration.toMillis();
+ }
+
+ void clearStats() {
+ tagValueAggregationMap.clear();
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java
new file mode 100644
index 00000000..ee51796c
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2016-17, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import io.opencensus.stats.Measure.MeasureDouble;
+import io.opencensus.stats.Measure.MeasureLong;
+import io.opencensus.stats.MeasureMap;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.unsafe.ContextUtils;
+
+/** Implementation of {@link MeasureMap}. */
+final class MeasureMapImpl extends MeasureMap {
+ private final StatsManager statsManager;
+ private final MeasureMapInternal.Builder builder = MeasureMapInternal.builder();
+
+ static MeasureMapImpl create(StatsManager statsManager) {
+ return new MeasureMapImpl(statsManager);
+ }
+
+ private MeasureMapImpl(StatsManager statsManager) {
+ this.statsManager = statsManager;
+ }
+
+ @Override
+ public MeasureMapImpl put(MeasureDouble measure, double value) {
+ builder.put(measure, value);
+ return this;
+ }
+
+ @Override
+ public MeasureMapImpl put(MeasureLong measure, long value) {
+ builder.put(measure, value);
+ return this;
+ }
+
+ @Override
+ public MeasureMap putAttachment(String key, String value) {
+ builder.putAttachment(key, value);
+ return this;
+ }
+
+ @Override
+ public void record() {
+ // Use the context key directly, to avoid depending on the tags implementation.
+ record(ContextUtils.TAG_CONTEXT_KEY.get());
+ }
+
+ @Override
+ public void record(TagContext tags) {
+ statsManager.record(tags, builder.build());
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java
new file mode 100644
index 00000000..d867b342
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2016-17, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.Measure.MeasureDouble;
+import io.opencensus.stats.Measure.MeasureLong;
+import io.opencensus.stats.Measurement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+// TODO(songya): consider combining MeasureMapImpl and this class.
+/** A map from {@link Measure}'s to measured values. */
+final class MeasureMapInternal {
+
+ /** Returns a {@link Builder} for the {@link MeasureMapInternal} class. */
+ static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Returns an {@link Iterator} over the measure/value mappings in this {@link MeasureMapInternal}.
+ * The {@code Iterator} does not support {@link Iterator#remove()}.
+ */
+ Iterator<Measurement> iterator() {
+ return new MeasureMapInternalIterator();
+ }
+
+ // Returns the contextual information associated with an example value.
+ Map<String, String> getAttachments() {
+ return attachments;
+ }
+
+ private final ArrayList<Measurement> measurements;
+ private final Map<String, String> attachments;
+
+ private MeasureMapInternal(ArrayList<Measurement> measurements, Map<String, String> attachments) {
+ this.measurements = measurements;
+ this.attachments = Collections.unmodifiableMap(new HashMap<String, String>(attachments));
+ }
+
+ /** Builder for the {@link MeasureMapInternal} class. */
+ static class Builder {
+ /**
+ * Associates the {@link MeasureDouble} with the given value. Subsequent updates to the same
+ * {@link MeasureDouble} will overwrite the previous value.
+ *
+ * @param measure the {@link MeasureDouble}
+ * @param value the value to be associated with {@code measure}
+ * @return this
+ */
+ Builder put(MeasureDouble measure, double value) {
+ measurements.add(Measurement.MeasurementDouble.create(measure, value));
+ return this;
+ }
+
+ /**
+ * Associates the {@link MeasureLong} with the given value. Subsequent updates to the same
+ * {@link MeasureLong} will overwrite the previous value.
+ *
+ * @param measure the {@link MeasureLong}
+ * @param value the value to be associated with {@code measure}
+ * @return this
+ */
+ Builder put(MeasureLong measure, long value) {
+ measurements.add(Measurement.MeasurementLong.create(measure, value));
+ return this;
+ }
+
+ Builder putAttachment(String key, String value) {
+ this.attachments.put(key, value);
+ return this;
+ }
+
+ /** Constructs a {@link MeasureMapInternal} from the current measurements. */
+ MeasureMapInternal build() {
+ // Note: this makes adding measurements quadratic but is fastest for the sizes of
+ // MeasureMapInternals that we should see. We may want to go to a strategy of sort/eliminate
+ // for larger MeasureMapInternals.
+ for (int i = measurements.size() - 1; i >= 0; i--) {
+ for (int j = i - 1; j >= 0; j--) {
+ if (measurements.get(i).getMeasure() == measurements.get(j).getMeasure()) {
+ measurements.remove(j);
+ j--;
+ }
+ }
+ }
+ return new MeasureMapInternal(measurements, attachments);
+ }
+
+ private final ArrayList<Measurement> measurements = new ArrayList<Measurement>();
+ private final Map<String, String> attachments = new HashMap<String, String>();
+
+ private Builder() {}
+ }
+
+ // Provides an unmodifiable Iterator over this instance's measurements.
+ private final class MeasureMapInternalIterator implements Iterator<Measurement> {
+ @Override
+ public boolean hasNext() {
+ return position < length;
+ }
+
+ @Override
+ public Measurement next() {
+ if (position >= measurements.size()) {
+ throw new NoSuchElementException();
+ }
+ return measurements.get(position++);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ private final int length = measurements.size();
+ private int position = 0;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java
new file mode 100644
index 00000000..5da0cad8
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import io.opencensus.common.Clock;
+import io.opencensus.common.Timestamp;
+import io.opencensus.implcore.internal.CurrentState.State;
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.Measurement;
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
+import io.opencensus.tags.TagContext;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import javax.annotation.concurrent.GuardedBy;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+/** A class that stores a singleton map from {@code MeasureName}s to {@link MutableViewData}s. */
+@SuppressWarnings("deprecation")
+final class MeasureToViewMap {
+
+ /*
+ * A synchronized singleton map that stores the one-to-many mapping from Measures
+ * to MutableViewDatas.
+ */
+ @GuardedBy("this")
+ private final Multimap<String, MutableViewData> mutableMap =
+ HashMultimap.<String, MutableViewData>create();
+
+ @GuardedBy("this")
+ private final Map<View.Name, View> registeredViews = new HashMap<View.Name, View>();
+
+ // TODO(songya): consider adding a Measure.Name class
+ @GuardedBy("this")
+ private final Map<String, Measure> registeredMeasures = Maps.newHashMap();
+
+ // Cached set of exported views. It must be set to null whenever a view is registered or
+ // unregistered.
+ @javax.annotation.Nullable private volatile Set<View> exportedViews;
+
+ /** Returns a {@link ViewData} corresponding to the given {@link View.Name}. */
+ @javax.annotation.Nullable
+ synchronized ViewData getView(View.Name viewName, Clock clock, State state) {
+ MutableViewData view = getMutableViewData(viewName);
+ return view == null ? null : view.toViewData(clock.now(), state);
+ }
+
+ Set<View> getExportedViews() {
+ Set<View> views = exportedViews;
+ if (views == null) {
+ synchronized (this) {
+ exportedViews = views = filterExportedViews(registeredViews.values());
+ }
+ }
+ return views;
+ }
+
+ // Returns the subset of the given views that should be exported
+ private static Set<View> filterExportedViews(Collection<View> allViews) {
+ Set<View> views = Sets.newHashSet();
+ for (View view : allViews) {
+ if (view.getWindow() instanceof View.AggregationWindow.Cumulative) {
+ views.add(view);
+ }
+ }
+ return Collections.unmodifiableSet(views);
+ }
+
+ /** Enable stats collection for the given {@link View}. */
+ synchronized void registerView(View view, Clock clock) {
+ exportedViews = null;
+ View existing = registeredViews.get(view.getName());
+ if (existing != null) {
+ if (existing.equals(view)) {
+ // Ignore views that are already registered.
+ return;
+ } else {
+ throw new IllegalArgumentException(
+ "A different view with the same name is already registered: " + existing);
+ }
+ }
+ Measure measure = view.getMeasure();
+ Measure registeredMeasure = registeredMeasures.get(measure.getName());
+ if (registeredMeasure != null && !registeredMeasure.equals(measure)) {
+ throw new IllegalArgumentException(
+ "A different measure with the same name is already registered: " + registeredMeasure);
+ }
+ registeredViews.put(view.getName(), view);
+ if (registeredMeasure == null) {
+ registeredMeasures.put(measure.getName(), measure);
+ }
+ Timestamp now = clock.now();
+ mutableMap.put(view.getMeasure().getName(), MutableViewData.create(view, now));
+ }
+
+ @javax.annotation.Nullable
+ private synchronized MutableViewData getMutableViewData(View.Name viewName) {
+ View view = registeredViews.get(viewName);
+ if (view == null) {
+ return null;
+ }
+ Collection<MutableViewData> views = mutableMap.get(view.getMeasure().getName());
+ for (MutableViewData viewData : views) {
+ if (viewData.getView().getName().equals(viewName)) {
+ return viewData;
+ }
+ }
+ throw new AssertionError(
+ "Internal error: Not recording stats for view: \""
+ + viewName
+ + "\" registeredViews="
+ + registeredViews
+ + ", mutableMap="
+ + mutableMap);
+ }
+
+ // Records stats with a set of tags.
+ synchronized void record(TagContext tags, MeasureMapInternal stats, Timestamp timestamp) {
+ Iterator<Measurement> iterator = stats.iterator();
+ Map<String, String> attachments = stats.getAttachments();
+ while (iterator.hasNext()) {
+ Measurement measurement = iterator.next();
+ Measure measure = measurement.getMeasure();
+ if (!measure.equals(registeredMeasures.get(measure.getName()))) {
+ // unregistered measures will be ignored.
+ continue;
+ }
+ Collection<MutableViewData> viewDataCollection = mutableMap.get(measure.getName());
+ for (MutableViewData viewData : viewDataCollection) {
+ viewData.record(
+ tags, RecordUtils.getDoubleValueFromMeasurement(measurement), timestamp, attachments);
+ }
+ }
+ }
+
+ synchronized List<Metric> getMetrics(Clock clock, State state) {
+ List<Metric> metrics = new ArrayList<Metric>();
+ Timestamp now = clock.now();
+ for (Entry<String, MutableViewData> entry : mutableMap.entries()) {
+ Metric metric = entry.getValue().toMetric(now, state);
+ if (metric != null) {
+ metrics.add(metric);
+ }
+ }
+ return metrics;
+ }
+
+ // Clear stats for all the current MutableViewData
+ synchronized void clearStats() {
+ for (Entry<String, Collection<MutableViewData>> entry : mutableMap.asMap().entrySet()) {
+ for (MutableViewData mutableViewData : entry.getValue()) {
+ mutableViewData.clearStats();
+ }
+ }
+ }
+
+ // Resume stats collection for all MutableViewData.
+ synchronized void resumeStatsCollection(Timestamp now) {
+ for (Entry<String, Collection<MutableViewData>> entry : mutableMap.asMap().entrySet()) {
+ for (MutableViewData mutableViewData : entry.getValue()) {
+ mutableViewData.resumeStatsCollection(now);
+ }
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java
new file mode 100644
index 00000000..7bf92572
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.metrics.export.MetricProducer;
+import java.util.Collection;
+import javax.annotation.concurrent.ThreadSafe;
+
+/** Implementation of {@link MetricProducer}. */
+@ThreadSafe
+final class MetricProducerImpl extends MetricProducer {
+
+ private final StatsManager statsManager;
+
+ MetricProducerImpl(StatsManager statsManager) {
+ this.statsManager = statsManager;
+ }
+
+ @Override
+ public Collection<Metric> getMetrics() {
+ return statsManager.getMetrics();
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java
new file mode 100644
index 00000000..0dfb1d26
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opencensus.common.Function;
+import io.opencensus.common.Functions;
+import io.opencensus.metrics.LabelKey;
+import io.opencensus.metrics.LabelValue;
+import io.opencensus.metrics.export.MetricDescriptor;
+import io.opencensus.metrics.export.MetricDescriptor.Type;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.View;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import java.util.ArrayList;
+import java.util.List;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+@SuppressWarnings("deprecation")
+// Utils to convert Stats data models to Metric data models.
+final class MetricUtils {
+
+ @javax.annotation.Nullable
+ static MetricDescriptor viewToMetricDescriptor(View view) {
+ if (view.getWindow() instanceof View.AggregationWindow.Interval) {
+ // Only creates Metric for cumulative stats.
+ return null;
+ }
+ List<LabelKey> labelKeys = new ArrayList<LabelKey>();
+ for (TagKey tagKey : view.getColumns()) {
+ // TODO: add description
+ labelKeys.add(LabelKey.create(tagKey.getName(), ""));
+ }
+ Measure measure = view.getMeasure();
+ return MetricDescriptor.create(
+ view.getName().asString(),
+ view.getDescription(),
+ measure.getUnit(),
+ getType(measure, view.getAggregation()),
+ labelKeys);
+ }
+
+ @VisibleForTesting
+ static Type getType(Measure measure, Aggregation aggregation) {
+ return aggregation.match(
+ Functions.returnConstant(
+ measure.match(
+ TYPE_CUMULATIVE_DOUBLE_FUNCTION, // Sum Double
+ TYPE_CUMULATIVE_INT64_FUNCTION, // Sum Int64
+ TYPE_UNRECOGNIZED_FUNCTION)),
+ TYPE_CUMULATIVE_INT64_FUNCTION, // Count
+ TYPE_CUMULATIVE_DISTRIBUTION_FUNCTION, // Distribution
+ Functions.returnConstant(
+ measure.match(
+ TYPE_GAUGE_DOUBLE_FUNCTION, // LastValue Double
+ TYPE_GAUGE_INT64_FUNCTION, // LastValue Long
+ TYPE_UNRECOGNIZED_FUNCTION)),
+ AGGREGATION_TYPE_DEFAULT_FUNCTION);
+ }
+
+ static List<LabelValue> tagValuesToLabelValues(List</*@Nullable*/ TagValue> tagValues) {
+ List<LabelValue> labelValues = new ArrayList<LabelValue>();
+ for (/*@Nullable*/ TagValue tagValue : tagValues) {
+ labelValues.add(LabelValue.create(tagValue == null ? null : tagValue.asString()));
+ }
+ return labelValues;
+ }
+
+ private static final Function<Object, Type> TYPE_CUMULATIVE_DOUBLE_FUNCTION =
+ Functions.returnConstant(Type.CUMULATIVE_DOUBLE);
+
+ private static final Function<Object, Type> TYPE_CUMULATIVE_INT64_FUNCTION =
+ Functions.returnConstant(Type.CUMULATIVE_INT64);
+
+ private static final Function<Object, Type> TYPE_CUMULATIVE_DISTRIBUTION_FUNCTION =
+ Functions.returnConstant(Type.CUMULATIVE_DISTRIBUTION);
+
+ private static final Function<Object, Type> TYPE_GAUGE_DOUBLE_FUNCTION =
+ Functions.returnConstant(Type.GAUGE_DOUBLE);
+
+ private static final Function<Object, Type> TYPE_GAUGE_INT64_FUNCTION =
+ Functions.returnConstant(Type.GAUGE_INT64);
+
+ private static final Function<Object, Type> TYPE_UNRECOGNIZED_FUNCTION =
+ Functions.<Type>throwAssertionError();
+
+ private static final Function<Aggregation, Type> AGGREGATION_TYPE_DEFAULT_FUNCTION =
+ new Function<Aggregation, Type>() {
+ @Override
+ public Type apply(Aggregation arg) {
+ if (arg instanceof Aggregation.Mean) {
+ return Type.CUMULATIVE_DOUBLE; // Mean
+ }
+ throw new AssertionError();
+ }
+ };
+
+ private MetricUtils() {}
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java
new file mode 100644
index 00000000..6e2bff1c
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java
@@ -0,0 +1,556 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opencensus.common.Timestamp;
+import io.opencensus.metrics.export.Distribution;
+import io.opencensus.metrics.export.Distribution.BucketOptions;
+import io.opencensus.metrics.export.Point;
+import io.opencensus.metrics.export.Value;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.AggregationData;
+import io.opencensus.stats.AggregationData.DistributionData;
+import io.opencensus.stats.AggregationData.DistributionData.Exemplar;
+import io.opencensus.stats.BucketBoundaries;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Mutable version of {@link Aggregation} that supports adding values. */
+abstract class MutableAggregation {
+
+ private MutableAggregation() {}
+
+ // Tolerance for double comparison.
+ private static final double TOLERANCE = 1e-6;
+
+ /**
+ * Put a new value into the MutableAggregation.
+ *
+ * @param value new value to be added to population
+ * @param attachments the contextual information on an {@link Exemplar}
+ * @param timestamp the timestamp when the value is recorded
+ */
+ abstract void add(double value, Map<String, String> attachments, Timestamp timestamp);
+
+ // TODO(songya): remove this method once interval stats is completely removed.
+ /**
+ * Combine the internal values of this MutableAggregation and value of the given
+ * MutableAggregation, with the given fraction. Then set the internal value of this
+ * MutableAggregation to the combined value.
+ *
+ * @param other the other {@code MutableAggregation}. The type of this and other {@code
+ * MutableAggregation} must match.
+ * @param fraction the fraction that the value in other {@code MutableAggregation} should
+ * contribute. Must be within [0.0, 1.0].
+ */
+ abstract void combine(MutableAggregation other, double fraction);
+
+ abstract AggregationData toAggregationData();
+
+ abstract Point toPoint(Timestamp timestamp);
+
+ /** Calculate sum of doubles on aggregated {@code MeasureValue}s. */
+ static class MutableSumDouble extends MutableAggregation {
+
+ private double sum = 0.0;
+
+ private MutableSumDouble() {}
+
+ /**
+ * Construct a {@code MutableSumDouble}.
+ *
+ * @return an empty {@code MutableSumDouble}.
+ */
+ static MutableSumDouble create() {
+ return new MutableSumDouble();
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ sum += value;
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableSumDouble, "MutableSumDouble expected.");
+ this.sum += fraction * ((MutableSumDouble) other).sum;
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.SumDataDouble.create(sum);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.doubleValue(sum), timestamp);
+ }
+
+ @VisibleForTesting
+ double getSum() {
+ return sum;
+ }
+ }
+
+ /** Calculate sum of longs on aggregated {@code MeasureValue}s. */
+ static final class MutableSumLong extends MutableSumDouble {
+ private MutableSumLong() {
+ super();
+ }
+
+ /**
+ * Construct a {@code MutableSumLong}.
+ *
+ * @return an empty {@code MutableSumLong}.
+ */
+ static MutableSumLong create() {
+ return new MutableSumLong();
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.SumDataLong.create(Math.round(getSum()));
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.longValue(Math.round(getSum())), timestamp);
+ }
+ }
+
+ /** Calculate count on aggregated {@code MeasureValue}s. */
+ static final class MutableCount extends MutableAggregation {
+
+ private long count = 0;
+
+ private MutableCount() {}
+
+ /**
+ * Construct a {@code MutableCount}.
+ *
+ * @return an empty {@code MutableCount}.
+ */
+ static MutableCount create() {
+ return new MutableCount();
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ count++;
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableCount, "MutableCount expected.");
+ this.count += Math.round(fraction * ((MutableCount) other).getCount());
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.CountData.create(count);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.longValue(count), timestamp);
+ }
+
+ /**
+ * Returns the aggregated count.
+ *
+ * @return the aggregated count.
+ */
+ long getCount() {
+ return count;
+ }
+ }
+
+ /** Calculate mean on aggregated {@code MeasureValue}s. */
+ static final class MutableMean extends MutableAggregation {
+
+ private double sum = 0.0;
+ private long count = 0;
+
+ private MutableMean() {}
+
+ /**
+ * Construct a {@code MutableMean}.
+ *
+ * @return an empty {@code MutableMean}.
+ */
+ static MutableMean create() {
+ return new MutableMean();
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ count++;
+ sum += value;
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableMean, "MutableMean expected.");
+ MutableMean mutableMean = (MutableMean) other;
+ this.count += Math.round(mutableMean.count * fraction);
+ this.sum += mutableMean.sum * fraction;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.MeanData.create(getMean(), count);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.doubleValue(getMean()), timestamp);
+ }
+
+ /**
+ * Returns the aggregated mean.
+ *
+ * @return the aggregated mean.
+ */
+ double getMean() {
+ return count == 0 ? 0 : sum / count;
+ }
+
+ /**
+ * Returns the aggregated count.
+ *
+ * @return the aggregated count.
+ */
+ long getCount() {
+ return count;
+ }
+
+ @VisibleForTesting
+ double getSum() {
+ return sum;
+ }
+ }
+
+ /** Calculate distribution stats on aggregated {@code MeasureValue}s. */
+ static final class MutableDistribution extends MutableAggregation {
+
+ private double sum = 0.0;
+ private double mean = 0.0;
+ private long count = 0;
+ private double sumOfSquaredDeviations = 0.0;
+
+ // Initial "impossible" values, that will get reset as soon as first value is added.
+ private double min = Double.POSITIVE_INFINITY;
+ private double max = Double.NEGATIVE_INFINITY;
+
+ private final BucketBoundaries bucketBoundaries;
+ private final long[] bucketCounts;
+
+ // If there's a histogram (i.e bucket boundaries are not empty) in this MutableDistribution,
+ // exemplars will have the same size to bucketCounts; otherwise exemplars are null.
+ // Only the newest exemplar will be kept at each index.
+ @javax.annotation.Nullable private final Exemplar[] exemplars;
+
+ private MutableDistribution(BucketBoundaries bucketBoundaries) {
+ this.bucketBoundaries = bucketBoundaries;
+ int buckets = bucketBoundaries.getBoundaries().size() + 1;
+ this.bucketCounts = new long[buckets];
+ // In the implementation, each histogram bucket can have up to one exemplar, and the exemplar
+ // array is guaranteed to be in ascending order.
+ // If there's no histogram, don't record exemplars.
+ this.exemplars = bucketBoundaries.getBoundaries().isEmpty() ? null : new Exemplar[buckets];
+ }
+
+ /**
+ * Construct a {@code MutableDistribution}.
+ *
+ * @return an empty {@code MutableDistribution}.
+ */
+ static MutableDistribution create(BucketBoundaries bucketBoundaries) {
+ checkNotNull(bucketBoundaries, "bucketBoundaries should not be null.");
+ return new MutableDistribution(bucketBoundaries);
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ sum += value;
+ count++;
+
+ /*
+ * Update the sum of squared deviations from the mean with the given value. For values
+ * x_i this is Sum[i=1..n]((x_i - mean)^2)
+ *
+ * Computed using Welfords method (see
+ * https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance, or Knuth, "The Art of
+ * Computer Programming", Vol. 2, page 323, 3rd edition)
+ */
+ double deltaFromMean = value - mean;
+ mean += deltaFromMean / count;
+ double deltaFromMean2 = value - mean;
+ sumOfSquaredDeviations += deltaFromMean * deltaFromMean2;
+
+ if (value < min) {
+ min = value;
+ }
+ if (value > max) {
+ max = value;
+ }
+
+ int bucket = 0;
+ for (; bucket < bucketBoundaries.getBoundaries().size(); bucket++) {
+ if (value < bucketBoundaries.getBoundaries().get(bucket)) {
+ break;
+ }
+ }
+ bucketCounts[bucket]++;
+
+ // No implicit recording for exemplars - if there are no attachments (contextual information),
+ // don't record exemplars.
+ if (!attachments.isEmpty() && exemplars != null) {
+ exemplars[bucket] = Exemplar.create(value, timestamp, attachments);
+ }
+ }
+
+ // We don't compute fractional MutableDistribution, it's either whole or none.
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableDistribution, "MutableDistribution expected.");
+ if (Math.abs(1.0 - fraction) > TOLERANCE) {
+ return;
+ }
+
+ MutableDistribution mutableDistribution = (MutableDistribution) other;
+ checkArgument(
+ this.bucketBoundaries.equals(mutableDistribution.bucketBoundaries),
+ "Bucket boundaries should match.");
+
+ // Algorithm for calculating the combination of sum of squared deviations:
+ // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm.
+ if (this.count + mutableDistribution.count > 0) {
+ double delta = mutableDistribution.mean - this.mean;
+ this.sumOfSquaredDeviations =
+ this.sumOfSquaredDeviations
+ + mutableDistribution.sumOfSquaredDeviations
+ + Math.pow(delta, 2)
+ * this.count
+ * mutableDistribution.count
+ / (this.count + mutableDistribution.count);
+ }
+
+ this.count += mutableDistribution.count;
+ this.sum += mutableDistribution.sum;
+ this.mean = this.sum / this.count;
+
+ if (mutableDistribution.min < this.min) {
+ this.min = mutableDistribution.min;
+ }
+ if (mutableDistribution.max > this.max) {
+ this.max = mutableDistribution.max;
+ }
+
+ long[] bucketCounts = mutableDistribution.getBucketCounts();
+ for (int i = 0; i < bucketCounts.length; i++) {
+ this.bucketCounts[i] += bucketCounts[i];
+ }
+
+ Exemplar[] otherExemplars = mutableDistribution.getExemplars();
+ if (exemplars != null && otherExemplars != null) {
+ for (int i = 0; i < otherExemplars.length; i++) {
+ Exemplar exemplar = otherExemplars[i];
+ // Assume other is always newer than this, because we combined interval buckets in time
+ // order.
+ // If there's a newer exemplar, overwrite current value.
+ if (exemplar != null) {
+ this.exemplars[i] = exemplar;
+ }
+ }
+ }
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ List<Long> boxedBucketCounts = new ArrayList<Long>();
+ for (long bucketCount : bucketCounts) {
+ boxedBucketCounts.add(bucketCount);
+ }
+ List<Exemplar> exemplarList = new ArrayList<Exemplar>();
+ if (exemplars != null) {
+ for (Exemplar exemplar : exemplars) {
+ if (exemplar != null) {
+ exemplarList.add(exemplar);
+ }
+ }
+ }
+ return DistributionData.create(
+ mean, count, min, max, sumOfSquaredDeviations, boxedBucketCounts, exemplarList);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ List<Distribution.Bucket> buckets = new ArrayList<Distribution.Bucket>();
+ for (int bucket = 0; bucket < bucketCounts.length; bucket++) {
+ long bucketCount = bucketCounts[bucket];
+ @javax.annotation.Nullable AggregationData.DistributionData.Exemplar exemplar = null;
+ if (exemplars != null) {
+ exemplar = exemplars[bucket];
+ }
+
+ Distribution.Bucket metricBucket;
+ if (exemplar != null) {
+ // Bucket with an Exemplar.
+ metricBucket =
+ Distribution.Bucket.create(
+ bucketCount,
+ Distribution.Exemplar.create(
+ exemplar.getValue(), exemplar.getTimestamp(), exemplar.getAttachments()));
+ } else {
+ // Bucket with no Exemplar.
+ metricBucket = Distribution.Bucket.create(bucketCount);
+ }
+ buckets.add(metricBucket);
+ }
+
+ // TODO(mayurkale): Drop the first bucket when converting to metrics.
+ // Reason: In Stats API, bucket bounds begin with -infinity (first bucket is (-infinity, 0)).
+ BucketOptions bucketOptions = BucketOptions.explicitOptions(bucketBoundaries.getBoundaries());
+
+ return Point.create(
+ Value.distributionValue(
+ Distribution.create(
+ count, mean * count, sumOfSquaredDeviations, bucketOptions, buckets)),
+ timestamp);
+ }
+
+ double getMean() {
+ return mean;
+ }
+
+ long getCount() {
+ return count;
+ }
+
+ double getMin() {
+ return min;
+ }
+
+ double getMax() {
+ return max;
+ }
+
+ // Returns the aggregated sum of squared deviations.
+ double getSumOfSquaredDeviations() {
+ return sumOfSquaredDeviations;
+ }
+
+ long[] getBucketCounts() {
+ return bucketCounts;
+ }
+
+ BucketBoundaries getBucketBoundaries() {
+ return bucketBoundaries;
+ }
+
+ @javax.annotation.Nullable
+ Exemplar[] getExemplars() {
+ return exemplars;
+ }
+ }
+
+ /** Calculate double last value on aggregated {@code MeasureValue}s. */
+ static class MutableLastValueDouble extends MutableAggregation {
+
+ // Initial value that will get reset as soon as first value is added.
+ private double lastValue = Double.NaN;
+ // TODO(songya): remove this once interval stats is completely removed.
+ private boolean initialized = false;
+
+ private MutableLastValueDouble() {}
+
+ /**
+ * Construct a {@code MutableLastValueDouble}.
+ *
+ * @return an empty {@code MutableLastValueDouble}.
+ */
+ static MutableLastValueDouble create() {
+ return new MutableLastValueDouble();
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ lastValue = value;
+ // TODO(songya): remove this once interval stats is completely removed.
+ if (!initialized) {
+ initialized = true;
+ }
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableLastValueDouble, "MutableLastValueDouble expected.");
+ MutableLastValueDouble otherValue = (MutableLastValueDouble) other;
+ // Assume other is always newer than this, because we combined interval buckets in time order.
+ // If there's a newer value, overwrite current value.
+ this.lastValue = otherValue.initialized ? otherValue.getLastValue() : this.lastValue;
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.LastValueDataDouble.create(lastValue);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.doubleValue(lastValue), timestamp);
+ }
+
+ @VisibleForTesting
+ double getLastValue() {
+ return lastValue;
+ }
+ }
+
+ /** Calculate last long value on aggregated {@code MeasureValue}s. */
+ static final class MutableLastValueLong extends MutableLastValueDouble {
+ private MutableLastValueLong() {
+ super();
+ }
+
+ /**
+ * Construct a {@code MutableLastValueLong}.
+ *
+ * @return an empty {@code MutableLastValueLong}.
+ */
+ static MutableLastValueLong create() {
+ return new MutableLastValueLong();
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.LastValueDataLong.create(Math.round(getLastValue()));
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.longValue(Math.round(getLastValue())), timestamp);
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java
new file mode 100644
index 00000000..928675e9
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java
@@ -0,0 +1,464 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.opencensus.implcore.stats.RecordUtils.createAggregationMap;
+import static io.opencensus.implcore.stats.RecordUtils.createMutableAggregation;
+import static io.opencensus.implcore.stats.RecordUtils.getTagMap;
+import static io.opencensus.implcore.stats.RecordUtils.getTagValues;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import io.opencensus.common.Duration;
+import io.opencensus.common.Function;
+import io.opencensus.common.Functions;
+import io.opencensus.common.Timestamp;
+import io.opencensus.implcore.internal.CheckerFrameworkUtils;
+import io.opencensus.implcore.internal.CurrentState.State;
+import io.opencensus.metrics.LabelValue;
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.metrics.export.MetricDescriptor;
+import io.opencensus.metrics.export.MetricDescriptor.Type;
+import io.opencensus.metrics.export.Point;
+import io.opencensus.metrics.export.TimeSeries;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.AggregationData;
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.TagValue;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+/** A mutable version of {@link ViewData}, used for recording stats and start/end time. */
+@SuppressWarnings("deprecation")
+abstract class MutableViewData {
+
+ @VisibleForTesting static final Timestamp ZERO_TIMESTAMP = Timestamp.create(0, 0);
+
+ private final View view;
+
+ private MutableViewData(View view) {
+ this.view = view;
+ }
+
+ /**
+ * Constructs a new {@link MutableViewData}.
+ *
+ * @param view the {@code View} linked with this {@code MutableViewData}.
+ * @param start the start {@code Timestamp}.
+ * @return a {@code MutableViewData}.
+ */
+ static MutableViewData create(final View view, final Timestamp start) {
+ return view.getWindow()
+ .match(
+ new CreateCumulative(view, start),
+ new CreateInterval(view, start),
+ Functions.<MutableViewData>throwAssertionError());
+ }
+
+ /** The {@link View} associated with this {@link ViewData}. */
+ View getView() {
+ return view;
+ }
+
+ @javax.annotation.Nullable
+ abstract Metric toMetric(Timestamp now, State state);
+
+ /** Record stats with the given tags. */
+ abstract void record(
+ TagContext context, double value, Timestamp timestamp, Map<String, String> attachments);
+
+ /** Convert this {@link MutableViewData} to {@link ViewData}. */
+ abstract ViewData toViewData(Timestamp now, State state);
+
+ // Clear recorded stats.
+ abstract void clearStats();
+
+ // Resume stats collection, and reset Start Timestamp (for CumulativeMutableViewData), or refresh
+ // bucket list (for InternalMutableViewData).
+ abstract void resumeStatsCollection(Timestamp now);
+
+ private static final class CumulativeMutableViewData extends MutableViewData {
+
+ private Timestamp start;
+ private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap =
+ Maps.newHashMap();
+ // Cache a MetricDescriptor to avoid converting View to MetricDescriptor in the future.
+ private final MetricDescriptor metricDescriptor;
+
+ private CumulativeMutableViewData(View view, Timestamp start) {
+ super(view);
+ this.start = start;
+ MetricDescriptor metricDescriptor = MetricUtils.viewToMetricDescriptor(view);
+ if (metricDescriptor == null) {
+ throw new AssertionError(
+ "Cumulative view should be converted to a non-null MetricDescriptor.");
+ } else {
+ this.metricDescriptor = metricDescriptor;
+ }
+ }
+
+ @javax.annotation.Nullable
+ @Override
+ Metric toMetric(Timestamp now, State state) {
+ if (state == State.DISABLED) {
+ return null;
+ }
+ Type type = metricDescriptor.getType();
+ @javax.annotation.Nullable
+ Timestamp startTime = type == Type.GAUGE_INT64 || type == Type.GAUGE_DOUBLE ? null : start;
+ List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>();
+ for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry :
+ tagValueAggregationMap.entrySet()) {
+ List<LabelValue> labelValues = MetricUtils.tagValuesToLabelValues(entry.getKey());
+ Point point = entry.getValue().toPoint(now);
+ timeSeriesList.add(TimeSeries.createWithOnePoint(labelValues, point, startTime));
+ }
+ return Metric.create(metricDescriptor, timeSeriesList);
+ }
+
+ @Override
+ void record(
+ TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) {
+ List</*@Nullable*/ TagValue> tagValues =
+ getTagValues(getTagMap(context), super.view.getColumns());
+ if (!tagValueAggregationMap.containsKey(tagValues)) {
+ tagValueAggregationMap.put(
+ tagValues,
+ createMutableAggregation(super.view.getAggregation(), super.getView().getMeasure()));
+ }
+ tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp);
+ }
+
+ @Override
+ ViewData toViewData(Timestamp now, State state) {
+ if (state == State.ENABLED) {
+ return ViewData.create(
+ super.view,
+ createAggregationMap(tagValueAggregationMap, super.view.getMeasure()),
+ ViewData.AggregationWindowData.CumulativeData.create(start, now));
+ } else {
+ // If Stats state is DISABLED, return an empty ViewData.
+ return ViewData.create(
+ super.view,
+ Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(),
+ ViewData.AggregationWindowData.CumulativeData.create(ZERO_TIMESTAMP, ZERO_TIMESTAMP));
+ }
+ }
+
+ @Override
+ void clearStats() {
+ tagValueAggregationMap.clear();
+ }
+
+ @Override
+ void resumeStatsCollection(Timestamp now) {
+ start = now;
+ }
+ }
+
+ /*
+ * For each IntervalView, we always keep a queue of N + 1 buckets (by default N is 4).
+ * Each bucket has a duration which is interval duration / N.
+ * Ideally:
+ * 1. the buckets should always be up-to-date,
+ * 2. current time should always be within the latest bucket, currently recorded stats should fall
+ * into the latest bucket,
+ * 3. there are always N buckets before the current one, which holds the stats in the past
+ * interval duration.
+ *
+ * When getView() is called, we will extract and combine the stats from the current and past
+ * buckets (part of the stats from the oldest bucket could have expired).
+ *
+ * However, in reality, we couldn't track the status of buckets all the time (keep monitoring and
+ * updating the bucket queue will be expensive). When we call record() or getView(), some or all
+ * of the buckets might be outdated, and we will need to "pad" new buckets to the queue and remove
+ * outdated ones. After refreshing buckets, the bucket queue will able to maintain the three
+ * invariants in the ideal situation.
+ *
+ * For example:
+ * 1. We have an IntervalView which has a duration of 8 seconds, we register this view at 10s.
+ * 2. Initially there will be 5 buckets: [2.0, 4.0), [4.0, 6.0), ..., [10.0, 12.0).
+ * 3. If users don't call record() or getView(), bucket queue will remain as it is, and some
+ * buckets could expire.
+ * 4. Suppose record() is called at 15s, now we need to refresh the bucket queue. We need to add
+ * two new buckets [12.0, 14.0) and [14.0, 16.0), and remove two expired buckets [2.0, 4.0)
+ * and [4.0, 6.0)
+ * 5. Suppose record() is called again at 30s, all the current buckets should have expired. We add
+ * 5 new buckets [22.0, 24.0) ... [30.0, 32.0) and remove all the previous buckets.
+ * 6. Suppose users call getView() at 35s, again we need to add two new buckets and remove two
+ * expired one, so that bucket queue is up-to-date. Now we combine stats from all buckets and
+ * return the combined IntervalViewData.
+ */
+ private static final class IntervalMutableViewData extends MutableViewData {
+
+ // TODO(songya): allow customizable bucket size in the future.
+ private static final int N = 4; // IntervalView has N + 1 buckets
+
+ private final ArrayDeque<IntervalBucket> buckets = new ArrayDeque<IntervalBucket>();
+
+ private final Duration totalDuration; // Duration of the whole interval.
+ private final Duration bucketDuration; // Duration of a single bucket (totalDuration / N)
+
+ private IntervalMutableViewData(View view, Timestamp start) {
+ super(view);
+ Duration totalDuration = ((View.AggregationWindow.Interval) view.getWindow()).getDuration();
+ this.totalDuration = totalDuration;
+ this.bucketDuration = Duration.fromMillis(totalDuration.toMillis() / N);
+
+ // When initializing. add N empty buckets prior to the start timestamp of this
+ // IntervalMutableViewData, so that the last bucket will be the current one in effect.
+ shiftBucketList(N + 1, start);
+ }
+
+ @javax.annotation.Nullable
+ @Override
+ Metric toMetric(Timestamp now, State state) {
+ return null;
+ }
+
+ @Override
+ void record(
+ TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) {
+ List</*@Nullable*/ TagValue> tagValues =
+ getTagValues(getTagMap(context), super.view.getColumns());
+ refreshBucketList(timestamp);
+ // It is always the last bucket that does the recording.
+ CheckerFrameworkUtils.castNonNull(buckets.peekLast())
+ .record(tagValues, value, attachments, timestamp);
+ }
+
+ @Override
+ ViewData toViewData(Timestamp now, State state) {
+ refreshBucketList(now);
+ if (state == State.ENABLED) {
+ return ViewData.create(
+ super.view,
+ combineBucketsAndGetAggregationMap(now),
+ ViewData.AggregationWindowData.IntervalData.create(now));
+ } else {
+ // If Stats state is DISABLED, return an empty ViewData.
+ return ViewData.create(
+ super.view,
+ Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(),
+ ViewData.AggregationWindowData.IntervalData.create(ZERO_TIMESTAMP));
+ }
+ }
+
+ @Override
+ void clearStats() {
+ for (IntervalBucket bucket : buckets) {
+ bucket.clearStats();
+ }
+ }
+
+ @Override
+ void resumeStatsCollection(Timestamp now) {
+ // Refresh bucket list to be ready for stats recording, so that if record() is called right
+ // after stats state is turned back on, record() will be faster.
+ refreshBucketList(now);
+ }
+
+ // Add new buckets and remove expired buckets by comparing the current timestamp with
+ // timestamp of the last bucket.
+ private void refreshBucketList(Timestamp now) {
+ if (buckets.size() != N + 1) {
+ throw new AssertionError("Bucket list must have exactly " + (N + 1) + " buckets.");
+ }
+ Timestamp startOfLastBucket =
+ CheckerFrameworkUtils.castNonNull(buckets.peekLast()).getStart();
+ // TODO(songya): decide what to do when time goes backwards
+ checkArgument(
+ now.compareTo(startOfLastBucket) >= 0,
+ "Current time must be within or after the last bucket.");
+ long elapsedTimeMillis = now.subtractTimestamp(startOfLastBucket).toMillis();
+ long numOfPadBuckets = elapsedTimeMillis / bucketDuration.toMillis();
+
+ shiftBucketList(numOfPadBuckets, now);
+ }
+
+ // Add specified number of new buckets, and remove expired buckets
+ private void shiftBucketList(long numOfPadBuckets, Timestamp now) {
+ Timestamp startOfNewBucket;
+
+ if (!buckets.isEmpty()) {
+ startOfNewBucket =
+ CheckerFrameworkUtils.castNonNull(buckets.peekLast())
+ .getStart()
+ .addDuration(bucketDuration);
+ } else {
+ // Initialize bucket list. Should only enter this block once.
+ startOfNewBucket = subtractDuration(now, totalDuration);
+ }
+
+ if (numOfPadBuckets > N + 1) {
+ // All current buckets expired, need to add N + 1 new buckets. The start time of the latest
+ // bucket will be current time.
+ startOfNewBucket = subtractDuration(now, totalDuration);
+ numOfPadBuckets = N + 1;
+ }
+
+ for (int i = 0; i < numOfPadBuckets; i++) {
+ buckets.add(
+ new IntervalBucket(
+ startOfNewBucket,
+ bucketDuration,
+ super.view.getAggregation(),
+ super.view.getMeasure()));
+ startOfNewBucket = startOfNewBucket.addDuration(bucketDuration);
+ }
+
+ // removed expired buckets
+ while (buckets.size() > N + 1) {
+ buckets.pollFirst();
+ }
+ }
+
+ // Combine stats within each bucket, aggregate stats by tag values, and return the mapping from
+ // tag values to aggregation data.
+ private Map<List</*@Nullable*/ TagValue>, AggregationData> combineBucketsAndGetAggregationMap(
+ Timestamp now) {
+ // Need to maintain the order of inserted MutableAggregations (inserted based on time order).
+ Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap =
+ LinkedHashMultimap.create();
+
+ ArrayDeque<IntervalBucket> shallowCopy = new ArrayDeque<IntervalBucket>(buckets);
+
+ Aggregation aggregation = super.view.getAggregation();
+ Measure measure = super.view.getMeasure();
+ putBucketsIntoMultiMap(shallowCopy, multimap, aggregation, measure, now);
+ Map<List</*@Nullable*/ TagValue>, MutableAggregation> singleMap =
+ aggregateOnEachTagValueList(multimap, aggregation, measure);
+ return createAggregationMap(singleMap, super.getView().getMeasure());
+ }
+
+ // Put stats within each bucket to a multimap. Each tag value list (map key) could have multiple
+ // mutable aggregations (map value) from different buckets.
+ private static void putBucketsIntoMultiMap(
+ ArrayDeque<IntervalBucket> buckets,
+ Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap,
+ Aggregation aggregation,
+ Measure measure,
+ Timestamp now) {
+ // Put fractional stats of the head (oldest) bucket.
+ IntervalBucket head = CheckerFrameworkUtils.castNonNull(buckets.peekFirst());
+ IntervalBucket tail = CheckerFrameworkUtils.castNonNull(buckets.peekLast());
+ double fractionTail = tail.getFraction(now);
+ // TODO(songya): decide what to do when time goes backwards
+ checkArgument(
+ 0.0 <= fractionTail && fractionTail <= 1.0,
+ "Fraction " + fractionTail + " should be within [0.0, 1.0].");
+ double fractionHead = 1.0 - fractionTail;
+ putFractionalMutableAggregationsToMultiMap(
+ head.getTagValueAggregationMap(), multimap, aggregation, measure, fractionHead);
+
+ // Put whole data of other buckets.
+ boolean shouldSkipFirst = true;
+ for (IntervalBucket bucket : buckets) {
+ if (shouldSkipFirst) {
+ shouldSkipFirst = false;
+ continue; // skip the first bucket
+ }
+ for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry :
+ bucket.getTagValueAggregationMap().entrySet()) {
+ multimap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ // Put stats within one bucket into multimap, multiplied by a given fraction.
+ private static <T> void putFractionalMutableAggregationsToMultiMap(
+ Map<T, MutableAggregation> mutableAggrMap,
+ Multimap<T, MutableAggregation> multimap,
+ Aggregation aggregation,
+ Measure measure,
+ double fraction) {
+ for (Entry<T, MutableAggregation> entry : mutableAggrMap.entrySet()) {
+ // Initially empty MutableAggregations.
+ MutableAggregation fractionalMutableAgg = createMutableAggregation(aggregation, measure);
+ fractionalMutableAgg.combine(entry.getValue(), fraction);
+ multimap.put(entry.getKey(), fractionalMutableAgg);
+ }
+ }
+
+ // For each tag value list (key of AggregationMap), combine mutable aggregations into one
+ // mutable aggregation, thus convert the multimap into a single map.
+ private static <T> Map<T, MutableAggregation> aggregateOnEachTagValueList(
+ Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure) {
+ Map<T, MutableAggregation> map = Maps.newHashMap();
+ for (T tagValues : multimap.keySet()) {
+ // Initially empty MutableAggregations.
+ MutableAggregation combinedAggregation = createMutableAggregation(aggregation, measure);
+ for (MutableAggregation mutableAggregation : multimap.get(tagValues)) {
+ combinedAggregation.combine(mutableAggregation, 1.0);
+ }
+ map.put(tagValues, combinedAggregation);
+ }
+ return map;
+ }
+
+ // Subtract a Duration from a Timestamp, and return a new Timestamp.
+ private static Timestamp subtractDuration(Timestamp timestamp, Duration duration) {
+ return timestamp.addDuration(Duration.create(-duration.getSeconds(), -duration.getNanos()));
+ }
+ }
+
+ private static final class CreateCumulative
+ implements Function<View.AggregationWindow.Cumulative, MutableViewData> {
+ @Override
+ public MutableViewData apply(View.AggregationWindow.Cumulative arg) {
+ return new CumulativeMutableViewData(view, start);
+ }
+
+ private final View view;
+ private final Timestamp start;
+
+ private CreateCumulative(View view, Timestamp start) {
+ this.view = view;
+ this.start = start;
+ }
+ }
+
+ private static final class CreateInterval
+ implements Function<View.AggregationWindow.Interval, MutableViewData> {
+ @Override
+ public MutableViewData apply(View.AggregationWindow.Interval arg) {
+ return new IntervalMutableViewData(view, start);
+ }
+
+ private final View view;
+ private final Timestamp start;
+
+ private CreateInterval(View view, Timestamp start) {
+ this.view = view;
+ this.start = start;
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java b/impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java
new file mode 100644
index 00000000..fbb593f5
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import io.opencensus.common.Function;
+import io.opencensus.common.Functions;
+import io.opencensus.implcore.stats.MutableAggregation.MutableCount;
+import io.opencensus.implcore.stats.MutableAggregation.MutableDistribution;
+import io.opencensus.implcore.stats.MutableAggregation.MutableLastValueDouble;
+import io.opencensus.implcore.stats.MutableAggregation.MutableLastValueLong;
+import io.opencensus.implcore.stats.MutableAggregation.MutableMean;
+import io.opencensus.implcore.stats.MutableAggregation.MutableSumDouble;
+import io.opencensus.implcore.stats.MutableAggregation.MutableSumLong;
+import io.opencensus.implcore.tags.TagContextImpl;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.Aggregation.Count;
+import io.opencensus.stats.Aggregation.Distribution;
+import io.opencensus.stats.Aggregation.LastValue;
+import io.opencensus.stats.Aggregation.Sum;
+import io.opencensus.stats.AggregationData;
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.Measure.MeasureDouble;
+import io.opencensus.stats.Measure.MeasureLong;
+import io.opencensus.stats.Measurement;
+import io.opencensus.stats.Measurement.MeasurementDouble;
+import io.opencensus.stats.Measurement.MeasurementLong;
+import io.opencensus.tags.InternalUtils;
+import io.opencensus.tags.Tag;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+@SuppressWarnings("deprecation")
+/* Common static utilities for stats recording. */
+final class RecordUtils {
+
+ @javax.annotation.Nullable @VisibleForTesting static final TagValue UNKNOWN_TAG_VALUE = null;
+
+ static Map<TagKey, TagValue> getTagMap(TagContext ctx) {
+ if (ctx instanceof TagContextImpl) {
+ return ((TagContextImpl) ctx).getTags();
+ } else {
+ Map<TagKey, TagValue> tags = Maps.newHashMap();
+ for (Iterator<Tag> i = InternalUtils.getTags(ctx); i.hasNext(); ) {
+ Tag tag = i.next();
+ tags.put(tag.getKey(), tag.getValue());
+ }
+ return tags;
+ }
+ }
+
+ @VisibleForTesting
+ static List</*@Nullable*/ TagValue> getTagValues(
+ Map<? extends TagKey, ? extends TagValue> tags, List<? extends TagKey> columns) {
+ List</*@Nullable*/ TagValue> tagValues = new ArrayList</*@Nullable*/ TagValue>(columns.size());
+ // Record all the measures in a "Greedy" way.
+ // Every view aggregates every measure. This is similar to doing a GROUPBY view’s keys.
+ for (int i = 0; i < columns.size(); ++i) {
+ TagKey tagKey = columns.get(i);
+ if (!tags.containsKey(tagKey)) {
+ // replace not found key values by null.
+ tagValues.add(UNKNOWN_TAG_VALUE);
+ } else {
+ tagValues.add(tags.get(tagKey));
+ }
+ }
+ return tagValues;
+ }
+
+ /**
+ * Create an empty {@link MutableAggregation} based on the given {@link Aggregation}.
+ *
+ * @param aggregation {@code Aggregation}.
+ * @return an empty {@code MutableAggregation}.
+ */
+ @VisibleForTesting
+ static MutableAggregation createMutableAggregation(
+ Aggregation aggregation, final Measure measure) {
+ return aggregation.match(
+ new Function<Sum, MutableAggregation>() {
+ @Override
+ public MutableAggregation apply(Sum arg) {
+ return measure.match(
+ CreateMutableSumDouble.INSTANCE,
+ CreateMutableSumLong.INSTANCE,
+ Functions.<MutableAggregation>throwAssertionError());
+ }
+ },
+ CreateMutableCount.INSTANCE,
+ CreateMutableDistribution.INSTANCE,
+ new Function<LastValue, MutableAggregation>() {
+ @Override
+ public MutableAggregation apply(LastValue arg) {
+ return measure.match(
+ CreateMutableLastValueDouble.INSTANCE,
+ CreateMutableLastValueLong.INSTANCE,
+ Functions.<MutableAggregation>throwAssertionError());
+ }
+ },
+ AggregationDefaultFunction.INSTANCE);
+ }
+
+ // Covert a mapping from TagValues to MutableAggregation, to a mapping from TagValues to
+ // AggregationData.
+ static <T> Map<T, AggregationData> createAggregationMap(
+ Map<T, MutableAggregation> tagValueAggregationMap, Measure measure) {
+ Map<T, AggregationData> map = Maps.newHashMap();
+ for (Entry<T, MutableAggregation> entry : tagValueAggregationMap.entrySet()) {
+ map.put(entry.getKey(), entry.getValue().toAggregationData());
+ }
+ return map;
+ }
+
+ static double getDoubleValueFromMeasurement(Measurement measurement) {
+ return measurement.match(
+ GET_VALUE_FROM_MEASUREMENT_DOUBLE,
+ GET_VALUE_FROM_MEASUREMENT_LONG,
+ Functions.<Double>throwAssertionError());
+ }
+
+ // static inner Function classes
+
+ private static final Function<MeasurementDouble, Double> GET_VALUE_FROM_MEASUREMENT_DOUBLE =
+ new Function<MeasurementDouble, Double>() {
+ @Override
+ public Double apply(MeasurementDouble arg) {
+ return arg.getValue();
+ }
+ };
+
+ private static final Function<MeasurementLong, Double> GET_VALUE_FROM_MEASUREMENT_LONG =
+ new Function<MeasurementLong, Double>() {
+ @Override
+ public Double apply(MeasurementLong arg) {
+ // TODO: consider checking truncation here.
+ return (double) arg.getValue();
+ }
+ };
+
+ private static final class CreateMutableSumDouble
+ implements Function<MeasureDouble, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(MeasureDouble arg) {
+ return MutableSumDouble.create();
+ }
+
+ private static final CreateMutableSumDouble INSTANCE = new CreateMutableSumDouble();
+ }
+
+ private static final class CreateMutableSumLong
+ implements Function<MeasureLong, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(MeasureLong arg) {
+ return MutableSumLong.create();
+ }
+
+ private static final CreateMutableSumLong INSTANCE = new CreateMutableSumLong();
+ }
+
+ private static final class CreateMutableCount implements Function<Count, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(Count arg) {
+ return MutableCount.create();
+ }
+
+ private static final CreateMutableCount INSTANCE = new CreateMutableCount();
+ }
+
+ // TODO(songya): remove this once Mean aggregation is completely removed. Before that
+ // we need to continue supporting Mean, since it could still be used by users and some
+ // deprecated RPC views.
+ private static final class AggregationDefaultFunction
+ implements Function<Aggregation, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(Aggregation arg) {
+ if (arg instanceof Aggregation.Mean) {
+ return MutableMean.create();
+ }
+ throw new IllegalArgumentException("Unknown Aggregation.");
+ }
+
+ private static final AggregationDefaultFunction INSTANCE = new AggregationDefaultFunction();
+ }
+
+ private static final class CreateMutableDistribution
+ implements Function<Distribution, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(Distribution arg) {
+ return MutableDistribution.create(arg.getBucketBoundaries());
+ }
+
+ private static final CreateMutableDistribution INSTANCE = new CreateMutableDistribution();
+ }
+
+ private static final class CreateMutableLastValueDouble
+ implements Function<MeasureDouble, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(MeasureDouble arg) {
+ return MutableLastValueDouble.create();
+ }
+
+ private static final CreateMutableLastValueDouble INSTANCE = new CreateMutableLastValueDouble();
+ }
+
+ private static final class CreateMutableLastValueLong
+ implements Function<MeasureLong, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(MeasureLong arg) {
+ return MutableLastValueLong.create();
+ }
+
+ private static final CreateMutableLastValueLong INSTANCE = new CreateMutableLastValueLong();
+ }
+
+ private RecordUtils() {}
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java
new file mode 100644
index 00000000..741b399b
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import com.google.common.base.Preconditions;
+import io.opencensus.common.Clock;
+import io.opencensus.implcore.internal.CurrentState;
+import io.opencensus.implcore.internal.CurrentState.State;
+import io.opencensus.implcore.internal.EventQueue;
+import io.opencensus.metrics.Metrics;
+import io.opencensus.metrics.export.MetricProducer;
+import io.opencensus.stats.StatsCollectionState;
+import io.opencensus.stats.StatsComponent;
+
+/** Base implementation of {@link StatsComponent}. */
+public class StatsComponentImplBase extends StatsComponent {
+ private static final State DEFAULT_STATE = State.ENABLED;
+
+ // The State shared between the StatsComponent, StatsRecorder and ViewManager.
+ private final CurrentState currentState = new CurrentState(DEFAULT_STATE);
+
+ private final ViewManagerImpl viewManager;
+ private final StatsRecorderImpl statsRecorder;
+
+ /**
+ * Creates a new {@code StatsComponentImplBase}.
+ *
+ * @param queue the queue implementation.
+ * @param clock the clock to use when recording stats.
+ */
+ public StatsComponentImplBase(EventQueue queue, Clock clock) {
+ StatsManager statsManager = new StatsManager(queue, clock, currentState);
+ this.viewManager = new ViewManagerImpl(statsManager);
+ this.statsRecorder = new StatsRecorderImpl(statsManager);
+
+ // Create a new MetricProducerImpl and register it to MetricProducerManager when
+ // StatsComponentImplBase is initialized.
+ MetricProducer metricProducer = new MetricProducerImpl(statsManager);
+ Metrics.getExportComponent().getMetricProducerManager().add(metricProducer);
+ }
+
+ @Override
+ public ViewManagerImpl getViewManager() {
+ return viewManager;
+ }
+
+ @Override
+ public StatsRecorderImpl getStatsRecorder() {
+ return statsRecorder;
+ }
+
+ @Override
+ public StatsCollectionState getState() {
+ return stateToStatsState(currentState.get());
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public synchronized void setState(StatsCollectionState newState) {
+ boolean stateChanged =
+ currentState.set(statsStateToState(Preconditions.checkNotNull(newState, "newState")));
+ if (stateChanged) {
+ if (newState == StatsCollectionState.DISABLED) {
+ viewManager.clearStats();
+ } else {
+ viewManager.resumeStatsCollection();
+ }
+ }
+ }
+
+ private static State statsStateToState(StatsCollectionState statsCollectionState) {
+ return statsCollectionState == StatsCollectionState.ENABLED ? State.ENABLED : State.DISABLED;
+ }
+
+ private static StatsCollectionState stateToStatsState(State state) {
+ return state == State.ENABLED ? StatsCollectionState.ENABLED : StatsCollectionState.DISABLED;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java
new file mode 100644
index 00000000..17e99d46
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.opencensus.common.Clock;
+import io.opencensus.implcore.internal.CurrentState;
+import io.opencensus.implcore.internal.CurrentState.State;
+import io.opencensus.implcore.internal.EventQueue;
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
+import io.opencensus.tags.TagContext;
+import java.util.Collection;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/** Object that stores all views and stats. */
+final class StatsManager {
+
+ private final EventQueue queue;
+
+ // clock used throughout the stats implementation
+ private final Clock clock;
+
+ private final CurrentState state;
+ private final MeasureToViewMap measureToViewMap = new MeasureToViewMap();
+
+ StatsManager(EventQueue queue, Clock clock, CurrentState state) {
+ checkNotNull(queue, "EventQueue");
+ checkNotNull(clock, "Clock");
+ checkNotNull(state, "state");
+ this.queue = queue;
+ this.clock = clock;
+ this.state = state;
+ }
+
+ void registerView(View view) {
+ measureToViewMap.registerView(view, clock);
+ }
+
+ @Nullable
+ ViewData getView(View.Name viewName) {
+ return measureToViewMap.getView(viewName, clock, state.getInternal());
+ }
+
+ Set<View> getExportedViews() {
+ return measureToViewMap.getExportedViews();
+ }
+
+ void record(TagContext tags, MeasureMapInternal measurementValues) {
+ // TODO(songya): consider exposing No-op MeasureMap and use it when stats state is DISABLED, so
+ // that we don't need to create actual MeasureMapImpl.
+ if (state.getInternal() == State.ENABLED) {
+ queue.enqueue(new StatsEvent(this, tags, measurementValues));
+ }
+ }
+
+ Collection<Metric> getMetrics() {
+ return measureToViewMap.getMetrics(clock, state.getInternal());
+ }
+
+ void clearStats() {
+ measureToViewMap.clearStats();
+ }
+
+ void resumeStatsCollection() {
+ measureToViewMap.resumeStatsCollection(clock.now());
+ }
+
+ // An EventQueue entry that records the stats from one call to StatsManager.record(...).
+ private static final class StatsEvent implements EventQueue.Entry {
+ private final TagContext tags;
+ private final MeasureMapInternal stats;
+ private final StatsManager statsManager;
+
+ StatsEvent(StatsManager statsManager, TagContext tags, MeasureMapInternal stats) {
+ this.statsManager = statsManager;
+ this.tags = tags;
+ this.stats = stats;
+ }
+
+ @Override
+ public void process() {
+ // Add Timestamp to value after it went through the DisruptorQueue.
+ statsManager.measureToViewMap.record(tags, stats, statsManager.clock.now());
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java
new file mode 100644
index 00000000..f9ebea41
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.opencensus.stats.StatsRecorder;
+
+/** Implementation of {@link StatsRecorder}. */
+public final class StatsRecorderImpl extends StatsRecorder {
+ private final StatsManager statsManager;
+
+ StatsRecorderImpl(StatsManager statsManager) {
+ checkNotNull(statsManager, "StatsManager");
+ this.statsManager = statsManager;
+ }
+
+ @Override
+ public MeasureMapImpl newMeasureMap() {
+ return MeasureMapImpl.create(statsManager);
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java
new file mode 100644
index 00000000..20ea97f8
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
+import io.opencensus.stats.ViewManager;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/** Implementation of {@link ViewManager}. */
+public final class ViewManagerImpl extends ViewManager {
+ private final StatsManager statsManager;
+
+ ViewManagerImpl(StatsManager statsManager) {
+ this.statsManager = statsManager;
+ }
+
+ @Override
+ public void registerView(View view) {
+ statsManager.registerView(view);
+ }
+
+ @Override
+ @Nullable
+ public ViewData getView(View.Name viewName) {
+ return statsManager.getView(viewName);
+ }
+
+ @Override
+ public Set<View> getAllExportedViews() {
+ return statsManager.getExportedViews();
+ }
+
+ void clearStats() {
+ statsManager.clearStats();
+ }
+
+ void resumeStatsCollection() {
+ statsManager.resumeStatsCollection();
+ }
+}