Skip to content

Commit 8970e4d

Browse files
committed
Ensure constant reference to thread
Making the worker extend thread was the only sensible way of having it keep a constant reference to its thread. Everything else I tried was quite hacky
1 parent 481bf35 commit 8970e4d

1 file changed

Lines changed: 10 additions & 15 deletions

File tree

sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import io.opentelemetry.sdk.common.CompletableResultCode;
1111
import io.opentelemetry.sdk.common.InternalTelemetryVersion;
1212
import io.opentelemetry.sdk.common.internal.ComponentId;
13-
import io.opentelemetry.sdk.common.internal.DaemonThreadFactory;
1413
import io.opentelemetry.sdk.common.internal.ThrowableUtil;
1514
import io.opentelemetry.sdk.trace.ReadWriteSpan;
1615
import io.opentelemetry.sdk.trace.ReadableSpan;
@@ -29,7 +28,6 @@
2928
import java.util.function.Supplier;
3029
import java.util.logging.Level;
3130
import java.util.logging.Logger;
32-
import javax.annotation.Nullable;
3331

3432
/**
3533
* Implementation of the {@link SpanProcessor} that batches spans exported by the SDK then pushes
@@ -85,9 +83,8 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) {
8583
exporterTimeoutNanos,
8684
JcTools.newFixedSizeQueue(maxQueueSize),
8785
maxQueueSize);
88-
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
89-
this.worker.setWorkerThread(workerThread);
90-
workerThread.start();
86+
87+
worker.start();
9188
}
9289

9390
@Override
@@ -160,7 +157,7 @@ public String toString() {
160157

161158
// Worker is a thread that batches multiple spans and calls the registered SpanExporter to export
162159
// the data.
163-
private static final class Worker implements Runnable {
160+
private static final class Worker extends Thread {
164161

165162
private final SpanProcessorInstrumentation spanProcessorInstrumentation;
166163

@@ -170,7 +167,6 @@ private static final class Worker implements Runnable {
170167
private final long exporterTimeoutNanos;
171168

172169
private long nextExportTime;
173-
@Nullable private Thread workerThread;
174170

175171
private final Queue<ReadableSpan> queue;
176172
private final AtomicInteger queueSize = new AtomicInteger();
@@ -196,6 +192,9 @@ private Worker(
196192
long exporterTimeoutNanos,
197193
Queue<ReadableSpan> queue,
198194
long maxQueueSize) {
195+
super(WORKER_THREAD_NAME);
196+
super.setDaemon(true);
197+
199198
this.spanExporter = spanExporter;
200199
this.scheduleDelayNanos = scheduleDelayNanos;
201200
this.maxExportBatchSize = maxExportBatchSize;
@@ -209,18 +208,14 @@ private Worker(
209208
this.batch = new ArrayList<>(this.maxExportBatchSize);
210209
}
211210

212-
private void setWorkerThread(Thread workerThread) {
213-
this.workerThread = workerThread;
214-
}
215-
216211
private void addSpan(ReadableSpan span) {
217212
spanProcessorInstrumentation.buildQueueMetricsOnce(maxQueueSize, queue::size);
218213
if (!queue.offer(span)) {
219214
spanProcessorInstrumentation.dropSpans(1);
220215
droppedSpanCount.incrementAndGet();
221216
} else {
222-
if (workerThread != null && queueSize.incrementAndGet() >= spansNeeded) {
223-
LockSupport.unpark(workerThread);
217+
if (queueSize.incrementAndGet() >= spansNeeded) {
218+
LockSupport.unpark(this);
224219
}
225220
}
226221
}
@@ -301,8 +296,8 @@ private CompletableResultCode shutdown() {
301296
private CompletableResultCode forceFlush() {
302297
CompletableResultCode flushResult = new CompletableResultCode();
303298
// we set the atomic here to trigger the worker loop to do a flush of the entire queue.
304-
if (workerThread != null && flushRequested.compareAndSet(null, flushResult)) {
305-
LockSupport.unpark(workerThread);
299+
if (flushRequested.compareAndSet(null, flushResult)) {
300+
LockSupport.unpark(this);
306301
}
307302
CompletableResultCode possibleResult = flushRequested.get();
308303
// there's a race here where the flush happening in the worker loop could complete before we

0 commit comments

Comments
 (0)