aboutsummaryrefslogtreecommitdiff
path: root/impl
diff options
context:
space:
mode:
authorChris K Wensel <chris@wensel.net>2018-05-11 17:22:35 -0700
committersebright <sebright@google.com>2018-05-11 17:22:35 -0700
commit83fd63784edaed486e43be5570549143375fdefc (patch)
tree7e80a2ac9528ae77ec2c10327f71a82218db5393 /impl
parent36c018e66414fd19632afd2c0f2c8e664274e2c3 (diff)
downloadopencensus-java-83fd63784edaed486e43be5570549143375fdefc.tar.gz
Adds Tracing.getExportComponent().flushAndShutdown() for use within application shutdown hooks. (#1141)
Adds the ability to flush pending spans via a call to Tracing.getExportComponent().shutdown() This allows a developer to force a flush from within a shutdown hook or other means. Unfortunately the underlying Disruptor instance only provides a #shutdown() call, not a flush, or a public method for testing for backlog. Thus shutdown has propagated up to the above api call.
Diffstat (limited to 'impl')
-rw-r--r--impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java57
1 files changed, 49 insertions, 8 deletions
diff --git a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java
index 0c58cb59..5145ca3b 100644
--- a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java
+++ b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java
@@ -24,6 +24,9 @@ import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.opencensus.implcore.internal.DaemonThreadFactory;
import io.opencensus.implcore.internal.EventQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -85,6 +88,9 @@ import javax.annotation.concurrent.ThreadSafe;
*/
@ThreadSafe
public final class DisruptorEventQueue implements EventQueue {
+
+ private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName());
+
// Number of events that can be enqueued at any one time. If more than this are enqueued,
// then subsequent attempts to enqueue new entries will block.
// TODO(aveitch): consider making this a parameter to the constructor, so the queue can be
@@ -99,9 +105,11 @@ public final class DisruptorEventQueue implements EventQueue {
// Ring Buffer for the {@link Disruptor} that underlies the queue.
private final RingBuffer<DisruptorEvent> ringBuffer;
+ private volatile DisruptorEnqueuer enqueuer;
+
// Creates a new EventQueue. Private to prevent creation of non-singleton instance.
// Suppress warnings for disruptor.handleEventsWith.
- @SuppressWarnings({"unchecked"})
+ @SuppressWarnings({"unchecked", "nullness"})
private DisruptorEventQueue() {
// Create new Disruptor for processing. Note that Disruptor creates a single thread per
// consumer (see https://github.com/LMAX-Exchange/disruptor/issues/121 for details);
@@ -116,6 +124,20 @@ public final class DisruptorEventQueue implements EventQueue {
disruptor.handleEventsWith(DisruptorEventHandler.INSTANCE);
disruptor.start();
ringBuffer = disruptor.getRingBuffer();
+
+ enqueuer =
+ new DisruptorEnqueuer() {
+ @Override
+ public void enqueue(Entry entry) {
+ long sequence = ringBuffer.next();
+ try {
+ DisruptorEvent event = ringBuffer.get(sequence);
+ event.setEntry(entry);
+ } finally {
+ ringBuffer.publish(sequence);
+ }
+ }
+ };
}
/**
@@ -134,17 +156,36 @@ public final class DisruptorEventQueue implements EventQueue {
*/
@Override
public void enqueue(Entry entry) {
- long sequence = ringBuffer.next();
- try {
- DisruptorEvent event = ringBuffer.get(sequence);
- event.setEntry(entry);
- } finally {
- ringBuffer.publish(sequence);
- }
+ enqueuer.enqueue(entry);
+ }
+
+ /** Shuts down the underlying disruptor. */
+ @Override
+ public void shutdown() {
+ enqueuer =
+ new DisruptorEnqueuer() {
+ final AtomicBoolean logged = new AtomicBoolean(false);
+
+ @Override
+ public void enqueue(Entry entry) {
+ if (!logged.getAndSet(true)) {
+ logger.log(Level.INFO, "Attempted to enqueue entry after Disruptor shutdown.");
+ }
+ }
+ };
+
+ disruptor.shutdown();
+ }
+
+ // Allows this event queue to safely shutdown by not enqueuing events on the ring buffer
+ private abstract static class DisruptorEnqueuer {
+
+ public abstract void enqueue(Entry entry);
}
// An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry.
private static final class DisruptorEvent {
+
// TODO(bdrutu): Investigate if volatile is needed. This object is shared between threads so
// intuitively this variable must be volatile.
@Nullable private volatile Entry entry = null;