aboutsummaryrefslogtreecommitdiff
path: root/impl_core/src/main/java/io/opencensus
diff options
context:
space:
mode:
authorHailong Wen <youxiabsyw@gmail.com>2018-02-06 10:57:58 -0800
committerGitHub <noreply@github.com>2018-02-06 10:57:58 -0800
commitfee27583a43f54b0bad105e51e3755b248fe4d4d (patch)
treeb4afbde0bac956d19ec733d301be24ad5c0d8752 /impl_core/src/main/java/io/opencensus
parent67b1f37cd91688cf42fe2ff1adc8a42ef32477d9 (diff)
downloadopencensus-java-fee27583a43f54b0bad105e51e3755b248fe4d4d.tar.gz
Make `XxxSpanStoreImpl` abstract and add no-op implementation. (fixes #914) (#964)
Diffstat (limited to 'impl_core/src/main/java/io/opencensus')
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java21
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java81
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java381
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java79
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java371
5 files changed, 538 insertions, 395 deletions
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java
index 24042e84..e77d1f8e 100644
--- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java
@@ -21,7 +21,6 @@ import io.opencensus.implcore.internal.EventQueue;
import io.opencensus.trace.export.ExportComponent;
import io.opencensus.trace.export.RunningSpanStore;
import io.opencensus.trace.export.SampledSpanStore;
-import javax.annotation.Nullable;
/** Implementation of the {@link ExportComponent}. */
public final class ExportComponentImpl extends ExportComponent {
@@ -30,8 +29,8 @@ public final class ExportComponentImpl extends ExportComponent {
private static final Duration EXPORTER_SCHEDULE_DELAY = Duration.create(5, 0);
private final SpanExporterImpl spanExporter;
- @Nullable private final RunningSpanStoreImpl runningSpanStore;
- @Nullable private final SampledSpanStoreImpl sampledSpanStore;
+ private final RunningSpanStoreImpl runningSpanStore;
+ private final SampledSpanStoreImpl sampledSpanStore;
@Override
public SpanExporterImpl getSpanExporter() {
@@ -39,17 +38,11 @@ public final class ExportComponentImpl extends ExportComponent {
}
@Override
- // TODO(#914): This method shouldn't be nullable.
- @SuppressWarnings("nullness")
- @Nullable
public RunningSpanStoreImpl getRunningSpanStore() {
return runningSpanStore;
}
@Override
- // TODO(#914): This method shouldn't be nullable.
- @SuppressWarnings("nullness")
- @Nullable
public SampledSpanStoreImpl getSampledSpanStore() {
return sampledSpanStore;
}
@@ -82,7 +75,13 @@ public final class ExportComponentImpl extends ExportComponent {
*/
private ExportComponentImpl(boolean supportInProcessStores, EventQueue eventQueue) {
this.spanExporter = SpanExporterImpl.create(EXPORTER_BUFFER_SIZE, EXPORTER_SCHEDULE_DELAY);
- this.runningSpanStore = supportInProcessStores ? new RunningSpanStoreImpl() : null;
- this.sampledSpanStore = supportInProcessStores ? new SampledSpanStoreImpl(eventQueue) : null;
+ this.runningSpanStore =
+ supportInProcessStores
+ ? new InProcessRunningSpanStoreImpl()
+ : RunningSpanStoreImpl.getNoopRunningSpanStoreImpl();
+ this.sampledSpanStore =
+ supportInProcessStores
+ ? new InProcessSampledSpanStoreImpl(eventQueue)
+ : SampledSpanStoreImpl.getNoopSampledSpanStoreImpl();
}
}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java
new file mode 100644
index 00000000..3d8fb9ae
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.export;
+
+import io.opencensus.implcore.trace.SpanImpl;
+import io.opencensus.implcore.trace.internal.ConcurrentIntrusiveList;
+import io.opencensus.trace.export.RunningSpanStore;
+import io.opencensus.trace.export.SpanData;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.ThreadSafe;
+
+/** In-process implementation of the {@link RunningSpanStore}. */
+@ThreadSafe
+public final class InProcessRunningSpanStoreImpl extends RunningSpanStoreImpl {
+ private final ConcurrentIntrusiveList<SpanImpl> runningSpans;
+
+ public InProcessRunningSpanStoreImpl() {
+ runningSpans = new ConcurrentIntrusiveList<SpanImpl>();
+ }
+
+ @Override
+ public void onStart(SpanImpl span) {
+ runningSpans.addElement(span);
+ }
+
+ @Override
+ public void onEnd(SpanImpl span) {
+ runningSpans.removeElement(span);
+ }
+
+ @Override
+ public Summary getSummary() {
+ Collection<SpanImpl> allRunningSpans = runningSpans.getAll();
+ Map<String, Integer> numSpansPerName = new HashMap<String, Integer>();
+ for (SpanImpl span : allRunningSpans) {
+ Integer prevValue = numSpansPerName.get(span.getName());
+ numSpansPerName.put(span.getName(), prevValue != null ? prevValue + 1 : 1);
+ }
+ Map<String, PerSpanNameSummary> perSpanNameSummary = new HashMap<String, PerSpanNameSummary>();
+ for (Map.Entry<String, Integer> it : numSpansPerName.entrySet()) {
+ perSpanNameSummary.put(it.getKey(), PerSpanNameSummary.create(it.getValue()));
+ }
+ Summary summary = Summary.create(perSpanNameSummary);
+ return summary;
+ }
+
+ @Override
+ public Collection<SpanData> getRunningSpans(Filter filter) {
+ Collection<SpanImpl> allRunningSpans = runningSpans.getAll();
+ int maxSpansToReturn =
+ filter.getMaxSpansToReturn() == 0 ? allRunningSpans.size() : filter.getMaxSpansToReturn();
+ List<SpanData> ret = new ArrayList<SpanData>(maxSpansToReturn);
+ for (SpanImpl span : allRunningSpans) {
+ if (ret.size() == maxSpansToReturn) {
+ break;
+ }
+ if (span.getName().equals(filter.getSpanName())) {
+ ret.add(span.toSpanData());
+ }
+ }
+ return ret;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java
new file mode 100644
index 00000000..1bdb3f41
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java
@@ -0,0 +1,381 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.export;
+
+import com.google.common.collect.EvictingQueue;
+import io.opencensus.implcore.internal.EventQueue;
+import io.opencensus.implcore.trace.SpanImpl;
+import io.opencensus.trace.Status;
+import io.opencensus.trace.Status.CanonicalCode;
+import io.opencensus.trace.export.SampledSpanStore;
+import io.opencensus.trace.export.SpanData;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+/** In-process implementation of the {@link SampledSpanStore}. */
+@ThreadSafe
+public final class InProcessSampledSpanStoreImpl extends SampledSpanStoreImpl {
+ private static final int NUM_SAMPLES_PER_LATENCY_BUCKET = 10;
+ private static final int NUM_SAMPLES_PER_ERROR_BUCKET = 5;
+ private static final long TIME_BETWEEN_SAMPLES = TimeUnit.SECONDS.toNanos(1);
+ private static final int NUM_LATENCY_BUCKETS = LatencyBucketBoundaries.values().length;
+ // The total number of canonical codes - 1 (the OK code).
+ private static final int NUM_ERROR_BUCKETS = CanonicalCode.values().length - 1;
+ private static final int MAX_PER_SPAN_NAME_SAMPLES =
+ NUM_SAMPLES_PER_LATENCY_BUCKET * NUM_LATENCY_BUCKETS
+ + NUM_SAMPLES_PER_ERROR_BUCKET * NUM_ERROR_BUCKETS;
+
+ // Used to stream the register/unregister events to the implementation to avoid lock contention
+ // between the main threads and the worker thread.
+ private final EventQueue eventQueue;
+
+ @GuardedBy("samples")
+ private final Map<String, PerSpanNameSamples> samples;
+
+ private static final class Bucket {
+
+ private final EvictingQueue<SpanImpl> sampledSpansQueue;
+ private final EvictingQueue<SpanImpl> notSampledSpansQueue;
+ private long lastSampledNanoTime;
+ private long lastNotSampledNanoTime;
+
+ private Bucket(int numSamples) {
+ sampledSpansQueue = EvictingQueue.create(numSamples);
+ notSampledSpansQueue = EvictingQueue.create(numSamples);
+ }
+
+ private void considerForSampling(SpanImpl span) {
+ long spanEndNanoTime = span.getEndNanoTime();
+ if (span.getContext().getTraceOptions().isSampled()) {
+ // Need to compare by doing the subtraction all the time because in case of an overflow,
+ // this may never sample again (at least for the next ~200 years). No real chance to
+ // overflow two times because that means the process runs for ~200 years.
+ if (spanEndNanoTime - lastSampledNanoTime > TIME_BETWEEN_SAMPLES) {
+ sampledSpansQueue.add(span);
+ lastSampledNanoTime = spanEndNanoTime;
+ }
+ } else {
+ // Need to compare by doing the subtraction all the time because in case of an overflow,
+ // this may never sample again (at least for the next ~200 years). No real chance to
+ // overflow two times because that means the process runs for ~200 years.
+ if (spanEndNanoTime - lastNotSampledNanoTime > TIME_BETWEEN_SAMPLES) {
+ notSampledSpansQueue.add(span);
+ lastNotSampledNanoTime = spanEndNanoTime;
+ }
+ }
+ }
+
+ private void getSamples(int maxSpansToReturn, List<SpanImpl> output) {
+ getSamples(maxSpansToReturn, output, sampledSpansQueue);
+ getSamples(maxSpansToReturn, output, notSampledSpansQueue);
+ }
+
+ private static void getSamples(
+ int maxSpansToReturn, List<SpanImpl> output, EvictingQueue<SpanImpl> queue) {
+ for (SpanImpl span : queue) {
+ if (output.size() >= maxSpansToReturn) {
+ break;
+ }
+ output.add(span);
+ }
+ }
+
+ private void getSamplesFilteredByLatency(
+ long latencyLowerNs, long latencyUpperNs, int maxSpansToReturn, List<SpanImpl> output) {
+ getSamplesFilteredByLatency(
+ latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, sampledSpansQueue);
+ getSamplesFilteredByLatency(
+ latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, notSampledSpansQueue);
+ }
+
+ private static void getSamplesFilteredByLatency(
+ long latencyLowerNs,
+ long latencyUpperNs,
+ int maxSpansToReturn,
+ List<SpanImpl> output,
+ EvictingQueue<SpanImpl> queue) {
+ for (SpanImpl span : queue) {
+ if (output.size() >= maxSpansToReturn) {
+ break;
+ }
+ long spanLatencyNs = span.getLatencyNs();
+ if (spanLatencyNs >= latencyLowerNs && spanLatencyNs < latencyUpperNs) {
+ output.add(span);
+ }
+ }
+ }
+
+ private int getNumSamples() {
+ return sampledSpansQueue.size() + notSampledSpansQueue.size();
+ }
+ }
+
+ /**
+ * Keeps samples for a given span name. Samples for all the latency buckets and for all canonical
+ * codes other than OK.
+ */
+ private static final class PerSpanNameSamples {
+
+ private final Bucket[] latencyBuckets;
+ private final Bucket[] errorBuckets;
+
+ private PerSpanNameSamples() {
+ latencyBuckets = new Bucket[NUM_LATENCY_BUCKETS];
+ for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
+ latencyBuckets[i] = new Bucket(NUM_SAMPLES_PER_LATENCY_BUCKET);
+ }
+ errorBuckets = new Bucket[NUM_ERROR_BUCKETS];
+ for (int i = 0; i < NUM_ERROR_BUCKETS; i++) {
+ errorBuckets[i] = new Bucket(NUM_SAMPLES_PER_ERROR_BUCKET);
+ }
+ }
+
+ @Nullable
+ private Bucket getLatencyBucket(long latencyNs) {
+ for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
+ LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i];
+ if (latencyNs >= boundaries.getLatencyLowerNs()
+ && latencyNs < boundaries.getLatencyUpperNs()) {
+ return latencyBuckets[i];
+ }
+ }
+ // latencyNs is negative or Long.MAX_VALUE, so this Span can be ignored. This cannot happen
+ // in real production because System#nanoTime is monotonic.
+ return null;
+ }
+
+ private Bucket getErrorBucket(CanonicalCode code) {
+ return errorBuckets[code.value() - 1];
+ }
+
+ private void considerForSampling(SpanImpl span) {
+ Status status = span.getStatus();
+ // Null status means running Span, this should not happen in production, but the library
+ // should not crash because of this.
+ if (status != null) {
+ Bucket bucket =
+ status.isOk()
+ ? getLatencyBucket(span.getLatencyNs())
+ : getErrorBucket(status.getCanonicalCode());
+ // If unable to find the bucket, ignore this Span.
+ if (bucket != null) {
+ bucket.considerForSampling(span);
+ }
+ }
+ }
+
+ private Map<LatencyBucketBoundaries, Integer> getNumbersOfLatencySampledSpans() {
+ Map<LatencyBucketBoundaries, Integer> latencyBucketSummaries =
+ new EnumMap<LatencyBucketBoundaries, Integer>(LatencyBucketBoundaries.class);
+ for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
+ latencyBucketSummaries.put(
+ LatencyBucketBoundaries.values()[i], latencyBuckets[i].getNumSamples());
+ }
+ return latencyBucketSummaries;
+ }
+
+ private Map<CanonicalCode, Integer> getNumbersOfErrorSampledSpans() {
+ Map<CanonicalCode, Integer> errorBucketSummaries =
+ new EnumMap<CanonicalCode, Integer>(CanonicalCode.class);
+ for (int i = 0; i < NUM_ERROR_BUCKETS; i++) {
+ errorBucketSummaries.put(CanonicalCode.values()[i + 1], errorBuckets[i].getNumSamples());
+ }
+ return errorBucketSummaries;
+ }
+
+ private List<SpanImpl> getErrorSamples(@Nullable CanonicalCode code, int maxSpansToReturn) {
+ ArrayList<SpanImpl> output = new ArrayList<SpanImpl>(maxSpansToReturn);
+ if (code != null) {
+ getErrorBucket(code).getSamples(maxSpansToReturn, output);
+ } else {
+ for (int i = 0; i < NUM_ERROR_BUCKETS; i++) {
+ errorBuckets[i].getSamples(maxSpansToReturn, output);
+ }
+ }
+ return output;
+ }
+
+ private List<SpanImpl> getLatencySamples(
+ long latencyLowerNs, long latencyUpperNs, int maxSpansToReturn) {
+ ArrayList<SpanImpl> output = new ArrayList<SpanImpl>(maxSpansToReturn);
+ for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
+ LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i];
+ if (latencyUpperNs >= boundaries.getLatencyLowerNs()
+ && latencyLowerNs < boundaries.getLatencyUpperNs()) {
+ latencyBuckets[i].getSamplesFilteredByLatency(
+ latencyLowerNs, latencyUpperNs, maxSpansToReturn, output);
+ }
+ }
+ return output;
+ }
+ }
+
+ /** Constructs a new {@code InProcessSampledSpanStoreImpl}. */
+ InProcessSampledSpanStoreImpl(EventQueue eventQueue) {
+ samples = new HashMap<String, PerSpanNameSamples>();
+ this.eventQueue = eventQueue;
+ }
+
+ @Override
+ public Summary getSummary() {
+ Map<String, PerSpanNameSummary> ret = new HashMap<String, PerSpanNameSummary>();
+ synchronized (samples) {
+ for (Map.Entry<String, PerSpanNameSamples> it : samples.entrySet()) {
+ ret.put(
+ it.getKey(),
+ PerSpanNameSummary.create(
+ it.getValue().getNumbersOfLatencySampledSpans(),
+ it.getValue().getNumbersOfErrorSampledSpans()));
+ }
+ }
+ return Summary.create(ret);
+ }
+
+ @Override
+ public void considerForSampling(SpanImpl span) {
+ synchronized (samples) {
+ String spanName = span.getName();
+ if (span.getSampleToLocalSpanStore() && !samples.containsKey(spanName)) {
+ samples.put(spanName, new PerSpanNameSamples());
+ }
+ PerSpanNameSamples perSpanNameSamples = samples.get(spanName);
+ if (perSpanNameSamples != null) {
+ perSpanNameSamples.considerForSampling(span);
+ }
+ }
+ }
+
+ @Override
+ public void registerSpanNamesForCollection(Collection<String> spanNames) {
+ eventQueue.enqueue(new RegisterSpanNameEvent(this, spanNames));
+ }
+
+ private void internaltRegisterSpanNamesForCollection(Collection<String> spanNames) {
+ synchronized (samples) {
+ for (String spanName : spanNames) {
+ if (!samples.containsKey(spanName)) {
+ samples.put(spanName, new PerSpanNameSamples());
+ }
+ }
+ }
+ }
+
+ private static final class RegisterSpanNameEvent implements EventQueue.Entry {
+ private final InProcessSampledSpanStoreImpl sampledSpanStore;
+ private final Collection<String> spanNames;
+
+ private RegisterSpanNameEvent(
+ InProcessSampledSpanStoreImpl sampledSpanStore, Collection<String> spanNames) {
+ this.sampledSpanStore = sampledSpanStore;
+ this.spanNames = new ArrayList<String>(spanNames);
+ }
+
+ @Override
+ public void process() {
+ sampledSpanStore.internaltRegisterSpanNamesForCollection(spanNames);
+ }
+ }
+
+ @Override
+ public void unregisterSpanNamesForCollection(Collection<String> spanNames) {
+ eventQueue.enqueue(new UnregisterSpanNameEvent(this, spanNames));
+ }
+
+ private void internalUnregisterSpanNamesForCollection(Collection<String> spanNames) {
+ synchronized (samples) {
+ samples.keySet().removeAll(spanNames);
+ }
+ }
+
+ private static final class UnregisterSpanNameEvent implements EventQueue.Entry {
+ private final InProcessSampledSpanStoreImpl sampledSpanStore;
+ private final Collection<String> spanNames;
+
+ private UnregisterSpanNameEvent(
+ InProcessSampledSpanStoreImpl sampledSpanStore, Collection<String> spanNames) {
+ this.sampledSpanStore = sampledSpanStore;
+ this.spanNames = new ArrayList<String>(spanNames);
+ }
+
+ @Override
+ public void process() {
+ sampledSpanStore.internalUnregisterSpanNamesForCollection(spanNames);
+ }
+ }
+
+ @Override
+ public Set<String> getRegisteredSpanNamesForCollection() {
+ synchronized (samples) {
+ return Collections.unmodifiableSet(new HashSet<String>(samples.keySet()));
+ }
+ }
+
+ @Override
+ public Collection<SpanData> getErrorSampledSpans(ErrorFilter filter) {
+ int numSpansToReturn =
+ filter.getMaxSpansToReturn() == 0
+ ? MAX_PER_SPAN_NAME_SAMPLES
+ : filter.getMaxSpansToReturn();
+ List<SpanImpl> spans = Collections.emptyList();
+ // Try to not keep the lock to much, do the SpanImpl -> SpanData conversion outside the lock.
+ synchronized (samples) {
+ PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName());
+ if (perSpanNameSamples != null) {
+ spans = perSpanNameSamples.getErrorSamples(filter.getCanonicalCode(), numSpansToReturn);
+ }
+ }
+ List<SpanData> ret = new ArrayList<SpanData>(spans.size());
+ for (SpanImpl span : spans) {
+ ret.add(span.toSpanData());
+ }
+ return Collections.unmodifiableList(ret);
+ }
+
+ @Override
+ public Collection<SpanData> getLatencySampledSpans(LatencyFilter filter) {
+ int numSpansToReturn =
+ filter.getMaxSpansToReturn() == 0
+ ? MAX_PER_SPAN_NAME_SAMPLES
+ : filter.getMaxSpansToReturn();
+ List<SpanImpl> spans = Collections.emptyList();
+ // Try to not keep the lock to much, do the SpanImpl -> SpanData conversion outside the lock.
+ synchronized (samples) {
+ PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName());
+ if (perSpanNameSamples != null) {
+ spans =
+ perSpanNameSamples.getLatencySamples(
+ filter.getLatencyLowerNs(), filter.getLatencyUpperNs(), numSpansToReturn);
+ }
+ }
+ List<SpanData> ret = new ArrayList<SpanData>(spans.size());
+ for (SpanImpl span : spans) {
+ ret.add(span.toSpanData());
+ }
+ return Collections.unmodifiableList(ret);
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java
index 98a51707..53147def 100644
--- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017, OpenCensus Authors
+ * Copyright 2018, OpenCensus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,23 +17,23 @@
package io.opencensus.implcore.trace.export;
import io.opencensus.implcore.trace.SpanImpl;
-import io.opencensus.implcore.trace.internal.ConcurrentIntrusiveList;
import io.opencensus.trace.export.RunningSpanStore;
+import io.opencensus.trace.export.RunningSpanStore.Filter;
+import io.opencensus.trace.export.RunningSpanStore.PerSpanNameSummary;
+import io.opencensus.trace.export.RunningSpanStore.Summary;
import io.opencensus.trace.export.SpanData;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.concurrent.ThreadSafe;
+import java.util.Collections;
-/** Implementation of the {@link RunningSpanStore}. */
-@ThreadSafe
-public final class RunningSpanStoreImpl extends RunningSpanStore {
- private final ConcurrentIntrusiveList<SpanImpl> runningSpans;
+/** Abstract implementation of the {@link RunningSpanStore}. */
+public abstract class RunningSpanStoreImpl extends RunningSpanStore {
- public RunningSpanStoreImpl() {
- runningSpans = new ConcurrentIntrusiveList<SpanImpl>();
+ private static final RunningSpanStoreImpl NOOP_RUNNING_SPAN_STORE_IMPL =
+ new NoopRunningSpanStoreImpl();
+
+ /** Returns the no-op implementation of the {@link RunningSpanStoreImpl}. */
+ static RunningSpanStoreImpl getNoopRunningSpanStoreImpl() {
+ return NOOP_RUNNING_SPAN_STORE_IMPL;
}
/**
@@ -41,49 +41,34 @@ public final class RunningSpanStoreImpl extends RunningSpanStore {
*
* @param span the {@code Span} that started.
*/
- public void onStart(SpanImpl span) {
- runningSpans.addElement(span);
- }
+ public abstract void onStart(SpanImpl span);
/**
* Removes the {@code Span} from the running spans list when the {@code Span} ends.
*
* @param span the {@code Span} that ended.
*/
- public void onEnd(SpanImpl span) {
- runningSpans.removeElement(span);
- }
+ public abstract void onEnd(SpanImpl span);
- @Override
- public Summary getSummary() {
- Collection<SpanImpl> allRunningSpans = runningSpans.getAll();
- Map<String, Integer> numSpansPerName = new HashMap<String, Integer>();
- for (SpanImpl span : allRunningSpans) {
- Integer prevValue = numSpansPerName.get(span.getName());
- numSpansPerName.put(span.getName(), prevValue != null ? prevValue + 1 : 1);
- }
- Map<String, PerSpanNameSummary> perSpanNameSummary = new HashMap<String, PerSpanNameSummary>();
- for (Map.Entry<String, Integer> it : numSpansPerName.entrySet()) {
- perSpanNameSummary.put(it.getKey(), PerSpanNameSummary.create(it.getValue()));
+ private static final class NoopRunningSpanStoreImpl extends RunningSpanStoreImpl {
+
+ private static final Summary EMPTY_SUMMARY =
+ RunningSpanStore.Summary.create(Collections.<String, PerSpanNameSummary>emptyMap());
+
+ @Override
+ public void onStart(SpanImpl span) {}
+
+ @Override
+ public void onEnd(SpanImpl span) {}
+
+ @Override
+ public Summary getSummary() {
+ return EMPTY_SUMMARY;
}
- Summary summary = Summary.create(perSpanNameSummary);
- return summary;
- }
- @Override
- public Collection<SpanData> getRunningSpans(Filter filter) {
- Collection<SpanImpl> allRunningSpans = runningSpans.getAll();
- int maxSpansToReturn =
- filter.getMaxSpansToReturn() == 0 ? allRunningSpans.size() : filter.getMaxSpansToReturn();
- List<SpanData> ret = new ArrayList<SpanData>(maxSpansToReturn);
- for (SpanImpl span : allRunningSpans) {
- if (ret.size() == maxSpansToReturn) {
- break;
- }
- if (span.getName().equals(filter.getSpanName())) {
- ret.add(span.toSpanData());
- }
+ @Override
+ public Collection<SpanData> getRunningSpans(Filter filter) {
+ return Collections.<SpanData>emptyList();
}
- return ret;
}
}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java
index 0fad80fb..0c83a05a 100644
--- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017, OpenCensus Authors
+ * Copyright 2018, OpenCensus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,244 +16,25 @@
package io.opencensus.implcore.trace.export;
-import com.google.common.collect.EvictingQueue;
-import io.opencensus.implcore.internal.EventQueue;
import io.opencensus.implcore.trace.SpanImpl;
-import io.opencensus.trace.Status;
-import io.opencensus.trace.Status.CanonicalCode;
import io.opencensus.trace.export.SampledSpanStore;
+import io.opencensus.trace.export.SampledSpanStore.ErrorFilter;
+import io.opencensus.trace.export.SampledSpanStore.LatencyFilter;
+import io.opencensus.trace.export.SampledSpanStore.PerSpanNameSummary;
+import io.opencensus.trace.export.SampledSpanStore.Summary;
import io.opencensus.trace.export.SpanData;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
-/** Implementation of the {@link SampledSpanStore}. */
-@ThreadSafe
-public final class SampledSpanStoreImpl extends SampledSpanStore {
- private static final int NUM_SAMPLES_PER_LATENCY_BUCKET = 10;
- private static final int NUM_SAMPLES_PER_ERROR_BUCKET = 5;
- private static final long TIME_BETWEEN_SAMPLES = TimeUnit.SECONDS.toNanos(1);
- private static final int NUM_LATENCY_BUCKETS = LatencyBucketBoundaries.values().length;
- // The total number of canonical codes - 1 (the OK code).
- private static final int NUM_ERROR_BUCKETS = CanonicalCode.values().length - 1;
- private static final int MAX_PER_SPAN_NAME_SAMPLES =
- NUM_SAMPLES_PER_LATENCY_BUCKET * NUM_LATENCY_BUCKETS
- + NUM_SAMPLES_PER_ERROR_BUCKET * NUM_ERROR_BUCKETS;
+/** Abstract implementation of the {@link SampledSpanStore}. */
+public abstract class SampledSpanStoreImpl extends SampledSpanStore {
+ private static final SampledSpanStoreImpl NOOP_SAMPLED_SPAN_STORE_IMPL =
+ new NoopSampledSpanStoreImpl();
- // Used to stream the register/unregister events to the implementation to avoid lock contention
- // between the main threads and the worker thread.
- private final EventQueue eventQueue;
-
- @GuardedBy("samples")
- private final Map<String, PerSpanNameSamples> samples;
-
- private static final class Bucket {
-
- private final EvictingQueue<SpanImpl> sampledSpansQueue;
- private final EvictingQueue<SpanImpl> notSampledSpansQueue;
- private long lastSampledNanoTime;
- private long lastNotSampledNanoTime;
-
- private Bucket(int numSamples) {
- sampledSpansQueue = EvictingQueue.create(numSamples);
- notSampledSpansQueue = EvictingQueue.create(numSamples);
- }
-
- private void considerForSampling(SpanImpl span) {
- long spanEndNanoTime = span.getEndNanoTime();
- if (span.getContext().getTraceOptions().isSampled()) {
- // Need to compare by doing the subtraction all the time because in case of an overflow,
- // this may never sample again (at least for the next ~200 years). No real chance to
- // overflow two times because that means the process runs for ~200 years.
- if (spanEndNanoTime - lastSampledNanoTime > TIME_BETWEEN_SAMPLES) {
- sampledSpansQueue.add(span);
- lastSampledNanoTime = spanEndNanoTime;
- }
- } else {
- // Need to compare by doing the subtraction all the time because in case of an overflow,
- // this may never sample again (at least for the next ~200 years). No real chance to
- // overflow two times because that means the process runs for ~200 years.
- if (spanEndNanoTime - lastNotSampledNanoTime > TIME_BETWEEN_SAMPLES) {
- notSampledSpansQueue.add(span);
- lastNotSampledNanoTime = spanEndNanoTime;
- }
- }
- }
-
- private void getSamples(int maxSpansToReturn, List<SpanImpl> output) {
- getSamples(maxSpansToReturn, output, sampledSpansQueue);
- getSamples(maxSpansToReturn, output, notSampledSpansQueue);
- }
-
- private static void getSamples(
- int maxSpansToReturn, List<SpanImpl> output, EvictingQueue<SpanImpl> queue) {
- for (SpanImpl span : queue) {
- if (output.size() >= maxSpansToReturn) {
- break;
- }
- output.add(span);
- }
- }
-
- private void getSamplesFilteredByLatency(
- long latencyLowerNs, long latencyUpperNs, int maxSpansToReturn, List<SpanImpl> output) {
- getSamplesFilteredByLatency(
- latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, sampledSpansQueue);
- getSamplesFilteredByLatency(
- latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, notSampledSpansQueue);
- }
-
- private static void getSamplesFilteredByLatency(
- long latencyLowerNs,
- long latencyUpperNs,
- int maxSpansToReturn,
- List<SpanImpl> output,
- EvictingQueue<SpanImpl> queue) {
- for (SpanImpl span : queue) {
- if (output.size() >= maxSpansToReturn) {
- break;
- }
- long spanLatencyNs = span.getLatencyNs();
- if (spanLatencyNs >= latencyLowerNs && spanLatencyNs < latencyUpperNs) {
- output.add(span);
- }
- }
- }
-
- private int getNumSamples() {
- return sampledSpansQueue.size() + notSampledSpansQueue.size();
- }
- }
-
- /**
- * Keeps samples for a given span name. Samples for all the latency buckets and for all canonical
- * codes other than OK.
- */
- private static final class PerSpanNameSamples {
-
- private final Bucket[] latencyBuckets;
- private final Bucket[] errorBuckets;
-
- private PerSpanNameSamples() {
- latencyBuckets = new Bucket[NUM_LATENCY_BUCKETS];
- for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
- latencyBuckets[i] = new Bucket(NUM_SAMPLES_PER_LATENCY_BUCKET);
- }
- errorBuckets = new Bucket[NUM_ERROR_BUCKETS];
- for (int i = 0; i < NUM_ERROR_BUCKETS; i++) {
- errorBuckets[i] = new Bucket(NUM_SAMPLES_PER_ERROR_BUCKET);
- }
- }
-
- @Nullable
- private Bucket getLatencyBucket(long latencyNs) {
- for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
- LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i];
- if (latencyNs >= boundaries.getLatencyLowerNs()
- && latencyNs < boundaries.getLatencyUpperNs()) {
- return latencyBuckets[i];
- }
- }
- // latencyNs is negative or Long.MAX_VALUE, so this Span can be ignored. This cannot happen
- // in real production because System#nanoTime is monotonic.
- return null;
- }
-
- private Bucket getErrorBucket(CanonicalCode code) {
- return errorBuckets[code.value() - 1];
- }
-
- private void considerForSampling(SpanImpl span) {
- Status status = span.getStatus();
- // Null status means running Span, this should not happen in production, but the library
- // should not crash because of this.
- if (status != null) {
- Bucket bucket =
- status.isOk()
- ? getLatencyBucket(span.getLatencyNs())
- : getErrorBucket(status.getCanonicalCode());
- // If unable to find the bucket, ignore this Span.
- if (bucket != null) {
- bucket.considerForSampling(span);
- }
- }
- }
-
- private Map<LatencyBucketBoundaries, Integer> getNumbersOfLatencySampledSpans() {
- Map<LatencyBucketBoundaries, Integer> latencyBucketSummaries =
- new EnumMap<LatencyBucketBoundaries, Integer>(LatencyBucketBoundaries.class);
- for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
- latencyBucketSummaries.put(
- LatencyBucketBoundaries.values()[i], latencyBuckets[i].getNumSamples());
- }
- return latencyBucketSummaries;
- }
-
- private Map<CanonicalCode, Integer> getNumbersOfErrorSampledSpans() {
- Map<CanonicalCode, Integer> errorBucketSummaries =
- new EnumMap<CanonicalCode, Integer>(CanonicalCode.class);
- for (int i = 0; i < NUM_ERROR_BUCKETS; i++) {
- errorBucketSummaries.put(CanonicalCode.values()[i + 1], errorBuckets[i].getNumSamples());
- }
- return errorBucketSummaries;
- }
-
- private List<SpanImpl> getErrorSamples(@Nullable CanonicalCode code, int maxSpansToReturn) {
- ArrayList<SpanImpl> output = new ArrayList<SpanImpl>(maxSpansToReturn);
- if (code != null) {
- getErrorBucket(code).getSamples(maxSpansToReturn, output);
- } else {
- for (int i = 0; i < NUM_ERROR_BUCKETS; i++) {
- errorBuckets[i].getSamples(maxSpansToReturn, output);
- }
- }
- return output;
- }
-
- private List<SpanImpl> getLatencySamples(
- long latencyLowerNs, long latencyUpperNs, int maxSpansToReturn) {
- ArrayList<SpanImpl> output = new ArrayList<SpanImpl>(maxSpansToReturn);
- for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
- LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i];
- if (latencyUpperNs >= boundaries.getLatencyLowerNs()
- && latencyLowerNs < boundaries.getLatencyUpperNs()) {
- latencyBuckets[i].getSamplesFilteredByLatency(
- latencyLowerNs, latencyUpperNs, maxSpansToReturn, output);
- }
- }
- return output;
- }
- }
-
- /** Constructs a new {@code SampledSpanStoreImpl}. */
- SampledSpanStoreImpl(EventQueue eventQueue) {
- samples = new HashMap<String, PerSpanNameSamples>();
- this.eventQueue = eventQueue;
- }
-
- @Override
- public Summary getSummary() {
- Map<String, PerSpanNameSummary> ret = new HashMap<String, PerSpanNameSummary>();
- synchronized (samples) {
- for (Map.Entry<String, PerSpanNameSamples> it : samples.entrySet()) {
- ret.put(
- it.getKey(),
- PerSpanNameSummary.create(
- it.getValue().getNumbersOfLatencySampledSpans(),
- it.getValue().getNumbersOfErrorSampledSpans()));
- }
- }
- return Summary.create(ret);
+ /** Returns the new no-op implmentation of {@link SampledSpanStoreImpl}. */
+ public static SampledSpanStoreImpl getNoopSampledSpanStoreImpl() {
+ return NOOP_SAMPLED_SPAN_STORE_IMPL;
}
/**
@@ -262,125 +43,41 @@ public final class SampledSpanStoreImpl extends SampledSpanStore {
*
* @param span the span to be consider for storing into the store buckets.
*/
- public void considerForSampling(SpanImpl span) {
- synchronized (samples) {
- String spanName = span.getName();
- if (span.getSampleToLocalSpanStore() && !samples.containsKey(spanName)) {
- samples.put(spanName, new PerSpanNameSamples());
- }
- PerSpanNameSamples perSpanNameSamples = samples.get(spanName);
- if (perSpanNameSamples != null) {
- perSpanNameSamples.considerForSampling(span);
- }
- }
- }
-
- @Override
- public void registerSpanNamesForCollection(Collection<String> spanNames) {
- eventQueue.enqueue(new RegisterSpanNameEvent(this, spanNames));
- }
+ public abstract void considerForSampling(SpanImpl span);
- private void internaltRegisterSpanNamesForCollection(Collection<String> spanNames) {
- synchronized (samples) {
- for (String spanName : spanNames) {
- if (!samples.containsKey(spanName)) {
- samples.put(spanName, new PerSpanNameSamples());
- }
- }
- }
- }
-
- private static final class RegisterSpanNameEvent implements EventQueue.Entry {
- private final SampledSpanStoreImpl sampledSpanStore;
- private final Collection<String> spanNames;
-
- private RegisterSpanNameEvent(
- SampledSpanStoreImpl sampledSpanStore, Collection<String> spanNames) {
- this.sampledSpanStore = sampledSpanStore;
- this.spanNames = new ArrayList<String>(spanNames);
- }
+ private static final class NoopSampledSpanStoreImpl extends SampledSpanStoreImpl {
+ private static final Summary EMPTY_SUMMARY =
+ Summary.create(Collections.<String, PerSpanNameSummary>emptyMap());
+ private static final Set<String> EMPTY_REGISTERED_SPAN_NAMES = Collections.<String>emptySet();
+ private static final Collection<SpanData> EMPTY_SPANDATA = Collections.<SpanData>emptySet();
@Override
- public void process() {
- sampledSpanStore.internaltRegisterSpanNamesForCollection(spanNames);
+ public Summary getSummary() {
+ return EMPTY_SUMMARY;
}
- }
- @Override
- public void unregisterSpanNamesForCollection(Collection<String> spanNames) {
- eventQueue.enqueue(new UnregisterSpanNameEvent(this, spanNames));
- }
-
- private void internalUnregisterSpanNamesForCollection(Collection<String> spanNames) {
- synchronized (samples) {
- samples.keySet().removeAll(spanNames);
- }
- }
-
- private static final class UnregisterSpanNameEvent implements EventQueue.Entry {
- private final SampledSpanStoreImpl sampledSpanStore;
- private final Collection<String> spanNames;
+ @Override
+ public void considerForSampling(SpanImpl span) {}
- private UnregisterSpanNameEvent(
- SampledSpanStoreImpl sampledSpanStore, Collection<String> spanNames) {
- this.sampledSpanStore = sampledSpanStore;
- this.spanNames = new ArrayList<String>(spanNames);
- }
+ @Override
+ public void registerSpanNamesForCollection(Collection<String> spanNames) {}
@Override
- public void process() {
- sampledSpanStore.internalUnregisterSpanNamesForCollection(spanNames);
- }
- }
+ public void unregisterSpanNamesForCollection(Collection<String> spanNames) {}
- @Override
- public Set<String> getRegisteredSpanNamesForCollection() {
- synchronized (samples) {
- return Collections.unmodifiableSet(new HashSet<String>(samples.keySet()));
+ @Override
+ public Set<String> getRegisteredSpanNamesForCollection() {
+ return EMPTY_REGISTERED_SPAN_NAMES;
}
- }
- @Override
- public Collection<SpanData> getErrorSampledSpans(ErrorFilter filter) {
- int numSpansToReturn =
- filter.getMaxSpansToReturn() == 0
- ? MAX_PER_SPAN_NAME_SAMPLES
- : filter.getMaxSpansToReturn();
- List<SpanImpl> spans = Collections.emptyList();
- // Try to not keep the lock to much, do the SpanImpl -> SpanData conversion outside the lock.
- synchronized (samples) {
- PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName());
- if (perSpanNameSamples != null) {
- spans = perSpanNameSamples.getErrorSamples(filter.getCanonicalCode(), numSpansToReturn);
- }
- }
- List<SpanData> ret = new ArrayList<SpanData>(spans.size());
- for (SpanImpl span : spans) {
- ret.add(span.toSpanData());
+ @Override
+ public Collection<SpanData> getErrorSampledSpans(ErrorFilter filter) {
+ return EMPTY_SPANDATA;
}
- return Collections.unmodifiableList(ret);
- }
- @Override
- public Collection<SpanData> getLatencySampledSpans(LatencyFilter filter) {
- int numSpansToReturn =
- filter.getMaxSpansToReturn() == 0
- ? MAX_PER_SPAN_NAME_SAMPLES
- : filter.getMaxSpansToReturn();
- List<SpanImpl> spans = Collections.emptyList();
- // Try to not keep the lock to much, do the SpanImpl -> SpanData conversion outside the lock.
- synchronized (samples) {
- PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName());
- if (perSpanNameSamples != null) {
- spans =
- perSpanNameSamples.getLatencySamples(
- filter.getLatencyLowerNs(), filter.getLatencyUpperNs(), numSpansToReturn);
- }
- }
- List<SpanData> ret = new ArrayList<SpanData>(spans.size());
- for (SpanImpl span : spans) {
- ret.add(span.toSpanData());
+ @Override
+ public Collection<SpanData> getLatencySampledSpans(LatencyFilter filter) {
+ return EMPTY_SPANDATA;
}
- return Collections.unmodifiableList(ret);
}
}