diff options
Diffstat (limited to 'impl_core/src/main/java/io/opencensus/implcore/internal')
9 files changed, 698 insertions, 0 deletions
diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/CheckerFrameworkUtils.java b/impl_core/src/main/java/io/opencensus/implcore/internal/CheckerFrameworkUtils.java new file mode 100644 index 00000000..f08289cf --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/CheckerFrameworkUtils.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import javax.annotation.Nullable; + +/** + * Utility methods for suppressing nullness warnings and working around Checker Framework issues. + */ +public final class CheckerFrameworkUtils { + private CheckerFrameworkUtils() {} + + /** Suppresses warnings about a nullable value. */ + // TODO(sebright): Try to remove all uses of this method. + @SuppressWarnings("nullness") + public static <T> T castNonNull(@Nullable T arg) { + return arg; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/CurrentState.java b/impl_core/src/main/java/io/opencensus/implcore/internal/CurrentState.java new file mode 100644 index 00000000..d7b1b112 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/CurrentState.java @@ -0,0 +1,131 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.concurrent.ThreadSafe; + +/** The current state base implementation for stats and tags. */ +@ThreadSafe +public final class CurrentState { + + /** Current state for stats or tags. */ + public enum State { + /** State that fully enables stats collection or tag propagation. */ + ENABLED, + + /** State that disables stats collection or tag propagation. */ + DISABLED + } + + private enum InternalState { + // Enabled and not read. + ENABLED_NOT_READ(State.ENABLED, false), + + // Enabled and read. + ENABLED_READ(State.ENABLED, true), + + // Disable and not read. + DISABLED_NOT_READ(State.DISABLED, false), + + // Disable and read. + DISABLED_READ(State.DISABLED, true); + + private final State state; + private final boolean isRead; + + InternalState(State state, boolean isRead) { + this.state = state; + this.isRead = isRead; + } + } + + private final AtomicReference<InternalState> currentInternalState; + + /** + * Constructs a new {@code CurrentState}. + * + * @param defaultState the default initial state. + */ + public CurrentState(State defaultState) { + this.currentInternalState = + new AtomicReference<InternalState>( + defaultState == State.ENABLED + ? InternalState.ENABLED_NOT_READ + : InternalState.DISABLED_NOT_READ); + } + + /** + * Returns the current state and updates the status as being read. + * + * @return the current state and updates the status as being read. + */ + public State get() { + InternalState internalState = currentInternalState.get(); + while (!internalState.isRead) { + // Slow path, the state is first time read. Change the state only if no other changes + // happened between the moment initialState is read and this moment. This ensures that this + // method only changes the isRead part of the internal state. + currentInternalState.compareAndSet( + internalState, + internalState.state == State.ENABLED + ? InternalState.ENABLED_READ + : InternalState.DISABLED_READ); + internalState = currentInternalState.get(); + } + return internalState.state; + } + + /** + * Returns the current state without updating the status as being read. + * + * @return the current state without updating the status as being read. + */ + public State getInternal() { + return currentInternalState.get().state; + } + + /** + * Sets current state to the given state. Returns true if the current state is changed, false + * otherwise. + * + * @param state the state to be set. + * @return true if the current state is changed, false otherwise. + */ + public boolean set(State state) { + while (true) { + InternalState internalState = currentInternalState.get(); + checkState(!internalState.isRead, "State was already read, cannot set state."); + if (state == internalState.state) { + return false; + } else { + if (!currentInternalState.compareAndSet( + internalState, + state == State.ENABLED + ? InternalState.ENABLED_NOT_READ + : InternalState.DISABLED_NOT_READ)) { + // The state was changed between the moment the internalState was read and this point. + // Some conditions may be not correct, reset at the beginning and recheck all conditions. + continue; + } + return true; + } + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/DaemonThreadFactory.java b/impl_core/src/main/java/io/opencensus/implcore/internal/DaemonThreadFactory.java new file mode 100644 index 00000000..2baa5000 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/DaemonThreadFactory.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** A {@link ThreadFactory} implementation that starts all {@link Thread} as daemons. */ +public final class DaemonThreadFactory implements ThreadFactory { + // AppEngine runtimes have constraints on threading and socket handling + // that need to be accommodated. + public static final boolean IS_RESTRICTED_APPENGINE = + System.getProperty("com.google.appengine.runtime.environment") != null + && "1.7".equals(System.getProperty("java.specification.version")); + private static final String DELIMITER = "-"; + private static final ThreadFactory threadFactory = MoreExecutors.platformThreadFactory(); + private final AtomicInteger threadIdGen = new AtomicInteger(); + private final String threadPrefix; + + /** + * Constructs a new {@code DaemonThreadFactory}. + * + * @param threadPrefix used to prefix all thread names. (E.g. "CensusDisruptor"). + */ + public DaemonThreadFactory(String threadPrefix) { + this.threadPrefix = threadPrefix + DELIMITER; + } + + @Override + public Thread newThread(Runnable r) { + Thread thread = threadFactory.newThread(r); + if (!IS_RESTRICTED_APPENGINE) { + thread.setName(threadPrefix + threadIdGen.getAndIncrement()); + thread.setDaemon(true); + } + return thread; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java b/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java new file mode 100644 index 00000000..6eb1149a --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +/** A queue that processes events. See {@code DisruptorEventQueue} for an example. */ +public interface EventQueue { + void enqueue(Entry entry); + + void shutdown(); + + /** + * Base interface to be used for all entries in {@link EventQueue}. For example usage, see {@code + * DisruptorEventQueue}. + */ + interface Entry { + /** + * Process the event associated with this entry. This will be called for every event in the + * associated {@link EventQueue}. + */ + void process(); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/NoopScope.java b/impl_core/src/main/java/io/opencensus/implcore/internal/NoopScope.java new file mode 100644 index 00000000..51efe894 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/NoopScope.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import io.opencensus.common.Scope; + +/** A {@link Scope} that does nothing when it is created or closed. */ +public final class NoopScope implements Scope { + private static final Scope INSTANCE = new NoopScope(); + + private NoopScope() {} + + /** + * Returns a {@code NoopScope}. + * + * @return a {@code NoopScope}. + */ + public static Scope getInstance() { + return INSTANCE; + } + + @Override + public void close() {} +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java b/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java new file mode 100644 index 00000000..58c61c89 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java @@ -0,0 +1,32 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +/** + * An {@link EventQueue} that processes events in the current thread. This class can be used for + * testing. + */ +public class SimpleEventQueue implements EventQueue { + + @Override + public void enqueue(Entry entry) { + entry.process(); + } + + @Override + public void shutdown() {} +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/TimestampConverter.java b/impl_core/src/main/java/io/opencensus/implcore/internal/TimestampConverter.java new file mode 100644 index 00000000..c70f5860 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/TimestampConverter.java @@ -0,0 +1,51 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import io.opencensus.common.Clock; +import io.opencensus.common.Timestamp; +import javax.annotation.concurrent.Immutable; + +/** + * This class provides a mechanism for converting {@link System#nanoTime() nanoTime} values to + * {@link Timestamp}. + */ +@Immutable +public final class TimestampConverter { + private final Timestamp timestamp; + private final long nanoTime; + + // Returns a WallTimeConverter initialized to now. + public static TimestampConverter now(Clock clock) { + return new TimestampConverter(clock.now(), clock.nowNanos()); + } + + /** + * Converts a {@link System#nanoTime() nanoTime} value to {@link Timestamp}. + * + * @param nanoTime value to convert. + * @return the {@code Timestamp} representation of the {@code time}. + */ + public Timestamp convertNanoTime(long nanoTime) { + return timestamp.addNanos(nanoTime - this.nanoTime); + } + + private TimestampConverter(Timestamp timestamp, long nanoTime) { + this.timestamp = timestamp; + this.nanoTime = nanoTime; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/Utils.java b/impl_core/src/main/java/io/opencensus/implcore/internal/Utils.java new file mode 100644 index 00000000..05a039b9 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/Utils.java @@ -0,0 +1,41 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import java.util.List; + +/** General internal utility methods. */ +public final class Utils { + + private Utils() {} + + /** + * Throws a {@link NullPointerException} if any of the list elements is null. + * + * @param list the argument list to check for null. + * @param errorMessage the message to use for the exception. Will be converted to a string using + * {@link String#valueOf(Object)}. + */ + public static <T> void checkListElementNotNull( + List<T> list, @javax.annotation.Nullable Object errorMessage) { + for (T element : list) { + if (element == null) { + throw new NullPointerException(String.valueOf(errorMessage)); + } + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/VarInt.java b/impl_core/src/main/java/io/opencensus/implcore/internal/VarInt.java new file mode 100644 index 00000000..944f62fd --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/VarInt.java @@ -0,0 +1,283 @@ +/* + * Copyright 2016-17, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** Common methods to encode and decode varints and varlongs into ByteBuffers and arrays. */ +// CHECKSTYLE:OFF +@SuppressWarnings("UngroupedOverloads") +public class VarInt { + + /** Maximum encoded size of 32-bit positive integers (in bytes) */ + public static final int MAX_VARINT_SIZE = 5; + + /** maximum encoded size of 64-bit longs, and negative 32-bit ints (in bytes) */ + public static final int MAX_VARLONG_SIZE = 10; + + private VarInt() {} + + /** + * Returns the encoding size in bytes of its input value. + * + * @param i the integer to be measured + * @return the encoding size in bytes of its input value + */ + public static int varIntSize(int i) { + int result = 0; + do { + result++; + i >>>= 7; + } while (i != 0); + return result; + } + + /** + * Reads a varint from src, places its values into the first element of dst and returns the offset + * in to src of the first byte after the varint. + * + * @param src source buffer to retrieve from + * @param offset offset within src + * @param dst the resulting int value + * @return the updated offset after reading the varint + */ + public static int getVarInt(byte[] src, int offset, int[] dst) { + int result = 0; + int shift = 0; + int b; + do { + if (shift >= 32) { + // Out of range + throw new IndexOutOfBoundsException("varint too long"); + } + // Get 7 bits from next byte + b = src[offset++]; + result |= (b & 0x7F) << shift; + shift += 7; + } while ((b & 0x80) != 0); + dst[0] = result; + return offset; + } + + /** + * Encodes an integer in a variable-length encoding, 7 bits per byte, into a destination byte[], + * following the protocol buffer convention. + * + * @param v the int value to write to sink + * @param sink the sink buffer to write to + * @param offset the offset within sink to begin writing + * @return the updated offset after writing the varint + */ + public static int putVarInt(int v, byte[] sink, int offset) { + do { + // Encode next 7 bits + terminator bit + int bits = v & 0x7F; + v >>>= 7; + byte b = (byte) (bits + ((v != 0) ? 0x80 : 0)); + sink[offset++] = b; + } while (v != 0); + return offset; + } + + /** + * Reads a varint from the current position of the given ByteBuffer and returns the decoded value + * as 32 bit integer. + * + * <p>The position of the buffer is advanced to the first byte after the decoded varint. + * + * @param src the ByteBuffer to get the var int from + * @return The integer value of the decoded varint + */ + public static int getVarInt(ByteBuffer src) { + int tmp; + if ((tmp = src.get()) >= 0) { + return tmp; + } + int result = tmp & 0x7f; + if ((tmp = src.get()) >= 0) { + result |= tmp << 7; + } else { + result |= (tmp & 0x7f) << 7; + if ((tmp = src.get()) >= 0) { + result |= tmp << 14; + } else { + result |= (tmp & 0x7f) << 14; + if ((tmp = src.get()) >= 0) { + result |= tmp << 21; + } else { + result |= (tmp & 0x7f) << 21; + result |= (tmp = src.get()) << 28; + while (tmp < 0) { + // We get into this loop only in the case of overflow. + // By doing this, we can call getVarInt() instead of + // getVarLong() when we only need an int. + tmp = src.get(); + } + } + } + } + return result; + } + + /** + * Encodes an integer in a variable-length encoding, 7 bits per byte, to a ByteBuffer sink. + * + * @param v the value to encode + * @param sink the ByteBuffer to add the encoded value + */ + public static void putVarInt(int v, ByteBuffer sink) { + while (true) { + int bits = v & 0x7f; + v >>>= 7; + if (v == 0) { + sink.put((byte) bits); + return; + } + sink.put((byte) (bits | 0x80)); + } + } + + /** + * Reads a varint from the given InputStream and returns the decoded value as an int. + * + * @param inputStream the InputStream to read from + */ + public static int getVarInt(InputStream inputStream) throws IOException { + int result = 0; + int shift = 0; + int b; + do { + if (shift >= 32) { + // Out of range + throw new IndexOutOfBoundsException("varint too long"); + } + // Get 7 bits from next byte + b = inputStream.read(); + result |= (b & 0x7F) << shift; + shift += 7; + } while ((b & 0x80) != 0); + return result; + } + + /** + * Encodes an integer in a variable-length encoding, 7 bits per byte, and writes it to the given + * OutputStream. + * + * @param v the value to encode + * @param outputStream the OutputStream to write to + */ + public static void putVarInt(int v, OutputStream outputStream) throws IOException { + byte[] bytes = new byte[varIntSize(v)]; + putVarInt(v, bytes, 0); + outputStream.write(bytes); + } + + /** + * Returns the encoding size in bytes of its input value. + * + * @param v the long to be measured + * @return the encoding size in bytes of a given long value. + */ + public static int varLongSize(long v) { + int result = 0; + do { + result++; + v >>>= 7; + } while (v != 0); + return result; + } + + /** + * Reads an up to 64 bit long varint from the current position of the given ByteBuffer and returns + * the decoded value as long. + * + * <p>The position of the buffer is advanced to the first byte after the decoded varint. + * + * @param src the ByteBuffer to get the var int from + * @return The integer value of the decoded long varint + */ + public static long getVarLong(ByteBuffer src) { + long tmp; + if ((tmp = src.get()) >= 0) { + return tmp; + } + long result = tmp & 0x7f; + if ((tmp = src.get()) >= 0) { + result |= tmp << 7; + } else { + result |= (tmp & 0x7f) << 7; + if ((tmp = src.get()) >= 0) { + result |= tmp << 14; + } else { + result |= (tmp & 0x7f) << 14; + if ((tmp = src.get()) >= 0) { + result |= tmp << 21; + } else { + result |= (tmp & 0x7f) << 21; + if ((tmp = src.get()) >= 0) { + result |= tmp << 28; + } else { + result |= (tmp & 0x7f) << 28; + if ((tmp = src.get()) >= 0) { + result |= tmp << 35; + } else { + result |= (tmp & 0x7f) << 35; + if ((tmp = src.get()) >= 0) { + result |= tmp << 42; + } else { + result |= (tmp & 0x7f) << 42; + if ((tmp = src.get()) >= 0) { + result |= tmp << 49; + } else { + result |= (tmp & 0x7f) << 49; + if ((tmp = src.get()) >= 0) { + result |= tmp << 56; + } else { + result |= (tmp & 0x7f) << 56; + result |= ((long) src.get()) << 63; + } + } + } + } + } + } + } + } + return result; + } + + /** + * Encodes a long integer in a variable-length encoding, 7 bits per byte, to a ByteBuffer sink. + * + * @param v the value to encode + * @param sink the ByteBuffer to add the encoded value + */ + public static void putVarLong(long v, ByteBuffer sink) { + while (true) { + int bits = ((int) v) & 0x7f; + v >>>= 7; + if (v == 0) { + sink.put((byte) bits); + return; + } + sink.put((byte) (bits | 0x80)); + } + } +} |