diff options
author | Bogdan Drutu <bdrutu@google.com> | 2017-06-15 11:29:30 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-15 11:29:30 -0700 |
commit | 3b74070f74a75e780828370126b713524eece13c (patch) | |
tree | ae6bc5deb968cbfab938b355718898c84fbb0826 /impl | |
parent | 461b4ed4a37e74a42c8ba6264807a15e36ab9dc6 (diff) | |
download | opencensus-java-3b74070f74a75e780828370126b713524eece13c.tar.gz |
Prepare the release of the opencensus library. (#357)
Diffstat (limited to 'impl')
-rw-r--r-- | impl/README.md | 5 | ||||
-rw-r--r-- | impl/build.gradle | 18 | ||||
-rw-r--r-- | impl/src/main/java/io/opencensus/internal/DisruptorEventQueue.java | 172 | ||||
-rw-r--r-- | impl/src/main/java/io/opencensus/trace/TraceComponentImpl.java | 30 | ||||
-rw-r--r-- | impl/src/main/java/io/opencensus/trace/internal/ThreadLocalRandomHandler.java | 34 | ||||
-rw-r--r-- | impl/src/test/java/io/opencensus/internal/DisruptorEventQueueTest.java | 100 | ||||
-rw-r--r-- | impl/src/test/java/io/opencensus/trace/TracingTest.java | 48 |
7 files changed, 407 insertions, 0 deletions
diff --git a/impl/README.md b/impl/README.md new file mode 100644 index 00000000..3dee26f9 --- /dev/null +++ b/impl/README.md @@ -0,0 +1,5 @@ +OpenCensus Java implementation +====================================================== + +* Java 7 compatible. +* Contains any classes not compatible with Android. diff --git a/impl/build.gradle b/impl/build.gradle new file mode 100644 index 00000000..be4e959c --- /dev/null +++ b/impl/build.gradle @@ -0,0 +1,18 @@ +description = 'OpenCensus Implementation' + +apply plugin: 'java' + +[compileJava, compileTestJava].each() { + it.sourceCompatibility = 1.7 + it.targetCompatibility = 1.7 +} + +dependencies { + compile project(':opencensus-api'), + project(':opencensus-impl-core'), + libraries.disruptor + + signature "org.codehaus.mojo.signature:java17:+@signature" +} + +javadoc.exclude 'io/opencensus/internal/**'
\ No newline at end of file diff --git a/impl/src/main/java/io/opencensus/internal/DisruptorEventQueue.java b/impl/src/main/java/io/opencensus/internal/DisruptorEventQueue.java new file mode 100644 index 00000000..6360fe0e --- /dev/null +++ b/impl/src/main/java/io/opencensus/internal/DisruptorEventQueue.java @@ -0,0 +1,172 @@ +/* + * Copyright 2017, Google Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.internal; + +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.SleepingWaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import java.util.concurrent.Executors; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A low-latency event queue for background updating of (possibly contended) objects. This is + * intended for use by instrumentation methods to ensure that they do not block foreground + * activities. To customize the action taken on reading the queue, derive a new class from {@link + * EventQueue.Entry} and pass it to the {@link #enqueue(Entry)} method. The {@link Entry#process()} + * method of your class will be called and executed in a background thread. This class is a + * Singleton. + * + * <p>Example Usage: Given a class as follows: + * + * <pre> + * public class someClass { + * public void doSomething() { + * // Do the work of the method. One result is a measurement of something. + * int measurement = doSomeWork(); + * // Make an update to the class state, based on this measurement. This work can take some + * // time, but can be done asynchronously, in the background. + * update(measurement); + * } + * + * public void update(int arg) { + * // do something + * } + * } + * </pre> + * + * <p>The work of calling {@code someClass.update()} can be executed in the backgound as follows: + * + * <pre> + * public class someClass { + * // Add a EventQueueEntry class that will process the update call. + * private static final class SomeClassUpdateEvent implements EventQueueEntry { + * private final SomeClass someClassInstance; + * private final int arg; + * + * SomeObjectUpdateEvent(SomeObject someClassInstance, int arg) { + * this.someClassInstance = someClassInstance; + * this.arg = arg; + * } + * + * @Override + * public void process() { + * someClassInstance.update(arg); + * } + * } + * + * public void doSomething() { + * int measurement = doSomeWork(); + * // Instead of calling update() directly, create an event to do the processing, and insert + * // it into the EventQueue. It will be processed in a background thread, and doSomething() + * // can return immediately. + * EventQueue.getInstance.enqueue(new SomeClassUpdateEvent(this, measurement)); + * } + * } + * </pre> + */ +@ThreadSafe +public final class DisruptorEventQueue implements EventQueue { + // An event in the {@link EventQueue}. Just holds a reference to an EventQueueEntry. + private static final class InstrumentationEvent { + private Entry entry = null; + + // Sets the EventQueueEntry associated with this InstrumentationEvent. + void setEntry(Entry entry) { + this.entry = entry; + } + + // Returns the EventQueueEntry associated with this InstrumentationEvent. + Entry getEntry() { + return entry; + } + } + + // Factory for InstrumentationEvent. + private static final class InstrumentationEventFactory + implements EventFactory<InstrumentationEvent> { + @Override + public InstrumentationEvent newInstance() { + return new InstrumentationEvent(); + } + } + + // Every event that gets added to {@link EventQueue} will get processed here. Just calls the + // underlying process() method. + private static final class InstrumentationEventHandler + implements EventHandler<InstrumentationEvent> { + @Override + public void onEvent(InstrumentationEvent event, long sequence, boolean endOfBatch) { + event.getEntry().process(); + } + } + + // The single instance of the class. + private static final DisruptorEventQueue eventQueue = new DisruptorEventQueue(); + // The event queue is built on this {@link Disruptor}. + private final Disruptor<InstrumentationEvent> disruptor; + // Ring Buffer for the {@link Disruptor} that underlies the queue. + private final RingBuffer<InstrumentationEvent> ringBuffer; + + // Creates a new EventQueue. Private to prevent creation of non-singleton instance. + // Suppress warnings for disruptor.handleEventsWith and Disruptor constructor + @SuppressWarnings({"deprecation", "unchecked", "varargs"}) + private DisruptorEventQueue() { + // Number of events that can be enqueued at any one time. If more than this are enqueued, + // then subsequent attempts to enqueue new entries will block. + // TODO(aveitch): consider making this a parameter to the constructor, so the queue can be + // configured to a size appropriate to the system (smaller/less busy systems will not need as + // large a queue. + final int bufferSize = 8192; + // Create new Disruptor for processing. Note that this uses a single thread for processing; this + // ensures that the event handler can take unsynchronized actions whenever possible. + disruptor = + new Disruptor<InstrumentationEvent>( + new InstrumentationEventFactory(), + bufferSize, + Executors.newSingleThreadExecutor(), + ProducerType.MULTI, + new SleepingWaitStrategy()); + disruptor.handleEventsWith(new InstrumentationEventHandler()); + disruptor.start(); + ringBuffer = disruptor.getRingBuffer(); + } + + /** + * Returns the {@link DisruptorEventQueue} instance. + * + * @return the singleton {@code EventQueue} instance. + */ + public static DisruptorEventQueue getInstance() { + return eventQueue; + } + + /** + * Enqueues an event on the {@link DisruptorEventQueue}. + * + * @param entry a class encapsulating the actions to be taken for event processing. + */ + @Override + public void enqueue(Entry entry) { + long sequence = ringBuffer.next(); + try { + InstrumentationEvent event = ringBuffer.get(sequence); + event.setEntry(entry); + } finally { + ringBuffer.publish(sequence); + } + } +} diff --git a/impl/src/main/java/io/opencensus/trace/TraceComponentImpl.java b/impl/src/main/java/io/opencensus/trace/TraceComponentImpl.java new file mode 100644 index 00000000..5d0700bd --- /dev/null +++ b/impl/src/main/java/io/opencensus/trace/TraceComponentImpl.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017, Google Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.trace; + +import io.opencensus.common.MillisClock; +import io.opencensus.internal.DisruptorEventQueue; +import io.opencensus.trace.internal.ThreadLocalRandomHandler; + +/** Java 7 and 8 implementation of the {@link TraceComponent}. */ +public final class TraceComponentImpl extends TraceComponentImplBase { + + /** Public constructor to be used with reflection loading. */ + public TraceComponentImpl() { + super( + MillisClock.getInstance(), + new ThreadLocalRandomHandler(), + DisruptorEventQueue.getInstance()); + } +} diff --git a/impl/src/main/java/io/opencensus/trace/internal/ThreadLocalRandomHandler.java b/impl/src/main/java/io/opencensus/trace/internal/ThreadLocalRandomHandler.java new file mode 100644 index 00000000..7cb9b2f2 --- /dev/null +++ b/impl/src/main/java/io/opencensus/trace/internal/ThreadLocalRandomHandler.java @@ -0,0 +1,34 @@ +/* + * Copyright 2017, Google Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.trace.internal; + +import io.opencensus.trace.internal.RandomHandler; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.concurrent.ThreadSafe; + +/** Implementation of the {@link RandomHandler} using {@link ThreadLocalRandom}. */ +@ThreadSafe +public final class ThreadLocalRandomHandler extends RandomHandler { + + /** + * Constructs a new {@code ThreadLocalRandomHandler}. + */ + public ThreadLocalRandomHandler() {} + + @Override + public Random current() { + return ThreadLocalRandom.current(); + } +} diff --git a/impl/src/test/java/io/opencensus/internal/DisruptorEventQueueTest.java b/impl/src/test/java/io/opencensus/internal/DisruptorEventQueueTest.java new file mode 100644 index 00000000..83a00847 --- /dev/null +++ b/impl/src/test/java/io/opencensus/internal/DisruptorEventQueueTest.java @@ -0,0 +1,100 @@ +/* + * Copyright 2016, Google Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.internal; + +import static com.google.common.truth.Truth.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link DisruptorEventQueue}. */ +@RunWith(JUnit4.class) +public class DisruptorEventQueueTest { + // Simple class to use that keeps an incrementing counter. Will fail with an assertion if + // increment is used from multiple threads, or if the stored value is different from that expected + // by the caller. + private static class Counter { + private int count; + private volatile long id; // stores thread ID used in first increment operation. + + public Counter() { + count = 0; + id = -1; + } + + // Increments counter by 1. Will fail in assertion if multiple different threads are used + // (the EventQueue backend should be single-threaded). + public void increment() { + long tid = Thread.currentThread().getId(); + if (id == -1) { + assertThat(count).isEqualTo(0); + id = tid; + } else { + assertThat(id).isEqualTo(tid); + } + count++; + } + + // Check the current value of the counter. Assert if it is not the expected value. + public void check(int value) { + assertThat(count).isEqualTo(value); + } + } + + // EventQueueEntry for incrementing a Counter. + private static class IncrementEvent implements EventQueue.Entry { + private final Counter counter; + + IncrementEvent(Counter counter) { + this.counter = counter; + } + + @Override + public void process() { + counter.increment(); + } + } + + @Test + public void incrementOnce() { + Counter counter = new Counter(); + IncrementEvent ie = new IncrementEvent(counter); + DisruptorEventQueue.getInstance().enqueue(ie); + // Sleep briefly, to allow background operations to complete. + try { + Thread.sleep(500); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + counter.check(1); + } + + @Test + public void incrementTenK() { + final int tenK = 10000; + Counter counter = new Counter(); + for (int i = 0; i < tenK; i++) { + IncrementEvent ie = new IncrementEvent(counter); + DisruptorEventQueue.getInstance().enqueue(ie); + } + // Sleep briefly, to allow background operations to complete. + try { + Thread.sleep(500); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + counter.check(tenK); + } +} diff --git a/impl/src/test/java/io/opencensus/trace/TracingTest.java b/impl/src/test/java/io/opencensus/trace/TracingTest.java new file mode 100644 index 00000000..5f5fbeee --- /dev/null +++ b/impl/src/test/java/io/opencensus/trace/TracingTest.java @@ -0,0 +1,48 @@ +/* + * Copyright 2017, Google Inc. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.trace; + +import static com.google.common.truth.Truth.assertThat; + +import io.opencensus.common.MillisClock; +import io.opencensus.trace.export.ExportComponentImpl; +import io.opencensus.trace.propagation.PropagationComponent; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test for accessing the {@link TraceComponent} through the {@link Tracing} class. */ +@RunWith(JUnit4.class) +public class TracingTest { + @Test + public void implementationOfTracer() { + assertThat(Tracing.getTracer()).isInstanceOf(TracerImpl.class); + } + + @Test + public void implementationOfBinaryPropagationHandler() { + assertThat(Tracing.getPropagationComponent()) + .isInstanceOf(PropagationComponent.class); + } + + @Test + public void implementationOfClock() { + assertThat(Tracing.getClock()).isInstanceOf(MillisClock.class); + } + + @Test + public void implementationOfTraceExporter() { + assertThat(Tracing.getTraceExporter()).isInstanceOf(ExportComponentImpl.class); + } +} |