Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,48 @@ logger_provider:

`FilteringLogRecordProcessor` is a `LogRecordProcessor` that only keep logs based on a predicate

## Filtering Span Exporter

`FilteringSpanExporter` is a `SpanExporter` wrapper that filters spans at the trace level before delegating to the underlying exporter. Filtering is composable via two interfaces:

- `SpanFilter` - evaluates individual spans (e.g., error status, slow duration)
- `TraceFilter` - evaluates all spans belonging to a trace within the batch (e.g., overall trace wall-clock duration)

A trace is kept if any `SpanFilter` matches any of its spans, OR any `TraceFilter` matches the trace's span group. All spans sharing a trace ID are exported together.

Built-in filters:

- `ErrorSpanFilter` - keeps traces containing any span with error status
- `DurationSpanFilter` - keeps traces containing any span exceeding a duration threshold
- `TraceDurationFilter` - keeps traces whose wall-clock duration (max end - min start) exceeds a threshold

Usage:

```java
SpanExporter delegate = OtlpGrpcSpanExporter.getDefault();

// Export traces with errors, individual spans > 2s, or trace duration > 10s
SpanExporter filtering = new FilteringSpanExporter(
delegate,
Arrays.asList(new ErrorSpanFilter(), new DurationSpanFilter(2000)),
Collections.singletonList(new TraceDurationFilter(10000)));

// Custom filters
SpanFilter nameFilter = span -> span.getName().contains("important");
SpanExporter custom = new FilteringSpanExporter(
delegate,
Collections.singletonList(nameFilter),
Collections.emptyList());

// Optionally pass a Meter to emit dropped-span metrics
Meter meter = openTelemetry.getMeter("my-service");
SpanExporter withMetrics = new FilteringSpanExporter(
delegate,
Arrays.asList(new ErrorSpanFilter(), new DurationSpanFilter(2000)),
Collections.singletonList(new TraceDurationFilter(10000)),
meter);
```

## Component owners

- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.filter;

import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.concurrent.TimeUnit;

