/* * Copyright 2017, OpenCensus Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.opencensus.implcore.trace.export; import com.google.common.annotations.VisibleForTesting; import io.opencensus.common.Duration; import io.opencensus.implcore.internal.DaemonThreadFactory; import io.opencensus.implcore.trace.RecordEventsSpanImpl; import io.opencensus.trace.export.ExportComponent; import io.opencensus.trace.export.SpanData; import io.opencensus.trace.export.SpanExporter; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.concurrent.GuardedBy; /** Implementation of the {@link SpanExporter}. */ public final class SpanExporterImpl extends SpanExporter { private static final Logger logger = Logger.getLogger(ExportComponent.class.getName()); private final Worker worker; private final Thread workerThread; /** * Constructs a {@code SpanExporterImpl} that exports the {@link SpanData} asynchronously. * *
Starts a separate thread that wakes up every {@code scheduleDelay} and exports any available
* span data. If the number of buffered SpanData objects is greater than {@code bufferSize} then
* the thread wakes up sooner.
*
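* @param bufferSize the number of buffered span data objects above which the thread wakes up
* sooner than {@code scheduleDelay}.
* @param scheduleDelay the maximum delay between two wake-ups of the exporting thread.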
*/
static SpanExporterImpl create(int bufferSize, Duration scheduleDelay) {
  // TODO(bdrutu): Consider adding a shutdown hook to avoid dropping data.
  // The private constructor starts the thread that runs the freshly created worker.
  return new SpanExporterImpl(new Worker(bufferSize, scheduleDelay));
}
/**
 * Hands a span over to the exporting service.
 *
 * <p>The call is forwarded unchanged to the worker owned by this exporter.
 *
 * @param span the {@code Span} to be added.
 */
public void addSpan(RecordEventsSpanImpl span) {
  this.worker.addSpan(span);
}
@Override
public void registerHandler(String name, Handler handler) {
  // Registration is owned by the worker; this method only forwards the name/handler pair.
  this.worker.registerHandler(name, handler);
}
@Override
public void unregisterHandler(String name) {
  // Removal is owned by the worker; this method only forwards the handler name.
  this.worker.unregisterHandler(name);
}
/** Asks the worker to flush by forwarding to {@code Worker.flush()}. */
void flush() {
  this.worker.flush();
}
/**
 * Stops the exporter: first flushes the worker, then interrupts the worker thread.
 *
 * <p>NOTE(review): how the worker reacts to the interrupt is defined in {@code Worker.run()},
 * which is outside this method - presumably it terminates its loop; confirm there.
 */
void shutdown() {
  // Flush before interrupting so the flush request is issued while the thread is still running.
  worker.flush();
  workerThread.interrupt();
}
private SpanExporterImpl(Worker worker) {
this.workerThread =
new DaemonThreadFactory("ExportComponent.ServiceExporterThread").newThread(worker);
this.workerThread.start();
this.worker = worker;
}
/** Returns the thread that was started by the constructor to run the worker; for tests only. */
@VisibleForTesting
Thread getServiceExporterThread() {
  return this.workerThread;
}
// Worker in a thread that batches multiple span data and calls the registered services to export
// that data.
//
// The map of registered handlers is implemented using ConcurrentHashMap ensuring full
// concurrency of retrievals and adjustable expected concurrency for updates. Retrievals
// reflect the results of the most recently completed update operations holding upon their onset.
//
// The list of batched data is protected by an explicit monitor object, which makes access to it
// from multiple threads safe.
private static final class Worker implements Runnable {
private final Object monitor = new Object();
@GuardedBy("monitor")
private final List