Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.sdk.common.InternalTelemetryVersion;
import io.opentelemetry.sdk.common.internal.ComponentId;
import io.opentelemetry.sdk.common.internal.DaemonThreadFactory;
import io.opentelemetry.sdk.common.internal.ThrottlingLogger;
import io.opentelemetry.sdk.common.internal.ThrowableUtil;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
Expand Down Expand Up @@ -46,6 +47,7 @@ public final class BatchSpanProcessor implements SpanProcessor {
ComponentId.generateLazy("batching_span_processor");

private static final Logger logger = Logger.getLogger(BatchSpanProcessor.class.getName());
private static final ThrottlingLogger throttledLogger = new ThrottlingLogger(logger);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throttlingLogger would be a better name.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I've updated the variable name from throttledLogger to throttlingLogger to better reflect that this logger performs throttling (matching the active form of the class name ThrottlingLogger).


private static final String WORKER_THREAD_NAME =
BatchSpanProcessor.class.getSimpleName() + "_WorkerThread";
Expand Down Expand Up @@ -212,6 +214,11 @@ private void addSpan(ReadableSpan span) {
spanProcessorInstrumentation.buildQueueMetricsOnce(maxQueueSize, queue::size);
if (!queue.offer(span)) {
spanProcessorInstrumentation.dropSpans(1);
throttledLogger.log(
Level.WARNING,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't warning too high level? I think INFO would be enough.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think WARNING is appropriate here because dropped spans represent actual data loss, which impacts observability. This warrants immediate attention from operators in production. Since we're using ThrottlingLogger, the output is already rate-limited to prevent log spam.

However, if the team prefers INFO, I'm happy to adjust. Please let me know.

"BatchSpanProcessor dropped a span because the queue is full (maxQueueSize="
+ maxQueueSize
+ ")");
} else {
if (queueSize.incrementAndGet() >= spansNeeded.get()) {
signal.offer(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.api.internal.GuardedBy;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadableSpan;
Expand All @@ -29,6 +30,7 @@
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
import java.time.Duration;
import org.slf4j.event.Level;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -42,6 +44,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
Expand All @@ -62,9 +65,13 @@ class BatchSpanProcessorTest {
@Mock private Sampler mockSampler;
@Mock private SpanExporter mockSpanExporter;

// Captures log output emitted under BatchSpanProcessor's logger so tests can
// assert on warning messages (e.g. the dropped-span warning).
@RegisterExtension
LogCapturer logs = LogCapturer.create().captureForType(BatchSpanProcessor.class);

@BeforeEach
void setUp() {
// Stub the mock exporter so shutdown() and export() both report immediate
// success; individual tests override these defaults where needed.
when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
when(mockSpanExporter.export(anyList())).thenReturn(CompletableResultCode.ofSuccess());
}

@AfterEach
Expand Down Expand Up @@ -232,6 +239,30 @@ void exportMoreSpansThanTheBufferSize() {
span6.toSpanData()));
}

@Test
void droppedSpanIsLogged() {
sdkTracerProvider =
SdkTracerProvider.builder()
.addSpanProcessor(
BatchSpanProcessor.builder(mockSpanExporter)
.setMaxQueueSize(1)
.setMaxExportBatchSize(1_000)
.setScheduleDelay(Duration.ofDays(1))
.build())
.build();

// Add two spans quickly to trigger a drop when the queue is full.
createEndedSpan(SPAN_NAME_1);
createEndedSpan(SPAN_NAME_2);

await()
.untilAsserted(
() ->
logs.assertContains(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable to assert the whole message, especially since it has some dynamic parts

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, @anuraaga! Good catches on the test. I've updated it.

@jkwatson @jack-berg -- on the question about SpanProcessorInstrumentation, I agree with @anuraaga that keeping the tracking in BSP makes more sense since the instrumentation counters are synchronous and adding state for this there would complicate things without much benefit.

Would appreciate another look when you get a chance!

loggingEvent -> loggingEvent.getLevel().equals(Level.WARN),
"BatchSpanProcessor dropped a span"));
}

@Test
void forceExport() {
WaitingSpanExporter waitingSpanExporter =
Expand Down