aboutsummaryrefslogtreecommitdiff
path: root/impl
diff options
context:
space:
mode:
authorBogdan Drutu <bdrutu@google.com>2018-02-26 11:45:10 -0800
committerGitHub <noreply@github.com>2018-02-26 11:45:10 -0800
commit2e9b2f02c5b02e541e6941add4ee28f4fd44636c (patch)
tree577e684de6b7f31f1cc6bc436bd74f4d88e9be4e /impl
parent1c00ee83a02e10407fcdd1d4e7d8d9f2f669540e (diff)
downloadopencensus-java-2e9b2f02c5b02e541e6941add4ee28f4fd44636c.tar.gz
Fix bugs, fix warnings and update disruptor. (#1021)
* Update disruptor and fix warnings. * Fix bug that holds the memory to all recorded events in disruptor.
Diffstat (limited to 'impl')
-rw-r--r--impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java41
1 files changed, 25 insertions, 16 deletions
diff --git a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java
index 02fa1b4c..0c58cb59 100644
--- a/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java
+++ b/impl/src/main/java/io/opencensus/impl/internal/DisruptorEventQueue.java
@@ -24,7 +24,6 @@ import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.opencensus.implcore.internal.DaemonThreadFactory;
import io.opencensus.implcore.internal.EventQueue;
-import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -101,19 +100,20 @@ public final class DisruptorEventQueue implements EventQueue {
private final RingBuffer<DisruptorEvent> ringBuffer;
// Creates a new EventQueue. Private to prevent creation of non-singleton instance.
- // Suppress warnings for disruptor.handleEventsWith and Disruptor constructor
- @SuppressWarnings({"deprecation", "unchecked", "varargs"})
+ // Suppress warnings for disruptor.handleEventsWith.
+ @SuppressWarnings({"unchecked"})
private DisruptorEventQueue() {
- // Create new Disruptor for processing. Note that this uses a single thread for processing; this
- // ensures that the event handler can take unsynchronized actions whenever possible.
+ // Create new Disruptor for processing. Note that Disruptor creates a single thread per
+ // consumer (see https://github.com/LMAX-Exchange/disruptor/issues/121 for details);
+ // this ensures that the event handler can take unsynchronized actions whenever possible.
disruptor =
- new Disruptor<DisruptorEvent>(
- new DisruptorEventFactory(),
+ new Disruptor<>(
+ DisruptorEventFactory.INSTANCE,
DISRUPTOR_BUFFER_SIZE,
- Executors.newSingleThreadExecutor(new DaemonThreadFactory("OpenCensus.Disruptor")),
+ new DaemonThreadFactory("OpenCensus.Disruptor"),
ProducerType.MULTI,
new SleepingWaitStrategy());
- disruptor.handleEventsWith(new DisruptorEventHandler());
+ disruptor.handleEventsWith(DisruptorEventHandler.INSTANCE);
disruptor.start();
ringBuffer = disruptor.getRingBuffer();
}
@@ -145,10 +145,12 @@ public final class DisruptorEventQueue implements EventQueue {
// An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry.
private static final class DisruptorEvent {
- @Nullable private Entry entry;
+ // TODO(bdrutu): Investigate if volatile is needed. This object is shared between threads so
+ // intuitively this variable must be volatile.
+ @Nullable private volatile Entry entry = null;
// Sets the EventQueueEntry associated with this DisruptorEvent.
- void setEntry(Entry entry) {
+ void setEntry(@Nullable Entry entry) {
this.entry = entry;
}
@@ -160,7 +162,9 @@ public final class DisruptorEventQueue implements EventQueue {
}
// Factory for DisruptorEvent.
- private static final class DisruptorEventFactory implements EventFactory<DisruptorEvent> {
+ private enum DisruptorEventFactory implements EventFactory<DisruptorEvent> {
+ INSTANCE;
+
@Override
public DisruptorEvent newInstance() {
return new DisruptorEvent();
@@ -171,12 +175,17 @@ public final class DisruptorEventQueue implements EventQueue {
* Every event that gets added to {@link EventQueue} will get processed here. Just calls the
* underlying process() method.
*/
- private static final class DisruptorEventHandler implements EventHandler<DisruptorEvent> {
+ private enum DisruptorEventHandler implements EventHandler<DisruptorEvent> {
+ INSTANCE;
+
@Override
- // TODO(sebright): Fix the Checker Framework warning.
- @SuppressWarnings("nullness")
public void onEvent(DisruptorEvent event, long sequence, boolean endOfBatch) {
- event.getEntry().process();
+ Entry entry = event.getEntry();
+ if (entry != null) {
+ entry.process();
+ }
+ // Remove the reference to the previous entry to allow the memory to be gc'ed.
+ event.setEntry(null);
}
}
}