From fee27583a43f54b0bad105e51e3755b248fe4d4d Mon Sep 17 00:00:00 2001 From: Hailong Wen Date: Tue, 6 Feb 2018 10:57:58 -0800 Subject: Make `XxxSpanStoreImpl` abstract and add no-op implementation. (fixes #914) (#964) --- .../implcore/trace/export/ExportComponentImpl.java | 21 +- .../export/InProcessRunningSpanStoreImpl.java | 81 +++++ .../export/InProcessSampledSpanStoreImpl.java | 381 +++++++++++++++++++++ .../trace/export/RunningSpanStoreImpl.java | 79 ++--- .../trace/export/SampledSpanStoreImpl.java | 371 ++------------------ 5 files changed, 538 insertions(+), 395 deletions(-) create mode 100644 impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java create mode 100644 impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java (limited to 'impl_core/src/main/java/io/opencensus') diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java index 24042e84..e77d1f8e 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java @@ -21,7 +21,6 @@ import io.opencensus.implcore.internal.EventQueue; import io.opencensus.trace.export.ExportComponent; import io.opencensus.trace.export.RunningSpanStore; import io.opencensus.trace.export.SampledSpanStore; -import javax.annotation.Nullable; /** Implementation of the {@link ExportComponent}. */ public final class ExportComponentImpl extends ExportComponent { @@ -30,8 +29,8 @@ public final class ExportComponentImpl extends ExportComponent { private static final Duration EXPORTER_SCHEDULE_DELAY = Duration.create(5, 0); private final SpanExporterImpl spanExporter; - @Nullable private final RunningSpanStoreImpl runningSpanStore; - @Nullable private final SampledSpanStoreImpl sampledSpanStore; + private final RunningSpanStoreImpl runningSpanStore; + private final SampledSpanStoreImpl sampledSpanStore; @Override public SpanExporterImpl getSpanExporter() { @@ -39,17 +38,11 @@ public final class ExportComponentImpl extends ExportComponent { } @Override - // TODO(#914): This method shouldn't be nullable. - @SuppressWarnings("nullness") - @Nullable public RunningSpanStoreImpl getRunningSpanStore() { return runningSpanStore; } @Override - // TODO(#914): This method shouldn't be nullable. - @SuppressWarnings("nullness") - @Nullable public SampledSpanStoreImpl getSampledSpanStore() { return sampledSpanStore; } @@ -82,7 +75,13 @@ public final class ExportComponentImpl extends ExportComponent { */ private ExportComponentImpl(boolean supportInProcessStores, EventQueue eventQueue) { this.spanExporter = SpanExporterImpl.create(EXPORTER_BUFFER_SIZE, EXPORTER_SCHEDULE_DELAY); - this.runningSpanStore = supportInProcessStores ? new RunningSpanStoreImpl() : null; - this.sampledSpanStore = supportInProcessStores ? new SampledSpanStoreImpl(eventQueue) : null; + this.runningSpanStore = + supportInProcessStores + ? new InProcessRunningSpanStoreImpl() + : RunningSpanStoreImpl.getNoopRunningSpanStoreImpl(); + this.sampledSpanStore = + supportInProcessStores + ? new InProcessSampledSpanStoreImpl(eventQueue) + : SampledSpanStoreImpl.getNoopSampledSpanStoreImpl(); } } diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java new file mode 100644 index 00000000..3d8fb9ae --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java @@ -0,0 +1,81 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.export; + +import io.opencensus.implcore.trace.SpanImpl; +import io.opencensus.implcore.trace.internal.ConcurrentIntrusiveList; +import io.opencensus.trace.export.RunningSpanStore; +import io.opencensus.trace.export.SpanData; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.concurrent.ThreadSafe; + +/** In-process implementation of the {@link RunningSpanStore}. */ +@ThreadSafe +public final class InProcessRunningSpanStoreImpl extends RunningSpanStoreImpl { + private final ConcurrentIntrusiveList runningSpans; + + public InProcessRunningSpanStoreImpl() { + runningSpans = new ConcurrentIntrusiveList(); + } + + @Override + public void onStart(SpanImpl span) { + runningSpans.addElement(span); + } + + @Override + public void onEnd(SpanImpl span) { + runningSpans.removeElement(span); + } + + @Override + public Summary getSummary() { + Collection allRunningSpans = runningSpans.getAll(); + Map numSpansPerName = new HashMap(); + for (SpanImpl span : allRunningSpans) { + Integer prevValue = numSpansPerName.get(span.getName()); + numSpansPerName.put(span.getName(), prevValue != null ? prevValue + 1 : 1); + } + Map perSpanNameSummary = new HashMap(); + for (Map.Entry it : numSpansPerName.entrySet()) { + perSpanNameSummary.put(it.getKey(), PerSpanNameSummary.create(it.getValue())); + } + Summary summary = Summary.create(perSpanNameSummary); + return summary; + } + + @Override + public Collection getRunningSpans(Filter filter) { + Collection allRunningSpans = runningSpans.getAll(); + int maxSpansToReturn = + filter.getMaxSpansToReturn() == 0 ? allRunningSpans.size() : filter.getMaxSpansToReturn(); + List ret = new ArrayList(maxSpansToReturn); + for (SpanImpl span : allRunningSpans) { + if (ret.size() == maxSpansToReturn) { + break; + } + if (span.getName().equals(filter.getSpanName())) { + ret.add(span.toSpanData()); + } + } + return ret; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java new file mode 100644 index 00000000..1bdb3f41 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java @@ -0,0 +1,381 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.export; + +import com.google.common.collect.EvictingQueue; +import io.opencensus.implcore.internal.EventQueue; +import io.opencensus.implcore.trace.SpanImpl; +import io.opencensus.trace.Status; +import io.opencensus.trace.Status.CanonicalCode; +import io.opencensus.trace.export.SampledSpanStore; +import io.opencensus.trace.export.SpanData; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +/** In-process implementation of the {@link SampledSpanStore}. */ +@ThreadSafe +public final class InProcessSampledSpanStoreImpl extends SampledSpanStoreImpl { + private static final int NUM_SAMPLES_PER_LATENCY_BUCKET = 10; + private static final int NUM_SAMPLES_PER_ERROR_BUCKET = 5; + private static final long TIME_BETWEEN_SAMPLES = TimeUnit.SECONDS.toNanos(1); + private static final int NUM_LATENCY_BUCKETS = LatencyBucketBoundaries.values().length; + // The total number of canonical codes - 1 (the OK code). + private static final int NUM_ERROR_BUCKETS = CanonicalCode.values().length - 1; + private static final int MAX_PER_SPAN_NAME_SAMPLES = + NUM_SAMPLES_PER_LATENCY_BUCKET * NUM_LATENCY_BUCKETS + + NUM_SAMPLES_PER_ERROR_BUCKET * NUM_ERROR_BUCKETS; + + // Used to stream the register/unregister events to the implementation to avoid lock contention + // between the main threads and the worker thread. + private final EventQueue eventQueue; + + @GuardedBy("samples") + private final Map samples; + + private static final class Bucket { + + private final EvictingQueue sampledSpansQueue; + private final EvictingQueue notSampledSpansQueue; + private long lastSampledNanoTime; + private long lastNotSampledNanoTime; + + private Bucket(int numSamples) { + sampledSpansQueue = EvictingQueue.create(numSamples); + notSampledSpansQueue = EvictingQueue.create(numSamples); + } + + private void considerForSampling(SpanImpl span) { + long spanEndNanoTime = span.getEndNanoTime(); + if (span.getContext().getTraceOptions().isSampled()) { + // Need to compare by doing the subtraction all the time because in case of an overflow, + // this may never sample again (at least for the next ~200 years). No real chance to + // overflow two times because that means the process runs for ~200 years. + if (spanEndNanoTime - lastSampledNanoTime > TIME_BETWEEN_SAMPLES) { + sampledSpansQueue.add(span); + lastSampledNanoTime = spanEndNanoTime; + } + } else { + // Need to compare by doing the subtraction all the time because in case of an overflow, + // this may never sample again (at least for the next ~200 years). No real chance to + // overflow two times because that means the process runs for ~200 years. + if (spanEndNanoTime - lastNotSampledNanoTime > TIME_BETWEEN_SAMPLES) { + notSampledSpansQueue.add(span); + lastNotSampledNanoTime = spanEndNanoTime; + } + } + } + + private void getSamples(int maxSpansToReturn, List output) { + getSamples(maxSpansToReturn, output, sampledSpansQueue); + getSamples(maxSpansToReturn, output, notSampledSpansQueue); + } + + private static void getSamples( + int maxSpansToReturn, List output, EvictingQueue queue) { + for (SpanImpl span : queue) { + if (output.size() >= maxSpansToReturn) { + break; + } + output.add(span); + } + } + + private void getSamplesFilteredByLatency( + long latencyLowerNs, long latencyUpperNs, int maxSpansToReturn, List output) { + getSamplesFilteredByLatency( + latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, sampledSpansQueue); + getSamplesFilteredByLatency( + latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, notSampledSpansQueue); + } + + private static void getSamplesFilteredByLatency( + long latencyLowerNs, + long latencyUpperNs, + int maxSpansToReturn, + List output, + EvictingQueue queue) { + for (SpanImpl span : queue) { + if (output.size() >= maxSpansToReturn) { + break; + } + long spanLatencyNs = span.getLatencyNs(); + if (spanLatencyNs >= latencyLowerNs && spanLatencyNs < latencyUpperNs) { + output.add(span); + } + } + } + + private int getNumSamples() { + return sampledSpansQueue.size() + notSampledSpansQueue.size(); + } + } + + /** + * Keeps samples for a given span name. Samples for all the latency buckets and for all canonical + * codes other than OK. + */ + private static final class PerSpanNameSamples { + + private final Bucket[] latencyBuckets; + private final Bucket[] errorBuckets; + + private PerSpanNameSamples() { + latencyBuckets = new Bucket[NUM_LATENCY_BUCKETS]; + for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { + latencyBuckets[i] = new Bucket(NUM_SAMPLES_PER_LATENCY_BUCKET); + } + errorBuckets = new Bucket[NUM_ERROR_BUCKETS]; + for (int i = 0; i < NUM_ERROR_BUCKETS; i++) { + errorBuckets[i] = new Bucket(NUM_SAMPLES_PER_ERROR_BUCKET); + } + } + + @Nullable + private Bucket getLatencyBucket(long latencyNs) { + for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { + LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i]; + if (latencyNs >= boundaries.getLatencyLowerNs() + && latencyNs < boundaries.getLatencyUpperNs()) { + return latencyBuckets[i]; + } + } + // latencyNs is negative or Long.MAX_VALUE, so this Span can be ignored. This cannot happen + // in real production because System#nanoTime is monotonic. + return null; + } + + private Bucket getErrorBucket(CanonicalCode code) { + return errorBuckets[code.value() - 1]; + } + + private void considerForSampling(SpanImpl span) { + Status status = span.getStatus(); + // Null status means running Span, this should not happen in production, but the library + // should not crash because of this. + if (status != null) { + Bucket bucket = + status.isOk() + ? getLatencyBucket(span.getLatencyNs()) + : getErrorBucket(status.getCanonicalCode()); + // If unable to find the bucket, ignore this Span. + if (bucket != null) { + bucket.considerForSampling(span); + } + } + } + + private Map getNumbersOfLatencySampledSpans() { + Map latencyBucketSummaries = + new EnumMap(LatencyBucketBoundaries.class); + for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { + latencyBucketSummaries.put( + LatencyBucketBoundaries.values()[i], latencyBuckets[i].getNumSamples()); + } + return latencyBucketSummaries; + } + + private Map getNumbersOfErrorSampledSpans() { + Map errorBucketSummaries = + new EnumMap(CanonicalCode.class); + for (int i = 0; i < NUM_ERROR_BUCKETS; i++) { + errorBucketSummaries.put(CanonicalCode.values()[i + 1], errorBuckets[i].getNumSamples()); + } + return errorBucketSummaries; + } + + private List getErrorSamples(@Nullable CanonicalCode code, int maxSpansToReturn) { + ArrayList output = new ArrayList(maxSpansToReturn); + if (code != null) { + getErrorBucket(code).getSamples(maxSpansToReturn, output); + } else { + for (int i = 0; i < NUM_ERROR_BUCKETS; i++) { + errorBuckets[i].getSamples(maxSpansToReturn, output); + } + } + return output; + } + + private List getLatencySamples( + long latencyLowerNs, long latencyUpperNs, int maxSpansToReturn) { + ArrayList output = new ArrayList(maxSpansToReturn); + for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { + LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i]; + if (latencyUpperNs >= boundaries.getLatencyLowerNs() + && latencyLowerNs < boundaries.getLatencyUpperNs()) { + latencyBuckets[i].getSamplesFilteredByLatency( + latencyLowerNs, latencyUpperNs, maxSpansToReturn, output); + } + } + return output; + } + } + + /** Constructs a new {@code InProcessSampledSpanStoreImpl}. */ + InProcessSampledSpanStoreImpl(EventQueue eventQueue) { + samples = new HashMap(); + this.eventQueue = eventQueue; + } + + @Override + public Summary getSummary() { + Map ret = new HashMap(); + synchronized (samples) { + for (Map.Entry it : samples.entrySet()) { + ret.put( + it.getKey(), + PerSpanNameSummary.create( + it.getValue().getNumbersOfLatencySampledSpans(), + it.getValue().getNumbersOfErrorSampledSpans())); + } + } + return Summary.create(ret); + } + + @Override + public void considerForSampling(SpanImpl span) { + synchronized (samples) { + String spanName = span.getName(); + if (span.getSampleToLocalSpanStore() && !samples.containsKey(spanName)) { + samples.put(spanName, new PerSpanNameSamples()); + } + PerSpanNameSamples perSpanNameSamples = samples.get(spanName); + if (perSpanNameSamples != null) { + perSpanNameSamples.considerForSampling(span); + } + } + } + + @Override + public void registerSpanNamesForCollection(Collection spanNames) { + eventQueue.enqueue(new RegisterSpanNameEvent(this, spanNames)); + } + + private void internaltRegisterSpanNamesForCollection(Collection spanNames) { + synchronized (samples) { + for (String spanName : spanNames) { + if (!samples.containsKey(spanName)) { + samples.put(spanName, new PerSpanNameSamples()); + } + } + } + } + + private static final class RegisterSpanNameEvent implements EventQueue.Entry { + private final InProcessSampledSpanStoreImpl sampledSpanStore; + private final Collection spanNames; + + private RegisterSpanNameEvent( + InProcessSampledSpanStoreImpl sampledSpanStore, Collection spanNames) { + this.sampledSpanStore = sampledSpanStore; + this.spanNames = new ArrayList(spanNames); + } + + @Override + public void process() { + sampledSpanStore.internaltRegisterSpanNamesForCollection(spanNames); + } + } + + @Override + public void unregisterSpanNamesForCollection(Collection spanNames) { + eventQueue.enqueue(new UnregisterSpanNameEvent(this, spanNames)); + } + + private void internalUnregisterSpanNamesForCollection(Collection spanNames) { + synchronized (samples) { + samples.keySet().removeAll(spanNames); + } + } + + private static final class UnregisterSpanNameEvent implements EventQueue.Entry { + private final InProcessSampledSpanStoreImpl sampledSpanStore; + private final Collection spanNames; + + private UnregisterSpanNameEvent( + InProcessSampledSpanStoreImpl sampledSpanStore, Collection spanNames) { + this.sampledSpanStore = sampledSpanStore; + this.spanNames = new ArrayList(spanNames); + } + + @Override + public void process() { + sampledSpanStore.internalUnregisterSpanNamesForCollection(spanNames); + } + } + + @Override + public Set getRegisteredSpanNamesForCollection() { + synchronized (samples) { + return Collections.unmodifiableSet(new HashSet(samples.keySet())); + } + } + + @Override + public Collection getErrorSampledSpans(ErrorFilter filter) { + int numSpansToReturn = + filter.getMaxSpansToReturn() == 0 + ? MAX_PER_SPAN_NAME_SAMPLES + : filter.getMaxSpansToReturn(); + List spans = Collections.emptyList(); + // Try to not keep the lock to much, do the SpanImpl -> SpanData conversion outside the lock. + synchronized (samples) { + PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName()); + if (perSpanNameSamples != null) { + spans = perSpanNameSamples.getErrorSamples(filter.getCanonicalCode(), numSpansToReturn); + } + } + List ret = new ArrayList(spans.size()); + for (SpanImpl span : spans) { + ret.add(span.toSpanData()); + } + return Collections.unmodifiableList(ret); + } + + @Override + public Collection getLatencySampledSpans(LatencyFilter filter) { + int numSpansToReturn = + filter.getMaxSpansToReturn() == 0 + ? MAX_PER_SPAN_NAME_SAMPLES + : filter.getMaxSpansToReturn(); + List spans = Collections.emptyList(); + // Try to not keep the lock to much, do the SpanImpl -> SpanData conversion outside the lock. + synchronized (samples) { + PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName()); + if (perSpanNameSamples != null) { + spans = + perSpanNameSamples.getLatencySamples( + filter.getLatencyLowerNs(), filter.getLatencyUpperNs(), numSpansToReturn); + } + } + List ret = new ArrayList(spans.size()); + for (SpanImpl span : spans) { + ret.add(span.toSpanData()); + } + return Collections.unmodifiableList(ret); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java index 98a51707..53147def 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java @@ -1,5 +1,5 @@ /* - * Copyright 2017, OpenCensus Authors + * Copyright 2018, OpenCensus Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,23 +17,23 @@ package io.opencensus.implcore.trace.export; import io.opencensus.implcore.trace.SpanImpl; -import io.opencensus.implcore.trace.internal.ConcurrentIntrusiveList; import io.opencensus.trace.export.RunningSpanStore; +import io.opencensus.trace.export.RunningSpanStore.Filter; +import io.opencensus.trace.export.RunningSpanStore.PerSpanNameSummary; +import io.opencensus.trace.export.RunningSpanStore.Summary; import io.opencensus.trace.export.SpanData; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.annotation.concurrent.ThreadSafe; +import java.util.Collections; -/** Implementation of the {@link RunningSpanStore}. */ -@ThreadSafe -public final class RunningSpanStoreImpl extends RunningSpanStore { - private final ConcurrentIntrusiveList runningSpans; +/** Abstract implementation of the {@link RunningSpanStore}. */ +public abstract class RunningSpanStoreImpl extends RunningSpanStore { - public RunningSpanStoreImpl() { - runningSpans = new ConcurrentIntrusiveList(); + private static final RunningSpanStoreImpl NOOP_RUNNING_SPAN_STORE_IMPL = + new NoopRunningSpanStoreImpl(); + + /** Returns the no-op implementation of the {@link RunningSpanStoreImpl}. */ + static RunningSpanStoreImpl getNoopRunningSpanStoreImpl() { + return NOOP_RUNNING_SPAN_STORE_IMPL; } /** @@ -41,49 +41,34 @@ public final class RunningSpanStoreImpl extends RunningSpanStore { * * @param span the {@code Span} that started. */ - public void onStart(SpanImpl span) { - runningSpans.addElement(span); - } + public abstract void onStart(SpanImpl span); /** * Removes the {@code Span} from the running spans list when the {@code Span} ends. * * @param span the {@code Span} that ended. */ - public void onEnd(SpanImpl span) { - runningSpans.removeElement(span); - } + public abstract void onEnd(SpanImpl span); - @Override - public Summary getSummary() { - Collection allRunningSpans = runningSpans.getAll(); - Map numSpansPerName = new HashMap(); - for (SpanImpl span : allRunningSpans) { - Integer prevValue = numSpansPerName.get(span.getName()); - numSpansPerName.put(span.getName(), prevValue != null ? prevValue + 1 : 1); - } - Map perSpanNameSummary = new HashMap(); - for (Map.Entry it : numSpansPerName.entrySet()) { - perSpanNameSummary.put(it.getKey(), PerSpanNameSummary.create(it.getValue())); + private static final class NoopRunningSpanStoreImpl extends RunningSpanStoreImpl { + + private static final Summary EMPTY_SUMMARY = + RunningSpanStore.Summary.create(Collections.emptyMap()); + + @Override + public void onStart(SpanImpl span) {} + + @Override + public void onEnd(SpanImpl span) {} + + @Override + public Summary getSummary() { + return EMPTY_SUMMARY; } - Summary summary = Summary.create(perSpanNameSummary); - return summary; - } - @Override - public Collection getRunningSpans(Filter filter) { - Collection allRunningSpans = runningSpans.getAll(); - int maxSpansToReturn = - filter.getMaxSpansToReturn() == 0 ? allRunningSpans.size() : filter.getMaxSpansToReturn(); - List ret = new ArrayList(maxSpansToReturn); - for (SpanImpl span : allRunningSpans) { - if (ret.size() == maxSpansToReturn) { - break; - } - if (span.getName().equals(filter.getSpanName())) { - ret.add(span.toSpanData()); - } + @Override + public Collection getRunningSpans(Filter filter) { + return Collections.emptyList(); } - return ret; } } diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java index 0fad80fb..0c83a05a 100644 --- a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java @@ -1,5 +1,5 @@ /* - * Copyright 2017, OpenCensus Authors + * Copyright 2018, OpenCensus Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,244 +16,25 @@ package io.opencensus.implcore.trace.export; -import com.google.common.collect.EvictingQueue; -import io.opencensus.implcore.internal.EventQueue; import io.opencensus.implcore.trace.SpanImpl; -import io.opencensus.trace.Status; -import io.opencensus.trace.Status.CanonicalCode; import io.opencensus.trace.export.SampledSpanStore; +import io.opencensus.trace.export.SampledSpanStore.ErrorFilter; +import io.opencensus.trace.export.SampledSpanStore.LatencyFilter; +import io.opencensus.trace.export.SampledSpanStore.PerSpanNameSummary; +import io.opencensus.trace.export.SampledSpanStore.Summary; import io.opencensus.trace.export.SpanData; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.EnumMap; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.ThreadSafe; -/** Implementation of the {@link SampledSpanStore}. */ -@ThreadSafe -public final class SampledSpanStoreImpl extends SampledSpanStore { - private static final int NUM_SAMPLES_PER_LATENCY_BUCKET = 10; - private static final int NUM_SAMPLES_PER_ERROR_BUCKET = 5; - private static final long TIME_BETWEEN_SAMPLES = TimeUnit.SECONDS.toNanos(1); - private static final int NUM_LATENCY_BUCKETS = LatencyBucketBoundaries.values().length; - // The total number of canonical codes - 1 (the OK code). - private static final int NUM_ERROR_BUCKETS = CanonicalCode.values().length - 1; - private static final int MAX_PER_SPAN_NAME_SAMPLES = - NUM_SAMPLES_PER_LATENCY_BUCKET * NUM_LATENCY_BUCKETS - + NUM_SAMPLES_PER_ERROR_BUCKET * NUM_ERROR_BUCKETS; +/** Abstract implementation of the {@link SampledSpanStore}. */ +public abstract class SampledSpanStoreImpl extends SampledSpanStore { + private static final SampledSpanStoreImpl NOOP_SAMPLED_SPAN_STORE_IMPL = + new NoopSampledSpanStoreImpl(); - // Used to stream the register/unregister events to the implementation to avoid lock contention - // between the main threads and the worker thread. - private final EventQueue eventQueue; - - @GuardedBy("samples") - private final Map samples; - - private static final class Bucket { - - private final EvictingQueue sampledSpansQueue; - private final EvictingQueue notSampledSpansQueue; - private long lastSampledNanoTime; - private long lastNotSampledNanoTime; - - private Bucket(int numSamples) { - sampledSpansQueue = EvictingQueue.create(numSamples); - notSampledSpansQueue = EvictingQueue.create(numSamples); - } - - private void considerForSampling(SpanImpl span) { - long spanEndNanoTime = span.getEndNanoTime(); - if (span.getContext().getTraceOptions().isSampled()) { - // Need to compare by doing the subtraction all the time because in case of an overflow, - // this may never sample again (at least for the next ~200 years). No real chance to - // overflow two times because that means the process runs for ~200 years. - if (spanEndNanoTime - lastSampledNanoTime > TIME_BETWEEN_SAMPLES) { - sampledSpansQueue.add(span); - lastSampledNanoTime = spanEndNanoTime; - } - } else { - // Need to compare by doing the subtraction all the time because in case of an overflow, - // this may never sample again (at least for the next ~200 years). No real chance to - // overflow two times because that means the process runs for ~200 years. - if (spanEndNanoTime - lastNotSampledNanoTime > TIME_BETWEEN_SAMPLES) { - notSampledSpansQueue.add(span); - lastNotSampledNanoTime = spanEndNanoTime; - } - } - } - - private void getSamples(int maxSpansToReturn, List output) { - getSamples(maxSpansToReturn, output, sampledSpansQueue); - getSamples(maxSpansToReturn, output, notSampledSpansQueue); - } - - private static void getSamples( - int maxSpansToReturn, List output, EvictingQueue queue) { - for (SpanImpl span : queue) { - if (output.size() >= maxSpansToReturn) { - break; - } - output.add(span); - } - } - - private void getSamplesFilteredByLatency( - long latencyLowerNs, long latencyUpperNs, int maxSpansToReturn, List output) { - getSamplesFilteredByLatency( - latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, sampledSpansQueue); - getSamplesFilteredByLatency( - latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, notSampledSpansQueue); - } - - private static void getSamplesFilteredByLatency( - long latencyLowerNs, - long latencyUpperNs, - int maxSpansToReturn, - List output, - EvictingQueue queue) { - for (SpanImpl span : queue) { - if (output.size() >= maxSpansToReturn) { - break; - } - long spanLatencyNs = span.getLatencyNs(); - if (spanLatencyNs >= latencyLowerNs && spanLatencyNs < latencyUpperNs) { - output.add(span); - } - } - } - - private int getNumSamples() { - return sampledSpansQueue.size() + notSampledSpansQueue.size(); - } - } - - /** - * Keeps samples for a given span name. Samples for all the latency buckets and for all canonical - * codes other than OK. - */ - private static final class PerSpanNameSamples { - - private final Bucket[] latencyBuckets; - private final Bucket[] errorBuckets; - - private PerSpanNameSamples() { - latencyBuckets = new Bucket[NUM_LATENCY_BUCKETS]; - for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { - latencyBuckets[i] = new Bucket(NUM_SAMPLES_PER_LATENCY_BUCKET); - } - errorBuckets = new Bucket[NUM_ERROR_BUCKETS]; - for (int i = 0; i < NUM_ERROR_BUCKETS; i++) { - errorBuckets[i] = new Bucket(NUM_SAMPLES_PER_ERROR_BUCKET); - } - } - - @Nullable - private Bucket getLatencyBucket(long latencyNs) { - for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { - LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i]; - if (latencyNs >= boundaries.getLatencyLowerNs() - && latencyNs < boundaries.getLatencyUpperNs()) { - return latencyBuckets[i]; - } - } - // latencyNs is negative or Long.MAX_VALUE, so this Span can be ignored. This cannot happen - // in real production because System#nanoTime is monotonic. - return null; - } - - private Bucket getErrorBucket(CanonicalCode code) { - return errorBuckets[code.value() - 1]; - } - - private void considerForSampling(SpanImpl span) { - Status status = span.getStatus(); - // Null status means running Span, this should not happen in production, but the library - // should not crash because of this. - if (status != null) { - Bucket bucket = - status.isOk() - ? getLatencyBucket(span.getLatencyNs()) - : getErrorBucket(status.getCanonicalCode()); - // If unable to find the bucket, ignore this Span. - if (bucket != null) { - bucket.considerForSampling(span); - } - } - } - - private Map getNumbersOfLatencySampledSpans() { - Map latencyBucketSummaries = - new EnumMap(LatencyBucketBoundaries.class); - for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { - latencyBucketSummaries.put( - LatencyBucketBoundaries.values()[i], latencyBuckets[i].getNumSamples()); - } - return latencyBucketSummaries; - } - - private Map getNumbersOfErrorSampledSpans() { - Map errorBucketSummaries = - new EnumMap(CanonicalCode.class); - for (int i = 0; i < NUM_ERROR_BUCKETS; i++) { - errorBucketSummaries.put(CanonicalCode.values()[i + 1], errorBuckets[i].getNumSamples()); - } - return errorBucketSummaries; - } - - private List getErrorSamples(@Nullable CanonicalCode code, int maxSpansToReturn) { - ArrayList output = new ArrayList(maxSpansToReturn); - if (code != null) { - getErrorBucket(code).getSamples(maxSpansToReturn, output); - } else { - for (int i = 0; i < NUM_ERROR_BUCKETS; i++) { - errorBuckets[i].getSamples(maxSpansToReturn, output); - } - } - return output; - } - - private List getLatencySamples( - long latencyLowerNs, long latencyUpperNs, int maxSpansToReturn) { - ArrayList output = new ArrayList(maxSpansToReturn); - for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { - LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i]; - if (latencyUpperNs >= boundaries.getLatencyLowerNs() - && latencyLowerNs < boundaries.getLatencyUpperNs()) { - latencyBuckets[i].getSamplesFilteredByLatency( - latencyLowerNs, latencyUpperNs, maxSpansToReturn, output); - } - } - return output; - } - } - - /** Constructs a new {@code SampledSpanStoreImpl}. */ - SampledSpanStoreImpl(EventQueue eventQueue) { - samples = new HashMap(); - this.eventQueue = eventQueue; - } - - @Override - public Summary getSummary() { - Map ret = new HashMap(); - synchronized (samples) { - for (Map.Entry it : samples.entrySet()) { - ret.put( - it.getKey(), - PerSpanNameSummary.create( - it.getValue().getNumbersOfLatencySampledSpans(), - it.getValue().getNumbersOfErrorSampledSpans())); - } - } - return Summary.create(ret); + /** Returns the new no-op implmentation of {@link SampledSpanStoreImpl}. */ + public static SampledSpanStoreImpl getNoopSampledSpanStoreImpl() { + return NOOP_SAMPLED_SPAN_STORE_IMPL; } /** @@ -262,125 +43,41 @@ public final class SampledSpanStoreImpl extends SampledSpanStore { * * @param span the span to be consider for storing into the store buckets. */ - public void considerForSampling(SpanImpl span) { - synchronized (samples) { - String spanName = span.getName(); - if (span.getSampleToLocalSpanStore() && !samples.containsKey(spanName)) { - samples.put(spanName, new PerSpanNameSamples()); - } - PerSpanNameSamples perSpanNameSamples = samples.get(spanName); - if (perSpanNameSamples != null) { - perSpanNameSamples.considerForSampling(span); - } - } - } - - @Override - public void registerSpanNamesForCollection(Collection spanNames) { - eventQueue.enqueue(new RegisterSpanNameEvent(this, spanNames)); - } + public abstract void considerForSampling(SpanImpl span); - private void internaltRegisterSpanNamesForCollection(Collection spanNames) { - synchronized (samples) { - for (String spanName : spanNames) { - if (!samples.containsKey(spanName)) { - samples.put(spanName, new PerSpanNameSamples()); - } - } - } - } - - private static final class RegisterSpanNameEvent implements EventQueue.Entry { - private final SampledSpanStoreImpl sampledSpanStore; - private final Collection spanNames; - - private RegisterSpanNameEvent( - SampledSpanStoreImpl sampledSpanStore, Collection spanNames) { - this.sampledSpanStore = sampledSpanStore; - this.spanNames = new ArrayList(spanNames); - } + private static final class NoopSampledSpanStoreImpl extends SampledSpanStoreImpl { + private static final Summary EMPTY_SUMMARY = + Summary.create(Collections.emptyMap()); + private static final Set EMPTY_REGISTERED_SPAN_NAMES = Collections.emptySet(); + private static final Collection EMPTY_SPANDATA = Collections.emptySet(); @Override - public void process() { - sampledSpanStore.internaltRegisterSpanNamesForCollection(spanNames); + public Summary getSummary() { + return EMPTY_SUMMARY; } - } - @Override - public void unregisterSpanNamesForCollection(Collection spanNames) { - eventQueue.enqueue(new UnregisterSpanNameEvent(this, spanNames)); - } - - private void internalUnregisterSpanNamesForCollection(Collection spanNames) { - synchronized (samples) { - samples.keySet().removeAll(spanNames); - } - } - - private static final class UnregisterSpanNameEvent implements EventQueue.Entry { - private final SampledSpanStoreImpl sampledSpanStore; - private final Collection spanNames; + @Override + public void considerForSampling(SpanImpl span) {} - private UnregisterSpanNameEvent( - SampledSpanStoreImpl sampledSpanStore, Collection spanNames) { - this.sampledSpanStore = sampledSpanStore; - this.spanNames = new ArrayList(spanNames); - } + @Override + public void registerSpanNamesForCollection(Collection spanNames) {} @Override - public void process() { - sampledSpanStore.internalUnregisterSpanNamesForCollection(spanNames); - } - } + public void unregisterSpanNamesForCollection(Collection spanNames) {} - @Override - public Set getRegisteredSpanNamesForCollection() { - synchronized (samples) { - return Collections.unmodifiableSet(new HashSet(samples.keySet())); + @Override + public Set getRegisteredSpanNamesForCollection() { + return EMPTY_REGISTERED_SPAN_NAMES; } - } - @Override - public Collection getErrorSampledSpans(ErrorFilter filter) { - int numSpansToReturn = - filter.getMaxSpansToReturn() == 0 - ? MAX_PER_SPAN_NAME_SAMPLES - : filter.getMaxSpansToReturn(); - List spans = Collections.emptyList(); - // Try to not keep the lock to much, do the SpanImpl -> SpanData conversion outside the lock. - synchronized (samples) { - PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName()); - if (perSpanNameSamples != null) { - spans = perSpanNameSamples.getErrorSamples(filter.getCanonicalCode(), numSpansToReturn); - } - } - List ret = new ArrayList(spans.size()); - for (SpanImpl span : spans) { - ret.add(span.toSpanData()); + @Override + public Collection getErrorSampledSpans(ErrorFilter filter) { + return EMPTY_SPANDATA; } - return Collections.unmodifiableList(ret); - } - @Override - public Collection getLatencySampledSpans(LatencyFilter filter) { - int numSpansToReturn = - filter.getMaxSpansToReturn() == 0 - ? MAX_PER_SPAN_NAME_SAMPLES - : filter.getMaxSpansToReturn(); - List spans = Collections.emptyList(); - // Try to not keep the lock to much, do the SpanImpl -> SpanData conversion outside the lock. - synchronized (samples) { - PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName()); - if (perSpanNameSamples != null) { - spans = - perSpanNameSamples.getLatencySamples( - filter.getLatencyLowerNs(), filter.getLatencyUpperNs(), numSpansToReturn); - } - } - List ret = new ArrayList(spans.size()); - for (SpanImpl span : spans) { - ret.add(span.toSpanData()); + @Override + public Collection getLatencySampledSpans(LatencyFilter filter) { + return EMPTY_SPANDATA; } - return Collections.unmodifiableList(ret); } } -- cgit v1.2.3