diff options
Diffstat (limited to 'exporters/stats/signalfx')
11 files changed, 1365 insertions, 0 deletions
diff --git a/exporters/stats/signalfx/README.md b/exporters/stats/signalfx/README.md new file mode 100644 index 00000000..7c61f896 --- /dev/null +++ b/exporters/stats/signalfx/README.md @@ -0,0 +1,76 @@ +# OpenCensus SignalFx Stats Exporter + +The _OpenCensus SignalFx Stats Exporter_ is a stats exporter that +exports data to [SignalFx](https://signalfx.com), a real-time monitoring +solution for cloud and distributed applications. SignalFx ingests that +data and offers various visualizations on charts, dashboards and service +maps, as well as real-time anomaly detection. + +## Quickstart + +### Prerequisites + +To use this exporter, you must have a [SignalFx](https://signalfx.com) +account and corresponding [data ingest +token](https://docs.signalfx.com/en/latest/admin-guide/tokens.html). + +#### Java versions + +This exporter requires Java 7 or above. + +### Add the dependencies to your project + +For Maven add to your `pom.xml`: + +```xml +<dependencies> + <dependency> + <groupId>io.opencensus</groupId> + <artifactId>opencensus-api</artifactId> + <version>0.16.1</version> + </dependency> + <dependency> + <groupId>io.opencensus</groupId> + <artifactId>opencensus-exporter-stats-signalfx</artifactId> + <version>0.16.1</version> + </dependency> + <dependency> + <groupId>io.opencensus</groupId> + <artifactId>opencensus-impl</artifactId> + <version>0.16.1</version> + <scope>runtime</scope> + </dependency> +</dependencies> +``` + +For Gradle add to your dependencies: + +``` +compile 'io.opencensus:opencensus-api:0.16.1' +compile 'io.opencensus:opencensus-exporter-stats-signalfx:0.16.1' +runtime 'io.opencensus:opencensus-impl:0.16.1' +``` + +### Register the exporter + +```java +public class MyMainClass { + public static void main(String[] args) { + // SignalFx token is read from Java system properties. + // Stats will be reported every second by default. + SignalFxStatsExporter.create(SignalFxStatsConfiguration.builder().build()); + } +} +``` + +If you want to pass in the token yourself, or set a different reporting +interval, use: + +```java +// Use token "your_signalfx_token" and report every 5 seconds. +SignalFxStatsExporter.create( + SignalFxStatsConfiguration.builder() + .setToken("your_signalfx_token") + .setExportInterval(Duration.create(5, 0)) + .build()); +``` diff --git a/exporters/stats/signalfx/build.gradle b/exporters/stats/signalfx/build.gradle new file mode 100644 index 00000000..d496b1e5 --- /dev/null +++ b/exporters/stats/signalfx/build.gradle @@ -0,0 +1,23 @@ +description = 'OpenCensus SignalFx Stats Exporter' + +[compileJava, compileTestJava].each() { + it.sourceCompatibility = 1.7 + it.targetCompatibility = 1.7 +} + +dependencies { + compileOnly libraries.auto_value + + compile project(':opencensus-api'), + libraries.guava + + compile (libraries.signalfx_java) { + // Prefer library version. + exclude group: 'com.google.guava', module: 'guava' + } + + testCompile project(':opencensus-api') + + signature "org.codehaus.mojo.signature:java17:1.0@signature" + signature "net.sf.androidscents.signature:android-api-level-14:4.0_r4@signature" +} diff --git a/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxMetricsSenderFactory.java b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxMetricsSenderFactory.java new file mode 100644 index 00000000..5601a54c --- /dev/null +++ b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxMetricsSenderFactory.java @@ -0,0 +1,59 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.exporter.stats.signalfx; + +import com.signalfx.endpoint.SignalFxEndpoint; +import com.signalfx.metrics.auth.StaticAuthToken; +import com.signalfx.metrics.connection.HttpDataPointProtobufReceiverFactory; +import com.signalfx.metrics.connection.HttpEventProtobufReceiverFactory; +import com.signalfx.metrics.errorhandler.OnSendErrorHandler; +import com.signalfx.metrics.flush.AggregateMetricSender; +import java.net.URI; +import java.util.Collections; + +/** Interface for creators of {@link AggregateMetricSender}. */ +interface SignalFxMetricsSenderFactory { + + /** + * Creates a new SignalFx metrics sender instance. + * + * @param endpoint The SignalFx ingest endpoint URL. + * @param token The SignalFx ingest token. + * @param errorHandler An {@link OnSendErrorHandler} through which errors when sending data to + * SignalFx will be communicated. + * @return The created {@link AggregateMetricSender} instance. + */ + AggregateMetricSender create(URI endpoint, String token, OnSendErrorHandler errorHandler); + + /** The default, concrete implementation of this interface. */ + SignalFxMetricsSenderFactory DEFAULT = + new SignalFxMetricsSenderFactory() { + @Override + @SuppressWarnings("nullness") + public AggregateMetricSender create( + URI endpoint, String token, OnSendErrorHandler errorHandler) { + SignalFxEndpoint sfx = + new SignalFxEndpoint(endpoint.getScheme(), endpoint.getHost(), endpoint.getPort()); + return new AggregateMetricSender( + null, + new HttpDataPointProtobufReceiverFactory(sfx).setVersion(2), + new HttpEventProtobufReceiverFactory(sfx), + new StaticAuthToken(token), + Collections.singleton(errorHandler)); + } + }; +} diff --git a/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptor.java b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptor.java new file mode 100644 index 00000000..2eb75c4c --- /dev/null +++ b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptor.java @@ -0,0 +1,188 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.exporter.stats.signalfx; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.DataPoint; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.Datum; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.Dimension; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.MetricType; +import io.opencensus.common.Function; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.AggregationData.CountData; +import io.opencensus.stats.AggregationData.DistributionData; +import io.opencensus.stats.AggregationData.LastValueDataDouble; +import io.opencensus.stats.AggregationData.LastValueDataLong; +import io.opencensus.stats.AggregationData.SumDataDouble; +import io.opencensus.stats.AggregationData.SumDataLong; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +/** Adapter for a {@code ViewData}'s contents into SignalFx datapoints. */ +@SuppressWarnings("deprecation") +final class SignalFxSessionAdaptor { + + private SignalFxSessionAdaptor() {} + + /** + * Converts the given view data into datapoints that can be sent to SignalFx. + * + * <p>The view name is used as the metric name, and the aggregation type and aggregation window + * type determine the metric type. + * + * @param data The {@link ViewData} containing the aggregation data of each combination of tag + * values. + * @return A list of datapoints for the corresponding metric timeseries of this view's metric. + */ + static List<DataPoint> adapt(ViewData data) { + View view = data.getView(); + List<TagKey> keys = view.getColumns(); + + MetricType metricType = getMetricTypeForAggregation(view.getAggregation(), view.getWindow()); + if (metricType == null) { + return Collections.emptyList(); + } + + List<DataPoint> datapoints = new ArrayList<>(data.getAggregationMap().size()); + for (Map.Entry<List</*@Nullable*/ TagValue>, AggregationData> entry : + data.getAggregationMap().entrySet()) { + datapoints.add( + DataPoint.newBuilder() + .setMetric(view.getName().asString()) + .setMetricType(metricType) + .addAllDimensions(createDimensions(keys, entry.getKey())) + .setValue(createDatum(entry.getValue())) + .build()); + } + return datapoints; + } + + @VisibleForTesting + @javax.annotation.Nullable + static MetricType getMetricTypeForAggregation( + Aggregation aggregation, View.AggregationWindow window) { + if (aggregation instanceof Aggregation.Mean || aggregation instanceof Aggregation.LastValue) { + return MetricType.GAUGE; + } else if (aggregation instanceof Aggregation.Count || aggregation instanceof Aggregation.Sum) { + if (window instanceof View.AggregationWindow.Cumulative) { + return MetricType.CUMULATIVE_COUNTER; + } + // TODO(mpetazzoni): support incremental counters when AggregationWindow.Interval is ready + } + + // TODO(mpetazzoni): add support for histograms (Aggregation.Distribution). + return null; + } + + @VisibleForTesting + static Iterable<Dimension> createDimensions( + List<TagKey> keys, List</*@Nullable*/ TagValue> values) { + Preconditions.checkArgument( + keys.size() == values.size(), "TagKeys and TagValues don't have the same size."); + List<Dimension> dimensions = new ArrayList<>(keys.size()); + for (ListIterator<TagKey> it = keys.listIterator(); it.hasNext(); ) { + TagKey key = it.next(); + TagValue value = values.get(it.previousIndex()); + if (value == null || Strings.isNullOrEmpty(value.asString())) { + continue; + } + dimensions.add(createDimension(key, value)); + } + return dimensions; + } + + @VisibleForTesting + static Dimension createDimension(TagKey key, TagValue value) { + return Dimension.newBuilder().setKey(key.getName()).setValue(value.asString()).build(); + } + + @VisibleForTesting + static Datum createDatum(AggregationData data) { + final Datum.Builder builder = Datum.newBuilder(); + data.match( + new Function<SumDataDouble, Void>() { + @Override + public Void apply(SumDataDouble arg) { + builder.setDoubleValue(arg.getSum()); + return null; + } + }, + new Function<SumDataLong, Void>() { + @Override + public Void apply(SumDataLong arg) { + builder.setIntValue(arg.getSum()); + return null; + } + }, + new Function<CountData, Void>() { + @Override + public Void apply(CountData arg) { + builder.setIntValue(arg.getCount()); + return null; + } + }, + new Function<DistributionData, Void>() { + @Override + public Void apply(DistributionData arg) { + // TODO(mpetazzoni): add histogram support. + throw new IllegalArgumentException("Distribution aggregations are not supported"); + } + }, + new Function<LastValueDataDouble, Void>() { + @Override + public Void apply(LastValueDataDouble arg) { + builder.setDoubleValue(arg.getLastValue()); + return null; + } + }, + new Function<LastValueDataLong, Void>() { + @Override + public Void apply(LastValueDataLong arg) { + builder.setIntValue(arg.getLastValue()); + return null; + } + }, + new Function<AggregationData, Void>() { + @Override + public Void apply(AggregationData arg) { + // TODO(songya): remove this once Mean aggregation is completely removed. Before that + // we need to continue supporting Mean, since it could still be used by users and some + // deprecated RPC views. + if (arg instanceof AggregationData.MeanData) { + builder.setDoubleValue(((AggregationData.MeanData) arg).getMean()); + return null; + } + throw new IllegalArgumentException("Unknown Aggregation."); + } + }); + return builder.build(); + } +} diff --git a/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfiguration.java b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfiguration.java new file mode 100644 index 00000000..e8b4d756 --- /dev/null +++ b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfiguration.java @@ -0,0 +1,153 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.exporter.stats.signalfx; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import io.opencensus.common.Duration; +import java.net.URI; +import java.net.URISyntaxException; +import javax.annotation.concurrent.Immutable; + +/** + * Configurations for {@link SignalFxStatsExporter}. + * + * @since 0.11 + */ +@AutoValue +@Immutable +public abstract class SignalFxStatsConfiguration { + + /** + * The default SignalFx ingest API URL. + * + * @since 0.11 + */ + public static final URI DEFAULT_SIGNALFX_ENDPOINT; + + static { + try { + DEFAULT_SIGNALFX_ENDPOINT = new URI("https://ingest.signalfx.com"); + } catch (URISyntaxException e) { + // This shouldn't happen if DEFAULT_SIGNALFX_ENDPOINT was typed in correctly. + throw new IllegalStateException(e); + } + } + + /** + * The default stats export interval. + * + * @since 0.11 + */ + public static final Duration DEFAULT_EXPORT_INTERVAL = Duration.create(1, 0); + + private static final Duration ZERO = Duration.create(0, 0); + + SignalFxStatsConfiguration() {} + + /** + * Returns the SignalFx ingest API URL. + * + * @return the SignalFx ingest API URL. + * @since 0.11 + */ + public abstract URI getIngestEndpoint(); + + /** + * Returns the authentication token. + * + * @return the authentication token. + * @since 0.11 + */ + public abstract String getToken(); + + /** + * Returns the export interval between pushes to SignalFx. + * + * @return the export interval. + * @since 0.11 + */ + public abstract Duration getExportInterval(); + + /** + * Returns a new {@link Builder}. + * + * @return a {@code Builder}. + * @since 0.11 + */ + public static Builder builder() { + return new AutoValue_SignalFxStatsConfiguration.Builder() + .setIngestEndpoint(DEFAULT_SIGNALFX_ENDPOINT) + .setExportInterval(DEFAULT_EXPORT_INTERVAL); + } + + /** + * Builder for {@link SignalFxStatsConfiguration}. + * + * @since 0.11 + */ + @AutoValue.Builder + public abstract static class Builder { + + Builder() {} + + /** + * Sets the given SignalFx ingest API URL. + * + * @param url the SignalFx ingest API URL. + * @return this. + * @since 0.11 + */ + public abstract Builder setIngestEndpoint(URI url); + + /** + * Sets the given authentication token. + * + * @param token the authentication token. + * @return this. + * @since 0.11 + */ + public abstract Builder setToken(String token); + + /** + * Sets the export interval. + * + * @param exportInterval the export interval between pushes to SignalFx. + * @return this. + * @since 0.11 + */ + public abstract Builder setExportInterval(Duration exportInterval); + + abstract SignalFxStatsConfiguration autoBuild(); + + /** + * Builds a new {@link SignalFxStatsConfiguration} with current settings. + * + * @return a {@code SignalFxStatsConfiguration}. + * @since 0.11 + */ + public SignalFxStatsConfiguration build() { + SignalFxStatsConfiguration config = autoBuild(); + Preconditions.checkArgument( + !Strings.isNullOrEmpty(config.getToken()), "Invalid SignalFx token"); + Preconditions.checkArgument( + config.getExportInterval().compareTo(ZERO) > 0, "Interval duration must be positive"); + return config; + } + } +} diff --git a/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporter.java b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporter.java new file mode 100644 index 00000000..f7915b71 --- /dev/null +++ b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporter.java @@ -0,0 +1,109 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.exporter.stats.signalfx; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import io.opencensus.stats.Stats; +import io.opencensus.stats.ViewManager; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +/** + * Exporter to SignalFx. + * + * <p>Example of usage: + * + * <pre><code> + * public static void main(String[] args) { + * SignalFxStatsExporter.create(SignalFxStatsConfiguration.builder().build()); + * ... // Do work. + * } + * </code></pre> + * + * @since 0.11 + */ +public final class SignalFxStatsExporter { + + private static final Object monitor = new Object(); + + private final SignalFxStatsConfiguration configuration; + private final SignalFxStatsExporterWorkerThread workerThread; + + @GuardedBy("monitor") + @Nullable + private static SignalFxStatsExporter exporter = null; + + private SignalFxStatsExporter(SignalFxStatsConfiguration configuration, ViewManager viewManager) { + Preconditions.checkNotNull(configuration, "SignalFx stats exporter configuration"); + this.configuration = configuration; + this.workerThread = + new SignalFxStatsExporterWorkerThread( + SignalFxMetricsSenderFactory.DEFAULT, + configuration.getIngestEndpoint(), + configuration.getToken(), + configuration.getExportInterval(), + viewManager); + } + + /** + * Creates a SignalFx Stats exporter from the given {@link SignalFxStatsConfiguration}. + * + * <p>If {@code ingestEndpoint} is not set on the configuration, the exporter will use {@link + * SignalFxStatsConfiguration#DEFAULT_SIGNALFX_ENDPOINT}. + * + * <p>If {@code exportInterval} is not set on the configuration, the exporter will use {@link + * SignalFxStatsConfiguration#DEFAULT_EXPORT_INTERVAL}. + * + * @param configuration the {@code SignalFxStatsConfiguration}. + * @throws IllegalStateException if a SignalFx exporter is already created. + * @since 0.11 + */ + public static void create(SignalFxStatsConfiguration configuration) { + synchronized (monitor) { + Preconditions.checkState(exporter == null, "SignalFx stats exporter is already created."); + exporter = new SignalFxStatsExporter(configuration, Stats.getViewManager()); + exporter.workerThread.start(); + } + } + + @VisibleForTesting + static void unsafeResetExporter() { + synchronized (monitor) { + if (exporter != null) { + SignalFxStatsExporterWorkerThread workerThread = exporter.workerThread; + if (workerThread != null && workerThread.isAlive()) { + try { + workerThread.interrupt(); + workerThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + exporter = null; + } + } + } + + @VisibleForTesting + @Nullable + static SignalFxStatsConfiguration unsafeGetConfig() { + synchronized (monitor) { + return exporter != null ? exporter.configuration : null; + } + } +} diff --git a/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThread.java b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThread.java new file mode 100644 index 00000000..348778e2 --- /dev/null +++ b/exporters/stats/signalfx/src/main/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThread.java @@ -0,0 +1,105 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.exporter.stats.signalfx; + +import com.google.common.annotations.VisibleForTesting; +import com.signalfx.metrics.errorhandler.MetricError; +import com.signalfx.metrics.errorhandler.OnSendErrorHandler; +import com.signalfx.metrics.flush.AggregateMetricSender; +import com.signalfx.metrics.flush.AggregateMetricSender.Session; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.DataPoint; +import io.opencensus.common.Duration; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.stats.ViewManager; +import java.io.IOException; +import java.net.URI; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Worker {@code Thread} that polls ViewData from the Stats's ViewManager and exports to SignalFx. + * + * <p>{@code SignalFxStatsExporterWorkerThread} is a daemon {@code Thread} + */ +final class SignalFxStatsExporterWorkerThread extends Thread { + + private static final Logger logger = + Logger.getLogger(SignalFxStatsExporterWorkerThread.class.getName()); + + private static final OnSendErrorHandler ERROR_HANDLER = + new OnSendErrorHandler() { + @Override + public void handleError(MetricError error) { + logger.log(Level.WARNING, "Unable to send metrics to SignalFx: {0}", error.getMessage()); + } + }; + + private final long intervalMs; + private final ViewManager views; + private final AggregateMetricSender sender; + + SignalFxStatsExporterWorkerThread( + SignalFxMetricsSenderFactory factory, + URI endpoint, + String token, + Duration interval, + ViewManager views) { + this.intervalMs = interval.toMillis(); + this.views = views; + this.sender = factory.create(endpoint, token, ERROR_HANDLER); + + setDaemon(true); + setName(getClass().getSimpleName()); + logger.log(Level.FINE, "Initialized SignalFx exporter to {0}.", endpoint); + } + + @VisibleForTesting + void export() throws IOException { + Session session = sender.createSession(); + try { + for (View view : views.getAllExportedViews()) { + ViewData data = views.getView(view.getName()); + if (data == null) { + continue; + } + + for (DataPoint datapoint : SignalFxSessionAdaptor.adapt(data)) { + session.setDatapoint(datapoint); + } + } + } finally { + session.close(); + } + } + + @Override + public void run() { + while (true) { + try { + export(); + Thread.sleep(intervalMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } catch (Throwable e) { + logger.log(Level.WARNING, "Exception thrown by the SignalFx stats exporter", e); + } + } + logger.log(Level.INFO, "SignalFx stats exporter stopped."); + } +} diff --git a/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptorTest.java b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptorTest.java new file mode 100644 index 00000000..34f4dfa7 --- /dev/null +++ b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxSessionAdaptorTest.java @@ -0,0 +1,320 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.exporter.stats.signalfx; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.DataPoint; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.Datum; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.Dimension; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.MetricType; +import io.opencensus.common.Duration; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.AggregationData.CountData; +import io.opencensus.stats.AggregationData.DistributionData; +import io.opencensus.stats.AggregationData.LastValueDataDouble; +import io.opencensus.stats.AggregationData.LastValueDataLong; +import io.opencensus.stats.AggregationData.MeanData; +import io.opencensus.stats.AggregationData.SumDataDouble; +import io.opencensus.stats.AggregationData.SumDataLong; +import io.opencensus.stats.BucketBoundaries; +import io.opencensus.stats.View; +import io.opencensus.stats.View.AggregationWindow; +import io.opencensus.stats.View.Name; +import io.opencensus.stats.ViewData; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import java.util.List; +import java.util.Map; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class SignalFxSessionAdaptorTest { + + private static final Duration ONE_SECOND = Duration.create(1, 0); + + @Rule public final ExpectedException thrown = ExpectedException.none(); + + @Mock private View view; + + @Mock private ViewData viewData; + + @Before + public void setUp() { + Mockito.when(view.getName()).thenReturn(Name.create("view-name")); + Mockito.when(view.getColumns()).thenReturn(ImmutableList.of(TagKey.create("animal"))); + Mockito.when(viewData.getView()).thenReturn(view); + } + + @Test + public void checkMetricTypeFromAggregation() { + assertNull(SignalFxSessionAdaptor.getMetricTypeForAggregation(null, null)); + assertNull( + SignalFxSessionAdaptor.getMetricTypeForAggregation( + null, AggregationWindow.Cumulative.create())); + assertEquals( + MetricType.GAUGE, + SignalFxSessionAdaptor.getMetricTypeForAggregation( + Aggregation.Mean.create(), AggregationWindow.Cumulative.create())); + assertEquals( + MetricType.GAUGE, + SignalFxSessionAdaptor.getMetricTypeForAggregation( + Aggregation.Mean.create(), AggregationWindow.Interval.create(ONE_SECOND))); + assertEquals( + MetricType.CUMULATIVE_COUNTER, + SignalFxSessionAdaptor.getMetricTypeForAggregation( + Aggregation.Count.create(), AggregationWindow.Cumulative.create())); + assertEquals( + MetricType.CUMULATIVE_COUNTER, + SignalFxSessionAdaptor.getMetricTypeForAggregation( + Aggregation.Sum.create(), AggregationWindow.Cumulative.create())); + assertNull( + SignalFxSessionAdaptor.getMetricTypeForAggregation(Aggregation.Count.create(), null)); + assertNull(SignalFxSessionAdaptor.getMetricTypeForAggregation(Aggregation.Sum.create(), null)); + assertNull( + SignalFxSessionAdaptor.getMetricTypeForAggregation( + Aggregation.Count.create(), AggregationWindow.Interval.create(ONE_SECOND))); + assertNull( + SignalFxSessionAdaptor.getMetricTypeForAggregation( + Aggregation.Sum.create(), AggregationWindow.Interval.create(ONE_SECOND))); + assertNull( + SignalFxSessionAdaptor.getMetricTypeForAggregation( + Aggregation.Distribution.create(BucketBoundaries.create(ImmutableList.of(3.15d))), + AggregationWindow.Cumulative.create())); + assertEquals( + MetricType.GAUGE, + SignalFxSessionAdaptor.getMetricTypeForAggregation( + Aggregation.LastValue.create(), AggregationWindow.Cumulative.create())); + assertEquals( + MetricType.GAUGE, + SignalFxSessionAdaptor.getMetricTypeForAggregation( + Aggregation.LastValue.create(), AggregationWindow.Interval.create(ONE_SECOND))); + } + + @Test + public void createDimensionsWithNonMatchingListSizes() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("don't have the same size"); + SignalFxSessionAdaptor.createDimensions( + ImmutableList.of(TagKey.create("animal"), TagKey.create("color")), + ImmutableList.of(TagValue.create("dog"))); + } + + @Test + public void createDimensionsIgnoresEmptyValues() { + List<Dimension> dimensions = + Lists.newArrayList( + SignalFxSessionAdaptor.createDimensions( + ImmutableList.of(TagKey.create("animal"), TagKey.create("color")), + ImmutableList.of(TagValue.create("dog"), TagValue.create("")))); + assertEquals(1, dimensions.size()); + assertEquals("animal", dimensions.get(0).getKey()); + assertEquals("dog", dimensions.get(0).getValue()); + } + + @Test + public void createDimension() { + Dimension dimension = + SignalFxSessionAdaptor.createDimension(TagKey.create("animal"), TagValue.create("dog")); + assertEquals("animal", dimension.getKey()); + assertEquals("dog", dimension.getValue()); + } + + @Test + public void unsupportedAggregationYieldsNoDatapoints() { + Mockito.when(view.getAggregation()) + .thenReturn( + Aggregation.Distribution.create(BucketBoundaries.create(ImmutableList.of(3.15d)))); + Mockito.when(view.getWindow()).thenReturn(AggregationWindow.Cumulative.create()); + List<DataPoint> datapoints = SignalFxSessionAdaptor.adapt(viewData); + assertEquals(0, datapoints.size()); + } + + @Test + public void noAggregationDataYieldsNoDatapoints() { + Mockito.when(view.getAggregation()).thenReturn(Aggregation.Count.create()); + Mockito.when(view.getWindow()).thenReturn(AggregationWindow.Cumulative.create()); + List<DataPoint> datapoints = SignalFxSessionAdaptor.adapt(viewData); + assertEquals(0, datapoints.size()); + } + + @Test + public void createDatumFromDoubleSum() { + SumDataDouble data = SumDataDouble.create(3.15d); + Datum datum = SignalFxSessionAdaptor.createDatum(data); + assertTrue(datum.hasDoubleValue()); + assertFalse(datum.hasIntValue()); + assertFalse(datum.hasStrValue()); + assertEquals(3.15d, datum.getDoubleValue(), 0d); + } + + @Test + public void createDatumFromLongSum() { + SumDataLong data = SumDataLong.create(42L); + Datum datum = SignalFxSessionAdaptor.createDatum(data); + assertFalse(datum.hasDoubleValue()); + assertTrue(datum.hasIntValue()); + assertFalse(datum.hasStrValue()); + assertEquals(42L, datum.getIntValue()); + } + + @Test + public void createDatumFromCount() { + CountData data = CountData.create(42L); + Datum datum = SignalFxSessionAdaptor.createDatum(data); + assertFalse(datum.hasDoubleValue()); + assertTrue(datum.hasIntValue()); + assertFalse(datum.hasStrValue()); + assertEquals(42L, datum.getIntValue()); + } + + @Test + public void createDatumFromMean() { + MeanData data = MeanData.create(3.15d, 2L); + Datum datum = SignalFxSessionAdaptor.createDatum(data); + assertTrue(datum.hasDoubleValue()); + assertFalse(datum.hasIntValue()); + assertFalse(datum.hasStrValue()); + assertEquals(3.15d, datum.getDoubleValue(), 0d); + } + + @Test + public void createDatumFromDistributionThrows() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Distribution aggregations are not supported"); + SignalFxSessionAdaptor.createDatum( + DistributionData.create(5, 2, 0, 10, 40, ImmutableList.of(1L))); + } + + @Test + public void createDatumFromLastValueDouble() { + LastValueDataDouble data = LastValueDataDouble.create(12.2); + Datum datum = SignalFxSessionAdaptor.createDatum(data); + assertTrue(datum.hasDoubleValue()); + assertFalse(datum.hasIntValue()); + assertFalse(datum.hasStrValue()); + assertEquals(12.2, datum.getDoubleValue(), 0d); + } + + @Test + public void createDatumFromLastValueLong() { + LastValueDataLong data = LastValueDataLong.create(100000); + Datum datum = SignalFxSessionAdaptor.createDatum(data); + assertFalse(datum.hasDoubleValue()); + assertTrue(datum.hasIntValue()); + assertFalse(datum.hasStrValue()); + assertEquals(100000, datum.getIntValue()); + } + + @Test + public void adaptViewIntoDatapoints() { + Map<List<TagValue>, AggregationData> map = + ImmutableMap.<List<TagValue>, AggregationData>of( + ImmutableList.of(TagValue.create("dog")), + SumDataLong.create(2L), + ImmutableList.of(TagValue.create("cat")), + SumDataLong.create(3L)); + Mockito.when(viewData.getAggregationMap()).thenReturn(map); + Mockito.when(view.getAggregation()).thenReturn(Aggregation.Count.create()); + Mockito.when(view.getWindow()).thenReturn(AggregationWindow.Cumulative.create()); + + List<DataPoint> datapoints = SignalFxSessionAdaptor.adapt(viewData); + assertEquals(2, datapoints.size()); + for (DataPoint dp : datapoints) { + assertEquals("view-name", dp.getMetric()); + assertEquals(MetricType.CUMULATIVE_COUNTER, dp.getMetricType()); + assertEquals(1, dp.getDimensionsCount()); + assertTrue(dp.hasValue()); + assertFalse(dp.hasSource()); + + Datum datum = dp.getValue(); + assertTrue(datum.hasIntValue()); + assertFalse(datum.hasDoubleValue()); + assertFalse(datum.hasStrValue()); + + Dimension dimension = dp.getDimensions(0); + assertEquals("animal", dimension.getKey()); + switch (dimension.getValue()) { + case "dog": + assertEquals(2L, datum.getIntValue()); + break; + case "cat": + assertEquals(3L, datum.getIntValue()); + break; + default: + fail("unexpected dimension value"); + } + } + } + + @Test + public void adaptViewWithEmptyTagValueIntoDatapoints() { + Map<List<TagValue>, AggregationData> map = + ImmutableMap.<List<TagValue>, AggregationData>of( + ImmutableList.of(TagValue.create("dog")), + SumDataLong.create(2L), + ImmutableList.of(TagValue.create("")), + SumDataLong.create(3L)); + Mockito.when(viewData.getAggregationMap()).thenReturn(map); + Mockito.when(view.getAggregation()).thenReturn(Aggregation.Count.create()); + Mockito.when(view.getWindow()).thenReturn(AggregationWindow.Cumulative.create()); + + List<DataPoint> datapoints = SignalFxSessionAdaptor.adapt(viewData); + assertEquals(2, datapoints.size()); + for (DataPoint dp : datapoints) { + assertEquals("view-name", dp.getMetric()); + assertEquals(MetricType.CUMULATIVE_COUNTER, dp.getMetricType()); + assertTrue(dp.hasValue()); + assertFalse(dp.hasSource()); + + Datum datum = dp.getValue(); + assertTrue(datum.hasIntValue()); + assertFalse(datum.hasDoubleValue()); + assertFalse(datum.hasStrValue()); + + switch (dp.getDimensionsCount()) { + case 0: + assertEquals(3L, datum.getIntValue()); + break; + case 1: + Dimension dimension = dp.getDimensions(0); + assertEquals("animal", dimension.getKey()); + assertEquals("dog", dimension.getValue()); + assertEquals(2L, datum.getIntValue()); + break; + default: + fail("Unexpected number of dimensions on the created datapoint"); + break; + } + } + } +} diff --git a/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfigurationTest.java b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfigurationTest.java new file mode 100644 index 00000000..1d3508fb --- /dev/null +++ b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsConfigurationTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.exporter.stats.signalfx; + +import static org.junit.Assert.assertEquals; + +import io.opencensus.common.Duration; +import java.net.URI; +import java.net.URISyntaxException; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link SignalFxStatsConfiguration}. */ +@RunWith(JUnit4.class) +public class SignalFxStatsConfigurationTest { + + private static final String TEST_TOKEN = "token"; + + @Rule public final ExpectedException thrown = ExpectedException.none(); + + @Test + public void buildWithDefaults() { + SignalFxStatsConfiguration configuration = + SignalFxStatsConfiguration.builder().setToken(TEST_TOKEN).build(); + assertEquals(TEST_TOKEN, configuration.getToken()); + assertEquals( + SignalFxStatsConfiguration.DEFAULT_SIGNALFX_ENDPOINT, configuration.getIngestEndpoint()); + assertEquals( + SignalFxStatsConfiguration.DEFAULT_EXPORT_INTERVAL, configuration.getExportInterval()); + } + + @Test + public void buildWithFields() throws URISyntaxException { + URI url = new URI("http://example.com"); + Duration duration = Duration.create(5, 0); + SignalFxStatsConfiguration configuration = + SignalFxStatsConfiguration.builder() + .setToken(TEST_TOKEN) + .setIngestEndpoint(url) + .setExportInterval(duration) + .build(); + assertEquals(TEST_TOKEN, configuration.getToken()); + assertEquals(url, configuration.getIngestEndpoint()); + assertEquals(duration, configuration.getExportInterval()); + } + + @Test + public void sameConfigurationsAreEqual() { + SignalFxStatsConfiguration config1 = + SignalFxStatsConfiguration.builder().setToken(TEST_TOKEN).build(); + SignalFxStatsConfiguration config2 = + SignalFxStatsConfiguration.builder().setToken(TEST_TOKEN).build(); + assertEquals(config1, config2); + assertEquals(config1.hashCode(), config2.hashCode()); + } + + @Test + public void buildWithEmptyToken() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid SignalFx token"); + SignalFxStatsConfiguration.builder().setToken("").build(); + } + + @Test + public void buildWithNegativeDuration() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Interval duration must be positive"); + SignalFxStatsConfiguration.builder() + .setToken(TEST_TOKEN) + .setExportInterval(Duration.create(-1, 0)) + .build(); + } +} diff --git a/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterTest.java b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterTest.java new file mode 100644 index 00000000..cc5730b1 --- /dev/null +++ b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.exporter.stats.signalfx; + +import static org.junit.Assert.assertEquals; + +import io.opencensus.common.Duration; +import java.net.URI; +import java.net.URISyntaxException; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link SignalFxStatsExporter}. */ +@RunWith(JUnit4.class) +public class SignalFxStatsExporterTest { + + private static final String TEST_TOKEN = "token"; + private static final String TEST_ENDPOINT = "https://example.com"; + private static final Duration ONE_SECOND = Duration.create(1, 0); + + @Rule public final ExpectedException thrown = ExpectedException.none(); + + @After + public void tearDown() { + SignalFxStatsExporter.unsafeResetExporter(); + } + + @Test + public void createWithNullConfiguration() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("configuration"); + SignalFxStatsExporter.create(null); + } + + @Test + public void createWithNullHostUsesDefault() { + SignalFxStatsExporter.create(SignalFxStatsConfiguration.builder().setToken(TEST_TOKEN).build()); + assertEquals( + SignalFxStatsConfiguration.DEFAULT_SIGNALFX_ENDPOINT, + SignalFxStatsExporter.unsafeGetConfig().getIngestEndpoint()); + } + + @Test + public void createWithNullIntervalUsesDefault() { + SignalFxStatsExporter.create(SignalFxStatsConfiguration.builder().setToken(TEST_TOKEN).build()); + assertEquals( + SignalFxStatsConfiguration.DEFAULT_EXPORT_INTERVAL, + SignalFxStatsExporter.unsafeGetConfig().getExportInterval()); + } + + @Test + public void createExporterTwice() { + SignalFxStatsConfiguration config = + SignalFxStatsConfiguration.builder() + .setToken(TEST_TOKEN) + .setExportInterval(ONE_SECOND) + .build(); + SignalFxStatsExporter.create(config); + thrown.expect(IllegalStateException.class); + thrown.expectMessage("SignalFx stats exporter is already created."); + SignalFxStatsExporter.create(config); + } + + @Test + public void createWithConfiguration() throws URISyntaxException { + SignalFxStatsConfiguration config = + SignalFxStatsConfiguration.builder() + .setToken(TEST_TOKEN) + .setIngestEndpoint(new URI(TEST_ENDPOINT)) + .setExportInterval(ONE_SECOND) + .build(); + SignalFxStatsExporter.create(config); + assertEquals(config, SignalFxStatsExporter.unsafeGetConfig()); + } +} diff --git a/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThreadTest.java b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThreadTest.java new file mode 100644 index 00000000..d8852d5f --- /dev/null +++ b/exporters/stats/signalfx/src/test/java/io/opencensus/exporter/stats/signalfx/SignalFxStatsExporterWorkerThreadTest.java @@ -0,0 +1,149 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.exporter.stats.signalfx; + +import static org.hamcrest.CoreMatchers.startsWith; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.signalfx.metrics.errorhandler.OnSendErrorHandler; +import com.signalfx.metrics.flush.AggregateMetricSender; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.DataPoint; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.Datum; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.Dimension; +import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.MetricType; +import io.opencensus.common.Duration; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.AggregationData.MeanData; +import io.opencensus.stats.View; +import io.opencensus.stats.View.AggregationWindow; +import io.opencensus.stats.View.Name; +import io.opencensus.stats.ViewData; +import io.opencensus.stats.ViewManager; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +@RunWith(MockitoJUnitRunner.class) +public class SignalFxStatsExporterWorkerThreadTest { + + private static final String TEST_TOKEN = "token"; + private static final Duration ONE_SECOND = Duration.create(1, 0); + + @Mock private AggregateMetricSender.Session session; + + @Mock private ViewManager viewManager; + + @Mock private SignalFxMetricsSenderFactory factory; + + private URI endpoint; + + @Before + public void setUp() throws Exception { + endpoint = new URI("http://example.com"); + + Mockito.when( + factory.create( + Mockito.any(URI.class), Mockito.anyString(), Mockito.any(OnSendErrorHandler.class))) + .thenAnswer( + new Answer<AggregateMetricSender>() { + @Override + public AggregateMetricSender answer(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + AggregateMetricSender sender = + SignalFxMetricsSenderFactory.DEFAULT.create( + (URI) args[0], (String) args[1], (OnSendErrorHandler) args[2]); + AggregateMetricSender spy = Mockito.spy(sender); + Mockito.doReturn(session).when(spy).createSession(); + return spy; + } + }); + } + + @Test + public void createThread() { + SignalFxStatsExporterWorkerThread thread = + new SignalFxStatsExporterWorkerThread( + factory, endpoint, TEST_TOKEN, ONE_SECOND, viewManager); + assertTrue(thread.isDaemon()); + assertThat(thread.getName(), startsWith("SignalFx")); + } + + @Test + public void senderThreadInterruptStopsLoop() throws InterruptedException { + Mockito.when(session.setDatapoint(Mockito.any(DataPoint.class))).thenReturn(session); + Mockito.when(viewManager.getAllExportedViews()).thenReturn(ImmutableSet.<View>of()); + + SignalFxStatsExporterWorkerThread thread = + new SignalFxStatsExporterWorkerThread( + factory, endpoint, TEST_TOKEN, ONE_SECOND, viewManager); + thread.start(); + thread.interrupt(); + thread.join(5000, 0); + assertFalse("Worker thread should have stopped", thread.isAlive()); + } + + @Test + public void setsDatapointsFromViewOnSession() throws IOException { + View view = Mockito.mock(View.class); + Name viewName = Name.create("test"); + Mockito.when(view.getName()).thenReturn(viewName); + Mockito.when(view.getAggregation()).thenReturn(Aggregation.Mean.create()); + Mockito.when(view.getWindow()).thenReturn(AggregationWindow.Cumulative.create()); + Mockito.when(view.getColumns()).thenReturn(ImmutableList.of(TagKey.create("animal"))); + + ViewData viewData = Mockito.mock(ViewData.class); + Mockito.when(viewData.getView()).thenReturn(view); + Mockito.when(viewData.getAggregationMap()) + .thenReturn( + ImmutableMap.<List<TagValue>, AggregationData>of( + ImmutableList.of(TagValue.create("cat")), MeanData.create(3.15d, 1))); + + Mockito.when(viewManager.getAllExportedViews()).thenReturn(ImmutableSet.of(view)); + Mockito.when(viewManager.getView(Mockito.eq(viewName))).thenReturn(viewData); + + SignalFxStatsExporterWorkerThread thread = + new SignalFxStatsExporterWorkerThread( + factory, endpoint, TEST_TOKEN, ONE_SECOND, viewManager); + thread.export(); + + DataPoint datapoint = + DataPoint.newBuilder() + .setMetric("test") + .setMetricType(MetricType.GAUGE) + .addDimensions(Dimension.newBuilder().setKey("animal").setValue("cat").build()) + .setValue(Datum.newBuilder().setDoubleValue(3.15d).build()) + .build(); + Mockito.verify(session).setDatapoint(Mockito.eq(datapoint)); + Mockito.verify(session).close(); + } +} |