diff --git a/processors/README.md b/processors/README.md index 795fe61ced..42f17aa55f 100644 --- a/processors/README.md +++ b/processors/README.md @@ -32,6 +32,50 @@ logger_provider: `FilteringLogRecordProcessor` is a `LogRecordProcessor` that only keep logs based on a predicate +## Filtering Span Exporter + +`FilteringSpanExporter` is a `SpanExporter` wrapper that filters spans within each export batch before delegating to the underlying exporter. Filtering is composable via two interfaces: + +- `SpanFilter` - evaluates individual spans (e.g., error status, slow duration) +- `TraceFilter` - evaluates all spans belonging to a trace within the batch (e.g., overall trace wall-clock duration) + +Within a batch, if any `SpanFilter` matches any span or any `TraceFilter` matches a trace's span group, all spans sharing that trace ID in the batch are exported together. + +**Note:** Filtering decisions are scoped to a single `export()` call. Spans from the same trace arriving in different batches are evaluated independently, so a trace split across batches may be partially exported. + +Built-in filters: + +- `ErrorSpanFilter` - matches spans with error status +- `DurationSpanFilter` - matches spans exceeding a duration threshold +- `TraceDurationFilter` - matches when a trace's wall-clock duration (max end - min start) in the batch exceeds a threshold + +Usage: + +```java +SpanExporter delegate = OtlpGrpcSpanExporter.getDefault(); + +// Export spans whose batch-colocated trace has errors, individual spans > 2s, or trace duration > 10s +SpanExporter filtering = new FilteringSpanExporter( + delegate, + Arrays.asList(new ErrorSpanFilter(), new DurationSpanFilter(Duration.ofSeconds(2))), + Collections.singletonList(new TraceDurationFilter(Duration.ofSeconds(10)))); + +// Custom filters +SpanFilter nameFilter = span -> span.getName().contains("important"); +SpanExporter custom = new FilteringSpanExporter( + delegate, + Collections.singletonList(nameFilter), + Collections.emptyList()); + +// Optionally pass a Meter to emit dropped-span metrics +Meter meter = openTelemetry.getMeter("my-service"); +SpanExporter withMetrics = new FilteringSpanExporter( + delegate, + Arrays.asList(new ErrorSpanFilter(), new DurationSpanFilter(Duration.ofSeconds(2))), + Collections.singletonList(new TraceDurationFilter(Duration.ofSeconds(10))), + meter); +``` + ## Component owners - [Cesar Munoz](https://github.com/LikeTheSalad), Elastic diff --git a/processors/src/main/java/io/opentelemetry/contrib/filter/DurationSpanFilter.java b/processors/src/main/java/io/opentelemetry/contrib/filter/DurationSpanFilter.java new file mode 100644 index 0000000000..037fd82ef1 --- /dev/null +++ b/processors/src/main/java/io/opentelemetry/contrib/filter/DurationSpanFilter.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.filter; + +import io.opentelemetry.sdk.trace.data.SpanData; +import java.time.Duration; + +/** + * A {@link SpanFilter} that matches spans whose duration exceeds a configurable threshold, causing + * all batch-colocated spans sharing the same trace ID to be exported. + */ +public final class DurationSpanFilter implements SpanFilter { + + private final long thresholdNanos; + + /** + * Creates a new {@code DurationSpanFilter}. + * + * @param threshold the duration threshold; spans with duration strictly greater than this are + * considered interesting + */ + public DurationSpanFilter(Duration threshold) { + if (threshold.isNegative()) { + throw new IllegalArgumentException("threshold must be non-negative, got: " + threshold); + } + this.thresholdNanos = threshold.toNanos(); + } + + @Override + public boolean shouldKeep(SpanData spanData) { + long durationNanos = spanData.getEndEpochNanos() - spanData.getStartEpochNanos(); + return durationNanos > thresholdNanos; + } +} diff --git a/processors/src/main/java/io/opentelemetry/contrib/filter/ErrorSpanFilter.java b/processors/src/main/java/io/opentelemetry/contrib/filter/ErrorSpanFilter.java new file mode 100644 index 0000000000..34e5f59081 --- /dev/null +++ b/processors/src/main/java/io/opentelemetry/contrib/filter/ErrorSpanFilter.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.filter; + +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; + +/** + * A {@link SpanFilter} that matches spans with {@link StatusCode#ERROR}, causing all + * batch-colocated spans sharing the same trace ID to be exported. + */ +public final class ErrorSpanFilter implements SpanFilter { + + @Override + public boolean shouldKeep(SpanData spanData) { + return spanData.getStatus().getStatusCode() == StatusCode.ERROR; + } +} diff --git a/processors/src/main/java/io/opentelemetry/contrib/filter/FilteringSpanExporter.java b/processors/src/main/java/io/opentelemetry/contrib/filter/FilteringSpanExporter.java new file mode 100644 index 0000000000..ba5d359221 --- /dev/null +++ b/processors/src/main/java/io/opentelemetry/contrib/filter/FilteringSpanExporter.java @@ -0,0 +1,174 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.filter; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.annotation.Nullable; + +/** + * A {@link SpanExporter} wrapper that filters spans before delegating to the underlying exporter. + * Filtering operates at the trace level within each export batch: if any filter matches, all spans + * sharing that trace ID in that batch are exported together. + * + *