/**
* A {@link SpanFilter} that keeps traces containing any span whose duration exceeds a configurable
* threshold.
*/
public final class DurationSpanFilter implements SpanFilter {

private final long thresholdNanos;

/**
* Creates a new {@code DurationSpanFilter}.
*
* @param thresholdMs the duration threshold in milliseconds; spans with duration strictly greater
* than this are considered interesting
*/
public DurationSpanFilter(long thresholdMs) {
this.thresholdNanos = TimeUnit.MILLISECONDS.toNanos(thresholdMs);
Comment thread
jaydeluca marked this conversation as resolved.
Outdated
}

@Override
public boolean shouldKeep(SpanData spanData) {
long durationNanos = spanData.getEndEpochNanos() - spanData.getStartEpochNanos();
return durationNanos > thresholdNanos;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.filter;

import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;

/** A {@link SpanFilter} that keeps traces containing any span with {@link StatusCode#ERROR}. */
public final class ErrorSpanFilter implements SpanFilter {

@Override
public boolean shouldKeep(SpanData spanData) {
return spanData.getStatus().getStatusCode() == StatusCode.ERROR;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.filter;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;

/**
* A {@link SpanExporter} wrapper that filters spans before delegating to the underlying exporter.
* Filtering operates at the trace level: if any filter matches, all spans sharing that trace ID are
* exported together.
*
* <p>Two types of filters are supported:
*
* <ul>
* <li>{@link SpanFilter} - evaluates individual spans (e.g., error status, slow duration)
* <li>{@link TraceFilter} - evaluates all spans belonging to a trace within the batch (e.g.,
* overall trace wall-clock duration)
* </ul>
*
* <p>A trace is kept if any {@code SpanFilter} matches any of its spans, OR any {@code TraceFilter}
* matches the trace's span group.
*/
public final class FilteringSpanExporter implements SpanExporter {

private static final AttributeKey<String> REASON_KEY = AttributeKey.stringKey("reason");

private final SpanExporter delegate;
private final List<SpanFilter> spanFilters;
private final List<TraceFilter> traceFilters;
@Nullable private final LongCounter droppedSpansCounter;

/**
* Creates a new {@code FilteringSpanExporter}.
*
* @param delegate the exporter to delegate to for spans that pass filtering
* @param spanFilters per-span filters; a trace is kept if any filter matches any span
* @param traceFilters batch-level filters; a trace is kept if any filter matches the trace group
* @param meter optional {@link Meter} for emitting dropped-span metrics; pass {@code null} to
* disable metrics
*/
public FilteringSpanExporter(
SpanExporter delegate,
List<SpanFilter> spanFilters,
List<TraceFilter> traceFilters,
@Nullable Meter meter) {
this.delegate = delegate;
this.spanFilters = Collections.unmodifiableList(new ArrayList<>(spanFilters));
this.traceFilters = Collections.unmodifiableList(new ArrayList<>(traceFilters));
Comment thread
jaydeluca marked this conversation as resolved.
Outdated
if (meter != null) {
this.droppedSpansCounter =
meter
.counterBuilder("filtering.span.exporter.dropped")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the sdk they have metrics that look like otel.sdk.exporter.span.exported and otel.sdk.processor.span.processed, so perhaps we should use something closer to that pattern like otel.contrib.exporter.span.filtered or otel.contrib.processor.span.dropped? Or maybe having contrib doesn't make sense in this context

@trask wdyt?

Suggested change
.counterBuilder("filtering.span.exporter.dropped")
.counterBuilder("otel.contrib.processor.span.filtered")

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used otel.contrib.processor.span.filtered — aligns well with the SDK naming conventions. Happy to adjust if @trask has a different preference.

.setDescription("Number of spans dropped by the filtering span exporter")
Comment thread
jaydeluca marked this conversation as resolved.
.build();
} else {
this.droppedSpansCounter = null;
}
}

/**
* Creates a new {@code FilteringSpanExporter} without metrics.
*
* @param delegate the exporter to delegate to for spans that pass filtering
* @param spanFilters per-span filters
* @param traceFilters batch-level filters
*/
public FilteringSpanExporter(
SpanExporter delegate, List<SpanFilter> spanFilters, List<TraceFilter> traceFilters) {
this(delegate, spanFilters, traceFilters, null);
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
// Group spans by trace ID and evaluate span-level filters in a single pass
Set<String> interestingTraceIds = new HashSet<>();
Map<String, List<SpanData>> spansByTrace = new HashMap<>();

for (SpanData span : spans) {
String traceId = span.getSpanContext().getTraceId();

List<SpanData> traceSpans = spansByTrace.get(traceId);
if (traceSpans == null) {
traceSpans = new ArrayList<>();
spansByTrace.put(traceId, traceSpans);
}
traceSpans.add(span);

// Check span-level filters
if (!interestingTraceIds.contains(traceId)) {
for (SpanFilter filter : spanFilters) {
if (filter.shouldKeep(span)) {
interestingTraceIds.add(traceId);
break;
}
}
}
}

// Evaluate trace-level filters
if (!traceFilters.isEmpty()) {
for (Map.Entry<String, List<SpanData>> entry : spansByTrace.entrySet()) {
String traceId = entry.getKey();
if (!interestingTraceIds.contains(traceId)) {
for (TraceFilter filter : traceFilters) {
if (filter.shouldKeep(traceId, entry.getValue())) {
interestingTraceIds.add(traceId);
break;
}
}
}
}
}

// Collect filtered spans
List<SpanData> filtered = new ArrayList<>();
long droppedCount = 0;

for (SpanData span : spans) {
if (interestingTraceIds.contains(span.getSpanContext().getTraceId())) {
filtered.add(span);
} else {
droppedCount++;
}
}

if (droppedSpansCounter != null && droppedCount > 0) {
droppedSpansCounter.add(droppedCount, Attributes.of(REASON_KEY, "not_interesting"));
}

if (filtered.isEmpty()) {
return CompletableResultCode.ofSuccess();
}

return delegate.export(filtered);
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.filter;

import io.opentelemetry.sdk.trace.data.SpanData;

/**
* A filter that evaluates individual spans to determine if their containing trace should be
* exported. Used by {@link FilteringSpanExporter} to make per-span keep/drop decisions.
*
* <p>If any {@code SpanFilter} returns {@code true} for any span in a trace, all spans sharing that
* trace ID are exported.
*/
public interface SpanFilter {

/**
* Evaluates whether the given span is interesting enough to keep its entire trace.
*
* @param spanData the span to evaluate
* @return {@code true} if this span should cause its trace to be exported
*/
boolean shouldKeep(SpanData spanData);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.filter;

import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

/**
* A {@link TraceFilter} that keeps traces whose overall wall-clock duration (max end - min start
* across all spans in the batch sharing a trace ID) exceeds a configurable threshold.
*/
public final class TraceDurationFilter implements TraceFilter {

private final long thresholdNanos;

/**
* Creates a new {@code TraceDurationFilter}.
*
* @param thresholdMs the trace duration threshold in milliseconds; traces with wall-clock
* duration strictly greater than this are considered interesting
*/
public TraceDurationFilter(long thresholdMs) {
this.thresholdNanos = TimeUnit.MILLISECONDS.toNanos(thresholdMs);
Comment thread
jaydeluca marked this conversation as resolved.
Outdated
}

@Override
public boolean shouldKeep(String traceId, Collection<SpanData> spans) {
Comment thread
jaydeluca marked this conversation as resolved.
long minStart = Long.MAX_VALUE;
long maxEnd = Long.MIN_VALUE;
for (SpanData span : spans) {
if (span.getStartEpochNanos() < minStart) {
minStart = span.getStartEpochNanos();
}
if (span.getEndEpochNanos() > maxEnd) {
maxEnd = span.getEndEpochNanos();
}
}
return (maxEnd - minStart) > thresholdNanos;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.filter;

import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;

/**
* A filter that evaluates all spans belonging to a single trace within an export batch to determine
* if the trace should be exported. Used by {@link FilteringSpanExporter} for decisions that require
* batch-level context (e.g., overall trace wall-clock duration).
*
* <p>If any {@code TraceFilter} returns {@code true} for a trace, all spans sharing that trace ID
* are exported.
*/
public interface TraceFilter {

/**
* Evaluates whether the given trace (represented as all spans sharing a trace ID within the
* current batch) should be exported.
*
* @param traceId the trace ID
* @param spans all spans in the current batch belonging to this trace
* @return {@code true} if this trace should be exported
*/
boolean shouldKeep(String traceId, Collection<SpanData> spans);
}
Loading
Loading