diff options
author | Bogdan Drutu <bdrutu@google.com> | 2018-02-26 11:45:10 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-26 11:45:10 -0800 |
commit | 2e9b2f02c5b02e541e6941add4ee28f4fd44636c (patch) | |
tree | 577e684de6b7f31f1cc6bc436bd74f4d88e9be4e /impl | |
parent | 1c00ee83a02e10407fcdd1d4e7d8d9f2f669540e (diff) | |
download | opencensus-java-2e9b2f02c5b02e541e6941add4ee28f4fd44636c.tar.gz |
Fix bugs, fix warnings and update disruptor. (#1021)
* Update disruptor and fix warnings.
* Fix bug that holds the memory to all recorded events in disruptor.
Diffstat (limited to 'impl')
-rw-r--r-- | impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java | 41 |
1 files changed, 25 insertions, 16 deletions
diff --git a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java index 02fa1b4c..0c58cb59 100644 --- a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java +++ b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java @@ -24,7 +24,6 @@ import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import io.opencensus.implcore.internal.DaemonThreadFactory; import io.opencensus.implcore.internal.EventQueue; -import java.util.concurrent.Executors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -101,19 +100,20 @@ public final class DisruptorEventQueue implements EventQueue { private final RingBuffer<DisruptorEvent> ringBuffer; // Creates a new EventQueue. Private to prevent creation of non-singleton instance. - // Suppress warnings for disruptor.handleEventsWith and Disruptor constructor - @SuppressWarnings({"deprecation", "unchecked", "varargs"}) + // Suppress warnings for disruptor.handleEventsWith. + @SuppressWarnings({"unchecked"}) private DisruptorEventQueue() { - // Create new Disruptor for processing. Note that this uses a single thread for processing; this - // ensures that the event handler can take unsynchronized actions whenever possible. + // Create new Disruptor for processing. Note that Disruptor creates a single thread per + // consumer (see https://github.com/LMAX-Exchange/disruptor/issues/121 for details); + // this ensures that the event handler can take unsynchronized actions whenever possible. disruptor = - new Disruptor<DisruptorEvent>( - new DisruptorEventFactory(), + new Disruptor<>( + DisruptorEventFactory.INSTANCE, DISRUPTOR_BUFFER_SIZE, - Executors.newSingleThreadExecutor(new DaemonThreadFactory("OpenCensus.Disruptor")), + new DaemonThreadFactory("OpenCensus.Disruptor"), ProducerType.MULTI, new SleepingWaitStrategy()); - disruptor.handleEventsWith(new DisruptorEventHandler()); + disruptor.handleEventsWith(DisruptorEventHandler.INSTANCE); disruptor.start(); ringBuffer = disruptor.getRingBuffer(); } @@ -145,10 +145,12 @@ public final class DisruptorEventQueue implements EventQueue { // An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry. private static final class DisruptorEvent { - @Nullable private Entry entry; + // TODO(bdrutu): Investigate if volatile is needed. This object is shared between threads so + // intuitively this variable must be volatile. + @Nullable private volatile Entry entry = null; // Sets the EventQueueEntry associated with this DisruptorEvent. - void setEntry(Entry entry) { + void setEntry(@Nullable Entry entry) { this.entry = entry; } @@ -160,7 +162,9 @@ public final class DisruptorEventQueue implements EventQueue { } // Factory for DisruptorEvent. - private static final class DisruptorEventFactory implements EventFactory<DisruptorEvent> { + private enum DisruptorEventFactory implements EventFactory<DisruptorEvent> { + INSTANCE; + @Override public DisruptorEvent newInstance() { return new DisruptorEvent(); @@ -171,12 +175,17 @@ public final class DisruptorEventQueue implements EventQueue { * Every event that gets added to {@link EventQueue} will get processed here. Just calls the * underlying process() method. */ - private static final class DisruptorEventHandler implements EventHandler<DisruptorEvent> { + private enum DisruptorEventHandler implements EventHandler<DisruptorEvent> { + INSTANCE; + @Override - // TODO(sebright): Fix the Checker Framework warning. - @SuppressWarnings("nullness") public void onEvent(DisruptorEvent event, long sequence, boolean endOfBatch) { - event.getEntry().process(); + Entry entry = event.getEntry(); + if (entry != null) { + entry.process(); + } + // Remove the reference to the previous entry to allow the memory to be gc'ed. + event.setEntry(null); } } } |