Two types of filters are supported: + * + *

+ * + *

A trace's spans are kept if any {@code SpanFilter} matches any span in the batch, OR any + * {@code TraceFilter} matches the trace's span group within the batch. + * + *

Important: Filtering decisions are scoped to a single {@link + * #export(Collection)} call. Spans from the same trace that arrive in different batches are + * evaluated independently, so a trace split across batches may be partially exported. + */ +public final class FilteringSpanExporter implements SpanExporter { + + private static final AttributeKey REASON_KEY = AttributeKey.stringKey("reason"); + + private final SpanExporter delegate; + private final List spanFilters; + private final List traceFilters; + @Nullable private final LongCounter droppedSpansCounter; + + /** + * Creates a new {@code FilteringSpanExporter}. + * + * @param delegate the exporter to delegate to for spans that pass filtering + * @param spanFilters per-span filters; a trace's spans in the batch are kept if any filter + * matches + * @param traceFilters batch-level filters; a trace's spans in the batch are kept if any filter + * matches + * @param meter optional {@link Meter} for emitting dropped-span metrics; pass {@code null} to + * disable metrics + */ + public FilteringSpanExporter( + SpanExporter delegate, + List spanFilters, + List traceFilters, + @Nullable Meter meter) { + this.delegate = Objects.requireNonNull(delegate, "delegate"); + Objects.requireNonNull(spanFilters, "spanFilters"); + Objects.requireNonNull(traceFilters, "traceFilters"); + this.spanFilters = Collections.unmodifiableList(new ArrayList<>(spanFilters)); + this.traceFilters = Collections.unmodifiableList(new ArrayList<>(traceFilters)); + if (meter != null) { + this.droppedSpansCounter = + meter + .counterBuilder("otel.contrib.processor.span.filtered") + .setDescription("Number of spans dropped by the filtering span exporter") + .setUnit("{span}") + .build(); + } else { + this.droppedSpansCounter = null; + } + } + + /** + * Creates a new {@code FilteringSpanExporter} without metrics. + * + * @param delegate the exporter to delegate to for spans that pass filtering + * @param spanFilters per-span filters + * @param traceFilters batch-level filters + */ + public FilteringSpanExporter( + SpanExporter delegate, List spanFilters, List traceFilters) { + this(delegate, spanFilters, traceFilters, null); + } + + @Override + public CompletableResultCode export(Collection spans) { + // Group spans by trace ID and evaluate span-level filters in a single pass + Set interestingTraceIds = new HashSet<>(); + Map> spansByTrace = new HashMap<>(); + + for (SpanData span : spans) { + String traceId = span.getSpanContext().getTraceId(); + + List traceSpans = spansByTrace.get(traceId); + if (traceSpans == null) { + traceSpans = new ArrayList<>(); + spansByTrace.put(traceId, traceSpans); + } + traceSpans.add(span); + + // Check span-level filters + if (!interestingTraceIds.contains(traceId)) { + for (SpanFilter filter : spanFilters) { + if (filter.shouldKeep(span)) { + interestingTraceIds.add(traceId); + break; + } + } + } + } + + // Evaluate trace-level filters + if (!traceFilters.isEmpty()) { + for (Map.Entry> entry : spansByTrace.entrySet()) { + String traceId = entry.getKey(); + if (!interestingTraceIds.contains(traceId)) { + for (TraceFilter filter : traceFilters) { + if (filter.shouldKeep(traceId, entry.getValue())) { + interestingTraceIds.add(traceId); + break; + } + } + } + } + } + + // Collect filtered spans + List filtered = new ArrayList<>(); + long droppedCount = 0; + + for (SpanData span : spans) { + if (interestingTraceIds.contains(span.getSpanContext().getTraceId())) { + filtered.add(span); + } else { + droppedCount++; + } + } + + if (droppedSpansCounter != null && droppedCount > 0) { + droppedSpansCounter.add(droppedCount, Attributes.of(REASON_KEY, "not_interesting")); + } + + if (filtered.isEmpty()) { + return CompletableResultCode.ofSuccess(); + } + + return delegate.export(filtered); + } + + @Override + public CompletableResultCode flush() { + return delegate.flush(); + } + + @Override + public CompletableResultCode shutdown() { + return delegate.shutdown(); + } +} diff --git a/processors/src/main/java/io/opentelemetry/contrib/filter/SpanFilter.java b/processors/src/main/java/io/opentelemetry/contrib/filter/SpanFilter.java new file mode 100644 index 0000000000..7268f94005 --- /dev/null +++ b/processors/src/main/java/io/opentelemetry/contrib/filter/SpanFilter.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.filter; + +import io.opentelemetry.sdk.trace.data.SpanData; + +/** + * A filter that evaluates individual spans to determine if their containing trace's spans (within + * the same export batch) should be exported. Used by {@link FilteringSpanExporter} to make per-span + * keep/drop decisions. + * + *

If any {@code SpanFilter} returns {@code true} for any span in a batch, all spans sharing that + * trace ID within the same batch are exported. + */ +public interface SpanFilter { + + /** + * Evaluates whether the given span is interesting enough to keep its trace's spans in the batch. + * + * @param spanData the span to evaluate + * @return {@code true} if this span should cause its trace's batch-colocated spans to be exported + */ + boolean shouldKeep(SpanData spanData); +} diff --git a/processors/src/main/java/io/opentelemetry/contrib/filter/TraceDurationFilter.java b/processors/src/main/java/io/opentelemetry/contrib/filter/TraceDurationFilter.java new file mode 100644 index 0000000000..b2ccb7c8ed --- /dev/null +++ b/processors/src/main/java/io/opentelemetry/contrib/filter/TraceDurationFilter.java @@ -0,0 +1,50 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.filter; + +import io.opentelemetry.sdk.trace.data.SpanData; +import java.time.Duration; +import java.util.Collection; + +/** + * A {@link TraceFilter} that matches when the wall-clock duration (max end - min start across all + * spans in the batch sharing a trace ID) exceeds a configurable threshold. + */ +public final class TraceDurationFilter implements TraceFilter { + + private final long thresholdNanos; + + /** + * Creates a new {@code TraceDurationFilter}. + * + * @param threshold the trace duration threshold; traces with wall-clock duration strictly greater + * than this are considered interesting + */ + public TraceDurationFilter(Duration threshold) { + if (threshold.isNegative()) { + throw new IllegalArgumentException("threshold must be non-negative, got: " + threshold); + } + this.thresholdNanos = threshold.toNanos(); + } + + @Override + public boolean shouldKeep(String traceId, Collection spans) { + if (spans.isEmpty()) { + return false; + } + long minStart = Long.MAX_VALUE; + long maxEnd = Long.MIN_VALUE; + for (SpanData span : spans) { + if (span.getStartEpochNanos() < minStart) { + minStart = span.getStartEpochNanos(); + } + if (span.getEndEpochNanos() > maxEnd) { + maxEnd = span.getEndEpochNanos(); + } + } + return (maxEnd - minStart) > thresholdNanos; + } +} diff --git a/processors/src/main/java/io/opentelemetry/contrib/filter/TraceFilter.java b/processors/src/main/java/io/opentelemetry/contrib/filter/TraceFilter.java new file mode 100644 index 0000000000..55dbad6be0 --- /dev/null +++ b/processors/src/main/java/io/opentelemetry/contrib/filter/TraceFilter.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.filter; + +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.Collection; + +/** + * A filter that evaluates all spans belonging to a single trace within an export batch to determine + * if those spans should be exported. Used by {@link FilteringSpanExporter} for decisions that + * require batch-level context (e.g., overall trace wall-clock duration). + * + *

If any {@code TraceFilter} returns {@code true} for a trace, all spans sharing that trace ID + * within the same batch are exported. + */ +public interface TraceFilter { + + /** + * Evaluates whether the given trace (represented as all spans sharing a trace ID within the + * current batch) should be exported. + * + * @param traceId the trace ID + * @param spans all spans in the current batch belonging to this trace + * @return {@code true} if this trace should be exported + */ + boolean shouldKeep(String traceId, Collection spans); +} diff --git a/processors/src/test/java/io/opentelemetry/contrib/filter/DurationSpanFilterTest.java b/processors/src/test/java/io/opentelemetry/contrib/filter/DurationSpanFilterTest.java new file mode 100644 index 0000000000..b9e13716d4 --- /dev/null +++ b/processors/src/test/java/io/opentelemetry/contrib/filter/DurationSpanFilterTest.java @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.filter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.opentelemetry.sdk.trace.data.SpanData; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; + +class DurationSpanFilterTest { + + private static final Duration THRESHOLD = Duration.ofSeconds(2); + private static final long THRESHOLD_NANOS = THRESHOLD.toNanos(); + + private final DurationSpanFilter filter = new DurationSpanFilter(THRESHOLD); + + @Test + void spanOverThresholdIsKept() { + SpanData span = spanWithDurationNanos(THRESHOLD_NANOS + 1); + assertThat(filter.shouldKeep(span)).isTrue(); + } + + @Test + void spanExactlyAtThresholdIsDropped() { + SpanData span = spanWithDurationNanos(THRESHOLD_NANOS); + assertThat(filter.shouldKeep(span)).isFalse(); + } + + @Test + void spanUnderThresholdIsDropped() { + SpanData span = spanWithDurationNanos(TimeUnit.MILLISECONDS.toNanos(500)); + assertThat(filter.shouldKeep(span)).isFalse(); + } + + @Test + void negativeThresholdThrows() { + assertThatThrownBy(() -> new DurationSpanFilter(Duration.ofMillis(-1))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("threshold must be non-negative"); + } + + private static SpanData spanWithDurationNanos(long durationNanos) { + SpanData span = mock(SpanData.class); + long startNanos = TimeUnit.MILLISECONDS.toNanos(1_000_000_000L); + when(span.getStartEpochNanos()).thenReturn(startNanos); + when(span.getEndEpochNanos()).thenReturn(startNanos + durationNanos); + return span; + } +} diff --git a/processors/src/test/java/io/opentelemetry/contrib/filter/ErrorSpanFilterTest.java b/processors/src/test/java/io/opentelemetry/contrib/filter/ErrorSpanFilterTest.java new file mode 100644 index 0000000000..c7b8864a91 --- /dev/null +++ b/processors/src/test/java/io/opentelemetry/contrib/filter/ErrorSpanFilterTest.java @@ -0,0 +1,46 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.filter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import org.junit.jupiter.api.Test; + +class ErrorSpanFilterTest { + + private final ErrorSpanFilter filter = new ErrorSpanFilter(); + + @Test + void errorSpanIsKept() { + SpanData span = spanWithStatus(StatusCode.ERROR); + assertThat(filter.shouldKeep(span)).isTrue(); + } + + @Test + void okSpanIsDropped() { + SpanData span = spanWithStatus(StatusCode.OK); + assertThat(filter.shouldKeep(span)).isFalse(); + } + + @Test + void unsetSpanIsDropped() { + SpanData span = spanWithStatus(StatusCode.UNSET); + assertThat(filter.shouldKeep(span)).isFalse(); + } + + private static SpanData spanWithStatus(StatusCode statusCode) { + SpanData span = mock(SpanData.class); + StatusData status = mock(StatusData.class); + when(status.getStatusCode()).thenReturn(statusCode); + when(span.getStatus()).thenReturn(status); + return span; + } +} diff --git a/processors/src/test/java/io/opentelemetry/contrib/filter/FilteringSpanExporterTest.java b/processors/src/test/java/io/opentelemetry/contrib/filter/FilteringSpanExporterTest.java new file mode 100644 index 0000000000..6b104bcd50 --- /dev/null +++ b/processors/src/test/java/io/opentelemetry/contrib/filter/FilteringSpanExporterTest.java @@ -0,0 +1,296 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.filter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +class FilteringSpanExporterTest { + + private static final Duration SPAN_THRESHOLD = Duration.ofSeconds(2); + private static final Duration TRACE_THRESHOLD = Duration.ofSeconds(10); + + private SpanExporter delegate; + private FilteringSpanExporter exporter; + + @BeforeEach + void setUp() { + delegate = mock(SpanExporter.class); + when(delegate.export(anyCollection())).thenReturn(CompletableResultCode.ofSuccess()); + exporter = + new FilteringSpanExporter( + delegate, + Arrays.asList(new ErrorSpanFilter(), new DurationSpanFilter(SPAN_THRESHOLD)), + Collections.singletonList(new TraceDurationFilter(TRACE_THRESHOLD))); + } + + // --- Trace grouping: if one span is interesting, all siblings are kept --- + + @Test + void uninterestingSpansAreDropped() { + SpanData fastSpan = createSpan("trace-1", "span-1", StatusCode.OK, 0, 500); + + CompletableResultCode result = exporter.export(Collections.singletonList(fastSpan)); + + assertThat(result.isSuccess()).isTrue(); + verify(delegate, never()).export(anyCollection()); + } + + @Test + void traceGrouping_interestingSpanKeepsSiblings() { + String traceId = "trace-1"; + SpanData slowSpan = createSpan(traceId, "span-1", StatusCode.OK, 0, 3000); + SpanData fastSibling = createSpan(traceId, "span-2", StatusCode.OK, 0, 100); + + exporter.export(Arrays.asList(slowSpan, fastSibling)); + + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass(Collection.class); + verify(delegate).export(captor.capture()); + assertThat(captor.getValue()).containsExactlyInAnyOrder(slowSpan, fastSibling); + } + + @Test + void mixedTraces_onlyInterestingTracesKept() { + SpanData slowSpan = createSpan("trace-slow", "span-1", StatusCode.OK, 0, 3000); + SpanData slowSibling = createSpan("trace-slow", "span-2", StatusCode.OK, 0, 200); + SpanData fastSpan = createSpan("trace-fast", "span-3", StatusCode.OK, 0, 500); + + exporter.export(Arrays.asList(slowSpan, slowSibling, fastSpan)); + + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass(Collection.class); + verify(delegate).export(captor.capture()); + assertThat(captor.getValue()).containsExactlyInAnyOrder(slowSpan, slowSibling); + } + + @Test + void traceKeptByTraceDurationFilter() { + SpanData a1 = createSpan("trace-a", "span-1", StatusCode.OK, 0, 200); + SpanData a2 = createSpan("trace-a", "span-2", StatusCode.OK, 14800, 200); + SpanData b1 = createSpan("trace-b", "span-3", StatusCode.OK, 0, 300); + + exporter.export(Arrays.asList(a1, a2, b1)); + + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass(Collection.class); + verify(delegate).export(captor.capture()); + assertThat(captor.getValue()).containsExactlyInAnyOrder(a1, a2); + } + + // --- Composability: different filter combinations --- + + @Test + void onlySpanFilters_noTraceFilters() { + FilteringSpanExporter spanOnly = + new FilteringSpanExporter( + delegate, + Collections.singletonList(new ErrorSpanFilter()), + Collections.emptyList()); + + SpanData errorSpan = createSpan("trace-1", "span-1", StatusCode.ERROR, 0, 100); + SpanData fastSpan = createSpan("trace-2", "span-2", StatusCode.OK, 0, 100); + + spanOnly.export(Arrays.asList(errorSpan, fastSpan)); + + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass(Collection.class); + verify(delegate).export(captor.capture()); + assertThat(captor.getValue()).containsExactly(errorSpan); + } + + @Test + void onlyTraceFilters_noSpanFilters() { + FilteringSpanExporter traceOnly = + new FilteringSpanExporter( + delegate, + Collections.emptyList(), + Collections.singletonList(new TraceDurationFilter(TRACE_THRESHOLD))); + + SpanData earlySpan = createSpan("trace-1", "span-1", StatusCode.OK, 0, 500); + SpanData lateSpan = createSpan("trace-1", "span-2", StatusCode.OK, 11500, 500); + SpanData errorSpan = createSpan("trace-2", "span-3", StatusCode.ERROR, 0, 100); + + traceOnly.export(Arrays.asList(earlySpan, lateSpan, errorSpan)); + + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass(Collection.class); + verify(delegate).export(captor.capture()); + assertThat(captor.getValue()).containsExactlyInAnyOrder(earlySpan, lateSpan); + } + + @Test + void customSpanFilter() { + SpanFilter nameFilter = span -> span.getName().contains("important"); + FilteringSpanExporter custom = + new FilteringSpanExporter( + delegate, Collections.singletonList(nameFilter), Collections.emptyList()); + + SpanData important = createNamedSpan("trace-1", "important-operation", StatusCode.OK, 0, 100); + SpanData boring = createNamedSpan("trace-2", "boring-operation", StatusCode.OK, 0, 100); + + custom.export(Arrays.asList(important, boring)); + + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass(Collection.class); + verify(delegate).export(captor.capture()); + assertThat(captor.getValue()).containsExactly(important); + } + + // --- Metrics --- + + @Test + void droppedSpanMetricIsEmittedWhenMeterProvided() { + Meter meter = mock(Meter.class); + LongCounterBuilder counterBuilder = mock(LongCounterBuilder.class); + LongCounter counter = mock(LongCounter.class); + when(meter.counterBuilder("otel.contrib.processor.span.filtered")).thenReturn(counterBuilder); + when(counterBuilder.setDescription("Number of spans dropped by the filtering span exporter")) + .thenReturn(counterBuilder); + when(counterBuilder.setUnit("{span}")).thenReturn(counterBuilder); + when(counterBuilder.build()).thenReturn(counter); + + FilteringSpanExporter exporterWithMetrics = + new FilteringSpanExporter( + delegate, + Collections.singletonList(new DurationSpanFilter(SPAN_THRESHOLD)), + Collections.emptyList(), + meter); + + SpanData fastSpan = createSpan("trace-1", "span-1", StatusCode.OK, 0, 500); + exporterWithMetrics.export(Collections.singletonList(fastSpan)); + + verify(counter).add(eq(1L), any()); + } + + @Test + void noMetricEmittedWhenNoSpansDropped() { + Meter meter = mock(Meter.class); + LongCounterBuilder counterBuilder = mock(LongCounterBuilder.class); + LongCounter counter = mock(LongCounter.class); + when(meter.counterBuilder("otel.contrib.processor.span.filtered")).thenReturn(counterBuilder); + when(counterBuilder.setDescription("Number of spans dropped by the filtering span exporter")) + .thenReturn(counterBuilder); + when(counterBuilder.setUnit("{span}")).thenReturn(counterBuilder); + when(counterBuilder.build()).thenReturn(counter); + + FilteringSpanExporter exporterWithMetrics = + new FilteringSpanExporter( + delegate, + Collections.singletonList(new DurationSpanFilter(SPAN_THRESHOLD)), + Collections.emptyList(), + meter); + + SpanData slowSpan = createSpan("trace-1", "span-1", StatusCode.OK, 0, 3000); + exporterWithMetrics.export(Collections.singletonList(slowSpan)); + + verify(counter, never()).add(anyLong(), any()); + } + + // --- Delegation --- + + @Test + void flushDelegatesToWrapped() { + when(delegate.flush()).thenReturn(CompletableResultCode.ofSuccess()); + + CompletableResultCode result = exporter.flush(); + + assertThat(result.isSuccess()).isTrue(); + verify(delegate).flush(); + } + + @Test + void shutdownDelegatesToWrapped() { + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + + CompletableResultCode result = exporter.shutdown(); + + assertThat(result.isSuccess()).isTrue(); + verify(delegate).shutdown(); + } + + @Test + void emptyBatchReturnsSuccess() { + CompletableResultCode result = exporter.export(Collections.emptyList()); + + assertThat(result.isSuccess()).isTrue(); + verify(delegate, never()).export(anyCollection()); + } + + // --- Helpers --- + + private static SpanData createSpan( + String traceId, String spanId, StatusCode statusCode, long startOffsetMs, long durationMs) { + return createSpanNanos( + traceId, + spanId, + statusCode, + TimeUnit.MILLISECONDS.toNanos(startOffsetMs), + TimeUnit.MILLISECONDS.toNanos(durationMs)); + } + + private static SpanData createNamedSpan( + String traceId, String name, StatusCode statusCode, long startOffsetMs, long durationMs) { + SpanData span = + createSpanNanos( + traceId, + "span-id", + statusCode, + TimeUnit.MILLISECONDS.toNanos(startOffsetMs), + TimeUnit.MILLISECONDS.toNanos(durationMs)); + when(span.getName()).thenReturn(name); + return span; + } + + private static SpanData createSpanNanos( + String traceId, + String spanId, + StatusCode statusCode, + long startOffsetNanos, + long durationNanos) { + SpanData span = mock(SpanData.class); + SpanContext context = mock(SpanContext.class); + StatusData status = mock(StatusData.class); + + when(context.getTraceId()).thenReturn(traceId); + when(context.getSpanId()).thenReturn(spanId); + when(span.getSpanContext()).thenReturn(context); + when(status.getStatusCode()).thenReturn(statusCode); + when(span.getStatus()).thenReturn(status); + + long baseNanos = TimeUnit.MILLISECONDS.toNanos(1_000_000_000L); + long startNanos = baseNanos + startOffsetNanos; + when(span.getStartEpochNanos()).thenReturn(startNanos); + when(span.getEndEpochNanos()).thenReturn(startNanos + durationNanos); + + return span; + } +} diff --git a/processors/src/test/java/io/opentelemetry/contrib/filter/TraceDurationFilterTest.java b/processors/src/test/java/io/opentelemetry/contrib/filter/TraceDurationFilterTest.java new file mode 100644 index 0000000000..6e56ca4d8a --- /dev/null +++ b/processors/src/test/java/io/opentelemetry/contrib/filter/TraceDurationFilterTest.java @@ -0,0 +1,91 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.filter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.opentelemetry.sdk.trace.data.SpanData; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; + +class TraceDurationFilterTest { + + private static final Duration THRESHOLD = Duration.ofSeconds(10); + private static final long THRESHOLD_NANOS = THRESHOLD.toNanos(); + + private final TraceDurationFilter filter = new TraceDurationFilter(THRESHOLD); + + @Test + void traceOverThresholdIsKept() { + SpanData early = spanAt(0, 500); + SpanData late = spanAt(11500, 500); + + assertThat(filter.shouldKeep("trace-1", Arrays.asList(early, late))).isTrue(); + } + + @Test + void traceExactlyAtThresholdIsDropped() { + SpanData span1 = spanAtNanos(0, 100_000_000L); + SpanData span2 = spanAtNanos(THRESHOLD_NANOS - 100_000_000L, 100_000_000L); + + assertThat(filter.shouldKeep("trace-1", Arrays.asList(span1, span2))).isFalse(); + } + + @Test + void traceJustOverThresholdIsKept() { + SpanData span1 = spanAtNanos(0, 100_000_000L); + SpanData span2 = spanAtNanos(THRESHOLD_NANOS - 100_000_000L + 1, 100_000_000L); + + assertThat(filter.shouldKeep("trace-1", Arrays.asList(span1, span2))).isTrue(); + } + + @Test + void traceUnderThresholdIsDropped() { + SpanData span1 = spanAt(0, 500); + SpanData span2 = spanAt(4000, 500); + + assertThat(filter.shouldKeep("trace-1", Arrays.asList(span1, span2))).isFalse(); + } + + @Test + void singleSpanTrace() { + SpanData span = spanAt(0, 500); + + assertThat(filter.shouldKeep("trace-1", Collections.singletonList(span))).isFalse(); + } + + @Test + void emptySpanListReturnsFalse() { + assertThat(filter.shouldKeep("trace-1", Collections.emptyList())).isFalse(); + } + + @Test + void negativeThresholdThrows() { + assertThatThrownBy(() -> new TraceDurationFilter(Duration.ofMillis(-1))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("threshold must be non-negative"); + } + + private static SpanData spanAt(long startOffsetMs, long durationMs) { + return spanAtNanos( + TimeUnit.MILLISECONDS.toNanos(startOffsetMs), TimeUnit.MILLISECONDS.toNanos(durationMs)); + } + + private static SpanData spanAtNanos(long startOffsetNanos, long durationNanos) { + SpanData span = mock(SpanData.class); + long baseNanos = TimeUnit.MILLISECONDS.toNanos(1_000_000_000L); + long startNanos = baseNanos + startOffsetNanos; + when(span.getStartEpochNanos()).thenReturn(startNanos); + when(span.getEndEpochNanos()).thenReturn(startNanos + durationNanos); + return span; + } +}