aboutsummaryrefslogtreecommitdiff
path: root/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java
diff options
context:
space:
mode:
Diffstat (limited to 'impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java')
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java361
1 files changed, 361 insertions, 0 deletions
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java
new file mode 100644
index 00000000..deabdce7
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java
@@ -0,0 +1,361 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.opencensus.common.Function;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.BucketBoundaries;
+
+/** Mutable version of {@link Aggregation} that supports adding values. */
+abstract class MutableAggregation {
+
+ private MutableAggregation() {}
+
+ // Tolerance for double comparison.
+ private static final double TOLERANCE = 1e-6;
+
+ /**
+ * Put a new value into the MutableAggregation.
+ *
+ * @param value new value to be added to population
+ */
+ abstract void add(double value);
+
+ /**
+ * Combine the internal values of this MutableAggregation and value of the given
+ * MutableAggregation, with the given fraction. Then set the internal value of this
+ * MutableAggregation to the combined value.
+ *
+ * @param other the other {@code MutableAggregation}. The type of this and other {@code
+ * MutableAggregation} must match.
+ * @param fraction the fraction that the value in other {@code MutableAggregation} should
+ * contribute. Must be within [0.0, 1.0].
+ */
+ abstract void combine(MutableAggregation other, double fraction);
+
+ /** Applies the given match function to the underlying data type. */
+ abstract <T> T match(
+ Function<? super MutableSum, T> p0,
+ Function<? super MutableCount, T> p1,
+ Function<? super MutableMean, T> p2,
+ Function<? super MutableDistribution, T> p3);
+
+ /** Calculate sum on aggregated {@code MeasureValue}s. */
+ static final class MutableSum extends MutableAggregation {
+
+ private double sum = 0.0;
+
+ private MutableSum() {}
+
+ /**
+ * Construct a {@code MutableSum}.
+ *
+ * @return an empty {@code MutableSum}.
+ */
+ static MutableSum create() {
+ return new MutableSum();
+ }
+
+ @Override
+ void add(double value) {
+ sum += value;
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableSum, "MutableSum expected.");
+ this.sum += fraction * ((MutableSum) other).getSum();
+ }
+
+ /**
+ * Returns the aggregated sum.
+ *
+ * @return the aggregated sum.
+ */
+ double getSum() {
+ return sum;
+ }
+
+ @Override
+ final <T> T match(
+ Function<? super MutableSum, T> p0,
+ Function<? super MutableCount, T> p1,
+ Function<? super MutableMean, T> p2,
+ Function<? super MutableDistribution, T> p3) {
+ return p0.apply(this);
+ }
+ }
+
+ /** Calculate count on aggregated {@code MeasureValue}s. */
+ static final class MutableCount extends MutableAggregation {
+
+ private long count = 0;
+
+ private MutableCount() {}
+
+ /**
+ * Construct a {@code MutableCount}.
+ *
+ * @return an empty {@code MutableCount}.
+ */
+ static MutableCount create() {
+ return new MutableCount();
+ }
+
+ @Override
+ void add(double value) {
+ count++;
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableCount, "MutableCount expected.");
+ this.count += Math.round(fraction * ((MutableCount) other).getCount());
+ }
+
+ /**
+ * Returns the aggregated count.
+ *
+ * @return the aggregated count.
+ */
+ long getCount() {
+ return count;
+ }
+
+ @Override
+ final <T> T match(
+ Function<? super MutableSum, T> p0,
+ Function<? super MutableCount, T> p1,
+ Function<? super MutableMean, T> p2,
+ Function<? super MutableDistribution, T> p3) {
+ return p1.apply(this);
+ }
+ }
+
+ /** Calculate mean on aggregated {@code MeasureValue}s. */
+ static final class MutableMean extends MutableAggregation {
+
+ private double sum = 0.0;
+ private long count = 0;
+
+ private MutableMean() {}
+
+ /**
+ * Construct a {@code MutableMean}.
+ *
+ * @return an empty {@code MutableMean}.
+ */
+ static MutableMean create() {
+ return new MutableMean();
+ }
+
+ @Override
+ void add(double value) {
+ count++;
+ sum += value;
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableMean, "MutableMean expected.");
+ MutableMean mutableMean = (MutableMean) other;
+ this.count += Math.round(mutableMean.count * fraction);
+ this.sum += mutableMean.sum * fraction;
+ }
+
+ /**
+ * Returns the aggregated mean.
+ *
+ * @return the aggregated mean.
+ */
+ double getMean() {
+ return getCount() == 0 ? 0 : getSum() / getCount();
+ }
+
+ /**
+ * Returns the aggregated count.
+ *
+ * @return the aggregated count.
+ */
+ long getCount() {
+ return count;
+ }
+
+ /**
+ * Returns the aggregated sum.
+ *
+ * @return the aggregated sum.
+ */
+ double getSum() {
+ return sum;
+ }
+
+ @Override
+ final <T> T match(
+ Function<? super MutableSum, T> p0,
+ Function<? super MutableCount, T> p1,
+ Function<? super MutableMean, T> p2,
+ Function<? super MutableDistribution, T> p3) {
+ return p2.apply(this);
+ }
+ }
+
+ /** Calculate distribution stats on aggregated {@code MeasureValue}s. */
+ static final class MutableDistribution extends MutableAggregation {
+
+ private double sum = 0.0;
+ private double mean = 0.0;
+ private long count = 0;
+ private double sumOfSquaredDeviations = 0.0;
+
+ // Initial "impossible" values, that will get reset as soon as first value is added.
+ private double min = Double.POSITIVE_INFINITY;
+ private double max = Double.NEGATIVE_INFINITY;
+
+ private final BucketBoundaries bucketBoundaries;
+ private final long[] bucketCounts;
+
+ private MutableDistribution(BucketBoundaries bucketBoundaries) {
+ this.bucketBoundaries = bucketBoundaries;
+ this.bucketCounts = new long[bucketBoundaries.getBoundaries().size() + 1];
+ }
+
+ /**
+ * Construct a {@code MutableDistribution}.
+ *
+ * @return an empty {@code MutableDistribution}.
+ */
+ static MutableDistribution create(BucketBoundaries bucketBoundaries) {
+ checkNotNull(bucketBoundaries, "bucketBoundaries should not be null.");
+ return new MutableDistribution(bucketBoundaries);
+ }
+
+ @Override
+ void add(double value) {
+ sum += value;
+ count++;
+
+ /*
+ * Update the sum of squared deviations from the mean with the given value. For values
+ * x_i this is Sum[i=1..n]((x_i - mean)^2)
+ *
+ * Computed using Welfords method (see
+ * https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance, or Knuth, "The Art of
+ * Computer Programming", Vol. 2, page 323, 3rd edition)
+ */
+ double deltaFromMean = value - mean;
+ mean += deltaFromMean / count;
+ double deltaFromMean2 = value - mean;
+ sumOfSquaredDeviations += deltaFromMean * deltaFromMean2;
+
+ if (value < min) {
+ min = value;
+ }
+ if (value > max) {
+ max = value;
+ }
+
+ for (int i = 0; i < bucketBoundaries.getBoundaries().size(); i++) {
+ if (value < bucketBoundaries.getBoundaries().get(i)) {
+ bucketCounts[i]++;
+ return;
+ }
+ }
+ bucketCounts[bucketCounts.length - 1]++;
+ }
+
+ // We don't compute fractional MutableDistribution, it's either whole or none.
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableDistribution, "MutableDistribution expected.");
+ if (Math.abs(1.0 - fraction) > TOLERANCE) {
+ return;
+ }
+
+ MutableDistribution mutableDistribution = (MutableDistribution) other;
+ checkArgument(
+ this.bucketBoundaries.equals(mutableDistribution.bucketBoundaries),
+ "Bucket boundaries should match.");
+
+ // Algorithm for calculating the combination of sum of squared deviations:
+ // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm.
+ if (this.count + mutableDistribution.count > 0) {
+ double delta = mutableDistribution.mean - this.mean;
+ this.sumOfSquaredDeviations =
+ this.sumOfSquaredDeviations
+ + mutableDistribution.sumOfSquaredDeviations
+ + Math.pow(delta, 2)
+ * this.count
+ * mutableDistribution.count
+ / (this.count + mutableDistribution.count);
+ }
+
+ this.count += mutableDistribution.count;
+ this.sum += mutableDistribution.sum;
+ this.mean = this.sum / this.count;
+
+ if (mutableDistribution.min < this.min) {
+ this.min = mutableDistribution.min;
+ }
+ if (mutableDistribution.max > this.max) {
+ this.max = mutableDistribution.max;
+ }
+
+ long[] bucketCounts = mutableDistribution.getBucketCounts();
+ for (int i = 0; i < bucketCounts.length; i++) {
+ this.bucketCounts[i] += bucketCounts[i];
+ }
+ }
+
+ double getMean() {
+ return mean;
+ }
+
+ long getCount() {
+ return count;
+ }
+
+ double getMin() {
+ return min;
+ }
+
+ double getMax() {
+ return max;
+ }
+
+ // Returns the aggregated sum of squared deviations.
+ double getSumOfSquaredDeviations() {
+ return sumOfSquaredDeviations;
+ }
+
+ long[] getBucketCounts() {
+ return bucketCounts;
+ }
+
+ @Override
+ final <T> T match(
+ Function<? super MutableSum, T> p0,
+ Function<? super MutableCount, T> p1,
+ Function<? super MutableMean, T> p2,
+ Function<? super MutableDistribution, T> p3) {
+ return p3.apply(this);
+ }
+ }
+}