aboutsummaryrefslogtreecommitdiff
path: root/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java
diff options
context:
space:
mode:
Diffstat (limited to 'impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java')
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java556
1 files changed, 556 insertions, 0 deletions
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java
new file mode 100644
index 00000000..6e2bff1c
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java
@@ -0,0 +1,556 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opencensus.common.Timestamp;
+import io.opencensus.metrics.export.Distribution;
+import io.opencensus.metrics.export.Distribution.BucketOptions;
+import io.opencensus.metrics.export.Point;
+import io.opencensus.metrics.export.Value;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.AggregationData;
+import io.opencensus.stats.AggregationData.DistributionData;
+import io.opencensus.stats.AggregationData.DistributionData.Exemplar;
+import io.opencensus.stats.BucketBoundaries;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Mutable version of {@link Aggregation} that supports adding values. */
+abstract class MutableAggregation {
+
+ private MutableAggregation() {}
+
+ // Tolerance for double comparison.
+ private static final double TOLERANCE = 1e-6;
+
+ /**
+ * Put a new value into the MutableAggregation.
+ *
+ * @param value new value to be added to population
+ * @param attachments the contextual information on an {@link Exemplar}
+ * @param timestamp the timestamp when the value is recorded
+ */
+ abstract void add(double value, Map<String, String> attachments, Timestamp timestamp);
+
+ // TODO(songya): remove this method once interval stats is completely removed.
+ /**
+ * Combine the internal values of this MutableAggregation and value of the given
+ * MutableAggregation, with the given fraction. Then set the internal value of this
+ * MutableAggregation to the combined value.
+ *
+ * @param other the other {@code MutableAggregation}. The type of this and other {@code
+ * MutableAggregation} must match.
+ * @param fraction the fraction that the value in other {@code MutableAggregation} should
+ * contribute. Must be within [0.0, 1.0].
+ */
+ abstract void combine(MutableAggregation other, double fraction);
+
+ abstract AggregationData toAggregationData();
+
+ abstract Point toPoint(Timestamp timestamp);
+
+ /** Calculate sum of doubles on aggregated {@code MeasureValue}s. */
+ static class MutableSumDouble extends MutableAggregation {
+
+ private double sum = 0.0;
+
+ private MutableSumDouble() {}
+
+ /**
+ * Construct a {@code MutableSumDouble}.
+ *
+ * @return an empty {@code MutableSumDouble}.
+ */
+ static MutableSumDouble create() {
+ return new MutableSumDouble();
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ sum += value;
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableSumDouble, "MutableSumDouble expected.");
+ this.sum += fraction * ((MutableSumDouble) other).sum;
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.SumDataDouble.create(sum);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.doubleValue(sum), timestamp);
+ }
+
+ @VisibleForTesting
+ double getSum() {
+ return sum;
+ }
+ }
+
+ /** Calculate sum of longs on aggregated {@code MeasureValue}s. */
+ static final class MutableSumLong extends MutableSumDouble {
+ private MutableSumLong() {
+ super();
+ }
+
+ /**
+ * Construct a {@code MutableSumLong}.
+ *
+ * @return an empty {@code MutableSumLong}.
+ */
+ static MutableSumLong create() {
+ return new MutableSumLong();
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.SumDataLong.create(Math.round(getSum()));
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.longValue(Math.round(getSum())), timestamp);
+ }
+ }
+
+ /** Calculate count on aggregated {@code MeasureValue}s. */
+ static final class MutableCount extends MutableAggregation {
+
+ private long count = 0;
+
+ private MutableCount() {}
+
+ /**
+ * Construct a {@code MutableCount}.
+ *
+ * @return an empty {@code MutableCount}.
+ */
+ static MutableCount create() {
+ return new MutableCount();
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ count++;
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableCount, "MutableCount expected.");
+ this.count += Math.round(fraction * ((MutableCount) other).getCount());
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.CountData.create(count);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.longValue(count), timestamp);
+ }
+
+ /**
+ * Returns the aggregated count.
+ *
+ * @return the aggregated count.
+ */
+ long getCount() {
+ return count;
+ }
+ }
+
+ /** Calculate mean on aggregated {@code MeasureValue}s. */
+ static final class MutableMean extends MutableAggregation {
+
+ private double sum = 0.0;
+ private long count = 0;
+
+ private MutableMean() {}
+
+ /**
+ * Construct a {@code MutableMean}.
+ *
+ * @return an empty {@code MutableMean}.
+ */
+ static MutableMean create() {
+ return new MutableMean();
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ count++;
+ sum += value;
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableMean, "MutableMean expected.");
+ MutableMean mutableMean = (MutableMean) other;
+ this.count += Math.round(mutableMean.count * fraction);
+ this.sum += mutableMean.sum * fraction;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.MeanData.create(getMean(), count);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.doubleValue(getMean()), timestamp);
+ }
+
+ /**
+ * Returns the aggregated mean.
+ *
+ * @return the aggregated mean.
+ */
+ double getMean() {
+ return count == 0 ? 0 : sum / count;
+ }
+
+ /**
+ * Returns the aggregated count.
+ *
+ * @return the aggregated count.
+ */
+ long getCount() {
+ return count;
+ }
+
+ @VisibleForTesting
+ double getSum() {
+ return sum;
+ }
+ }
+
+ /** Calculate distribution stats on aggregated {@code MeasureValue}s. */
+ static final class MutableDistribution extends MutableAggregation {
+
+ private double sum = 0.0;
+ private double mean = 0.0;
+ private long count = 0;
+ private double sumOfSquaredDeviations = 0.0;
+
+ // Initial "impossible" values, that will get reset as soon as first value is added.
+ private double min = Double.POSITIVE_INFINITY;
+ private double max = Double.NEGATIVE_INFINITY;
+
+ private final BucketBoundaries bucketBoundaries;
+ private final long[] bucketCounts;
+
+ // If there's a histogram (i.e bucket boundaries are not empty) in this MutableDistribution,
+ // exemplars will have the same size to bucketCounts; otherwise exemplars are null.
+ // Only the newest exemplar will be kept at each index.
+ @javax.annotation.Nullable private final Exemplar[] exemplars;
+
+ private MutableDistribution(BucketBoundaries bucketBoundaries) {
+ this.bucketBoundaries = bucketBoundaries;
+ int buckets = bucketBoundaries.getBoundaries().size() + 1;
+ this.bucketCounts = new long[buckets];
+ // In the implementation, each histogram bucket can have up to one exemplar, and the exemplar
+ // array is guaranteed to be in ascending order.
+ // If there's no histogram, don't record exemplars.
+ this.exemplars = bucketBoundaries.getBoundaries().isEmpty() ? null : new Exemplar[buckets];
+ }
+
+ /**
+ * Construct a {@code MutableDistribution}.
+ *
+ * @return an empty {@code MutableDistribution}.
+ */
+ static MutableDistribution create(BucketBoundaries bucketBoundaries) {
+ checkNotNull(bucketBoundaries, "bucketBoundaries should not be null.");
+ return new MutableDistribution(bucketBoundaries);
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ sum += value;
+ count++;
+
+ /*
+ * Update the sum of squared deviations from the mean with the given value. For values
+ * x_i this is Sum[i=1..n]((x_i - mean)^2)
+ *
+ * Computed using Welfords method (see
+ * https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance, or Knuth, "The Art of
+ * Computer Programming", Vol. 2, page 323, 3rd edition)
+ */
+ double deltaFromMean = value - mean;
+ mean += deltaFromMean / count;
+ double deltaFromMean2 = value - mean;
+ sumOfSquaredDeviations += deltaFromMean * deltaFromMean2;
+
+ if (value < min) {
+ min = value;
+ }
+ if (value > max) {
+ max = value;
+ }
+
+ int bucket = 0;
+ for (; bucket < bucketBoundaries.getBoundaries().size(); bucket++) {
+ if (value < bucketBoundaries.getBoundaries().get(bucket)) {
+ break;
+ }
+ }
+ bucketCounts[bucket]++;
+
+ // No implicit recording for exemplars - if there are no attachments (contextual information),
+ // don't record exemplars.
+ if (!attachments.isEmpty() && exemplars != null) {
+ exemplars[bucket] = Exemplar.create(value, timestamp, attachments);
+ }
+ }
+
+ // We don't compute fractional MutableDistribution, it's either whole or none.
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableDistribution, "MutableDistribution expected.");
+ if (Math.abs(1.0 - fraction) > TOLERANCE) {
+ return;
+ }
+
+ MutableDistribution mutableDistribution = (MutableDistribution) other;
+ checkArgument(
+ this.bucketBoundaries.equals(mutableDistribution.bucketBoundaries),
+ "Bucket boundaries should match.");
+
+ // Algorithm for calculating the combination of sum of squared deviations:
+ // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm.
+ if (this.count + mutableDistribution.count > 0) {
+ double delta = mutableDistribution.mean - this.mean;
+ this.sumOfSquaredDeviations =
+ this.sumOfSquaredDeviations
+ + mutableDistribution.sumOfSquaredDeviations
+ + Math.pow(delta, 2)
+ * this.count
+ * mutableDistribution.count
+ / (this.count + mutableDistribution.count);
+ }
+
+ this.count += mutableDistribution.count;
+ this.sum += mutableDistribution.sum;
+ this.mean = this.sum / this.count;
+
+ if (mutableDistribution.min < this.min) {
+ this.min = mutableDistribution.min;
+ }
+ if (mutableDistribution.max > this.max) {
+ this.max = mutableDistribution.max;
+ }
+
+ long[] bucketCounts = mutableDistribution.getBucketCounts();
+ for (int i = 0; i < bucketCounts.length; i++) {
+ this.bucketCounts[i] += bucketCounts[i];
+ }
+
+ Exemplar[] otherExemplars = mutableDistribution.getExemplars();
+ if (exemplars != null && otherExemplars != null) {
+ for (int i = 0; i < otherExemplars.length; i++) {
+ Exemplar exemplar = otherExemplars[i];
+ // Assume other is always newer than this, because we combined interval buckets in time
+ // order.
+ // If there's a newer exemplar, overwrite current value.
+ if (exemplar != null) {
+ this.exemplars[i] = exemplar;
+ }
+ }
+ }
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ List<Long> boxedBucketCounts = new ArrayList<Long>();
+ for (long bucketCount : bucketCounts) {
+ boxedBucketCounts.add(bucketCount);
+ }
+ List<Exemplar> exemplarList = new ArrayList<Exemplar>();
+ if (exemplars != null) {
+ for (Exemplar exemplar : exemplars) {
+ if (exemplar != null) {
+ exemplarList.add(exemplar);
+ }
+ }
+ }
+ return DistributionData.create(
+ mean, count, min, max, sumOfSquaredDeviations, boxedBucketCounts, exemplarList);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ List<Distribution.Bucket> buckets = new ArrayList<Distribution.Bucket>();
+ for (int bucket = 0; bucket < bucketCounts.length; bucket++) {
+ long bucketCount = bucketCounts[bucket];
+ @javax.annotation.Nullable AggregationData.DistributionData.Exemplar exemplar = null;
+ if (exemplars != null) {
+ exemplar = exemplars[bucket];
+ }
+
+ Distribution.Bucket metricBucket;
+ if (exemplar != null) {
+ // Bucket with an Exemplar.
+ metricBucket =
+ Distribution.Bucket.create(
+ bucketCount,
+ Distribution.Exemplar.create(
+ exemplar.getValue(), exemplar.getTimestamp(), exemplar.getAttachments()));
+ } else {
+ // Bucket with no Exemplar.
+ metricBucket = Distribution.Bucket.create(bucketCount);
+ }
+ buckets.add(metricBucket);
+ }
+
+ // TODO(mayurkale): Drop the first bucket when converting to metrics.
+ // Reason: In Stats API, bucket bounds begin with -infinity (first bucket is (-infinity, 0)).
+ BucketOptions bucketOptions = BucketOptions.explicitOptions(bucketBoundaries.getBoundaries());
+
+ return Point.create(
+ Value.distributionValue(
+ Distribution.create(
+ count, mean * count, sumOfSquaredDeviations, bucketOptions, buckets)),
+ timestamp);
+ }
+
+ double getMean() {
+ return mean;
+ }
+
+ long getCount() {
+ return count;
+ }
+
+ double getMin() {
+ return min;
+ }
+
+ double getMax() {
+ return max;
+ }
+
+ // Returns the aggregated sum of squared deviations.
+ double getSumOfSquaredDeviations() {
+ return sumOfSquaredDeviations;
+ }
+
+ long[] getBucketCounts() {
+ return bucketCounts;
+ }
+
+ BucketBoundaries getBucketBoundaries() {
+ return bucketBoundaries;
+ }
+
+ @javax.annotation.Nullable
+ Exemplar[] getExemplars() {
+ return exemplars;
+ }
+ }
+
+ /** Calculate double last value on aggregated {@code MeasureValue}s. */
+ static class MutableLastValueDouble extends MutableAggregation {
+
+ // Initial value that will get reset as soon as first value is added.
+ private double lastValue = Double.NaN;
+ // TODO(songya): remove this once interval stats is completely removed.
+ private boolean initialized = false;
+
+ private MutableLastValueDouble() {}
+
+ /**
+ * Construct a {@code MutableLastValueDouble}.
+ *
+ * @return an empty {@code MutableLastValueDouble}.
+ */
+ static MutableLastValueDouble create() {
+ return new MutableLastValueDouble();
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ lastValue = value;
+ // TODO(songya): remove this once interval stats is completely removed.
+ if (!initialized) {
+ initialized = true;
+ }
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableLastValueDouble, "MutableLastValueDouble expected.");
+ MutableLastValueDouble otherValue = (MutableLastValueDouble) other;
+ // Assume other is always newer than this, because we combined interval buckets in time order.
+ // If there's a newer value, overwrite current value.
+ this.lastValue = otherValue.initialized ? otherValue.getLastValue() : this.lastValue;
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.LastValueDataDouble.create(lastValue);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.doubleValue(lastValue), timestamp);
+ }
+
+ @VisibleForTesting
+ double getLastValue() {
+ return lastValue;
+ }
+ }
+
+ /** Calculate last long value on aggregated {@code MeasureValue}s. */
+ static final class MutableLastValueLong extends MutableLastValueDouble {
+ private MutableLastValueLong() {
+ super();
+ }
+
+ /**
+ * Construct a {@code MutableLastValueLong}.
+ *
+ * @return an empty {@code MutableLastValueLong}.
+ */
+ static MutableLastValueLong create() {
+ return new MutableLastValueLong();
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.LastValueDataLong.create(Math.round(getLastValue()));
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.longValue(Math.round(getLastValue())), timestamp);
+ }
+ }
+}