diff options
author | Chris K Wensel <chris@wensel.net> | 2018-05-11 17:22:35 -0700 |
---|---|---|
committer | sebright <sebright@google.com> | 2018-05-11 17:22:35 -0700 |
commit | 83fd63784edaed486e43be5570549143375fdefc (patch) | |
tree | 7e80a2ac9528ae77ec2c10327f71a82218db5393 /impl | |
parent | 36c018e66414fd19632afd2c0f2c8e664274e2c3 (diff) | |
download | opencensus-java-83fd63784edaed486e43be5570549143375fdefc.tar.gz |
Adds Tracing.getExportComponent().flushAndShutdown() for use within application shutdown hooks. (#1141)
Adds the ability to flush pending spans via a call to Tracing.getExportComponent().shutdown()
This allows a developer to force a flush from within a shutdown hook or other means.
Unfortunately the underlying Disruptor instance only provides a #shutdown() call, not a flush, or a public method
for testing for backlog. Thus shutdown has propagated up to the above api call.
Diffstat (limited to 'impl')
-rw-r--r-- | impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java | 57 |
1 files changed, 49 insertions, 8 deletions
diff --git a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java index 0c58cb59..5145ca3b 100644 --- a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java +++ b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java @@ -24,6 +24,9 @@ import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import io.opencensus.implcore.internal.DaemonThreadFactory; import io.opencensus.implcore.internal.EventQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -85,6 +88,9 @@ import javax.annotation.concurrent.ThreadSafe; */ @ThreadSafe public final class DisruptorEventQueue implements EventQueue { + + private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName()); + // Number of events that can be enqueued at any one time. If more than this are enqueued, // then subsequent attempts to enqueue new entries will block. // TODO(aveitch): consider making this a parameter to the constructor, so the queue can be @@ -99,9 +105,11 @@ public final class DisruptorEventQueue implements EventQueue { // Ring Buffer for the {@link Disruptor} that underlies the queue. private final RingBuffer<DisruptorEvent> ringBuffer; + private volatile DisruptorEnqueuer enqueuer; + // Creates a new EventQueue. Private to prevent creation of non-singleton instance. // Suppress warnings for disruptor.handleEventsWith. - @SuppressWarnings({"unchecked"}) + @SuppressWarnings({"unchecked", "nullness"}) private DisruptorEventQueue() { // Create new Disruptor for processing. Note that Disruptor creates a single thread per // consumer (see https://github.com/LMAX-Exchange/disruptor/issues/121 for details); @@ -116,6 +124,20 @@ public final class DisruptorEventQueue implements EventQueue { disruptor.handleEventsWith(DisruptorEventHandler.INSTANCE); disruptor.start(); ringBuffer = disruptor.getRingBuffer(); + + enqueuer = + new DisruptorEnqueuer() { + @Override + public void enqueue(Entry entry) { + long sequence = ringBuffer.next(); + try { + DisruptorEvent event = ringBuffer.get(sequence); + event.setEntry(entry); + } finally { + ringBuffer.publish(sequence); + } + } + }; } /** @@ -134,17 +156,36 @@ public final class DisruptorEventQueue implements EventQueue { */ @Override public void enqueue(Entry entry) { - long sequence = ringBuffer.next(); - try { - DisruptorEvent event = ringBuffer.get(sequence); - event.setEntry(entry); - } finally { - ringBuffer.publish(sequence); - } + enqueuer.enqueue(entry); + } + + /** Shuts down the underlying disruptor. */ + @Override + public void shutdown() { + enqueuer = + new DisruptorEnqueuer() { + final AtomicBoolean logged = new AtomicBoolean(false); + + @Override + public void enqueue(Entry entry) { + if (!logged.getAndSet(true)) { + logger.log(Level.INFO, "Attempted to enqueue entry after Disruptor shutdown."); + } + } + }; + + disruptor.shutdown(); + } + + // Allows this event queue to safely shutdown by not enqueuing events on the ring buffer + private abstract static class DisruptorEnqueuer { + + public abstract void enqueue(Entry entry); } // An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry. private static final class DisruptorEvent { + // TODO(bdrutu): Investigate if volatile is needed. This object is shared between threads so // intuitively this variable must be volatile. @Nullable private volatile Entry entry = null; |