aboutsummaryrefslogtreecommitdiff
path: root/exporters/stats/signalfx
diff options
context:
space:
mode:
Diffstat (limited to 'exporters/stats/signalfx')
-rw-r--r--exporters/stats/signalfx/README.md76
-rw-r--r--exporters/stats/signalfx/build.gradle23
-rw-r--r--exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxMetricsSenderFactory.java59
-rw-r--r--exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptor.java188
-rw-r--r--exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfiguration.java153
-rw-r--r--exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporter.java109
-rw-r--r--exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThread.java105
-rw-r--r--exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptorTest.java320
-rw-r--r--exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfigurationTest.java90
-rw-r--r--exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterTest.java93
-rw-r--r--exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThreadTest.java149
11 files changed, 1365 insertions, 0 deletions
diff --git a/exporters/stats/signalfx/README.md b/exporters/stats/signalfx/README.md
new file mode 100644
index 00000000..7c61f896
--- /dev/null
+++ b/exporters/stats/signalfx/README.md
@@ -0,0 +1,76 @@
+# OpenCensus SignalFx Stats Exporter
+
+The _OpenCensus SignalFx Stats Exporter_ is a stats exporter that
+exports data to [SignalFx](https://signalfx.com), a real-time monitoring
+solution for cloud and distributed applications. SignalFx ingests that
+data and offers various visualizations on charts, dashboards and service
+maps, as well as real-time anomaly detection.
+
+## Quickstart
+
+### Prerequisites
+
+To use this exporter, you must have a [SignalFx](https://signalfx.com)
+account and corresponding [data ingest
+token](https://docs.signalfx.com/en/latest/admin-guide/tokens.html).
+
+#### Java versions
+
+This exporter requires Java 7 or above.
+
+### Add the dependencies to your project
+
+For Maven add to your `pom.xml`:
+
+```xml
+<dependencies>
+ <dependency>
+ <groupId>io.opencensus</groupId>
+ <artifactId>opencensus-api</artifactId>
+ <version>0.16.1</version>
+ </dependency>
+ <dependency>
+ <groupId>io.opencensus</groupId>
+ <artifactId>opencensus-exporter-stats-signalfx</artifactId>
+ <version>0.16.1</version>
+ </dependency>
+ <dependency>
+ <groupId>io.opencensus</groupId>
+ <artifactId>opencensus-impl</artifactId>
+ <version>0.16.1</version>
+ <scope>runtime</scope>
+ </dependency>
+</dependencies>
+```
+
+For Gradle add to your dependencies:
+
+```
+compile 'io.opencensus:opencensus-api:0.16.1'
+compile 'io.opencensus:opencensus-exporter-stats-signalfx:0.16.1'
+runtime 'io.opencensus:opencensus-impl:0.16.1'
+```
+
+### Register the exporter
+
+```java
+public class MyMainClass {
+ public static void main(String[] args) {
+ // SignalFx token is read from Java system properties.
+ // Stats will be reported every second by default.
+ SignalFxStatsExporter.create(SignalFxStatsConfiguration.builder().build());
+ }
+}
+```
+
+If you want to pass in the token yourself, or set a different reporting
+interval, use:
+
+```java
+// Use token "your_signalfx_token" and report every 5 seconds.
+SignalFxStatsExporter.create(
+ SignalFxStatsConfiguration.builder()
+ .setToken("your_signalfx_token")
+ .setExportInterval(Duration.create(5, 0))
+ .build());
+```
diff --git a/exporters/stats/signalfx/build.gradle b/exporters/stats/signalfx/build.gradle
new file mode 100644
index 00000000..d496b1e5
--- /dev/null
+++ b/exporters/stats/signalfx/build.gradle
@@ -0,0 +1,23 @@
+description = 'OpenCensus SignalFx Stats Exporter'
+
+[compileJava, compileTestJava].each() {
+ it.sourceCompatibility = 1.7
+ it.targetCompatibility = 1.7
+}
+
+dependencies {
+ compileOnly libraries.auto_value
+
+ compile project(':opencensus-api'),
+ libraries.guava
+
+ compile (libraries.signalfx_java) {
+ // Prefer library version.
+ exclude group: 'com.google.guava', module: 'guava'
+ }
+
+ testCompile project(':opencensus-api')
+
+ signature "org.codehaus.mojo.signature:java17:1.0@signature"
+ signature "net.sf.androidscents.signature:android-api-level-14:4.0_r4@signature"
+}
diff --git a/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxMetricsSenderFactory.java b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxMetricsSenderFactory.java
new file mode 100644
index 00000000..5601a54c
--- /dev/null
+++ b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxMetricsSenderFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.exporter.stats.signalfx;
+
+import com.signalfx.endpoint.SignalFxEndpoint;
+import com.signalfx.metrics.auth.StaticAuthToken;
+import com.signalfx.metrics.connection.HttpDataPointProtobufReceiverFactory;
+import com.signalfx.metrics.connection.HttpEventProtobufReceiverFactory;
+import com.signalfx.metrics.errorhandler.OnSendErrorHandler;
+import com.signalfx.metrics.flush.AggregateMetricSender;
+import java.net.URI;
+import java.util.Collections;
+
+/** Interface for creators of {@link AggregateMetricSender}. */
+interface SignalFxMetricsSenderFactory {
+
+ /**
+ * Creates a new SignalFx metrics sender instance.
+ *
+ * @param endpoint The SignalFx ingest endpoint URL.
+ * @param token The SignalFx ingest token.
+ * @param errorHandler An {@link OnSendErrorHandler} through which errors when sending data to
+ * SignalFx will be communicated.
+ * @return The created {@link AggregateMetricSender} instance.
+ */
+ AggregateMetricSender create(URI endpoint, String token, OnSendErrorHandler errorHandler);
+
+ /** The default, concrete implementation of this interface. */
+ SignalFxMetricsSenderFactory DEFAULT =
+ new SignalFxMetricsSenderFactory() {
+ @Override
+ @SuppressWarnings("nullness")
+ public AggregateMetricSender create(
+ URI endpoint, String token, OnSendErrorHandler errorHandler) {
+ SignalFxEndpoint sfx =
+ new SignalFxEndpoint(endpoint.getScheme(), endpoint.getHost(), endpoint.getPort());
+ return new AggregateMetricSender(
+ null,
+ new HttpDataPointProtobufReceiverFactory(sfx).setVersion(2),
+ new HttpEventProtobufReceiverFactory(sfx),
+ new StaticAuthToken(token),
+ Collections.singleton(errorHandler));
+ }
+ };
+}
diff --git a/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptor.java b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptor.java
new file mode 100644
index 00000000..2eb75c4c
--- /dev/null
+++ b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptor.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.exporter.stats.signalfx;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.DataPoint;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.Datum;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.Dimension;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.MetricType;
+import io.opencensus.common.Function;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.AggregationData;
+import io.opencensus.stats.AggregationData.CountData;
+import io.opencensus.stats.AggregationData.DistributionData;
+import io.opencensus.stats.AggregationData.LastValueDataDouble;
+import io.opencensus.stats.AggregationData.LastValueDataLong;
+import io.opencensus.stats.AggregationData.SumDataDouble;
+import io.opencensus.stats.AggregationData.SumDataLong;
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+/** Adapter for a {@code ViewData}'s contents into SignalFx datapoints. */
+@SuppressWarnings("deprecation")
+final class SignalFxSessionAdaptor {
+
+ private SignalFxSessionAdaptor() {}
+
+ /**
+ * Converts the given view data into datapoints that can be sent to SignalFx.
+ *
+ * <p>The view name is used as the metric name, and the aggregation type and aggregation window
+ * type determine the metric type.
+ *
+ * @param data The {@link ViewData} containing the aggregation data of each combination of tag
+ * values.
+ * @return A list of datapoints for the corresponding metric timeseries of this view's metric.
+ */
+ static List<DataPoint> adapt(ViewData data) {
+ View view = data.getView();
+ List<TagKey> keys = view.getColumns();
+
+ MetricType metricType = getMetricTypeForAggregation(view.getAggregation(), view.getWindow());
+ if (metricType == null) {
+ return Collections.emptyList();
+ }
+
+ List<DataPoint> datapoints = new ArrayList<>(data.getAggregationMap().size());
+ for (Map.Entry<List</*@Nullable*/ TagValue>, AggregationData> entry :
+ data.getAggregationMap().entrySet()) {
+ datapoints.add(
+ DataPoint.newBuilder()
+ .setMetric(view.getName().asString())
+ .setMetricType(metricType)
+ .addAllDimensions(createDimensions(keys, entry.getKey()))
+ .setValue(createDatum(entry.getValue()))
+ .build());
+ }
+ return datapoints;
+ }
+
+ @VisibleForTesting
+ @javax.annotation.Nullable
+ static MetricType getMetricTypeForAggregation(
+ Aggregation aggregation, View.AggregationWindow window) {
+ if (aggregation instanceof Aggregation.Mean || aggregation instanceof Aggregation.LastValue) {
+ return MetricType.GAUGE;
+ } else if (aggregation instanceof Aggregation.Count || aggregation instanceof Aggregation.Sum) {
+ if (window instanceof View.AggregationWindow.Cumulative) {
+ return MetricType.CUMULATIVE_COUNTER;
+ }
+ // TODO(mpetazzoni): support incremental counters when AggregationWindow.Interval is ready
+ }
+
+ // TODO(mpetazzoni): add support for histograms (Aggregation.Distribution).
+ return null;
+ }
+
+ @VisibleForTesting
+ static Iterable<Dimension> createDimensions(
+ List<TagKey> keys, List</*@Nullable*/ TagValue> values) {
+ Preconditions.checkArgument(
+ keys.size() == values.size(), "TagKeys and TagValues don't have the same size.");
+ List<Dimension> dimensions = new ArrayList<>(keys.size());
+ for (ListIterator<TagKey> it = keys.listIterator(); it.hasNext(); ) {
+ TagKey key = it.next();
+ TagValue value = values.get(it.previousIndex());
+ if (value == null || Strings.isNullOrEmpty(value.asString())) {
+ continue;
+ }
+ dimensions.add(createDimension(key, value));
+ }
+ return dimensions;
+ }
+
+ @VisibleForTesting
+ static Dimension createDimension(TagKey key, TagValue value) {
+ return Dimension.newBuilder().setKey(key.getName()).setValue(value.asString()).build();
+ }
+
+ @VisibleForTesting
+ static Datum createDatum(AggregationData data) {
+ final Datum.Builder builder = Datum.newBuilder();
+ data.match(
+ new Function<SumDataDouble, Void>() {
+ @Override
+ public Void apply(SumDataDouble arg) {
+ builder.setDoubleValue(arg.getSum());
+ return null;
+ }
+ },
+ new Function<SumDataLong, Void>() {
+ @Override
+ public Void apply(SumDataLong arg) {
+ builder.setIntValue(arg.getSum());
+ return null;
+ }
+ },
+ new Function<CountData, Void>() {
+ @Override
+ public Void apply(CountData arg) {
+ builder.setIntValue(arg.getCount());
+ return null;
+ }
+ },
+ new Function<DistributionData, Void>() {
+ @Override
+ public Void apply(DistributionData arg) {
+ // TODO(mpetazzoni): add histogram support.
+ throw new IllegalArgumentException("Distribution aggregations are not supported");
+ }
+ },
+ new Function<LastValueDataDouble, Void>() {
+ @Override
+ public Void apply(LastValueDataDouble arg) {
+ builder.setDoubleValue(arg.getLastValue());
+ return null;
+ }
+ },
+ new Function<LastValueDataLong, Void>() {
+ @Override
+ public Void apply(LastValueDataLong arg) {
+ builder.setIntValue(arg.getLastValue());
+ return null;
+ }
+ },
+ new Function<AggregationData, Void>() {
+ @Override
+ public Void apply(AggregationData arg) {
+ // TODO(songya): remove this once Mean aggregation is completely removed. Before that
+ // we need to continue supporting Mean, since it could still be used by users and some
+ // deprecated RPC views.
+ if (arg instanceof AggregationData.MeanData) {
+ builder.setDoubleValue(((AggregationData.MeanData) arg).getMean());
+ return null;
+ }
+ throw new IllegalArgumentException("Unknown Aggregation.");
+ }
+ });
+ return builder.build();
+ }
+}
diff --git a/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfiguration.java b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfiguration.java
new file mode 100644
index 00000000..e8b4d756
--- /dev/null
+++ b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfiguration.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.exporter.stats.signalfx;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import io.opencensus.common.Duration;
+import java.net.URI;
+import java.net.URISyntaxException;
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Configurations for {@link SignalFxStatsExporter}.
+ *
+ * @since 0.11
+ */
+@AutoValue
+@Immutable
+public abstract class SignalFxStatsConfiguration {
+
+ /**
+ * The default SignalFx ingest API URL.
+ *
+ * @since 0.11
+ */
+ public static final URI DEFAULT_SIGNALFX_ENDPOINT;
+
+ static {
+ try {
+ DEFAULT_SIGNALFX_ENDPOINT = new URI("https://ingest.signalfx.com");
+ } catch (URISyntaxException e) {
+ // This shouldn't happen if DEFAULT_SIGNALFX_ENDPOINT was typed in correctly.
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * The default stats export interval.
+ *
+ * @since 0.11
+ */
+ public static final Duration DEFAULT_EXPORT_INTERVAL = Duration.create(1, 0);
+
+ private static final Duration ZERO = Duration.create(0, 0);
+
+ SignalFxStatsConfiguration() {}
+
+ /**
+ * Returns the SignalFx ingest API URL.
+ *
+ * @return the SignalFx ingest API URL.
+ * @since 0.11
+ */
+ public abstract URI getIngestEndpoint();
+
+ /**
+ * Returns the authentication token.
+ *
+ * @return the authentication token.
+ * @since 0.11
+ */
+ public abstract String getToken();
+
+ /**
+ * Returns the export interval between pushes to SignalFx.
+ *
+ * @return the export interval.
+ * @since 0.11
+ */
+ public abstract Duration getExportInterval();
+
+ /**
+ * Returns a new {@link Builder}.
+ *
+ * @return a {@code Builder}.
+ * @since 0.11
+ */
+ public static Builder builder() {
+ return new AutoValue_SignalFxStatsConfiguration.Builder()
+ .setIngestEndpoint(DEFAULT_SIGNALFX_ENDPOINT)
+ .setExportInterval(DEFAULT_EXPORT_INTERVAL);
+ }
+
+ /**
+ * Builder for {@link SignalFxStatsConfiguration}.
+ *
+ * @since 0.11
+ */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ Builder() {}
+
+ /**
+ * Sets the given SignalFx ingest API URL.
+ *
+ * @param url the SignalFx ingest API URL.
+ * @return this.
+ * @since 0.11
+ */
+ public abstract Builder setIngestEndpoint(URI url);
+
+ /**
+ * Sets the given authentication token.
+ *
+ * @param token the authentication token.
+ * @return this.
+ * @since 0.11
+ */
+ public abstract Builder setToken(String token);
+
+ /**
+ * Sets the export interval.
+ *
+ * @param exportInterval the export interval between pushes to SignalFx.
+ * @return this.
+ * @since 0.11
+ */
+ public abstract Builder setExportInterval(Duration exportInterval);
+
+ abstract SignalFxStatsConfiguration autoBuild();
+
+ /**
+ * Builds a new {@link SignalFxStatsConfiguration} with current settings.
+ *
+ * @return a {@code SignalFxStatsConfiguration}.
+ * @since 0.11
+ */
+ public SignalFxStatsConfiguration build() {
+ SignalFxStatsConfiguration config = autoBuild();
+ Preconditions.checkArgument(
+ !Strings.isNullOrEmpty(config.getToken()), "Invalid SignalFx token");
+ Preconditions.checkArgument(
+ config.getExportInterval().compareTo(ZERO) > 0, "Interval duration must be positive");
+ return config;
+ }
+ }
+}
diff --git a/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporter.java b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporter.java
new file mode 100644
index 00000000..f7915b71
--- /dev/null
+++ b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporter.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.exporter.stats.signalfx;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.opencensus.stats.Stats;
+import io.opencensus.stats.ViewManager;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * Exporter to SignalFx.
+ *
+ * <p>Example of usage:
+ *
+ * <pre><code>
+ * public static void main(String[] args) {
+ * SignalFxStatsExporter.create(SignalFxStatsConfiguration.builder().build());
+ * ... // Do work.
+ * }
+ * </code></pre>
+ *
+ * @since 0.11
+ */
+public final class SignalFxStatsExporter {
+
+ private static final Object monitor = new Object();
+
+ private final SignalFxStatsConfiguration configuration;
+ private final SignalFxStatsExporterWorkerThread workerThread;
+
+ @GuardedBy("monitor")
+ @Nullable
+ private static SignalFxStatsExporter exporter = null;
+
+ private SignalFxStatsExporter(SignalFxStatsConfiguration configuration, ViewManager viewManager) {
+ Preconditions.checkNotNull(configuration, "SignalFx stats exporter configuration");
+ this.configuration = configuration;
+ this.workerThread =
+ new SignalFxStatsExporterWorkerThread(
+ SignalFxMetricsSenderFactory.DEFAULT,
+ configuration.getIngestEndpoint(),
+ configuration.getToken(),
+ configuration.getExportInterval(),
+ viewManager);
+ }
+
+ /**
+ * Creates a SignalFx Stats exporter from the given {@link SignalFxStatsConfiguration}.
+ *
+ * <p>If {@code ingestEndpoint} is not set on the configuration, the exporter will use {@link
+ * SignalFxStatsConfiguration#DEFAULT_SIGNALFX_ENDPOINT}.
+ *
+ * <p>If {@code exportInterval} is not set on the configuration, the exporter will use {@link
+ * SignalFxStatsConfiguration#DEFAULT_EXPORT_INTERVAL}.
+ *
+ * @param configuration the {@code SignalFxStatsConfiguration}.
+ * @throws IllegalStateException if a SignalFx exporter is already created.
+ * @since 0.11
+ */
+ public static void create(SignalFxStatsConfiguration configuration) {
+ synchronized (monitor) {
+ Preconditions.checkState(exporter == null, "SignalFx stats exporter is already created.");
+ exporter = new SignalFxStatsExporter(configuration, Stats.getViewManager());
+ exporter.workerThread.start();
+ }
+ }
+
+ @VisibleForTesting
+ static void unsafeResetExporter() {
+ synchronized (monitor) {
+ if (exporter != null) {
+ SignalFxStatsExporterWorkerThread workerThread = exporter.workerThread;
+ if (workerThread != null && workerThread.isAlive()) {
+ try {
+ workerThread.interrupt();
+ workerThread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ exporter = null;
+ }
+ }
+ }
+
+ @VisibleForTesting
+ @Nullable
+ static SignalFxStatsConfiguration unsafeGetConfig() {
+ synchronized (monitor) {
+ return exporter != null ? exporter.configuration : null;
+ }
+ }
+}
diff --git a/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThread.java b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThread.java
new file mode 100644
index 00000000..348778e2
--- /dev/null
+++ b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThread.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.exporter.stats.signalfx;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.signalfx.metrics.errorhandler.MetricError;
+import com.signalfx.metrics.errorhandler.OnSendErrorHandler;
+import com.signalfx.metrics.flush.AggregateMetricSender;
+import com.signalfx.metrics.flush.AggregateMetricSender.Session;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.DataPoint;
+import io.opencensus.common.Duration;
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
+import io.opencensus.stats.ViewManager;
+import java.io.IOException;
+import java.net.URI;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Worker {@code Thread} that polls ViewData from the Stats's ViewManager and exports to SignalFx.
+ *
+ * <p>{@code SignalFxStatsExporterWorkerThread} is a daemon {@code Thread}
+ */
+final class SignalFxStatsExporterWorkerThread extends Thread {
+
+ private static final Logger logger =
+ Logger.getLogger(SignalFxStatsExporterWorkerThread.class.getName());
+
+ private static final OnSendErrorHandler ERROR_HANDLER =
+ new OnSendErrorHandler() {
+ @Override
+ public void handleError(MetricError error) {
+ logger.log(Level.WARNING, "Unable to send metrics to SignalFx: {0}", error.getMessage());
+ }
+ };
+
+ private final long intervalMs;
+ private final ViewManager views;
+ private final AggregateMetricSender sender;
+
+ SignalFxStatsExporterWorkerThread(
+ SignalFxMetricsSenderFactory factory,
+ URI endpoint,
+ String token,
+ Duration interval,
+ ViewManager views) {
+ this.intervalMs = interval.toMillis();
+ this.views = views;
+ this.sender = factory.create(endpoint, token, ERROR_HANDLER);
+
+ setDaemon(true);
+ setName(getClass().getSimpleName());
+ logger.log(Level.FINE, "Initialized SignalFx exporter to {0}.", endpoint);
+ }
+
+ @VisibleForTesting
+ void export() throws IOException {
+ Session session = sender.createSession();
+ try {
+ for (View view : views.getAllExportedViews()) {
+ ViewData data = views.getView(view.getName());
+ if (data == null) {
+ continue;
+ }
+
+ for (DataPoint datapoint : SignalFxSessionAdaptor.adapt(data)) {
+ session.setDatapoint(datapoint);
+ }
+ }
+ } finally {
+ session.close();
+ }
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ export();
+ Thread.sleep(intervalMs);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ } catch (Throwable e) {
+ logger.log(Level.WARNING, "Exception thrown by the SignalFx stats exporter", e);
+ }
+ }
+ logger.log(Level.INFO, "SignalFx stats exporter stopped.");
+ }
+}
diff --git a/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptorTest.java b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptorTest.java
new file mode 100644
index 00000000..34f4dfa7
--- /dev/null
+++ b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptorTest.java
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.exporter.stats.signalfx;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.DataPoint;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.Datum;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.Dimension;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.MetricType;
+import io.opencensus.common.Duration;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.AggregationData;
+import io.opencensus.stats.AggregationData.CountData;
+import io.opencensus.stats.AggregationData.DistributionData;
+import io.opencensus.stats.AggregationData.LastValueDataDouble;
+import io.opencensus.stats.AggregationData.LastValueDataLong;
+import io.opencensus.stats.AggregationData.MeanData;
+import io.opencensus.stats.AggregationData.SumDataDouble;
+import io.opencensus.stats.AggregationData.SumDataLong;
+import io.opencensus.stats.BucketBoundaries;
+import io.opencensus.stats.View;
+import io.opencensus.stats.View.AggregationWindow;
+import io.opencensus.stats.View.Name;
+import io.opencensus.stats.ViewData;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import java.util.List;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SignalFxSessionAdaptorTest {
+
+ private static final Duration ONE_SECOND = Duration.create(1, 0);
+
+ @Rule public final ExpectedException thrown = ExpectedException.none();
+
+ @Mock private View view;
+
+ @Mock private ViewData viewData;
+
+ @Before
+ public void setUp() {
+ Mockito.when(view.getName()).thenReturn(Name.create("view-name"));
+ Mockito.when(view.getColumns()).thenReturn(ImmutableList.of(TagKey.create("animal")));
+ Mockito.when(viewData.getView()).thenReturn(view);
+ }
+
+ @Test
+ public void checkMetricTypeFromAggregation() {
+ assertNull(SignalFxSessionAdaptor.getMetricTypeForAggregation(null, null));
+ assertNull(
+ SignalFxSessionAdaptor.getMetricTypeForAggregation(
+ null, AggregationWindow.Cumulative.create()));
+ assertEquals(
+ MetricType.GAUGE,
+ SignalFxSessionAdaptor.getMetricTypeForAggregation(
+ Aggregation.Mean.create(), AggregationWindow.Cumulative.create()));
+ assertEquals(
+ MetricType.GAUGE,
+ SignalFxSessionAdaptor.getMetricTypeForAggregation(
+ Aggregation.Mean.create(), AggregationWindow.Interval.create(ONE_SECOND)));
+ assertEquals(
+ MetricType.CUMULATIVE_COUNTER,
+ SignalFxSessionAdaptor.getMetricTypeForAggregation(
+ Aggregation.Count.create(), AggregationWindow.Cumulative.create()));
+ assertEquals(
+ MetricType.CUMULATIVE_COUNTER,
+ SignalFxSessionAdaptor.getMetricTypeForAggregation(
+ Aggregation.Sum.create(), AggregationWindow.Cumulative.create()));
+ assertNull(
+ SignalFxSessionAdaptor.getMetricTypeForAggregation(Aggregation.Count.create(), null));
+ assertNull(SignalFxSessionAdaptor.getMetricTypeForAggregation(Aggregation.Sum.create(), null));
+ assertNull(
+ SignalFxSessionAdaptor.getMetricTypeForAggregation(
+ Aggregation.Count.create(), AggregationWindow.Interval.create(ONE_SECOND)));
+ assertNull(
+ SignalFxSessionAdaptor.getMetricTypeForAggregation(
+ Aggregation.Sum.create(), AggregationWindow.Interval.create(ONE_SECOND)));
+ assertNull(
+ SignalFxSessionAdaptor.getMetricTypeForAggregation(
+ Aggregation.Distribution.create(BucketBoundaries.create(ImmutableList.of(3.15d))),
+ AggregationWindow.Cumulative.create()));
+ assertEquals(
+ MetricType.GAUGE,
+ SignalFxSessionAdaptor.getMetricTypeForAggregation(
+ Aggregation.LastValue.create(), AggregationWindow.Cumulative.create()));
+ assertEquals(
+ MetricType.GAUGE,
+ SignalFxSessionAdaptor.getMetricTypeForAggregation(
+ Aggregation.LastValue.create(), AggregationWindow.Interval.create(ONE_SECOND)));
+ }
+
+ @Test
+ public void createDimensionsWithNonMatchingListSizes() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("don't have the same size");
+ SignalFxSessionAdaptor.createDimensions(
+ ImmutableList.of(TagKey.create("animal"), TagKey.create("color")),
+ ImmutableList.of(TagValue.create("dog")));
+ }
+
+ @Test
+ public void createDimensionsIgnoresEmptyValues() {
+ List<Dimension> dimensions =
+ Lists.newArrayList(
+ SignalFxSessionAdaptor.createDimensions(
+ ImmutableList.of(TagKey.create("animal"), TagKey.create("color")),
+ ImmutableList.of(TagValue.create("dog"), TagValue.create(""))));
+ assertEquals(1, dimensions.size());
+ assertEquals("animal", dimensions.get(0).getKey());
+ assertEquals("dog", dimensions.get(0).getValue());
+ }
+
+ @Test
+ public void createDimension() {
+ Dimension dimension =
+ SignalFxSessionAdaptor.createDimension(TagKey.create("animal"), TagValue.create("dog"));
+ assertEquals("animal", dimension.getKey());
+ assertEquals("dog", dimension.getValue());
+ }
+
+ @Test
+ public void unsupportedAggregationYieldsNoDatapoints() {
+ Mockito.when(view.getAggregation())
+ .thenReturn(
+ Aggregation.Distribution.create(BucketBoundaries.create(ImmutableList.of(3.15d))));
+ Mockito.when(view.getWindow()).thenReturn(AggregationWindow.Cumulative.create());
+ List<DataPoint> datapoints = SignalFxSessionAdaptor.adapt(viewData);
+ assertEquals(0, datapoints.size());
+ }
+
+ @Test
+ public void noAggregationDataYieldsNoDatapoints() {
+ Mockito.when(view.getAggregation()).thenReturn(Aggregation.Count.create());
+ Mockito.when(view.getWindow()).thenReturn(AggregationWindow.Cumulative.create());
+ List<DataPoint> datapoints = SignalFxSessionAdaptor.adapt(viewData);
+ assertEquals(0, datapoints.size());
+ }
+
+ @Test
+ public void createDatumFromDoubleSum() {
+ SumDataDouble data = SumDataDouble.create(3.15d);
+ Datum datum = SignalFxSessionAdaptor.createDatum(data);
+ assertTrue(datum.hasDoubleValue());
+ assertFalse(datum.hasIntValue());
+ assertFalse(datum.hasStrValue());
+ assertEquals(3.15d, datum.getDoubleValue(), 0d);
+ }
+
+ @Test
+ public void createDatumFromLongSum() {
+ SumDataLong data = SumDataLong.create(42L);
+ Datum datum = SignalFxSessionAdaptor.createDatum(data);
+ assertFalse(datum.hasDoubleValue());
+ assertTrue(datum.hasIntValue());
+ assertFalse(datum.hasStrValue());
+ assertEquals(42L, datum.getIntValue());
+ }
+
+ @Test
+ public void createDatumFromCount() {
+ CountData data = CountData.create(42L);
+ Datum datum = SignalFxSessionAdaptor.createDatum(data);
+ assertFalse(datum.hasDoubleValue());
+ assertTrue(datum.hasIntValue());
+ assertFalse(datum.hasStrValue());
+ assertEquals(42L, datum.getIntValue());
+ }
+
+ @Test
+ public void createDatumFromMean() {
+ MeanData data = MeanData.create(3.15d, 2L);
+ Datum datum = SignalFxSessionAdaptor.createDatum(data);
+ assertTrue(datum.hasDoubleValue());
+ assertFalse(datum.hasIntValue());
+ assertFalse(datum.hasStrValue());
+ assertEquals(3.15d, datum.getDoubleValue(), 0d);
+ }
+
+ @Test
+ public void createDatumFromDistributionThrows() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Distribution aggregations are not supported");
+ SignalFxSessionAdaptor.createDatum(
+ DistributionData.create(5, 2, 0, 10, 40, ImmutableList.of(1L)));
+ }
+
+ @Test
+ public void createDatumFromLastValueDouble() {
+ LastValueDataDouble data = LastValueDataDouble.create(12.2);
+ Datum datum = SignalFxSessionAdaptor.createDatum(data);
+ assertTrue(datum.hasDoubleValue());
+ assertFalse(datum.hasIntValue());
+ assertFalse(datum.hasStrValue());
+ assertEquals(12.2, datum.getDoubleValue(), 0d);
+ }
+
+ @Test
+ public void createDatumFromLastValueLong() {
+ LastValueDataLong data = LastValueDataLong.create(100000);
+ Datum datum = SignalFxSessionAdaptor.createDatum(data);
+ assertFalse(datum.hasDoubleValue());
+ assertTrue(datum.hasIntValue());
+ assertFalse(datum.hasStrValue());
+ assertEquals(100000, datum.getIntValue());
+ }
+
+ @Test
+ public void adaptViewIntoDatapoints() {
+ Map<List<TagValue>, AggregationData> map =
+ ImmutableMap.<List<TagValue>, AggregationData>of(
+ ImmutableList.of(TagValue.create("dog")),
+ SumDataLong.create(2L),
+ ImmutableList.of(TagValue.create("cat")),
+ SumDataLong.create(3L));
+ Mockito.when(viewData.getAggregationMap()).thenReturn(map);
+ Mockito.when(view.getAggregation()).thenReturn(Aggregation.Count.create());
+ Mockito.when(view.getWindow()).thenReturn(AggregationWindow.Cumulative.create());
+
+ List<DataPoint> datapoints = SignalFxSessionAdaptor.adapt(viewData);
+ assertEquals(2, datapoints.size());
+ for (DataPoint dp : datapoints) {
+ assertEquals("view-name", dp.getMetric());
+ assertEquals(MetricType.CUMULATIVE_COUNTER, dp.getMetricType());
+ assertEquals(1, dp.getDimensionsCount());
+ assertTrue(dp.hasValue());
+ assertFalse(dp.hasSource());
+
+ Datum datum = dp.getValue();
+ assertTrue(datum.hasIntValue());
+ assertFalse(datum.hasDoubleValue());
+ assertFalse(datum.hasStrValue());
+
+ Dimension dimension = dp.getDimensions(0);
+ assertEquals("animal", dimension.getKey());
+ switch (dimension.getValue()) {
+ case "dog":
+ assertEquals(2L, datum.getIntValue());
+ break;
+ case "cat":
+ assertEquals(3L, datum.getIntValue());
+ break;
+ default:
+ fail("unexpected dimension value");
+ }
+ }
+ }
+
+ @Test
+ public void adaptViewWithEmptyTagValueIntoDatapoints() {
+ Map<List<TagValue>, AggregationData> map =
+ ImmutableMap.<List<TagValue>, AggregationData>of(
+ ImmutableList.of(TagValue.create("dog")),
+ SumDataLong.create(2L),
+ ImmutableList.of(TagValue.create("")),
+ SumDataLong.create(3L));
+ Mockito.when(viewData.getAggregationMap()).thenReturn(map);
+ Mockito.when(view.getAggregation()).thenReturn(Aggregation.Count.create());
+ Mockito.when(view.getWindow()).thenReturn(AggregationWindow.Cumulative.create());
+
+ List<DataPoint> datapoints = SignalFxSessionAdaptor.adapt(viewData);
+ assertEquals(2, datapoints.size());
+ for (DataPoint dp : datapoints) {
+ assertEquals("view-name", dp.getMetric());
+ assertEquals(MetricType.CUMULATIVE_COUNTER, dp.getMetricType());
+ assertTrue(dp.hasValue());
+ assertFalse(dp.hasSource());
+
+ Datum datum = dp.getValue();
+ assertTrue(datum.hasIntValue());
+ assertFalse(datum.hasDoubleValue());
+ assertFalse(datum.hasStrValue());
+
+ switch (dp.getDimensionsCount()) {
+ case 0:
+ assertEquals(3L, datum.getIntValue());
+ break;
+ case 1:
+ Dimension dimension = dp.getDimensions(0);
+ assertEquals("animal", dimension.getKey());
+ assertEquals("dog", dimension.getValue());
+ assertEquals(2L, datum.getIntValue());
+ break;
+ default:
+ fail("Unexpected number of dimensions on the created datapoint");
+ break;
+ }
+ }
+ }
+}
diff --git a/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfigurationTest.java b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfigurationTest.java
new file mode 100644
index 00000000..1d3508fb
--- /dev/null
+++ b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfigurationTest.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.exporter.stats.signalfx;
+
+import static org.junit.Assert.assertEquals;
+
+import io.opencensus.common.Duration;
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link SignalFxStatsConfiguration}. */
+@RunWith(JUnit4.class)
+public class SignalFxStatsConfigurationTest {
+
+ private static final String TEST_TOKEN = "token";
+
+ @Rule public final ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void buildWithDefaults() {
+ SignalFxStatsConfiguration configuration =
+ SignalFxStatsConfiguration.builder().setToken(TEST_TOKEN).build();
+ assertEquals(TEST_TOKEN, configuration.getToken());
+ assertEquals(
+ SignalFxStatsConfiguration.DEFAULT_SIGNALFX_ENDPOINT, configuration.getIngestEndpoint());
+ assertEquals(
+ SignalFxStatsConfiguration.DEFAULT_EXPORT_INTERVAL, configuration.getExportInterval());
+ }
+
+ @Test
+ public void buildWithFields() throws URISyntaxException {
+ URI url = new URI("http://example.com");
+ Duration duration = Duration.create(5, 0);
+ SignalFxStatsConfiguration configuration =
+ SignalFxStatsConfiguration.builder()
+ .setToken(TEST_TOKEN)
+ .setIngestEndpoint(url)
+ .setExportInterval(duration)
+ .build();
+ assertEquals(TEST_TOKEN, configuration.getToken());
+ assertEquals(url, configuration.getIngestEndpoint());
+ assertEquals(duration, configuration.getExportInterval());
+ }
+
+ @Test
+ public void sameConfigurationsAreEqual() {
+ SignalFxStatsConfiguration config1 =
+ SignalFxStatsConfiguration.builder().setToken(TEST_TOKEN).build();
+ SignalFxStatsConfiguration config2 =
+ SignalFxStatsConfiguration.builder().setToken(TEST_TOKEN).build();
+ assertEquals(config1, config2);
+ assertEquals(config1.hashCode(), config2.hashCode());
+ }
+
+ @Test
+ public void buildWithEmptyToken() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Invalid SignalFx token");
+ SignalFxStatsConfiguration.builder().setToken("").build();
+ }
+
+ @Test
+ public void buildWithNegativeDuration() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Interval duration must be positive");
+ SignalFxStatsConfiguration.builder()
+ .setToken(TEST_TOKEN)
+ .setExportInterval(Duration.create(-1, 0))
+ .build();
+ }
+}
diff --git a/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterTest.java b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterTest.java
new file mode 100644
index 00000000..cc5730b1
--- /dev/null
+++ b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterTest.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.exporter.stats.signalfx;
+
+import static org.junit.Assert.assertEquals;
+
+import io.opencensus.common.Duration;
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link SignalFxStatsExporter}. */
+@RunWith(JUnit4.class)
+public class SignalFxStatsExporterTest {
+
+ private static final String TEST_TOKEN = "token";
+ private static final String TEST_ENDPOINT = "https://example.com";
+ private static final Duration ONE_SECOND = Duration.create(1, 0);
+
+ @Rule public final ExpectedException thrown = ExpectedException.none();
+
+ @After
+ public void tearDown() {
+ SignalFxStatsExporter.unsafeResetExporter();
+ }
+
+ @Test
+ public void createWithNullConfiguration() {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("configuration");
+ SignalFxStatsExporter.create(null);
+ }
+
+ @Test
+ public void createWithNullHostUsesDefault() {
+ SignalFxStatsExporter.create(SignalFxStatsConfiguration.builder().setToken(TEST_TOKEN).build());
+ assertEquals(
+ SignalFxStatsConfiguration.DEFAULT_SIGNALFX_ENDPOINT,
+ SignalFxStatsExporter.unsafeGetConfig().getIngestEndpoint());
+ }
+
+ @Test
+ public void createWithNullIntervalUsesDefault() {
+ SignalFxStatsExporter.create(SignalFxStatsConfiguration.builder().setToken(TEST_TOKEN).build());
+ assertEquals(
+ SignalFxStatsConfiguration.DEFAULT_EXPORT_INTERVAL,
+ SignalFxStatsExporter.unsafeGetConfig().getExportInterval());
+ }
+
+ @Test
+ public void createExporterTwice() {
+ SignalFxStatsConfiguration config =
+ SignalFxStatsConfiguration.builder()
+ .setToken(TEST_TOKEN)
+ .setExportInterval(ONE_SECOND)
+ .build();
+ SignalFxStatsExporter.create(config);
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("SignalFx stats exporter is already created.");
+ SignalFxStatsExporter.create(config);
+ }
+
+ @Test
+ public void createWithConfiguration() throws URISyntaxException {
+ SignalFxStatsConfiguration config =
+ SignalFxStatsConfiguration.builder()
+ .setToken(TEST_TOKEN)
+ .setIngestEndpoint(new URI(TEST_ENDPOINT))
+ .setExportInterval(ONE_SECOND)
+ .build();
+ SignalFxStatsExporter.create(config);
+ assertEquals(config, SignalFxStatsExporter.unsafeGetConfig());
+ }
+}
diff --git a/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThreadTest.java b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThreadTest.java
new file mode 100644
index 00000000..d8852d5f
--- /dev/null
+++ b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThreadTest.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.exporter.stats.signalfx;
+
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.signalfx.metrics.errorhandler.OnSendErrorHandler;
+import com.signalfx.metrics.flush.AggregateMetricSender;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.DataPoint;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.Datum;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.Dimension;
+import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.MetricType;
+import io.opencensus.common.Duration;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.AggregationData;
+import io.opencensus.stats.AggregationData.MeanData;
+import io.opencensus.stats.View;
+import io.opencensus.stats.View.AggregationWindow;
+import io.opencensus.stats.View.Name;
+import io.opencensus.stats.ViewData;
+import io.opencensus.stats.ViewManager;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SignalFxStatsExporterWorkerThreadTest {
+
+ private static final String TEST_TOKEN = "token";
+ private static final Duration ONE_SECOND = Duration.create(1, 0);
+
+ @Mock private AggregateMetricSender.Session session;
+
+ @Mock private ViewManager viewManager;
+
+ @Mock private SignalFxMetricsSenderFactory factory;
+
+ private URI endpoint;
+
+ @Before
+ public void setUp() throws Exception {
+ endpoint = new URI("http://example.com");
+
+ Mockito.when(
+ factory.create(
+ Mockito.any(URI.class), Mockito.anyString(), Mockito.any(OnSendErrorHandler.class)))
+ .thenAnswer(
+ new Answer<AggregateMetricSender>() {
+ @Override
+ public AggregateMetricSender answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ AggregateMetricSender sender =
+ SignalFxMetricsSenderFactory.DEFAULT.create(
+ (URI) args[0], (String) args[1], (OnSendErrorHandler) args[2]);
+ AggregateMetricSender spy = Mockito.spy(sender);
+ Mockito.doReturn(session).when(spy).createSession();
+ return spy;
+ }
+ });
+ }
+
+ @Test
+ public void createThread() {
+ SignalFxStatsExporterWorkerThread thread =
+ new SignalFxStatsExporterWorkerThread(
+ factory, endpoint, TEST_TOKEN, ONE_SECOND, viewManager);
+ assertTrue(thread.isDaemon());
+ assertThat(thread.getName(), startsWith("SignalFx"));
+ }
+
+ @Test
+ public void senderThreadInterruptStopsLoop() throws InterruptedException {
+ Mockito.when(session.setDatapoint(Mockito.any(DataPoint.class))).thenReturn(session);
+ Mockito.when(viewManager.getAllExportedViews()).thenReturn(ImmutableSet.<View>of());
+
+ SignalFxStatsExporterWorkerThread thread =
+ new SignalFxStatsExporterWorkerThread(
+ factory, endpoint, TEST_TOKEN, ONE_SECOND, viewManager);
+ thread.start();
+ thread.interrupt();
+ thread.join(5000, 0);
+ assertFalse("Worker thread should have stopped", thread.isAlive());
+ }
+
+ @Test
+ public void setsDatapointsFromViewOnSession() throws IOException {
+ View view = Mockito.mock(View.class);
+ Name viewName = Name.create("test");
+ Mockito.when(view.getName()).thenReturn(viewName);
+ Mockito.when(view.getAggregation()).thenReturn(Aggregation.Mean.create());
+ Mockito.when(view.getWindow()).thenReturn(AggregationWindow.Cumulative.create());
+ Mockito.when(view.getColumns()).thenReturn(ImmutableList.of(TagKey.create("animal")));
+
+ ViewData viewData = Mockito.mock(ViewData.class);
+ Mockito.when(viewData.getView()).thenReturn(view);
+ Mockito.when(viewData.getAggregationMap())
+ .thenReturn(
+ ImmutableMap.<List<TagValue>, AggregationData>of(
+ ImmutableList.of(TagValue.create("cat")), MeanData.create(3.15d, 1)));
+
+ Mockito.when(viewManager.getAllExportedViews()).thenReturn(ImmutableSet.of(view));
+ Mockito.when(viewManager.getView(Mockito.eq(viewName))).thenReturn(viewData);
+
+ SignalFxStatsExporterWorkerThread thread =
+ new SignalFxStatsExporterWorkerThread(
+ factory, endpoint, TEST_TOKEN, ONE_SECOND, viewManager);
+ thread.export();
+
+ DataPoint datapoint =
+ DataPoint.newBuilder()
+ .setMetric("test")
+ .setMetricType(MetricType.GAUGE)
+ .addDimensions(Dimension.newBuilder().setKey("animal").setValue("cat").build())
+ .setValue(Datum.newBuilder().setDoubleValue(3.15d).build())
+ .build();
+ Mockito.verify(session).setDatapoint(Mockito.eq(datapoint));
+ Mockito.verify(session).close();
+ }
+}