aboutsummaryrefslogtreecommitdiff
path: root/impl_core/src/main/java/io
diff options
context:
space:
mode:
Diffstat (limited to 'impl_core/src/main/java/io')
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/common/MillisClock.java48
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/internal/CheckerFrameworkUtils.java33
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/internal/CurrentState.java131
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/internal/DaemonThreadFactory.java53
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java36
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/internal/NoopScope.java38
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java32
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/internal/TimestampConverter.java51
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/internal/Utils.java41
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/internal/VarInt.java283
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedDoubleGaugeImpl.java155
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedLongGaugeImpl.java153
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/metrics/DoubleGaugeImpl.java174
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/metrics/LongGaugeImpl.java174
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/metrics/Meter.java34
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/metrics/MetricRegistryImpl.java160
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/metrics/MetricsComponentImplBase.java45
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/metrics/export/ExportComponentImpl.java31
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/metrics/export/MetricProducerManagerImpl.java64
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java95
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java66
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java138
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java194
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java38
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java118
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java556
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java464
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java241
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java92
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java104
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java36
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java56
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/tags/CurrentTagContextUtils.java71
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/tags/NoopTagContextBuilder.java51
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/tags/TagContextBuilderImpl.java60
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/tags/TagContextImpl.java85
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/tags/TagContextUtils.java33
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/tags/TaggerImpl.java116
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/tags/TagsComponentImplBase.java68
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/tags/propagation/SerializationUtils.java190
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagContextBinarySerializerImpl.java49
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagPropagationComponentImpl.java35
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/NoRecordEventsSpanImpl.java85
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/RecordEventsSpanImpl.java579
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/SpanBuilderImpl.java253
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/StartEndHandlerImpl.java127
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/TraceComponentImplBase.java90
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/TracerImpl.java52
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/config/TraceConfigImpl.java43
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java93
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java81
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java396
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java71
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java81
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java214
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/internal/ConcurrentIntrusiveList.java181
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/internal/RandomHandler.java50
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/propagation/B3Format.java113
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/propagation/BinaryFormatImpl.java148
-rw-r--r--impl_core/src/main/java/io/opencensus/implcore/trace/propagation/PropagationComponentImpl.java37
60 files changed, 7386 insertions, 0 deletions
diff --git a/impl_core/src/main/java/io/opencensus/implcore/common/MillisClock.java b/impl_core/src/main/java/io/opencensus/implcore/common/MillisClock.java
new file mode 100644
index 00000000..98626926
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/common/MillisClock.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.common;
+
+import io.opencensus.common.Clock;
+import io.opencensus.common.Timestamp;
+import javax.annotation.concurrent.ThreadSafe;
+
+/** A {@link Clock} that uses {@link System#currentTimeMillis()} and {@link System#nanoTime()}. */
+@ThreadSafe
+public final class MillisClock extends Clock {
+ private static final MillisClock INSTANCE = new MillisClock();
+
+ private MillisClock() {}
+
+ /**
+ * Returns a {@code MillisClock}.
+ *
+ * @return a {@code MillisClock}.
+ */
+ public static MillisClock getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public Timestamp now() {
+ return Timestamp.fromMillis(System.currentTimeMillis());
+ }
+
+ @Override
+ public long nowNanos() {
+ return System.nanoTime();
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/CheckerFrameworkUtils.java b/impl_core/src/main/java/io/opencensus/implcore/internal/CheckerFrameworkUtils.java
new file mode 100644
index 00000000..f08289cf
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/internal/CheckerFrameworkUtils.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.internal;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility methods for suppressing nullness warnings and working around Checker Framework issues.
+ */
+public final class CheckerFrameworkUtils {
+ private CheckerFrameworkUtils() {}
+
+ /** Suppresses warnings about a nullable value. */
+ // TODO(sebright): Try to remove all uses of this method.
+ @SuppressWarnings("nullness")
+ public static <T> T castNonNull(@Nullable T arg) {
+ return arg;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/CurrentState.java b/impl_core/src/main/java/io/opencensus/implcore/internal/CurrentState.java
new file mode 100644
index 00000000..d7b1b112
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/internal/CurrentState.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.internal;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.concurrent.ThreadSafe;
+
+/** The current state base implementation for stats and tags. */
+@ThreadSafe
+public final class CurrentState {
+
+ /** Current state for stats or tags. */
+ public enum State {
+ /** State that fully enables stats collection or tag propagation. */
+ ENABLED,
+
+ /** State that disables stats collection or tag propagation. */
+ DISABLED
+ }
+
+ private enum InternalState {
+ // Enabled and not read.
+ ENABLED_NOT_READ(State.ENABLED, false),
+
+ // Enabled and read.
+ ENABLED_READ(State.ENABLED, true),
+
+ // Disable and not read.
+ DISABLED_NOT_READ(State.DISABLED, false),
+
+ // Disable and read.
+ DISABLED_READ(State.DISABLED, true);
+
+ private final State state;
+ private final boolean isRead;
+
+ InternalState(State state, boolean isRead) {
+ this.state = state;
+ this.isRead = isRead;
+ }
+ }
+
+ private final AtomicReference<InternalState> currentInternalState;
+
+ /**
+ * Constructs a new {@code CurrentState}.
+ *
+ * @param defaultState the default initial state.
+ */
+ public CurrentState(State defaultState) {
+ this.currentInternalState =
+ new AtomicReference<InternalState>(
+ defaultState == State.ENABLED
+ ? InternalState.ENABLED_NOT_READ
+ : InternalState.DISABLED_NOT_READ);
+ }
+
+ /**
+ * Returns the current state and updates the status as being read.
+ *
+ * @return the current state and updates the status as being read.
+ */
+ public State get() {
+ InternalState internalState = currentInternalState.get();
+ while (!internalState.isRead) {
+ // Slow path, the state is first time read. Change the state only if no other changes
+ // happened between the moment initialState is read and this moment. This ensures that this
+ // method only changes the isRead part of the internal state.
+ currentInternalState.compareAndSet(
+ internalState,
+ internalState.state == State.ENABLED
+ ? InternalState.ENABLED_READ
+ : InternalState.DISABLED_READ);
+ internalState = currentInternalState.get();
+ }
+ return internalState.state;
+ }
+
+ /**
+ * Returns the current state without updating the status as being read.
+ *
+ * @return the current state without updating the status as being read.
+ */
+ public State getInternal() {
+ return currentInternalState.get().state;
+ }
+
+ /**
+ * Sets current state to the given state. Returns true if the current state is changed, false
+ * otherwise.
+ *
+ * @param state the state to be set.
+ * @return true if the current state is changed, false otherwise.
+ */
+ public boolean set(State state) {
+ while (true) {
+ InternalState internalState = currentInternalState.get();
+ checkState(!internalState.isRead, "State was already read, cannot set state.");
+ if (state == internalState.state) {
+ return false;
+ } else {
+ if (!currentInternalState.compareAndSet(
+ internalState,
+ state == State.ENABLED
+ ? InternalState.ENABLED_NOT_READ
+ : InternalState.DISABLED_NOT_READ)) {
+ // The state was changed between the moment the internalState was read and this point.
+ // Some conditions may be not correct, reset at the beginning and recheck all conditions.
+ continue;
+ }
+ return true;
+ }
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/DaemonThreadFactory.java b/impl_core/src/main/java/io/opencensus/implcore/internal/DaemonThreadFactory.java
new file mode 100644
index 00000000..2baa5000
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/internal/DaemonThreadFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.internal;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** A {@link ThreadFactory} implementation that starts all {@link Thread} as daemons. */
+public final class DaemonThreadFactory implements ThreadFactory {
+ // AppEngine runtimes have constraints on threading and socket handling
+ // that need to be accommodated.
+ public static final boolean IS_RESTRICTED_APPENGINE =
+ System.getProperty("com.google.appengine.runtime.environment") != null
+ && "1.7".equals(System.getProperty("java.specification.version"));
+ private static final String DELIMITER = "-";
+ private static final ThreadFactory threadFactory = MoreExecutors.platformThreadFactory();
+ private final AtomicInteger threadIdGen = new AtomicInteger();
+ private final String threadPrefix;
+
+ /**
+ * Constructs a new {@code DaemonThreadFactory}.
+ *
+ * @param threadPrefix used to prefix all thread names. (E.g. "CensusDisruptor").
+ */
+ public DaemonThreadFactory(String threadPrefix) {
+ this.threadPrefix = threadPrefix + DELIMITER;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = threadFactory.newThread(r);
+ if (!IS_RESTRICTED_APPENGINE) {
+ thread.setName(threadPrefix + threadIdGen.getAndIncrement());
+ thread.setDaemon(true);
+ }
+ return thread;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java b/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java
new file mode 100644
index 00000000..6eb1149a
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/internal/EventQueue.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.internal;
+
+/** A queue that processes events. See {@code DisruptorEventQueue} for an example. */
+public interface EventQueue {
+ void enqueue(Entry entry);
+
+ void shutdown();
+
+ /**
+ * Base interface to be used for all entries in {@link EventQueue}. For example usage, see {@code
+ * DisruptorEventQueue}.
+ */
+ interface Entry {
+ /**
+ * Process the event associated with this entry. This will be called for every event in the
+ * associated {@link EventQueue}.
+ */
+ void process();
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/NoopScope.java b/impl_core/src/main/java/io/opencensus/implcore/internal/NoopScope.java
new file mode 100644
index 00000000..51efe894
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/internal/NoopScope.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.internal;
+
+import io.opencensus.common.Scope;
+
+/** A {@link Scope} that does nothing when it is created or closed. */
+public final class NoopScope implements Scope {
+ private static final Scope INSTANCE = new NoopScope();
+
+ private NoopScope() {}
+
+ /**
+ * Returns a {@code NoopScope}.
+ *
+ * @return a {@code NoopScope}.
+ */
+ public static Scope getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java b/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java
new file mode 100644
index 00000000..58c61c89
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/internal/SimpleEventQueue.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.internal;
+
+/**
+ * An {@link EventQueue} that processes events in the current thread. This class can be used for
+ * testing.
+ */
+public class SimpleEventQueue implements EventQueue {
+
+ @Override
+ public void enqueue(Entry entry) {
+ entry.process();
+ }
+
+ @Override
+ public void shutdown() {}
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/TimestampConverter.java b/impl_core/src/main/java/io/opencensus/implcore/internal/TimestampConverter.java
new file mode 100644
index 00000000..c70f5860
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/internal/TimestampConverter.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.internal;
+
+import io.opencensus.common.Clock;
+import io.opencensus.common.Timestamp;
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * This class provides a mechanism for converting {@link System#nanoTime() nanoTime} values to
+ * {@link Timestamp}.
+ */
+@Immutable
+public final class TimestampConverter {
+ private final Timestamp timestamp;
+ private final long nanoTime;
+
+ // Returns a WallTimeConverter initialized to now.
+ public static TimestampConverter now(Clock clock) {
+ return new TimestampConverter(clock.now(), clock.nowNanos());
+ }
+
+ /**
+ * Converts a {@link System#nanoTime() nanoTime} value to {@link Timestamp}.
+ *
+ * @param nanoTime value to convert.
+ * @return the {@code Timestamp} representation of the {@code time}.
+ */
+ public Timestamp convertNanoTime(long nanoTime) {
+ return timestamp.addNanos(nanoTime - this.nanoTime);
+ }
+
+ private TimestampConverter(Timestamp timestamp, long nanoTime) {
+ this.timestamp = timestamp;
+ this.nanoTime = nanoTime;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/Utils.java b/impl_core/src/main/java/io/opencensus/implcore/internal/Utils.java
new file mode 100644
index 00000000..05a039b9
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/internal/Utils.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.internal;
+
+import java.util.List;
+
+/** General internal utility methods. */
+public final class Utils {
+
+ private Utils() {}
+
+ /**
+ * Throws a {@link NullPointerException} if any of the list elements is null.
+ *
+ * @param list the argument list to check for null.
+ * @param errorMessage the message to use for the exception. Will be converted to a string using
+ * {@link String#valueOf(Object)}.
+ */
+ public static <T> void checkListElementNotNull(
+ List<T> list, @javax.annotation.Nullable Object errorMessage) {
+ for (T element : list) {
+ if (element == null) {
+ throw new NullPointerException(String.valueOf(errorMessage));
+ }
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/internal/VarInt.java b/impl_core/src/main/java/io/opencensus/implcore/internal/VarInt.java
new file mode 100644
index 00000000..944f62fd
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/internal/VarInt.java
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2016-17, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.internal;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/** Common methods to encode and decode varints and varlongs into ByteBuffers and arrays. */
+// CHECKSTYLE:OFF
+@SuppressWarnings("UngroupedOverloads")
+public class VarInt {
+
+ /** Maximum encoded size of 32-bit positive integers (in bytes) */
+ public static final int MAX_VARINT_SIZE = 5;
+
+ /** maximum encoded size of 64-bit longs, and negative 32-bit ints (in bytes) */
+ public static final int MAX_VARLONG_SIZE = 10;
+
+ private VarInt() {}
+
+ /**
+ * Returns the encoding size in bytes of its input value.
+ *
+ * @param i the integer to be measured
+ * @return the encoding size in bytes of its input value
+ */
+ public static int varIntSize(int i) {
+ int result = 0;
+ do {
+ result++;
+ i >>>= 7;
+ } while (i != 0);
+ return result;
+ }
+
+ /**
+ * Reads a varint from src, places its values into the first element of dst and returns the offset
+ * in to src of the first byte after the varint.
+ *
+ * @param src source buffer to retrieve from
+ * @param offset offset within src
+ * @param dst the resulting int value
+ * @return the updated offset after reading the varint
+ */
+ public static int getVarInt(byte[] src, int offset, int[] dst) {
+ int result = 0;
+ int shift = 0;
+ int b;
+ do {
+ if (shift >= 32) {
+ // Out of range
+ throw new IndexOutOfBoundsException("varint too long");
+ }
+ // Get 7 bits from next byte
+ b = src[offset++];
+ result |= (b & 0x7F) << shift;
+ shift += 7;
+ } while ((b & 0x80) != 0);
+ dst[0] = result;
+ return offset;
+ }
+
+ /**
+ * Encodes an integer in a variable-length encoding, 7 bits per byte, into a destination byte[],
+ * following the protocol buffer convention.
+ *
+ * @param v the int value to write to sink
+ * @param sink the sink buffer to write to
+ * @param offset the offset within sink to begin writing
+ * @return the updated offset after writing the varint
+ */
+ public static int putVarInt(int v, byte[] sink, int offset) {
+ do {
+ // Encode next 7 bits + terminator bit
+ int bits = v & 0x7F;
+ v >>>= 7;
+ byte b = (byte) (bits + ((v != 0) ? 0x80 : 0));
+ sink[offset++] = b;
+ } while (v != 0);
+ return offset;
+ }
+
+ /**
+ * Reads a varint from the current position of the given ByteBuffer and returns the decoded value
+ * as 32 bit integer.
+ *
+ * <p>The position of the buffer is advanced to the first byte after the decoded varint.
+ *
+ * @param src the ByteBuffer to get the var int from
+ * @return The integer value of the decoded varint
+ */
+ public static int getVarInt(ByteBuffer src) {
+ int tmp;
+ if ((tmp = src.get()) >= 0) {
+ return tmp;
+ }
+ int result = tmp & 0x7f;
+ if ((tmp = src.get()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 0x7f) << 7;
+ if ((tmp = src.get()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 0x7f) << 14;
+ if ((tmp = src.get()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 0x7f) << 21;
+ result |= (tmp = src.get()) << 28;
+ while (tmp < 0) {
+ // We get into this loop only in the case of overflow.
+ // By doing this, we can call getVarInt() instead of
+ // getVarLong() when we only need an int.
+ tmp = src.get();
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Encodes an integer in a variable-length encoding, 7 bits per byte, to a ByteBuffer sink.
+ *
+ * @param v the value to encode
+ * @param sink the ByteBuffer to add the encoded value
+ */
+ public static void putVarInt(int v, ByteBuffer sink) {
+ while (true) {
+ int bits = v & 0x7f;
+ v >>>= 7;
+ if (v == 0) {
+ sink.put((byte) bits);
+ return;
+ }
+ sink.put((byte) (bits | 0x80));
+ }
+ }
+
+ /**
+ * Reads a varint from the given InputStream and returns the decoded value as an int.
+ *
+ * @param inputStream the InputStream to read from
+ */
+ public static int getVarInt(InputStream inputStream) throws IOException {
+ int result = 0;
+ int shift = 0;
+ int b;
+ do {
+ if (shift >= 32) {
+ // Out of range
+ throw new IndexOutOfBoundsException("varint too long");
+ }
+ // Get 7 bits from next byte
+ b = inputStream.read();
+ result |= (b & 0x7F) << shift;
+ shift += 7;
+ } while ((b & 0x80) != 0);
+ return result;
+ }
+
+ /**
+ * Encodes an integer in a variable-length encoding, 7 bits per byte, and writes it to the given
+ * OutputStream.
+ *
+ * @param v the value to encode
+ * @param outputStream the OutputStream to write to
+ */
+ public static void putVarInt(int v, OutputStream outputStream) throws IOException {
+ byte[] bytes = new byte[varIntSize(v)];
+ putVarInt(v, bytes, 0);
+ outputStream.write(bytes);
+ }
+
+ /**
+ * Returns the encoding size in bytes of its input value.
+ *
+ * @param v the long to be measured
+ * @return the encoding size in bytes of a given long value.
+ */
+ public static int varLongSize(long v) {
+ int result = 0;
+ do {
+ result++;
+ v >>>= 7;
+ } while (v != 0);
+ return result;
+ }
+
+ /**
+ * Reads an up to 64 bit long varint from the current position of the given ByteBuffer and returns
+ * the decoded value as long.
+ *
+ * <p>The position of the buffer is advanced to the first byte after the decoded varint.
+ *
+ * @param src the ByteBuffer to get the var int from
+ * @return The integer value of the decoded long varint
+ */
+ public static long getVarLong(ByteBuffer src) {
+ long tmp;
+ if ((tmp = src.get()) >= 0) {
+ return tmp;
+ }
+ long result = tmp & 0x7f;
+ if ((tmp = src.get()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 0x7f) << 7;
+ if ((tmp = src.get()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 0x7f) << 14;
+ if ((tmp = src.get()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 0x7f) << 21;
+ if ((tmp = src.get()) >= 0) {
+ result |= tmp << 28;
+ } else {
+ result |= (tmp & 0x7f) << 28;
+ if ((tmp = src.get()) >= 0) {
+ result |= tmp << 35;
+ } else {
+ result |= (tmp & 0x7f) << 35;
+ if ((tmp = src.get()) >= 0) {
+ result |= tmp << 42;
+ } else {
+ result |= (tmp & 0x7f) << 42;
+ if ((tmp = src.get()) >= 0) {
+ result |= tmp << 49;
+ } else {
+ result |= (tmp & 0x7f) << 49;
+ if ((tmp = src.get()) >= 0) {
+ result |= tmp << 56;
+ } else {
+ result |= (tmp & 0x7f) << 56;
+ result |= ((long) src.get()) << 63;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Encodes a long integer in a variable-length encoding, 7 bits per byte, to a ByteBuffer sink.
+ *
+ * @param v the value to encode
+ * @param sink the ByteBuffer to add the encoded value
+ */
+ public static void putVarLong(long v, ByteBuffer sink) {
+ while (true) {
+ int bits = ((int) v) & 0x7f;
+ v >>>= 7;
+ if (v == 0) {
+ sink.put((byte) bits);
+ return;
+ }
+ sink.put((byte) (bits | 0x80));
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedDoubleGaugeImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedDoubleGaugeImpl.java
new file mode 100644
index 00000000..b7104c9b
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedDoubleGaugeImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.metrics;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.opencensus.common.Clock;
+import io.opencensus.common.ToDoubleFunction;
+import io.opencensus.implcore.internal.Utils;
+import io.opencensus.metrics.DerivedDoubleGauge;
+import io.opencensus.metrics.LabelKey;
+import io.opencensus.metrics.LabelValue;
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.metrics.export.MetricDescriptor;
+import io.opencensus.metrics.export.MetricDescriptor.Type;
+import io.opencensus.metrics.export.Point;
+import io.opencensus.metrics.export.TimeSeries;
+import io.opencensus.metrics.export.Value;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+/** Implementation of {@link DerivedDoubleGauge}. */
+public final class DerivedDoubleGaugeImpl extends DerivedDoubleGauge implements Meter {
+ private final MetricDescriptor metricDescriptor;
+ private final int labelKeysSize;
+
+ @SuppressWarnings("rawtypes")
+ private volatile Map<List<LabelValue>, PointWithFunction> registeredPoints =
+ Collections.<List<LabelValue>, PointWithFunction>emptyMap();
+
+ DerivedDoubleGaugeImpl(String name, String description, String unit, List<LabelKey> labelKeys) {
+ labelKeysSize = labelKeys.size();
+ this.metricDescriptor =
+ MetricDescriptor.create(name, description, unit, Type.GAUGE_DOUBLE, labelKeys);
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public synchronized <T> void createTimeSeries(
+ List<LabelValue> labelValues,
+ /*@Nullable*/ T obj,
+ ToDoubleFunction</*@Nullable*/ T> function) {
+ Utils.checkListElementNotNull(
+ checkNotNull(labelValues, "labelValues"), "labelValue element should not be null.");
+ checkArgument(labelKeysSize == labelValues.size(), "Incorrect number of labels.");
+ checkNotNull(function, "function");
+
+ List<LabelValue> labelValuesCopy =
+ Collections.<LabelValue>unmodifiableList(new ArrayList<LabelValue>(labelValues));
+
+ PointWithFunction existingPoint = registeredPoints.get(labelValuesCopy);
+ if (existingPoint != null) {
+ throw new IllegalArgumentException(
+ "A different time series with the same labels already exists.");
+ }
+
+ PointWithFunction newPoint = new PointWithFunction<T>(labelValuesCopy, obj, function);
+ // Updating the map of time series happens under a lock to avoid multiple add operations
+ // to happen in the same time.
+ Map<List<LabelValue>, PointWithFunction> registeredPointsCopy =
+ new LinkedHashMap<List<LabelValue>, PointWithFunction>(registeredPoints);
+ registeredPointsCopy.put(labelValuesCopy, newPoint);
+ registeredPoints = Collections.unmodifiableMap(registeredPointsCopy);
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public synchronized void removeTimeSeries(List<LabelValue> labelValues) {
+ checkNotNull(labelValues, "labelValues");
+
+ Map<List<LabelValue>, PointWithFunction> registeredPointsCopy =
+ new LinkedHashMap<List<LabelValue>, PointWithFunction>(registeredPoints);
+ if (registeredPointsCopy.remove(labelValues) == null) {
+ // The element not present, no need to update the current map of time series.
+ return;
+ }
+ registeredPoints = Collections.unmodifiableMap(registeredPointsCopy);
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public synchronized void clear() {
+ registeredPoints = Collections.<List<LabelValue>, PointWithFunction>emptyMap();
+ }
+
+ /*@Nullable*/
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Metric getMetric(Clock clock) {
+ Map<List<LabelValue>, PointWithFunction> currentRegisteredPoints = registeredPoints;
+ if (currentRegisteredPoints.isEmpty()) {
+ return null;
+ }
+
+ if (currentRegisteredPoints.size() == 1) {
+ PointWithFunction point = currentRegisteredPoints.values().iterator().next();
+ return Metric.createWithOneTimeSeries(metricDescriptor, point.getTimeSeries(clock));
+ }
+
+ List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>(currentRegisteredPoints.size());
+ for (Map.Entry<List<LabelValue>, PointWithFunction> entry :
+ currentRegisteredPoints.entrySet()) {
+ timeSeriesList.add(entry.getValue().getTimeSeries(clock));
+ }
+ return Metric.create(metricDescriptor, timeSeriesList);
+ }
+
+ /** Implementation of {@link PointWithFunction} with an object and a callback function. */
+ public static final class PointWithFunction<T> {
+ private final List<LabelValue> labelValues;
+ @javax.annotation.Nullable private final WeakReference<T> ref;
+ private final ToDoubleFunction</*@Nullable*/ T> function;
+
+ PointWithFunction(
+ List<LabelValue> labelValues,
+ /*@Nullable*/ T obj,
+ ToDoubleFunction</*@Nullable*/ T> function) {
+ this.labelValues = labelValues;
+ ref = obj != null ? new WeakReference<T>(obj) : null;
+ this.function = function;
+ }
+
+ private TimeSeries getTimeSeries(Clock clock) {
+ final T obj = ref != null ? ref.get() : null;
+ double value = function.applyAsDouble(obj);
+
+ // TODO(mayurkale): OPTIMIZATION: Avoid re-evaluate the labelValues all the time (issue#1490).
+ return TimeSeries.createWithOnePoint(
+ labelValues, Point.create(Value.doubleValue(value), clock.now()), null);
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedLongGaugeImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedLongGaugeImpl.java
new file mode 100644
index 00000000..90e3e706
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/DerivedLongGaugeImpl.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.metrics;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.opencensus.common.Clock;
+import io.opencensus.common.ToLongFunction;
+import io.opencensus.implcore.internal.Utils;
+import io.opencensus.metrics.DerivedLongGauge;
+import io.opencensus.metrics.LabelKey;
+import io.opencensus.metrics.LabelValue;
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.metrics.export.MetricDescriptor;
+import io.opencensus.metrics.export.MetricDescriptor.Type;
+import io.opencensus.metrics.export.Point;
+import io.opencensus.metrics.export.TimeSeries;
+import io.opencensus.metrics.export.Value;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+/** Implementation of {@link DerivedLongGauge}. */
+public final class DerivedLongGaugeImpl extends DerivedLongGauge implements Meter {
+ private final MetricDescriptor metricDescriptor;
+ private final int labelKeysSize;
+
+ @SuppressWarnings("rawtypes")
+ private volatile Map<List<LabelValue>, PointWithFunction> registeredPoints =
+ Collections.<List<LabelValue>, PointWithFunction>emptyMap();
+
+ DerivedLongGaugeImpl(String name, String description, String unit, List<LabelKey> labelKeys) {
+ labelKeysSize = labelKeys.size();
+ this.metricDescriptor =
+ MetricDescriptor.create(name, description, unit, Type.GAUGE_INT64, labelKeys);
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public synchronized <T> void createTimeSeries(
+ List<LabelValue> labelValues, /*@Nullable*/ T obj, ToLongFunction</*@Nullable*/ T> function) {
+ Utils.checkListElementNotNull(
+ checkNotNull(labelValues, "labelValues"), "labelValue element should not be null.");
+ checkArgument(labelKeysSize == labelValues.size(), "Incorrect number of labels.");
+ checkNotNull(function, "function");
+
+ List<LabelValue> labelValuesCopy =
+ Collections.unmodifiableList(new ArrayList<LabelValue>(labelValues));
+
+ PointWithFunction existingPoint = registeredPoints.get(labelValuesCopy);
+ if (existingPoint != null) {
+ throw new IllegalArgumentException(
+ "A different time series with the same labels already exists.");
+ }
+
+ PointWithFunction newPoint = new PointWithFunction<T>(labelValuesCopy, obj, function);
+ // Updating the map of time series happens under a lock to avoid multiple add operations
+ // to happen in the same time.
+ Map<List<LabelValue>, PointWithFunction> registeredPointsCopy =
+ new LinkedHashMap<List<LabelValue>, PointWithFunction>(registeredPoints);
+ registeredPointsCopy.put(labelValuesCopy, newPoint);
+ registeredPoints = Collections.unmodifiableMap(registeredPointsCopy);
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public synchronized void removeTimeSeries(List<LabelValue> labelValues) {
+ checkNotNull(labelValues, "labelValues");
+
+ Map<List<LabelValue>, PointWithFunction> registeredPointsCopy =
+ new LinkedHashMap<List<LabelValue>, PointWithFunction>(registeredPoints);
+ if (registeredPointsCopy.remove(labelValues) == null) {
+ // The element not present, no need to update the current map of time series.
+ return;
+ }
+ registeredPoints = Collections.unmodifiableMap(registeredPointsCopy);
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public synchronized void clear() {
+ registeredPoints = Collections.<List<LabelValue>, PointWithFunction>emptyMap();
+ }
+
+ /*@Nullable*/
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Metric getMetric(Clock clock) {
+ Map<List<LabelValue>, PointWithFunction> currentRegisteredPoints = registeredPoints;
+ if (currentRegisteredPoints.isEmpty()) {
+ return null;
+ }
+
+ if (currentRegisteredPoints.size() == 1) {
+ PointWithFunction point = currentRegisteredPoints.values().iterator().next();
+ return Metric.createWithOneTimeSeries(metricDescriptor, point.getTimeSeries(clock));
+ }
+
+ List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>(currentRegisteredPoints.size());
+ for (Map.Entry<List<LabelValue>, PointWithFunction> entry :
+ currentRegisteredPoints.entrySet()) {
+ timeSeriesList.add(entry.getValue().getTimeSeries(clock));
+ }
+ return Metric.create(metricDescriptor, timeSeriesList);
+ }
+
+ /** Implementation of {@link PointWithFunction} with an object and a callback function. */
+ public static final class PointWithFunction<T> {
+ private final List<LabelValue> labelValues;
+ @javax.annotation.Nullable private final WeakReference<T> ref;
+ private final ToLongFunction</*@Nullable*/ T> function;
+
+ PointWithFunction(
+ List<LabelValue> labelValues,
+ /*@Nullable*/ T obj,
+ ToLongFunction</*@Nullable*/ T> function) {
+ this.labelValues = labelValues;
+ ref = obj != null ? new WeakReference<T>(obj) : null;
+ this.function = function;
+ }
+
+ private TimeSeries getTimeSeries(Clock clock) {
+ final T obj = ref != null ? ref.get() : null;
+ long value = function.applyAsLong(obj);
+
+ // TODO(mayurkale): OPTIMIZATION: Avoid re-evaluate the labelValues all the time (issue#1490).
+ return TimeSeries.createWithOnePoint(
+ labelValues, Point.create(Value.longValue(value), clock.now()), null);
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/DoubleGaugeImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/DoubleGaugeImpl.java
new file mode 100644
index 00000000..c314e980
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/DoubleGaugeImpl.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.metrics;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.AtomicDouble;
+import io.opencensus.common.Clock;
+import io.opencensus.implcore.internal.Utils;
+import io.opencensus.metrics.DoubleGauge;
+import io.opencensus.metrics.LabelKey;
+import io.opencensus.metrics.LabelValue;
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.metrics.export.MetricDescriptor;
+import io.opencensus.metrics.export.MetricDescriptor.Type;
+import io.opencensus.metrics.export.Point;
+import io.opencensus.metrics.export.TimeSeries;
+import io.opencensus.metrics.export.Value;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/** Implementation of {@link DoubleGauge}. */
+public final class DoubleGaugeImpl extends DoubleGauge implements Meter {
+ @VisibleForTesting static final LabelValue UNSET_VALUE = LabelValue.create(null);
+
+ private final MetricDescriptor metricDescriptor;
+ private volatile Map<List<LabelValue>, PointImpl> registeredPoints =
+ Collections.<List<LabelValue>, PointImpl>emptyMap();
+ private final int labelKeysSize;
+ private final List<LabelValue> defaultLabelValues;
+
+ DoubleGaugeImpl(String name, String description, String unit, List<LabelKey> labelKeys) {
+ labelKeysSize = labelKeys.size();
+ this.metricDescriptor =
+ MetricDescriptor.create(name, description, unit, Type.GAUGE_DOUBLE, labelKeys);
+
+ // initialize defaultLabelValues
+ defaultLabelValues = new ArrayList<LabelValue>(labelKeysSize);
+ for (int i = 0; i < labelKeysSize; i++) {
+ defaultLabelValues.add(UNSET_VALUE);
+ }
+ }
+
+ @Override
+ public DoublePoint getOrCreateTimeSeries(List<LabelValue> labelValues) {
+ // lock free point retrieval, if it is present
+ PointImpl existingPoint = registeredPoints.get(labelValues);
+ if (existingPoint != null) {
+ return existingPoint;
+ }
+
+ List<LabelValue> labelValuesCopy =
+ Collections.unmodifiableList(
+ new ArrayList<LabelValue>(checkNotNull(labelValues, "labelValues")));
+ return registerTimeSeries(labelValuesCopy);
+ }
+
+ @Override
+ public DoublePoint getDefaultTimeSeries() {
+ // lock free default point retrieval, if it is present
+ PointImpl existingPoint = registeredPoints.get(defaultLabelValues);
+ if (existingPoint != null) {
+ return existingPoint;
+ }
+ return registerTimeSeries(Collections.unmodifiableList(defaultLabelValues));
+ }
+
+ @Override
+ public synchronized void removeTimeSeries(List<LabelValue> labelValues) {
+ checkNotNull(labelValues, "labelValues");
+
+ Map<List<LabelValue>, PointImpl> registeredPointsCopy =
+ new LinkedHashMap<List<LabelValue>, PointImpl>(registeredPoints);
+ if (registeredPointsCopy.remove(labelValues) == null) {
+ // The element not present, no need to update the current map of points.
+ return;
+ }
+ registeredPoints = Collections.unmodifiableMap(registeredPointsCopy);
+ }
+
+ @Override
+ public synchronized void clear() {
+ registeredPoints = Collections.<List<LabelValue>, PointImpl>emptyMap();
+ }
+
+ private synchronized DoublePoint registerTimeSeries(List<LabelValue> labelValues) {
+ PointImpl existingPoint = registeredPoints.get(labelValues);
+ if (existingPoint != null) {
+ // Return a Point that are already registered. This can happen if a multiple threads
+ // concurrently try to register the same {@code TimeSeries}.
+ return existingPoint;
+ }
+
+ checkArgument(labelKeysSize == labelValues.size(), "Incorrect number of labels.");
+ Utils.checkListElementNotNull(labelValues, "labelValue element should not be null.");
+
+ PointImpl newPoint = new PointImpl(labelValues);
+ // Updating the map of points happens under a lock to avoid multiple add operations
+ // to happen in the same time.
+ Map<List<LabelValue>, PointImpl> registeredPointsCopy =
+ new LinkedHashMap<List<LabelValue>, PointImpl>(registeredPoints);
+ registeredPointsCopy.put(labelValues, newPoint);
+ registeredPoints = Collections.unmodifiableMap(registeredPointsCopy);
+
+ return newPoint;
+ }
+
+ @Nullable
+ @Override
+ public Metric getMetric(Clock clock) {
+ Map<List<LabelValue>, PointImpl> currentRegisteredPoints = registeredPoints;
+ if (currentRegisteredPoints.isEmpty()) {
+ return null;
+ }
+
+ if (currentRegisteredPoints.size() == 1) {
+ PointImpl point = currentRegisteredPoints.values().iterator().next();
+ return Metric.createWithOneTimeSeries(metricDescriptor, point.getTimeSeries(clock));
+ }
+
+ List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>(currentRegisteredPoints.size());
+ for (Map.Entry<List<LabelValue>, PointImpl> entry : currentRegisteredPoints.entrySet()) {
+ timeSeriesList.add(entry.getValue().getTimeSeries(clock));
+ }
+ return Metric.create(metricDescriptor, timeSeriesList);
+ }
+
+ /** Implementation of {@link DoubleGauge.DoublePoint}. */
+ public static final class PointImpl extends DoublePoint {
+
+ // TODO(mayurkale): Consider to use DoubleAdder here, once we upgrade to Java8.
+ private final AtomicDouble value = new AtomicDouble(0);
+ private final List<LabelValue> labelValues;
+
+ PointImpl(List<LabelValue> labelValues) {
+ this.labelValues = labelValues;
+ }
+
+ @Override
+ public void add(double amt) {
+ value.addAndGet(amt);
+ }
+
+ @Override
+ public void set(double val) {
+ value.set(val);
+ }
+
+ private TimeSeries getTimeSeries(Clock clock) {
+ return TimeSeries.createWithOnePoint(
+ labelValues, Point.create(Value.doubleValue(value.get()), clock.now()), null);
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/LongGaugeImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/LongGaugeImpl.java
new file mode 100644
index 00000000..3460d7a4
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/LongGaugeImpl.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.metrics;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opencensus.common.Clock;
+import io.opencensus.implcore.internal.Utils;
+import io.opencensus.metrics.LabelKey;
+import io.opencensus.metrics.LabelValue;
+import io.opencensus.metrics.LongGauge;
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.metrics.export.MetricDescriptor;
+import io.opencensus.metrics.export.MetricDescriptor.Type;
+import io.opencensus.metrics.export.Point;
+import io.opencensus.metrics.export.TimeSeries;
+import io.opencensus.metrics.export.Value;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+
+/** Implementation of {@link LongGauge}. */
+public final class LongGaugeImpl extends LongGauge implements Meter {
+ @VisibleForTesting static final LabelValue UNSET_VALUE = LabelValue.create(null);
+
+ private final MetricDescriptor metricDescriptor;
+ private volatile Map<List<LabelValue>, PointImpl> registeredPoints =
+ Collections.<List<LabelValue>, PointImpl>emptyMap();
+ private final int labelKeysSize;
+ private final List<LabelValue> defaultLabelValues;
+
+ LongGaugeImpl(String name, String description, String unit, List<LabelKey> labelKeys) {
+ labelKeysSize = labelKeys.size();
+ this.metricDescriptor =
+ MetricDescriptor.create(name, description, unit, Type.GAUGE_INT64, labelKeys);
+
+ // initialize defaultLabelValues
+ defaultLabelValues = new ArrayList<LabelValue>(labelKeysSize);
+ for (int i = 0; i < labelKeysSize; i++) {
+ defaultLabelValues.add(UNSET_VALUE);
+ }
+ }
+
+ @Override
+ public LongPoint getOrCreateTimeSeries(List<LabelValue> labelValues) {
+ // lock free point retrieval, if it is present
+ PointImpl existingPoint = registeredPoints.get(labelValues);
+ if (existingPoint != null) {
+ return existingPoint;
+ }
+
+ List<LabelValue> labelValuesCopy =
+ Collections.unmodifiableList(
+ new ArrayList<LabelValue>(checkNotNull(labelValues, "labelValues")));
+ return registerTimeSeries(labelValuesCopy);
+ }
+
+ @Override
+ public LongPoint getDefaultTimeSeries() {
+ // lock free default point retrieval, if it is present
+ PointImpl existingPoint = registeredPoints.get(defaultLabelValues);
+ if (existingPoint != null) {
+ return existingPoint;
+ }
+ return registerTimeSeries(Collections.unmodifiableList(defaultLabelValues));
+ }
+
+ @Override
+ public synchronized void removeTimeSeries(List<LabelValue> labelValues) {
+ checkNotNull(labelValues, "labelValues");
+
+ Map<List<LabelValue>, PointImpl> registeredPointsCopy =
+ new LinkedHashMap<List<LabelValue>, PointImpl>(registeredPoints);
+ if (registeredPointsCopy.remove(labelValues) == null) {
+ // The element not present, no need to update the current map of points.
+ return;
+ }
+ registeredPoints = Collections.unmodifiableMap(registeredPointsCopy);
+ }
+
+ @Override
+ public synchronized void clear() {
+ registeredPoints = Collections.<List<LabelValue>, PointImpl>emptyMap();
+ }
+
+ private synchronized LongPoint registerTimeSeries(List<LabelValue> labelValues) {
+ PointImpl existingPoint = registeredPoints.get(labelValues);
+ if (existingPoint != null) {
+ // Return a Point that are already registered. This can happen if a multiple threads
+ // concurrently try to register the same {@code TimeSeries}.
+ return existingPoint;
+ }
+
+ checkArgument(labelKeysSize == labelValues.size(), "Incorrect number of labels.");
+ Utils.checkListElementNotNull(labelValues, "labelValue element should not be null.");
+
+ PointImpl newPoint = new PointImpl(labelValues);
+ // Updating the map of points happens under a lock to avoid multiple add operations
+ // to happen in the same time.
+ Map<List<LabelValue>, PointImpl> registeredPointsCopy =
+ new LinkedHashMap<List<LabelValue>, PointImpl>(registeredPoints);
+ registeredPointsCopy.put(labelValues, newPoint);
+ registeredPoints = Collections.unmodifiableMap(registeredPointsCopy);
+
+ return newPoint;
+ }
+
+ @Nullable
+ @Override
+ public Metric getMetric(Clock clock) {
+ Map<List<LabelValue>, PointImpl> currentRegisteredPoints = registeredPoints;
+ if (currentRegisteredPoints.isEmpty()) {
+ return null;
+ }
+
+ if (currentRegisteredPoints.size() == 1) {
+ PointImpl point = currentRegisteredPoints.values().iterator().next();
+ return Metric.createWithOneTimeSeries(metricDescriptor, point.getTimeSeries(clock));
+ }
+
+ List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>(currentRegisteredPoints.size());
+ for (Map.Entry<List<LabelValue>, PointImpl> entry : currentRegisteredPoints.entrySet()) {
+ timeSeriesList.add(entry.getValue().getTimeSeries(clock));
+ }
+ return Metric.create(metricDescriptor, timeSeriesList);
+ }
+
+ /** Implementation of {@link LongGauge.LongPoint}. */
+ public static final class PointImpl extends LongPoint {
+
+ // TODO(mayurkale): Consider to use LongAdder here, once we upgrade to Java8.
+ private final AtomicLong value = new AtomicLong(0);
+ private final List<LabelValue> labelValues;
+
+ PointImpl(List<LabelValue> labelValues) {
+ this.labelValues = labelValues;
+ }
+
+ @Override
+ public void add(long amt) {
+ value.addAndGet(amt);
+ }
+
+ @Override
+ public void set(long val) {
+ value.set(val);
+ }
+
+ private TimeSeries getTimeSeries(Clock clock) {
+ return TimeSeries.createWithOnePoint(
+ labelValues, Point.create(Value.longValue(value.get()), clock.now()), null);
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/Meter.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/Meter.java
new file mode 100644
index 00000000..f5a8dc8f
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/Meter.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.metrics;
+
+import io.opencensus.common.Clock;
+import io.opencensus.metrics.export.Metric;
+import javax.annotation.Nullable;
+
+interface Meter {
+ /**
+ * Provides a {@link io.opencensus.metrics.export.Metric} with one or more {@link
+ * io.opencensus.metrics.export.TimeSeries}.
+ *
+ * @param clock the clock used to get the time.
+ * @throws NullPointerException if {@code TimeSeries} is not present in {@code Metric}.
+ * @return a {@code Metric}.
+ */
+ @Nullable
+ Metric getMetric(Clock clock);
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/MetricRegistryImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/MetricRegistryImpl.java
new file mode 100644
index 00000000..1a301ecf
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/MetricRegistryImpl.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.metrics;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.opencensus.common.Clock;
+import io.opencensus.implcore.internal.Utils;
+import io.opencensus.metrics.DerivedDoubleGauge;
+import io.opencensus.metrics.DerivedLongGauge;
+import io.opencensus.metrics.DoubleGauge;
+import io.opencensus.metrics.LabelKey;
+import io.opencensus.metrics.LongGauge;
+import io.opencensus.metrics.MetricRegistry;
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.metrics.export.MetricProducer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Implementation of {@link MetricRegistry}. */
+public final class MetricRegistryImpl extends MetricRegistry {
+ private final RegisteredMeters registeredMeters;
+ private final MetricProducer metricProducer;
+
+ MetricRegistryImpl(Clock clock) {
+ registeredMeters = new RegisteredMeters();
+ metricProducer = new MetricProducerForRegistry(registeredMeters, clock);
+ }
+
+ @Override
+ public LongGauge addLongGauge(
+ String name, String description, String unit, List<LabelKey> labelKeys) {
+ Utils.checkListElementNotNull(
+ checkNotNull(labelKeys, "labelKeys"), "labelKey element should not be null.");
+ LongGaugeImpl longGaugeMetric =
+ new LongGaugeImpl(
+ checkNotNull(name, "name"),
+ checkNotNull(description, "description"),
+ checkNotNull(unit, "unit"),
+ Collections.unmodifiableList(new ArrayList<LabelKey>(labelKeys)));
+ registeredMeters.registerMeter(name, longGaugeMetric);
+ return longGaugeMetric;
+ }
+
+ @Override
+ public DoubleGauge addDoubleGauge(
+ String name, String description, String unit, List<LabelKey> labelKeys) {
+ Utils.checkListElementNotNull(
+ checkNotNull(labelKeys, "labelKeys"), "labelKey element should not be null.");
+ DoubleGaugeImpl doubleGaugeMetric =
+ new DoubleGaugeImpl(
+ checkNotNull(name, "name"),
+ checkNotNull(description, "description"),
+ checkNotNull(unit, "unit"),
+ Collections.unmodifiableList(new ArrayList<LabelKey>(labelKeys)));
+ registeredMeters.registerMeter(name, doubleGaugeMetric);
+ return doubleGaugeMetric;
+ }
+
+ @Override
+ public DerivedLongGauge addDerivedLongGauge(
+ String name, String description, String unit, List<LabelKey> labelKeys) {
+ Utils.checkListElementNotNull(
+ checkNotNull(labelKeys, "labelKeys"), "labelKey element should not be null.");
+ DerivedLongGaugeImpl derivedLongGauge =
+ new DerivedLongGaugeImpl(
+ checkNotNull(name, "name"),
+ checkNotNull(description, "description"),
+ checkNotNull(unit, "unit"),
+ Collections.unmodifiableList(new ArrayList<LabelKey>(labelKeys)));
+ registeredMeters.registerMeter(name, derivedLongGauge);
+ return derivedLongGauge;
+ }
+
+ @Override
+ public DerivedDoubleGauge addDerivedDoubleGauge(
+ String name, String description, String unit, List<LabelKey> labelKeys) {
+ Utils.checkListElementNotNull(
+ checkNotNull(labelKeys, "labelKeys"), "labelKey element should not be null.");
+ DerivedDoubleGaugeImpl derivedDoubleGauge =
+ new DerivedDoubleGaugeImpl(
+ checkNotNull(name, "name"),
+ checkNotNull(description, "description"),
+ checkNotNull(unit, "unit"),
+ Collections.unmodifiableList(new ArrayList<LabelKey>(labelKeys)));
+ registeredMeters.registerMeter(name, derivedDoubleGauge);
+ return derivedDoubleGauge;
+ }
+
+ private static final class RegisteredMeters {
+ private volatile Map<String, Meter> registeredMeters = Collections.emptyMap();
+
+ private Map<String, Meter> getRegisteredMeters() {
+ return registeredMeters;
+ }
+
+ private synchronized void registerMeter(String meterName, Meter meter) {
+ Meter existingMeter = registeredMeters.get(meterName);
+ if (existingMeter != null) {
+ // TODO(mayurkale): Allow users to register the same Meter multiple times without exception.
+ throw new IllegalArgumentException(
+ "A different metric with the same name already registered.");
+ }
+
+ Map<String, Meter> registeredMetersCopy = new LinkedHashMap<String, Meter>(registeredMeters);
+ registeredMetersCopy.put(meterName, meter);
+ registeredMeters = Collections.unmodifiableMap(registeredMetersCopy);
+ }
+ }
+
+ private static final class MetricProducerForRegistry extends MetricProducer {
+ private final RegisteredMeters registeredMeters;
+ private final Clock clock;
+
+ private MetricProducerForRegistry(RegisteredMeters registeredMeters, Clock clock) {
+ this.registeredMeters = registeredMeters;
+ this.clock = clock;
+ }
+
+ @Override
+ public Collection<Metric> getMetrics() {
+ // Get a snapshot of the current registered meters.
+ Map<String, Meter> meters = registeredMeters.getRegisteredMeters();
+ if (meters.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<Metric> metrics = new ArrayList<Metric>(meters.size());
+ for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+ Metric metric = entry.getValue().getMetric(clock);
+ if (metric != null) {
+ metrics.add(metric);
+ }
+ }
+ return metrics;
+ }
+ }
+
+ MetricProducer getMetricProducer() {
+ return metricProducer;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/MetricsComponentImplBase.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/MetricsComponentImplBase.java
new file mode 100644
index 00000000..1aef6727
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/MetricsComponentImplBase.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.metrics;
+
+import io.opencensus.common.Clock;
+import io.opencensus.implcore.metrics.export.ExportComponentImpl;
+import io.opencensus.metrics.MetricsComponent;
+
+/** Implementation of {@link MetricsComponent}. */
+public class MetricsComponentImplBase extends MetricsComponent {
+
+ private final ExportComponentImpl exportComponent;
+ private final MetricRegistryImpl metricRegistry;
+
+ @Override
+ public ExportComponentImpl getExportComponent() {
+ return exportComponent;
+ }
+
+ @Override
+ public MetricRegistryImpl getMetricRegistry() {
+ return metricRegistry;
+ }
+
+ protected MetricsComponentImplBase(Clock clock) {
+ exportComponent = new ExportComponentImpl();
+ metricRegistry = new MetricRegistryImpl(clock);
+ // Register the MetricRegistry's MetricProducer to the global MetricProducerManager.
+ exportComponent.getMetricProducerManager().add(metricRegistry.getMetricProducer());
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/export/ExportComponentImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/export/ExportComponentImpl.java
new file mode 100644
index 00000000..173c3aec
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/export/ExportComponentImpl.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.metrics.export;
+
+import io.opencensus.metrics.export.ExportComponent;
+import io.opencensus.metrics.export.MetricProducerManager;
+
+/** Implementation of {@link ExportComponent}. */
+public final class ExportComponentImpl extends ExportComponent {
+
+ private final MetricProducerManager metricProducerManager = new MetricProducerManagerImpl();
+
+ @Override
+ public MetricProducerManager getMetricProducerManager() {
+ return metricProducerManager;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/metrics/export/MetricProducerManagerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/metrics/export/MetricProducerManagerImpl.java
new file mode 100644
index 00000000..6f585a10
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/metrics/export/MetricProducerManagerImpl.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.metrics.export;
+
+import com.google.common.base.Preconditions;
+import io.opencensus.metrics.export.MetricProducer;
+import io.opencensus.metrics.export.MetricProducerManager;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import javax.annotation.concurrent.ThreadSafe;
+
+/** Implementation of {@link MetricProducerManager}. */
+@ThreadSafe
+public final class MetricProducerManagerImpl extends MetricProducerManager {
+
+ private volatile Set<MetricProducer> metricProducers =
+ Collections.unmodifiableSet(new LinkedHashSet<MetricProducer>());
+
+ @Override
+ public synchronized void add(MetricProducer metricProducer) {
+ Preconditions.checkNotNull(metricProducer, "metricProducer");
+ // Updating the set of MetricProducers happens under a lock to avoid multiple add or remove
+ // operations to happen in the same time.
+ Set<MetricProducer> newMetricProducers = new LinkedHashSet<MetricProducer>(metricProducers);
+ if (!newMetricProducers.add(metricProducer)) {
+ // The element already present, no need to update the current set of MetricProducers.
+ return;
+ }
+ metricProducers = Collections.unmodifiableSet(newMetricProducers);
+ }
+
+ @Override
+ public synchronized void remove(MetricProducer metricProducer) {
+ Preconditions.checkNotNull(metricProducer, "metricProducer");
+ // Updating the set of MetricProducers happens under a lock to avoid multiple add or remove
+ // operations to happen in the same time.
+ Set<MetricProducer> newMetricProducers = new LinkedHashSet<MetricProducer>(metricProducers);
+ if (!newMetricProducers.remove(metricProducer)) {
+ // The element not present, no need to update the current set of MetricProducers.
+ return;
+ }
+ metricProducers = Collections.unmodifiableSet(newMetricProducers);
+ }
+
+ @Override
+ public Set<MetricProducer> getAllMetricProducer() {
+ return metricProducers;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java b/impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java
new file mode 100644
index 00000000..172db539
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/IntervalBucket.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Maps;
+import io.opencensus.common.Duration;
+import io.opencensus.common.Timestamp;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.Measure;
+import io.opencensus.tags.TagValue;
+import java.util.List;
+import java.util.Map;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+/** The bucket with aggregated {@code MeasureValue}s used for {@code IntervalViewData}. */
+final class IntervalBucket {
+
+ private static final Duration ZERO = Duration.create(0, 0);
+
+ private final Timestamp start;
+ private final Duration duration;
+ private final Aggregation aggregation;
+ private final Measure measure;
+ private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap =
+ Maps.newHashMap();
+
+ IntervalBucket(Timestamp start, Duration duration, Aggregation aggregation, Measure measure) {
+ this.start = checkNotNull(start, "Start");
+ this.duration = checkNotNull(duration, "Duration");
+ checkArgument(duration.compareTo(ZERO) > 0, "Duration must be positive");
+ this.aggregation = checkNotNull(aggregation, "Aggregation");
+ this.measure = checkNotNull(measure, "measure");
+ }
+
+ Map<List</*@Nullable*/ TagValue>, MutableAggregation> getTagValueAggregationMap() {
+ return tagValueAggregationMap;
+ }
+
+ Timestamp getStart() {
+ return start;
+ }
+
+ // Puts a new value into the internal MutableAggregations, based on the TagValues.
+ void record(
+ List</*@Nullable*/ TagValue> tagValues,
+ double value,
+ Map<String, String> attachments,
+ Timestamp timestamp) {
+ if (!tagValueAggregationMap.containsKey(tagValues)) {
+ tagValueAggregationMap.put(
+ tagValues, RecordUtils.createMutableAggregation(aggregation, measure));
+ }
+ tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp);
+ }
+
+ /*
+ * Returns how much fraction of duration has passed in this IntervalBucket. For example, if this
+ * bucket starts at 10s and has a duration of 20s, and now is 15s, then getFraction() should
+ * return (15 - 10) / 20 = 0.25.
+ *
+ * This IntervalBucket must be current, i.e. the current timestamp must be within
+ * [this.start, this.start + this.duration).
+ */
+ double getFraction(Timestamp now) {
+ Duration elapsedTime = now.subtractTimestamp(start);
+ checkArgument(
+ elapsedTime.compareTo(ZERO) >= 0 && elapsedTime.compareTo(duration) < 0,
+ "This bucket must be current.");
+ return ((double) elapsedTime.toMillis()) / duration.toMillis();
+ }
+
+ void clearStats() {
+ tagValueAggregationMap.clear();
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java
new file mode 100644
index 00000000..ee51796c
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2016-17, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import io.opencensus.stats.Measure.MeasureDouble;
+import io.opencensus.stats.Measure.MeasureLong;
+import io.opencensus.stats.MeasureMap;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.unsafe.ContextUtils;
+
+/** Implementation of {@link MeasureMap}. */
+final class MeasureMapImpl extends MeasureMap {
+ private final StatsManager statsManager;
+ private final MeasureMapInternal.Builder builder = MeasureMapInternal.builder();
+
+ static MeasureMapImpl create(StatsManager statsManager) {
+ return new MeasureMapImpl(statsManager);
+ }
+
+ private MeasureMapImpl(StatsManager statsManager) {
+ this.statsManager = statsManager;
+ }
+
+ @Override
+ public MeasureMapImpl put(MeasureDouble measure, double value) {
+ builder.put(measure, value);
+ return this;
+ }
+
+ @Override
+ public MeasureMapImpl put(MeasureLong measure, long value) {
+ builder.put(measure, value);
+ return this;
+ }
+
+ @Override
+ public MeasureMap putAttachment(String key, String value) {
+ builder.putAttachment(key, value);
+ return this;
+ }
+
+ @Override
+ public void record() {
+ // Use the context key directly, to avoid depending on the tags implementation.
+ record(ContextUtils.TAG_CONTEXT_KEY.get());
+ }
+
+ @Override
+ public void record(TagContext tags) {
+ statsManager.record(tags, builder.build());
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java
new file mode 100644
index 00000000..d867b342
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureMapInternal.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2016-17, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.Measure.MeasureDouble;
+import io.opencensus.stats.Measure.MeasureLong;
+import io.opencensus.stats.Measurement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+// TODO(songya): consider combining MeasureMapImpl and this class.
+/** A map from {@link Measure}'s to measured values. */
+final class MeasureMapInternal {
+
+ /** Returns a {@link Builder} for the {@link MeasureMapInternal} class. */
+ static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Returns an {@link Iterator} over the measure/value mappings in this {@link MeasureMapInternal}.
+ * The {@code Iterator} does not support {@link Iterator#remove()}.
+ */
+ Iterator<Measurement> iterator() {
+ return new MeasureMapInternalIterator();
+ }
+
+ // Returns the contextual information associated with an example value.
+ Map<String, String> getAttachments() {
+ return attachments;
+ }
+
+ private final ArrayList<Measurement> measurements;
+ private final Map<String, String> attachments;
+
+ private MeasureMapInternal(ArrayList<Measurement> measurements, Map<String, String> attachments) {
+ this.measurements = measurements;
+ this.attachments = Collections.unmodifiableMap(new HashMap<String, String>(attachments));
+ }
+
+ /** Builder for the {@link MeasureMapInternal} class. */
+ static class Builder {
+ /**
+ * Associates the {@link MeasureDouble} with the given value. Subsequent updates to the same
+ * {@link MeasureDouble} will overwrite the previous value.
+ *
+ * @param measure the {@link MeasureDouble}
+ * @param value the value to be associated with {@code measure}
+ * @return this
+ */
+ Builder put(MeasureDouble measure, double value) {
+ measurements.add(Measurement.MeasurementDouble.create(measure, value));
+ return this;
+ }
+
+ /**
+ * Associates the {@link MeasureLong} with the given value. Subsequent updates to the same
+ * {@link MeasureLong} will overwrite the previous value.
+ *
+ * @param measure the {@link MeasureLong}
+ * @param value the value to be associated with {@code measure}
+ * @return this
+ */
+ Builder put(MeasureLong measure, long value) {
+ measurements.add(Measurement.MeasurementLong.create(measure, value));
+ return this;
+ }
+
+ Builder putAttachment(String key, String value) {
+ this.attachments.put(key, value);
+ return this;
+ }
+
+ /** Constructs a {@link MeasureMapInternal} from the current measurements. */
+ MeasureMapInternal build() {
+ // Note: this makes adding measurements quadratic but is fastest for the sizes of
+ // MeasureMapInternals that we should see. We may want to go to a strategy of sort/eliminate
+ // for larger MeasureMapInternals.
+ for (int i = measurements.size() - 1; i >= 0; i--) {
+ for (int j = i - 1; j >= 0; j--) {
+ if (measurements.get(i).getMeasure() == measurements.get(j).getMeasure()) {
+ measurements.remove(j);
+ j--;
+ }
+ }
+ }
+ return new MeasureMapInternal(measurements, attachments);
+ }
+
+ private final ArrayList<Measurement> measurements = new ArrayList<Measurement>();
+ private final Map<String, String> attachments = new HashMap<String, String>();
+
+ private Builder() {}
+ }
+
+ // Provides an unmodifiable Iterator over this instance's measurements.
+ private final class MeasureMapInternalIterator implements Iterator<Measurement> {
+ @Override
+ public boolean hasNext() {
+ return position < length;
+ }
+
+ @Override
+ public Measurement next() {
+ if (position >= measurements.size()) {
+ throw new NoSuchElementException();
+ }
+ return measurements.get(position++);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ private final int length = measurements.size();
+ private int position = 0;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java
new file mode 100644
index 00000000..5da0cad8
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MeasureToViewMap.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import io.opencensus.common.Clock;
+import io.opencensus.common.Timestamp;
+import io.opencensus.implcore.internal.CurrentState.State;
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.Measurement;
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
+import io.opencensus.tags.TagContext;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import javax.annotation.concurrent.GuardedBy;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+/** A class that stores a singleton map from {@code MeasureName}s to {@link MutableViewData}s. */
+@SuppressWarnings("deprecation")
+final class MeasureToViewMap {
+
+ /*
+ * A synchronized singleton map that stores the one-to-many mapping from Measures
+ * to MutableViewDatas.
+ */
+ @GuardedBy("this")
+ private final Multimap<String, MutableViewData> mutableMap =
+ HashMultimap.<String, MutableViewData>create();
+
+ @GuardedBy("this")
+ private final Map<View.Name, View> registeredViews = new HashMap<View.Name, View>();
+
+ // TODO(songya): consider adding a Measure.Name class
+ @GuardedBy("this")
+ private final Map<String, Measure> registeredMeasures = Maps.newHashMap();
+
+ // Cached set of exported views. It must be set to null whenever a view is registered or
+ // unregistered.
+ @javax.annotation.Nullable private volatile Set<View> exportedViews;
+
+ /** Returns a {@link ViewData} corresponding to the given {@link View.Name}. */
+ @javax.annotation.Nullable
+ synchronized ViewData getView(View.Name viewName, Clock clock, State state) {
+ MutableViewData view = getMutableViewData(viewName);
+ return view == null ? null : view.toViewData(clock.now(), state);
+ }
+
+ Set<View> getExportedViews() {
+ Set<View> views = exportedViews;
+ if (views == null) {
+ synchronized (this) {
+ exportedViews = views = filterExportedViews(registeredViews.values());
+ }
+ }
+ return views;
+ }
+
+ // Returns the subset of the given views that should be exported
+ private static Set<View> filterExportedViews(Collection<View> allViews) {
+ Set<View> views = Sets.newHashSet();
+ for (View view : allViews) {
+ if (view.getWindow() instanceof View.AggregationWindow.Cumulative) {
+ views.add(view);
+ }
+ }
+ return Collections.unmodifiableSet(views);
+ }
+
+ /** Enable stats collection for the given {@link View}. */
+ synchronized void registerView(View view, Clock clock) {
+ exportedViews = null;
+ View existing = registeredViews.get(view.getName());
+ if (existing != null) {
+ if (existing.equals(view)) {
+ // Ignore views that are already registered.
+ return;
+ } else {
+ throw new IllegalArgumentException(
+ "A different view with the same name is already registered: " + existing);
+ }
+ }
+ Measure measure = view.getMeasure();
+ Measure registeredMeasure = registeredMeasures.get(measure.getName());
+ if (registeredMeasure != null && !registeredMeasure.equals(measure)) {
+ throw new IllegalArgumentException(
+ "A different measure with the same name is already registered: " + registeredMeasure);
+ }
+ registeredViews.put(view.getName(), view);
+ if (registeredMeasure == null) {
+ registeredMeasures.put(measure.getName(), measure);
+ }
+ Timestamp now = clock.now();
+ mutableMap.put(view.getMeasure().getName(), MutableViewData.create(view, now));
+ }
+
+ @javax.annotation.Nullable
+ private synchronized MutableViewData getMutableViewData(View.Name viewName) {
+ View view = registeredViews.get(viewName);
+ if (view == null) {
+ return null;
+ }
+ Collection<MutableViewData> views = mutableMap.get(view.getMeasure().getName());
+ for (MutableViewData viewData : views) {
+ if (viewData.getView().getName().equals(viewName)) {
+ return viewData;
+ }
+ }
+ throw new AssertionError(
+ "Internal error: Not recording stats for view: \""
+ + viewName
+ + "\" registeredViews="
+ + registeredViews
+ + ", mutableMap="
+ + mutableMap);
+ }
+
+ // Records stats with a set of tags.
+ synchronized void record(TagContext tags, MeasureMapInternal stats, Timestamp timestamp) {
+ Iterator<Measurement> iterator = stats.iterator();
+ Map<String, String> attachments = stats.getAttachments();
+ while (iterator.hasNext()) {
+ Measurement measurement = iterator.next();
+ Measure measure = measurement.getMeasure();
+ if (!measure.equals(registeredMeasures.get(measure.getName()))) {
+ // unregistered measures will be ignored.
+ continue;
+ }
+ Collection<MutableViewData> viewDataCollection = mutableMap.get(measure.getName());
+ for (MutableViewData viewData : viewDataCollection) {
+ viewData.record(
+ tags, RecordUtils.getDoubleValueFromMeasurement(measurement), timestamp, attachments);
+ }
+ }
+ }
+
+ synchronized List<Metric> getMetrics(Clock clock, State state) {
+ List<Metric> metrics = new ArrayList<Metric>();
+ Timestamp now = clock.now();
+ for (Entry<String, MutableViewData> entry : mutableMap.entries()) {
+ Metric metric = entry.getValue().toMetric(now, state);
+ if (metric != null) {
+ metrics.add(metric);
+ }
+ }
+ return metrics;
+ }
+
+ // Clear stats for all the current MutableViewData
+ synchronized void clearStats() {
+ for (Entry<String, Collection<MutableViewData>> entry : mutableMap.asMap().entrySet()) {
+ for (MutableViewData mutableViewData : entry.getValue()) {
+ mutableViewData.clearStats();
+ }
+ }
+ }
+
+ // Resume stats collection for all MutableViewData.
+ synchronized void resumeStatsCollection(Timestamp now) {
+ for (Entry<String, Collection<MutableViewData>> entry : mutableMap.asMap().entrySet()) {
+ for (MutableViewData mutableViewData : entry.getValue()) {
+ mutableViewData.resumeStatsCollection(now);
+ }
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java
new file mode 100644
index 00000000..7bf92572
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricProducerImpl.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.metrics.export.MetricProducer;
+import java.util.Collection;
+import javax.annotation.concurrent.ThreadSafe;
+
+/** Implementation of {@link MetricProducer}. */
+@ThreadSafe
+final class MetricProducerImpl extends MetricProducer {
+
+ private final StatsManager statsManager;
+
+ MetricProducerImpl(StatsManager statsManager) {
+ this.statsManager = statsManager;
+ }
+
+ @Override
+ public Collection<Metric> getMetrics() {
+ return statsManager.getMetrics();
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java
new file mode 100644
index 00000000..0dfb1d26
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MetricUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opencensus.common.Function;
+import io.opencensus.common.Functions;
+import io.opencensus.metrics.LabelKey;
+import io.opencensus.metrics.LabelValue;
+import io.opencensus.metrics.export.MetricDescriptor;
+import io.opencensus.metrics.export.MetricDescriptor.Type;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.View;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import java.util.ArrayList;
+import java.util.List;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+@SuppressWarnings("deprecation")
+// Utils to convert Stats data models to Metric data models.
+final class MetricUtils {
+
+ @javax.annotation.Nullable
+ static MetricDescriptor viewToMetricDescriptor(View view) {
+ if (view.getWindow() instanceof View.AggregationWindow.Interval) {
+ // Only creates Metric for cumulative stats.
+ return null;
+ }
+ List<LabelKey> labelKeys = new ArrayList<LabelKey>();
+ for (TagKey tagKey : view.getColumns()) {
+ // TODO: add description
+ labelKeys.add(LabelKey.create(tagKey.getName(), ""));
+ }
+ Measure measure = view.getMeasure();
+ return MetricDescriptor.create(
+ view.getName().asString(),
+ view.getDescription(),
+ measure.getUnit(),
+ getType(measure, view.getAggregation()),
+ labelKeys);
+ }
+
+ @VisibleForTesting
+ static Type getType(Measure measure, Aggregation aggregation) {
+ return aggregation.match(
+ Functions.returnConstant(
+ measure.match(
+ TYPE_CUMULATIVE_DOUBLE_FUNCTION, // Sum Double
+ TYPE_CUMULATIVE_INT64_FUNCTION, // Sum Int64
+ TYPE_UNRECOGNIZED_FUNCTION)),
+ TYPE_CUMULATIVE_INT64_FUNCTION, // Count
+ TYPE_CUMULATIVE_DISTRIBUTION_FUNCTION, // Distribution
+ Functions.returnConstant(
+ measure.match(
+ TYPE_GAUGE_DOUBLE_FUNCTION, // LastValue Double
+ TYPE_GAUGE_INT64_FUNCTION, // LastValue Long
+ TYPE_UNRECOGNIZED_FUNCTION)),
+ AGGREGATION_TYPE_DEFAULT_FUNCTION);
+ }
+
+ static List<LabelValue> tagValuesToLabelValues(List</*@Nullable*/ TagValue> tagValues) {
+ List<LabelValue> labelValues = new ArrayList<LabelValue>();
+ for (/*@Nullable*/ TagValue tagValue : tagValues) {
+ labelValues.add(LabelValue.create(tagValue == null ? null : tagValue.asString()));
+ }
+ return labelValues;
+ }
+
+ private static final Function<Object, Type> TYPE_CUMULATIVE_DOUBLE_FUNCTION =
+ Functions.returnConstant(Type.CUMULATIVE_DOUBLE);
+
+ private static final Function<Object, Type> TYPE_CUMULATIVE_INT64_FUNCTION =
+ Functions.returnConstant(Type.CUMULATIVE_INT64);
+
+ private static final Function<Object, Type> TYPE_CUMULATIVE_DISTRIBUTION_FUNCTION =
+ Functions.returnConstant(Type.CUMULATIVE_DISTRIBUTION);
+
+ private static final Function<Object, Type> TYPE_GAUGE_DOUBLE_FUNCTION =
+ Functions.returnConstant(Type.GAUGE_DOUBLE);
+
+ private static final Function<Object, Type> TYPE_GAUGE_INT64_FUNCTION =
+ Functions.returnConstant(Type.GAUGE_INT64);
+
+ private static final Function<Object, Type> TYPE_UNRECOGNIZED_FUNCTION =
+ Functions.<Type>throwAssertionError();
+
+ private static final Function<Aggregation, Type> AGGREGATION_TYPE_DEFAULT_FUNCTION =
+ new Function<Aggregation, Type>() {
+ @Override
+ public Type apply(Aggregation arg) {
+ if (arg instanceof Aggregation.Mean) {
+ return Type.CUMULATIVE_DOUBLE; // Mean
+ }
+ throw new AssertionError();
+ }
+ };
+
+ private MetricUtils() {}
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java
new file mode 100644
index 00000000..6e2bff1c
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableAggregation.java
@@ -0,0 +1,556 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opencensus.common.Timestamp;
+import io.opencensus.metrics.export.Distribution;
+import io.opencensus.metrics.export.Distribution.BucketOptions;
+import io.opencensus.metrics.export.Point;
+import io.opencensus.metrics.export.Value;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.AggregationData;
+import io.opencensus.stats.AggregationData.DistributionData;
+import io.opencensus.stats.AggregationData.DistributionData.Exemplar;
+import io.opencensus.stats.BucketBoundaries;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Mutable version of {@link Aggregation} that supports adding values. */
+abstract class MutableAggregation {
+
+ private MutableAggregation() {}
+
+ // Tolerance for double comparison.
+ private static final double TOLERANCE = 1e-6;
+
+ /**
+ * Put a new value into the MutableAggregation.
+ *
+ * @param value new value to be added to population
+ * @param attachments the contextual information on an {@link Exemplar}
+ * @param timestamp the timestamp when the value is recorded
+ */
+ abstract void add(double value, Map<String, String> attachments, Timestamp timestamp);
+
+ // TODO(songya): remove this method once interval stats is completely removed.
+ /**
+ * Combine the internal values of this MutableAggregation and value of the given
+ * MutableAggregation, with the given fraction. Then set the internal value of this
+ * MutableAggregation to the combined value.
+ *
+ * @param other the other {@code MutableAggregation}. The type of this and other {@code
+ * MutableAggregation} must match.
+ * @param fraction the fraction that the value in other {@code MutableAggregation} should
+ * contribute. Must be within [0.0, 1.0].
+ */
+ abstract void combine(MutableAggregation other, double fraction);
+
+ abstract AggregationData toAggregationData();
+
+ abstract Point toPoint(Timestamp timestamp);
+
+ /** Calculate sum of doubles on aggregated {@code MeasureValue}s. */
+ static class MutableSumDouble extends MutableAggregation {
+
+ private double sum = 0.0;
+
+ private MutableSumDouble() {}
+
+ /**
+ * Construct a {@code MutableSumDouble}.
+ *
+ * @return an empty {@code MutableSumDouble}.
+ */
+ static MutableSumDouble create() {
+ return new MutableSumDouble();
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ sum += value;
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableSumDouble, "MutableSumDouble expected.");
+ this.sum += fraction * ((MutableSumDouble) other).sum;
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.SumDataDouble.create(sum);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.doubleValue(sum), timestamp);
+ }
+
+ @VisibleForTesting
+ double getSum() {
+ return sum;
+ }
+ }
+
+ /** Calculate sum of longs on aggregated {@code MeasureValue}s. */
+ static final class MutableSumLong extends MutableSumDouble {
+ private MutableSumLong() {
+ super();
+ }
+
+ /**
+ * Construct a {@code MutableSumLong}.
+ *
+ * @return an empty {@code MutableSumLong}.
+ */
+ static MutableSumLong create() {
+ return new MutableSumLong();
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.SumDataLong.create(Math.round(getSum()));
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.longValue(Math.round(getSum())), timestamp);
+ }
+ }
+
+ /** Calculate count on aggregated {@code MeasureValue}s. */
+ static final class MutableCount extends MutableAggregation {
+
+ private long count = 0;
+
+ private MutableCount() {}
+
+ /**
+ * Construct a {@code MutableCount}.
+ *
+ * @return an empty {@code MutableCount}.
+ */
+ static MutableCount create() {
+ return new MutableCount();
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ count++;
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableCount, "MutableCount expected.");
+ this.count += Math.round(fraction * ((MutableCount) other).getCount());
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.CountData.create(count);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.longValue(count), timestamp);
+ }
+
+ /**
+ * Returns the aggregated count.
+ *
+ * @return the aggregated count.
+ */
+ long getCount() {
+ return count;
+ }
+ }
+
+ /** Calculate mean on aggregated {@code MeasureValue}s. */
+ static final class MutableMean extends MutableAggregation {
+
+ private double sum = 0.0;
+ private long count = 0;
+
+ private MutableMean() {}
+
+ /**
+ * Construct a {@code MutableMean}.
+ *
+ * @return an empty {@code MutableMean}.
+ */
+ static MutableMean create() {
+ return new MutableMean();
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ count++;
+ sum += value;
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableMean, "MutableMean expected.");
+ MutableMean mutableMean = (MutableMean) other;
+ this.count += Math.round(mutableMean.count * fraction);
+ this.sum += mutableMean.sum * fraction;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.MeanData.create(getMean(), count);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.doubleValue(getMean()), timestamp);
+ }
+
+ /**
+ * Returns the aggregated mean.
+ *
+ * @return the aggregated mean.
+ */
+ double getMean() {
+ return count == 0 ? 0 : sum / count;
+ }
+
+ /**
+ * Returns the aggregated count.
+ *
+ * @return the aggregated count.
+ */
+ long getCount() {
+ return count;
+ }
+
+ @VisibleForTesting
+ double getSum() {
+ return sum;
+ }
+ }
+
+ /** Calculate distribution stats on aggregated {@code MeasureValue}s. */
+ static final class MutableDistribution extends MutableAggregation {
+
+ private double sum = 0.0;
+ private double mean = 0.0;
+ private long count = 0;
+ private double sumOfSquaredDeviations = 0.0;
+
+ // Initial "impossible" values, that will get reset as soon as first value is added.
+ private double min = Double.POSITIVE_INFINITY;
+ private double max = Double.NEGATIVE_INFINITY;
+
+ private final BucketBoundaries bucketBoundaries;
+ private final long[] bucketCounts;
+
+ // If there's a histogram (i.e bucket boundaries are not empty) in this MutableDistribution,
+ // exemplars will have the same size to bucketCounts; otherwise exemplars are null.
+ // Only the newest exemplar will be kept at each index.
+ @javax.annotation.Nullable private final Exemplar[] exemplars;
+
+ private MutableDistribution(BucketBoundaries bucketBoundaries) {
+ this.bucketBoundaries = bucketBoundaries;
+ int buckets = bucketBoundaries.getBoundaries().size() + 1;
+ this.bucketCounts = new long[buckets];
+ // In the implementation, each histogram bucket can have up to one exemplar, and the exemplar
+ // array is guaranteed to be in ascending order.
+ // If there's no histogram, don't record exemplars.
+ this.exemplars = bucketBoundaries.getBoundaries().isEmpty() ? null : new Exemplar[buckets];
+ }
+
+ /**
+ * Construct a {@code MutableDistribution}.
+ *
+ * @return an empty {@code MutableDistribution}.
+ */
+ static MutableDistribution create(BucketBoundaries bucketBoundaries) {
+ checkNotNull(bucketBoundaries, "bucketBoundaries should not be null.");
+ return new MutableDistribution(bucketBoundaries);
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ sum += value;
+ count++;
+
+ /*
+ * Update the sum of squared deviations from the mean with the given value. For values
+ * x_i this is Sum[i=1..n]((x_i - mean)^2)
+ *
+ * Computed using Welfords method (see
+ * https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance, or Knuth, "The Art of
+ * Computer Programming", Vol. 2, page 323, 3rd edition)
+ */
+ double deltaFromMean = value - mean;
+ mean += deltaFromMean / count;
+ double deltaFromMean2 = value - mean;
+ sumOfSquaredDeviations += deltaFromMean * deltaFromMean2;
+
+ if (value < min) {
+ min = value;
+ }
+ if (value > max) {
+ max = value;
+ }
+
+ int bucket = 0;
+ for (; bucket < bucketBoundaries.getBoundaries().size(); bucket++) {
+ if (value < bucketBoundaries.getBoundaries().get(bucket)) {
+ break;
+ }
+ }
+ bucketCounts[bucket]++;
+
+ // No implicit recording for exemplars - if there are no attachments (contextual information),
+ // don't record exemplars.
+ if (!attachments.isEmpty() && exemplars != null) {
+ exemplars[bucket] = Exemplar.create(value, timestamp, attachments);
+ }
+ }
+
+ // We don't compute fractional MutableDistribution, it's either whole or none.
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableDistribution, "MutableDistribution expected.");
+ if (Math.abs(1.0 - fraction) > TOLERANCE) {
+ return;
+ }
+
+ MutableDistribution mutableDistribution = (MutableDistribution) other;
+ checkArgument(
+ this.bucketBoundaries.equals(mutableDistribution.bucketBoundaries),
+ "Bucket boundaries should match.");
+
+ // Algorithm for calculating the combination of sum of squared deviations:
+ // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm.
+ if (this.count + mutableDistribution.count > 0) {
+ double delta = mutableDistribution.mean - this.mean;
+ this.sumOfSquaredDeviations =
+ this.sumOfSquaredDeviations
+ + mutableDistribution.sumOfSquaredDeviations
+ + Math.pow(delta, 2)
+ * this.count
+ * mutableDistribution.count
+ / (this.count + mutableDistribution.count);
+ }
+
+ this.count += mutableDistribution.count;
+ this.sum += mutableDistribution.sum;
+ this.mean = this.sum / this.count;
+
+ if (mutableDistribution.min < this.min) {
+ this.min = mutableDistribution.min;
+ }
+ if (mutableDistribution.max > this.max) {
+ this.max = mutableDistribution.max;
+ }
+
+ long[] bucketCounts = mutableDistribution.getBucketCounts();
+ for (int i = 0; i < bucketCounts.length; i++) {
+ this.bucketCounts[i] += bucketCounts[i];
+ }
+
+ Exemplar[] otherExemplars = mutableDistribution.getExemplars();
+ if (exemplars != null && otherExemplars != null) {
+ for (int i = 0; i < otherExemplars.length; i++) {
+ Exemplar exemplar = otherExemplars[i];
+ // Assume other is always newer than this, because we combined interval buckets in time
+ // order.
+ // If there's a newer exemplar, overwrite current value.
+ if (exemplar != null) {
+ this.exemplars[i] = exemplar;
+ }
+ }
+ }
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ List<Long> boxedBucketCounts = new ArrayList<Long>();
+ for (long bucketCount : bucketCounts) {
+ boxedBucketCounts.add(bucketCount);
+ }
+ List<Exemplar> exemplarList = new ArrayList<Exemplar>();
+ if (exemplars != null) {
+ for (Exemplar exemplar : exemplars) {
+ if (exemplar != null) {
+ exemplarList.add(exemplar);
+ }
+ }
+ }
+ return DistributionData.create(
+ mean, count, min, max, sumOfSquaredDeviations, boxedBucketCounts, exemplarList);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ List<Distribution.Bucket> buckets = new ArrayList<Distribution.Bucket>();
+ for (int bucket = 0; bucket < bucketCounts.length; bucket++) {
+ long bucketCount = bucketCounts[bucket];
+ @javax.annotation.Nullable AggregationData.DistributionData.Exemplar exemplar = null;
+ if (exemplars != null) {
+ exemplar = exemplars[bucket];
+ }
+
+ Distribution.Bucket metricBucket;
+ if (exemplar != null) {
+ // Bucket with an Exemplar.
+ metricBucket =
+ Distribution.Bucket.create(
+ bucketCount,
+ Distribution.Exemplar.create(
+ exemplar.getValue(), exemplar.getTimestamp(), exemplar.getAttachments()));
+ } else {
+ // Bucket with no Exemplar.
+ metricBucket = Distribution.Bucket.create(bucketCount);
+ }
+ buckets.add(metricBucket);
+ }
+
+ // TODO(mayurkale): Drop the first bucket when converting to metrics.
+ // Reason: In Stats API, bucket bounds begin with -infinity (first bucket is (-infinity, 0)).
+ BucketOptions bucketOptions = BucketOptions.explicitOptions(bucketBoundaries.getBoundaries());
+
+ return Point.create(
+ Value.distributionValue(
+ Distribution.create(
+ count, mean * count, sumOfSquaredDeviations, bucketOptions, buckets)),
+ timestamp);
+ }
+
+ double getMean() {
+ return mean;
+ }
+
+ long getCount() {
+ return count;
+ }
+
+ double getMin() {
+ return min;
+ }
+
+ double getMax() {
+ return max;
+ }
+
+ // Returns the aggregated sum of squared deviations.
+ double getSumOfSquaredDeviations() {
+ return sumOfSquaredDeviations;
+ }
+
+ long[] getBucketCounts() {
+ return bucketCounts;
+ }
+
+ BucketBoundaries getBucketBoundaries() {
+ return bucketBoundaries;
+ }
+
+ @javax.annotation.Nullable
+ Exemplar[] getExemplars() {
+ return exemplars;
+ }
+ }
+
+ /** Calculate double last value on aggregated {@code MeasureValue}s. */
+ static class MutableLastValueDouble extends MutableAggregation {
+
+ // Initial value that will get reset as soon as first value is added.
+ private double lastValue = Double.NaN;
+ // TODO(songya): remove this once interval stats is completely removed.
+ private boolean initialized = false;
+
+ private MutableLastValueDouble() {}
+
+ /**
+ * Construct a {@code MutableLastValueDouble}.
+ *
+ * @return an empty {@code MutableLastValueDouble}.
+ */
+ static MutableLastValueDouble create() {
+ return new MutableLastValueDouble();
+ }
+
+ @Override
+ void add(double value, Map<String, String> attachments, Timestamp timestamp) {
+ lastValue = value;
+ // TODO(songya): remove this once interval stats is completely removed.
+ if (!initialized) {
+ initialized = true;
+ }
+ }
+
+ @Override
+ void combine(MutableAggregation other, double fraction) {
+ checkArgument(other instanceof MutableLastValueDouble, "MutableLastValueDouble expected.");
+ MutableLastValueDouble otherValue = (MutableLastValueDouble) other;
+ // Assume other is always newer than this, because we combined interval buckets in time order.
+ // If there's a newer value, overwrite current value.
+ this.lastValue = otherValue.initialized ? otherValue.getLastValue() : this.lastValue;
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.LastValueDataDouble.create(lastValue);
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.doubleValue(lastValue), timestamp);
+ }
+
+ @VisibleForTesting
+ double getLastValue() {
+ return lastValue;
+ }
+ }
+
+ /** Calculate last long value on aggregated {@code MeasureValue}s. */
+ static final class MutableLastValueLong extends MutableLastValueDouble {
+ private MutableLastValueLong() {
+ super();
+ }
+
+ /**
+ * Construct a {@code MutableLastValueLong}.
+ *
+ * @return an empty {@code MutableLastValueLong}.
+ */
+ static MutableLastValueLong create() {
+ return new MutableLastValueLong();
+ }
+
+ @Override
+ AggregationData toAggregationData() {
+ return AggregationData.LastValueDataLong.create(Math.round(getLastValue()));
+ }
+
+ @Override
+ Point toPoint(Timestamp timestamp) {
+ return Point.create(Value.longValue(Math.round(getLastValue())), timestamp);
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java
new file mode 100644
index 00000000..928675e9
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/MutableViewData.java
@@ -0,0 +1,464 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.opencensus.implcore.stats.RecordUtils.createAggregationMap;
+import static io.opencensus.implcore.stats.RecordUtils.createMutableAggregation;
+import static io.opencensus.implcore.stats.RecordUtils.getTagMap;
+import static io.opencensus.implcore.stats.RecordUtils.getTagValues;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import io.opencensus.common.Duration;
+import io.opencensus.common.Function;
+import io.opencensus.common.Functions;
+import io.opencensus.common.Timestamp;
+import io.opencensus.implcore.internal.CheckerFrameworkUtils;
+import io.opencensus.implcore.internal.CurrentState.State;
+import io.opencensus.metrics.LabelValue;
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.metrics.export.MetricDescriptor;
+import io.opencensus.metrics.export.MetricDescriptor.Type;
+import io.opencensus.metrics.export.Point;
+import io.opencensus.metrics.export.TimeSeries;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.AggregationData;
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.TagValue;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+/** A mutable version of {@link ViewData}, used for recording stats and start/end time. */
+@SuppressWarnings("deprecation")
+abstract class MutableViewData {
+
+ @VisibleForTesting static final Timestamp ZERO_TIMESTAMP = Timestamp.create(0, 0);
+
+ private final View view;
+
+ private MutableViewData(View view) {
+ this.view = view;
+ }
+
+ /**
+ * Constructs a new {@link MutableViewData}.
+ *
+ * @param view the {@code View} linked with this {@code MutableViewData}.
+ * @param start the start {@code Timestamp}.
+ * @return a {@code MutableViewData}.
+ */
+ static MutableViewData create(final View view, final Timestamp start) {
+ return view.getWindow()
+ .match(
+ new CreateCumulative(view, start),
+ new CreateInterval(view, start),
+ Functions.<MutableViewData>throwAssertionError());
+ }
+
+ /** The {@link View} associated with this {@link ViewData}. */
+ View getView() {
+ return view;
+ }
+
+ @javax.annotation.Nullable
+ abstract Metric toMetric(Timestamp now, State state);
+
+ /** Record stats with the given tags. */
+ abstract void record(
+ TagContext context, double value, Timestamp timestamp, Map<String, String> attachments);
+
+ /** Convert this {@link MutableViewData} to {@link ViewData}. */
+ abstract ViewData toViewData(Timestamp now, State state);
+
+ // Clear recorded stats.
+ abstract void clearStats();
+
+ // Resume stats collection, and reset Start Timestamp (for CumulativeMutableViewData), or refresh
+ // bucket list (for InternalMutableViewData).
+ abstract void resumeStatsCollection(Timestamp now);
+
+ private static final class CumulativeMutableViewData extends MutableViewData {
+
+ private Timestamp start;
+ private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap =
+ Maps.newHashMap();
+ // Cache a MetricDescriptor to avoid converting View to MetricDescriptor in the future.
+ private final MetricDescriptor metricDescriptor;
+
+ private CumulativeMutableViewData(View view, Timestamp start) {
+ super(view);
+ this.start = start;
+ MetricDescriptor metricDescriptor = MetricUtils.viewToMetricDescriptor(view);
+ if (metricDescriptor == null) {
+ throw new AssertionError(
+ "Cumulative view should be converted to a non-null MetricDescriptor.");
+ } else {
+ this.metricDescriptor = metricDescriptor;
+ }
+ }
+
+ @javax.annotation.Nullable
+ @Override
+ Metric toMetric(Timestamp now, State state) {
+ if (state == State.DISABLED) {
+ return null;
+ }
+ Type type = metricDescriptor.getType();
+ @javax.annotation.Nullable
+ Timestamp startTime = type == Type.GAUGE_INT64 || type == Type.GAUGE_DOUBLE ? null : start;
+ List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>();
+ for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry :
+ tagValueAggregationMap.entrySet()) {
+ List<LabelValue> labelValues = MetricUtils.tagValuesToLabelValues(entry.getKey());
+ Point point = entry.getValue().toPoint(now);
+ timeSeriesList.add(TimeSeries.createWithOnePoint(labelValues, point, startTime));
+ }
+ return Metric.create(metricDescriptor, timeSeriesList);
+ }
+
+ @Override
+ void record(
+ TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) {
+ List</*@Nullable*/ TagValue> tagValues =
+ getTagValues(getTagMap(context), super.view.getColumns());
+ if (!tagValueAggregationMap.containsKey(tagValues)) {
+ tagValueAggregationMap.put(
+ tagValues,
+ createMutableAggregation(super.view.getAggregation(), super.getView().getMeasure()));
+ }
+ tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp);
+ }
+
+ @Override
+ ViewData toViewData(Timestamp now, State state) {
+ if (state == State.ENABLED) {
+ return ViewData.create(
+ super.view,
+ createAggregationMap(tagValueAggregationMap, super.view.getMeasure()),
+ ViewData.AggregationWindowData.CumulativeData.create(start, now));
+ } else {
+ // If Stats state is DISABLED, return an empty ViewData.
+ return ViewData.create(
+ super.view,
+ Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(),
+ ViewData.AggregationWindowData.CumulativeData.create(ZERO_TIMESTAMP, ZERO_TIMESTAMP));
+ }
+ }
+
+ @Override
+ void clearStats() {
+ tagValueAggregationMap.clear();
+ }
+
+ @Override
+ void resumeStatsCollection(Timestamp now) {
+ start = now;
+ }
+ }
+
+ /*
+ * For each IntervalView, we always keep a queue of N + 1 buckets (by default N is 4).
+ * Each bucket has a duration which is interval duration / N.
+ * Ideally:
+ * 1. the buckets should always be up-to-date,
+ * 2. current time should always be within the latest bucket, currently recorded stats should fall
+ * into the latest bucket,
+ * 3. there are always N buckets before the current one, which holds the stats in the past
+ * interval duration.
+ *
+ * When getView() is called, we will extract and combine the stats from the current and past
+ * buckets (part of the stats from the oldest bucket could have expired).
+ *
+ * However, in reality, we couldn't track the status of buckets all the time (keep monitoring and
+ * updating the bucket queue will be expensive). When we call record() or getView(), some or all
+ * of the buckets might be outdated, and we will need to "pad" new buckets to the queue and remove
+ * outdated ones. After refreshing buckets, the bucket queue will able to maintain the three
+ * invariants in the ideal situation.
+ *
+ * For example:
+ * 1. We have an IntervalView which has a duration of 8 seconds, we register this view at 10s.
+ * 2. Initially there will be 5 buckets: [2.0, 4.0), [4.0, 6.0), ..., [10.0, 12.0).
+ * 3. If users don't call record() or getView(), bucket queue will remain as it is, and some
+ * buckets could expire.
+ * 4. Suppose record() is called at 15s, now we need to refresh the bucket queue. We need to add
+ * two new buckets [12.0, 14.0) and [14.0, 16.0), and remove two expired buckets [2.0, 4.0)
+ * and [4.0, 6.0)
+ * 5. Suppose record() is called again at 30s, all the current buckets should have expired. We add
+ * 5 new buckets [22.0, 24.0) ... [30.0, 32.0) and remove all the previous buckets.
+ * 6. Suppose users call getView() at 35s, again we need to add two new buckets and remove two
+ * expired one, so that bucket queue is up-to-date. Now we combine stats from all buckets and
+ * return the combined IntervalViewData.
+ */
+ private static final class IntervalMutableViewData extends MutableViewData {
+
+ // TODO(songya): allow customizable bucket size in the future.
+ private static final int N = 4; // IntervalView has N + 1 buckets
+
+ private final ArrayDeque<IntervalBucket> buckets = new ArrayDeque<IntervalBucket>();
+
+ private final Duration totalDuration; // Duration of the whole interval.
+ private final Duration bucketDuration; // Duration of a single bucket (totalDuration / N)
+
+ private IntervalMutableViewData(View view, Timestamp start) {
+ super(view);
+ Duration totalDuration = ((View.AggregationWindow.Interval) view.getWindow()).getDuration();
+ this.totalDuration = totalDuration;
+ this.bucketDuration = Duration.fromMillis(totalDuration.toMillis() / N);
+
+ // When initializing. add N empty buckets prior to the start timestamp of this
+ // IntervalMutableViewData, so that the last bucket will be the current one in effect.
+ shiftBucketList(N + 1, start);
+ }
+
+ @javax.annotation.Nullable
+ @Override
+ Metric toMetric(Timestamp now, State state) {
+ return null;
+ }
+
+ @Override
+ void record(
+ TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) {
+ List</*@Nullable*/ TagValue> tagValues =
+ getTagValues(getTagMap(context), super.view.getColumns());
+ refreshBucketList(timestamp);
+ // It is always the last bucket that does the recording.
+ CheckerFrameworkUtils.castNonNull(buckets.peekLast())
+ .record(tagValues, value, attachments, timestamp);
+ }
+
+ @Override
+ ViewData toViewData(Timestamp now, State state) {
+ refreshBucketList(now);
+ if (state == State.ENABLED) {
+ return ViewData.create(
+ super.view,
+ combineBucketsAndGetAggregationMap(now),
+ ViewData.AggregationWindowData.IntervalData.create(now));
+ } else {
+ // If Stats state is DISABLED, return an empty ViewData.
+ return ViewData.create(
+ super.view,
+ Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(),
+ ViewData.AggregationWindowData.IntervalData.create(ZERO_TIMESTAMP));
+ }
+ }
+
+ @Override
+ void clearStats() {
+ for (IntervalBucket bucket : buckets) {
+ bucket.clearStats();
+ }
+ }
+
+ @Override
+ void resumeStatsCollection(Timestamp now) {
+ // Refresh bucket list to be ready for stats recording, so that if record() is called right
+ // after stats state is turned back on, record() will be faster.
+ refreshBucketList(now);
+ }
+
+ // Add new buckets and remove expired buckets by comparing the current timestamp with
+ // timestamp of the last bucket.
+ private void refreshBucketList(Timestamp now) {
+ if (buckets.size() != N + 1) {
+ throw new AssertionError("Bucket list must have exactly " + (N + 1) + " buckets.");
+ }
+ Timestamp startOfLastBucket =
+ CheckerFrameworkUtils.castNonNull(buckets.peekLast()).getStart();
+ // TODO(songya): decide what to do when time goes backwards
+ checkArgument(
+ now.compareTo(startOfLastBucket) >= 0,
+ "Current time must be within or after the last bucket.");
+ long elapsedTimeMillis = now.subtractTimestamp(startOfLastBucket).toMillis();
+ long numOfPadBuckets = elapsedTimeMillis / bucketDuration.toMillis();
+
+ shiftBucketList(numOfPadBuckets, now);
+ }
+
+ // Add specified number of new buckets, and remove expired buckets
+ private void shiftBucketList(long numOfPadBuckets, Timestamp now) {
+ Timestamp startOfNewBucket;
+
+ if (!buckets.isEmpty()) {
+ startOfNewBucket =
+ CheckerFrameworkUtils.castNonNull(buckets.peekLast())
+ .getStart()
+ .addDuration(bucketDuration);
+ } else {
+ // Initialize bucket list. Should only enter this block once.
+ startOfNewBucket = subtractDuration(now, totalDuration);
+ }
+
+ if (numOfPadBuckets > N + 1) {
+ // All current buckets expired, need to add N + 1 new buckets. The start time of the latest
+ // bucket will be current time.
+ startOfNewBucket = subtractDuration(now, totalDuration);
+ numOfPadBuckets = N + 1;
+ }
+
+ for (int i = 0; i < numOfPadBuckets; i++) {
+ buckets.add(
+ new IntervalBucket(
+ startOfNewBucket,
+ bucketDuration,
+ super.view.getAggregation(),
+ super.view.getMeasure()));
+ startOfNewBucket = startOfNewBucket.addDuration(bucketDuration);
+ }
+
+ // removed expired buckets
+ while (buckets.size() > N + 1) {
+ buckets.pollFirst();
+ }
+ }
+
+ // Combine stats within each bucket, aggregate stats by tag values, and return the mapping from
+ // tag values to aggregation data.
+ private Map<List</*@Nullable*/ TagValue>, AggregationData> combineBucketsAndGetAggregationMap(
+ Timestamp now) {
+ // Need to maintain the order of inserted MutableAggregations (inserted based on time order).
+ Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap =
+ LinkedHashMultimap.create();
+
+ ArrayDeque<IntervalBucket> shallowCopy = new ArrayDeque<IntervalBucket>(buckets);
+
+ Aggregation aggregation = super.view.getAggregation();
+ Measure measure = super.view.getMeasure();
+ putBucketsIntoMultiMap(shallowCopy, multimap, aggregation, measure, now);
+ Map<List</*@Nullable*/ TagValue>, MutableAggregation> singleMap =
+ aggregateOnEachTagValueList(multimap, aggregation, measure);
+ return createAggregationMap(singleMap, super.getView().getMeasure());
+ }
+
+ // Put stats within each bucket to a multimap. Each tag value list (map key) could have multiple
+ // mutable aggregations (map value) from different buckets.
+ private static void putBucketsIntoMultiMap(
+ ArrayDeque<IntervalBucket> buckets,
+ Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap,
+ Aggregation aggregation,
+ Measure measure,
+ Timestamp now) {
+ // Put fractional stats of the head (oldest) bucket.
+ IntervalBucket head = CheckerFrameworkUtils.castNonNull(buckets.peekFirst());
+ IntervalBucket tail = CheckerFrameworkUtils.castNonNull(buckets.peekLast());
+ double fractionTail = tail.getFraction(now);
+ // TODO(songya): decide what to do when time goes backwards
+ checkArgument(
+ 0.0 <= fractionTail && fractionTail <= 1.0,
+ "Fraction " + fractionTail + " should be within [0.0, 1.0].");
+ double fractionHead = 1.0 - fractionTail;
+ putFractionalMutableAggregationsToMultiMap(
+ head.getTagValueAggregationMap(), multimap, aggregation, measure, fractionHead);
+
+ // Put whole data of other buckets.
+ boolean shouldSkipFirst = true;
+ for (IntervalBucket bucket : buckets) {
+ if (shouldSkipFirst) {
+ shouldSkipFirst = false;
+ continue; // skip the first bucket
+ }
+ for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry :
+ bucket.getTagValueAggregationMap().entrySet()) {
+ multimap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ // Put stats within one bucket into multimap, multiplied by a given fraction.
+ private static <T> void putFractionalMutableAggregationsToMultiMap(
+ Map<T, MutableAggregation> mutableAggrMap,
+ Multimap<T, MutableAggregation> multimap,
+ Aggregation aggregation,
+ Measure measure,
+ double fraction) {
+ for (Entry<T, MutableAggregation> entry : mutableAggrMap.entrySet()) {
+ // Initially empty MutableAggregations.
+ MutableAggregation fractionalMutableAgg = createMutableAggregation(aggregation, measure);
+ fractionalMutableAgg.combine(entry.getValue(), fraction);
+ multimap.put(entry.getKey(), fractionalMutableAgg);
+ }
+ }
+
+ // For each tag value list (key of AggregationMap), combine mutable aggregations into one
+ // mutable aggregation, thus convert the multimap into a single map.
+ private static <T> Map<T, MutableAggregation> aggregateOnEachTagValueList(
+ Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure) {
+ Map<T, MutableAggregation> map = Maps.newHashMap();
+ for (T tagValues : multimap.keySet()) {
+ // Initially empty MutableAggregations.
+ MutableAggregation combinedAggregation = createMutableAggregation(aggregation, measure);
+ for (MutableAggregation mutableAggregation : multimap.get(tagValues)) {
+ combinedAggregation.combine(mutableAggregation, 1.0);
+ }
+ map.put(tagValues, combinedAggregation);
+ }
+ return map;
+ }
+
+ // Subtract a Duration from a Timestamp, and return a new Timestamp.
+ private static Timestamp subtractDuration(Timestamp timestamp, Duration duration) {
+ return timestamp.addDuration(Duration.create(-duration.getSeconds(), -duration.getNanos()));
+ }
+ }
+
+ private static final class CreateCumulative
+ implements Function<View.AggregationWindow.Cumulative, MutableViewData> {
+ @Override
+ public MutableViewData apply(View.AggregationWindow.Cumulative arg) {
+ return new CumulativeMutableViewData(view, start);
+ }
+
+ private final View view;
+ private final Timestamp start;
+
+ private CreateCumulative(View view, Timestamp start) {
+ this.view = view;
+ this.start = start;
+ }
+ }
+
+ private static final class CreateInterval
+ implements Function<View.AggregationWindow.Interval, MutableViewData> {
+ @Override
+ public MutableViewData apply(View.AggregationWindow.Interval arg) {
+ return new IntervalMutableViewData(view, start);
+ }
+
+ private final View view;
+ private final Timestamp start;
+
+ private CreateInterval(View view, Timestamp start) {
+ this.view = view;
+ this.start = start;
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java b/impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java
new file mode 100644
index 00000000..fbb593f5
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/RecordUtils.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import io.opencensus.common.Function;
+import io.opencensus.common.Functions;
+import io.opencensus.implcore.stats.MutableAggregation.MutableCount;
+import io.opencensus.implcore.stats.MutableAggregation.MutableDistribution;
+import io.opencensus.implcore.stats.MutableAggregation.MutableLastValueDouble;
+import io.opencensus.implcore.stats.MutableAggregation.MutableLastValueLong;
+import io.opencensus.implcore.stats.MutableAggregation.MutableMean;
+import io.opencensus.implcore.stats.MutableAggregation.MutableSumDouble;
+import io.opencensus.implcore.stats.MutableAggregation.MutableSumLong;
+import io.opencensus.implcore.tags.TagContextImpl;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.Aggregation.Count;
+import io.opencensus.stats.Aggregation.Distribution;
+import io.opencensus.stats.Aggregation.LastValue;
+import io.opencensus.stats.Aggregation.Sum;
+import io.opencensus.stats.AggregationData;
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.Measure.MeasureDouble;
+import io.opencensus.stats.Measure.MeasureLong;
+import io.opencensus.stats.Measurement;
+import io.opencensus.stats.Measurement.MeasurementDouble;
+import io.opencensus.stats.Measurement.MeasurementLong;
+import io.opencensus.tags.InternalUtils;
+import io.opencensus.tags.Tag;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.Nullable;
+*/
+
+@SuppressWarnings("deprecation")
+/* Common static utilities for stats recording. */
+final class RecordUtils {
+
+ @javax.annotation.Nullable @VisibleForTesting static final TagValue UNKNOWN_TAG_VALUE = null;
+
+ static Map<TagKey, TagValue> getTagMap(TagContext ctx) {
+ if (ctx instanceof TagContextImpl) {
+ return ((TagContextImpl) ctx).getTags();
+ } else {
+ Map<TagKey, TagValue> tags = Maps.newHashMap();
+ for (Iterator<Tag> i = InternalUtils.getTags(ctx); i.hasNext(); ) {
+ Tag tag = i.next();
+ tags.put(tag.getKey(), tag.getValue());
+ }
+ return tags;
+ }
+ }
+
+ @VisibleForTesting
+ static List</*@Nullable*/ TagValue> getTagValues(
+ Map<? extends TagKey, ? extends TagValue> tags, List<? extends TagKey> columns) {
+ List</*@Nullable*/ TagValue> tagValues = new ArrayList</*@Nullable*/ TagValue>(columns.size());
+ // Record all the measures in a "Greedy" way.
+ // Every view aggregates every measure. This is similar to doing a GROUPBY view’s keys.
+ for (int i = 0; i < columns.size(); ++i) {
+ TagKey tagKey = columns.get(i);
+ if (!tags.containsKey(tagKey)) {
+ // replace not found key values by null.
+ tagValues.add(UNKNOWN_TAG_VALUE);
+ } else {
+ tagValues.add(tags.get(tagKey));
+ }
+ }
+ return tagValues;
+ }
+
+ /**
+ * Create an empty {@link MutableAggregation} based on the given {@link Aggregation}.
+ *
+ * @param aggregation {@code Aggregation}.
+ * @return an empty {@code MutableAggregation}.
+ */
+ @VisibleForTesting
+ static MutableAggregation createMutableAggregation(
+ Aggregation aggregation, final Measure measure) {
+ return aggregation.match(
+ new Function<Sum, MutableAggregation>() {
+ @Override
+ public MutableAggregation apply(Sum arg) {
+ return measure.match(
+ CreateMutableSumDouble.INSTANCE,
+ CreateMutableSumLong.INSTANCE,
+ Functions.<MutableAggregation>throwAssertionError());
+ }
+ },
+ CreateMutableCount.INSTANCE,
+ CreateMutableDistribution.INSTANCE,
+ new Function<LastValue, MutableAggregation>() {
+ @Override
+ public MutableAggregation apply(LastValue arg) {
+ return measure.match(
+ CreateMutableLastValueDouble.INSTANCE,
+ CreateMutableLastValueLong.INSTANCE,
+ Functions.<MutableAggregation>throwAssertionError());
+ }
+ },
+ AggregationDefaultFunction.INSTANCE);
+ }
+
+ // Covert a mapping from TagValues to MutableAggregation, to a mapping from TagValues to
+ // AggregationData.
+ static <T> Map<T, AggregationData> createAggregationMap(
+ Map<T, MutableAggregation> tagValueAggregationMap, Measure measure) {
+ Map<T, AggregationData> map = Maps.newHashMap();
+ for (Entry<T, MutableAggregation> entry : tagValueAggregationMap.entrySet()) {
+ map.put(entry.getKey(), entry.getValue().toAggregationData());
+ }
+ return map;
+ }
+
+ static double getDoubleValueFromMeasurement(Measurement measurement) {
+ return measurement.match(
+ GET_VALUE_FROM_MEASUREMENT_DOUBLE,
+ GET_VALUE_FROM_MEASUREMENT_LONG,
+ Functions.<Double>throwAssertionError());
+ }
+
+ // static inner Function classes
+
+ private static final Function<MeasurementDouble, Double> GET_VALUE_FROM_MEASUREMENT_DOUBLE =
+ new Function<MeasurementDouble, Double>() {
+ @Override
+ public Double apply(MeasurementDouble arg) {
+ return arg.getValue();
+ }
+ };
+
+ private static final Function<MeasurementLong, Double> GET_VALUE_FROM_MEASUREMENT_LONG =
+ new Function<MeasurementLong, Double>() {
+ @Override
+ public Double apply(MeasurementLong arg) {
+ // TODO: consider checking truncation here.
+ return (double) arg.getValue();
+ }
+ };
+
+ private static final class CreateMutableSumDouble
+ implements Function<MeasureDouble, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(MeasureDouble arg) {
+ return MutableSumDouble.create();
+ }
+
+ private static final CreateMutableSumDouble INSTANCE = new CreateMutableSumDouble();
+ }
+
+ private static final class CreateMutableSumLong
+ implements Function<MeasureLong, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(MeasureLong arg) {
+ return MutableSumLong.create();
+ }
+
+ private static final CreateMutableSumLong INSTANCE = new CreateMutableSumLong();
+ }
+
+ private static final class CreateMutableCount implements Function<Count, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(Count arg) {
+ return MutableCount.create();
+ }
+
+ private static final CreateMutableCount INSTANCE = new CreateMutableCount();
+ }
+
+ // TODO(songya): remove this once Mean aggregation is completely removed. Before that
+ // we need to continue supporting Mean, since it could still be used by users and some
+ // deprecated RPC views.
+ private static final class AggregationDefaultFunction
+ implements Function<Aggregation, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(Aggregation arg) {
+ if (arg instanceof Aggregation.Mean) {
+ return MutableMean.create();
+ }
+ throw new IllegalArgumentException("Unknown Aggregation.");
+ }
+
+ private static final AggregationDefaultFunction INSTANCE = new AggregationDefaultFunction();
+ }
+
+ private static final class CreateMutableDistribution
+ implements Function<Distribution, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(Distribution arg) {
+ return MutableDistribution.create(arg.getBucketBoundaries());
+ }
+
+ private static final CreateMutableDistribution INSTANCE = new CreateMutableDistribution();
+ }
+
+ private static final class CreateMutableLastValueDouble
+ implements Function<MeasureDouble, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(MeasureDouble arg) {
+ return MutableLastValueDouble.create();
+ }
+
+ private static final CreateMutableLastValueDouble INSTANCE = new CreateMutableLastValueDouble();
+ }
+
+ private static final class CreateMutableLastValueLong
+ implements Function<MeasureLong, MutableAggregation> {
+ @Override
+ public MutableAggregation apply(MeasureLong arg) {
+ return MutableLastValueLong.create();
+ }
+
+ private static final CreateMutableLastValueLong INSTANCE = new CreateMutableLastValueLong();
+ }
+
+ private RecordUtils() {}
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java
new file mode 100644
index 00000000..741b399b
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsComponentImplBase.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import com.google.common.base.Preconditions;
+import io.opencensus.common.Clock;
+import io.opencensus.implcore.internal.CurrentState;
+import io.opencensus.implcore.internal.CurrentState.State;
+import io.opencensus.implcore.internal.EventQueue;
+import io.opencensus.metrics.Metrics;
+import io.opencensus.metrics.export.MetricProducer;
+import io.opencensus.stats.StatsCollectionState;
+import io.opencensus.stats.StatsComponent;
+
+/** Base implementation of {@link StatsComponent}. */
+public class StatsComponentImplBase extends StatsComponent {
+ private static final State DEFAULT_STATE = State.ENABLED;
+
+ // The State shared between the StatsComponent, StatsRecorder and ViewManager.
+ private final CurrentState currentState = new CurrentState(DEFAULT_STATE);
+
+ private final ViewManagerImpl viewManager;
+ private final StatsRecorderImpl statsRecorder;
+
+ /**
+ * Creates a new {@code StatsComponentImplBase}.
+ *
+ * @param queue the queue implementation.
+ * @param clock the clock to use when recording stats.
+ */
+ public StatsComponentImplBase(EventQueue queue, Clock clock) {
+ StatsManager statsManager = new StatsManager(queue, clock, currentState);
+ this.viewManager = new ViewManagerImpl(statsManager);
+ this.statsRecorder = new StatsRecorderImpl(statsManager);
+
+ // Create a new MetricProducerImpl and register it to MetricProducerManager when
+ // StatsComponentImplBase is initialized.
+ MetricProducer metricProducer = new MetricProducerImpl(statsManager);
+ Metrics.getExportComponent().getMetricProducerManager().add(metricProducer);
+ }
+
+ @Override
+ public ViewManagerImpl getViewManager() {
+ return viewManager;
+ }
+
+ @Override
+ public StatsRecorderImpl getStatsRecorder() {
+ return statsRecorder;
+ }
+
+ @Override
+ public StatsCollectionState getState() {
+ return stateToStatsState(currentState.get());
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public synchronized void setState(StatsCollectionState newState) {
+ boolean stateChanged =
+ currentState.set(statsStateToState(Preconditions.checkNotNull(newState, "newState")));
+ if (stateChanged) {
+ if (newState == StatsCollectionState.DISABLED) {
+ viewManager.clearStats();
+ } else {
+ viewManager.resumeStatsCollection();
+ }
+ }
+ }
+
+ private static State statsStateToState(StatsCollectionState statsCollectionState) {
+ return statsCollectionState == StatsCollectionState.ENABLED ? State.ENABLED : State.DISABLED;
+ }
+
+ private static StatsCollectionState stateToStatsState(State state) {
+ return state == State.ENABLED ? StatsCollectionState.ENABLED : StatsCollectionState.DISABLED;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java
new file mode 100644
index 00000000..17e99d46
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsManager.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.opencensus.common.Clock;
+import io.opencensus.implcore.internal.CurrentState;
+import io.opencensus.implcore.internal.CurrentState.State;
+import io.opencensus.implcore.internal.EventQueue;
+import io.opencensus.metrics.export.Metric;
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
+import io.opencensus.tags.TagContext;
+import java.util.Collection;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/** Object that stores all views and stats. */
+final class StatsManager {
+
+ private final EventQueue queue;
+
+ // clock used throughout the stats implementation
+ private final Clock clock;
+
+ private final CurrentState state;
+ private final MeasureToViewMap measureToViewMap = new MeasureToViewMap();
+
+ StatsManager(EventQueue queue, Clock clock, CurrentState state) {
+ checkNotNull(queue, "EventQueue");
+ checkNotNull(clock, "Clock");
+ checkNotNull(state, "state");
+ this.queue = queue;
+ this.clock = clock;
+ this.state = state;
+ }
+
+ void registerView(View view) {
+ measureToViewMap.registerView(view, clock);
+ }
+
+ @Nullable
+ ViewData getView(View.Name viewName) {
+ return measureToViewMap.getView(viewName, clock, state.getInternal());
+ }
+
+ Set<View> getExportedViews() {
+ return measureToViewMap.getExportedViews();
+ }
+
+ void record(TagContext tags, MeasureMapInternal measurementValues) {
+ // TODO(songya): consider exposing No-op MeasureMap and use it when stats state is DISABLED, so
+ // that we don't need to create actual MeasureMapImpl.
+ if (state.getInternal() == State.ENABLED) {
+ queue.enqueue(new StatsEvent(this, tags, measurementValues));
+ }
+ }
+
+ Collection<Metric> getMetrics() {
+ return measureToViewMap.getMetrics(clock, state.getInternal());
+ }
+
+ void clearStats() {
+ measureToViewMap.clearStats();
+ }
+
+ void resumeStatsCollection() {
+ measureToViewMap.resumeStatsCollection(clock.now());
+ }
+
+ // An EventQueue entry that records the stats from one call to StatsManager.record(...).
+ private static final class StatsEvent implements EventQueue.Entry {
+ private final TagContext tags;
+ private final MeasureMapInternal stats;
+ private final StatsManager statsManager;
+
+ StatsEvent(StatsManager statsManager, TagContext tags, MeasureMapInternal stats) {
+ this.statsManager = statsManager;
+ this.tags = tags;
+ this.stats = stats;
+ }
+
+ @Override
+ public void process() {
+ // Add Timestamp to value after it went through the DisruptorQueue.
+ statsManager.measureToViewMap.record(tags, stats, statsManager.clock.now());
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java
new file mode 100644
index 00000000..f9ebea41
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/StatsRecorderImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.opencensus.stats.StatsRecorder;
+
+/** Implementation of {@link StatsRecorder}. */
+public final class StatsRecorderImpl extends StatsRecorder {
+ private final StatsManager statsManager;
+
+ StatsRecorderImpl(StatsManager statsManager) {
+ checkNotNull(statsManager, "StatsManager");
+ this.statsManager = statsManager;
+ }
+
+ @Override
+ public MeasureMapImpl newMeasureMap() {
+ return MeasureMapImpl.create(statsManager);
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java
new file mode 100644
index 00000000..20ea97f8
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/stats/ViewManagerImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.stats;
+
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
+import io.opencensus.stats.ViewManager;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/** Implementation of {@link ViewManager}. */
+public final class ViewManagerImpl extends ViewManager {
+ private final StatsManager statsManager;
+
+ ViewManagerImpl(StatsManager statsManager) {
+ this.statsManager = statsManager;
+ }
+
+ @Override
+ public void registerView(View view) {
+ statsManager.registerView(view);
+ }
+
+ @Override
+ @Nullable
+ public ViewData getView(View.Name viewName) {
+ return statsManager.getView(viewName);
+ }
+
+ @Override
+ public Set<View> getAllExportedViews() {
+ return statsManager.getExportedViews();
+ }
+
+ void clearStats() {
+ statsManager.clearStats();
+ }
+
+ void resumeStatsCollection() {
+ statsManager.resumeStatsCollection();
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/CurrentTagContextUtils.java b/impl_core/src/main/java/io/opencensus/implcore/tags/CurrentTagContextUtils.java
new file mode 100644
index 00000000..e6bb12f5
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/tags/CurrentTagContextUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.tags;
+
+import io.grpc.Context;
+import io.opencensus.common.Scope;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.unsafe.ContextUtils;
+
+/**
+ * Utility methods for accessing the {@link TagContext} contained in the {@link io.grpc.Context}.
+ */
+final class CurrentTagContextUtils {
+
+ private CurrentTagContextUtils() {}
+
+ /**
+ * Returns the {@link TagContext} from the current context.
+ *
+ * @return the {@code TagContext} from the current context.
+ */
+ static TagContext getCurrentTagContext() {
+ return ContextUtils.TAG_CONTEXT_KEY.get();
+ }
+
+ /**
+ * Enters the scope of code where the given {@link TagContext} is in the current context and
+ * returns an object that represents that scope. The scope is exited when the returned object is
+ * closed.
+ *
+ * @param tags the {@code TagContext} to be set to the current context.
+ * @return an object that defines a scope where the given {@code TagContext} is set to the current
+ * context.
+ */
+ static Scope withTagContext(TagContext tags) {
+ return new WithTagContext(tags);
+ }
+
+ private static final class WithTagContext implements Scope {
+
+ private final Context orig;
+
+ /**
+ * Constructs a new {@link WithTagContext}.
+ *
+ * @param tags the {@code TagContext} to be added to the current {@code Context}.
+ */
+ private WithTagContext(TagContext tags) {
+ orig = Context.current().withValue(ContextUtils.TAG_CONTEXT_KEY, tags).attach();
+ }
+
+ @Override
+ public void close() {
+ Context.current().detach(orig);
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/NoopTagContextBuilder.java b/impl_core/src/main/java/io/opencensus/implcore/tags/NoopTagContextBuilder.java
new file mode 100644
index 00000000..eae54c5d
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/tags/NoopTagContextBuilder.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.tags;
+
+import io.opencensus.common.Scope;
+import io.opencensus.implcore.internal.NoopScope;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.TagContextBuilder;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+
+/** {@link TagContextBuilder} that is used when tagging is disabled. */
+final class NoopTagContextBuilder extends TagContextBuilder {
+ static final NoopTagContextBuilder INSTANCE = new NoopTagContextBuilder();
+
+ private NoopTagContextBuilder() {}
+
+ @Override
+ public TagContextBuilder put(TagKey key, TagValue value) {
+ return this;
+ }
+
+ @Override
+ public TagContextBuilder remove(TagKey key) {
+ return this;
+ }
+
+ @Override
+ public TagContext build() {
+ return TagContextImpl.EMPTY;
+ }
+
+ @Override
+ public Scope buildScoped() {
+ return NoopScope.getInstance();
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextBuilderImpl.java b/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextBuilderImpl.java
new file mode 100644
index 00000000..a17198d8
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextBuilderImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.tags;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.opencensus.common.Scope;
+import io.opencensus.tags.TagContextBuilder;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import java.util.HashMap;
+import java.util.Map;
+
+final class TagContextBuilderImpl extends TagContextBuilder {
+ private final Map<TagKey, TagValue> tags;
+
+ TagContextBuilderImpl(Map<TagKey, TagValue> tags) {
+ this.tags = new HashMap<TagKey, TagValue>(tags);
+ }
+
+ TagContextBuilderImpl() {
+ this.tags = new HashMap<TagKey, TagValue>();
+ }
+
+ @Override
+ public TagContextBuilderImpl put(TagKey key, TagValue value) {
+ tags.put(checkNotNull(key, "key"), checkNotNull(value, "value"));
+ return this;
+ }
+
+ @Override
+ public TagContextBuilderImpl remove(TagKey key) {
+ tags.remove(checkNotNull(key, "key"));
+ return this;
+ }
+
+ @Override
+ public TagContextImpl build() {
+ return new TagContextImpl(tags);
+ }
+
+ @Override
+ public Scope buildScoped() {
+ return CurrentTagContextUtils.withTagContext(build());
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextImpl.java b/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextImpl.java
new file mode 100644
index 00000000..f7a8ff82
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.tags;
+
+import io.opencensus.tags.Tag;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+
+@Immutable
+public final class TagContextImpl extends TagContext {
+
+ public static final TagContextImpl EMPTY =
+ new TagContextImpl(Collections.<TagKey, TagValue>emptyMap());
+
+ // The types of the TagKey and value must match for each entry.
+ private final Map<TagKey, TagValue> tags;
+
+ public TagContextImpl(Map<? extends TagKey, ? extends TagValue> tags) {
+ this.tags = Collections.unmodifiableMap(new HashMap<TagKey, TagValue>(tags));
+ }
+
+ public Map<TagKey, TagValue> getTags() {
+ return tags;
+ }
+
+ @Override
+ protected Iterator<Tag> getIterator() {
+ return new TagIterator(tags);
+ }
+
+ @Override
+ public boolean equals(@Nullable Object other) {
+ // Directly compare the tags when both objects are TagContextImpls, for efficiency.
+ if (other instanceof TagContextImpl) {
+ return getTags().equals(((TagContextImpl) other).getTags());
+ }
+ return super.equals(other);
+ }
+
+ private static final class TagIterator implements Iterator<Tag> {
+ Iterator<Map.Entry<TagKey, TagValue>> iterator;
+
+ TagIterator(Map<TagKey, TagValue> tags) {
+ iterator = tags.entrySet().iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Tag next() {
+ final Entry<TagKey, TagValue> next = iterator.next();
+ return Tag.create(next.getKey(), next.getValue());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("TagIterator.remove()");
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextUtils.java b/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextUtils.java
new file mode 100644
index 00000000..5fbc5050
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/tags/TagContextUtils.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.tags;
+
+import io.opencensus.tags.Tag;
+
+final class TagContextUtils {
+ private TagContextUtils() {}
+
+ /**
+ * Add a {@code Tag} of any type to a builder.
+ *
+ * @param tag tag containing the key and value to set.
+ * @param builder the builder to update.
+ */
+ static void addTagToBuilder(Tag tag, TagContextBuilderImpl builder) {
+ builder.put(tag.getKey(), tag.getValue());
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/TaggerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/tags/TaggerImpl.java
new file mode 100644
index 00000000..dcf9a1b7
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/tags/TaggerImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.tags;
+
+import io.opencensus.common.Scope;
+import io.opencensus.implcore.internal.CurrentState;
+import io.opencensus.implcore.internal.CurrentState.State;
+import io.opencensus.implcore.internal.NoopScope;
+import io.opencensus.tags.InternalUtils;
+import io.opencensus.tags.Tag;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.TagContextBuilder;
+import io.opencensus.tags.Tagger;
+import java.util.Iterator;
+
+/** Implementation of {@link Tagger}. */
+public final class TaggerImpl extends Tagger {
+ // All methods in this class use TagContextImpl and TagContextBuilderImpl. For example,
+ // withTagContext(...) always puts a TagContextImpl into scope, even if the argument is another
+ // TagContext subclass.
+
+ private final CurrentState state;
+
+ TaggerImpl(CurrentState state) {
+ this.state = state;
+ }
+
+ @Override
+ public TagContextImpl empty() {
+ return TagContextImpl.EMPTY;
+ }
+
+ @Override
+ public TagContextImpl getCurrentTagContext() {
+ return state.getInternal() == State.DISABLED
+ ? TagContextImpl.EMPTY
+ : toTagContextImpl(CurrentTagContextUtils.getCurrentTagContext());
+ }
+
+ @Override
+ public TagContextBuilder emptyBuilder() {
+ return state.getInternal() == State.DISABLED
+ ? NoopTagContextBuilder.INSTANCE
+ : new TagContextBuilderImpl();
+ }
+
+ @Override
+ public TagContextBuilder currentBuilder() {
+ return state.getInternal() == State.DISABLED
+ ? NoopTagContextBuilder.INSTANCE
+ : toBuilder(CurrentTagContextUtils.getCurrentTagContext());
+ }
+
+ @Override
+ public TagContextBuilder toBuilder(TagContext tags) {
+ return state.getInternal() == State.DISABLED
+ ? NoopTagContextBuilder.INSTANCE
+ : toTagContextBuilderImpl(tags);
+ }
+
+ @Override
+ public Scope withTagContext(TagContext tags) {
+ return state.getInternal() == State.DISABLED
+ ? NoopScope.getInstance()
+ : CurrentTagContextUtils.withTagContext(toTagContextImpl(tags));
+ }
+
+ private static TagContextImpl toTagContextImpl(TagContext tags) {
+ if (tags instanceof TagContextImpl) {
+ return (TagContextImpl) tags;
+ } else {
+ Iterator<Tag> i = InternalUtils.getTags(tags);
+ if (!i.hasNext()) {
+ return TagContextImpl.EMPTY;
+ }
+ TagContextBuilderImpl builder = new TagContextBuilderImpl();
+ while (i.hasNext()) {
+ Tag tag = i.next();
+ if (tag != null) {
+ TagContextUtils.addTagToBuilder(tag, builder);
+ }
+ }
+ return builder.build();
+ }
+ }
+
+ private static TagContextBuilderImpl toTagContextBuilderImpl(TagContext tags) {
+ // Copy the tags more efficiently in the expected case, when the TagContext is a TagContextImpl.
+ if (tags instanceof TagContextImpl) {
+ return new TagContextBuilderImpl(((TagContextImpl) tags).getTags());
+ } else {
+ TagContextBuilderImpl builder = new TagContextBuilderImpl();
+ for (Iterator<Tag> i = InternalUtils.getTags(tags); i.hasNext(); ) {
+ Tag tag = i.next();
+ if (tag != null) {
+ TagContextUtils.addTagToBuilder(tag, builder);
+ }
+ }
+ return builder;
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/TagsComponentImplBase.java b/impl_core/src/main/java/io/opencensus/implcore/tags/TagsComponentImplBase.java
new file mode 100644
index 00000000..88c31bae
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/tags/TagsComponentImplBase.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.tags;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.opencensus.implcore.internal.CurrentState;
+import io.opencensus.implcore.internal.CurrentState.State;
+import io.opencensus.implcore.tags.propagation.TagPropagationComponentImpl;
+import io.opencensus.tags.Tagger;
+import io.opencensus.tags.TaggingState;
+import io.opencensus.tags.TagsComponent;
+import io.opencensus.tags.propagation.TagPropagationComponent;
+
+/** Base implementation of {@link TagsComponent}. */
+public class TagsComponentImplBase extends TagsComponent {
+ private static final State DEFAULT_STATE = State.ENABLED;
+
+ // The State shared between the TagsComponent, Tagger, and TagPropagationComponent
+ private final CurrentState currentState = new CurrentState(DEFAULT_STATE);
+
+ private final Tagger tagger = new TaggerImpl(currentState);
+ private final TagPropagationComponent tagPropagationComponent =
+ new TagPropagationComponentImpl(currentState);
+
+ @Override
+ public Tagger getTagger() {
+ return tagger;
+ }
+
+ @Override
+ public TagPropagationComponent getTagPropagationComponent() {
+ return tagPropagationComponent;
+ }
+
+ @Override
+ public TaggingState getState() {
+ return stateToTaggingState(currentState.get());
+ }
+
+ @Override
+ @Deprecated
+ public void setState(TaggingState newState) {
+ currentState.set(taggingStateToState(checkNotNull(newState, "newState")));
+ }
+
+ private static State taggingStateToState(TaggingState taggingState) {
+ return taggingState == TaggingState.ENABLED ? State.ENABLED : State.DISABLED;
+ }
+
+ private static TaggingState stateToTaggingState(State state) {
+ return state == State.ENABLED ? TaggingState.ENABLED : TaggingState.DISABLED;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/SerializationUtils.java b/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/SerializationUtils.java
new file mode 100644
index 00000000..2daad95e
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/SerializationUtils.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2016-17, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.tags.propagation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import io.opencensus.implcore.internal.VarInt;
+import io.opencensus.implcore.tags.TagContextImpl;
+import io.opencensus.tags.InternalUtils;
+import io.opencensus.tags.Tag;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.propagation.TagContextDeserializationException;
+import io.opencensus.tags.propagation.TagContextSerializationException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Methods for serializing and deserializing {@link TagContext}s.
+ *
+ * <p>The format defined in this class is shared across all implementations of OpenCensus. It allows
+ * tags to propagate across requests.
+ *
+ * <p>OpenCensus tag context encoding:
+ *
+ * <ul>
+ * <li>Tags are encoded in single byte sequence. The version 0 format is:
+ * <li>{@code <version_id><encoded_tags>}
+ * <li>{@code <version_id> == a single byte, value 0}
+ * <li>{@code <encoded_tags> == (<tag_field_id><tag_encoding>)*}
+ * <ul>
+ * <li>{@code <tag_field_id>} == a single byte, value 0
+ * <li>{@code <tag_encoding>}:
+ * <ul>
+ * <li>{@code <tag_key_len><tag_key><tag_val_len><tag_val>}
+ * <ul>
+ * <li>{@code <tag_key_len>} == varint encoded integer
+ * <li>{@code <tag_key>} == tag_key_len bytes comprising tag key name
+ * <li>{@code <tag_val_len>} == varint encoded integer
+ * <li>{@code <tag_val>} == tag_val_len bytes comprising UTF-8 string
+ * </ul>
+ * </ul>
+ * </ul>
+ * </ul>
+ */
+final class SerializationUtils {
+ private SerializationUtils() {}
+
+ @VisibleForTesting static final int VERSION_ID = 0;
+ @VisibleForTesting static final int TAG_FIELD_ID = 0;
+ // This size limit only applies to the bytes representing tag keys and values.
+ @VisibleForTesting static final int TAGCONTEXT_SERIALIZED_SIZE_LIMIT = 8192;
+
+ // Serializes a TagContext to the on-the-wire format.
+ // Encoded tags are of the form: <version_id><encoded_tags>
+ static byte[] serializeBinary(TagContext tags) throws TagContextSerializationException {
+ // Use a ByteArrayDataOutput to avoid needing to handle IOExceptions.
+ final ByteArrayDataOutput byteArrayDataOutput = ByteStreams.newDataOutput();
+ byteArrayDataOutput.write(VERSION_ID);
+ int totalChars = 0; // Here chars are equivalent to bytes, since we're using ascii chars.
+ for (Iterator<Tag> i = InternalUtils.getTags(tags); i.hasNext(); ) {
+ Tag tag = i.next();
+ totalChars += tag.getKey().getName().length();
+ totalChars += tag.getValue().asString().length();
+ encodeTag(tag, byteArrayDataOutput);
+ }
+ if (totalChars > TAGCONTEXT_SERIALIZED_SIZE_LIMIT) {
+ throw new TagContextSerializationException(
+ "Size of TagContext exceeds the maximum serialized size "
+ + TAGCONTEXT_SERIALIZED_SIZE_LIMIT);
+ }
+ return byteArrayDataOutput.toByteArray();
+ }
+
+ // Deserializes input to TagContext based on the binary format standard.
+ // The encoded tags are of the form: <version_id><encoded_tags>
+ static TagContextImpl deserializeBinary(byte[] bytes) throws TagContextDeserializationException {
+ try {
+ if (bytes.length == 0) {
+ // Does not allow empty byte array.
+ throw new TagContextDeserializationException("Input byte[] can not be empty.");
+ }
+
+ ByteBuffer buffer = ByteBuffer.wrap(bytes).asReadOnlyBuffer();
+ int versionId = buffer.get();
+ if (versionId > VERSION_ID || versionId < 0) {
+ throw new TagContextDeserializationException(
+ "Wrong Version ID: " + versionId + ". Currently supports version up to: " + VERSION_ID);
+ }
+ return new TagContextImpl(parseTags(buffer));
+ } catch (BufferUnderflowException exn) {
+ throw new TagContextDeserializationException(exn.toString()); // byte array format error.
+ }
+ }
+
+ private static Map<TagKey, TagValue> parseTags(ByteBuffer buffer)
+ throws TagContextDeserializationException {
+ Map<TagKey, TagValue> tags = new HashMap<TagKey, TagValue>();
+ int limit = buffer.limit();
+ int totalChars = 0; // Here chars are equivalent to bytes, since we're using ascii chars.
+ while (buffer.position() < limit) {
+ int type = buffer.get();
+ if (type == TAG_FIELD_ID) {
+ TagKey key = createTagKey(decodeString(buffer));
+ TagValue val = createTagValue(key, decodeString(buffer));
+ totalChars += key.getName().length();
+ totalChars += val.asString().length();
+ tags.put(key, val);
+ } else {
+ // Stop parsing at the first unknown field ID, since there is no way to know its length.
+ // TODO(sebright): Consider storing the rest of the byte array in the TagContext.
+ break;
+ }
+ }
+ if (totalChars > TAGCONTEXT_SERIALIZED_SIZE_LIMIT) {
+ throw new TagContextDeserializationException(
+ "Size of TagContext exceeds the maximum serialized size "
+ + TAGCONTEXT_SERIALIZED_SIZE_LIMIT);
+ }
+ return tags;
+ }
+
+ // TODO(sebright): Consider exposing a TagKey name validation method to avoid needing to catch an
+ // IllegalArgumentException here.
+ private static final TagKey createTagKey(String name) throws TagContextDeserializationException {
+ try {
+ return TagKey.create(name);
+ } catch (IllegalArgumentException e) {
+ throw new TagContextDeserializationException("Invalid tag key: " + name, e);
+ }
+ }
+
+ // TODO(sebright): Consider exposing a TagValue validation method to avoid needing to catch
+ // an IllegalArgumentException here.
+ private static final TagValue createTagValue(TagKey key, String value)
+ throws TagContextDeserializationException {
+ try {
+ return TagValue.create(value);
+ } catch (IllegalArgumentException e) {
+ throw new TagContextDeserializationException(
+ "Invalid tag value for key " + key + ": " + value, e);
+ }
+ }
+
+ private static final void encodeTag(Tag tag, ByteArrayDataOutput byteArrayDataOutput) {
+ byteArrayDataOutput.write(TAG_FIELD_ID);
+ encodeString(tag.getKey().getName(), byteArrayDataOutput);
+ encodeString(tag.getValue().asString(), byteArrayDataOutput);
+ }
+
+ private static final void encodeString(String input, ByteArrayDataOutput byteArrayDataOutput) {
+ putVarInt(input.length(), byteArrayDataOutput);
+ byteArrayDataOutput.write(input.getBytes(Charsets.UTF_8));
+ }
+
+ private static final void putVarInt(int input, ByteArrayDataOutput byteArrayDataOutput) {
+ byte[] output = new byte[VarInt.varIntSize(input)];
+ VarInt.putVarInt(input, output, 0);
+ byteArrayDataOutput.write(output);
+ }
+
+ private static final String decodeString(ByteBuffer buffer) {
+ int length = VarInt.getVarInt(buffer);
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ builder.append((char) buffer.get());
+ }
+ return builder.toString();
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagContextBinarySerializerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagContextBinarySerializerImpl.java
new file mode 100644
index 00000000..5a25da5b
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagContextBinarySerializerImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.tags.propagation;
+
+import io.opencensus.implcore.internal.CurrentState;
+import io.opencensus.implcore.internal.CurrentState.State;
+import io.opencensus.implcore.tags.TagContextImpl;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.propagation.TagContextBinarySerializer;
+import io.opencensus.tags.propagation.TagContextDeserializationException;
+import io.opencensus.tags.propagation.TagContextSerializationException;
+
+final class TagContextBinarySerializerImpl extends TagContextBinarySerializer {
+ private static final byte[] EMPTY_BYTE_ARRAY = {};
+
+ private final CurrentState state;
+
+ TagContextBinarySerializerImpl(CurrentState state) {
+ this.state = state;
+ }
+
+ @Override
+ public byte[] toByteArray(TagContext tags) throws TagContextSerializationException {
+ return state.getInternal() == State.DISABLED
+ ? EMPTY_BYTE_ARRAY
+ : SerializationUtils.serializeBinary(tags);
+ }
+
+ @Override
+ public TagContext fromByteArray(byte[] bytes) throws TagContextDeserializationException {
+ return state.getInternal() == State.DISABLED
+ ? TagContextImpl.EMPTY
+ : SerializationUtils.deserializeBinary(bytes);
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagPropagationComponentImpl.java b/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagPropagationComponentImpl.java
new file mode 100644
index 00000000..9ba0da40
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/tags/propagation/TagPropagationComponentImpl.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.tags.propagation;
+
+import io.opencensus.implcore.internal.CurrentState;
+import io.opencensus.tags.propagation.TagContextBinarySerializer;
+import io.opencensus.tags.propagation.TagPropagationComponent;
+
+/** Implementation of {@link TagPropagationComponent}. */
+public final class TagPropagationComponentImpl extends TagPropagationComponent {
+ private final TagContextBinarySerializer tagContextBinarySerializer;
+
+ public TagPropagationComponentImpl(CurrentState state) {
+ tagContextBinarySerializer = new TagContextBinarySerializerImpl(state);
+ }
+
+ @Override
+ public TagContextBinarySerializer getBinarySerializer() {
+ return tagContextBinarySerializer;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/NoRecordEventsSpanImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/NoRecordEventsSpanImpl.java
new file mode 100644
index 00000000..8a5f8e05
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/NoRecordEventsSpanImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace;
+
+import com.google.common.base.Preconditions;
+import io.opencensus.trace.Annotation;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.EndSpanOptions;
+import io.opencensus.trace.Link;
+import io.opencensus.trace.Span;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.Status;
+import java.util.EnumSet;
+import java.util.Map;
+
+/** Implementation for the {@link Span} class that does not record trace events. */
+final class NoRecordEventsSpanImpl extends Span {
+
+ private static final EnumSet<Options> NOT_RECORD_EVENTS_SPAN_OPTIONS =
+ EnumSet.noneOf(Span.Options.class);
+
+ static NoRecordEventsSpanImpl create(SpanContext context) {
+ return new NoRecordEventsSpanImpl(context);
+ }
+
+ @Override
+ public void addAnnotation(String description, Map<String, AttributeValue> attributes) {
+ Preconditions.checkNotNull(description, "description");
+ Preconditions.checkNotNull(attributes, "attribute");
+ }
+
+ @Override
+ public void addAnnotation(Annotation annotation) {
+ Preconditions.checkNotNull(annotation, "annotation");
+ }
+
+ @Override
+ public void putAttribute(String key, AttributeValue value) {
+ Preconditions.checkNotNull(key, "key");
+ Preconditions.checkNotNull(value, "value");
+ }
+
+ @Override
+ public void putAttributes(Map<String, AttributeValue> attributes) {
+ Preconditions.checkNotNull(attributes, "attributes");
+ }
+
+ @Override
+ public void addMessageEvent(io.opencensus.trace.MessageEvent messageEvent) {
+ Preconditions.checkNotNull(messageEvent, "messageEvent");
+ }
+
+ @Override
+ public void addLink(Link link) {
+ Preconditions.checkNotNull(link, "link");
+ }
+
+ @Override
+ public void setStatus(Status status) {
+ Preconditions.checkNotNull(status, "status");
+ }
+
+ @Override
+ public void end(EndSpanOptions options) {
+ Preconditions.checkNotNull(options, "options");
+ }
+
+ private NoRecordEventsSpanImpl(SpanContext context) {
+ super(context, NOT_RECORD_EVENTS_SPAN_OPTIONS);
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/RecordEventsSpanImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/RecordEventsSpanImpl.java
new file mode 100644
index 00000000..af3545bc
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/RecordEventsSpanImpl.java
@@ -0,0 +1,579 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.EvictingQueue;
+import io.opencensus.common.Clock;
+import io.opencensus.implcore.internal.CheckerFrameworkUtils;
+import io.opencensus.implcore.internal.TimestampConverter;
+import io.opencensus.implcore.trace.internal.ConcurrentIntrusiveList.Element;
+import io.opencensus.trace.Annotation;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.EndSpanOptions;
+import io.opencensus.trace.Link;
+import io.opencensus.trace.Span;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.SpanId;
+import io.opencensus.trace.Status;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.config.TraceParams;
+import io.opencensus.trace.export.SpanData;
+import io.opencensus.trace.export.SpanData.TimedEvent;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+// TODO(hailongwen): remove the usage of `NetworkEvent` in the future.
+/** Implementation for the {@link Span} class that records trace events. */
+@ThreadSafe
+public final class RecordEventsSpanImpl extends Span implements Element<RecordEventsSpanImpl> {
+ private static final Logger logger = Logger.getLogger(Tracer.class.getName());
+
+ private static final EnumSet<Span.Options> RECORD_EVENTS_SPAN_OPTIONS =
+ EnumSet.of(Span.Options.RECORD_EVENTS);
+
+ // The parent SpanId of this span. Null if this is a root span.
+ @Nullable private final SpanId parentSpanId;
+ // True if the parent is on a different process.
+ @Nullable private final Boolean hasRemoteParent;
+ // Active trace params when the Span was created.
+ private final TraceParams traceParams;
+ // Handler called when the span starts and ends.
+ private final StartEndHandler startEndHandler;
+ // The displayed name of the span.
+ private final String name;
+ // The kind of the span.
+ @Nullable private final Kind kind;
+ // The clock used to get the time.
+ private final Clock clock;
+ // The time converter used to convert nano time to Timestamp. This is needed because Java has
+ // millisecond granularity for Timestamp and tracing events are recorded more often.
+ @Nullable private final TimestampConverter timestampConverter;
+ // The start time of the span.
+ private final long startNanoTime;
+ // Set of recorded attributes. DO NOT CALL any other method that changes the ordering of events.
+ @GuardedBy("this")
+ @Nullable
+ private AttributesWithCapacity attributes;
+ // List of recorded annotations.
+ @GuardedBy("this")
+ @Nullable
+ private TraceEvents<EventWithNanoTime<Annotation>> annotations;
+ // List of recorded network events.
+ @GuardedBy("this")
+ @Nullable
+ private TraceEvents<EventWithNanoTime<io.opencensus.trace.MessageEvent>> messageEvents;
+ // List of recorded links to parent and child spans.
+ @GuardedBy("this")
+ @Nullable
+ private TraceEvents<Link> links;
+ // The status of the span.
+ @GuardedBy("this")
+ @Nullable
+ private Status status;
+ // The end time of the span.
+ @GuardedBy("this")
+ private long endNanoTime;
+ // True if the span is ended.
+ @GuardedBy("this")
+ private boolean hasBeenEnded;
+
+ @GuardedBy("this")
+ private boolean sampleToLocalSpanStore;
+
+ // Pointers for the ConcurrentIntrusiveList$Element. Guarded by the ConcurrentIntrusiveList.
+ @Nullable private RecordEventsSpanImpl next = null;
+ @Nullable private RecordEventsSpanImpl prev = null;
+
+ /**
+ * Creates and starts a span with the given configuration.
+ *
+ * @param context supplies the trace_id and span_id for the newly started span.
+ * @param name the displayed name for the new span.
+ * @param parentSpanId the span_id of the parent span, or null if the new span is a root span.
+ * @param hasRemoteParent {@code true} if the parentContext is remote. {@code null} if this is a
+ * root span.
+ * @param traceParams trace parameters like sampler and probability.
+ * @param startEndHandler handler called when the span starts and ends.
+ * @param timestampConverter null if the span is a root span or the parent is not sampled. If the
+ * parent is sampled, we should use the same converter to ensure ordering between tracing
+ * events.
+ * @param clock the clock used to get the time.
+ * @return a new and started span.
+ */
+ @VisibleForTesting
+ public static RecordEventsSpanImpl startSpan(
+ SpanContext context,
+ String name,
+ @Nullable Kind kind,
+ @Nullable SpanId parentSpanId,
+ @Nullable Boolean hasRemoteParent,
+ TraceParams traceParams,
+ StartEndHandler startEndHandler,
+ @Nullable TimestampConverter timestampConverter,
+ Clock clock) {
+ RecordEventsSpanImpl span =
+ new RecordEventsSpanImpl(
+ context,
+ name,
+ kind,
+ parentSpanId,
+ hasRemoteParent,
+ traceParams,
+ startEndHandler,
+ timestampConverter,
+ clock);
+ // Call onStart here instead of calling in the constructor to make sure the span is completely
+ // initialized.
+ startEndHandler.onStart(span);
+ return span;
+ }
+
+ /**
+ * Returns the name of the {@code Span}.
+ *
+ * @return the name of the {@code Span}.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the status of the {@code Span}. If not set defaults to {@link Status#OK}.
+ *
+ * @return the status of the {@code Span}.
+ */
+ public Status getStatus() {
+ synchronized (this) {
+ return getStatusWithDefault();
+ }
+ }
+
+ /**
+ * Returns the end nano time (see {@link System#nanoTime()}). If the current {@code Span} is not
+ * ended then returns {@link Clock#nowNanos()}.
+ *
+ * @return the end nano time.
+ */
+ public long getEndNanoTime() {
+ synchronized (this) {
+ return hasBeenEnded ? endNanoTime : clock.nowNanos();
+ }
+ }
+
+ /**
+ * Returns the latency of the {@code Span} in nanos. If still active then returns now() - start
+ * time.
+ *
+ * @return the latency of the {@code Span} in nanos.
+ */
+ public long getLatencyNs() {
+ synchronized (this) {
+ return hasBeenEnded ? endNanoTime - startNanoTime : clock.nowNanos() - startNanoTime;
+ }
+ }
+
+ /**
+ * Returns if the name of this {@code Span} must be register to the {@code SampledSpanStore}.
+ *
+ * @return if the name of this {@code Span} must be register to the {@code SampledSpanStore}.
+ */
+ public boolean getSampleToLocalSpanStore() {
+ synchronized (this) {
+ checkState(hasBeenEnded, "Running span does not have the SampleToLocalSpanStore set.");
+ return sampleToLocalSpanStore;
+ }
+ }
+
+ /**
+ * Returns the kind of this {@code Span}.
+ *
+ * @return the kind of this {@code Span}.
+ */
+ @Nullable
+ public Kind getKind() {
+ return kind;
+ }
+
+ /**
+ * Returns the {@code TimestampConverter} used by this {@code Span}.
+ *
+ * @return the {@code TimestampConverter} used by this {@code Span}.
+ */
+ @Nullable
+ TimestampConverter getTimestampConverter() {
+ return timestampConverter;
+ }
+
+ /**
+ * Returns an immutable representation of all the data from this {@code Span}.
+ *
+ * @return an immutable representation of all the data from this {@code Span}.
+ * @throws IllegalStateException if the Span doesn't have RECORD_EVENTS option.
+ */
+ public SpanData toSpanData() {
+ synchronized (this) {
+ SpanData.Attributes attributesSpanData =
+ attributes == null
+ ? SpanData.Attributes.create(Collections.<String, AttributeValue>emptyMap(), 0)
+ : SpanData.Attributes.create(attributes, attributes.getNumberOfDroppedAttributes());
+ SpanData.TimedEvents<Annotation> annotationsSpanData =
+ createTimedEvents(getInitializedAnnotations(), timestampConverter);
+ SpanData.TimedEvents<io.opencensus.trace.MessageEvent> messageEventsSpanData =
+ createTimedEvents(getInitializedNetworkEvents(), timestampConverter);
+ SpanData.Links linksSpanData =
+ links == null
+ ? SpanData.Links.create(Collections.<Link>emptyList(), 0)
+ : SpanData.Links.create(
+ new ArrayList<Link>(links.events), links.getNumberOfDroppedEvents());
+ return SpanData.create(
+ getContext(),
+ parentSpanId,
+ hasRemoteParent,
+ name,
+ kind,
+ CheckerFrameworkUtils.castNonNull(timestampConverter).convertNanoTime(startNanoTime),
+ attributesSpanData,
+ annotationsSpanData,
+ messageEventsSpanData,
+ linksSpanData,
+ null, // Not supported yet.
+ hasBeenEnded ? getStatusWithDefault() : null,
+ hasBeenEnded
+ ? CheckerFrameworkUtils.castNonNull(timestampConverter).convertNanoTime(endNanoTime)
+ : null);
+ }
+ }
+
+ @Override
+ public void putAttribute(String key, AttributeValue value) {
+ Preconditions.checkNotNull(key, "key");
+ Preconditions.checkNotNull(value, "value");
+ synchronized (this) {
+ if (hasBeenEnded) {
+ logger.log(Level.FINE, "Calling putAttributes() on an ended Span.");
+ return;
+ }
+ getInitializedAttributes().putAttribute(key, value);
+ }
+ }
+
+ @Override
+ public void putAttributes(Map<String, AttributeValue> attributes) {
+ Preconditions.checkNotNull(attributes, "attributes");
+ synchronized (this) {
+ if (hasBeenEnded) {
+ logger.log(Level.FINE, "Calling putAttributes() on an ended Span.");
+ return;
+ }
+ getInitializedAttributes().putAttributes(attributes);
+ }
+ }
+
+ @Override
+ public void addAnnotation(String description, Map<String, AttributeValue> attributes) {
+ Preconditions.checkNotNull(description, "description");
+ Preconditions.checkNotNull(attributes, "attribute");
+ synchronized (this) {
+ if (hasBeenEnded) {
+ logger.log(Level.FINE, "Calling addAnnotation() on an ended Span.");
+ return;
+ }
+ getInitializedAnnotations()
+ .addEvent(
+ new EventWithNanoTime<Annotation>(
+ clock.nowNanos(),
+ Annotation.fromDescriptionAndAttributes(description, attributes)));
+ }
+ }
+
+ @Override
+ public void addAnnotation(Annotation annotation) {
+ Preconditions.checkNotNull(annotation, "annotation");
+ synchronized (this) {
+ if (hasBeenEnded) {
+ logger.log(Level.FINE, "Calling addAnnotation() on an ended Span.");
+ return;
+ }
+ getInitializedAnnotations()
+ .addEvent(new EventWithNanoTime<Annotation>(clock.nowNanos(), annotation));
+ }
+ }
+
+ @Override
+ public void addMessageEvent(io.opencensus.trace.MessageEvent messageEvent) {
+ Preconditions.checkNotNull(messageEvent, "messageEvent");
+ synchronized (this) {
+ if (hasBeenEnded) {
+ logger.log(Level.FINE, "Calling addNetworkEvent() on an ended Span.");
+ return;
+ }
+ getInitializedNetworkEvents()
+ .addEvent(
+ new EventWithNanoTime<io.opencensus.trace.MessageEvent>(
+ clock.nowNanos(), checkNotNull(messageEvent, "networkEvent")));
+ }
+ }
+
+ @Override
+ public void addLink(Link link) {
+ Preconditions.checkNotNull(link, "link");
+ synchronized (this) {
+ if (hasBeenEnded) {
+ logger.log(Level.FINE, "Calling addLink() on an ended Span.");
+ return;
+ }
+ getInitializedLinks().addEvent(link);
+ }
+ }
+
+ @Override
+ public void setStatus(Status status) {
+ Preconditions.checkNotNull(status, "status");
+ synchronized (this) {
+ if (hasBeenEnded) {
+ logger.log(Level.FINE, "Calling setStatus() on an ended Span.");
+ return;
+ }
+ this.status = status;
+ }
+ }
+
+ @Override
+ public void end(EndSpanOptions options) {
+ Preconditions.checkNotNull(options, "options");
+ synchronized (this) {
+ if (hasBeenEnded) {
+ logger.log(Level.FINE, "Calling end() on an ended Span.");
+ return;
+ }
+ if (options.getStatus() != null) {
+ status = options.getStatus();
+ }
+ sampleToLocalSpanStore = options.getSampleToLocalSpanStore();
+ endNanoTime = clock.nowNanos();
+ hasBeenEnded = true;
+ }
+ startEndHandler.onEnd(this);
+ }
+
+ @GuardedBy("this")
+ private AttributesWithCapacity getInitializedAttributes() {
+ if (attributes == null) {
+ attributes = new AttributesWithCapacity(traceParams.getMaxNumberOfAttributes());
+ }
+ return attributes;
+ }
+
+ @GuardedBy("this")
+ private TraceEvents<EventWithNanoTime<Annotation>> getInitializedAnnotations() {
+ if (annotations == null) {
+ annotations =
+ new TraceEvents<EventWithNanoTime<Annotation>>(traceParams.getMaxNumberOfAnnotations());
+ }
+ return annotations;
+ }
+
+ @GuardedBy("this")
+ private TraceEvents<EventWithNanoTime<io.opencensus.trace.MessageEvent>>
+ getInitializedNetworkEvents() {
+ if (messageEvents == null) {
+ messageEvents =
+ new TraceEvents<EventWithNanoTime<io.opencensus.trace.MessageEvent>>(
+ traceParams.getMaxNumberOfMessageEvents());
+ }
+ return messageEvents;
+ }
+
+ @GuardedBy("this")
+ private TraceEvents<Link> getInitializedLinks() {
+ if (links == null) {
+ links = new TraceEvents<Link>(traceParams.getMaxNumberOfLinks());
+ }
+ return links;
+ }
+
+ @GuardedBy("this")
+ private Status getStatusWithDefault() {
+ return status == null ? Status.OK : status;
+ }
+
+ private static <T> SpanData.TimedEvents<T> createTimedEvents(
+ TraceEvents<EventWithNanoTime<T>> events, @Nullable TimestampConverter timestampConverter) {
+ if (events == null) {
+ return SpanData.TimedEvents.create(Collections.<TimedEvent<T>>emptyList(), 0);
+ }
+ List<TimedEvent<T>> eventsList = new ArrayList<TimedEvent<T>>(events.events.size());
+ for (EventWithNanoTime<T> networkEvent : events.events) {
+ eventsList.add(
+ networkEvent.toSpanDataTimedEvent(CheckerFrameworkUtils.castNonNull(timestampConverter)));
+ }
+ return SpanData.TimedEvents.create(eventsList, events.getNumberOfDroppedEvents());
+ }
+
+ @Override
+ @Nullable
+ public RecordEventsSpanImpl getNext() {
+ return next;
+ }
+
+ @Override
+ public void setNext(@Nullable RecordEventsSpanImpl element) {
+ next = element;
+ }
+
+ @Override
+ @Nullable
+ public RecordEventsSpanImpl getPrev() {
+ return prev;
+ }
+
+ @Override
+ public void setPrev(@Nullable RecordEventsSpanImpl element) {
+ prev = element;
+ }
+
+ /**
+ * Interface to handle the start and end operations for a {@link Span} only when the {@code Span}
+ * has {@link Options#RECORD_EVENTS} option.
+ *
+ * <p>Implementation must avoid high overhead work in any of the methods because the code is
+ * executed on the critical path.
+ *
+ * <p>One instance can be called by multiple threads in the same time, so the implementation must
+ * be thread-safe.
+ */
+ public interface StartEndHandler {
+ void onStart(RecordEventsSpanImpl span);
+
+ void onEnd(RecordEventsSpanImpl span);
+ }
+
+ // A map implementation with a fixed capacity that drops events when the map gets full. Eviction
+ // is based on the access order.
+ private static final class AttributesWithCapacity extends LinkedHashMap<String, AttributeValue> {
+ private final int capacity;
+ private int totalRecordedAttributes = 0;
+ // Here because -Werror complains about this: [serial] serializable class AttributesWithCapacity
+ // has no definition of serialVersionUID. This class shouldn't be serialized.
+ private static final long serialVersionUID = 42L;
+
+ private AttributesWithCapacity(int capacity) {
+ // Capacity of the map is capacity + 1 to avoid resizing because removeEldestEntry is invoked
+ // by put and putAll after inserting a new entry into the map. The loadFactor is set to 1
+ // to avoid resizing because. The accessOrder is set to true.
+ super(capacity + 1, 1, /*accessOrder=*/ true);
+ this.capacity = capacity;
+ }
+
+ // Users must call this method instead of put to keep count of the total number of entries
+ // inserted.
+ private void putAttribute(String key, AttributeValue value) {
+ totalRecordedAttributes += 1;
+ put(key, value);
+ }
+
+ // Users must call this method instead of putAll to keep count of the total number of entries
+ // inserted.
+ private void putAttributes(Map<String, AttributeValue> attributes) {
+ totalRecordedAttributes += attributes.size();
+ putAll(attributes);
+ }
+
+ private int getNumberOfDroppedAttributes() {
+ return totalRecordedAttributes - size();
+ }
+
+ // It is called after each put or putAll call in order to determine if the eldest inserted
+ // entry should be removed or not.
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<String, AttributeValue> eldest) {
+ return size() > this.capacity;
+ }
+ }
+
+ private static final class TraceEvents<T> {
+ private int totalRecordedEvents = 0;
+ private final EvictingQueue<T> events;
+
+ private int getNumberOfDroppedEvents() {
+ return totalRecordedEvents - events.size();
+ }
+
+ TraceEvents(int maxNumEvents) {
+ events = EvictingQueue.create(maxNumEvents);
+ }
+
+ void addEvent(T event) {
+ totalRecordedEvents++;
+ events.add(event);
+ }
+ }
+
+ // Timed event that uses nanoTime to represent the Timestamp.
+ private static final class EventWithNanoTime<T> {
+ private final long nanoTime;
+ private final T event;
+
+ private EventWithNanoTime(long nanoTime, T event) {
+ this.nanoTime = nanoTime;
+ this.event = event;
+ }
+
+ private TimedEvent<T> toSpanDataTimedEvent(TimestampConverter timestampConverter) {
+ return TimedEvent.create(timestampConverter.convertNanoTime(nanoTime), event);
+ }
+ }
+
+ private RecordEventsSpanImpl(
+ SpanContext context,
+ String name,
+ @Nullable Kind kind,
+ @Nullable SpanId parentSpanId,
+ @Nullable Boolean hasRemoteParent,
+ TraceParams traceParams,
+ StartEndHandler startEndHandler,
+ @Nullable TimestampConverter timestampConverter,
+ Clock clock) {
+ super(context, RECORD_EVENTS_SPAN_OPTIONS);
+ this.parentSpanId = parentSpanId;
+ this.hasRemoteParent = hasRemoteParent;
+ this.name = name;
+ this.kind = kind;
+ this.traceParams = traceParams;
+ this.startEndHandler = startEndHandler;
+ this.clock = clock;
+ this.hasBeenEnded = false;
+ this.sampleToLocalSpanStore = false;
+ this.timestampConverter =
+ timestampConverter != null ? timestampConverter : TimestampConverter.now(clock);
+ startNanoTime = clock.nowNanos();
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/SpanBuilderImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/SpanBuilderImpl.java
new file mode 100644
index 00000000..5565e9de
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/SpanBuilderImpl.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.opencensus.common.Clock;
+import io.opencensus.implcore.internal.TimestampConverter;
+import io.opencensus.implcore.trace.internal.RandomHandler;
+import io.opencensus.trace.Link;
+import io.opencensus.trace.Link.Type;
+import io.opencensus.trace.Sampler;
+import io.opencensus.trace.Span;
+import io.opencensus.trace.Span.Kind;
+import io.opencensus.trace.SpanBuilder;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.SpanId;
+import io.opencensus.trace.TraceId;
+import io.opencensus.trace.TraceOptions;
+import io.opencensus.trace.Tracestate;
+import io.opencensus.trace.config.TraceConfig;
+import io.opencensus.trace.config.TraceParams;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import javax.annotation.Nullable;
+
+/** Implementation of the {@link SpanBuilder}. */
+final class SpanBuilderImpl extends SpanBuilder {
+ private static final Tracestate TRACESTATE_DEFAULT = Tracestate.builder().build();
+
+ private static final TraceOptions SAMPLED_TRACE_OPTIONS =
+ TraceOptions.builder().setIsSampled(true).build();
+ private static final TraceOptions NOT_SAMPLED_TRACE_OPTIONS =
+ TraceOptions.builder().setIsSampled(false).build();
+
+ private final Options options;
+ private final String name;
+ @Nullable private final Span parent;
+ @Nullable private final SpanContext remoteParentSpanContext;
+ @Nullable private Sampler sampler;
+ private List<Span> parentLinks = Collections.<Span>emptyList();
+ @Nullable private Boolean recordEvents;
+ @Nullable private Kind kind;
+
+ private Span startSpanInternal(
+ @Nullable SpanContext parent,
+ @Nullable Boolean hasRemoteParent,
+ String name,
+ @Nullable Sampler sampler,
+ List<Span> parentLinks,
+ @Nullable Boolean recordEvents,
+ @Nullable Kind kind,
+ @Nullable TimestampConverter timestampConverter) {
+ TraceParams activeTraceParams = options.traceConfig.getActiveTraceParams();
+ Random random = options.randomHandler.current();
+ TraceId traceId;
+ SpanId spanId = SpanId.generateRandomId(random);
+ SpanId parentSpanId = null;
+ // TODO(bdrutu): Handle tracestate correctly not just propagate.
+ Tracestate tracestate = TRACESTATE_DEFAULT;
+ if (parent == null || !parent.isValid()) {
+ // New root span.
+ traceId = TraceId.generateRandomId(random);
+ // This is a root span so no remote or local parent.
+ hasRemoteParent = null;
+ } else {
+ // New child span.
+ traceId = parent.getTraceId();
+ parentSpanId = parent.getSpanId();
+ tracestate = parent.getTracestate();
+ }
+ TraceOptions traceOptions =
+ makeSamplingDecision(
+ parent,
+ hasRemoteParent,
+ name,
+ sampler,
+ parentLinks,
+ traceId,
+ spanId,
+ activeTraceParams)
+ ? SAMPLED_TRACE_OPTIONS
+ : NOT_SAMPLED_TRACE_OPTIONS;
+ Span span =
+ (traceOptions.isSampled() || Boolean.TRUE.equals(recordEvents))
+ ? RecordEventsSpanImpl.startSpan(
+ SpanContext.create(traceId, spanId, traceOptions, tracestate),
+ name,
+ kind,
+ parentSpanId,
+ hasRemoteParent,
+ activeTraceParams,
+ options.startEndHandler,
+ timestampConverter,
+ options.clock)
+ : NoRecordEventsSpanImpl.create(
+ SpanContext.create(traceId, spanId, traceOptions, tracestate));
+ linkSpans(span, parentLinks);
+ return span;
+ }
+
+ private static boolean makeSamplingDecision(
+ @Nullable SpanContext parent,
+ @Nullable Boolean hasRemoteParent,
+ String name,
+ @Nullable Sampler sampler,
+ List<Span> parentLinks,
+ TraceId traceId,
+ SpanId spanId,
+ TraceParams activeTraceParams) {
+ // If users set a specific sampler in the SpanBuilder, use it.
+ if (sampler != null) {
+ return sampler.shouldSample(parent, hasRemoteParent, traceId, spanId, name, parentLinks);
+ }
+ // Use the default sampler if this is a root Span or this is an entry point Span (has remote
+ // parent).
+ if (Boolean.TRUE.equals(hasRemoteParent) || parent == null || !parent.isValid()) {
+ return activeTraceParams
+ .getSampler()
+ .shouldSample(parent, hasRemoteParent, traceId, spanId, name, parentLinks);
+ }
+ // Parent is always different than null because otherwise we use the default sampler.
+ return parent.getTraceOptions().isSampled() || isAnyParentLinkSampled(parentLinks);
+ }
+
+ private static boolean isAnyParentLinkSampled(List<Span> parentLinks) {
+ for (Span parentLink : parentLinks) {
+ if (parentLink.getContext().getTraceOptions().isSampled()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static void linkSpans(Span span, List<Span> parentLinks) {
+ if (!parentLinks.isEmpty()) {
+ Link childLink = Link.fromSpanContext(span.getContext(), Type.CHILD_LINKED_SPAN);
+ for (Span linkedSpan : parentLinks) {
+ linkedSpan.addLink(childLink);
+ span.addLink(Link.fromSpanContext(linkedSpan.getContext(), Type.PARENT_LINKED_SPAN));
+ }
+ }
+ }
+
+ static SpanBuilderImpl createWithParent(String spanName, @Nullable Span parent, Options options) {
+ return new SpanBuilderImpl(spanName, null, parent, options);
+ }
+
+ static SpanBuilderImpl createWithRemoteParent(
+ String spanName, @Nullable SpanContext remoteParentSpanContext, Options options) {
+ return new SpanBuilderImpl(spanName, remoteParentSpanContext, null, options);
+ }
+
+ private SpanBuilderImpl(
+ String name,
+ @Nullable SpanContext remoteParentSpanContext,
+ @Nullable Span parent,
+ Options options) {
+ this.name = checkNotNull(name, "name");
+ this.parent = parent;
+ this.remoteParentSpanContext = remoteParentSpanContext;
+ this.options = options;
+ }
+
+ @Override
+ public Span startSpan() {
+ SpanContext parentContext = remoteParentSpanContext;
+ Boolean hasRemoteParent = Boolean.TRUE;
+ TimestampConverter timestampConverter = null;
+ if (remoteParentSpanContext == null) {
+ // This is not a child of a remote Span. Get the parent SpanContext from the parent Span if
+ // any.
+ Span parent = this.parent;
+ hasRemoteParent = Boolean.FALSE;
+ if (parent != null) {
+ parentContext = parent.getContext();
+ // Pass the timestamp converter from the parent to ensure that the recorded events are in
+ // the right order. Implementation uses System.nanoTime() which is monotonically increasing.
+ if (parent instanceof RecordEventsSpanImpl) {
+ timestampConverter = ((RecordEventsSpanImpl) parent).getTimestampConverter();
+ }
+ } else {
+ hasRemoteParent = null;
+ }
+ }
+ return startSpanInternal(
+ parentContext,
+ hasRemoteParent,
+ name,
+ sampler,
+ parentLinks,
+ recordEvents,
+ kind,
+ timestampConverter);
+ }
+
+ static final class Options {
+ private final RandomHandler randomHandler;
+ private final RecordEventsSpanImpl.StartEndHandler startEndHandler;
+ private final Clock clock;
+ private final TraceConfig traceConfig;
+
+ Options(
+ RandomHandler randomHandler,
+ RecordEventsSpanImpl.StartEndHandler startEndHandler,
+ Clock clock,
+ TraceConfig traceConfig) {
+ this.randomHandler = checkNotNull(randomHandler, "randomHandler");
+ this.startEndHandler = checkNotNull(startEndHandler, "startEndHandler");
+ this.clock = checkNotNull(clock, "clock");
+ this.traceConfig = checkNotNull(traceConfig, "traceConfig");
+ }
+ }
+
+ @Override
+ public SpanBuilderImpl setSampler(Sampler sampler) {
+ this.sampler = checkNotNull(sampler, "sampler");
+ return this;
+ }
+
+ @Override
+ public SpanBuilderImpl setParentLinks(List<Span> parentLinks) {
+ this.parentLinks = checkNotNull(parentLinks, "parentLinks");
+ return this;
+ }
+
+ @Override
+ public SpanBuilderImpl setRecordEvents(boolean recordEvents) {
+ this.recordEvents = recordEvents;
+ return this;
+ }
+
+ @Override
+ public SpanBuilderImpl setSpanKind(@Nullable Kind kind) {
+ this.kind = kind;
+ return this;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/StartEndHandlerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/StartEndHandlerImpl.java
new file mode 100644
index 00000000..6adaa200
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/StartEndHandlerImpl.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace;
+
+import io.opencensus.implcore.internal.EventQueue;
+import io.opencensus.implcore.trace.RecordEventsSpanImpl.StartEndHandler;
+import io.opencensus.implcore.trace.export.RunningSpanStoreImpl;
+import io.opencensus.implcore.trace.export.SampledSpanStoreImpl;
+import io.opencensus.implcore.trace.export.SpanExporterImpl;
+import io.opencensus.trace.Span.Options;
+import io.opencensus.trace.export.SpanData;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Uses the provided {@link EventQueue} to defer processing/exporting of the {@link SpanData} to
+ * avoid impacting the critical path.
+ */
+@ThreadSafe
+public final class StartEndHandlerImpl implements StartEndHandler {
+ private final SpanExporterImpl spanExporter;
+ @Nullable private final RunningSpanStoreImpl runningSpanStore;
+ @Nullable private final SampledSpanStoreImpl sampledSpanStore;
+ private final EventQueue eventQueue;
+ // true if any of (runningSpanStore OR sampledSpanStore) are different than null, which
+ // means the spans with RECORD_EVENTS should be enqueued in the queue.
+ private final boolean enqueueEventForNonSampledSpans;
+
+ /**
+ * Constructs a new {@code StartEndHandlerImpl}.
+ *
+ * @param spanExporter the {@code SpanExporter} implementation.
+ * @param runningSpanStore the {@code RunningSpanStore} implementation.
+ * @param sampledSpanStore the {@code SampledSpanStore} implementation.
+ * @param eventQueue the event queue where all the events are enqueued.
+ */
+ public StartEndHandlerImpl(
+ SpanExporterImpl spanExporter,
+ @Nullable RunningSpanStoreImpl runningSpanStore,
+ @Nullable SampledSpanStoreImpl sampledSpanStore,
+ EventQueue eventQueue) {
+ this.spanExporter = spanExporter;
+ this.runningSpanStore = runningSpanStore;
+ this.sampledSpanStore = sampledSpanStore;
+ this.enqueueEventForNonSampledSpans = runningSpanStore != null || sampledSpanStore != null;
+ this.eventQueue = eventQueue;
+ }
+
+ @Override
+ public void onStart(RecordEventsSpanImpl span) {
+ if (span.getOptions().contains(Options.RECORD_EVENTS) && enqueueEventForNonSampledSpans) {
+ eventQueue.enqueue(new SpanStartEvent(span, runningSpanStore));
+ }
+ }
+
+ @Override
+ public void onEnd(RecordEventsSpanImpl span) {
+ if ((span.getOptions().contains(Options.RECORD_EVENTS) && enqueueEventForNonSampledSpans)
+ || span.getContext().getTraceOptions().isSampled()) {
+ eventQueue.enqueue(new SpanEndEvent(span, spanExporter, runningSpanStore, sampledSpanStore));
+ }
+ }
+
+ // An EventQueue entry that records the start of the span event.
+ private static final class SpanStartEvent implements EventQueue.Entry {
+ private final RecordEventsSpanImpl span;
+ @Nullable private final RunningSpanStoreImpl activeSpansExporter;
+
+ SpanStartEvent(RecordEventsSpanImpl span, @Nullable RunningSpanStoreImpl activeSpansExporter) {
+ this.span = span;
+ this.activeSpansExporter = activeSpansExporter;
+ }
+
+ @Override
+ public void process() {
+ if (activeSpansExporter != null) {
+ activeSpansExporter.onStart(span);
+ }
+ }
+ }
+
+ // An EventQueue entry that records the end of the span event.
+ private static final class SpanEndEvent implements EventQueue.Entry {
+ private final RecordEventsSpanImpl span;
+ @Nullable private final RunningSpanStoreImpl runningSpanStore;
+ private final SpanExporterImpl spanExporter;
+ @Nullable private final SampledSpanStoreImpl sampledSpanStore;
+
+ SpanEndEvent(
+ RecordEventsSpanImpl span,
+ SpanExporterImpl spanExporter,
+ @Nullable RunningSpanStoreImpl runningSpanStore,
+ @Nullable SampledSpanStoreImpl sampledSpanStore) {
+ this.span = span;
+ this.runningSpanStore = runningSpanStore;
+ this.spanExporter = spanExporter;
+ this.sampledSpanStore = sampledSpanStore;
+ }
+
+ @Override
+ public void process() {
+ if (span.getContext().getTraceOptions().isSampled()) {
+ spanExporter.addSpan(span);
+ }
+ if (runningSpanStore != null) {
+ runningSpanStore.onEnd(span);
+ }
+ if (sampledSpanStore != null) {
+ sampledSpanStore.considerForSampling(span);
+ }
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/TraceComponentImplBase.java b/impl_core/src/main/java/io/opencensus/implcore/trace/TraceComponentImplBase.java
new file mode 100644
index 00000000..c1432432
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/TraceComponentImplBase.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace;
+
+import io.opencensus.common.Clock;
+import io.opencensus.implcore.internal.EventQueue;
+import io.opencensus.implcore.internal.SimpleEventQueue;
+import io.opencensus.implcore.trace.RecordEventsSpanImpl.StartEndHandler;
+import io.opencensus.implcore.trace.config.TraceConfigImpl;
+import io.opencensus.implcore.trace.export.ExportComponentImpl;
+import io.opencensus.implcore.trace.internal.RandomHandler;
+import io.opencensus.implcore.trace.propagation.PropagationComponentImpl;
+import io.opencensus.trace.TraceComponent;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.config.TraceConfig;
+import io.opencensus.trace.export.ExportComponent;
+import io.opencensus.trace.propagation.PropagationComponent;
+
+/**
+ * Helper class to allow sharing the code for all the {@link TraceComponent} implementations. This
+ * class cannot use inheritance because in version 0.5.* the constructor of the {@code
+ * TraceComponent} is package protected.
+ *
+ * <p>This can be changed back to inheritance when version 0.5.* is no longer supported.
+ */
+public final class TraceComponentImplBase {
+ private final ExportComponentImpl exportComponent;
+ private final PropagationComponent propagationComponent = new PropagationComponentImpl();
+ private final Clock clock;
+ private final TraceConfig traceConfig = new TraceConfigImpl();
+ private final Tracer tracer;
+
+ /**
+ * Creates a new {@code TraceComponentImplBase}.
+ *
+ * @param clock the clock to use throughout tracing.
+ * @param randomHandler the random number generator for generating trace and span IDs.
+ * @param eventQueue the queue implementation.
+ */
+ public TraceComponentImplBase(Clock clock, RandomHandler randomHandler, EventQueue eventQueue) {
+ this.clock = clock;
+ // TODO(bdrutu): Add a config/argument for supportInProcessStores.
+ if (eventQueue instanceof SimpleEventQueue) {
+ exportComponent = ExportComponentImpl.createWithoutInProcessStores(eventQueue);
+ } else {
+ exportComponent = ExportComponentImpl.createWithInProcessStores(eventQueue);
+ }
+ StartEndHandler startEndHandler =
+ new StartEndHandlerImpl(
+ exportComponent.getSpanExporter(),
+ exportComponent.getRunningSpanStore(),
+ exportComponent.getSampledSpanStore(),
+ eventQueue);
+ tracer = new TracerImpl(randomHandler, startEndHandler, clock, traceConfig);
+ }
+
+ public Tracer getTracer() {
+ return tracer;
+ }
+
+ public PropagationComponent getPropagationComponent() {
+ return propagationComponent;
+ }
+
+ public final Clock getClock() {
+ return clock;
+ }
+
+ public ExportComponent getExportComponent() {
+ return exportComponent;
+ }
+
+ public TraceConfig getTraceConfig() {
+ return traceConfig;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/TracerImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/TracerImpl.java
new file mode 100644
index 00000000..48df8055
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/TracerImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace;
+
+import io.opencensus.common.Clock;
+import io.opencensus.implcore.trace.internal.RandomHandler;
+import io.opencensus.trace.Span;
+import io.opencensus.trace.SpanBuilder;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.config.TraceConfig;
+import javax.annotation.Nullable;
+
+/** Implementation of the {@link Tracer}. */
+public final class TracerImpl extends Tracer {
+ private final SpanBuilderImpl.Options spanBuilderOptions;
+
+ TracerImpl(
+ RandomHandler randomHandler,
+ RecordEventsSpanImpl.StartEndHandler startEndHandler,
+ Clock clock,
+ TraceConfig traceConfig) {
+ spanBuilderOptions =
+ new SpanBuilderImpl.Options(randomHandler, startEndHandler, clock, traceConfig);
+ }
+
+ @Override
+ public SpanBuilder spanBuilderWithExplicitParent(String spanName, @Nullable Span parent) {
+ return SpanBuilderImpl.createWithParent(spanName, parent, spanBuilderOptions);
+ }
+
+ @Override
+ public SpanBuilder spanBuilderWithRemoteParent(
+ String spanName, @Nullable SpanContext remoteParentSpanContext) {
+ return SpanBuilderImpl.createWithRemoteParent(
+ spanName, remoteParentSpanContext, spanBuilderOptions);
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/config/TraceConfigImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/config/TraceConfigImpl.java
new file mode 100644
index 00000000..25f0c613
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/config/TraceConfigImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.config;
+
+import io.opencensus.trace.config.TraceConfig;
+import io.opencensus.trace.config.TraceParams;
+
+/**
+ * Global configuration of the trace service. This allows users to change configs for the default
+ * sampler, maximum events to be kept, etc.
+ */
+public final class TraceConfigImpl extends TraceConfig {
+ // Reads and writes are atomic for reference variables. Use volatile to ensure that these
+ // operations are visible on other CPUs as well.
+ private volatile TraceParams activeTraceParams = TraceParams.DEFAULT;
+
+ /** Constructs a new {@code TraceConfigImpl}. */
+ public TraceConfigImpl() {}
+
+ @Override
+ public TraceParams getActiveTraceParams() {
+ return activeTraceParams;
+ }
+
+ @Override
+ public void updateActiveTraceParams(TraceParams traceParams) {
+ activeTraceParams = traceParams;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java
new file mode 100644
index 00000000..19817380
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/ExportComponentImpl.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.export;
+
+import io.opencensus.common.Duration;
+import io.opencensus.implcore.internal.EventQueue;
+import io.opencensus.trace.export.ExportComponent;
+import io.opencensus.trace.export.RunningSpanStore;
+import io.opencensus.trace.export.SampledSpanStore;
+
+/** Implementation of the {@link ExportComponent}. */
+public final class ExportComponentImpl extends ExportComponent {
+ private static final int EXPORTER_BUFFER_SIZE = 32;
+ // Enforces that trace export exports data at least once every 5 seconds.
+ private static final Duration EXPORTER_SCHEDULE_DELAY = Duration.create(5, 0);
+
+ private final SpanExporterImpl spanExporter;
+ private final RunningSpanStoreImpl runningSpanStore;
+ private final SampledSpanStoreImpl sampledSpanStore;
+
+ @Override
+ public SpanExporterImpl getSpanExporter() {
+ return spanExporter;
+ }
+
+ @Override
+ public RunningSpanStoreImpl getRunningSpanStore() {
+ return runningSpanStore;
+ }
+
+ @Override
+ public SampledSpanStoreImpl getSampledSpanStore() {
+ return sampledSpanStore;
+ }
+
+ @Override
+ public void shutdown() {
+ sampledSpanStore.shutdown();
+ spanExporter.shutdown();
+ }
+
+ /**
+ * Returns a new {@code ExportComponentImpl} that has valid instances for {@link RunningSpanStore}
+ * and {@link SampledSpanStore}.
+ *
+ * @return a new {@code ExportComponentImpl}.
+ */
+ public static ExportComponentImpl createWithInProcessStores(EventQueue eventQueue) {
+ return new ExportComponentImpl(true, eventQueue);
+ }
+
+ /**
+ * Returns a new {@code ExportComponentImpl} that has {@code null} instances for {@link
+ * RunningSpanStore} and {@link SampledSpanStore}.
+ *
+ * @return a new {@code ExportComponentImpl}.
+ */
+ public static ExportComponentImpl createWithoutInProcessStores(EventQueue eventQueue) {
+ return new ExportComponentImpl(false, eventQueue);
+ }
+
+ /**
+ * Constructs a new {@code ExportComponentImpl}.
+ *
+ * @param supportInProcessStores {@code true} to instantiate {@link RunningSpanStore} and {@link
+ * SampledSpanStore}.
+ */
+ private ExportComponentImpl(boolean supportInProcessStores, EventQueue eventQueue) {
+ this.spanExporter = SpanExporterImpl.create(EXPORTER_BUFFER_SIZE, EXPORTER_SCHEDULE_DELAY);
+ this.runningSpanStore =
+ supportInProcessStores
+ ? new InProcessRunningSpanStoreImpl()
+ : RunningSpanStoreImpl.getNoopRunningSpanStoreImpl();
+ this.sampledSpanStore =
+ supportInProcessStores
+ ? new InProcessSampledSpanStoreImpl(eventQueue)
+ : SampledSpanStoreImpl.getNoopSampledSpanStoreImpl();
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java
new file mode 100644
index 00000000..f7aeac71
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessRunningSpanStoreImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.export;
+
+import io.opencensus.implcore.trace.RecordEventsSpanImpl;
+import io.opencensus.implcore.trace.internal.ConcurrentIntrusiveList;
+import io.opencensus.trace.export.RunningSpanStore;
+import io.opencensus.trace.export.SpanData;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.ThreadSafe;
+
+/** In-process implementation of the {@link RunningSpanStore}. */
+@ThreadSafe
+public final class InProcessRunningSpanStoreImpl extends RunningSpanStoreImpl {
+ private final ConcurrentIntrusiveList<RecordEventsSpanImpl> runningSpans;
+
+ public InProcessRunningSpanStoreImpl() {
+ runningSpans = new ConcurrentIntrusiveList<RecordEventsSpanImpl>();
+ }
+
+ @Override
+ public void onStart(RecordEventsSpanImpl span) {
+ runningSpans.addElement(span);
+ }
+
+ @Override
+ public void onEnd(RecordEventsSpanImpl span) {
+ runningSpans.removeElement(span);
+ }
+
+ @Override
+ public Summary getSummary() {
+ Collection<RecordEventsSpanImpl> allRunningSpans = runningSpans.getAll();
+ Map<String, Integer> numSpansPerName = new HashMap<String, Integer>();
+ for (RecordEventsSpanImpl span : allRunningSpans) {
+ Integer prevValue = numSpansPerName.get(span.getName());
+ numSpansPerName.put(span.getName(), prevValue != null ? prevValue + 1 : 1);
+ }
+ Map<String, PerSpanNameSummary> perSpanNameSummary = new HashMap<String, PerSpanNameSummary>();
+ for (Map.Entry<String, Integer> it : numSpansPerName.entrySet()) {
+ perSpanNameSummary.put(it.getKey(), PerSpanNameSummary.create(it.getValue()));
+ }
+ Summary summary = Summary.create(perSpanNameSummary);
+ return summary;
+ }
+
+ @Override
+ public Collection<SpanData> getRunningSpans(Filter filter) {
+ Collection<RecordEventsSpanImpl> allRunningSpans = runningSpans.getAll();
+ int maxSpansToReturn =
+ filter.getMaxSpansToReturn() == 0 ? allRunningSpans.size() : filter.getMaxSpansToReturn();
+ List<SpanData> ret = new ArrayList<SpanData>(maxSpansToReturn);
+ for (RecordEventsSpanImpl span : allRunningSpans) {
+ if (ret.size() == maxSpansToReturn) {
+ break;
+ }
+ if (span.getName().equals(filter.getSpanName())) {
+ ret.add(span.toSpanData());
+ }
+ }
+ return ret;
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java
new file mode 100644
index 00000000..0d8e493b
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/InProcessSampledSpanStoreImpl.java
@@ -0,0 +1,396 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.export;
+
+import com.google.common.collect.EvictingQueue;
+import io.opencensus.implcore.internal.EventQueue;
+import io.opencensus.implcore.trace.RecordEventsSpanImpl;
+import io.opencensus.trace.Status;
+import io.opencensus.trace.Status.CanonicalCode;
+import io.opencensus.trace.export.SampledSpanStore;
+import io.opencensus.trace.export.SpanData;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+/** In-process implementation of the {@link SampledSpanStore}. */
+@ThreadSafe
+public final class InProcessSampledSpanStoreImpl extends SampledSpanStoreImpl {
+ private static final int NUM_SAMPLES_PER_LATENCY_BUCKET = 10;
+ private static final int NUM_SAMPLES_PER_ERROR_BUCKET = 5;
+ private static final long TIME_BETWEEN_SAMPLES = TimeUnit.SECONDS.toNanos(1);
+ private static final int NUM_LATENCY_BUCKETS = LatencyBucketBoundaries.values().length;
+ // The total number of canonical codes - 1 (the OK code).
+ private static final int NUM_ERROR_BUCKETS = CanonicalCode.values().length - 1;
+ private static final int MAX_PER_SPAN_NAME_SAMPLES =
+ NUM_SAMPLES_PER_LATENCY_BUCKET * NUM_LATENCY_BUCKETS
+ + NUM_SAMPLES_PER_ERROR_BUCKET * NUM_ERROR_BUCKETS;
+
+ // Used to stream the register/unregister events to the implementation to avoid lock contention
+ // between the main threads and the worker thread.
+ private final EventQueue eventQueue;
+
+ @GuardedBy("samples")
+ private final Map<String, PerSpanNameSamples> samples;
+
+ private static final class Bucket {
+
+ private final EvictingQueue<RecordEventsSpanImpl> sampledSpansQueue;
+ private final EvictingQueue<RecordEventsSpanImpl> notSampledSpansQueue;
+ private long lastSampledNanoTime;
+ private long lastNotSampledNanoTime;
+
+ private Bucket(int numSamples) {
+ sampledSpansQueue = EvictingQueue.create(numSamples);
+ notSampledSpansQueue = EvictingQueue.create(numSamples);
+ }
+
+ private void considerForSampling(RecordEventsSpanImpl span) {
+ long spanEndNanoTime = span.getEndNanoTime();
+ if (span.getContext().getTraceOptions().isSampled()) {
+ // Need to compare by doing the subtraction all the time because in case of an overflow,
+ // this may never sample again (at least for the next ~200 years). No real chance to
+ // overflow two times because that means the process runs for ~200 years.
+ if (spanEndNanoTime - lastSampledNanoTime > TIME_BETWEEN_SAMPLES) {
+ sampledSpansQueue.add(span);
+ lastSampledNanoTime = spanEndNanoTime;
+ }
+ } else {
+ // Need to compare by doing the subtraction all the time because in case of an overflow,
+ // this may never sample again (at least for the next ~200 years). No real chance to
+ // overflow two times because that means the process runs for ~200 years.
+ if (spanEndNanoTime - lastNotSampledNanoTime > TIME_BETWEEN_SAMPLES) {
+ notSampledSpansQueue.add(span);
+ lastNotSampledNanoTime = spanEndNanoTime;
+ }
+ }
+ }
+
+ private void getSamples(int maxSpansToReturn, List<RecordEventsSpanImpl> output) {
+ getSamples(maxSpansToReturn, output, sampledSpansQueue);
+ getSamples(maxSpansToReturn, output, notSampledSpansQueue);
+ }
+
+ private static void getSamples(
+ int maxSpansToReturn,
+ List<RecordEventsSpanImpl> output,
+ EvictingQueue<RecordEventsSpanImpl> queue) {
+ for (RecordEventsSpanImpl span : queue) {
+ if (output.size() >= maxSpansToReturn) {
+ break;
+ }
+ output.add(span);
+ }
+ }
+
+ private void getSamplesFilteredByLatency(
+ long latencyLowerNs,
+ long latencyUpperNs,
+ int maxSpansToReturn,
+ List<RecordEventsSpanImpl> output) {
+ getSamplesFilteredByLatency(
+ latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, sampledSpansQueue);
+ getSamplesFilteredByLatency(
+ latencyLowerNs, latencyUpperNs, maxSpansToReturn, output, notSampledSpansQueue);
+ }
+
+ private static void getSamplesFilteredByLatency(
+ long latencyLowerNs,
+ long latencyUpperNs,
+ int maxSpansToReturn,
+ List<RecordEventsSpanImpl> output,
+ EvictingQueue<RecordEventsSpanImpl> queue) {
+ for (RecordEventsSpanImpl span : queue) {
+ if (output.size() >= maxSpansToReturn) {
+ break;
+ }
+ long spanLatencyNs = span.getLatencyNs();
+ if (spanLatencyNs >= latencyLowerNs && spanLatencyNs < latencyUpperNs) {
+ output.add(span);
+ }
+ }
+ }
+
+ private int getNumSamples() {
+ return sampledSpansQueue.size() + notSampledSpansQueue.size();
+ }
+ }
+
+ /**
+ * Keeps samples for a given span name. Samples for all the latency buckets and for all canonical
+ * codes other than OK.
+ */
+ private static final class PerSpanNameSamples {
+
+ private final Bucket[] latencyBuckets;
+ private final Bucket[] errorBuckets;
+
+ private PerSpanNameSamples() {
+ latencyBuckets = new Bucket[NUM_LATENCY_BUCKETS];
+ for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
+ latencyBuckets[i] = new Bucket(NUM_SAMPLES_PER_LATENCY_BUCKET);
+ }
+ errorBuckets = new Bucket[NUM_ERROR_BUCKETS];
+ for (int i = 0; i < NUM_ERROR_BUCKETS; i++) {
+ errorBuckets[i] = new Bucket(NUM_SAMPLES_PER_ERROR_BUCKET);
+ }
+ }
+
+ @Nullable
+ private Bucket getLatencyBucket(long latencyNs) {
+ for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
+ LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i];
+ if (latencyNs >= boundaries.getLatencyLowerNs()
+ && latencyNs < boundaries.getLatencyUpperNs()) {
+ return latencyBuckets[i];
+ }
+ }
+ // latencyNs is negative or Long.MAX_VALUE, so this Span can be ignored. This cannot happen
+ // in real production because System#nanoTime is monotonic.
+ return null;
+ }
+
+ private Bucket getErrorBucket(CanonicalCode code) {
+ return errorBuckets[code.value() - 1];
+ }
+
+ private void considerForSampling(RecordEventsSpanImpl span) {
+ Status status = span.getStatus();
+ // Null status means running Span, this should not happen in production, but the library
+ // should not crash because of this.
+ if (status != null) {
+ Bucket bucket =
+ status.isOk()
+ ? getLatencyBucket(span.getLatencyNs())
+ : getErrorBucket(status.getCanonicalCode());
+ // If unable to find the bucket, ignore this Span.
+ if (bucket != null) {
+ bucket.considerForSampling(span);
+ }
+ }
+ }
+
+ private Map<LatencyBucketBoundaries, Integer> getNumbersOfLatencySampledSpans() {
+ Map<LatencyBucketBoundaries, Integer> latencyBucketSummaries =
+ new EnumMap<LatencyBucketBoundaries, Integer>(LatencyBucketBoundaries.class);
+ for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
+ latencyBucketSummaries.put(
+ LatencyBucketBoundaries.values()[i], latencyBuckets[i].getNumSamples());
+ }
+ return latencyBucketSummaries;
+ }
+
+ private Map<CanonicalCode, Integer> getNumbersOfErrorSampledSpans() {
+ Map<CanonicalCode, Integer> errorBucketSummaries =
+ new EnumMap<CanonicalCode, Integer>(CanonicalCode.class);
+ for (int i = 0; i < NUM_ERROR_BUCKETS; i++) {
+ errorBucketSummaries.put(CanonicalCode.values()[i + 1], errorBuckets[i].getNumSamples());
+ }
+ return errorBucketSummaries;
+ }
+
+ private List<RecordEventsSpanImpl> getErrorSamples(
+ @Nullable CanonicalCode code, int maxSpansToReturn) {
+ ArrayList<RecordEventsSpanImpl> output =
+ new ArrayList<RecordEventsSpanImpl>(maxSpansToReturn);
+ if (code != null) {
+ getErrorBucket(code).getSamples(maxSpansToReturn, output);
+ } else {
+ for (int i = 0; i < NUM_ERROR_BUCKETS; i++) {
+ errorBuckets[i].getSamples(maxSpansToReturn, output);
+ }
+ }
+ return output;
+ }
+
+ private List<RecordEventsSpanImpl> getLatencySamples(
+ long latencyLowerNs, long latencyUpperNs, int maxSpansToReturn) {
+ ArrayList<RecordEventsSpanImpl> output =
+ new ArrayList<RecordEventsSpanImpl>(maxSpansToReturn);
+ for (int i = 0; i < NUM_LATENCY_BUCKETS; i++) {
+ LatencyBucketBoundaries boundaries = LatencyBucketBoundaries.values()[i];
+ if (latencyUpperNs >= boundaries.getLatencyLowerNs()
+ && latencyLowerNs < boundaries.getLatencyUpperNs()) {
+ latencyBuckets[i].getSamplesFilteredByLatency(
+ latencyLowerNs, latencyUpperNs, maxSpansToReturn, output);
+ }
+ }
+ return output;
+ }
+ }
+
+ /** Constructs a new {@code InProcessSampledSpanStoreImpl}. */
+ InProcessSampledSpanStoreImpl(EventQueue eventQueue) {
+ samples = new HashMap<String, PerSpanNameSamples>();
+ this.eventQueue = eventQueue;
+ }
+
+ @Override
+ public Summary getSummary() {
+ Map<String, PerSpanNameSummary> ret = new HashMap<String, PerSpanNameSummary>();
+ synchronized (samples) {
+ for (Map.Entry<String, PerSpanNameSamples> it : samples.entrySet()) {
+ ret.put(
+ it.getKey(),
+ PerSpanNameSummary.create(
+ it.getValue().getNumbersOfLatencySampledSpans(),
+ it.getValue().getNumbersOfErrorSampledSpans()));
+ }
+ }
+ return Summary.create(ret);
+ }
+
+ @Override
+ public void considerForSampling(RecordEventsSpanImpl span) {
+ synchronized (samples) {
+ String spanName = span.getName();
+ if (span.getSampleToLocalSpanStore() && !samples.containsKey(spanName)) {
+ samples.put(spanName, new PerSpanNameSamples());
+ }
+ PerSpanNameSamples perSpanNameSamples = samples.get(spanName);
+ if (perSpanNameSamples != null) {
+ perSpanNameSamples.considerForSampling(span);
+ }
+ }
+ }
+
+ @Override
+ public void registerSpanNamesForCollection(Collection<String> spanNames) {
+ eventQueue.enqueue(new RegisterSpanNameEvent(this, spanNames));
+ }
+
+ @Override
+ protected void shutdown() {
+ eventQueue.shutdown();
+ }
+
+ private void internaltRegisterSpanNamesForCollection(Collection<String> spanNames) {
+ synchronized (samples) {
+ for (String spanName : spanNames) {
+ if (!samples.containsKey(spanName)) {
+ samples.put(spanName, new PerSpanNameSamples());
+ }
+ }
+ }
+ }
+
+ private static final class RegisterSpanNameEvent implements EventQueue.Entry {
+ private final InProcessSampledSpanStoreImpl sampledSpanStore;
+ private final Collection<String> spanNames;
+
+ private RegisterSpanNameEvent(
+ InProcessSampledSpanStoreImpl sampledSpanStore, Collection<String> spanNames) {
+ this.sampledSpanStore = sampledSpanStore;
+ this.spanNames = new ArrayList<String>(spanNames);
+ }
+
+ @Override
+ public void process() {
+ sampledSpanStore.internaltRegisterSpanNamesForCollection(spanNames);
+ }
+ }
+
+ @Override
+ public void unregisterSpanNamesForCollection(Collection<String> spanNames) {
+ eventQueue.enqueue(new UnregisterSpanNameEvent(this, spanNames));
+ }
+
+ private void internalUnregisterSpanNamesForCollection(Collection<String> spanNames) {
+ synchronized (samples) {
+ samples.keySet().removeAll(spanNames);
+ }
+ }
+
+ private static final class UnregisterSpanNameEvent implements EventQueue.Entry {
+ private final InProcessSampledSpanStoreImpl sampledSpanStore;
+ private final Collection<String> spanNames;
+
+ private UnregisterSpanNameEvent(
+ InProcessSampledSpanStoreImpl sampledSpanStore, Collection<String> spanNames) {
+ this.sampledSpanStore = sampledSpanStore;
+ this.spanNames = new ArrayList<String>(spanNames);
+ }
+
+ @Override
+ public void process() {
+ sampledSpanStore.internalUnregisterSpanNamesForCollection(spanNames);
+ }
+ }
+
+ @Override
+ public Set<String> getRegisteredSpanNamesForCollection() {
+ synchronized (samples) {
+ return Collections.unmodifiableSet(new HashSet<String>(samples.keySet()));
+ }
+ }
+
+ @Override
+ public Collection<SpanData> getErrorSampledSpans(ErrorFilter filter) {
+ int numSpansToReturn =
+ filter.getMaxSpansToReturn() == 0
+ ? MAX_PER_SPAN_NAME_SAMPLES
+ : filter.getMaxSpansToReturn();
+ List<RecordEventsSpanImpl> spans = Collections.emptyList();
+ // Try to not keep the lock to much, do the RecordEventsSpanImpl -> SpanData conversion outside
+ // the lock.
+ synchronized (samples) {
+ PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName());
+ if (perSpanNameSamples != null) {
+ spans = perSpanNameSamples.getErrorSamples(filter.getCanonicalCode(), numSpansToReturn);
+ }
+ }
+ List<SpanData> ret = new ArrayList<SpanData>(spans.size());
+ for (RecordEventsSpanImpl span : spans) {
+ ret.add(span.toSpanData());
+ }
+ return Collections.unmodifiableList(ret);
+ }
+
+ @Override
+ public Collection<SpanData> getLatencySampledSpans(LatencyFilter filter) {
+ int numSpansToReturn =
+ filter.getMaxSpansToReturn() == 0
+ ? MAX_PER_SPAN_NAME_SAMPLES
+ : filter.getMaxSpansToReturn();
+ List<RecordEventsSpanImpl> spans = Collections.emptyList();
+ // Try to not keep the lock to much, do the RecordEventsSpanImpl -> SpanData conversion outside
+ // the lock.
+ synchronized (samples) {
+ PerSpanNameSamples perSpanNameSamples = samples.get(filter.getSpanName());
+ if (perSpanNameSamples != null) {
+ spans =
+ perSpanNameSamples.getLatencySamples(
+ filter.getLatencyLowerNs(), filter.getLatencyUpperNs(), numSpansToReturn);
+ }
+ }
+ List<SpanData> ret = new ArrayList<SpanData>(spans.size());
+ for (RecordEventsSpanImpl span : spans) {
+ ret.add(span.toSpanData());
+ }
+ return Collections.unmodifiableList(ret);
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java
new file mode 100644
index 00000000..962f5b01
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/RunningSpanStoreImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.export;
+
+import io.opencensus.implcore.trace.RecordEventsSpanImpl;
+import io.opencensus.trace.export.RunningSpanStore;
+import io.opencensus.trace.export.SpanData;
+import java.util.Collection;
+import java.util.Collections;
+
+/** Abstract implementation of the {@link RunningSpanStore}. */
+public abstract class RunningSpanStoreImpl extends RunningSpanStore {
+
+ private static final RunningSpanStoreImpl NOOP_RUNNING_SPAN_STORE_IMPL =
+ new NoopRunningSpanStoreImpl();
+
+ /** Returns the no-op implementation of the {@link RunningSpanStoreImpl}. */
+ static RunningSpanStoreImpl getNoopRunningSpanStoreImpl() {
+ return NOOP_RUNNING_SPAN_STORE_IMPL;
+ }
+
+ /**
+ * Adds the {@code Span} into the running spans list when the {@code Span} starts.
+ *
+ * @param span the {@code Span} that started.
+ */
+ public abstract void onStart(RecordEventsSpanImpl span);
+
+ /**
+ * Removes the {@code Span} from the running spans list when the {@code Span} ends.
+ *
+ * @param span the {@code Span} that ended.
+ */
+ public abstract void onEnd(RecordEventsSpanImpl span);
+
+ private static final class NoopRunningSpanStoreImpl extends RunningSpanStoreImpl {
+
+ private static final Summary EMPTY_SUMMARY =
+ RunningSpanStore.Summary.create(Collections.<String, PerSpanNameSummary>emptyMap());
+
+ @Override
+ public void onStart(RecordEventsSpanImpl span) {}
+
+ @Override
+ public void onEnd(RecordEventsSpanImpl span) {}
+
+ @Override
+ public Summary getSummary() {
+ return EMPTY_SUMMARY;
+ }
+
+ @Override
+ public Collection<SpanData> getRunningSpans(Filter filter) {
+ return Collections.<SpanData>emptyList();
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java
new file mode 100644
index 00000000..e67c2f8e
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SampledSpanStoreImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2018, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.export;
+
+import io.opencensus.implcore.trace.RecordEventsSpanImpl;
+import io.opencensus.trace.export.SampledSpanStore;
+import io.opencensus.trace.export.SpanData;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+/** Abstract implementation of the {@link SampledSpanStore}. */
+public abstract class SampledSpanStoreImpl extends SampledSpanStore {
+ private static final SampledSpanStoreImpl NOOP_SAMPLED_SPAN_STORE_IMPL =
+ new NoopSampledSpanStoreImpl();
+
+ /** Returns the new no-op implmentation of {@link SampledSpanStoreImpl}. */
+ public static SampledSpanStoreImpl getNoopSampledSpanStoreImpl() {
+ return NOOP_SAMPLED_SPAN_STORE_IMPL;
+ }
+
+ /**
+ * Considers to save the given spans to the stored samples. This must be called at the end of each
+ * Span with the option RECORD_EVENTS.
+ *
+ * @param span the span to be consider for storing into the store buckets.
+ */
+ public abstract void considerForSampling(RecordEventsSpanImpl span);
+
+ protected void shutdown() {}
+
+ private static final class NoopSampledSpanStoreImpl extends SampledSpanStoreImpl {
+ private static final Summary EMPTY_SUMMARY =
+ Summary.create(Collections.<String, PerSpanNameSummary>emptyMap());
+ private static final Set<String> EMPTY_REGISTERED_SPAN_NAMES = Collections.<String>emptySet();
+ private static final Collection<SpanData> EMPTY_SPANDATA = Collections.<SpanData>emptySet();
+
+ @Override
+ public Summary getSummary() {
+ return EMPTY_SUMMARY;
+ }
+
+ @Override
+ public void considerForSampling(RecordEventsSpanImpl span) {}
+
+ @Override
+ public void registerSpanNamesForCollection(Collection<String> spanNames) {}
+
+ @Override
+ public void unregisterSpanNamesForCollection(Collection<String> spanNames) {}
+
+ @Override
+ public Set<String> getRegisteredSpanNamesForCollection() {
+ return EMPTY_REGISTERED_SPAN_NAMES;
+ }
+
+ @Override
+ public Collection<SpanData> getErrorSampledSpans(ErrorFilter filter) {
+ return EMPTY_SPANDATA;
+ }
+
+ @Override
+ public Collection<SpanData> getLatencySampledSpans(LatencyFilter filter) {
+ return EMPTY_SPANDATA;
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java
new file mode 100644
index 00000000..51a7b05c
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/export/SpanExporterImpl.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.export;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opencensus.common.Duration;
+import io.opencensus.implcore.internal.DaemonThreadFactory;
+import io.opencensus.implcore.trace.RecordEventsSpanImpl;
+import io.opencensus.trace.export.ExportComponent;
+import io.opencensus.trace.export.SpanData;
+import io.opencensus.trace.export.SpanExporter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.concurrent.GuardedBy;
+
+/** Implementation of the {@link SpanExporter}. */
+public final class SpanExporterImpl extends SpanExporter {
+ private static final Logger logger = Logger.getLogger(ExportComponent.class.getName());
+
+ private final Worker worker;
+ private final Thread workerThread;
+
+ /**
+ * Constructs a {@code SpanExporterImpl} that exports the {@link SpanData} asynchronously.
+ *
+ * <p>Starts a separate thread that wakes up every {@code scheduleDelay} and exports any available
+ * spans data. If the number of buffered SpanData objects is greater than {@code bufferSize} then
+ * the thread wakes up sooner.
+ *
+ * @param bufferSize the size of the buffered span data.
+ * @param scheduleDelay the maximum delay.
+ */
+ static SpanExporterImpl create(int bufferSize, Duration scheduleDelay) {
+ // TODO(bdrutu): Consider to add a shutdown hook to not avoid dropping data.
+ Worker worker = new Worker(bufferSize, scheduleDelay);
+ return new SpanExporterImpl(worker);
+ }
+
+ /**
+ * Adds a Span to the exporting service.
+ *
+ * @param span the {@code Span} to be added.
+ */
+ public void addSpan(RecordEventsSpanImpl span) {
+ worker.addSpan(span);
+ }
+
+ @Override
+ public void registerHandler(String name, Handler handler) {
+ worker.registerHandler(name, handler);
+ }
+
+ @Override
+ public void unregisterHandler(String name) {
+ worker.unregisterHandler(name);
+ }
+
+ void flush() {
+ worker.flush();
+ }
+
+ void shutdown() {
+ flush();
+ workerThread.interrupt();
+ }
+
+ private SpanExporterImpl(Worker worker) {
+ this.workerThread =
+ new DaemonThreadFactory("ExportComponent.ServiceExporterThread").newThread(worker);
+ this.workerThread.start();
+ this.worker = worker;
+ }
+
+ @VisibleForTesting
+ Thread getServiceExporterThread() {
+ return workerThread;
+ }
+
+ // Worker in a thread that batches multiple span data and calls the registered services to export
+ // that data.
+ //
+ // The map of registered handlers is implemented using ConcurrentHashMap ensuring full
+ // concurrency of retrievals and adjustable expected concurrency for updates. Retrievals
+ // reflect the results of the most recently completed update operations held upon their onset.
+ //
+ // The list of batched data is protected by an explicit monitor object which ensures full
+ // concurrency.
+ private static final class Worker implements Runnable {
+ private final Object monitor = new Object();
+
+ @GuardedBy("monitor")
+ private final List<RecordEventsSpanImpl> spans;
+
+ private final Map<String, Handler> serviceHandlers = new ConcurrentHashMap<String, Handler>();
+ private final int bufferSize;
+ private final long scheduleDelayMillis;
+
+ // See SpanExporterImpl#addSpan.
+ private void addSpan(RecordEventsSpanImpl span) {
+ synchronized (monitor) {
+ this.spans.add(span);
+ if (spans.size() > bufferSize) {
+ monitor.notifyAll();
+ }
+ }
+ }
+
+ // See SpanExporter#registerHandler.
+ private void registerHandler(String name, Handler serviceHandler) {
+ serviceHandlers.put(name, serviceHandler);
+ }
+
+ // See SpanExporter#unregisterHandler.
+ private void unregisterHandler(String name) {
+ serviceHandlers.remove(name);
+ }
+
+ // Exports the list of SpanData to all the ServiceHandlers.
+ private void onBatchExport(List<SpanData> spanDataList) {
+ // From the java documentation of the ConcurrentHashMap#entrySet():
+ // The view's iterator is a "weakly consistent" iterator that will never throw
+ // ConcurrentModificationException, and guarantees to traverse elements as they existed
+ // upon construction of the iterator, and may (but is not guaranteed to) reflect any
+ // modifications subsequent to construction.
+ for (Map.Entry<String, Handler> it : serviceHandlers.entrySet()) {
+ // In case of any exception thrown by the service handlers continue to run.
+ try {
+ it.getValue().export(spanDataList);
+ } catch (Throwable e) {
+ logger.log(Level.WARNING, "Exception thrown by the service export " + it.getKey(), e);
+ }
+ }
+ }
+
+ private Worker(int bufferSize, Duration scheduleDelay) {
+ spans = new ArrayList<RecordEventsSpanImpl>(bufferSize);
+ this.bufferSize = bufferSize;
+ this.scheduleDelayMillis = scheduleDelay.toMillis();
+ }
+
+ // Returns an unmodifiable list of all buffered spans data to ensure that any registered
+ // service handler cannot modify the list.
+ private static List<SpanData> fromSpanImplToSpanData(List<RecordEventsSpanImpl> spans) {
+ List<SpanData> spanDatas = new ArrayList<SpanData>(spans.size());
+ for (RecordEventsSpanImpl span : spans) {
+ spanDatas.add(span.toSpanData());
+ }
+ return Collections.unmodifiableList(spanDatas);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ // Copy all the batched spans in a separate list to release the monitor lock asap to
+ // avoid blocking the producer thread.
+ List<RecordEventsSpanImpl> spansCopy;
+ synchronized (monitor) {
+ if (spans.size() < bufferSize) {
+ do {
+ // In the case of a spurious wakeup we export only if we have at least one span in
+ // the batch. It is acceptable because batching is a best effort mechanism here.
+ try {
+ monitor.wait(scheduleDelayMillis);
+ } catch (InterruptedException ie) {
+ // Preserve the interruption status as per guidance and stop doing any work.
+ Thread.currentThread().interrupt();
+ return;
+ }
+ } while (spans.isEmpty());
+ }
+ spansCopy = new ArrayList<RecordEventsSpanImpl>(spans);
+ spans.clear();
+ }
+ // Execute the batch export outside the synchronized to not block all producers.
+ final List<SpanData> spanDataList = fromSpanImplToSpanData(spansCopy);
+ if (!spanDataList.isEmpty()) {
+ onBatchExport(spanDataList);
+ }
+ }
+ }
+
+ void flush() {
+ List<RecordEventsSpanImpl> spansCopy;
+ synchronized (monitor) {
+ spansCopy = new ArrayList<RecordEventsSpanImpl>(spans);
+ spans.clear();
+ }
+
+ final List<SpanData> spanDataList = fromSpanImplToSpanData(spansCopy);
+ if (!spanDataList.isEmpty()) {
+ onBatchExport(spanDataList);
+ }
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/internal/ConcurrentIntrusiveList.java b/impl_core/src/main/java/io/opencensus/implcore/trace/internal/ConcurrentIntrusiveList.java
new file mode 100644
index 00000000..22d8e41a
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/internal/ConcurrentIntrusiveList.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.internal;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import io.opencensus.implcore.internal.CheckerFrameworkUtils;
+import io.opencensus.implcore.trace.internal.ConcurrentIntrusiveList.Element;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * An {@code ConcurrentIntrusiveList<T>} is a doubly-linked list where the link pointers are
+ * embedded in the elements. This makes insertion and removal into a known position constant time.
+ *
+ * <p>Elements must derive from the {@code Element<T extends Element<T>>} interface:
+ *
+ * <pre><code>
+ * class MyClass implements {@code Element<MyClass>} {
+ * private MyClass next = null;
+ * private MyClass prev = null;
+ *
+ * {@literal @}Override
+ * MyClass getNext() {
+ * return next;
+ * }
+ *
+ * {@literal @}Override
+ * void setNext(MyClass element) {
+ * next = element;
+ * }
+ *
+ * {@literal @}Override
+ * MyClass getPrev() {
+ * return prev;
+ * }
+ *
+ * {@literal @}Override
+ * void setPrev(MyClass element) {
+ * prev = element;
+ * }
+ * }
+ * </code></pre>
+ */
+@ThreadSafe
+public final class ConcurrentIntrusiveList<T extends Element<T>> {
+ private int size = 0;
+ @Nullable private T head = null;
+
+ public ConcurrentIntrusiveList() {}
+
+ /**
+ * Adds the given {@code element} to the list.
+ *
+ * @param element the element to add.
+ * @throws IllegalArgumentException if the element is already in a list.
+ */
+ public synchronized void addElement(T element) {
+ checkArgument(
+ element.getNext() == null && element.getPrev() == null && element != head,
+ "Element already in a list.");
+ size++;
+ if (head == null) {
+ head = element;
+ } else {
+ head.setPrev(element);
+ element.setNext(head);
+ head = element;
+ }
+ }
+
+ /**
+ * Removes the given {@code element} from the list.
+ *
+ * @param element the element to remove.
+ * @throws IllegalArgumentException if the element is not in the list.
+ */
+ public synchronized void removeElement(T element) {
+ checkArgument(
+ element.getNext() != null || element.getPrev() != null || element == head,
+ "Element not in the list.");
+ size--;
+ if (element.getPrev() == null) {
+ // This is the first element
+ head = element.getNext();
+ if (head != null) {
+ // If more than one element in the list.
+ head.setPrev(null);
+ element.setNext(null);
+ }
+ } else if (element.getNext() == null) {
+ // This is the last element, and there is at least another element because
+ // element.getPrev() != null.
+ CheckerFrameworkUtils.castNonNull(element.getPrev()).setNext(null);
+ element.setPrev(null);
+ } else {
+ CheckerFrameworkUtils.castNonNull(element.getPrev()).setNext(element.getNext());
+ CheckerFrameworkUtils.castNonNull(element.getNext()).setPrev(element.getPrev());
+ element.setNext(null);
+ element.setPrev(null);
+ }
+ }
+
+ /**
+ * Returns the number of elements in this list.
+ *
+ * @return the number of elements in this list.
+ */
+ public synchronized int size() {
+ return size;
+ }
+
+ /**
+ * Returns all the elements from this list.
+ *
+ * @return all the elements from this list.
+ */
+ public synchronized Collection<T> getAll() {
+ List<T> all = new ArrayList<T>(size);
+ for (T e = head; e != null; e = e.getNext()) {
+ all.add(e);
+ }
+ return all;
+ }
+
+ /**
+ * This is an interface that must be implemented by any element that uses {@link
+ * ConcurrentIntrusiveList}.
+ *
+ * @param <T> the element that will be used for the list.
+ */
+ public interface Element<T extends Element<T>> {
+
+ /**
+ * Returns a reference to the next element in the list.
+ *
+ * @return a reference to the next element in the list.
+ */
+ @Nullable
+ T getNext();
+
+ /**
+ * Sets the reference to the next element in the list.
+ *
+ * @param element the reference to the next element in the list.
+ */
+ void setNext(@Nullable T element);
+
+ /**
+ * Returns a reference to the previous element in the list.
+ *
+ * @return a reference to the previous element in the list.
+ */
+ @Nullable
+ T getPrev();
+
+ /**
+ * Sets the reference to the previous element in the list.
+ *
+ * @param element the reference to the previous element in the list.
+ */
+ void setPrev(@Nullable T element);
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/internal/RandomHandler.java b/impl_core/src/main/java/io/opencensus/implcore/trace/internal/RandomHandler.java
new file mode 100644
index 00000000..70be5a90
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/internal/RandomHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.internal;
+
+import java.security.SecureRandom;
+import java.util.Random;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Abstract class to access the current {@link Random}.
+ *
+ * <p>Implementation can have a per thread instance or a single global instance.
+ */
+@ThreadSafe
+public abstract class RandomHandler {
+ /**
+ * Returns the current {@link Random}.
+ *
+ * @return the current {@code Random}.
+ */
+ public abstract Random current();
+
+ /** Implementation of the {@link RandomHandler} using {@link SecureRandom}. */
+ @ThreadSafe
+ public static final class SecureRandomHandler extends RandomHandler {
+ private final Random random = new SecureRandom();
+
+ /** Constructs a new {@link SecureRandomHandler}. */
+ public SecureRandomHandler() {}
+
+ @Override
+ public Random current() {
+ return random;
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/B3Format.java b/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/B3Format.java
new file mode 100644
index 00000000..d928d93c
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/B3Format.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.propagation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.SpanId;
+import io.opencensus.trace.TraceId;
+import io.opencensus.trace.TraceOptions;
+import io.opencensus.trace.Tracestate;
+import io.opencensus.trace.propagation.SpanContextParseException;
+import io.opencensus.trace.propagation.TextFormat;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/*>>>
+import org.checkerframework.checker.nullness.qual.NonNull;
+*/
+
+/**
+ * Implementation of the B3 propagation protocol. See <a
+ * href=https://github.com/openzipkin/b3-propagation>b3-propagation</a>.
+ */
+final class B3Format extends TextFormat {
+ private static final Tracestate TRACESTATE_DEFAULT = Tracestate.builder().build();
+ @VisibleForTesting static final String X_B3_TRACE_ID = "X-B3-TraceId";
+ @VisibleForTesting static final String X_B3_SPAN_ID = "X-B3-SpanId";
+ @VisibleForTesting static final String X_B3_PARENT_SPAN_ID = "X-B3-ParentSpanId";
+ @VisibleForTesting static final String X_B3_SAMPLED = "X-B3-Sampled";
+ @VisibleForTesting static final String X_B3_FLAGS = "X-B3-Flags";
+ private static final List<String> FIELDS =
+ Collections.unmodifiableList(
+ Arrays.asList(
+ X_B3_TRACE_ID, X_B3_SPAN_ID, X_B3_PARENT_SPAN_ID, X_B3_SAMPLED, X_B3_FLAGS));
+
+ // Used as the upper TraceId.SIZE hex characters of the traceID. B3-propagation used to send
+ // TraceId.SIZE hex characters (8-bytes traceId) in the past.
+ private static final String UPPER_TRACE_ID = "0000000000000000";
+ // Sampled value via the X_B3_SAMPLED header.
+ private static final String SAMPLED_VALUE = "1";
+ // "Debug" sampled value.
+ private static final String FLAGS_VALUE = "1";
+
+ @Override
+ public List<String> fields() {
+ return FIELDS;
+ }
+
+ @Override
+ public <C /*>>> extends @NonNull Object*/> void inject(
+ SpanContext spanContext, C carrier, Setter<C> setter) {
+ checkNotNull(spanContext, "spanContext");
+ checkNotNull(setter, "setter");
+ checkNotNull(carrier, "carrier");
+ setter.put(carrier, X_B3_TRACE_ID, spanContext.getTraceId().toLowerBase16());
+ setter.put(carrier, X_B3_SPAN_ID, spanContext.getSpanId().toLowerBase16());
+ if (spanContext.getTraceOptions().isSampled()) {
+ setter.put(carrier, X_B3_SAMPLED, SAMPLED_VALUE);
+ }
+ }
+
+ @Override
+ public <C /*>>> extends @NonNull Object*/> SpanContext extract(C carrier, Getter<C> getter)
+ throws SpanContextParseException {
+ checkNotNull(carrier, "carrier");
+ checkNotNull(getter, "getter");
+ try {
+ TraceId traceId;
+ String traceIdStr = getter.get(carrier, X_B3_TRACE_ID);
+ if (traceIdStr != null) {
+ if (traceIdStr.length() == TraceId.SIZE) {
+ // This is an 8-byte traceID.
+ traceIdStr = UPPER_TRACE_ID + traceIdStr;
+ }
+ traceId = TraceId.fromLowerBase16(traceIdStr);
+ } else {
+ throw new SpanContextParseException("Missing X_B3_TRACE_ID.");
+ }
+ SpanId spanId;
+ String spanIdStr = getter.get(carrier, X_B3_SPAN_ID);
+ if (spanIdStr != null) {
+ spanId = SpanId.fromLowerBase16(spanIdStr);
+ } else {
+ throw new SpanContextParseException("Missing X_B3_SPAN_ID.");
+ }
+ TraceOptions traceOptions = TraceOptions.DEFAULT;
+ if (SAMPLED_VALUE.equals(getter.get(carrier, X_B3_SAMPLED))
+ || FLAGS_VALUE.equals(getter.get(carrier, X_B3_FLAGS))) {
+ traceOptions = TraceOptions.builder().setIsSampled(true).build();
+ }
+ return SpanContext.create(traceId, spanId, traceOptions, TRACESTATE_DEFAULT);
+ } catch (IllegalArgumentException e) {
+ throw new SpanContextParseException("Invalid input.", e);
+ }
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/BinaryFormatImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/BinaryFormatImpl.java
new file mode 100644
index 00000000..233fbd31
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/BinaryFormatImpl.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.propagation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.SpanId;
+import io.opencensus.trace.TraceId;
+import io.opencensus.trace.TraceOptions;
+import io.opencensus.trace.Tracestate;
+import io.opencensus.trace.propagation.BinaryFormat;
+import io.opencensus.trace.propagation.SpanContextParseException;
+
+/**
+ * Implementation of the {@link BinaryFormat}.
+ *
+ * <p>BinaryFormat format:
+ *
+ * <ul>
+ * <li>Binary value: &lt;version_id&gt;&lt;version_format&gt;
+ * <li>version_id: 1-byte representing the version id.
+ * <li>For version_id = 0:
+ * <ul>
+ * <li>version_format: &lt;field&gt;&lt;field&gt;
+ * <li>field_format: &lt;field_id&gt;&lt;field_format&gt;
+ * <li>Fields:
+ * <ul>
+ * <li>TraceId: (field_id = 0, len = 16, default = &#34;0000000000000000&#34;) -
+ * 16-byte array representing the trace_id.
+ * <li>SpanId: (field_id = 1, len = 8, default = &#34;00000000&#34;) - 8-byte array
+ * representing the span_id.
+ * <li>TraceOptions: (field_id = 2, len = 1, default = &#34;0&#34;) - 1-byte array
+ * representing the trace_options.
+ * </ul>
+ * <li>Fields MUST be encoded using the field id order (smaller to higher).
+ * <li>Valid value example:
+ * <ul>
+ * <li>{0, 0, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 1, 97,
+ * 98, 99, 100, 101, 102, 103, 104, 2, 1}
+ * <li>version_id = 0;
+ * <li>trace_id = {64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79}
+ * <li>span_id = {97, 98, 99, 100, 101, 102, 103, 104};
+ * <li>trace_options = {1};
+ * </ul>
+ * </ul>
+ * </ul>
+ */
+final class BinaryFormatImpl extends BinaryFormat {
+ private static final Tracestate TRACESTATE_DEFAULT = Tracestate.builder().build();
+ private static final byte VERSION_ID = 0;
+ private static final int VERSION_ID_OFFSET = 0;
+ // The version_id/field_id size in bytes.
+ private static final byte ID_SIZE = 1;
+ private static final byte TRACE_ID_FIELD_ID = 0;
+
+ // TODO: clarify if offsets are correct here. While the specification suggests you should stop
+ // parsing when you hit an unknown field, it does not suggest that fields must be declared in
+ // ID order. Rather it only groups by data type order, in this case Trace Context
+ // https://github.com/census-instrumentation/opencensus-specs/blob/master/encodings/BinaryEncoding.md#deserialization-rules
+ @VisibleForTesting static final int TRACE_ID_FIELD_ID_OFFSET = VERSION_ID_OFFSET + ID_SIZE;
+
+ private static final int TRACE_ID_OFFSET = TRACE_ID_FIELD_ID_OFFSET + ID_SIZE;
+ private static final byte SPAN_ID_FIELD_ID = 1;
+
+ @VisibleForTesting static final int SPAN_ID_FIELD_ID_OFFSET = TRACE_ID_OFFSET + TraceId.SIZE;
+
+ private static final int SPAN_ID_OFFSET = SPAN_ID_FIELD_ID_OFFSET + ID_SIZE;
+ private static final byte TRACE_OPTION_FIELD_ID = 2;
+
+ @VisibleForTesting static final int TRACE_OPTION_FIELD_ID_OFFSET = SPAN_ID_OFFSET + SpanId.SIZE;
+
+ private static final int TRACE_OPTIONS_OFFSET = TRACE_OPTION_FIELD_ID_OFFSET + ID_SIZE;
+ /** Version, Trace and Span IDs are required fields. */
+ private static final int REQUIRED_FORMAT_LENGTH = 3 * ID_SIZE + TraceId.SIZE + SpanId.SIZE;
+ /** Use {@link TraceOptions#DEFAULT} unless its optional field is present. */
+ private static final int ALL_FORMAT_LENGTH = REQUIRED_FORMAT_LENGTH + ID_SIZE + TraceOptions.SIZE;
+
+ @Override
+ public byte[] toByteArray(SpanContext spanContext) {
+ checkNotNull(spanContext, "spanContext");
+ byte[] bytes = new byte[ALL_FORMAT_LENGTH];
+ bytes[VERSION_ID_OFFSET] = VERSION_ID;
+ bytes[TRACE_ID_FIELD_ID_OFFSET] = TRACE_ID_FIELD_ID;
+ spanContext.getTraceId().copyBytesTo(bytes, TRACE_ID_OFFSET);
+ bytes[SPAN_ID_FIELD_ID_OFFSET] = SPAN_ID_FIELD_ID;
+ spanContext.getSpanId().copyBytesTo(bytes, SPAN_ID_OFFSET);
+ bytes[TRACE_OPTION_FIELD_ID_OFFSET] = TRACE_OPTION_FIELD_ID;
+ spanContext.getTraceOptions().copyBytesTo(bytes, TRACE_OPTIONS_OFFSET);
+ return bytes;
+ }
+
+ @Override
+ public SpanContext fromByteArray(byte[] bytes) throws SpanContextParseException {
+ checkNotNull(bytes, "bytes");
+ if (bytes.length == 0 || bytes[0] != VERSION_ID) {
+ throw new SpanContextParseException("Unsupported version.");
+ }
+ if (bytes.length < REQUIRED_FORMAT_LENGTH) {
+ throw new SpanContextParseException("Invalid input: truncated");
+ }
+ // TODO: the following logic assumes that fields are written in ID order. The spec does not say
+ // that. If it decides not to, this logic would need to be more like a loop
+ TraceId traceId;
+ SpanId spanId;
+ TraceOptions traceOptions = TraceOptions.DEFAULT;
+ int pos = 1;
+ if (bytes[pos] == TRACE_ID_FIELD_ID) {
+ traceId = TraceId.fromBytes(bytes, pos + ID_SIZE);
+ pos += ID_SIZE + TraceId.SIZE;
+ } else {
+ // TODO: update the spec to suggest that the trace ID is not actually optional
+ throw new SpanContextParseException("Invalid input: expected trace ID at offset " + pos);
+ }
+ if (bytes[pos] == SPAN_ID_FIELD_ID) {
+ spanId = SpanId.fromBytes(bytes, pos + ID_SIZE);
+ pos += ID_SIZE + SpanId.SIZE;
+ } else {
+ // TODO: update the spec to suggest that the span ID is not actually optional.
+ throw new SpanContextParseException("Invalid input: expected span ID at offset " + pos);
+ }
+ // Check to see if we are long enough to include an options field, and also that the next field
+ // is an options field. Per spec we simply stop parsing at first unknown field instead of
+ // failing.
+ if (bytes.length > pos && bytes[pos] == TRACE_OPTION_FIELD_ID) {
+ if (bytes.length < ALL_FORMAT_LENGTH) {
+ throw new SpanContextParseException("Invalid input: truncated");
+ }
+ traceOptions = TraceOptions.fromByte(bytes[pos + ID_SIZE]);
+ }
+ return SpanContext.create(traceId, spanId, traceOptions, TRACESTATE_DEFAULT);
+ }
+}
diff --git a/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/PropagationComponentImpl.java b/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/PropagationComponentImpl.java
new file mode 100644
index 00000000..f608543d
--- /dev/null
+++ b/impl_core/src/main/java/io/opencensus/implcore/trace/propagation/PropagationComponentImpl.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2017, OpenCensus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.opencensus.implcore.trace.propagation;
+
+import io.opencensus.trace.propagation.BinaryFormat;
+import io.opencensus.trace.propagation.PropagationComponent;
+import io.opencensus.trace.propagation.TextFormat;
+
+/** Implementation of the {@link PropagationComponent}. */
+public class PropagationComponentImpl extends PropagationComponent {
+ private final BinaryFormat binaryFormat = new BinaryFormatImpl();
+ private final B3Format b3Format = new B3Format();
+
+ @Override
+ public BinaryFormat getBinaryFormat() {
+ return binaryFormat;
+ }
+
+ @Override
+ public TextFormat getB3Format() {
+ return b3Format;
+ }
+}