diff options
Diffstat (limited to 'impl_core/src/main/java')
60 files changed, 7386 insertions, 0 deletions
diff --git a/impl_core/src/main/java/io/opencensus/implcore/common/MillisClock.java b/impl_core/src/main/java/io/opencensus/implcore/common/MillisClock.java new file mode 100644 index 00000000..98626926 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/common/MillisClock.java @@ -0,0 +1,48 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.common; + +import io.opencensus.common.Clock; +import io.opencensus.common.Timestamp; +import javax.annotation.concurrent.ThreadSafe; + +/** A {@link Clock} that uses {@link System#currentTimeMillis()} and {@link System#nanoTime()}. */ +@ThreadSafe +public final class MillisClock extends Clock { + private static final MillisClock INSTANCE = new MillisClock(); + + private MillisClock() {} + + /** + * Returns a {@code MillisClock}. + * + * @return a {@code MillisClock}. + */ + public static MillisClock getInstance() { + return INSTANCE; + } + + @Override + public Timestamp now() { + return Timestamp.fromMillis(System.currentTimeMillis()); + } + + @Override + public long nowNanos() { + return System.nanoTime(); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/CheckerFrameworkUtils.java b/impl_core/src/main/java/io/opencensus/implcore/internal/CheckerFrameworkUtils.java new file mode 100644 index 00000000..f08289cf --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/CheckerFrameworkUtils.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import javax.annotation.Nullable; + +/** + * Utility methods for suppressing nullness warnings and working around Checker Framework issues. + */ +public final class CheckerFrameworkUtils { + private CheckerFrameworkUtils() {} + + /** Suppresses warnings about a nullable value. */ + // TODO(sebright): Try to remove all uses of this method. + @SuppressWarnings("nullness") + public static <T> T castNonNull(@Nullable T arg) { + return arg; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/CurrentState.java b/impl_core/src/main/java/io/opencensus/implcore/internal/CurrentState.java new file mode 100644 index 00000000..d7b1b112 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/CurrentState.java @@ -0,0 +1,131 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.concurrent.ThreadSafe; + +/** The current state base implementation for stats and tags. */ +@ThreadSafe +public final class CurrentState { + + /** Current state for stats or tags. */ + public enum State { + /** State that fully enables stats collection or tag propagation. */ + ENABLED, + + /** State that disables stats collection or tag propagation. */ + DISABLED + } + + private enum InternalState { + // Enabled and not read. + ENABLED_NOT_READ(State.ENABLED, false), + + // Enabled and read. + ENABLED_READ(State.ENABLED, true), + + // Disable and not read. + DISABLED_NOT_READ(State.DISABLED, false), + + // Disable and read. + DISABLED_READ(State.DISABLED, true); + + private final State state; + private final boolean isRead; + + InternalState(State state, boolean isRead) { + this.state = state; + this.isRead = isRead; + } + } + + private final AtomicReference<InternalState> currentInternalState; + + /** + * Constructs a new {@code CurrentState}. + * + * @param defaultState the default initial state. + */ + public CurrentState(State defaultState) { + this.currentInternalState = + new AtomicReference<InternalState>( + defaultState == State.ENABLED + ? InternalState.ENABLED_NOT_READ + : InternalState.DISABLED_NOT_READ); + } + + /** + * Returns the current state and updates the status as being read. + * + * @return the current state and updates the status as being read. + */ + public State get() { + InternalState internalState = currentInternalState.get(); + while (!internalState.isRead) { + // Slow path, the state is first time read. Change the state only if no other changes + // happened between the moment initialState is read and this moment. This ensures that this + // method only changes the isRead part of the internal state. + currentInternalState.compareAndSet( + internalState, + internalState.state == State.ENABLED + ? InternalState.ENABLED_READ + : InternalState.DISABLED_READ); + internalState = currentInternalState.get(); + } + return internalState.state; + } + + /** + * Returns the current state without updating the status as being read. + * + * @return the current state without updating the status as being read. + */ + public State getInternal() { + return currentInternalState.get().state; + } + + /** + * Sets current state to the given state. Returns true if the current state is changed, false + * otherwise. + * + * @param state the state to be set. + * @return true if the current state is changed, false otherwise. + */ + public boolean set(State state) { + while (true) { + InternalState internalState = currentInternalState.get(); + checkState(!internalState.isRead, "State was already read, cannot set state."); + if (state == internalState.state) { + return false; + } else { + if (!currentInternalState.compareAndSet( + internalState, + state == State.ENABLED + ? InternalState.ENABLED_NOT_READ + : InternalState.DISABLED_NOT_READ)) { + // The state was changed between the moment the internalState was read and this point. + // Some conditions may be not correct, reset at the beginning and recheck all conditions. + continue; + } + return true; + } + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/DaemonThreadFactory.java b/impl_core/src/main/java/io/opencensus/implcore/internal/DaemonThreadFactory.java new file mode 100644 index 00000000..2baa5000 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/DaemonThreadFactory.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** A {@link ThreadFactory} implementation that starts all {@link Thread} as daemons. */ +public final class DaemonThreadFactory implements ThreadFactory { + // AppEngine runtimes have constraints on threading and socket handling + // that need to be accommodated. + public static final boolean IS_RESTRICTED_APPENGINE = + System.getProperty("com.google.appengine.runtime.environment") != null + && "1.7".equals(System.getProperty("java.specification.version")); + private static final String DELIMITER = "-"; + private static final ThreadFactory threadFactory = MoreExecutors.platformThreadFactory(); + private final AtomicInteger threadIdGen = new AtomicInteger(); + private final String threadPrefix; + + /** + * Constructs a new {@code DaemonThreadFactory}. + * + * @param threadPrefix used to prefix all thread names. (E.g. "CensusDisruptor"). + */ + public DaemonThreadFactory(String threadPrefix) { + this.threadPrefix = threadPrefix + DELIMITER; + } + + @Override + public Thread newThread(Runnable r) { + Thread thread = threadFactory.newThread(r); + if (!IS_RESTRICTED_APPENGINE) { + thread.setName(threadPrefix + threadIdGen.getAndIncrement()); + thread.setDaemon(true); + } + return thread; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java b/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java new file mode 100644 index 00000000..6eb1149a --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +/** A queue that processes events. See {@code DisruptorEventQueue} for an example. */ +public interface EventQueue { + void enqueue(Entry entry); + + void shutdown(); + + /** + * Base interface to be used for all entries in {@link EventQueue}. For example usage, see {@code + * DisruptorEventQueue}. + */ + interface Entry { + /** + * Process the event associated with this entry. This will be called for every event in the + * associated {@link EventQueue}. + */ + void process(); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/NoopScope.java b/impl_core/src/main/java/io/opencensus/implcore/internal/NoopScope.java new file mode 100644 index 00000000..51efe894 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/NoopScope.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import io.opencensus.common.Scope; + +/** A {@link Scope} that does nothing when it is created or closed. */ +public final class NoopScope implements Scope { + private static final Scope INSTANCE = new NoopScope(); + + private NoopScope() {} + + /** + * Returns a {@code NoopScope}. + * + * @return a {@code NoopScope}. + */ + public static Scope getInstance() { + return INSTANCE; + } + + @Override + public void close() {} +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java b/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java new file mode 100644 index 00000000..58c61c89 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java @@ -0,0 +1,32 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +/** + * An {@link EventQueue} that processes events in the current thread. This class can be used for + * testing. + */ +public class SimpleEventQueue implements EventQueue { + + @Override + public void enqueue(Entry entry) { + entry.process(); + } + + @Override + public void shutdown() {} +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/TimestampConverter.java b/impl_core/src/main/java/io/opencensus/implcore/internal/TimestampConverter.java new file mode 100644 index 00000000..c70f5860 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/TimestampConverter.java @@ -0,0 +1,51 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import io.opencensus.common.Clock; +import io.opencensus.common.Timestamp; +import javax.annotation.concurrent.Immutable; + +/** + * This class provides a mechanism for converting {@link System#nanoTime() nanoTime} values to + * {@link Timestamp}. + */ +@Immutable +public final class TimestampConverter { + private final Timestamp timestamp; + private final long nanoTime; + + // Returns a WallTimeConverter initialized to now. + public static TimestampConverter now(Clock clock) { + return new TimestampConverter(clock.now(), clock.nowNanos()); + } + + /** + * Converts a {@link System#nanoTime() nanoTime} value to {@link Timestamp}. + * + * @param nanoTime value to convert. + * @return the {@code Timestamp} representation of the {@code time}. + */ + public Timestamp convertNanoTime(long nanoTime) { + return timestamp.addNanos(nanoTime - this.nanoTime); + } + + private TimestampConverter(Timestamp timestamp, long nanoTime) { + this.timestamp = timestamp; + this.nanoTime = nanoTime; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/Utils.java b/impl_core/src/main/java/io/opencensus/implcore/internal/Utils.java new file mode 100644 index 00000000..05a039b9 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/Utils.java @@ -0,0 +1,41 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import java.util.List; + +/** General internal utility methods. */ +public final class Utils { + + private Utils() {} + + /** + * Throws a {@link NullPointerException} if any of the list elements is null. + * + * @param list the argument list to check for null. + * @param errorMessage the message to use for the exception. Will be converted to a string using + * {@link String#valueOf(Object)}. + */ + public static <T> void checkListElementNotNull( + List<T> list, @javax.annotation.Nullable Object errorMessage) { + for (T element : list) { + if (element == null) { + throw new NullPointerException(String.valueOf(errorMessage)); + } + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/VarInt.java b/impl_core/src/main/java/io/opencensus/implcore/internal/VarInt.java new file mode 100644 index 00000000..944f62fd --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/internal/VarInt.java @@ -0,0 +1,283 @@ +/* + * Copyright 2016-17, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.internal; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** Common methods to encode and decode varints and varlongs into ByteBuffers and arrays. */ +// CHECKSTYLE:OFF +@SuppressWarnings("UngroupedOverloads") +public class VarInt { + + /** Maximum encoded size of 32-bit positive integers (in bytes) */ + public static final int MAX_VARINT_SIZE = 5; + + /** maximum encoded size of 64-bit longs, and negative 32-bit ints (in bytes) */ + public static final int MAX_VARLONG_SIZE = 10; + + private VarInt() {} + + /** + * Returns the encoding size in bytes of its input value. + * + * @param i the integer to be measured + * @return the encoding size in bytes of its input value + */ + public static int varIntSize(int i) { + int result = 0; + do { + result++; + i >>>= 7; + } while (i != 0); + return result; + } + + /** + * Reads a varint from src, places its values into the first element of dst and returns the offset + * in to src of the first byte after the varint. + * + * @param src source buffer to retrieve from + * @param offset offset within src + * @param dst the resulting int value + * @return the updated offset after reading the varint + */ + public static int getVarInt(byte[] src, int offset, int[] dst) { + int result = 0; + int shift = 0; + int b; + do { + if (shift >= 32) { + // Out of range + throw new IndexOutOfBoundsException("varint too long"); + } + // Get 7 bits from next byte + b = src[offset++]; + result |= (b & 0x7F) << shift; + shift += 7; + } while ((b & 0x80) != 0); + dst[0] = result; + return offset; + } + + /** + * Encodes an integer in a variable-length encoding, 7 bits per byte, into a destination byte[], + * following the protocol buffer convention. + * + * @param v the int value to write to sink + * @param sink the sink buffer to write to + * @param offset the offset within sink to begin writing + * @return the updated offset after writing the varint + */ + public static int putVarInt(int v, byte[] sink, int offset) { + do { + // Encode next 7 bits + terminator bit + int bits = v & 0x7F; + v >>>= 7; + byte b = (byte) (bits + ((v != 0) ? 0x80 : 0)); + sink[offset++] = b; + } while (v != 0); + return offset; + } + + /** + * Reads a varint from the current position of the given ByteBuffer and returns the decoded value + * as 32 bit integer. + * + * <p>The position of the buffer is advanced to the first byte after the decoded varint. + * + * @param src the ByteBuffer to get the var int from + * @return The integer value of the decoded varint + */ + public static int getVarInt(ByteBuffer src) { + int tmp; + if ((tmp = src.get()) >= 0) { + return tmp; + } + int result = tmp & 0x7f; + if ((tmp = src.get()) >= 0) { + result |= tmp << 7; + } else { + result |= (tmp & 0x7f) << 7; + if ((tmp = src.get()) >= 0) { + result |= tmp << 14; + } else { + result |= (tmp & 0x7f) << 14; + if ((tmp = src.get()) >= 0) { + result |= tmp << 21; + } else { + result |= (tmp & 0x7f) << 21; + result |= (tmp = src.get()) << 28; + while (tmp < 0) { + // We get into this loop only in the case of overflow. + // By doing this, we can call getVarInt() instead of + // getVarLong() when we only need an int. + tmp = src.get(); + } + } + } + } + return result; + } + + /** + * Encodes an integer in a variable-length encoding, 7 bits per byte, to a ByteBuffer sink. + * + * @param v the value to encode + * @param sink the ByteBuffer to add the encoded value + */ + public static void putVarInt(int v, ByteBuffer sink) { + while (true) { + int bits = v & 0x7f; + v >>>= 7; + if (v == 0) { + sink.put((byte) bits); + return; + } + sink.put((byte) (bits | 0x80)); + } + } + + /** + * Reads a varint from the given InputStream and returns the decoded value as an int. + * + * @param inputStream the InputStream to read from + */ + public static int getVarInt(InputStream inputStream) throws IOException { + int result = 0; + int shift = 0; + int b; + do { + if (shift >= 32) { + // Out of range + throw new IndexOutOfBoundsException("varint too long"); + } + // Get 7 bits from next byte + b = inputStream.read(); + result |= (b & 0x7F) << shift; + shift += 7; + } while ((b & 0x80) != 0); + return result; + } + + /** + * Encodes an integer in a variable-length encoding, 7 bits per byte, and writes it to the given + * OutputStream. + * + * @param v the value to encode + * @param outputStream the OutputStream to write to + */ + public static void putVarInt(int v, OutputStream outputStream) throws IOException { + byte[] bytes = new byte[varIntSize(v)]; + putVarInt(v, bytes, 0); + outputStream.write(bytes); + } + + /** + * Returns the encoding size in bytes of its input value. + * + * @param v the long to be measured + * @return the encoding size in bytes of a given long value. + */ + public static int varLongSize(long v) { + int result = 0; + do { + result++; + v >>>= 7; + } while (v != 0); + return result; + } + + /** + * Reads an up to 64 bit long varint from the current position of the given ByteBuffer and returns + * the decoded value as long. + * + * <p>The position of the buffer is advanced to the first byte after the decoded varint. + * + * @param src the ByteBuffer to get the var int from + * @return The integer value of the decoded long varint + */ + public static long getVarLong(ByteBuffer src) { + long tmp; + if ((tmp = src.get()) >= 0) { + return tmp; + } + long result = tmp & 0x7f; + if ((tmp = src.get()) >= 0) { + result |= tmp << 7; + } else { + result |= (tmp & 0x7f) << 7; + if ((tmp = src.get()) >= 0) { + result |= tmp << 14; + } else { + result |= (tmp & 0x7f) << 14; + if ((tmp = src.get()) >= 0) { + result |= tmp << 21; + } else { + result |= (tmp & 0x7f) << 21; + if ((tmp = src.get()) >= 0) { + result |= tmp << 28; + } else { + result |= (tmp & 0x7f) << 28; + if ((tmp = src.get()) >= 0) { + result |= tmp << 35; + } else { + result |= (tmp & 0x7f) << 35; + if ((tmp = src.get()) >= 0) { + result |= tmp << 42; + } else { + result |= (tmp & 0x7f) << 42; + if ((tmp = src.get()) >= 0) { + result |= tmp << 49; + } else { + result |= (tmp & 0x7f) << 49; + if ((tmp = src.get()) >= 0) { + result |= tmp << 56; + } else { + result |= (tmp & 0x7f) << 56; + result |= ((long) src.get()) << 63; + } + } + } + } + } + } + } + } + return result; + } + + /** + * Encodes a long integer in a variable-length encoding, 7 bits per byte, to a ByteBuffer sink. + * + * @param v the value to encode + * @param sink the ByteBuffer to add the encoded value + */ + public static void putVarLong(long v, ByteBuffer sink) { + while (true) { + int bits = ((int) v) & 0x7f; + v >>>= 7; + if (v == 0) { + sink.put((byte) bits); + return; + } + sink.put((byte) (bits | 0x80)); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedDoubleGaugeImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedDoubleGaugeImpl.java new file mode 100644 index 00000000..b7104c9b --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedDoubleGaugeImpl.java @@ -0,0 +1,155 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.metrics; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import io.opencensus.common.Clock; +import io.opencensus.common.ToDoubleFunction; +import io.opencensus.implcore.internal.Utils; +import io.opencensus.metrics.DerivedDoubleGauge; +import io.opencensus.metrics.LabelKey; +import io.opencensus.metrics.LabelValue; +import io.opencensus.metrics.export.Metric; +import io.opencensus.metrics.export.MetricDescriptor; +import io.opencensus.metrics.export.MetricDescriptor.Type; +import io.opencensus.metrics.export.Point; +import io.opencensus.metrics.export.TimeSeries; +import io.opencensus.metrics.export.Value; +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +/** Implementation of {@link DerivedDoubleGauge}. */ +public final class DerivedDoubleGaugeImpl extends DerivedDoubleGauge implements Meter { + private final MetricDescriptor metricDescriptor; + private final int labelKeysSize; + + @SuppressWarnings("rawtypes") + private volatile Map<List<LabelValue>, PointWithFunction> registeredPoints = + Collections.<List<LabelValue>, PointWithFunction>emptyMap(); + + DerivedDoubleGaugeImpl(String name, String description, String unit, List<LabelKey> labelKeys) { + labelKeysSize = labelKeys.size(); + this.metricDescriptor = + MetricDescriptor.create(name, description, unit, Type.GAUGE_DOUBLE, labelKeys); + } + + @Override + @SuppressWarnings("rawtypes") + public synchronized <T> void createTimeSeries( + List<LabelValue> labelValues, + /*@Nullable*/ T obj, + ToDoubleFunction</*@Nullable*/ T> function) { + Utils.checkListElementNotNull( + checkNotNull(labelValues, "labelValues"), "labelValue element should not be null."); + checkArgument(labelKeysSize == labelValues.size(), "Incorrect number of labels."); + checkNotNull(function, "function"); + + List<LabelValue> labelValuesCopy = + Collections.<LabelValue>unmodifiableList(new ArrayList<LabelValue>(labelValues)); + + PointWithFunction existingPoint = registeredPoints.get(labelValuesCopy); + if (existingPoint != null) { + throw new IllegalArgumentException( + "A different time series with the same labels already exists."); + } + + PointWithFunction newPoint = new PointWithFunction<T>(labelValuesCopy, obj, function); + // Updating the map of time series happens under a lock to avoid multiple add operations + // to happen in the same time. + Map<List<LabelValue>, PointWithFunction> registeredPointsCopy = + new LinkedHashMap<List<LabelValue>, PointWithFunction>(registeredPoints); + registeredPointsCopy.put(labelValuesCopy, newPoint); + registeredPoints = Collections.unmodifiableMap(registeredPointsCopy); + } + + @Override + @SuppressWarnings("rawtypes") + public synchronized void removeTimeSeries(List<LabelValue> labelValues) { + checkNotNull(labelValues, "labelValues"); + + Map<List<LabelValue>, PointWithFunction> registeredPointsCopy = + new LinkedHashMap<List<LabelValue>, PointWithFunction>(registeredPoints); + if (registeredPointsCopy.remove(labelValues) == null) { + // The element not present, no need to update the current map of time series. + return; + } + registeredPoints = Collections.unmodifiableMap(registeredPointsCopy); + } + + @Override + @SuppressWarnings("rawtypes") + public synchronized void clear() { + registeredPoints = Collections.<List<LabelValue>, PointWithFunction>emptyMap(); + } + + /*@Nullable*/ + @Override + @SuppressWarnings("rawtypes") + public Metric getMetric(Clock clock) { + Map<List<LabelValue>, PointWithFunction> currentRegisteredPoints = registeredPoints; + if (currentRegisteredPoints.isEmpty()) { + return null; + } + + if (currentRegisteredPoints.size() == 1) { + PointWithFunction point = currentRegisteredPoints.values().iterator().next(); + return Metric.createWithOneTimeSeries(metricDescriptor, point.getTimeSeries(clock)); + } + + List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>(currentRegisteredPoints.size()); + for (Map.Entry<List<LabelValue>, PointWithFunction> entry : + currentRegisteredPoints.entrySet()) { + timeSeriesList.add(entry.getValue().getTimeSeries(clock)); + } + return Metric.create(metricDescriptor, timeSeriesList); + } + + /** Implementation of {@link PointWithFunction} with an object and a callback function. */ + public static final class PointWithFunction<T> { + private final List<LabelValue> labelValues; + @javax.annotation.Nullable private final WeakReference<T> ref; + private final ToDoubleFunction</*@Nullable*/ T> function; + + PointWithFunction( + List<LabelValue> labelValues, + /*@Nullable*/ T obj, + ToDoubleFunction</*@Nullable*/ T> function) { + this.labelValues = labelValues; + ref = obj != null ? new WeakReference<T>(obj) : null; + this.function = function; + } + + private TimeSeries getTimeSeries(Clock clock) { + final T obj = ref != null ? ref.get() : null; + double value = function.applyAsDouble(obj); + + // TODO(mayurkale): OPTIMIZATION: Avoid re-evaluate the labelValues all the time (issue#1490). + return TimeSeries.createWithOnePoint( + labelValues, Point.create(Value.doubleValue(value), clock.now()), null); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedLongGaugeImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedLongGaugeImpl.java new file mode 100644 index 00000000..90e3e706 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedLongGaugeImpl.java @@ -0,0 +1,153 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.metrics; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import io.opencensus.common.Clock; +import io.opencensus.common.ToLongFunction; +import io.opencensus.implcore.internal.Utils; +import io.opencensus.metrics.DerivedLongGauge; +import io.opencensus.metrics.LabelKey; +import io.opencensus.metrics.LabelValue; +import io.opencensus.metrics.export.Metric; +import io.opencensus.metrics.export.MetricDescriptor; +import io.opencensus.metrics.export.MetricDescriptor.Type; +import io.opencensus.metrics.export.Point; +import io.opencensus.metrics.export.TimeSeries; +import io.opencensus.metrics.export.Value; +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +/** Implementation of {@link DerivedLongGauge}. */ +public final class DerivedLongGaugeImpl extends DerivedLongGauge implements Meter { + private final MetricDescriptor metricDescriptor; + private final int labelKeysSize; + + @SuppressWarnings("rawtypes") + private volatile Map<List<LabelValue>, PointWithFunction> registeredPoints = + Collections.<List<LabelValue>, PointWithFunction>emptyMap(); + + DerivedLongGaugeImpl(String name, String description, String unit, List<LabelKey> labelKeys) { + labelKeysSize = labelKeys.size(); + this.metricDescriptor = + MetricDescriptor.create(name, description, unit, Type.GAUGE_INT64, labelKeys); + } + + @Override + @SuppressWarnings("rawtypes") + public synchronized <T> void createTimeSeries( + List<LabelValue> labelValues, /*@Nullable*/ T obj, ToLongFunction</*@Nullable*/ T> function) { + Utils.checkListElementNotNull( + checkNotNull(labelValues, "labelValues"), "labelValue element should not be null."); + checkArgument(labelKeysSize == labelValues.size(), "Incorrect number of labels."); + checkNotNull(function, "function"); + + List<LabelValue> labelValuesCopy = + Collections.unmodifiableList(new ArrayList<LabelValue>(labelValues)); + + PointWithFunction existingPoint = registeredPoints.get(labelValuesCopy); + if (existingPoint != null) { + throw new IllegalArgumentException( + "A different time series with the same labels already exists."); + } + + PointWithFunction newPoint = new PointWithFunction<T>(labelValuesCopy, obj, function); + // Updating the map of time series happens under a lock to avoid multiple add operations + // to happen in the same time. + Map<List<LabelValue>, PointWithFunction> registeredPointsCopy = + new LinkedHashMap<List<LabelValue>, PointWithFunction>(registeredPoints); + registeredPointsCopy.put(labelValuesCopy, newPoint); + registeredPoints = Collections.unmodifiableMap(registeredPointsCopy); + } + + @Override + @SuppressWarnings("rawtypes") + public synchronized void removeTimeSeries(List<LabelValue> labelValues) { + checkNotNull(labelValues, "labelValues"); + + Map<List<LabelValue>, PointWithFunction> registeredPointsCopy = + new LinkedHashMap<List<LabelValue>, PointWithFunction>(registeredPoints); + if (registeredPointsCopy.remove(labelValues) == null) { + // The element not present, no need to update the current map of time series. + return; + } + registeredPoints = Collections.unmodifiableMap(registeredPointsCopy); + } + + @Override + @SuppressWarnings("rawtypes") + public synchronized void clear() { + registeredPoints = Collections.<List<LabelValue>, PointWithFunction>emptyMap(); + } + + /*@Nullable*/ + @Override + @SuppressWarnings("rawtypes") + public Metric getMetric(Clock clock) { + Map<List<LabelValue>, PointWithFunction> currentRegisteredPoints = registeredPoints; + if (currentRegisteredPoints.isEmpty()) { + return null; + } + + if (currentRegisteredPoints.size() == 1) { + PointWithFunction point = currentRegisteredPoints.values().iterator().next(); + return Metric.createWithOneTimeSeries(metricDescriptor, point.getTimeSeries(clock)); + } + + List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>(currentRegisteredPoints.size()); + for (Map.Entry<List<LabelValue>, PointWithFunction> entry : + currentRegisteredPoints.entrySet()) { + timeSeriesList.add(entry.getValue().getTimeSeries(clock)); + } + return Metric.create(metricDescriptor, timeSeriesList); + } + + /** Implementation of {@link PointWithFunction} with an object and a callback function. */ + public static final class PointWithFunction<T> { + private final List<LabelValue> labelValues; + @javax.annotation.Nullable private final WeakReference<T> ref; + private final ToLongFunction</*@Nullable*/ T> function; + + PointWithFunction( + List<LabelValue> labelValues, + /*@Nullable*/ T obj, + ToLongFunction</*@Nullable*/ T> function) { + this.labelValues = labelValues; + ref = obj != null ? new WeakReference<T>(obj) : null; + this.function = function; + } + + private TimeSeries getTimeSeries(Clock clock) { + final T obj = ref != null ? ref.get() : null; + long value = function.applyAsLong(obj); + + // TODO(mayurkale): OPTIMIZATION: Avoid re-evaluate the labelValues all the time (issue#1490). + return TimeSeries.createWithOnePoint( + labelValues, Point.create(Value.longValue(value), clock.now()), null); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/DoubleGaugeImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/DoubleGaugeImpl.java new file mode 100644 index 00000000..c314e980 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/DoubleGaugeImpl.java @@ -0,0 +1,174 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.metrics; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.AtomicDouble; +import io.opencensus.common.Clock; +import io.opencensus.implcore.internal.Utils; +import io.opencensus.metrics.DoubleGauge; +import io.opencensus.metrics.LabelKey; +import io.opencensus.metrics.LabelValue; +import io.opencensus.metrics.export.Metric; +import io.opencensus.metrics.export.MetricDescriptor; +import io.opencensus.metrics.export.MetricDescriptor.Type; +import io.opencensus.metrics.export.Point; +import io.opencensus.metrics.export.TimeSeries; +import io.opencensus.metrics.export.Value; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +/** Implementation of {@link DoubleGauge}. */ +public final class DoubleGaugeImpl extends DoubleGauge implements Meter { + @VisibleForTesting static final LabelValue UNSET_VALUE = LabelValue.create(null); + + private final MetricDescriptor metricDescriptor; + private volatile Map<List<LabelValue>, PointImpl> registeredPoints = + Collections.<List<LabelValue>, PointImpl>emptyMap(); + private final int labelKeysSize; + private final List<LabelValue> defaultLabelValues; + + DoubleGaugeImpl(String name, String description, String unit, List<LabelKey> labelKeys) { + labelKeysSize = labelKeys.size(); + this.metricDescriptor = + MetricDescriptor.create(name, description, unit, Type.GAUGE_DOUBLE, labelKeys); + + // initialize defaultLabelValues + defaultLabelValues = new ArrayList<LabelValue>(labelKeysSize); + for (int i = 0; i < labelKeysSize; i++) { + defaultLabelValues.add(UNSET_VALUE); + } + } + + @Override + public DoublePoint getOrCreateTimeSeries(List<LabelValue> labelValues) { + // lock free point retrieval, if it is present + PointImpl existingPoint = registeredPoints.get(labelValues); + if (existingPoint != null) { + return existingPoint; + } + + List<LabelValue> labelValuesCopy = + Collections.unmodifiableList( + new ArrayList<LabelValue>(checkNotNull(labelValues, "labelValues"))); + return registerTimeSeries(labelValuesCopy); + } + + @Override + public DoublePoint getDefaultTimeSeries() { + // lock free default point retrieval, if it is present + PointImpl existingPoint = registeredPoints.get(defaultLabelValues); + if (existingPoint != null) { + return existingPoint; + } + return registerTimeSeries(Collections.unmodifiableList(defaultLabelValues)); + } + + @Override + public synchronized void removeTimeSeries(List<LabelValue> labelValues) { + checkNotNull(labelValues, "labelValues"); + + Map<List<LabelValue>, PointImpl> registeredPointsCopy = + new LinkedHashMap<List<LabelValue>, PointImpl>(registeredPoints); + if (registeredPointsCopy.remove(labelValues) == null) { + // The element not present, no need to update the current map of points. + return; + } + registeredPoints = Collections.unmodifiableMap(registeredPointsCopy); + } + + @Override + public synchronized void clear() { + registeredPoints = Collections.<List<LabelValue>, PointImpl>emptyMap(); + } + + private synchronized DoublePoint registerTimeSeries(List<LabelValue> labelValues) { + PointImpl existingPoint = registeredPoints.get(labelValues); + if (existingPoint != null) { + // Return a Point that are already registered. This can happen if a multiple threads + // concurrently try to register the same {@code TimeSeries}. + return existingPoint; + } + + checkArgument(labelKeysSize == labelValues.size(), "Incorrect number of labels."); + Utils.checkListElementNotNull(labelValues, "labelValue element should not be null."); + + PointImpl newPoint = new PointImpl(labelValues); + // Updating the map of points happens under a lock to avoid multiple add operations + // to happen in the same time. + Map<List<LabelValue>, PointImpl> registeredPointsCopy = + new LinkedHashMap<List<LabelValue>, PointImpl>(registeredPoints); + registeredPointsCopy.put(labelValues, newPoint); + registeredPoints = Collections.unmodifiableMap(registeredPointsCopy); + + return newPoint; + } + + @Nullable + @Override + public Metric getMetric(Clock clock) { + Map<List<LabelValue>, PointImpl> currentRegisteredPoints = registeredPoints; + if (currentRegisteredPoints.isEmpty()) { + return null; + } + + if (currentRegisteredPoints.size() == 1) { + PointImpl point = currentRegisteredPoints.values().iterator().next(); + return Metric.createWithOneTimeSeries(metricDescriptor, point.getTimeSeries(clock)); + } + + List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>(currentRegisteredPoints.size()); + for (Map.Entry<List<LabelValue>, PointImpl> entry : currentRegisteredPoints.entrySet()) { + timeSeriesList.add(entry.getValue().getTimeSeries(clock)); + } + return Metric.create(metricDescriptor, timeSeriesList); + } + + /** Implementation of {@link DoubleGauge.DoublePoint}. */ + public static final class PointImpl extends DoublePoint { + + // TODO(mayurkale): Consider to use DoubleAdder here, once we upgrade to Java8. + private final AtomicDouble value = new AtomicDouble(0); + private final List<LabelValue> labelValues; + + PointImpl(List<LabelValue> labelValues) { + this.labelValues = labelValues; + } + + @Override + public void add(double amt) { + value.addAndGet(amt); + } + + @Override + public void set(double val) { + value.set(val); + } + + private TimeSeries getTimeSeries(Clock clock) { + return TimeSeries.createWithOnePoint( + labelValues, Point.create(Value.doubleValue(value.get()), clock.now()), null); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/LongGaugeImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/LongGaugeImpl.java new file mode 100644 index 00000000..3460d7a4 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/LongGaugeImpl.java @@ -0,0 +1,174 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.metrics; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import io.opencensus.common.Clock; +import io.opencensus.implcore.internal.Utils; +import io.opencensus.metrics.LabelKey; +import io.opencensus.metrics.LabelValue; +import io.opencensus.metrics.LongGauge; +import io.opencensus.metrics.export.Metric; +import io.opencensus.metrics.export.MetricDescriptor; +import io.opencensus.metrics.export.MetricDescriptor.Type; +import io.opencensus.metrics.export.Point; +import io.opencensus.metrics.export.TimeSeries; +import io.opencensus.metrics.export.Value; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; + +/** Implementation of {@link LongGauge}. */ +public final class LongGaugeImpl extends LongGauge implements Meter { + @VisibleForTesting static final LabelValue UNSET_VALUE = LabelValue.create(null); + + private final MetricDescriptor metricDescriptor; + private volatile Map<List<LabelValue>, PointImpl> registeredPoints = + Collections.<List<LabelValue>, PointImpl>emptyMap(); + private final int labelKeysSize; + private final List<LabelValue> defaultLabelValues; + + LongGaugeImpl(String name, String description, String unit, List<LabelKey> labelKeys) { + labelKeysSize = labelKeys.size(); + this.metricDescriptor = + MetricDescriptor.create(name, description, unit, Type.GAUGE_INT64, labelKeys); + + // initialize defaultLabelValues + defaultLabelValues = new ArrayList<LabelValue>(labelKeysSize); + for (int i = 0; i < labelKeysSize; i++) { + defaultLabelValues.add(UNSET_VALUE); + } + } + + @Override + public LongPoint getOrCreateTimeSeries(List<LabelValue> labelValues) { + // lock free point retrieval, if it is present + PointImpl existingPoint = registeredPoints.get(labelValues); + if (existingPoint != null) { + return existingPoint; + } + + List<LabelValue> labelValuesCopy = + Collections.unmodifiableList( + new ArrayList<LabelValue>(checkNotNull(labelValues, "labelValues"))); + return registerTimeSeries(labelValuesCopy); + } + + @Override + public LongPoint getDefaultTimeSeries() { + // lock free default point retrieval, if it is present + PointImpl existingPoint = registeredPoints.get(defaultLabelValues); + if (existingPoint != null) { + return existingPoint; + } + return registerTimeSeries(Collections.unmodifiableList(defaultLabelValues)); + } + + @Override + public synchronized void removeTimeSeries(List<LabelValue> labelValues) { + checkNotNull(labelValues, "labelValues"); + + Map<List<LabelValue>, PointImpl> registeredPointsCopy = + new LinkedHashMap<List<LabelValue>, PointImpl>(registeredPoints); + if (registeredPointsCopy.remove(labelValues) == null) { + // The element not present, no need to update the current map of points. + return; + } + registeredPoints = Collections.unmodifiableMap(registeredPointsCopy); + } + + @Override + public synchronized void clear() { + registeredPoints = Collections.<List<LabelValue>, PointImpl>emptyMap(); + } + + private synchronized LongPoint registerTimeSeries(List<LabelValue> labelValues) { + PointImpl existingPoint = registeredPoints.get(labelValues); + if (existingPoint != null) { + // Return a Point that are already registered. This can happen if a multiple threads + // concurrently try to register the same {@code TimeSeries}. + return existingPoint; + } + + checkArgument(labelKeysSize == labelValues.size(), "Incorrect number of labels."); + Utils.checkListElementNotNull(labelValues, "labelValue element should not be null."); + + PointImpl newPoint = new PointImpl(labelValues); + // Updating the map of points happens under a lock to avoid multiple add operations + // to happen in the same time. + Map<List<LabelValue>, PointImpl> registeredPointsCopy = + new LinkedHashMap<List<LabelValue>, PointImpl>(registeredPoints); + registeredPointsCopy.put(labelValues, newPoint); + registeredPoints = Collections.unmodifiableMap(registeredPointsCopy); + + return newPoint; + } + + @Nullable + @Override + public Metric getMetric(Clock clock) { + Map<List<LabelValue>, PointImpl> currentRegisteredPoints = registeredPoints; + if (currentRegisteredPoints.isEmpty()) { + return null; + } + + if (currentRegisteredPoints.size() == 1) { + PointImpl point = currentRegisteredPoints.values().iterator().next(); + return Metric.createWithOneTimeSeries(metricDescriptor, point.getTimeSeries(clock)); + } + + List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>(currentRegisteredPoints.size()); + for (Map.Entry<List<LabelValue>, PointImpl> entry : currentRegisteredPoints.entrySet()) { + timeSeriesList.add(entry.getValue().getTimeSeries(clock)); + } + return Metric.create(metricDescriptor, timeSeriesList); + } + + /** Implementation of {@link LongGauge.LongPoint}. */ + public static final class PointImpl extends LongPoint { + + // TODO(mayurkale): Consider to use LongAdder here, once we upgrade to Java8. + private final AtomicLong value = new AtomicLong(0); + private final List<LabelValue> labelValues; + + PointImpl(List<LabelValue> labelValues) { + this.labelValues = labelValues; + } + + @Override + public void add(long amt) { + value.addAndGet(amt); + } + + @Override + public void set(long val) { + value.set(val); + } + + private TimeSeries getTimeSeries(Clock clock) { + return TimeSeries.createWithOnePoint( + labelValues, Point.create(Value.longValue(value.get()), clock.now()), null); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/Meter.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/Meter.java new file mode 100644 index 00000000..f5a8dc8f --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/Meter.java @@ -0,0 +1,34 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.metrics; + +import io.opencensus.common.Clock; +import io.opencensus.metrics.export.Metric; +import javax.annotation.Nullable; + +interface Meter { + /** + * Provides a {@link io.opencensus.metrics.export.Metric} with one or more {@link + * io.opencensus.metrics.export.TimeSeries}. + * + * @param clock the clock used to get the time. + * @throws NullPointerException if {@code TimeSeries} is not present in {@code Metric}. + * @return a {@code Metric}. + */ + @Nullable + Metric getMetric(Clock clock); +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/MetricRegistryImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/MetricRegistryImpl.java new file mode 100644 index 00000000..1a301ecf --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/MetricRegistryImpl.java @@ -0,0 +1,160 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.metrics; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.opencensus.common.Clock; +import io.opencensus.implcore.internal.Utils; +import io.opencensus.metrics.DerivedDoubleGauge; +import io.opencensus.metrics.DerivedLongGauge; +import io.opencensus.metrics.DoubleGauge; +import io.opencensus.metrics.LabelKey; +import io.opencensus.metrics.LongGauge; +import io.opencensus.metrics.MetricRegistry; +import io.opencensus.metrics.export.Metric; +import io.opencensus.metrics.export.MetricProducer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** Implementation of {@link MetricRegistry}. */ +public final class MetricRegistryImpl extends MetricRegistry { + private final RegisteredMeters registeredMeters; + private final MetricProducer metricProducer; + + MetricRegistryImpl(Clock clock) { + registeredMeters = new RegisteredMeters(); + metricProducer = new MetricProducerForRegistry(registeredMeters, clock); + } + + @Override + public LongGauge addLongGauge( + String name, String description, String unit, List<LabelKey> labelKeys) { + Utils.checkListElementNotNull( + checkNotNull(labelKeys, "labelKeys"), "labelKey element should not be null."); + LongGaugeImpl longGaugeMetric = + new LongGaugeImpl( + checkNotNull(name, "name"), + checkNotNull(description, "description"), + checkNotNull(unit, "unit"), + Collections.unmodifiableList(new ArrayList<LabelKey>(labelKeys))); + registeredMeters.registerMeter(name, longGaugeMetric); + return longGaugeMetric; + } + + @Override + public DoubleGauge addDoubleGauge( + String name, String description, String unit, List<LabelKey> labelKeys) { + Utils.checkListElementNotNull( + checkNotNull(labelKeys, "labelKeys"), "labelKey element should not be null."); + DoubleGaugeImpl doubleGaugeMetric = + new DoubleGaugeImpl( + checkNotNull(name, "name"), + checkNotNull(description, "description"), + checkNotNull(unit, "unit"), + Collections.unmodifiableList(new ArrayList<LabelKey>(labelKeys))); + registeredMeters.registerMeter(name, doubleGaugeMetric); + return doubleGaugeMetric; + } + + @Override + public DerivedLongGauge addDerivedLongGauge( + String name, String description, String unit, List<LabelKey> labelKeys) { + Utils.checkListElementNotNull( + checkNotNull(labelKeys, "labelKeys"), "labelKey element should not be null."); + DerivedLongGaugeImpl derivedLongGauge = + new DerivedLongGaugeImpl( + checkNotNull(name, "name"), + checkNotNull(description, "description"), + checkNotNull(unit, "unit"), + Collections.unmodifiableList(new ArrayList<LabelKey>(labelKeys))); + registeredMeters.registerMeter(name, derivedLongGauge); + return derivedLongGauge; + } + + @Override + public DerivedDoubleGauge addDerivedDoubleGauge( + String name, String description, String unit, List<LabelKey> labelKeys) { + Utils.checkListElementNotNull( + checkNotNull(labelKeys, "labelKeys"), "labelKey element should not be null."); + DerivedDoubleGaugeImpl derivedDoubleGauge = + new DerivedDoubleGaugeImpl( + checkNotNull(name, "name"), + checkNotNull(description, "description"), + checkNotNull(unit, "unit"), + Collections.unmodifiableList(new ArrayList<LabelKey>(labelKeys))); + registeredMeters.registerMeter(name, derivedDoubleGauge); + return derivedDoubleGauge; + } + + private static final class RegisteredMeters { + private volatile Map<String, Meter> registeredMeters = Collections.emptyMap(); + + private Map<String, Meter> getRegisteredMeters() { + return registeredMeters; + } + + private synchronized void registerMeter(String meterName, Meter meter) { + Meter existingMeter = registeredMeters.get(meterName); + if (existingMeter != null) { + // TODO(mayurkale): Allow users to register the same Meter multiple times without exception. + throw new IllegalArgumentException( + "A different metric with the same name already registered."); + } + + Map<String, Meter> registeredMetersCopy = new LinkedHashMap<String, Meter>(registeredMeters); + registeredMetersCopy.put(meterName, meter); + registeredMeters = Collections.unmodifiableMap(registeredMetersCopy); + } + } + + private static final class MetricProducerForRegistry extends MetricProducer { + private final RegisteredMeters registeredMeters; + private final Clock clock; + + private MetricProducerForRegistry(RegisteredMeters registeredMeters, Clock clock) { + this.registeredMeters = registeredMeters; + this.clock = clock; + } + + @Override + public Collection<Metric> getMetrics() { + // Get a snapshot of the current registered meters. + Map<String, Meter> meters = registeredMeters.getRegisteredMeters(); + if (meters.isEmpty()) { + return Collections.emptyList(); + } + + List<Metric> metrics = new ArrayList<Metric>(meters.size()); + for (Map.Entry<String, Meter> entry : meters.entrySet()) { + Metric metric = entry.getValue().getMetric(clock); + if (metric != null) { + metrics.add(metric); + } + } + return metrics; + } + } + + MetricProducer getMetricProducer() { + return metricProducer; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/MetricsComponentImplBase.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/MetricsComponentImplBase.java new file mode 100644 index 00000000..1aef6727 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/MetricsComponentImplBase.java @@ -0,0 +1,45 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.metrics; + +import io.opencensus.common.Clock; +import io.opencensus.implcore.metrics.export.ExportComponentImpl; +import io.opencensus.metrics.MetricsComponent; + +/** Implementation of {@link MetricsComponent}. */ +public class MetricsComponentImplBase extends MetricsComponent { + + private final ExportComponentImpl exportComponent; + private final MetricRegistryImpl metricRegistry; + + @Override + public ExportComponentImpl getExportComponent() { + return exportComponent; + } + + @Override + public MetricRegistryImpl getMetricRegistry() { + return metricRegistry; + } + + protected MetricsComponentImplBase(Clock clock) { + exportComponent = new ExportComponentImpl(); + metricRegistry = new MetricRegistryImpl(clock); + // Register the MetricRegistry's MetricProducer to the global MetricProducerManager. + exportComponent.getMetricProducerManager().add(metricRegistry.getMetricProducer()); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/export/ExportComponentImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/export/ExportComponentImpl.java new file mode 100644 index 00000000..173c3aec --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/export/ExportComponentImpl.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.metrics.export; + +import io.opencensus.metrics.export.ExportComponent; +import io.opencensus.metrics.export.MetricProducerManager; + +/** Implementation of {@link ExportComponent}. */ +public final class ExportComponentImpl extends ExportComponent { + + private final MetricProducerManager metricProducerManager = new MetricProducerManagerImpl(); + + @Override + public MetricProducerManager getMetricProducerManager() { + return metricProducerManager; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/export/MetricProducerManagerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/export/MetricProducerManagerImpl.java new file mode 100644 index 00000000..6f585a10 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/export/MetricProducerManagerImpl.java @@ -0,0 +1,64 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.metrics.export; + +import com.google.common.base.Preconditions; +import io.opencensus.metrics.export.MetricProducer; +import io.opencensus.metrics.export.MetricProducerManager; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Set; +import javax.annotation.concurrent.ThreadSafe; + +/** Implementation of {@link MetricProducerManager}. */ +@ThreadSafe +public final class MetricProducerManagerImpl extends MetricProducerManager { + + private volatile Set<MetricProducer> metricProducers = + Collections.unmodifiableSet(new LinkedHashSet<MetricProducer>()); + + @Override + public synchronized void add(MetricProducer metricProducer) { + Preconditions.checkNotNull(metricProducer, "metricProducer"); + // Updating the set of MetricProducers happens under a lock to avoid multiple add or remove + // operations to happen in the same time. + Set<MetricProducer> newMetricProducers = new LinkedHashSet<MetricProducer>(metricProducers); + if (!newMetricProducers.add(metricProducer)) { + // The element already present, no need to update the current set of MetricProducers. + return; + } + metricProducers = Collections.unmodifiableSet(newMetricProducers); + } + + @Override + public synchronized void remove(MetricProducer metricProducer) { + Preconditions.checkNotNull(metricProducer, "metricProducer"); + // Updating the set of MetricProducers happens under a lock to avoid multiple add or remove + // operations to happen in the same time. + Set<MetricProducer> newMetricProducers = new LinkedHashSet<MetricProducer>(metricProducers); + if (!newMetricProducers.remove(metricProducer)) { + // The element not present, no need to update the current set of MetricProducers. + return; + } + metricProducers = Collections.unmodifiableSet(newMetricProducers); + } + + @Override + public Set<MetricProducer> getAllMetricProducer() { + return metricProducers; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java b/impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java new file mode 100644 index 00000000..172db539 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java @@ -0,0 +1,95 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Maps; +import io.opencensus.common.Duration; +import io.opencensus.common.Timestamp; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.Measure; +import io.opencensus.tags.TagValue; +import java.util.List; +import java.util.Map; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +/** The bucket with aggregated {@code MeasureValue}s used for {@code IntervalViewData}. */ +final class IntervalBucket { + + private static final Duration ZERO = Duration.create(0, 0); + + private final Timestamp start; + private final Duration duration; + private final Aggregation aggregation; + private final Measure measure; + private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap = + Maps.newHashMap(); + + IntervalBucket(Timestamp start, Duration duration, Aggregation aggregation, Measure measure) { + this.start = checkNotNull(start, "Start"); + this.duration = checkNotNull(duration, "Duration"); + checkArgument(duration.compareTo(ZERO) > 0, "Duration must be positive"); + this.aggregation = checkNotNull(aggregation, "Aggregation"); + this.measure = checkNotNull(measure, "measure"); + } + + Map<List</*@Nullable*/ TagValue>, MutableAggregation> getTagValueAggregationMap() { + return tagValueAggregationMap; + } + + Timestamp getStart() { + return start; + } + + // Puts a new value into the internal MutableAggregations, based on the TagValues. + void record( + List</*@Nullable*/ TagValue> tagValues, + double value, + Map<String, String> attachments, + Timestamp timestamp) { + if (!tagValueAggregationMap.containsKey(tagValues)) { + tagValueAggregationMap.put( + tagValues, RecordUtils.createMutableAggregation(aggregation, measure)); + } + tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp); + } + + /* + * Returns how much fraction of duration has passed in this IntervalBucket. For example, if this + * bucket starts at 10s and has a duration of 20s, and now is 15s, then getFraction() should + * return (15 - 10) / 20 = 0.25. + * + * This IntervalBucket must be current, i.e. the current timestamp must be within + * [this.start, this.start + this.duration). + */ + double getFraction(Timestamp now) { + Duration elapsedTime = now.subtractTimestamp(start); + checkArgument( + elapsedTime.compareTo(ZERO) >= 0 && elapsedTime.compareTo(duration) < 0, + "This bucket must be current."); + return ((double) elapsedTime.toMillis()) / duration.toMillis(); + } + + void clearStats() { + tagValueAggregationMap.clear(); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java new file mode 100644 index 00000000..ee51796c --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java @@ -0,0 +1,66 @@ +/* + * Copyright 2016-17, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import io.opencensus.stats.Measure.MeasureDouble; +import io.opencensus.stats.Measure.MeasureLong; +import io.opencensus.stats.MeasureMap; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.unsafe.ContextUtils; + +/** Implementation of {@link MeasureMap}. */ +final class MeasureMapImpl extends MeasureMap { + private final StatsManager statsManager; + private final MeasureMapInternal.Builder builder = MeasureMapInternal.builder(); + + static MeasureMapImpl create(StatsManager statsManager) { + return new MeasureMapImpl(statsManager); + } + + private MeasureMapImpl(StatsManager statsManager) { + this.statsManager = statsManager; + } + + @Override + public MeasureMapImpl put(MeasureDouble measure, double value) { + builder.put(measure, value); + return this; + } + + @Override + public MeasureMapImpl put(MeasureLong measure, long value) { + builder.put(measure, value); + return this; + } + + @Override + public MeasureMap putAttachment(String key, String value) { + builder.putAttachment(key, value); + return this; + } + + @Override + public void record() { + // Use the context key directly, to avoid depending on the tags implementation. + record(ContextUtils.TAG_CONTEXT_KEY.get()); + } + + @Override + public void record(TagContext tags) { + statsManager.record(tags, builder.build()); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java new file mode 100644 index 00000000..d867b342 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java @@ -0,0 +1,138 @@ +/* + * Copyright 2016-17, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import io.opencensus.stats.Measure; +import io.opencensus.stats.Measure.MeasureDouble; +import io.opencensus.stats.Measure.MeasureLong; +import io.opencensus.stats.Measurement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; + +// TODO(songya): consider combining MeasureMapImpl and this class. +/** A map from {@link Measure}'s to measured values. */ +final class MeasureMapInternal { + + /** Returns a {@link Builder} for the {@link MeasureMapInternal} class. */ + static Builder builder() { + return new Builder(); + } + + /** + * Returns an {@link Iterator} over the measure/value mappings in this {@link MeasureMapInternal}. + * The {@code Iterator} does not support {@link Iterator#remove()}. + */ + Iterator<Measurement> iterator() { + return new MeasureMapInternalIterator(); + } + + // Returns the contextual information associated with an example value. + Map<String, String> getAttachments() { + return attachments; + } + + private final ArrayList<Measurement> measurements; + private final Map<String, String> attachments; + + private MeasureMapInternal(ArrayList<Measurement> measurements, Map<String, String> attachments) { + this.measurements = measurements; + this.attachments = Collections.unmodifiableMap(new HashMap<String, String>(attachments)); + } + + /** Builder for the {@link MeasureMapInternal} class. */ + static class Builder { + /** + * Associates the {@link MeasureDouble} with the given value. Subsequent updates to the same + * {@link MeasureDouble} will overwrite the previous value. + * + * @param measure the {@link MeasureDouble} + * @param value the value to be associated with {@code measure} + * @return this + */ + Builder put(MeasureDouble measure, double value) { + measurements.add(Measurement.MeasurementDouble.create(measure, value)); + return this; + } + + /** + * Associates the {@link MeasureLong} with the given value. Subsequent updates to the same + * {@link MeasureLong} will overwrite the previous value. + * + * @param measure the {@link MeasureLong} + * @param value the value to be associated with {@code measure} + * @return this + */ + Builder put(MeasureLong measure, long value) { + measurements.add(Measurement.MeasurementLong.create(measure, value)); + return this; + } + + Builder putAttachment(String key, String value) { + this.attachments.put(key, value); + return this; + } + + /** Constructs a {@link MeasureMapInternal} from the current measurements. */ + MeasureMapInternal build() { + // Note: this makes adding measurements quadratic but is fastest for the sizes of + // MeasureMapInternals that we should see. We may want to go to a strategy of sort/eliminate + // for larger MeasureMapInternals. + for (int i = measurements.size() - 1; i >= 0; i--) { + for (int j = i - 1; j >= 0; j--) { + if (measurements.get(i).getMeasure() == measurements.get(j).getMeasure()) { + measurements.remove(j); + j--; + } + } + } + return new MeasureMapInternal(measurements, attachments); + } + + private final ArrayList<Measurement> measurements = new ArrayList<Measurement>(); + private final Map<String, String> attachments = new HashMap<String, String>(); + + private Builder() {} + } + + // Provides an unmodifiable Iterator over this instance's measurements. + private final class MeasureMapInternalIterator implements Iterator<Measurement> { + @Override + public boolean hasNext() { + return position < length; + } + + @Override + public Measurement next() { + if (position >= measurements.size()) { + throw new NoSuchElementException(); + } + return measurements.get(position++); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private final int length = measurements.size(); + private int position = 0; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java new file mode 100644 index 00000000..5da0cad8 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java @@ -0,0 +1,194 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import io.opencensus.common.Clock; +import io.opencensus.common.Timestamp; +import io.opencensus.implcore.internal.CurrentState.State; +import io.opencensus.metrics.export.Metric; +import io.opencensus.stats.Measure; +import io.opencensus.stats.Measurement; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.tags.TagContext; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import javax.annotation.concurrent.GuardedBy; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +/** A class that stores a singleton map from {@code MeasureName}s to {@link MutableViewData}s. */ +@SuppressWarnings("deprecation") +final class MeasureToViewMap { + + /* + * A synchronized singleton map that stores the one-to-many mapping from Measures + * to MutableViewDatas. + */ + @GuardedBy("this") + private final Multimap<String, MutableViewData> mutableMap = + HashMultimap.<String, MutableViewData>create(); + + @GuardedBy("this") + private final Map<View.Name, View> registeredViews = new HashMap<View.Name, View>(); + + // TODO(songya): consider adding a Measure.Name class + @GuardedBy("this") + private final Map<String, Measure> registeredMeasures = Maps.newHashMap(); + + // Cached set of exported views. It must be set to null whenever a view is registered or + // unregistered. + @javax.annotation.Nullable private volatile Set<View> exportedViews; + + /** Returns a {@link ViewData} corresponding to the given {@link View.Name}. */ + @javax.annotation.Nullable + synchronized ViewData getView(View.Name viewName, Clock clock, State state) { + MutableViewData view = getMutableViewData(viewName); + return view == null ? null : view.toViewData(clock.now(), state); + } + + Set<View> getExportedViews() { + Set<View> views = exportedViews; + if (views == null) { + synchronized (this) { + exportedViews = views = filterExportedViews(registeredViews.values()); + } + } + return views; + } + + // Returns the subset of the given views that should be exported + private static Set<View> filterExportedViews(Collection<View> allViews) { + Set<View> views = Sets.newHashSet(); + for (View view : allViews) { + if (view.getWindow() instanceof View.AggregationWindow.Cumulative) { + views.add(view); + } + } + return Collections.unmodifiableSet(views); + } + + /** Enable stats collection for the given {@link View}. */ + synchronized void registerView(View view, Clock clock) { + exportedViews = null; + View existing = registeredViews.get(view.getName()); + if (existing != null) { + if (existing.equals(view)) { + // Ignore views that are already registered. + return; + } else { + throw new IllegalArgumentException( + "A different view with the same name is already registered: " + existing); + } + } + Measure measure = view.getMeasure(); + Measure registeredMeasure = registeredMeasures.get(measure.getName()); + if (registeredMeasure != null && !registeredMeasure.equals(measure)) { + throw new IllegalArgumentException( + "A different measure with the same name is already registered: " + registeredMeasure); + } + registeredViews.put(view.getName(), view); + if (registeredMeasure == null) { + registeredMeasures.put(measure.getName(), measure); + } + Timestamp now = clock.now(); + mutableMap.put(view.getMeasure().getName(), MutableViewData.create(view, now)); + } + + @javax.annotation.Nullable + private synchronized MutableViewData getMutableViewData(View.Name viewName) { + View view = registeredViews.get(viewName); + if (view == null) { + return null; + } + Collection<MutableViewData> views = mutableMap.get(view.getMeasure().getName()); + for (MutableViewData viewData : views) { + if (viewData.getView().getName().equals(viewName)) { + return viewData; + } + } + throw new AssertionError( + "Internal error: Not recording stats for view: \"" + + viewName + + "\" registeredViews=" + + registeredViews + + ", mutableMap=" + + mutableMap); + } + + // Records stats with a set of tags. + synchronized void record(TagContext tags, MeasureMapInternal stats, Timestamp timestamp) { + Iterator<Measurement> iterator = stats.iterator(); + Map<String, String> attachments = stats.getAttachments(); + while (iterator.hasNext()) { + Measurement measurement = iterator.next(); + Measure measure = measurement.getMeasure(); + if (!measure.equals(registeredMeasures.get(measure.getName()))) { + // unregistered measures will be ignored. + continue; + } + Collection<MutableViewData> viewDataCollection = mutableMap.get(measure.getName()); + for (MutableViewData viewData : viewDataCollection) { + viewData.record( + tags, RecordUtils.getDoubleValueFromMeasurement(measurement), timestamp, attachments); + } + } + } + + synchronized List<Metric> getMetrics(Clock clock, State state) { + List<Metric> metrics = new ArrayList<Metric>(); + Timestamp now = clock.now(); + for (Entry<String, MutableViewData> entry : mutableMap.entries()) { + Metric metric = entry.getValue().toMetric(now, state); + if (metric != null) { + metrics.add(metric); + } + } + return metrics; + } + + // Clear stats for all the current MutableViewData + synchronized void clearStats() { + for (Entry<String, Collection<MutableViewData>> entry : mutableMap.asMap().entrySet()) { + for (MutableViewData mutableViewData : entry.getValue()) { + mutableViewData.clearStats(); + } + } + } + + // Resume stats collection for all MutableViewData. + synchronized void resumeStatsCollection(Timestamp now) { + for (Entry<String, Collection<MutableViewData>> entry : mutableMap.asMap().entrySet()) { + for (MutableViewData mutableViewData : entry.getValue()) { + mutableViewData.resumeStatsCollection(now); + } + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java new file mode 100644 index 00000000..7bf92572 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java @@ -0,0 +1,38 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import io.opencensus.metrics.export.Metric; +import io.opencensus.metrics.export.MetricProducer; +import java.util.Collection; +import javax.annotation.concurrent.ThreadSafe; + +/** Implementation of {@link MetricProducer}. */ +@ThreadSafe +final class MetricProducerImpl extends MetricProducer { + + private final StatsManager statsManager; + + MetricProducerImpl(StatsManager statsManager) { + this.statsManager = statsManager; + } + + @Override + public Collection<Metric> getMetrics() { + return statsManager.getMetrics(); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java new file mode 100644 index 00000000..0dfb1d26 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java @@ -0,0 +1,118 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import com.google.common.annotations.VisibleForTesting; +import io.opencensus.common.Function; +import io.opencensus.common.Functions; +import io.opencensus.metrics.LabelKey; +import io.opencensus.metrics.LabelValue; +import io.opencensus.metrics.export.MetricDescriptor; +import io.opencensus.metrics.export.MetricDescriptor.Type; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.Measure; +import io.opencensus.stats.View; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import java.util.ArrayList; +import java.util.List; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +@SuppressWarnings("deprecation") +// Utils to convert Stats data models to Metric data models. +final class MetricUtils { + + @javax.annotation.Nullable + static MetricDescriptor viewToMetricDescriptor(View view) { + if (view.getWindow() instanceof View.AggregationWindow.Interval) { + // Only creates Metric for cumulative stats. + return null; + } + List<LabelKey> labelKeys = new ArrayList<LabelKey>(); + for (TagKey tagKey : view.getColumns()) { + // TODO: add description + labelKeys.add(LabelKey.create(tagKey.getName(), "")); + } + Measure measure = view.getMeasure(); + return MetricDescriptor.create( + view.getName().asString(), + view.getDescription(), + measure.getUnit(), + getType(measure, view.getAggregation()), + labelKeys); + } + + @VisibleForTesting + static Type getType(Measure measure, Aggregation aggregation) { + return aggregation.match( + Functions.returnConstant( + measure.match( + TYPE_CUMULATIVE_DOUBLE_FUNCTION, // Sum Double + TYPE_CUMULATIVE_INT64_FUNCTION, // Sum Int64 + TYPE_UNRECOGNIZED_FUNCTION)), + TYPE_CUMULATIVE_INT64_FUNCTION, // Count + TYPE_CUMULATIVE_DISTRIBUTION_FUNCTION, // Distribution + Functions.returnConstant( + measure.match( + TYPE_GAUGE_DOUBLE_FUNCTION, // LastValue Double + TYPE_GAUGE_INT64_FUNCTION, // LastValue Long + TYPE_UNRECOGNIZED_FUNCTION)), + AGGREGATION_TYPE_DEFAULT_FUNCTION); + } + + static List<LabelValue> tagValuesToLabelValues(List</*@Nullable*/ TagValue> tagValues) { + List<LabelValue> labelValues = new ArrayList<LabelValue>(); + for (/*@Nullable*/ TagValue tagValue : tagValues) { + labelValues.add(LabelValue.create(tagValue == null ? null : tagValue.asString())); + } + return labelValues; + } + + private static final Function<Object, Type> TYPE_CUMULATIVE_DOUBLE_FUNCTION = + Functions.returnConstant(Type.CUMULATIVE_DOUBLE); + + private static final Function<Object, Type> TYPE_CUMULATIVE_INT64_FUNCTION = + Functions.returnConstant(Type.CUMULATIVE_INT64); + + private static final Function<Object, Type> TYPE_CUMULATIVE_DISTRIBUTION_FUNCTION = + Functions.returnConstant(Type.CUMULATIVE_DISTRIBUTION); + + private static final Function<Object, Type> TYPE_GAUGE_DOUBLE_FUNCTION = + Functions.returnConstant(Type.GAUGE_DOUBLE); + + private static final Function<Object, Type> TYPE_GAUGE_INT64_FUNCTION = + Functions.returnConstant(Type.GAUGE_INT64); + + private static final Function<Object, Type> TYPE_UNRECOGNIZED_FUNCTION = + Functions.<Type>throwAssertionError(); + + private static final Function<Aggregation, Type> AGGREGATION_TYPE_DEFAULT_FUNCTION = + new Function<Aggregation, Type>() { + @Override + public Type apply(Aggregation arg) { + if (arg instanceof Aggregation.Mean) { + return Type.CUMULATIVE_DOUBLE; // Mean + } + throw new AssertionError(); + } + }; + + private MetricUtils() {} +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java new file mode 100644 index 00000000..6e2bff1c --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java @@ -0,0 +1,556 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import io.opencensus.common.Timestamp; +import io.opencensus.metrics.export.Distribution; +import io.opencensus.metrics.export.Distribution.BucketOptions; +import io.opencensus.metrics.export.Point; +import io.opencensus.metrics.export.Value; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.AggregationData.DistributionData; +import io.opencensus.stats.AggregationData.DistributionData.Exemplar; +import io.opencensus.stats.BucketBoundaries; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** Mutable version of {@link Aggregation} that supports adding values. */ +abstract class MutableAggregation { + + private MutableAggregation() {} + + // Tolerance for double comparison. + private static final double TOLERANCE = 1e-6; + + /** + * Put a new value into the MutableAggregation. + * + * @param value new value to be added to population + * @param attachments the contextual information on an {@link Exemplar} + * @param timestamp the timestamp when the value is recorded + */ + abstract void add(double value, Map<String, String> attachments, Timestamp timestamp); + + // TODO(songya): remove this method once interval stats is completely removed. + /** + * Combine the internal values of this MutableAggregation and value of the given + * MutableAggregation, with the given fraction. Then set the internal value of this + * MutableAggregation to the combined value. + * + * @param other the other {@code MutableAggregation}. The type of this and other {@code + * MutableAggregation} must match. + * @param fraction the fraction that the value in other {@code MutableAggregation} should + * contribute. Must be within [0.0, 1.0]. + */ + abstract void combine(MutableAggregation other, double fraction); + + abstract AggregationData toAggregationData(); + + abstract Point toPoint(Timestamp timestamp); + + /** Calculate sum of doubles on aggregated {@code MeasureValue}s. */ + static class MutableSumDouble extends MutableAggregation { + + private double sum = 0.0; + + private MutableSumDouble() {} + + /** + * Construct a {@code MutableSumDouble}. + * + * @return an empty {@code MutableSumDouble}. + */ + static MutableSumDouble create() { + return new MutableSumDouble(); + } + + @Override + void add(double value, Map<String, String> attachments, Timestamp timestamp) { + sum += value; + } + + @Override + void combine(MutableAggregation other, double fraction) { + checkArgument(other instanceof MutableSumDouble, "MutableSumDouble expected."); + this.sum += fraction * ((MutableSumDouble) other).sum; + } + + @Override + AggregationData toAggregationData() { + return AggregationData.SumDataDouble.create(sum); + } + + @Override + Point toPoint(Timestamp timestamp) { + return Point.create(Value.doubleValue(sum), timestamp); + } + + @VisibleForTesting + double getSum() { + return sum; + } + } + + /** Calculate sum of longs on aggregated {@code MeasureValue}s. */ + static final class MutableSumLong extends MutableSumDouble { + private MutableSumLong() { + super(); + } + + /** + * Construct a {@code MutableSumLong}. + * + * @return an empty {@code MutableSumLong}. + */ + static MutableSumLong create() { + return new MutableSumLong(); + } + + @Override + AggregationData toAggregationData() { + return AggregationData.SumDataLong.create(Math.round(getSum())); + } + + @Override + Point toPoint(Timestamp timestamp) { + return Point.create(Value.longValue(Math.round(getSum())), timestamp); + } + } + + /** Calculate count on aggregated {@code MeasureValue}s. */ + static final class MutableCount extends MutableAggregation { + + private long count = 0; + + private MutableCount() {} + + /** + * Construct a {@code MutableCount}. + * + * @return an empty {@code MutableCount}. + */ + static MutableCount create() { + return new MutableCount(); + } + + @Override + void add(double value, Map<String, String> attachments, Timestamp timestamp) { + count++; + } + + @Override + void combine(MutableAggregation other, double fraction) { + checkArgument(other instanceof MutableCount, "MutableCount expected."); + this.count += Math.round(fraction * ((MutableCount) other).getCount()); + } + + @Override + AggregationData toAggregationData() { + return AggregationData.CountData.create(count); + } + + @Override + Point toPoint(Timestamp timestamp) { + return Point.create(Value.longValue(count), timestamp); + } + + /** + * Returns the aggregated count. + * + * @return the aggregated count. + */ + long getCount() { + return count; + } + } + + /** Calculate mean on aggregated {@code MeasureValue}s. */ + static final class MutableMean extends MutableAggregation { + + private double sum = 0.0; + private long count = 0; + + private MutableMean() {} + + /** + * Construct a {@code MutableMean}. + * + * @return an empty {@code MutableMean}. + */ + static MutableMean create() { + return new MutableMean(); + } + + @Override + void add(double value, Map<String, String> attachments, Timestamp timestamp) { + count++; + sum += value; + } + + @Override + void combine(MutableAggregation other, double fraction) { + checkArgument(other instanceof MutableMean, "MutableMean expected."); + MutableMean mutableMean = (MutableMean) other; + this.count += Math.round(mutableMean.count * fraction); + this.sum += mutableMean.sum * fraction; + } + + @SuppressWarnings("deprecation") + @Override + AggregationData toAggregationData() { + return AggregationData.MeanData.create(getMean(), count); + } + + @Override + Point toPoint(Timestamp timestamp) { + return Point.create(Value.doubleValue(getMean()), timestamp); + } + + /** + * Returns the aggregated mean. + * + * @return the aggregated mean. + */ + double getMean() { + return count == 0 ? 0 : sum / count; + } + + /** + * Returns the aggregated count. + * + * @return the aggregated count. + */ + long getCount() { + return count; + } + + @VisibleForTesting + double getSum() { + return sum; + } + } + + /** Calculate distribution stats on aggregated {@code MeasureValue}s. */ + static final class MutableDistribution extends MutableAggregation { + + private double sum = 0.0; + private double mean = 0.0; + private long count = 0; + private double sumOfSquaredDeviations = 0.0; + + // Initial "impossible" values, that will get reset as soon as first value is added. + private double min = Double.POSITIVE_INFINITY; + private double max = Double.NEGATIVE_INFINITY; + + private final BucketBoundaries bucketBoundaries; + private final long[] bucketCounts; + + // If there's a histogram (i.e bucket boundaries are not empty) in this MutableDistribution, + // exemplars will have the same size to bucketCounts; otherwise exemplars are null. + // Only the newest exemplar will be kept at each index. + @javax.annotation.Nullable private final Exemplar[] exemplars; + + private MutableDistribution(BucketBoundaries bucketBoundaries) { + this.bucketBoundaries = bucketBoundaries; + int buckets = bucketBoundaries.getBoundaries().size() + 1; + this.bucketCounts = new long[buckets]; + // In the implementation, each histogram bucket can have up to one exemplar, and the exemplar + // array is guaranteed to be in ascending order. + // If there's no histogram, don't record exemplars. + this.exemplars = bucketBoundaries.getBoundaries().isEmpty() ? null : new Exemplar[buckets]; + } + + /** + * Construct a {@code MutableDistribution}. + * + * @return an empty {@code MutableDistribution}. + */ + static MutableDistribution create(BucketBoundaries bucketBoundaries) { + checkNotNull(bucketBoundaries, "bucketBoundaries should not be null."); + return new MutableDistribution(bucketBoundaries); + } + + @Override + void add(double value, Map<String, String> attachments, Timestamp timestamp) { + sum += value; + count++; + + /* + * Update the sum of squared deviations from the mean with the given value. For values + * x_i this is Sum[i=1..n]((x_i - mean)^2) + * + * Computed using Welfords method (see + * https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance, or Knuth, "The Art of + * Computer Programming", Vol. 2, page 323, 3rd edition) + */ + double deltaFromMean = value - mean; + mean += deltaFromMean / count; + double deltaFromMean2 = value - mean; + sumOfSquaredDeviations += deltaFromMean * deltaFromMean2; + + if (value < min) { + min = value; + } + if (value > max) { + max = value; + } + + int bucket = 0; + for (; bucket < bucketBoundaries.getBoundaries().size(); bucket++) { + if (value < bucketBoundaries.getBoundaries().get(bucket)) { + break; + } + } + bucketCounts[bucket]++; + + // No implicit recording for exemplars - if there are no attachments (contextual information), + // don't record exemplars. + if (!attachments.isEmpty() && exemplars != null) { + exemplars[bucket] = Exemplar.create(value, timestamp, attachments); + } + } + + // We don't compute fractional MutableDistribution, it's either whole or none. + @Override + void combine(MutableAggregation other, double fraction) { + checkArgument(other instanceof MutableDistribution, "MutableDistribution expected."); + if (Math.abs(1.0 - fraction) > TOLERANCE) { + return; + } + + MutableDistribution mutableDistribution = (MutableDistribution) other; + checkArgument( + this.bucketBoundaries.equals(mutableDistribution.bucketBoundaries), + "Bucket boundaries should match."); + + // Algorithm for calculating the combination of sum of squared deviations: + // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm. + if (this.count + mutableDistribution.count > 0) { + double delta = mutableDistribution.mean - this.mean; + this.sumOfSquaredDeviations = + this.sumOfSquaredDeviations + + mutableDistribution.sumOfSquaredDeviations + + Math.pow(delta, 2) + * this.count + * mutableDistribution.count + / (this.count + mutableDistribution.count); + } + + this.count += mutableDistribution.count; + this.sum += mutableDistribution.sum; + this.mean = this.sum / this.count; + + if (mutableDistribution.min < this.min) { + this.min = mutableDistribution.min; + } + if (mutableDistribution.max > this.max) { + this.max = mutableDistribution.max; + } + + long[] bucketCounts = mutableDistribution.getBucketCounts(); + for (int i = 0; i < bucketCounts.length; i++) { + this.bucketCounts[i] += bucketCounts[i]; + } + + Exemplar[] otherExemplars = mutableDistribution.getExemplars(); + if (exemplars != null && otherExemplars != null) { + for (int i = 0; i < otherExemplars.length; i++) { + Exemplar exemplar = otherExemplars[i]; + // Assume other is always newer than this, because we combined interval buckets in time + // order. + // If there's a newer exemplar, overwrite current value. + if (exemplar != null) { + this.exemplars[i] = exemplar; + } + } + } + } + + @Override + AggregationData toAggregationData() { + List<Long> boxedBucketCounts = new ArrayList<Long>(); + for (long bucketCount : bucketCounts) { + boxedBucketCounts.add(bucketCount); + } + List<Exemplar> exemplarList = new ArrayList<Exemplar>(); + if (exemplars != null) { + for (Exemplar exemplar : exemplars) { + if (exemplar != null) { + exemplarList.add(exemplar); + } + } + } + return DistributionData.create( + mean, count, min, max, sumOfSquaredDeviations, boxedBucketCounts, exemplarList); + } + + @Override + Point toPoint(Timestamp timestamp) { + List<Distribution.Bucket> buckets = new ArrayList<Distribution.Bucket>(); + for (int bucket = 0; bucket < bucketCounts.length; bucket++) { + long bucketCount = bucketCounts[bucket]; + @javax.annotation.Nullable AggregationData.DistributionData.Exemplar exemplar = null; + if (exemplars != null) { + exemplar = exemplars[bucket]; + } + + Distribution.Bucket metricBucket; + if (exemplar != null) { + // Bucket with an Exemplar. + metricBucket = + Distribution.Bucket.create( + bucketCount, + Distribution.Exemplar.create( + exemplar.getValue(), exemplar.getTimestamp(), exemplar.getAttachments())); + } else { + // Bucket with no Exemplar. + metricBucket = Distribution.Bucket.create(bucketCount); + } + buckets.add(metricBucket); + } + + // TODO(mayurkale): Drop the first bucket when converting to metrics. + // Reason: In Stats API, bucket bounds begin with -infinity (first bucket is (-infinity, 0)). + BucketOptions bucketOptions = BucketOptions.explicitOptions(bucketBoundaries.getBoundaries()); + + return Point.create( + Value.distributionValue( + Distribution.create( + count, mean * count, sumOfSquaredDeviations, bucketOptions, buckets)), + timestamp); + } + + double getMean() { + return mean; + } + + long getCount() { + return count; + } + + double getMin() { + return min; + } + + double getMax() { + return max; + } + + // Returns the aggregated sum of squared deviations. + double getSumOfSquaredDeviations() { + return sumOfSquaredDeviations; + } + + long[] getBucketCounts() { + return bucketCounts; + } + + BucketBoundaries getBucketBoundaries() { + return bucketBoundaries; + } + + @javax.annotation.Nullable + Exemplar[] getExemplars() { + return exemplars; + } + } + + /** Calculate double last value on aggregated {@code MeasureValue}s. */ + static class MutableLastValueDouble extends MutableAggregation { + + // Initial value that will get reset as soon as first value is added. + private double lastValue = Double.NaN; + // TODO(songya): remove this once interval stats is completely removed. + private boolean initialized = false; + + private MutableLastValueDouble() {} + + /** + * Construct a {@code MutableLastValueDouble}. + * + * @return an empty {@code MutableLastValueDouble}. + */ + static MutableLastValueDouble create() { + return new MutableLastValueDouble(); + } + + @Override + void add(double value, Map<String, String> attachments, Timestamp timestamp) { + lastValue = value; + // TODO(songya): remove this once interval stats is completely removed. + if (!initialized) { + initialized = true; + } + } + + @Override + void combine(MutableAggregation other, double fraction) { + checkArgument(other instanceof MutableLastValueDouble, "MutableLastValueDouble expected."); + MutableLastValueDouble otherValue = (MutableLastValueDouble) other; + // Assume other is always newer than this, because we combined interval buckets in time order. + // If there's a newer value, overwrite current value. + this.lastValue = otherValue.initialized ? otherValue.getLastValue() : this.lastValue; + } + + @Override + AggregationData toAggregationData() { + return AggregationData.LastValueDataDouble.create(lastValue); + } + + @Override + Point toPoint(Timestamp timestamp) { + return Point.create(Value.doubleValue(lastValue), timestamp); + } + + @VisibleForTesting + double getLastValue() { + return lastValue; + } + } + + /** Calculate last long value on aggregated {@code MeasureValue}s. */ + static final class MutableLastValueLong extends MutableLastValueDouble { + private MutableLastValueLong() { + super(); + } + + /** + * Construct a {@code MutableLastValueLong}. + * + * @return an empty {@code MutableLastValueLong}. + */ + static MutableLastValueLong create() { + return new MutableLastValueLong(); + } + + @Override + AggregationData toAggregationData() { + return AggregationData.LastValueDataLong.create(Math.round(getLastValue())); + } + + @Override + Point toPoint(Timestamp timestamp) { + return Point.create(Value.longValue(Math.round(getLastValue())), timestamp); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java new file mode 100644 index 00000000..928675e9 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java @@ -0,0 +1,464 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.opencensus.implcore.stats.RecordUtils.createAggregationMap; +import static io.opencensus.implcore.stats.RecordUtils.createMutableAggregation; +import static io.opencensus.implcore.stats.RecordUtils.getTagMap; +import static io.opencensus.implcore.stats.RecordUtils.getTagValues; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import io.opencensus.common.Duration; +import io.opencensus.common.Function; +import io.opencensus.common.Functions; +import io.opencensus.common.Timestamp; +import io.opencensus.implcore.internal.CheckerFrameworkUtils; +import io.opencensus.implcore.internal.CurrentState.State; +import io.opencensus.metrics.LabelValue; +import io.opencensus.metrics.export.Metric; +import io.opencensus.metrics.export.MetricDescriptor; +import io.opencensus.metrics.export.MetricDescriptor.Type; +import io.opencensus.metrics.export.Point; +import io.opencensus.metrics.export.TimeSeries; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.Measure; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagValue; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +/** A mutable version of {@link ViewData}, used for recording stats and start/end time. */ +@SuppressWarnings("deprecation") +abstract class MutableViewData { + + @VisibleForTesting static final Timestamp ZERO_TIMESTAMP = Timestamp.create(0, 0); + + private final View view; + + private MutableViewData(View view) { + this.view = view; + } + + /** + * Constructs a new {@link MutableViewData}. + * + * @param view the {@code View} linked with this {@code MutableViewData}. + * @param start the start {@code Timestamp}. + * @return a {@code MutableViewData}. + */ + static MutableViewData create(final View view, final Timestamp start) { + return view.getWindow() + .match( + new CreateCumulative(view, start), + new CreateInterval(view, start), + Functions.<MutableViewData>throwAssertionError()); + } + + /** The {@link View} associated with this {@link ViewData}. */ + View getView() { + return view; + } + + @javax.annotation.Nullable + abstract Metric toMetric(Timestamp now, State state); + + /** Record stats with the given tags. */ + abstract void record( + TagContext context, double value, Timestamp timestamp, Map<String, String> attachments); + + /** Convert this {@link MutableViewData} to {@link ViewData}. */ + abstract ViewData toViewData(Timestamp now, State state); + + // Clear recorded stats. + abstract void clearStats(); + + // Resume stats collection, and reset Start Timestamp (for CumulativeMutableViewData), or refresh + // bucket list (for InternalMutableViewData). + abstract void resumeStatsCollection(Timestamp now); + + private static final class CumulativeMutableViewData extends MutableViewData { + + private Timestamp start; + private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap = + Maps.newHashMap(); + // Cache a MetricDescriptor to avoid converting View to MetricDescriptor in the future. + private final MetricDescriptor metricDescriptor; + + private CumulativeMutableViewData(View view, Timestamp start) { + super(view); + this.start = start; + MetricDescriptor metricDescriptor = MetricUtils.viewToMetricDescriptor(view); + if (metricDescriptor == null) { + throw new AssertionError( + "Cumulative view should be converted to a non-null MetricDescriptor."); + } else { + this.metricDescriptor = metricDescriptor; + } + } + + @javax.annotation.Nullable + @Override + Metric toMetric(Timestamp now, State state) { + if (state == State.DISABLED) { + return null; + } + Type type = metricDescriptor.getType(); + @javax.annotation.Nullable + Timestamp startTime = type == Type.GAUGE_INT64 || type == Type.GAUGE_DOUBLE ? null : start; + List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>(); + for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry : + tagValueAggregationMap.entrySet()) { + List<LabelValue> labelValues = MetricUtils.tagValuesToLabelValues(entry.getKey()); + Point point = entry.getValue().toPoint(now); + timeSeriesList.add(TimeSeries.createWithOnePoint(labelValues, point, startTime)); + } + return Metric.create(metricDescriptor, timeSeriesList); + } + + @Override + void record( + TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) { + List</*@Nullable*/ TagValue> tagValues = + getTagValues(getTagMap(context), super.view.getColumns()); + if (!tagValueAggregationMap.containsKey(tagValues)) { + tagValueAggregationMap.put( + tagValues, + createMutableAggregation(super.view.getAggregation(), super.getView().getMeasure())); + } + tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp); + } + + @Override + ViewData toViewData(Timestamp now, State state) { + if (state == State.ENABLED) { + return ViewData.create( + super.view, + createAggregationMap(tagValueAggregationMap, super.view.getMeasure()), + ViewData.AggregationWindowData.CumulativeData.create(start, now)); + } else { + // If Stats state is DISABLED, return an empty ViewData. + return ViewData.create( + super.view, + Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(), + ViewData.AggregationWindowData.CumulativeData.create(ZERO_TIMESTAMP, ZERO_TIMESTAMP)); + } + } + + @Override + void clearStats() { + tagValueAggregationMap.clear(); + } + + @Override + void resumeStatsCollection(Timestamp now) { + start = now; + } + } + + /* + * For each IntervalView, we always keep a queue of N + 1 buckets (by default N is 4). + * Each bucket has a duration which is interval duration / N. + * Ideally: + * 1. the buckets should always be up-to-date, + * 2. current time should always be within the latest bucket, currently recorded stats should fall + * into the latest bucket, + * 3. there are always N buckets before the current one, which holds the stats in the past + * interval duration. + * + * When getView() is called, we will extract and combine the stats from the current and past + * buckets (part of the stats from the oldest bucket could have expired). + * + * However, in reality, we couldn't track the status of buckets all the time (keep monitoring and + * updating the bucket queue will be expensive). When we call record() or getView(), some or all + * of the buckets might be outdated, and we will need to "pad" new buckets to the queue and remove + * outdated ones. After refreshing buckets, the bucket queue will able to maintain the three + * invariants in the ideal situation. + * + * For example: + * 1. We have an IntervalView which has a duration of 8 seconds, we register this view at 10s. + * 2. Initially there will be 5 buckets: [2.0, 4.0), [4.0, 6.0), ..., [10.0, 12.0). + * 3. If users don't call record() or getView(), bucket queue will remain as it is, and some + * buckets could expire. + * 4. Suppose record() is called at 15s, now we need to refresh the bucket queue. We need to add + * two new buckets [12.0, 14.0) and [14.0, 16.0), and remove two expired buckets [2.0, 4.0) + * and [4.0, 6.0) + * 5. Suppose record() is called again at 30s, all the current buckets should have expired. We add + * 5 new buckets [22.0, 24.0) ... [30.0, 32.0) and remove all the previous buckets. + * 6. Suppose users call getView() at 35s, again we need to add two new buckets and remove two + * expired one, so that bucket queue is up-to-date. Now we combine stats from all buckets and + * return the combined IntervalViewData. + */ + private static final class IntervalMutableViewData extends MutableViewData { + + // TODO(songya): allow customizable bucket size in the future. + private static final int N = 4; // IntervalView has N + 1 buckets + + private final ArrayDeque<IntervalBucket> buckets = new ArrayDeque<IntervalBucket>(); + + private final Duration totalDuration; // Duration of the whole interval. + private final Duration bucketDuration; // Duration of a single bucket (totalDuration / N) + + private IntervalMutableViewData(View view, Timestamp start) { + super(view); + Duration totalDuration = ((View.AggregationWindow.Interval) view.getWindow()).getDuration(); + this.totalDuration = totalDuration; + this.bucketDuration = Duration.fromMillis(totalDuration.toMillis() / N); + + // When initializing. add N empty buckets prior to the start timestamp of this + // IntervalMutableViewData, so that the last bucket will be the current one in effect. + shiftBucketList(N + 1, start); + } + + @javax.annotation.Nullable + @Override + Metric toMetric(Timestamp now, State state) { + return null; + } + + @Override + void record( + TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) { + List</*@Nullable*/ TagValue> tagValues = + getTagValues(getTagMap(context), super.view.getColumns()); + refreshBucketList(timestamp); + // It is always the last bucket that does the recording. + CheckerFrameworkUtils.castNonNull(buckets.peekLast()) + .record(tagValues, value, attachments, timestamp); + } + + @Override + ViewData toViewData(Timestamp now, State state) { + refreshBucketList(now); + if (state == State.ENABLED) { + return ViewData.create( + super.view, + combineBucketsAndGetAggregationMap(now), + ViewData.AggregationWindowData.IntervalData.create(now)); + } else { + // If Stats state is DISABLED, return an empty ViewData. + return ViewData.create( + super.view, + Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(), + ViewData.AggregationWindowData.IntervalData.create(ZERO_TIMESTAMP)); + } + } + + @Override + void clearStats() { + for (IntervalBucket bucket : buckets) { + bucket.clearStats(); + } + } + + @Override + void resumeStatsCollection(Timestamp now) { + // Refresh bucket list to be ready for stats recording, so that if record() is called right + // after stats state is turned back on, record() will be faster. + refreshBucketList(now); + } + + // Add new buckets and remove expired buckets by comparing the current timestamp with + // timestamp of the last bucket. + private void refreshBucketList(Timestamp now) { + if (buckets.size() != N + 1) { + throw new AssertionError("Bucket list must have exactly " + (N + 1) + " buckets."); + } + Timestamp startOfLastBucket = + CheckerFrameworkUtils.castNonNull(buckets.peekLast()).getStart(); + // TODO(songya): decide what to do when time goes backwards + checkArgument( + now.compareTo(startOfLastBucket) >= 0, + "Current time must be within or after the last bucket."); + long elapsedTimeMillis = now.subtractTimestamp(startOfLastBucket).toMillis(); + long numOfPadBuckets = elapsedTimeMillis / bucketDuration.toMillis(); + + shiftBucketList(numOfPadBuckets, now); + } + + // Add specified number of new buckets, and remove expired buckets + private void shiftBucketList(long numOfPadBuckets, Timestamp now) { + Timestamp startOfNewBucket; + + if (!buckets.isEmpty()) { + startOfNewBucket = + CheckerFrameworkUtils.castNonNull(buckets.peekLast()) + .getStart() + .addDuration(bucketDuration); + } else { + // Initialize bucket list. Should only enter this block once. + startOfNewBucket = subtractDuration(now, totalDuration); + } + + if (numOfPadBuckets > N + 1) { + // All current buckets expired, need to add N + 1 new buckets. The start time of the latest + // bucket will be current time. + startOfNewBucket = subtractDuration(now, totalDuration); + numOfPadBuckets = N + 1; + } + + for (int i = 0; i < numOfPadBuckets; i++) { + buckets.add( + new IntervalBucket( + startOfNewBucket, + bucketDuration, + super.view.getAggregation(), + super.view.getMeasure())); + startOfNewBucket = startOfNewBucket.addDuration(bucketDuration); + } + + // removed expired buckets + while (buckets.size() > N + 1) { + buckets.pollFirst(); + } + } + + // Combine stats within each bucket, aggregate stats by tag values, and return the mapping from + // tag values to aggregation data. + private Map<List</*@Nullable*/ TagValue>, AggregationData> combineBucketsAndGetAggregationMap( + Timestamp now) { + // Need to maintain the order of inserted MutableAggregations (inserted based on time order). + Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap = + LinkedHashMultimap.create(); + + ArrayDeque<IntervalBucket> shallowCopy = new ArrayDeque<IntervalBucket>(buckets); + + Aggregation aggregation = super.view.getAggregation(); + Measure measure = super.view.getMeasure(); + putBucketsIntoMultiMap(shallowCopy, multimap, aggregation, measure, now); + Map<List</*@Nullable*/ TagValue>, MutableAggregation> singleMap = + aggregateOnEachTagValueList(multimap, aggregation, measure); + return createAggregationMap(singleMap, super.getView().getMeasure()); + } + + // Put stats within each bucket to a multimap. Each tag value list (map key) could have multiple + // mutable aggregations (map value) from different buckets. + private static void putBucketsIntoMultiMap( + ArrayDeque<IntervalBucket> buckets, + Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap, + Aggregation aggregation, + Measure measure, + Timestamp now) { + // Put fractional stats of the head (oldest) bucket. + IntervalBucket head = CheckerFrameworkUtils.castNonNull(buckets.peekFirst()); + IntervalBucket tail = CheckerFrameworkUtils.castNonNull(buckets.peekLast()); + double fractionTail = tail.getFraction(now); + // TODO(songya): decide what to do when time goes backwards + checkArgument( + 0.0 <= fractionTail && fractionTail <= 1.0, + "Fraction " + fractionTail + " should be within [0.0, 1.0]."); + double fractionHead = 1.0 - fractionTail; + putFractionalMutableAggregationsToMultiMap( + head.getTagValueAggregationMap(), multimap, aggregation, measure, fractionHead); + + // Put whole data of other buckets. + boolean shouldSkipFirst = true; + for (IntervalBucket bucket : buckets) { + if (shouldSkipFirst) { + shouldSkipFirst = false; + continue; // skip the first bucket + } + for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry : + bucket.getTagValueAggregationMap().entrySet()) { + multimap.put(entry.getKey(), entry.getValue()); + } + } + } + + // Put stats within one bucket into multimap, multiplied by a given fraction. + private static <T> void putFractionalMutableAggregationsToMultiMap( + Map<T, MutableAggregation> mutableAggrMap, + Multimap<T, MutableAggregation> multimap, + Aggregation aggregation, + Measure measure, + double fraction) { + for (Entry<T, MutableAggregation> entry : mutableAggrMap.entrySet()) { + // Initially empty MutableAggregations. + MutableAggregation fractionalMutableAgg = createMutableAggregation(aggregation, measure); + fractionalMutableAgg.combine(entry.getValue(), fraction); + multimap.put(entry.getKey(), fractionalMutableAgg); + } + } + + // For each tag value list (key of AggregationMap), combine mutable aggregations into one + // mutable aggregation, thus convert the multimap into a single map. + private static <T> Map<T, MutableAggregation> aggregateOnEachTagValueList( + Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure) { + Map<T, MutableAggregation> map = Maps.newHashMap(); + for (T tagValues : multimap.keySet()) { + // Initially empty MutableAggregations. + MutableAggregation combinedAggregation = createMutableAggregation(aggregation, measure); + for (MutableAggregation mutableAggregation : multimap.get(tagValues)) { + combinedAggregation.combine(mutableAggregation, 1.0); + } + map.put(tagValues, combinedAggregation); + } + return map; + } + + // Subtract a Duration from a Timestamp, and return a new Timestamp. + private static Timestamp subtractDuration(Timestamp timestamp, Duration duration) { + return timestamp.addDuration(Duration.create(-duration.getSeconds(), -duration.getNanos())); + } + } + + private static final class CreateCumulative + implements Function<View.AggregationWindow.Cumulative, MutableViewData> { + @Override + public MutableViewData apply(View.AggregationWindow.Cumulative arg) { + return new CumulativeMutableViewData(view, start); + } + + private final View view; + private final Timestamp start; + + private CreateCumulative(View view, Timestamp start) { + this.view = view; + this.start = start; + } + } + + private static final class CreateInterval + implements Function<View.AggregationWindow.Interval, MutableViewData> { + @Override + public MutableViewData apply(View.AggregationWindow.Interval arg) { + return new IntervalMutableViewData(view, start); + } + + private final View view; + private final Timestamp start; + + private CreateInterval(View view, Timestamp start) { + this.view = view; + this.start = start; + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java b/impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java new file mode 100644 index 00000000..fbb593f5 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java @@ -0,0 +1,241 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import io.opencensus.common.Function; +import io.opencensus.common.Functions; +import io.opencensus.implcore.stats.MutableAggregation.MutableCount; +import io.opencensus.implcore.stats.MutableAggregation.MutableDistribution; +import io.opencensus.implcore.stats.MutableAggregation.MutableLastValueDouble; +import io.opencensus.implcore.stats.MutableAggregation.MutableLastValueLong; +import io.opencensus.implcore.stats.MutableAggregation.MutableMean; +import io.opencensus.implcore.stats.MutableAggregation.MutableSumDouble; +import io.opencensus.implcore.stats.MutableAggregation.MutableSumLong; +import io.opencensus.implcore.tags.TagContextImpl; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.Aggregation.Count; +import io.opencensus.stats.Aggregation.Distribution; +import io.opencensus.stats.Aggregation.LastValue; +import io.opencensus.stats.Aggregation.Sum; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.Measure; +import io.opencensus.stats.Measure.MeasureDouble; +import io.opencensus.stats.Measure.MeasureLong; +import io.opencensus.stats.Measurement; +import io.opencensus.stats.Measurement.MeasurementDouble; +import io.opencensus.stats.Measurement.MeasurementLong; +import io.opencensus.tags.InternalUtils; +import io.opencensus.tags.Tag; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/*>>> +import org.checkerframework.checker.nullness.qual.Nullable; +*/ + +@SuppressWarnings("deprecation") +/* Common static utilities for stats recording. */ +final class RecordUtils { + + @javax.annotation.Nullable @VisibleForTesting static final TagValue UNKNOWN_TAG_VALUE = null; + + static Map<TagKey, TagValue> getTagMap(TagContext ctx) { + if (ctx instanceof TagContextImpl) { + return ((TagContextImpl) ctx).getTags(); + } else { + Map<TagKey, TagValue> tags = Maps.newHashMap(); + for (Iterator<Tag> i = InternalUtils.getTags(ctx); i.hasNext(); ) { + Tag tag = i.next(); + tags.put(tag.getKey(), tag.getValue()); + } + return tags; + } + } + + @VisibleForTesting + static List</*@Nullable*/ TagValue> getTagValues( + Map<? extends TagKey, ? extends TagValue> tags, List<? extends TagKey> columns) { + List</*@Nullable*/ TagValue> tagValues = new ArrayList</*@Nullable*/ TagValue>(columns.size()); + // Record all the measures in a "Greedy" way. + // Every view aggregates every measure. This is similar to doing a GROUPBY view’s keys. + for (int i = 0; i < columns.size(); ++i) { + TagKey tagKey = columns.get(i); + if (!tags.containsKey(tagKey)) { + // replace not found key values by null. + tagValues.add(UNKNOWN_TAG_VALUE); + } else { + tagValues.add(tags.get(tagKey)); + } + } + return tagValues; + } + + /** + * Create an empty {@link MutableAggregation} based on the given {@link Aggregation}. + * + * @param aggregation {@code Aggregation}. + * @return an empty {@code MutableAggregation}. + */ + @VisibleForTesting + static MutableAggregation createMutableAggregation( + Aggregation aggregation, final Measure measure) { + return aggregation.match( + new Function<Sum, MutableAggregation>() { + @Override + public MutableAggregation apply(Sum arg) { + return measure.match( + CreateMutableSumDouble.INSTANCE, + CreateMutableSumLong.INSTANCE, + Functions.<MutableAggregation>throwAssertionError()); + } + }, + CreateMutableCount.INSTANCE, + CreateMutableDistribution.INSTANCE, + new Function<LastValue, MutableAggregation>() { + @Override + public MutableAggregation apply(LastValue arg) { + return measure.match( + CreateMutableLastValueDouble.INSTANCE, + CreateMutableLastValueLong.INSTANCE, + Functions.<MutableAggregation>throwAssertionError()); + } + }, + AggregationDefaultFunction.INSTANCE); + } + + // Covert a mapping from TagValues to MutableAggregation, to a mapping from TagValues to + // AggregationData. + static <T> Map<T, AggregationData> createAggregationMap( + Map<T, MutableAggregation> tagValueAggregationMap, Measure measure) { + Map<T, AggregationData> map = Maps.newHashMap(); + for (Entry<T, MutableAggregation> entry : tagValueAggregationMap.entrySet()) { + map.put(entry.getKey(), entry.getValue().toAggregationData()); + } + return map; + } + + static double getDoubleValueFromMeasurement(Measurement measurement) { + return measurement.match( + GET_VALUE_FROM_MEASUREMENT_DOUBLE, + GET_VALUE_FROM_MEASUREMENT_LONG, + Functions.<Double>throwAssertionError()); + } + + // static inner Function classes + + private static final Function<MeasurementDouble, Double> GET_VALUE_FROM_MEASUREMENT_DOUBLE = + new Function<MeasurementDouble, Double>() { + @Override + public Double apply(MeasurementDouble arg) { + return arg.getValue(); + } + }; + + private static final Function<MeasurementLong, Double> GET_VALUE_FROM_MEASUREMENT_LONG = + new Function<MeasurementLong, Double>() { + @Override + public Double apply(MeasurementLong arg) { + // TODO: consider checking truncation here. + return (double) arg.getValue(); + } + }; + + private static final class CreateMutableSumDouble + implements Function<MeasureDouble, MutableAggregation> { + @Override + public MutableAggregation apply(MeasureDouble arg) { + return MutableSumDouble.create(); + } + + private static final CreateMutableSumDouble INSTANCE = new CreateMutableSumDouble(); + } + + private static final class CreateMutableSumLong + implements Function<MeasureLong, MutableAggregation> { + @Override + public MutableAggregation apply(MeasureLong arg) { + return MutableSumLong.create(); + } + + private static final CreateMutableSumLong INSTANCE = new CreateMutableSumLong(); + } + + private static final class CreateMutableCount implements Function<Count, MutableAggregation> { + @Override + public MutableAggregation apply(Count arg) { + return MutableCount.create(); + } + + private static final CreateMutableCount INSTANCE = new CreateMutableCount(); + } + + // TODO(songya): remove this once Mean aggregation is completely removed. Before that + // we need to continue supporting Mean, since it could still be used by users and some + // deprecated RPC views. + private static final class AggregationDefaultFunction + implements Function<Aggregation, MutableAggregation> { + @Override + public MutableAggregation apply(Aggregation arg) { + if (arg instanceof Aggregation.Mean) { + return MutableMean.create(); + } + throw new IllegalArgumentException("Unknown Aggregation."); + } + + private static final AggregationDefaultFunction INSTANCE = new AggregationDefaultFunction(); + } + + private static final class CreateMutableDistribution + implements Function<Distribution, MutableAggregation> { + @Override + public MutableAggregation apply(Distribution arg) { + return MutableDistribution.create(arg.getBucketBoundaries()); + } + + private static final CreateMutableDistribution INSTANCE = new CreateMutableDistribution(); + } + + private static final class CreateMutableLastValueDouble + implements Function<MeasureDouble, MutableAggregation> { + @Override + public MutableAggregation apply(MeasureDouble arg) { + return MutableLastValueDouble.create(); + } + + private static final CreateMutableLastValueDouble INSTANCE = new CreateMutableLastValueDouble(); + } + + private static final class CreateMutableLastValueLong + implements Function<MeasureLong, MutableAggregation> { + @Override + public MutableAggregation apply(MeasureLong arg) { + return MutableLastValueLong.create(); + } + + private static final CreateMutableLastValueLong INSTANCE = new CreateMutableLastValueLong(); + } + + private RecordUtils() {} +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java new file mode 100644 index 00000000..741b399b --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java @@ -0,0 +1,92 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import com.google.common.base.Preconditions; +import io.opencensus.common.Clock; +import io.opencensus.implcore.internal.CurrentState; +import io.opencensus.implcore.internal.CurrentState.State; +import io.opencensus.implcore.internal.EventQueue; +import io.opencensus.metrics.Metrics; +import io.opencensus.metrics.export.MetricProducer; +import io.opencensus.stats.StatsCollectionState; +import io.opencensus.stats.StatsComponent; + +/** Base implementation of {@link StatsComponent}. */ +public class StatsComponentImplBase extends StatsComponent { + private static final State DEFAULT_STATE = State.ENABLED; + + // The State shared between the StatsComponent, StatsRecorder and ViewManager. + private final CurrentState currentState = new CurrentState(DEFAULT_STATE); + + private final ViewManagerImpl viewManager; + private final StatsRecorderImpl statsRecorder; + + /** + * Creates a new {@code StatsComponentImplBase}. + * + * @param queue the queue implementation. + * @param clock the clock to use when recording stats. + */ + public StatsComponentImplBase(EventQueue queue, Clock clock) { + StatsManager statsManager = new StatsManager(queue, clock, currentState); + this.viewManager = new ViewManagerImpl(statsManager); + this.statsRecorder = new StatsRecorderImpl(statsManager); + + // Create a new MetricProducerImpl and register it to MetricProducerManager when + // StatsComponentImplBase is initialized. + MetricProducer metricProducer = new MetricProducerImpl(statsManager); + Metrics.getExportComponent().getMetricProducerManager().add(metricProducer); + } + + @Override + public ViewManagerImpl getViewManager() { + return viewManager; + } + + @Override + public StatsRecorderImpl getStatsRecorder() { + return statsRecorder; + } + + @Override + public StatsCollectionState getState() { + return stateToStatsState(currentState.get()); + } + + @Override + @SuppressWarnings("deprecation") + public synchronized void setState(StatsCollectionState newState) { + boolean stateChanged = + currentState.set(statsStateToState(Preconditions.checkNotNull(newState, "newState"))); + if (stateChanged) { + if (newState == StatsCollectionState.DISABLED) { + viewManager.clearStats(); + } else { + viewManager.resumeStatsCollection(); + } + } + } + + private static State statsStateToState(StatsCollectionState statsCollectionState) { + return statsCollectionState == StatsCollectionState.ENABLED ? State.ENABLED : State.DISABLED; + } + + private static StatsCollectionState stateToStatsState(State state) { + return state == State.ENABLED ? StatsCollectionState.ENABLED : StatsCollectionState.DISABLED; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java new file mode 100644 index 00000000..17e99d46 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java @@ -0,0 +1,104 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.opencensus.common.Clock; +import io.opencensus.implcore.internal.CurrentState; +import io.opencensus.implcore.internal.CurrentState.State; +import io.opencensus.implcore.internal.EventQueue; +import io.opencensus.metrics.export.Metric; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.tags.TagContext; +import java.util.Collection; +import java.util.Set; +import javax.annotation.Nullable; + +/** Object that stores all views and stats. */ +final class StatsManager { + + private final EventQueue queue; + + // clock used throughout the stats implementation + private final Clock clock; + + private final CurrentState state; + private final MeasureToViewMap measureToViewMap = new MeasureToViewMap(); + + StatsManager(EventQueue queue, Clock clock, CurrentState state) { + checkNotNull(queue, "EventQueue"); + checkNotNull(clock, "Clock"); + checkNotNull(state, "state"); + this.queue = queue; + this.clock = clock; + this.state = state; + } + + void registerView(View view) { + measureToViewMap.registerView(view, clock); + } + + @Nullable + ViewData getView(View.Name viewName) { + return measureToViewMap.getView(viewName, clock, state.getInternal()); + } + + Set<View> getExportedViews() { + return measureToViewMap.getExportedViews(); + } + + void record(TagContext tags, MeasureMapInternal measurementValues) { + // TODO(songya): consider exposing No-op MeasureMap and use it when stats state is DISABLED, so + // that we don't need to create actual MeasureMapImpl. + if (state.getInternal() == State.ENABLED) { + queue.enqueue(new StatsEvent(this, tags, measurementValues)); + } + } + + Collection<Metric> getMetrics() { + return measureToViewMap.getMetrics(clock, state.getInternal()); + } + + void clearStats() { + measureToViewMap.clearStats(); + } + + void resumeStatsCollection() { + measureToViewMap.resumeStatsCollection(clock.now()); + } + + // An EventQueue entry that records the stats from one call to StatsManager.record(...). + private static final class StatsEvent implements EventQueue.Entry { + private final TagContext tags; + private final MeasureMapInternal stats; + private final StatsManager statsManager; + + StatsEvent(StatsManager statsManager, TagContext tags, MeasureMapInternal stats) { + this.statsManager = statsManager; + this.tags = tags; + this.stats = stats; + } + + @Override + public void process() { + // Add Timestamp to value after it went through the DisruptorQueue. + statsManager.measureToViewMap.record(tags, stats, statsManager.clock.now()); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java new file mode 100644 index 00000000..f9ebea41 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.opencensus.stats.StatsRecorder; + +/** Implementation of {@link StatsRecorder}. */ +public final class StatsRecorderImpl extends StatsRecorder { + private final StatsManager statsManager; + + StatsRecorderImpl(StatsManager statsManager) { + checkNotNull(statsManager, "StatsManager"); + this.statsManager = statsManager; + } + + @Override + public MeasureMapImpl newMeasureMap() { + return MeasureMapImpl.create(statsManager); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java new file mode 100644 index 00000000..20ea97f8 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java @@ -0,0 +1,56 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.stats; + +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.stats.ViewManager; +import java.util.Set; +import javax.annotation.Nullable; + +/** Implementation of {@link ViewManager}. */ +public final class ViewManagerImpl extends ViewManager { + private final StatsManager statsManager; + + ViewManagerImpl(StatsManager statsManager) { + this.statsManager = statsManager; + } + + @Override + public void registerView(View view) { + statsManager.registerView(view); + } + + @Override + @Nullable + public ViewData getView(View.Name viewName) { + return statsManager.getView(viewName); + } + + @Override + public Set<View> getAllExportedViews() { + return statsManager.getExportedViews(); + } + + void clearStats() { + statsManager.clearStats(); + } + + void resumeStatsCollection() { + statsManager.resumeStatsCollection(); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/CurrentTagContextUtils.java b/impl_core/src/main/java/io/opencensus/implcore/tags/CurrentTagContextUtils.java new file mode 100644 index 00000000..e6bb12f5 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/tags/CurrentTagContextUtils.java @@ -0,0 +1,71 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.tags; + +import io.grpc.Context; +import io.opencensus.common.Scope; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.unsafe.ContextUtils; + +/** + * Utility methods for accessing the {@link TagContext} contained in the {@link io.grpc.Context}. + */ +final class CurrentTagContextUtils { + + private CurrentTagContextUtils() {} + + /** + * Returns the {@link TagContext} from the current context. + * + * @return the {@code TagContext} from the current context. + */ + static TagContext getCurrentTagContext() { + return ContextUtils.TAG_CONTEXT_KEY.get(); + } + + /** + * Enters the scope of code where the given {@link TagContext} is in the current context and + * returns an object that represents that scope. The scope is exited when the returned object is + * closed. + * + * @param tags the {@code TagContext} to be set to the current context. + * @return an object that defines a scope where the given {@code TagContext} is set to the current + * context. + */ + static Scope withTagContext(TagContext tags) { + return new WithTagContext(tags); + } + + private static final class WithTagContext implements Scope { + + private final Context orig; + + /** + * Constructs a new {@link WithTagContext}. + * + * @param tags the {@code TagContext} to be added to the current {@code Context}. + */ + private WithTagContext(TagContext tags) { + orig = Context.current().withValue(ContextUtils.TAG_CONTEXT_KEY, tags).attach(); + } + + @Override + public void close() { + Context.current().detach(orig); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/NoopTagContextBuilder.java b/impl_core/src/main/java/io/opencensus/implcore/tags/NoopTagContextBuilder.java new file mode 100644 index 00000000..eae54c5d --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/tags/NoopTagContextBuilder.java @@ -0,0 +1,51 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.tags; + +import io.opencensus.common.Scope; +import io.opencensus.implcore.internal.NoopScope; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; + +/** {@link TagContextBuilder} that is used when tagging is disabled. */ +final class NoopTagContextBuilder extends TagContextBuilder { + static final NoopTagContextBuilder INSTANCE = new NoopTagContextBuilder(); + + private NoopTagContextBuilder() {} + + @Override + public TagContextBuilder put(TagKey key, TagValue value) { + return this; + } + + @Override + public TagContextBuilder remove(TagKey key) { + return this; + } + + @Override + public TagContext build() { + return TagContextImpl.EMPTY; + } + + @Override + public Scope buildScoped() { + return NoopScope.getInstance(); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextBuilderImpl.java b/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextBuilderImpl.java new file mode 100644 index 00000000..a17198d8 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextBuilderImpl.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.tags; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.opencensus.common.Scope; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import java.util.HashMap; +import java.util.Map; + +final class TagContextBuilderImpl extends TagContextBuilder { + private final Map<TagKey, TagValue> tags; + + TagContextBuilderImpl(Map<TagKey, TagValue> tags) { + this.tags = new HashMap<TagKey, TagValue>(tags); + } + + TagContextBuilderImpl() { + this.tags = new HashMap<TagKey, TagValue>(); + } + + @Override + public TagContextBuilderImpl put(TagKey key, TagValue value) { + tags.put(checkNotNull(key, "key"), checkNotNull(value, "value")); + return this; + } + + @Override + public TagContextBuilderImpl remove(TagKey key) { + tags.remove(checkNotNull(key, "key")); + return this; + } + + @Override + public TagContextImpl build() { + return new TagContextImpl(tags); + } + + @Override + public Scope buildScoped() { + return CurrentTagContextUtils.withTagContext(build()); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextImpl.java b/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextImpl.java new file mode 100644 index 00000000..f7a8ff82 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextImpl.java @@ -0,0 +1,85 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.tags; + +import io.opencensus.tags.Tag; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +@Immutable +public final class TagContextImpl extends TagContext { + + public static final TagContextImpl EMPTY = + new TagContextImpl(Collections.<TagKey, TagValue>emptyMap()); + + // The types of the TagKey and value must match for each entry. + private final Map<TagKey, TagValue> tags; + + public TagContextImpl(Map<? extends TagKey, ? extends TagValue> tags) { + this.tags = Collections.unmodifiableMap(new HashMap<TagKey, TagValue>(tags)); + } + + public Map<TagKey, TagValue> getTags() { + return tags; + } + + @Override + protected Iterator<Tag> getIterator() { + return new TagIterator(tags); + } + + @Override + public boolean equals(@Nullable Object other) { + // Directly compare the tags when both objects are TagContextImpls, for efficiency. + if (other instanceof TagContextImpl) { + return getTags().equals(((TagContextImpl) other).getTags()); + } + return super.equals(other); + } + + private static final class TagIterator implements Iterator<Tag> { + Iterator<Map.Entry<TagKey, TagValue>> iterator; + + TagIterator(Map<TagKey, TagValue> tags) { + iterator = tags.entrySet().iterator(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Tag next() { + final Entry<TagKey, TagValue> next = iterator.next(); + return Tag.create(next.getKey(), next.getValue()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("TagIterator.remove()"); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextUtils.java b/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextUtils.java new file mode 100644 index 00000000..5fbc5050 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextUtils.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.tags; + +import io.opencensus.tags.Tag; + +final class TagContextUtils { + private TagContextUtils() {} + + /** + * Add a {@code Tag} of any type to a builder. + * + * @param tag tag containing the key and value to set. + * @param builder the builder to update. + */ + static void addTagToBuilder(Tag tag, TagContextBuilderImpl builder) { + builder.put(tag.getKey(), tag.getValue()); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/TaggerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/tags/TaggerImpl.java new file mode 100644 index 00000000..dcf9a1b7 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/tags/TaggerImpl.java @@ -0,0 +1,116 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.tags; + +import io.opencensus.common.Scope; +import io.opencensus.implcore.internal.CurrentState; +import io.opencensus.implcore.internal.CurrentState.State; +import io.opencensus.implcore.internal.NoopScope; +import io.opencensus.tags.InternalUtils; +import io.opencensus.tags.Tag; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.Tagger; +import java.util.Iterator; + +/** Implementation of {@link Tagger}. */ +public final class TaggerImpl extends Tagger { + // All methods in this class use TagContextImpl and TagContextBuilderImpl. For example, + // withTagContext(...) always puts a TagContextImpl into scope, even if the argument is another + // TagContext subclass. + + private final CurrentState state; + + TaggerImpl(CurrentState state) { + this.state = state; + } + + @Override + public TagContextImpl empty() { + return TagContextImpl.EMPTY; + } + + @Override + public TagContextImpl getCurrentTagContext() { + return state.getInternal() == State.DISABLED + ? TagContextImpl.EMPTY + : toTagContextImpl(CurrentTagContextUtils.getCurrentTagContext()); + } + + @Override + public TagContextBuilder emptyBuilder() { + return state.getInternal() == State.DISABLED + ? NoopTagContextBuilder.INSTANCE + : new TagContextBuilderImpl(); + } + + @Override + public TagContextBuilder currentBuilder() { + return state.getInternal() == State.DISABLED + ? NoopTagContextBuilder.INSTANCE + : toBuilder(CurrentTagContextUtils.getCurrentTagContext()); + } + + @Override + public TagContextBuilder toBuilder(TagContext tags) { + return state.getInternal() == State.DISABLED + ? NoopTagContextBuilder.INSTANCE + : toTagContextBuilderImpl(tags); + } + + @Override + public Scope withTagContext(TagContext tags) { + return state.getInternal() == State.DISABLED + ? NoopScope.getInstance() + : CurrentTagContextUtils.withTagContext(toTagContextImpl(tags)); + } + + private static TagContextImpl toTagContextImpl(TagContext tags) { + if (tags instanceof TagContextImpl) { + return (TagContextImpl) tags; + } else { + Iterator<Tag> i = InternalUtils.getTags(tags); + if (!i.hasNext()) { + return TagContextImpl.EMPTY; + } + TagContextBuilderImpl builder = new TagContextBuilderImpl(); + while (i.hasNext()) { + Tag tag = i.next(); + if (tag != null) { + TagContextUtils.addTagToBuilder(tag, builder); + } + } + return builder.build(); + } + } + + private static TagContextBuilderImpl toTagContextBuilderImpl(TagContext tags) { + // Copy the tags more efficiently in the expected case, when the TagContext is a TagContextImpl. + if (tags instanceof TagContextImpl) { + return new TagContextBuilderImpl(((TagContextImpl) tags).getTags()); + } else { + TagContextBuilderImpl builder = new TagContextBuilderImpl(); + for (Iterator<Tag> i = InternalUtils.getTags(tags); i.hasNext(); ) { + Tag tag = i.next(); + if (tag != null) { + TagContextUtils.addTagToBuilder(tag, builder); + } + } + return builder; + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/TagsComponentImplBase.java b/impl_core/src/main/java/io/opencensus/implcore/tags/TagsComponentImplBase.java new file mode 100644 index 00000000..88c31bae --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/tags/TagsComponentImplBase.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.tags; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.opencensus.implcore.internal.CurrentState; +import io.opencensus.implcore.internal.CurrentState.State; +import io.opencensus.implcore.tags.propagation.TagPropagationComponentImpl; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.TaggingState; +import io.opencensus.tags.TagsComponent; +import io.opencensus.tags.propagation.TagPropagationComponent; + +/** Base implementation of {@link TagsComponent}. */ +public class TagsComponentImplBase extends TagsComponent { + private static final State DEFAULT_STATE = State.ENABLED; + + // The State shared between the TagsComponent, Tagger, and TagPropagationComponent + private final CurrentState currentState = new CurrentState(DEFAULT_STATE); + + private final Tagger tagger = new TaggerImpl(currentState); + private final TagPropagationComponent tagPropagationComponent = + new TagPropagationComponentImpl(currentState); + + @Override + public Tagger getTagger() { + return tagger; + } + + @Override + public TagPropagationComponent getTagPropagationComponent() { + return tagPropagationComponent; + } + + @Override + public TaggingState getState() { + return stateToTaggingState(currentState.get()); + } + + @Override + @Deprecated + public void setState(TaggingState newState) { + currentState.set(taggingStateToState(checkNotNull(newState, "newState"))); + } + + private static State taggingStateToState(TaggingState taggingState) { + return taggingState == TaggingState.ENABLED ? State.ENABLED : State.DISABLED; + } + + private static TaggingState stateToTaggingState(State state) { + return state == State.ENABLED ? TaggingState.ENABLED : TaggingState.DISABLED; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/SerializationUtils.java b/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/SerializationUtils.java new file mode 100644 index 00000000..2daad95e --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/SerializationUtils.java @@ -0,0 +1,190 @@ +/* + * Copyright 2016-17, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.tags.propagation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import io.opencensus.implcore.internal.VarInt; +import io.opencensus.implcore.tags.TagContextImpl; +import io.opencensus.tags.InternalUtils; +import io.opencensus.tags.Tag; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.propagation.TagContextDeserializationException; +import io.opencensus.tags.propagation.TagContextSerializationException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Methods for serializing and deserializing {@link TagContext}s. + * + * <p>The format defined in this class is shared across all implementations of OpenCensus. It allows + * tags to propagate across requests. + * + * <p>OpenCensus tag context encoding: + * + * <ul> + * <li>Tags are encoded in single byte sequence. The version 0 format is: + * <li>{@code <version_id><encoded_tags>} + * <li>{@code <version_id> == a single byte, value 0} + * <li>{@code <encoded_tags> == (<tag_field_id><tag_encoding>)*} + * <ul> + * <li>{@code <tag_field_id>} == a single byte, value 0 + * <li>{@code <tag_encoding>}: + * <ul> + * <li>{@code <tag_key_len><tag_key><tag_val_len><tag_val>} + * <ul> + * <li>{@code <tag_key_len>} == varint encoded integer + * <li>{@code <tag_key>} == tag_key_len bytes comprising tag key name + * <li>{@code <tag_val_len>} == varint encoded integer + * <li>{@code <tag_val>} == tag_val_len bytes comprising UTF-8 string + * </ul> + * </ul> + * </ul> + * </ul> + */ +final class SerializationUtils { + private SerializationUtils() {} + + @VisibleForTesting static final int VERSION_ID = 0; + @VisibleForTesting static final int TAG_FIELD_ID = 0; + // This size limit only applies to the bytes representing tag keys and values. + @VisibleForTesting static final int TAGCONTEXT_SERIALIZED_SIZE_LIMIT = 8192; + + // Serializes a TagContext to the on-the-wire format. + // Encoded tags are of the form: <version_id><encoded_tags> + static byte[] serializeBinary(TagContext tags) throws TagContextSerializationException { + // Use a ByteArrayDataOutput to avoid needing to handle IOExceptions. + final ByteArrayDataOutput byteArrayDataOutput = ByteStreams.newDataOutput(); + byteArrayDataOutput.write(VERSION_ID); + int totalChars = 0; // Here chars are equivalent to bytes, since we're using ascii chars. + for (Iterator<Tag> i = InternalUtils.getTags(tags); i.hasNext(); ) { + Tag tag = i.next(); + totalChars += tag.getKey().getName().length(); + totalChars += tag.getValue().asString().length(); + encodeTag(tag, byteArrayDataOutput); + } + if (totalChars > TAGCONTEXT_SERIALIZED_SIZE_LIMIT) { + throw new TagContextSerializationException( + "Size of TagContext exceeds the maximum serialized size " + + TAGCONTEXT_SERIALIZED_SIZE_LIMIT); + } + return byteArrayDataOutput.toByteArray(); + } + + // Deserializes input to TagContext based on the binary format standard. + // The encoded tags are of the form: <version_id><encoded_tags> + static TagContextImpl deserializeBinary(byte[] bytes) throws TagContextDeserializationException { + try { + if (bytes.length == 0) { + // Does not allow empty byte array. + throw new TagContextDeserializationException("Input byte[] can not be empty."); + } + + ByteBuffer buffer = ByteBuffer.wrap(bytes).asReadOnlyBuffer(); + int versionId = buffer.get(); + if (versionId > VERSION_ID || versionId < 0) { + throw new TagContextDeserializationException( + "Wrong Version ID: " + versionId + ". Currently supports version up to: " + VERSION_ID); + } + return new TagContextImpl(parseTags(buffer)); + } catch (BufferUnderflowException exn) { + throw new TagContextDeserializationException(exn.toString()); // byte array format error. + } + } + + private static Map<TagKey, TagValue> parseTags(ByteBuffer buffer) + throws TagContextDeserializationException { + Map<TagKey, TagValue> tags = new HashMap<TagKey, TagValue>(); + int limit = buffer.limit(); + int totalChars = 0; // Here chars are equivalent to bytes, since we're using ascii chars. + while (buffer.position() < limit) { + int type = buffer.get(); + if (type == TAG_FIELD_ID) { + TagKey key = createTagKey(decodeString(buffer)); + TagValue val = createTagValue(key, decodeString(buffer)); + totalChars += key.getName().length(); + totalChars += val.asString().length(); + tags.put(key, val); + } else { + // Stop parsing at the first unknown field ID, since there is no way to know its length. + // TODO(sebright): Consider storing the rest of the byte array in the TagContext. + break; + } + } + if (totalChars > TAGCONTEXT_SERIALIZED_SIZE_LIMIT) { + throw new TagContextDeserializationException( + "Size of TagContext exceeds the maximum serialized size " + + TAGCONTEXT_SERIALIZED_SIZE_LIMIT); + } + return tags; + } + + // TODO(sebright): Consider exposing a TagKey name validation method to avoid needing to catch an + // IllegalArgumentException here. + private static final TagKey createTagKey(String name) throws TagContextDeserializationException { + try { + return TagKey.create(name); + } catch (IllegalArgumentException e) { + throw new TagContextDeserializationException("Invalid tag key: " + name, e); + } + } + + // TODO(sebright): Consider exposing a TagValue validation method to avoid needing to catch + // an IllegalArgumentException here. + private static final TagValue createTagValue(TagKey key, String value) + throws TagContextDeserializationException { + try { + return TagValue.create(value); + } catch (IllegalArgumentException e) { + throw new TagContextDeserializationException( + "Invalid tag value for key " + key + ": " + value, e); + } + } + + private static final void encodeTag(Tag tag, ByteArrayDataOutput byteArrayDataOutput) { + byteArrayDataOutput.write(TAG_FIELD_ID); + encodeString(tag.getKey().getName(), byteArrayDataOutput); + encodeString(tag.getValue().asString(), byteArrayDataOutput); + } + + private static final void encodeString(String input, ByteArrayDataOutput byteArrayDataOutput) { + putVarInt(input.length(), byteArrayDataOutput); + byteArrayDataOutput.write(input.getBytes(Charsets.UTF_8)); + } + + private static final void putVarInt(int input, ByteArrayDataOutput byteArrayDataOutput) { + byte[] output = new byte[VarInt.varIntSize(input)]; + VarInt.putVarInt(input, output, 0); + byteArrayDataOutput.write(output); + } + + private static final String decodeString(ByteBuffer buffer) { + int length = VarInt.getVarInt(buffer); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < length; i++) { + builder.append((char) buffer.get()); + } + return builder.toString(); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagContextBinarySerializerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagContextBinarySerializerImpl.java new file mode 100644 index 00000000..5a25da5b --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagContextBinarySerializerImpl.java @@ -0,0 +1,49 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.tags.propagation; + +import io.opencensus.implcore.internal.CurrentState; +import io.opencensus.implcore.internal.CurrentState.State; +import io.opencensus.implcore.tags.TagContextImpl; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.propagation.TagContextBinarySerializer; +import io.opencensus.tags.propagation.TagContextDeserializationException; +import io.opencensus.tags.propagation.TagContextSerializationException; + +final class TagContextBinarySerializerImpl extends TagContextBinarySerializer { + private static final byte[] EMPTY_BYTE_ARRAY = {}; + + private final CurrentState state; + + TagContextBinarySerializerImpl(CurrentState state) { + this.state = state; + } + + @Override + public byte[] toByteArray(TagContext tags) throws TagContextSerializationException { + return state.getInternal() == State.DISABLED + ? EMPTY_BYTE_ARRAY + : SerializationUtils.serializeBinary(tags); + } + + @Override + public TagContext fromByteArray(byte[] bytes) throws TagContextDeserializationException { + return state.getInternal() == State.DISABLED + ? TagContextImpl.EMPTY + : SerializationUtils.deserializeBinary(bytes); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagPropagationComponentImpl.java b/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagPropagationComponentImpl.java new file mode 100644 index 00000000..9ba0da40 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagPropagationComponentImpl.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.tags.propagation; + +import io.opencensus.implcore.internal.CurrentState; +import io.opencensus.tags.propagation.TagContextBinarySerializer; +import io.opencensus.tags.propagation.TagPropagationComponent; + +/** Implementation of {@link TagPropagationComponent}. */ +public final class TagPropagationComponentImpl extends TagPropagationComponent { + private final TagContextBinarySerializer tagContextBinarySerializer; + + public TagPropagationComponentImpl(CurrentState state) { + tagContextBinarySerializer = new TagContextBinarySerializerImpl(state); + } + + @Override + public TagContextBinarySerializer getBinarySerializer() { + return tagContextBinarySerializer; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/NoRecordEventsSpanImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/NoRecordEventsSpanImpl.java new file mode 100644 index 00000000..8a5f8e05 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/NoRecordEventsSpanImpl.java @@ -0,0 +1,85 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace; + +import com.google.common.base.Preconditions; +import io.opencensus.trace.Annotation; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.EndSpanOptions; +import io.opencensus.trace.Link; +import io.opencensus.trace.Span; +import io.opencensus.trace.SpanContext; +import io.opencensus.trace.Status; +import java.util.EnumSet; +import java.util.Map; + +/** Implementation for the {@link Span} class that does not record trace events. */ +final class NoRecordEventsSpanImpl extends Span { + + private static final EnumSet<Options> NOT_RECORD_EVENTS_SPAN_OPTIONS = + EnumSet.noneOf(Span.Options.class); + + static NoRecordEventsSpanImpl create(SpanContext context) { + return new NoRecordEventsSpanImpl(context); + } + + @Override + public void addAnnotation(String description, Map<String, AttributeValue> attributes) { + Preconditions.checkNotNull(description, "description"); + Preconditions.checkNotNull(attributes, "attribute"); + } + + @Override + public void addAnnotation(Annotation annotation) { + Preconditions.checkNotNull(annotation, "annotation"); + } + + @Override + public void putAttribute(String key, AttributeValue value) { + Preconditions.checkNotNull(key, "key"); + Preconditions.checkNotNull(value, "value"); + } + + @Override + public void putAttributes(Map<String, AttributeValue> attributes) { + Preconditions.checkNotNull(attributes, "attributes"); + } + + @Override + public void addMessageEvent(io.opencensus.trace.MessageEvent messageEvent) { + Preconditions.checkNotNull(messageEvent, "messageEvent"); + } + + @Override + public void addLink(Link link) { + Preconditions.checkNotNull(link, "link"); + } + + @Override + public void setStatus(Status status) { + Preconditions.checkNotNull(status, "status"); + } + + @Override + public void end(EndSpanOptions options) { + Preconditions.checkNotNull(options, "options"); + } + + private NoRecordEventsSpanImpl(SpanContext context) { + super(context, NOT_RECORD_EVENTS_SPAN_OPTIONS); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/RecordEventsSpanImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/RecordEventsSpanImpl.java new file mode 100644 index 00000000..af3545bc --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/RecordEventsSpanImpl.java @@ -0,0 +1,579 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.EvictingQueue; +import io.opencensus.common.Clock; +import io.opencensus.implcore.internal.CheckerFrameworkUtils; +import io.opencensus.implcore.internal.TimestampConverter; +import io.opencensus.implcore.trace.internal.ConcurrentIntrusiveList.Element; +import io.opencensus.trace.Annotation; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.EndSpanOptions; +import io.opencensus.trace.Link; +import io.opencensus.trace.Span; +import io.opencensus.trace.SpanContext; +import io.opencensus.trace.SpanId; +import io.opencensus.trace.Status; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.config.TraceParams; +import io.opencensus.trace.export.SpanData; +import io.opencensus.trace.export.SpanData.TimedEvent; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +// TODO(hailongwen): remove the usage of `NetworkEvent` in the future. +/** Implementation for the {@link Span} class that records trace events. */ +@ThreadSafe +public final class RecordEventsSpanImpl extends Span implements Element<RecordEventsSpanImpl> { + private static final Logger logger = Logger.getLogger(Tracer.class.getName()); + + private static final EnumSet<Span.Options> RECORD_EVENTS_SPAN_OPTIONS = + EnumSet.of(Span.Options.RECORD_EVENTS); + + // The parent SpanId of this span. Null if this is a root span. + @Nullable private final SpanId parentSpanId; + // True if the parent is on a different process. + @Nullable private final Boolean hasRemoteParent; + // Active trace params when the Span was created. + private final TraceParams traceParams; + // Handler called when the span starts and ends. + private final StartEndHandler startEndHandler; + // The displayed name of the span. + private final String name; + // The kind of the span. + @Nullable private final Kind kind; + // The clock used to get the time. + private final Clock clock; + // The time converter used to convert nano time to Timestamp. This is needed because Java has + // millisecond granularity for Timestamp and tracing events are recorded more often. + @Nullable private final TimestampConverter timestampConverter; + // The start time of the span. + private final long startNanoTime; + // Set of recorded attributes. DO NOT CALL any other method that changes the ordering of events. + @GuardedBy("this") + @Nullable + private AttributesWithCapacity attributes; + // List of recorded annotations. + @GuardedBy("this") + @Nullable + private TraceEvents<EventWithNanoTime<Annotation>> annotations; + // List of recorded network events. + @GuardedBy("this") + @Nullable + private TraceEvents<EventWithNanoTime<io.opencensus.trace.MessageEvent>> messageEvents; + // List of recorded links to parent and child spans. + @GuardedBy("this") + @Nullable + private TraceEvents<Link> links; + // The status of the span. + @GuardedBy("this") + @Nullable + private Status status; + // The end time of the span. + @GuardedBy("this") + private long endNanoTime; + // True if the span is ended. + @GuardedBy("this") + private boolean hasBeenEnded; + + @GuardedBy("this") + private boolean sampleToLocalSpanStore; + + // Pointers for the ConcurrentIntrusiveList$Element. Guarded by the ConcurrentIntrusiveList. + @Nullable private RecordEventsSpanImpl next = null; + @Nullable private RecordEventsSpanImpl prev = null; + + /** + * Creates and starts a span with the given configuration. + * + * @param context supplies the trace_id and span_id for the newly started span. + * @param name the displayed name for the new span. + * @param parentSpanId the span_id of the parent span, or null if the new span is a root span. + * @param hasRemoteParent {@code true} if the parentContext is remote. {@code null} if this is a + * root span. + * @param traceParams trace parameters like sampler and probability. + * @param startEndHandler handler called when the span starts and ends. + * @param timestampConverter null if the span is a root span or the parent is not sampled. If the + * parent is sampled, we should use the same converter to ensure ordering between tracing + * events. + * @param clock the clock used to get the time. + * @return a new and started span. + */ + @VisibleForTesting + public static RecordEventsSpanImpl startSpan( + SpanContext context, + String name, + @Nullable Kind kind, + @Nullable SpanId parentSpanId, + @Nullable Boolean hasRemoteParent, + TraceParams traceParams, + StartEndHandler startEndHandler, + @Nullable TimestampConverter timestampConverter, + Clock clock) { + RecordEventsSpanImpl span = + new RecordEventsSpanImpl( + context, + name, + kind, + parentSpanId, + hasRemoteParent, + traceParams, + startEndHandler, + timestampConverter, + clock); + // Call onStart here instead of calling in the constructor to make sure the span is completely + // initialized. + startEndHandler.onStart(span); + return span; + } + + /** + * Returns the name of the {@code Span}. + * + * @return the name of the {@code Span}. + */ + public String getName() { + return name; + } + + /** + * Returns the status of the {@code Span}. If not set defaults to {@link Status#OK}. + * + * @return the status of the {@code Span}. + */ + public Status getStatus() { + synchronized (this) { + return getStatusWithDefault(); + } + } + + /** + * Returns the end nano time (see {@link System#nanoTime()}). If the current {@code Span} is not + * ended then returns {@link Clock#nowNanos()}. + * + * @return the end nano time. + */ + public long getEndNanoTime() { + synchronized (this) { + return hasBeenEnded ? endNanoTime : clock.nowNanos(); + } + } + + /** + * Returns the latency of the {@code Span} in nanos. If still active then returns now() - start + * time. + * + * @return the latency of the {@code Span} in nanos. + */ + public long getLatencyNs() { + synchronized (this) { + return hasBeenEnded ? endNanoTime - startNanoTime : clock.nowNanos() - startNanoTime; + } + } + + /** + * Returns if the name of this {@code Span} must be register to the {@code SampledSpanStore}. + * + * @return if the name of this {@code Span} must be register to the {@code SampledSpanStore}. + */ + public boolean getSampleToLocalSpanStore() { + synchronized (this) { + checkState(hasBeenEnded, "Running span does not have the SampleToLocalSpanStore set."); + return sampleToLocalSpanStore; + } + } + + /** + * Returns the kind of this {@code Span}. + * + * @return the kind of this {@code Span}. + */ + @Nullable + public Kind getKind() { + return kind; + } + + /** + * Returns the {@code TimestampConverter} used by this {@code Span}. + * + * @return the {@code TimestampConverter} used by this {@code Span}. + */ + @Nullable + TimestampConverter getTimestampConverter() { + return timestampConverter; + } + + /** + * Returns an immutable representation of all the data from this {@code Span}. + * + * @return an immutable representation of all the data from this {@code Span}. + * @throws IllegalStateException if the Span doesn't have RECORD_EVENTS option. + */ + public SpanData toSpanData() { + synchronized (this) { + SpanData.Attributes attributesSpanData = + attributes == null + ? SpanData.Attributes.create(Collections.<String, AttributeValue>emptyMap(), 0) + : SpanData.Attributes.create(attributes, attributes.getNumberOfDroppedAttributes()); + SpanData.TimedEvents<Annotation> annotationsSpanData = + createTimedEvents(getInitializedAnnotations(), timestampConverter); + SpanData.TimedEvents<io.opencensus.trace.MessageEvent> messageEventsSpanData = + createTimedEvents(getInitializedNetworkEvents(), timestampConverter); + SpanData.Links linksSpanData = + links == null + ? SpanData.Links.create(Collections.<Link>emptyList(), 0) + : SpanData.Links.create( + new ArrayList<Link>(links.events), links.getNumberOfDroppedEvents()); + return SpanData.create( + getContext(), + parentSpanId, + hasRemoteParent, + name, + kind, + CheckerFrameworkUtils.castNonNull(timestampConverter).convertNanoTime(startNanoTime), + attributesSpanData, + annotationsSpanData, + messageEventsSpanData, + linksSpanData, + null, // Not supported yet. + hasBeenEnded ? getStatusWithDefault() : null, + hasBeenEnded + ? CheckerFrameworkUtils.castNonNull(timestampConverter).convertNanoTime(endNanoTime) + : null); + } + } + + @Override + public void putAttribute(String key, AttributeValue value) { + Preconditions.checkNotNull(key, "key"); + Preconditions.checkNotNull(value, "value"); + synchronized (this) { + if (hasBeenEnded) { + logger.log(Level.FINE, "Calling putAttributes() on an ended Span."); + return; + } + getInitializedAttributes().putAttribute(key, value); + } + } + + @Override + public void putAttributes(Map<String, AttributeValue> attributes) { + Preconditions.checkNotNull(attributes, "attributes"); + synchronized (this) { + if (hasBeenEnded) { + logger.log(Level.FINE, "Calling putAttributes() on an ended Span."); + return; + } + getInitializedAttributes().putAttributes(attributes); + } + } + + @Override + public void addAnnotation(String description, Map<String, AttributeValue> attributes) { + Preconditions.checkNotNull(description, "description"); + Preconditions.checkNotNull(attributes, "attribute"); + synchronized (this) { + if (hasBeenEnded) { + logger.log(Level.FINE, "Calling addAnnotation() on an ended Span."); + return; + } + getInitializedAnnotations() + .addEvent( + new EventWithNanoTime<Annotation>( + clock.nowNanos(), + Annotation.fromDescriptionAndAttributes(description, attributes))); + } + } + + @Override + public void addAnnotation(Annotation annotation) { + Preconditions.checkNotNull(annotation, "annotation"); + synchronized (this) { + if (hasBeenEnded) { + logger.log(Level.FINE, "Calling addAnnotation() on an ended Span."); + return; + } + getInitializedAnnotations() + .addEvent(new EventWithNanoTime<Annotation>(clock.nowNanos(), annotation)); + } + } + + @Override + public void addMessageEvent(io.opencensus.trace.MessageEvent messageEvent) { + Preconditions.checkNotNull(messageEvent, "messageEvent"); + synchronized (this) { + if (hasBeenEnded) { + logger.log(Level.FINE, "Calling addNetworkEvent() on an ended Span."); + return; + } + getInitializedNetworkEvents() + .addEvent( + new EventWithNanoTime<io.opencensus.trace.MessageEvent>( + clock.nowNanos(), checkNotNull(messageEvent, "networkEvent"))); + } + } + + @Override + public void addLink(Link link) { + Preconditions.checkNotNull(link, "link"); + synchronized (this) { + if (hasBeenEnded) { + logger.log(Level.FINE, "Calling addLink() on an ended Span."); + return; + } + getInitializedLinks().addEvent(link); + } + } + + @Override + public void setStatus(Status status) { + Preconditions.checkNotNull(status, "status"); + synchronized (this) { + if (hasBeenEnded) { + logger.log(Level.FINE, "Calling setStatus() on an ended Span."); + return; + } + this.status = status; + } + } + + @Override + public void end(EndSpanOptions options) { + Preconditions.checkNotNull(options, "options"); + synchronized (this) { + if (hasBeenEnded) { + logger.log(Level.FINE, "Calling end() on an ended Span."); + return; + } + if (options.getStatus() != null) { + status = options.getStatus(); + } + sampleToLocalSpanStore = options.getSampleToLocalSpanStore(); + endNanoTime = clock.nowNanos(); + hasBeenEnded = true; + } + startEndHandler.onEnd(this); + } + + @GuardedBy("this") + private AttributesWithCapacity getInitializedAttributes() { + if (attributes == null) { + attributes = new AttributesWithCapacity(traceParams.getMaxNumberOfAttributes()); + } + return attributes; + } + + @GuardedBy("this") + private TraceEvents<EventWithNanoTime<Annotation>> getInitializedAnnotations() { + if (annotations == null) { + annotations = + new TraceEvents<EventWithNanoTime<Annotation>>(traceParams.getMaxNumberOfAnnotations()); + } + return annotations; + } + + @GuardedBy("this") + private TraceEvents<EventWithNanoTime<io.opencensus.trace.MessageEvent>> + getInitializedNetworkEvents() { + if (messageEvents == null) { + messageEvents = + new TraceEvents<EventWithNanoTime<io.opencensus.trace.MessageEvent>>( + traceParams.getMaxNumberOfMessageEvents()); + } + return messageEvents; + } + + @GuardedBy("this") + private TraceEvents<Link> getInitializedLinks() { + if (links == null) { + links = new TraceEvents<Link>(traceParams.getMaxNumberOfLinks()); + } + return links; + } + + @GuardedBy("this") + private Status getStatusWithDefault() { + return status == null ? Status.OK : status; + } + + private static <T> SpanData.TimedEvents<T> createTimedEvents( + TraceEvents<EventWithNanoTime<T>> events, @Nullable TimestampConverter timestampConverter) { + if (events == null) { + return SpanData.TimedEvents.create(Collections.<TimedEvent<T>>emptyList(), 0); + } + List<TimedEvent<T>> eventsList = new ArrayList<TimedEvent<T>>(events.events.size()); + for (EventWithNanoTime<T> networkEvent : events.events) { + eventsList.add( + networkEvent.toSpanDataTimedEvent(CheckerFrameworkUtils.castNonNull(timestampConverter))); + } + return SpanData.TimedEvents.create(eventsList, events.getNumberOfDroppedEvents()); + } + + @Override + @Nullable + public RecordEventsSpanImpl getNext() { + return next; + } + + @Override + public void setNext(@Nullable RecordEventsSpanImpl element) { + next = element; + } + + @Override + @Nullable + public RecordEventsSpanImpl getPrev() { + return prev; + } + + @Override + public void setPrev(@Nullable RecordEventsSpanImpl element) { + prev = element; + } + + /** + * Interface to handle the start and end operations for a {@link Span} only when the {@code Span} + * has {@link Options#RECORD_EVENTS} option. + * + * <p>Implementation must avoid high overhead work in any of the methods because the code is + * executed on the critical path. + * + * <p>One instance can be called by multiple threads in the same time, so the implementation must + * be thread-safe. + */ + public interface StartEndHandler { + void onStart(RecordEventsSpanImpl span); + + void onEnd(RecordEventsSpanImpl span); + } + + // A map implementation with a fixed capacity that drops events when the map gets full. Eviction + // is based on the access order. + private static final class AttributesWithCapacity extends LinkedHashMap<String, AttributeValue> { + private final int capacity; + private int totalRecordedAttributes = 0; + // Here because -Werror complains about this: [serial] serializable class AttributesWithCapacity + // has no definition of serialVersionUID. This class shouldn't be serialized. + private static final long serialVersionUID = 42L; + + private AttributesWithCapacity(int capacity) { + // Capacity of the map is capacity + 1 to avoid resizing because removeEldestEntry is invoked + // by put and putAll after inserting a new entry into the map. The loadFactor is set to 1 + // to avoid resizing because. The accessOrder is set to true. + super(capacity + 1, 1, /*accessOrder=*/ true); + this.capacity = capacity; + } + + // Users must call this method instead of put to keep count of the total number of entries + // inserted. + private void putAttribute(String key, AttributeValue value) { + totalRecordedAttributes += 1; + put(key, value); + } + + // Users must call this method instead of putAll to keep count of the total number of entries + // inserted. + private void putAttributes(Map<String, AttributeValue> attributes) { + totalRecordedAttributes += attributes.size(); + putAll(attributes); + } + + private int getNumberOfDroppedAttributes() { + return totalRecordedAttributes - size(); + } + + // It is called after each put or putAll call in order to determine if the eldest inserted + // entry should be removed or not. + @Override + protected boolean removeEldestEntry(Map.Entry<String, AttributeValue> eldest) { + return size() > this.capacity; + } + } + + private static final class TraceEvents<T> { + private int totalRecordedEvents = 0; + private final EvictingQueue<T> events; + + private int getNumberOfDroppedEvents() { + return totalRecordedEvents - events.size(); + } + + TraceEvents(int maxNumEvents) { + events = EvictingQueue.create(maxNumEvents); + } + + void addEvent(T event) { + totalRecordedEvents++; + events.add(event); + } + } + + // Timed event that uses nanoTime to represent the Timestamp. + private static final class EventWithNanoTime<T> { + private final long nanoTime; + private final T event; + + private EventWithNanoTime(long nanoTime, T event) { + this.nanoTime = nanoTime; + this.event = event; + } + + private TimedEvent<T> toSpanDataTimedEvent(TimestampConverter timestampConverter) { + return TimedEvent.create(timestampConverter.convertNanoTime(nanoTime), event); + } + } + + private RecordEventsSpanImpl( + SpanContext context, + String name, + @Nullable Kind kind, + @Nullable SpanId parentSpanId, + @Nullable Boolean hasRemoteParent, + TraceParams traceParams, + StartEndHandler startEndHandler, + @Nullable TimestampConverter timestampConverter, + Clock clock) { + super(context, RECORD_EVENTS_SPAN_OPTIONS); + this.parentSpanId = parentSpanId; + this.hasRemoteParent = hasRemoteParent; + this.name = name; + this.kind = kind; + this.traceParams = traceParams; + this.startEndHandler = startEndHandler; + this.clock = clock; + this.hasBeenEnded = false; + this.sampleToLocalSpanStore = false; + this.timestampConverter = + timestampConverter != null ? timestampConverter : TimestampConverter.now(clock); + startNanoTime = clock.nowNanos(); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/SpanBuilderImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/SpanBuilderImpl.java new file mode 100644 index 00000000..5565e9de --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/SpanBuilderImpl.java @@ -0,0 +1,253 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.opencensus.common.Clock; +import io.opencensus.implcore.internal.TimestampConverter; +import io.opencensus.implcore.trace.internal.RandomHandler; +import io.opencensus.trace.Link; +import io.opencensus.trace.Link.Type; +import io.opencensus.trace.Sampler; +import io.opencensus.trace.Span; +import io.opencensus.trace.Span.Kind; +import io.opencensus.trace.SpanBuilder; +import io.opencensus.trace.SpanContext; +import io.opencensus.trace.SpanId; +import io.opencensus.trace.TraceId; +import io.opencensus.trace.TraceOptions; +import io.opencensus.trace.Tracestate; +import io.opencensus.trace.config.TraceConfig; +import io.opencensus.trace.config.TraceParams; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import javax.annotation.Nullable; + +/** Implementation of the {@link SpanBuilder}. */ +final class SpanBuilderImpl extends SpanBuilder { + private static final Tracestate TRACESTATE_DEFAULT = Tracestate.builder().build(); + + private static final TraceOptions SAMPLED_TRACE_OPTIONS = + TraceOptions.builder().setIsSampled(true).build(); + private static final TraceOptions NOT_SAMPLED_TRACE_OPTIONS = + TraceOptions.builder().setIsSampled(false).build(); + + private final Options options; + private final String name; + @Nullable private final Span parent; + @Nullable private final SpanContext remoteParentSpanContext; + @Nullable private Sampler sampler; + private List<Span> parentLinks = Collections.<Span>emptyList(); + @Nullable private Boolean recordEvents; + @Nullable private Kind kind; + + private Span startSpanInternal( + @Nullable SpanContext parent, + @Nullable Boolean hasRemoteParent, + String name, + @Nullable Sampler sampler, + List<Span> parentLinks, + @Nullable Boolean recordEvents, + @Nullable Kind kind, + @Nullable TimestampConverter timestampConverter) { + TraceParams activeTraceParams = options.traceConfig.getActiveTraceParams(); + Random random = options.randomHandler.current(); + TraceId traceId; + SpanId spanId = SpanId.generateRandomId(random); + SpanId parentSpanId = null; + // TODO(bdrutu): Handle tracestate correctly not just propagate. + Tracestate tracestate = TRACESTATE_DEFAULT; + if (parent == null || !parent.isValid()) { + // New root span. + traceId = TraceId.generateRandomId(random); + // This is a root span so no remote or local parent. + hasRemoteParent = null; + } else { + // New child span. + traceId = parent.getTraceId(); + parentSpanId = parent.getSpanId(); + tracestate = parent.getTracestate(); + } + TraceOptions traceOptions = + makeSamplingDecision( + parent, + hasRemoteParent, + name, + sampler, + parentLinks, + traceId, + spanId, + activeTraceParams) + ? SAMPLED_TRACE_OPTIONS + : NOT_SAMPLED_TRACE_OPTIONS; + Span span = + (traceOptions.isSampled() || Boolean.TRUE.equals(recordEvents)) + ? RecordEventsSpanImpl.startSpan( + SpanContext.create(traceId, spanId, traceOptions, tracestate), + name, + kind, + parentSpanId, + hasRemoteParent, + activeTraceParams, + options.startEndHandler, + timestampConverter, + options.clock) + : NoRecordEventsSpanImpl.create( + SpanContext.create(traceId, spanId, traceOptions, tracestate)); + linkSpans(span, parentLinks); + return span; + } + + private static boolean makeSamplingDecision( + @Nullable SpanContext parent, + @Nullable Boolean hasRemoteParent, + String name, + @Nullable Sampler sampler, + List<Span> parentLinks, + TraceId traceId, + SpanId spanId, + TraceParams activeTraceParams) { + // If users set a specific sampler in the SpanBuilder, use it. + if (sampler != null) { + return sampler.shouldSample(parent, hasRemoteParent, traceId, spanId, name, parentLinks); + } + // Use the default sampler if this is a root Span or this is an entry point Span (has remote + // parent). + if (Boolean.TRUE.equals(hasRemoteParent) || parent == null || !parent.isValid()) { + return activeTraceParams + .getSampler() + .shouldSample(parent, hasRemoteParent, traceId, spanId, name, parentLinks); + } + // Parent is always different than null because otherwise we use the default sampler. + return parent.getTraceOptions().isSampled() || isAnyParentLinkSampled(parentLinks); + } + + private static boolean isAnyParentLinkSampled(List<Span> parentLinks) { + for (Span parentLink : parentLinks) { + if (parentLink.getContext().getTraceOptions().isSampled()) { + return true; + } + } + return false; + } + + private static void linkSpans(Span span, List<Span> parentLinks) { + if (!parentLinks.isEmpty()) { + Link childLink = Link.fromSpanContext(span.getContext(), Type.CHILD_LINKED_SPAN); + for (Span linkedSpan : parentLinks) { + linkedSpan.addLink(childLink); + span.addLink(Link.fromSpanContext(linkedSpan.getContext(), Type.PARENT_LINKED_SPAN)); + } + } + } + + static SpanBuilderImpl createWithParent(String spanName, @Nullable Span parent, Options options) { + return new SpanBuilderImpl(spanName, null, parent, options); + } + + static SpanBuilderImpl createWithRemoteParent( + String spanName, @Nullable SpanContext remoteParentSpanContext, Options options) { + return new SpanBuilderImpl(spanName, remoteParentSpanContext, null, options); + } + + private SpanBuilderImpl( + String name, + @Nullable SpanContext remoteParentSpanContext, + @Nullable Span parent, + Options options) { + this.name = checkNotNull(name, "name"); + this.parent = parent; + this.remoteParentSpanContext = remoteParentSpanContext; + this.options = options; + } + + @Override + public Span startSpan() { + SpanContext parentContext = remoteParentSpanContext; + Boolean hasRemoteParent = Boolean.TRUE; + TimestampConverter timestampConverter = null; + if (remoteParentSpanContext == null) { + // This is not a child of a remote Span. Get the parent SpanContext from the parent Span if + // any. + Span parent = this.parent; + hasRemoteParent = Boolean.FALSE; + if (parent != null) { + parentContext = parent.getContext(); + // Pass the timestamp converter from the parent to ensure that the recorded events are in + // the right order. Implementation uses System.nanoTime() which is monotonically increasing. + if (parent instanceof RecordEventsSpanImpl) { + timestampConverter = ((RecordEventsSpanImpl) parent).getTimestampConverter(); + } + } else { + hasRemoteParent = null; + } + } + return startSpanInternal( + parentContext, + hasRemoteParent, + name, + sampler, + parentLinks, + recordEvents, + kind, + timestampConverter); + } + + static final class Options { + private final RandomHandler randomHandler; + private final RecordEventsSpanImpl.StartEndHandler startEndHandler; + private final Clock clock; + private final TraceConfig traceConfig; + + Options( + RandomHandler randomHandler, + RecordEventsSpanImpl.StartEndHandler startEndHandler, + Clock clock, + TraceConfig traceConfig) { + this.randomHandler = checkNotNull(randomHandler, "randomHandler"); + this.startEndHandler = checkNotNull(startEndHandler, "startEndHandler"); + this.clock = checkNotNull(clock, "clock"); + this.traceConfig = checkNotNull(traceConfig, "traceConfig"); + } + } + + @Override + public SpanBuilderImpl setSampler(Sampler sampler) { + this.sampler = checkNotNull(sampler, "sampler"); + return this; + } + + @Override + public SpanBuilderImpl setParentLinks(List<Span> parentLinks) { + this.parentLinks = checkNotNull(parentLinks, "parentLinks"); + return this; + } + + @Override + public SpanBuilderImpl setRecordEvents(boolean recordEvents) { + this.recordEvents = recordEvents; + return this; + } + + @Override + public SpanBuilderImpl setSpanKind(@Nullable Kind kind) { + this.kind = kind; + return this; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/StartEndHandlerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/StartEndHandlerImpl.java new file mode 100644 index 00000000..6adaa200 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/StartEndHandlerImpl.java @@ -0,0 +1,127 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace; + +import io.opencensus.implcore.internal.EventQueue; +import io.opencensus.implcore.trace.RecordEventsSpanImpl.StartEndHandler; +import io.opencensus.implcore.trace.export.RunningSpanStoreImpl; +import io.opencensus.implcore.trace.export.SampledSpanStoreImpl; +import io.opencensus.implcore.trace.export.SpanExporterImpl; +import io.opencensus.trace.Span.Options; +import io.opencensus.trace.export.SpanData; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * Uses the provided {@link EventQueue} to defer processing/exporting of the {@link SpanData} to + * avoid impacting the critical path. + */ +@ThreadSafe +public final class StartEndHandlerImpl implements StartEndHandler { + private final SpanExporterImpl spanExporter; + @Nullable private final RunningSpanStoreImpl runningSpanStore; + @Nullable private final SampledSpanStoreImpl sampledSpanStore; + private final EventQueue eventQueue; + // true if any of (runningSpanStore OR sampledSpanStore) are different than null, which + // means the spans with RECORD_EVENTS should be enqueued in the queue. + private final boolean enqueueEventForNonSampledSpans; + + /** + * Constructs a new {@code StartEndHandlerImpl}. + * + * @param spanExporter the {@code SpanExporter} implementation. + * @param runningSpanStore the {@code RunningSpanStore} implementation. + * @param sampledSpanStore the {@code SampledSpanStore} implementation. + * @param eventQueue the event queue where all the events are enqueued. + */ + public StartEndHandlerImpl( + SpanExporterImpl spanExporter, + @Nullable RunningSpanStoreImpl runningSpanStore, + @Nullable SampledSpanStoreImpl sampledSpanStore, + EventQueue eventQueue) { + this.spanExporter = spanExporter; + this.runningSpanStore = runningSpanStore; + this.sampledSpanStore = sampledSpanStore; + this.enqueueEventForNonSampledSpans = runningSpanStore != null || sampledSpanStore != null; + this.eventQueue = eventQueue; + } + + @Override + public void onStart(RecordEventsSpanImpl span) { + if (span.getOptions().contains(Options.RECORD_EVENTS) && enqueueEventForNonSampledSpans) { + eventQueue.enqueue(new SpanStartEvent(span, runningSpanStore)); + } + } + + @Override + public void onEnd(RecordEventsSpanImpl span) { + if ((span.getOptions().contains(Options.RECORD_EVENTS) && enqueueEventForNonSampledSpans) + || span.getContext().getTraceOptions().isSampled()) { + eventQueue.enqueue(new SpanEndEvent(span, spanExporter, runningSpanStore, sampledSpanStore)); + } + } + + // An EventQueue entry that records the start of the span event. + private static final class SpanStartEvent implements EventQueue.Entry { + private final RecordEventsSpanImpl span; + @Nullable private final RunningSpanStoreImpl activeSpansExporter; + + SpanStartEvent(RecordEventsSpanImpl span, @Nullable RunningSpanStoreImpl activeSpansExporter) { + this.span = span; + this.activeSpansExporter = activeSpansExporter; + } + + @Override + public void process() { + if (activeSpansExporter != null) { + activeSpansExporter.onStart(span); + } + } + } + + // An EventQueue entry that records the end of the span event. + private static final class SpanEndEvent implements EventQueue.Entry { + private final RecordEventsSpanImpl span; + @Nullable private final RunningSpanStoreImpl runningSpanStore; + private final SpanExporterImpl spanExporter; + @Nullable private final SampledSpanStoreImpl sampledSpanStore; + + SpanEndEvent( + RecordEventsSpanImpl span, + SpanExporterImpl spanExporter, + @Nullable RunningSpanStoreImpl runningSpanStore, + @Nullable SampledSpanStoreImpl sampledSpanStore) { + this.span = span; + this.runningSpanStore = runningSpanStore; + this.spanExporter = spanExporter; + this.sampledSpanStore = sampledSpanStore; + } + + @Override + public void process() { + if (span.getContext().getTraceOptions().isSampled()) { + spanExporter.addSpan(span); + } + if (runningSpanStore != null) { + runningSpanStore.onEnd(span); + } + if (sampledSpanStore != null) { + sampledSpanStore.considerForSampling(span); + } + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/TraceComponentImplBase.java b/impl_core/src/main/java/io/opencensus/implcore/trace/TraceComponentImplBase.java new file mode 100644 index 00000000..c1432432 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/TraceComponentImplBase.java @@ -0,0 +1,90 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace; + +import io.opencensus.common.Clock; +import io.opencensus.implcore.internal.EventQueue; +import io.opencensus.implcore.internal.SimpleEventQueue; +import io.opencensus.implcore.trace.RecordEventsSpanImpl.StartEndHandler; +import io.opencensus.implcore.trace.config.TraceConfigImpl; +import io.opencensus.implcore.trace.export.ExportComponentImpl; +import io.opencensus.implcore.trace.internal.RandomHandler; +import io.opencensus.implcore.trace.propagation.PropagationComponentImpl; +import io.opencensus.trace.TraceComponent; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.config.TraceConfig; +import io.opencensus.trace.export.ExportComponent; +import io.opencensus.trace.propagation.PropagationComponent; + +/** + * Helper class to allow sharing the code for all the {@link TraceComponent} implementations. This + * class cannot use inheritance because in version 0.5.* the constructor of the {@code + * TraceComponent} is package protected. + * + * <p>This can be changed back to inheritance when version 0.5.* is no longer supported. + */ +public final class TraceComponentImplBase { + private final ExportComponentImpl exportComponent; + private final PropagationComponent propagationComponent = new PropagationComponentImpl(); + private final Clock clock; + private final TraceConfig traceConfig = new TraceConfigImpl(); + private final Tracer tracer; + + /** + * Creates a new {@code TraceComponentImplBase}. + * + * @param clock the clock to use throughout tracing. + * @param randomHandler the random number generator for generating trace and span IDs. + * @param eventQueue the queue implementation. + */ + public TraceComponentImplBase(Clock clock, RandomHandler randomHandler, EventQueue eventQueue) { + this.clock = clock; + // TODO(bdrutu): Add a config/argument for supportInProcessStores. + if (eventQueue instanceof SimpleEventQueue) { + exportComponent = ExportComponentImpl.createWithoutInProcessStores(eventQueue); + } else { + exportComponent = ExportComponentImpl.createWithInProcessStores(eventQueue); + } + StartEndHandler startEndHandler = + new StartEndHandlerImpl( + exportComponent.getSpanExporter(), + exportComponent.getRunningSpanStore(), + exportComponent.getSampledSpanStore(), + eventQueue); + tracer = new TracerImpl(randomHandler, startEndHandler, clock, traceConfig); + } + + public Tracer getTracer() { + return tracer; + } + + public PropagationComponent getPropagationComponent() { + return propagationComponent; + } + + public final Clock getClock() { + return clock; + } + + public ExportComponent getExportComponent() { + return exportComponent; + } + + public TraceConfig getTraceConfig() { + return traceConfig; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/TracerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/TracerImpl.java new file mode 100644 index 00000000..48df8055 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/TracerImpl.java @@ -0,0 +1,52 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace; + +import io.opencensus.common.Clock; +import io.opencensus.implcore.trace.internal.RandomHandler; +import io.opencensus.trace.Span; +import io.opencensus.trace.SpanBuilder; +import io.opencensus.trace.SpanContext; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.config.TraceConfig; +import javax.annotation.Nullable; + +/** Implementation of the {@link Tracer}. */ +public final class TracerImpl extends Tracer { + private final SpanBuilderImpl.Options spanBuilderOptions; + + TracerImpl( + RandomHandler randomHandler, + RecordEventsSpanImpl.StartEndHandler startEndHandler, + Clock clock, + TraceConfig traceConfig) { + spanBuilderOptions = + new SpanBuilderImpl.Options(randomHandler, startEndHandler, clock, traceConfig); + } + + @Override + public SpanBuilder spanBuilderWithExplicitParent(String spanName, @Nullable Span parent) { + return SpanBuilderImpl.createWithParent(spanName, parent, spanBuilderOptions); + } + + @Override + public SpanBuilder spanBuilderWithRemoteParent( + String spanName, @Nullable SpanContext remoteParentSpanContext) { + return SpanBuilderImpl.createWithRemoteParent( + spanName, remoteParentSpanContext, spanBuilderOptions); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/config/TraceConfigImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/config/TraceConfigImpl.java new file mode 100644 index 00000000..25f0c613 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/config/TraceConfigImpl.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.config; + +import io.opencensus.trace.config.TraceConfig; +import io.opencensus.trace.config.TraceParams; + +/** + * Global configuration of the trace service. This allows users to change configs for the default + * sampler, maximum events to be kept, etc. + */ +public final class TraceConfigImpl extends TraceConfig { + // Reads and writes are atomic for reference variables. Use volatile to ensure that these + // operations are visible on other CPUs as well. + private volatile TraceParams activeTraceParams = TraceParams.DEFAULT; + + /** Constructs a new {@code TraceConfigImpl}. */ + public TraceConfigImpl() {} + + @Override + public TraceParams getActiveTraceParams() { + return activeTraceParams; + } + + @Override + public void updateActiveTraceParams(TraceParams traceParams) { + activeTraceParams = traceParams; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java new file mode 100644 index 00000000..19817380 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java @@ -0,0 +1,93 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.export; + +import io.opencensus.common.Duration; +import io.opencensus.implcore.internal.EventQueue; +import io.opencensus.trace.export.ExportComponent; +import io.opencensus.trace.export.RunningSpanStore; +import io.opencensus.trace.export.SampledSpanStore; + +/** Implementation of the {@link ExportComponent}. */ +public final class ExportComponentImpl extends ExportComponent { + private static final int EXPORTER_BUFFER_SIZE = 32; + // Enforces that trace export exports data at least once every 5 seconds. + private static final Duration EXPORTER_SCHEDULE_DELAY = Duration.create(5, 0); + + private final SpanExporterImpl spanExporter; + private final RunningSpanStoreImpl runningSpanStore; + private final SampledSpanStoreImpl sampledSpanStore; + + @Override + public SpanExporterImpl getSpanExporter() { + return spanExporter; + } + + @Override + public RunningSpanStoreImpl getRunningSpanStore() { + return runningSpanStore; + } + + @Override + public SampledSpanStoreImpl getSampledSpanStore() { + return sampledSpanStore; + } + + @Override + public void shutdown() { + sampledSpanStore.shutdown(); + spanExporter.shutdown(); + } + + /** + * Returns a new {@code ExportComponentImpl} that has valid instances for {@link RunningSpanStore} + * and {@link SampledSpanStore}. + * + * @return a new {@code ExportComponentImpl}. + */ + public static ExportComponentImpl createWithInProcessStores(EventQueue eventQueue) { + return new ExportComponentImpl(true, eventQueue); + } + + /** + * Returns a new {@code ExportComponentImpl} that has {@code null} instances for {@link + * RunningSpanStore} and {@link SampledSpanStore}. + * + * @return a new {@code ExportComponentImpl}. + */ + public static ExportComponentImpl createWithoutInProcessStores(EventQueue eventQueue) { + return new ExportComponentImpl(false, eventQueue); + } + + /** + * Constructs a new {@code ExportComponentImpl}. + * + * @param supportInProcessStores {@code true} to instantiate {@link RunningSpanStore} and {@link + * SampledSpanStore}. + */ + private ExportComponentImpl(boolean supportInProcessStores, EventQueue eventQueue) { + this.spanExporter = SpanExporterImpl.create(EXPORTER_BUFFER_SIZE, EXPORTER_SCHEDULE_DELAY); + this.runningSpanStore = + supportInProcessStores + ? new InProcessRunningSpanStoreImpl() + : RunningSpanStoreImpl.getNoopRunningSpanStoreImpl(); + this.sampledSpanStore = + supportInProcessStores + ? new InProcessSampledSpanStoreImpl(eventQueue) + : SampledSpanStoreImpl.getNoopSampledSpanStoreImpl(); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java new file mode 100644 index 00000000..f7aeac71 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java @@ -0,0 +1,81 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.export; + +import io.opencensus.implcore.trace.RecordEventsSpanImpl; +import io.opencensus.implcore.trace.internal.ConcurrentIntrusiveList; +import io.opencensus.trace.export.RunningSpanStore; +import io.opencensus.trace.export.SpanData; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.concurrent.ThreadSafe; + +/** In-process implementation of the {@link RunningSpanStore}. */ +@ThreadSafe +public final class InProcessRunningSpanStoreImpl extends RunningSpanStoreImpl { + private final ConcurrentIntrusiveList<RecordEventsSpanImpl> runningSpans; + + public InProcessRunningSpanStoreImpl() { + runningSpans = new ConcurrentIntrusiveList<RecordEventsSpanImpl>(); + } + + @Override + public void onStart(RecordEventsSpanImpl span) { + runningSpans.addElement(span); + } + + @Override + public void onEnd(RecordEventsSpanImpl span) { + runningSpans.removeElement(span); + } + + @Override + public Summary getSummary() { + Collection<RecordEventsSpanImpl> allRunningSpans = runningSpans.getAll(); + Map<String, Integer> numSpansPerName = new HashMap<String, Integer>(); + for (RecordEventsSpanImpl span : allRunningSpans) { + Integer prevValue = numSpansPerName.get(span.getName()); + numSpansPerName.put(span.getName(), prevValue != null ? prevValue + 1 : 1); + } + Map<String, PerSpanNameSummary> perSpanNameSummary = new HashMap<String, PerSpanNameSummary>(); + for (Map.Entry<String, Integer> it : numSpansPerName.entrySet()) { + perSpanNameSummary.put(it.getKey(), PerSpanNameSummary.create(it.getValue())); + } + Summary summary = Summary.create(perSpanNameSummary); + return summary; + } + + @Override + public Collection<SpanData> getRunningSpans(Filter filter) { + Collection<RecordEventsSpanImpl> allRunningSpans = runningSpans.getAll(); + int maxSpansToReturn = + filter.getMaxSpansToReturn() == 0 ? allRunningSpans.size() : filter.getMaxSpansToReturn(); + List<SpanData> ret = new ArrayList<SpanData>(maxSpansToReturn); + for (RecordEventsSpanImpl span : allRunningSpans) { + if (ret.size() == maxSpansToReturn) { + break; + } + if (span.getName().equals(filter.getSpanName())) { + ret.add(span.toSpanData()); + } + } + return ret; + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java new file mode 100644 index 00000000..0d8e493b --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java @@ -0,0 +1,396 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.export; + +import com.google.common.collect.EvictingQueue; +import io.opencensus.implcore.internal.EventQueue; +import io.opencensus.implcore.trace.RecordEventsSpanImpl; +import io.opencensus.trace.Status; +import io.opencensus.trace.Status.CanonicalCode; +import io.opencensus.trace.export.SampledSpanStore; +import io.opencensus.trace.export.SpanData; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +/** In-process implementation of the {@link SampledSpanStore}. */ +@ThreadSafe +public final class InProcessSampledSpanStoreImpl extends SampledSpanStoreImpl { + private static final int NUM_SAMPLES_PER_LATENCY_BUCKET = 10; + private static final int NUM_SAMPLES_PER_ERROR_BUCKET = 5; + private static final long TIME_BETWEEN_SAMPLES = TimeUnit.SECONDS.toNanos(1); + private static final int NUM_LATENCY_BUCKETS = LatencyBucketBoundaries.values().length; + // The total number of canonical codes - 1 (the OK code). + private static final int NUM_ERROR_BUCKETS = CanonicalCode.values().length - 1; + private static final int MAX_PER_SPAN_NAME_SAMPLES = + NUM_SAMPLES_PER_LATENCY_BUCKET * NUM_LATENCY_BUCKETS + + NUM_SAMPLES_PER_ERROR_BUCKET * NUM_ERROR_BUCKETS; + + // Used to stream the register/unregister events to the implementation to avoid lock contention + // between the main threads and the worker thread. + private final EventQueue eventQueue; + + @GuardedBy("samples") + private final Map<String, PerSpanNameSamples> samples; + + private static final class Bucket { + + private final EvictingQueue<RecordEventsSpanImpl> sampledSpansQueue; + private final EvictingQueue<RecordEventsSpanImpl> notSampledSpansQueue; + private long lastSampledNanoTime; + private long lastNotSampledNanoTime; + + private Bucket(int numSamples) { + sampledSpansQueue = EvictingQueue.create(numSamples); + notSampledSpansQueue = EvictingQueue.create(numSamples); + } + + private void considerForSampling(RecordEventsSpanImpl span) { + long spanEndNanoTime = span.getEndNanoTime(); + if (span.getContext().getTraceOptions().isSampled()) { + // Need to compare by doing the subtraction all the time because in case of an overflow, + // this may never sample again (at least for the next ~200 years). No real chance to + // overflow two times because that means the process runs for ~200 years. + if (spanEndNanoTime - lastSampledNanoTime > TIME_BETWEEN_SAMPLES) { + sampledSpansQueue.add(span); + lastSampledNanoTime = spanEndNanoTime; + } + } else { + // Need to compare by doing the subtraction all the time because in case of an overflow, + // this may never sample again (at least for the next ~200 years). No real chance to + // overflow two times because that means the process runs for ~200 years. + if (spanEndNanoTime - lastNotSampledNanoTime > TIME_BETWEEN_SAMPLES) { + notSampledSpansQueue.add(span); + lastNotSampledNanoTime = spanEndNanoTime; + } + } + } + + private void getSamples(int maxSpansToReturn, List<RecordEventsSpanImpl> output) { + getSamples(maxSpansToReturn, output, sampledSpansQueue); + getSamples(maxSpansToReturn, output, notSampledSpansQueue); + } + + private static void getSamples( + int maxSpansToReturn, + List<RecordEventsSpanImpl> output, + EvictingQueue<RecordEventsSpanImpl> queue) { + for (RecordEventsSpanImpl span : queue) { + if (output.size() >= maxSpansToReturn) { + break; + } + output.add(span); + } + } + + private void getSamplesFilteredByLatency( + long latencyLowerNs, + long latencyUpperNs, + int maxSpansToReturn, + List<RecordEventsSpanImpl> output) { + getSamplesFilteredByLatency( + latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, sampledSpansQueue); + getSamplesFilteredByLatency( + latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, notSampledSpansQueue); + } + + private static void getSamplesFilteredByLatency( + long latencyLowerNs, + long latencyUpperNs, + int maxSpansToReturn, + List<RecordEventsSpanImpl> output, + EvictingQueue<RecordEventsSpanImpl> queue) { + for (RecordEventsSpanImpl span : queue) { + if (output.size() >= maxSpansToReturn) { + break; + } + long spanLatencyNs = span.getLatencyNs(); + if (spanLatencyNs >= latencyLowerNs && spanLatencyNs < latencyUpperNs) { + output.add(span); + } + } + } + + private int getNumSamples() { + return sampledSpansQueue.size() + notSampledSpansQueue.size(); + } + } + + /** + * Keeps samples for a given span name. Samples for all the latency buckets and for all canonical + * codes other than OK. + */ + private static final class PerSpanNameSamples { + + private final Bucket[] latencyBuckets; + private final Bucket[] errorBuckets; + + private PerSpanNameSamples() { + latencyBuckets = new Bucket[NUM_LATENCY_BUCKETS]; + for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { + latencyBuckets[i] = new Bucket(NUM_SAMPLES_PER_LATENCY_BUCKET); + } + errorBuckets = new Bucket[NUM_ERROR_BUCKETS]; + for (int i = 0; i < NUM_ERROR_BUCKETS; i++) { + errorBuckets[i] = new Bucket(NUM_SAMPLES_PER_ERROR_BUCKET); + } + } + + @Nullable + private Bucket getLatencyBucket(long latencyNs) { + for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { + LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i]; + if (latencyNs >= boundaries.getLatencyLowerNs() + && latencyNs < boundaries.getLatencyUpperNs()) { + return latencyBuckets[i]; + } + } + // latencyNs is negative or Long.MAX_VALUE, so this Span can be ignored. This cannot happen + // in real production because System#nanoTime is monotonic. + return null; + } + + private Bucket getErrorBucket(CanonicalCode code) { + return errorBuckets[code.value() - 1]; + } + + private void considerForSampling(RecordEventsSpanImpl span) { + Status status = span.getStatus(); + // Null status means running Span, this should not happen in production, but the library + // should not crash because of this. + if (status != null) { + Bucket bucket = + status.isOk() + ? getLatencyBucket(span.getLatencyNs()) + : getErrorBucket(status.getCanonicalCode()); + // If unable to find the bucket, ignore this Span. + if (bucket != null) { + bucket.considerForSampling(span); + } + } + } + + private Map<LatencyBucketBoundaries, Integer> getNumbersOfLatencySampledSpans() { + Map<LatencyBucketBoundaries, Integer> latencyBucketSummaries = + new EnumMap<LatencyBucketBoundaries, Integer>(LatencyBucketBoundaries.class); + for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { + latencyBucketSummaries.put( + LatencyBucketBoundaries.values()[i], latencyBuckets[i].getNumSamples()); + } + return latencyBucketSummaries; + } + + private Map<CanonicalCode, Integer> getNumbersOfErrorSampledSpans() { + Map<CanonicalCode, Integer> errorBucketSummaries = + new EnumMap<CanonicalCode, Integer>(CanonicalCode.class); + for (int i = 0; i < NUM_ERROR_BUCKETS; i++) { + errorBucketSummaries.put(CanonicalCode.values()[i + 1], errorBuckets[i].getNumSamples()); + } + return errorBucketSummaries; + } + + private List<RecordEventsSpanImpl> getErrorSamples( + @Nullable CanonicalCode code, int maxSpansToReturn) { + ArrayList<RecordEventsSpanImpl> output = + new ArrayList<RecordEventsSpanImpl>(maxSpansToReturn); + if (code != null) { + getErrorBucket(code).getSamples(maxSpansToReturn, output); + } else { + for (int i = 0; i < NUM_ERROR_BUCKETS; i++) { + errorBuckets[i].getSamples(maxSpansToReturn, output); + } + } + return output; + } + + private List<RecordEventsSpanImpl> getLatencySamples( + long latencyLowerNs, long latencyUpperNs, int maxSpansToReturn) { + ArrayList<RecordEventsSpanImpl> output = + new ArrayList<RecordEventsSpanImpl>(maxSpansToReturn); + for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) { + LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i]; + if (latencyUpperNs >= boundaries.getLatencyLowerNs() + && latencyLowerNs < boundaries.getLatencyUpperNs()) { + latencyBuckets[i].getSamplesFilteredByLatency( + latencyLowerNs, latencyUpperNs, maxSpansToReturn, output); + } + } + return output; + } + } + + /** Constructs a new {@code InProcessSampledSpanStoreImpl}. */ + InProcessSampledSpanStoreImpl(EventQueue eventQueue) { + samples = new HashMap<String, PerSpanNameSamples>(); + this.eventQueue = eventQueue; + } + + @Override + public Summary getSummary() { + Map<String, PerSpanNameSummary> ret = new HashMap<String, PerSpanNameSummary>(); + synchronized (samples) { + for (Map.Entry<String, PerSpanNameSamples> it : samples.entrySet()) { + ret.put( + it.getKey(), + PerSpanNameSummary.create( + it.getValue().getNumbersOfLatencySampledSpans(), + it.getValue().getNumbersOfErrorSampledSpans())); + } + } + return Summary.create(ret); + } + + @Override + public void considerForSampling(RecordEventsSpanImpl span) { + synchronized (samples) { + String spanName = span.getName(); + if (span.getSampleToLocalSpanStore() && !samples.containsKey(spanName)) { + samples.put(spanName, new PerSpanNameSamples()); + } + PerSpanNameSamples perSpanNameSamples = samples.get(spanName); + if (perSpanNameSamples != null) { + perSpanNameSamples.considerForSampling(span); + } + } + } + + @Override + public void registerSpanNamesForCollection(Collection<String> spanNames) { + eventQueue.enqueue(new RegisterSpanNameEvent(this, spanNames)); + } + + @Override + protected void shutdown() { + eventQueue.shutdown(); + } + + private void internaltRegisterSpanNamesForCollection(Collection<String> spanNames) { + synchronized (samples) { + for (String spanName : spanNames) { + if (!samples.containsKey(spanName)) { + samples.put(spanName, new PerSpanNameSamples()); + } + } + } + } + + private static final class RegisterSpanNameEvent implements EventQueue.Entry { + private final InProcessSampledSpanStoreImpl sampledSpanStore; + private final Collection<String> spanNames; + + private RegisterSpanNameEvent( + InProcessSampledSpanStoreImpl sampledSpanStore, Collection<String> spanNames) { + this.sampledSpanStore = sampledSpanStore; + this.spanNames = new ArrayList<String>(spanNames); + } + + @Override + public void process() { + sampledSpanStore.internaltRegisterSpanNamesForCollection(spanNames); + } + } + + @Override + public void unregisterSpanNamesForCollection(Collection<String> spanNames) { + eventQueue.enqueue(new UnregisterSpanNameEvent(this, spanNames)); + } + + private void internalUnregisterSpanNamesForCollection(Collection<String> spanNames) { + synchronized (samples) { + samples.keySet().removeAll(spanNames); + } + } + + private static final class UnregisterSpanNameEvent implements EventQueue.Entry { + private final InProcessSampledSpanStoreImpl sampledSpanStore; + private final Collection<String> spanNames; + + private UnregisterSpanNameEvent( + InProcessSampledSpanStoreImpl sampledSpanStore, Collection<String> spanNames) { + this.sampledSpanStore = sampledSpanStore; + this.spanNames = new ArrayList<String>(spanNames); + } + + @Override + public void process() { + sampledSpanStore.internalUnregisterSpanNamesForCollection(spanNames); + } + } + + @Override + public Set<String> getRegisteredSpanNamesForCollection() { + synchronized (samples) { + return Collections.unmodifiableSet(new HashSet<String>(samples.keySet())); + } + } + + @Override + public Collection<SpanData> getErrorSampledSpans(ErrorFilter filter) { + int numSpansToReturn = + filter.getMaxSpansToReturn() == 0 + ? MAX_PER_SPAN_NAME_SAMPLES + : filter.getMaxSpansToReturn(); + List<RecordEventsSpanImpl> spans = Collections.emptyList(); + // Try to not keep the lock to much, do the RecordEventsSpanImpl -> SpanData conversion outside + // the lock. + synchronized (samples) { + PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName()); + if (perSpanNameSamples != null) { + spans = perSpanNameSamples.getErrorSamples(filter.getCanonicalCode(), numSpansToReturn); + } + } + List<SpanData> ret = new ArrayList<SpanData>(spans.size()); + for (RecordEventsSpanImpl span : spans) { + ret.add(span.toSpanData()); + } + return Collections.unmodifiableList(ret); + } + + @Override + public Collection<SpanData> getLatencySampledSpans(LatencyFilter filter) { + int numSpansToReturn = + filter.getMaxSpansToReturn() == 0 + ? MAX_PER_SPAN_NAME_SAMPLES + : filter.getMaxSpansToReturn(); + List<RecordEventsSpanImpl> spans = Collections.emptyList(); + // Try to not keep the lock to much, do the RecordEventsSpanImpl -> SpanData conversion outside + // the lock. + synchronized (samples) { + PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName()); + if (perSpanNameSamples != null) { + spans = + perSpanNameSamples.getLatencySamples( + filter.getLatencyLowerNs(), filter.getLatencyUpperNs(), numSpansToReturn); + } + } + List<SpanData> ret = new ArrayList<SpanData>(spans.size()); + for (RecordEventsSpanImpl span : spans) { + ret.add(span.toSpanData()); + } + return Collections.unmodifiableList(ret); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java new file mode 100644 index 00000000..962f5b01 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java @@ -0,0 +1,71 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.export; + +import io.opencensus.implcore.trace.RecordEventsSpanImpl; +import io.opencensus.trace.export.RunningSpanStore; +import io.opencensus.trace.export.SpanData; +import java.util.Collection; +import java.util.Collections; + +/** Abstract implementation of the {@link RunningSpanStore}. */ +public abstract class RunningSpanStoreImpl extends RunningSpanStore { + + private static final RunningSpanStoreImpl NOOP_RUNNING_SPAN_STORE_IMPL = + new NoopRunningSpanStoreImpl(); + + /** Returns the no-op implementation of the {@link RunningSpanStoreImpl}. */ + static RunningSpanStoreImpl getNoopRunningSpanStoreImpl() { + return NOOP_RUNNING_SPAN_STORE_IMPL; + } + + /** + * Adds the {@code Span} into the running spans list when the {@code Span} starts. + * + * @param span the {@code Span} that started. + */ + public abstract void onStart(RecordEventsSpanImpl span); + + /** + * Removes the {@code Span} from the running spans list when the {@code Span} ends. + * + * @param span the {@code Span} that ended. + */ + public abstract void onEnd(RecordEventsSpanImpl span); + + private static final class NoopRunningSpanStoreImpl extends RunningSpanStoreImpl { + + private static final Summary EMPTY_SUMMARY = + RunningSpanStore.Summary.create(Collections.<String, PerSpanNameSummary>emptyMap()); + + @Override + public void onStart(RecordEventsSpanImpl span) {} + + @Override + public void onEnd(RecordEventsSpanImpl span) {} + + @Override + public Summary getSummary() { + return EMPTY_SUMMARY; + } + + @Override + public Collection<SpanData> getRunningSpans(Filter filter) { + return Collections.<SpanData>emptyList(); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java new file mode 100644 index 00000000..e67c2f8e --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java @@ -0,0 +1,81 @@ +/* + * Copyright 2018, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.export; + +import io.opencensus.implcore.trace.RecordEventsSpanImpl; +import io.opencensus.trace.export.SampledSpanStore; +import io.opencensus.trace.export.SpanData; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; + +/** Abstract implementation of the {@link SampledSpanStore}. */ +public abstract class SampledSpanStoreImpl extends SampledSpanStore { + private static final SampledSpanStoreImpl NOOP_SAMPLED_SPAN_STORE_IMPL = + new NoopSampledSpanStoreImpl(); + + /** Returns the new no-op implmentation of {@link SampledSpanStoreImpl}. */ + public static SampledSpanStoreImpl getNoopSampledSpanStoreImpl() { + return NOOP_SAMPLED_SPAN_STORE_IMPL; + } + + /** + * Considers to save the given spans to the stored samples. This must be called at the end of each + * Span with the option RECORD_EVENTS. + * + * @param span the span to be consider for storing into the store buckets. + */ + public abstract void considerForSampling(RecordEventsSpanImpl span); + + protected void shutdown() {} + + private static final class NoopSampledSpanStoreImpl extends SampledSpanStoreImpl { + private static final Summary EMPTY_SUMMARY = + Summary.create(Collections.<String, PerSpanNameSummary>emptyMap()); + private static final Set<String> EMPTY_REGISTERED_SPAN_NAMES = Collections.<String>emptySet(); + private static final Collection<SpanData> EMPTY_SPANDATA = Collections.<SpanData>emptySet(); + + @Override + public Summary getSummary() { + return EMPTY_SUMMARY; + } + + @Override + public void considerForSampling(RecordEventsSpanImpl span) {} + + @Override + public void registerSpanNamesForCollection(Collection<String> spanNames) {} + + @Override + public void unregisterSpanNamesForCollection(Collection<String> spanNames) {} + + @Override + public Set<String> getRegisteredSpanNamesForCollection() { + return EMPTY_REGISTERED_SPAN_NAMES; + } + + @Override + public Collection<SpanData> getErrorSampledSpans(ErrorFilter filter) { + return EMPTY_SPANDATA; + } + + @Override + public Collection<SpanData> getLatencySampledSpans(LatencyFilter filter) { + return EMPTY_SPANDATA; + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java new file mode 100644 index 00000000..51a7b05c --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java @@ -0,0 +1,214 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.export; + +import com.google.common.annotations.VisibleForTesting; +import io.opencensus.common.Duration; +import io.opencensus.implcore.internal.DaemonThreadFactory; +import io.opencensus.implcore.trace.RecordEventsSpanImpl; +import io.opencensus.trace.export.ExportComponent; +import io.opencensus.trace.export.SpanData; +import io.opencensus.trace.export.SpanExporter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.concurrent.GuardedBy; + +/** Implementation of the {@link SpanExporter}. */ +public final class SpanExporterImpl extends SpanExporter { + private static final Logger logger = Logger.getLogger(ExportComponent.class.getName()); + + private final Worker worker; + private final Thread workerThread; + + /** + * Constructs a {@code SpanExporterImpl} that exports the {@link SpanData} asynchronously. + * + * <p>Starts a separate thread that wakes up every {@code scheduleDelay} and exports any available + * spans data. If the number of buffered SpanData objects is greater than {@code bufferSize} then + * the thread wakes up sooner. + * + * @param bufferSize the size of the buffered span data. + * @param scheduleDelay the maximum delay. + */ + static SpanExporterImpl create(int bufferSize, Duration scheduleDelay) { + // TODO(bdrutu): Consider to add a shutdown hook to not avoid dropping data. + Worker worker = new Worker(bufferSize, scheduleDelay); + return new SpanExporterImpl(worker); + } + + /** + * Adds a Span to the exporting service. + * + * @param span the {@code Span} to be added. + */ + public void addSpan(RecordEventsSpanImpl span) { + worker.addSpan(span); + } + + @Override + public void registerHandler(String name, Handler handler) { + worker.registerHandler(name, handler); + } + + @Override + public void unregisterHandler(String name) { + worker.unregisterHandler(name); + } + + void flush() { + worker.flush(); + } + + void shutdown() { + flush(); + workerThread.interrupt(); + } + + private SpanExporterImpl(Worker worker) { + this.workerThread = + new DaemonThreadFactory("ExportComponent.ServiceExporterThread").newThread(worker); + this.workerThread.start(); + this.worker = worker; + } + + @VisibleForTesting + Thread getServiceExporterThread() { + return workerThread; + } + + // Worker in a thread that batches multiple span data and calls the registered services to export + // that data. + // + // The map of registered handlers is implemented using ConcurrentHashMap ensuring full + // concurrency of retrievals and adjustable expected concurrency for updates. Retrievals + // reflect the results of the most recently completed update operations held upon their onset. + // + // The list of batched data is protected by an explicit monitor object which ensures full + // concurrency. + private static final class Worker implements Runnable { + private final Object monitor = new Object(); + + @GuardedBy("monitor") + private final List<RecordEventsSpanImpl> spans; + + private final Map<String, Handler> serviceHandlers = new ConcurrentHashMap<String, Handler>(); + private final int bufferSize; + private final long scheduleDelayMillis; + + // See SpanExporterImpl#addSpan. + private void addSpan(RecordEventsSpanImpl span) { + synchronized (monitor) { + this.spans.add(span); + if (spans.size() > bufferSize) { + monitor.notifyAll(); + } + } + } + + // See SpanExporter#registerHandler. + private void registerHandler(String name, Handler serviceHandler) { + serviceHandlers.put(name, serviceHandler); + } + + // See SpanExporter#unregisterHandler. + private void unregisterHandler(String name) { + serviceHandlers.remove(name); + } + + // Exports the list of SpanData to all the ServiceHandlers. + private void onBatchExport(List<SpanData> spanDataList) { + // From the java documentation of the ConcurrentHashMap#entrySet(): + // The view's iterator is a "weakly consistent" iterator that will never throw + // ConcurrentModificationException, and guarantees to traverse elements as they existed + // upon construction of the iterator, and may (but is not guaranteed to) reflect any + // modifications subsequent to construction. + for (Map.Entry<String, Handler> it : serviceHandlers.entrySet()) { + // In case of any exception thrown by the service handlers continue to run. + try { + it.getValue().export(spanDataList); + } catch (Throwable e) { + logger.log(Level.WARNING, "Exception thrown by the service export " + it.getKey(), e); + } + } + } + + private Worker(int bufferSize, Duration scheduleDelay) { + spans = new ArrayList<RecordEventsSpanImpl>(bufferSize); + this.bufferSize = bufferSize; + this.scheduleDelayMillis = scheduleDelay.toMillis(); + } + + // Returns an unmodifiable list of all buffered spans data to ensure that any registered + // service handler cannot modify the list. + private static List<SpanData> fromSpanImplToSpanData(List<RecordEventsSpanImpl> spans) { + List<SpanData> spanDatas = new ArrayList<SpanData>(spans.size()); + for (RecordEventsSpanImpl span : spans) { + spanDatas.add(span.toSpanData()); + } + return Collections.unmodifiableList(spanDatas); + } + + @Override + public void run() { + while (true) { + // Copy all the batched spans in a separate list to release the monitor lock asap to + // avoid blocking the producer thread. + List<RecordEventsSpanImpl> spansCopy; + synchronized (monitor) { + if (spans.size() < bufferSize) { + do { + // In the case of a spurious wakeup we export only if we have at least one span in + // the batch. It is acceptable because batching is a best effort mechanism here. + try { + monitor.wait(scheduleDelayMillis); + } catch (InterruptedException ie) { + // Preserve the interruption status as per guidance and stop doing any work. + Thread.currentThread().interrupt(); + return; + } + } while (spans.isEmpty()); + } + spansCopy = new ArrayList<RecordEventsSpanImpl>(spans); + spans.clear(); + } + // Execute the batch export outside the synchronized to not block all producers. + final List<SpanData> spanDataList = fromSpanImplToSpanData(spansCopy); + if (!spanDataList.isEmpty()) { + onBatchExport(spanDataList); + } + } + } + + void flush() { + List<RecordEventsSpanImpl> spansCopy; + synchronized (monitor) { + spansCopy = new ArrayList<RecordEventsSpanImpl>(spans); + spans.clear(); + } + + final List<SpanData> spanDataList = fromSpanImplToSpanData(spansCopy); + if (!spanDataList.isEmpty()) { + onBatchExport(spanDataList); + } + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/internal/ConcurrentIntrusiveList.java b/impl_core/src/main/java/io/opencensus/implcore/trace/internal/ConcurrentIntrusiveList.java new file mode 100644 index 00000000..22d8e41a --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/internal/ConcurrentIntrusiveList.java @@ -0,0 +1,181 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.internal; + +import static com.google.common.base.Preconditions.checkArgument; + +import io.opencensus.implcore.internal.CheckerFrameworkUtils; +import io.opencensus.implcore.trace.internal.ConcurrentIntrusiveList.Element; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * An {@code ConcurrentIntrusiveList<T>} is a doubly-linked list where the link pointers are + * embedded in the elements. This makes insertion and removal into a known position constant time. + * + * <p>Elements must derive from the {@code Element<T extends Element<T>>} interface: + * + * <pre><code> + * class MyClass implements {@code Element<MyClass>} { + * private MyClass next = null; + * private MyClass prev = null; + * + * {@literal @}Override + * MyClass getNext() { + * return next; + * } + * + * {@literal @}Override + * void setNext(MyClass element) { + * next = element; + * } + * + * {@literal @}Override + * MyClass getPrev() { + * return prev; + * } + * + * {@literal @}Override + * void setPrev(MyClass element) { + * prev = element; + * } + * } + * </code></pre> + */ +@ThreadSafe +public final class ConcurrentIntrusiveList<T extends Element<T>> { + private int size = 0; + @Nullable private T head = null; + + public ConcurrentIntrusiveList() {} + + /** + * Adds the given {@code element} to the list. + * + * @param element the element to add. + * @throws IllegalArgumentException if the element is already in a list. + */ + public synchronized void addElement(T element) { + checkArgument( + element.getNext() == null && element.getPrev() == null && element != head, + "Element already in a list."); + size++; + if (head == null) { + head = element; + } else { + head.setPrev(element); + element.setNext(head); + head = element; + } + } + + /** + * Removes the given {@code element} from the list. + * + * @param element the element to remove. + * @throws IllegalArgumentException if the element is not in the list. + */ + public synchronized void removeElement(T element) { + checkArgument( + element.getNext() != null || element.getPrev() != null || element == head, + "Element not in the list."); + size--; + if (element.getPrev() == null) { + // This is the first element + head = element.getNext(); + if (head != null) { + // If more than one element in the list. + head.setPrev(null); + element.setNext(null); + } + } else if (element.getNext() == null) { + // This is the last element, and there is at least another element because + // element.getPrev() != null. + CheckerFrameworkUtils.castNonNull(element.getPrev()).setNext(null); + element.setPrev(null); + } else { + CheckerFrameworkUtils.castNonNull(element.getPrev()).setNext(element.getNext()); + CheckerFrameworkUtils.castNonNull(element.getNext()).setPrev(element.getPrev()); + element.setNext(null); + element.setPrev(null); + } + } + + /** + * Returns the number of elements in this list. + * + * @return the number of elements in this list. + */ + public synchronized int size() { + return size; + } + + /** + * Returns all the elements from this list. + * + * @return all the elements from this list. + */ + public synchronized Collection<T> getAll() { + List<T> all = new ArrayList<T>(size); + for (T e = head; e != null; e = e.getNext()) { + all.add(e); + } + return all; + } + + /** + * This is an interface that must be implemented by any element that uses {@link + * ConcurrentIntrusiveList}. + * + * @param <T> the element that will be used for the list. + */ + public interface Element<T extends Element<T>> { + + /** + * Returns a reference to the next element in the list. + * + * @return a reference to the next element in the list. + */ + @Nullable + T getNext(); + + /** + * Sets the reference to the next element in the list. + * + * @param element the reference to the next element in the list. + */ + void setNext(@Nullable T element); + + /** + * Returns a reference to the previous element in the list. + * + * @return a reference to the previous element in the list. + */ + @Nullable + T getPrev(); + + /** + * Sets the reference to the previous element in the list. + * + * @param element the reference to the previous element in the list. + */ + void setPrev(@Nullable T element); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/internal/RandomHandler.java b/impl_core/src/main/java/io/opencensus/implcore/trace/internal/RandomHandler.java new file mode 100644 index 00000000..70be5a90 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/internal/RandomHandler.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.internal; + +import java.security.SecureRandom; +import java.util.Random; +import javax.annotation.concurrent.ThreadSafe; + +/** + * Abstract class to access the current {@link Random}. + * + * <p>Implementation can have a per thread instance or a single global instance. + */ +@ThreadSafe +public abstract class RandomHandler { + /** + * Returns the current {@link Random}. + * + * @return the current {@code Random}. + */ + public abstract Random current(); + + /** Implementation of the {@link RandomHandler} using {@link SecureRandom}. */ + @ThreadSafe + public static final class SecureRandomHandler extends RandomHandler { + private final Random random = new SecureRandom(); + + /** Constructs a new {@link SecureRandomHandler}. */ + public SecureRandomHandler() {} + + @Override + public Random current() { + return random; + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/B3Format.java b/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/B3Format.java new file mode 100644 index 00000000..d928d93c --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/B3Format.java @@ -0,0 +1,113 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.propagation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import io.opencensus.trace.SpanContext; +import io.opencensus.trace.SpanId; +import io.opencensus.trace.TraceId; +import io.opencensus.trace.TraceOptions; +import io.opencensus.trace.Tracestate; +import io.opencensus.trace.propagation.SpanContextParseException; +import io.opencensus.trace.propagation.TextFormat; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/*>>> +import org.checkerframework.checker.nullness.qual.NonNull; +*/ + +/** + * Implementation of the B3 propagation protocol. See <a + * href=https://github.com/openzipkin/b3-propagation>b3-propagation</a>. + */ +final class B3Format extends TextFormat { + private static final Tracestate TRACESTATE_DEFAULT = Tracestate.builder().build(); + @VisibleForTesting static final String X_B3_TRACE_ID = "X-B3-TraceId"; + @VisibleForTesting static final String X_B3_SPAN_ID = "X-B3-SpanId"; + @VisibleForTesting static final String X_B3_PARENT_SPAN_ID = "X-B3-ParentSpanId"; + @VisibleForTesting static final String X_B3_SAMPLED = "X-B3-Sampled"; + @VisibleForTesting static final String X_B3_FLAGS = "X-B3-Flags"; + private static final List<String> FIELDS = + Collections.unmodifiableList( + Arrays.asList( + X_B3_TRACE_ID, X_B3_SPAN_ID, X_B3_PARENT_SPAN_ID, X_B3_SAMPLED, X_B3_FLAGS)); + + // Used as the upper TraceId.SIZE hex characters of the traceID. B3-propagation used to send + // TraceId.SIZE hex characters (8-bytes traceId) in the past. + private static final String UPPER_TRACE_ID = "0000000000000000"; + // Sampled value via the X_B3_SAMPLED header. + private static final String SAMPLED_VALUE = "1"; + // "Debug" sampled value. + private static final String FLAGS_VALUE = "1"; + + @Override + public List<String> fields() { + return FIELDS; + } + + @Override + public <C /*>>> extends @NonNull Object*/> void inject( + SpanContext spanContext, C carrier, Setter<C> setter) { + checkNotNull(spanContext, "spanContext"); + checkNotNull(setter, "setter"); + checkNotNull(carrier, "carrier"); + setter.put(carrier, X_B3_TRACE_ID, spanContext.getTraceId().toLowerBase16()); + setter.put(carrier, X_B3_SPAN_ID, spanContext.getSpanId().toLowerBase16()); + if (spanContext.getTraceOptions().isSampled()) { + setter.put(carrier, X_B3_SAMPLED, SAMPLED_VALUE); + } + } + + @Override + public <C /*>>> extends @NonNull Object*/> SpanContext extract(C carrier, Getter<C> getter) + throws SpanContextParseException { + checkNotNull(carrier, "carrier"); + checkNotNull(getter, "getter"); + try { + TraceId traceId; + String traceIdStr = getter.get(carrier, X_B3_TRACE_ID); + if (traceIdStr != null) { + if (traceIdStr.length() == TraceId.SIZE) { + // This is an 8-byte traceID. + traceIdStr = UPPER_TRACE_ID + traceIdStr; + } + traceId = TraceId.fromLowerBase16(traceIdStr); + } else { + throw new SpanContextParseException("Missing X_B3_TRACE_ID."); + } + SpanId spanId; + String spanIdStr = getter.get(carrier, X_B3_SPAN_ID); + if (spanIdStr != null) { + spanId = SpanId.fromLowerBase16(spanIdStr); + } else { + throw new SpanContextParseException("Missing X_B3_SPAN_ID."); + } + TraceOptions traceOptions = TraceOptions.DEFAULT; + if (SAMPLED_VALUE.equals(getter.get(carrier, X_B3_SAMPLED)) + || FLAGS_VALUE.equals(getter.get(carrier, X_B3_FLAGS))) { + traceOptions = TraceOptions.builder().setIsSampled(true).build(); + } + return SpanContext.create(traceId, spanId, traceOptions, TRACESTATE_DEFAULT); + } catch (IllegalArgumentException e) { + throw new SpanContextParseException("Invalid input.", e); + } + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/BinaryFormatImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/BinaryFormatImpl.java new file mode 100644 index 00000000..233fbd31 --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/BinaryFormatImpl.java @@ -0,0 +1,148 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.propagation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import io.opencensus.trace.SpanContext; +import io.opencensus.trace.SpanId; +import io.opencensus.trace.TraceId; +import io.opencensus.trace.TraceOptions; +import io.opencensus.trace.Tracestate; +import io.opencensus.trace.propagation.BinaryFormat; +import io.opencensus.trace.propagation.SpanContextParseException; + +/** + * Implementation of the {@link BinaryFormat}. + * + * <p>BinaryFormat format: + * + * <ul> + * <li>Binary value: <version_id><version_format> + * <li>version_id: 1-byte representing the version id. + * <li>For version_id = 0: + * <ul> + * <li>version_format: <field><field> + * <li>field_format: <field_id><field_format> + * <li>Fields: + * <ul> + * <li>TraceId: (field_id = 0, len = 16, default = "0000000000000000") - + * 16-byte array representing the trace_id. + * <li>SpanId: (field_id = 1, len = 8, default = "00000000") - 8-byte array + * representing the span_id. + * <li>TraceOptions: (field_id = 2, len = 1, default = "0") - 1-byte array + * representing the trace_options. + * </ul> + * <li>Fields MUST be encoded using the field id order (smaller to higher). + * <li>Valid value example: + * <ul> + * <li>{0, 0, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 1, 97, + * 98, 99, 100, 101, 102, 103, 104, 2, 1} + * <li>version_id = 0; + * <li>trace_id = {64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79} + * <li>span_id = {97, 98, 99, 100, 101, 102, 103, 104}; + * <li>trace_options = {1}; + * </ul> + * </ul> + * </ul> + */ +final class BinaryFormatImpl extends BinaryFormat { + private static final Tracestate TRACESTATE_DEFAULT = Tracestate.builder().build(); + private static final byte VERSION_ID = 0; + private static final int VERSION_ID_OFFSET = 0; + // The version_id/field_id size in bytes. + private static final byte ID_SIZE = 1; + private static final byte TRACE_ID_FIELD_ID = 0; + + // TODO: clarify if offsets are correct here. While the specification suggests you should stop + // parsing when you hit an unknown field, it does not suggest that fields must be declared in + // ID order. Rather it only groups by data type order, in this case Trace Context + // https://github.com/census-instrumentation/opencensus-specs/blob/master/encodings/BinaryEncoding.md#deserialization-rules + @VisibleForTesting static final int TRACE_ID_FIELD_ID_OFFSET = VERSION_ID_OFFSET + ID_SIZE; + + private static final int TRACE_ID_OFFSET = TRACE_ID_FIELD_ID_OFFSET + ID_SIZE; + private static final byte SPAN_ID_FIELD_ID = 1; + + @VisibleForTesting static final int SPAN_ID_FIELD_ID_OFFSET = TRACE_ID_OFFSET + TraceId.SIZE; + + private static final int SPAN_ID_OFFSET = SPAN_ID_FIELD_ID_OFFSET + ID_SIZE; + private static final byte TRACE_OPTION_FIELD_ID = 2; + + @VisibleForTesting static final int TRACE_OPTION_FIELD_ID_OFFSET = SPAN_ID_OFFSET + SpanId.SIZE; + + private static final int TRACE_OPTIONS_OFFSET = TRACE_OPTION_FIELD_ID_OFFSET + ID_SIZE; + /** Version, Trace and Span IDs are required fields. */ + private static final int REQUIRED_FORMAT_LENGTH = 3 * ID_SIZE + TraceId.SIZE + SpanId.SIZE; + /** Use {@link TraceOptions#DEFAULT} unless its optional field is present. */ + private static final int ALL_FORMAT_LENGTH = REQUIRED_FORMAT_LENGTH + ID_SIZE + TraceOptions.SIZE; + + @Override + public byte[] toByteArray(SpanContext spanContext) { + checkNotNull(spanContext, "spanContext"); + byte[] bytes = new byte[ALL_FORMAT_LENGTH]; + bytes[VERSION_ID_OFFSET] = VERSION_ID; + bytes[TRACE_ID_FIELD_ID_OFFSET] = TRACE_ID_FIELD_ID; + spanContext.getTraceId().copyBytesTo(bytes, TRACE_ID_OFFSET); + bytes[SPAN_ID_FIELD_ID_OFFSET] = SPAN_ID_FIELD_ID; + spanContext.getSpanId().copyBytesTo(bytes, SPAN_ID_OFFSET); + bytes[TRACE_OPTION_FIELD_ID_OFFSET] = TRACE_OPTION_FIELD_ID; + spanContext.getTraceOptions().copyBytesTo(bytes, TRACE_OPTIONS_OFFSET); + return bytes; + } + + @Override + public SpanContext fromByteArray(byte[] bytes) throws SpanContextParseException { + checkNotNull(bytes, "bytes"); + if (bytes.length == 0 || bytes[0] != VERSION_ID) { + throw new SpanContextParseException("Unsupported version."); + } + if (bytes.length < REQUIRED_FORMAT_LENGTH) { + throw new SpanContextParseException("Invalid input: truncated"); + } + // TODO: the following logic assumes that fields are written in ID order. The spec does not say + // that. If it decides not to, this logic would need to be more like a loop + TraceId traceId; + SpanId spanId; + TraceOptions traceOptions = TraceOptions.DEFAULT; + int pos = 1; + if (bytes[pos] == TRACE_ID_FIELD_ID) { + traceId = TraceId.fromBytes(bytes, pos + ID_SIZE); + pos += ID_SIZE + TraceId.SIZE; + } else { + // TODO: update the spec to suggest that the trace ID is not actually optional + throw new SpanContextParseException("Invalid input: expected trace ID at offset " + pos); + } + if (bytes[pos] == SPAN_ID_FIELD_ID) { + spanId = SpanId.fromBytes(bytes, pos + ID_SIZE); + pos += ID_SIZE + SpanId.SIZE; + } else { + // TODO: update the spec to suggest that the span ID is not actually optional. + throw new SpanContextParseException("Invalid input: expected span ID at offset " + pos); + } + // Check to see if we are long enough to include an options field, and also that the next field + // is an options field. Per spec we simply stop parsing at first unknown field instead of + // failing. + if (bytes.length > pos && bytes[pos] == TRACE_OPTION_FIELD_ID) { + if (bytes.length < ALL_FORMAT_LENGTH) { + throw new SpanContextParseException("Invalid input: truncated"); + } + traceOptions = TraceOptions.fromByte(bytes[pos + ID_SIZE]); + } + return SpanContext.create(traceId, spanId, traceOptions, TRACESTATE_DEFAULT); + } +} diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/PropagationComponentImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/PropagationComponentImpl.java new file mode 100644 index 00000000..f608543d --- /dev/null +++ b/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/PropagationComponentImpl.java @@ -0,0 +1,37 @@ +/* + * Copyright 2017, OpenCensus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opencensus.implcore.trace.propagation; + +import io.opencensus.trace.propagation.BinaryFormat; +import io.opencensus.trace.propagation.PropagationComponent; +import io.opencensus.trace.propagation.TextFormat; + +/** Implementation of the {@link PropagationComponent}. */ +public class PropagationComponentImpl extends PropagationComponent { + private final BinaryFormat binaryFormat = new BinaryFormatImpl(); + private final B3Format b3Format = new B3Format(); + + @Override + public BinaryFormat getBinaryFormat() { + return binaryFormat; + } + + @Override + public TextFormat getB3Format() { + return b3Format; + } +} |