Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
107bb47
feat: allow to add headers to metadata for OpenTelemetry metrics and …
ozanichkovsky Mar 17, 2026
20dd177
chore: code formatting
ozanichkovsky Mar 17, 2026
b92608b
Merge remote-tracking branch 'origin/master' into feature/opentelemet…
ozanichkovsky Mar 17, 2026
75772df
chore: refactored the implementation and tests
ozanichkovsky Mar 18, 2026
6aff9c7
Merge remote-tracking branch 'origin/master' into feature/opentelemet…
ozanichkovsky Mar 18, 2026
695b093
chore: added changelog
ozanichkovsky Mar 18, 2026
7877315
chore: fixed formatting
ozanichkovsky Mar 18, 2026
c564228
Merge remote-tracking branch 'origin/master' into feature/opentelemet…
ozanichkovsky Mar 18, 2026
51dc5a4
chore: updated docs
ozanichkovsky Mar 18, 2026
e74e038
Merge branch 'master' into feature/opentelemetry-metrics-traces-headers
pront Mar 23, 2026
274c25f
make generate-docs
pront Mar 23, 2026
95208a9
feat: pass log_namespace when enriching metrics with header for use_o…
ozanichkovsky Mar 23, 2026
e146286
chore: removed log_namespace comment related to metrics and traces
ozanichkovsky Mar 23, 2026
314498c
chore: fixed review comments
ozanichkovsky Mar 24, 2026
a016821
update comment and regen docs
pront Mar 25, 2026
2ce73a6
chore: revert log_namespace threading in build_warp_trace_filter
pront Mar 25, 2026
e6b5cc0
Merge branch 'master' into feature/opentelemetry-metrics-traces-headers
pront Mar 25, 2026
23babc6
chore: revert log_namespace threading in build_warp_trace_filter
pront Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
`opentelemetry` source: Implemented header enrichment for OTLP metrics and traces. Unlike logs, which support enriching
the event itself or its metadata, depending on `log_namespace` settings, for metrics and traces this setting is ignored
and header values are added to the event metadata.

Issue: https://github.com/vectordotdev/vector/issues/24619

authors: ozanichkovsky
7 changes: 4 additions & 3 deletions src/sources/opentelemetry/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,14 @@ pub struct HttpConfig {
#[serde(default)]
pub keepalive: KeepaliveConfig,

/// A list of HTTP headers to include in the log event.
/// A list of HTTP headers to include in the event.
///
/// Accepts the wildcard (`*`) character for headers matching a specified pattern.
///
/// Specifying "*" results in all headers included in the log event.
/// Specifying "*" results in all headers included in the event.
///
/// These headers are not included in the JSON payload if a field with a conflicting name exists.
/// For log events in legacy namespace mode, headers are not included if a field with a conflicting name exists.
/// For metrics and traces, headers are always added to event metadata.
#[serde(default)]
#[configurable(metadata(docs::examples = "User-Agent"))]
#[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
Expand Down
25 changes: 17 additions & 8 deletions src/sources/opentelemetry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,19 @@ pub(crate) fn build_warp_filter(
);
let metrics_filters = build_warp_metrics_filter(
acknowledgements,
log_namespace,
out.clone(),
bytes_received.clone(),
events_received.clone(),
headers.clone(),
metrics_deserializer,
);
let trace_filters = build_warp_trace_filter(
acknowledgements,
out.clone(),
bytes_received,
events_received,
headers.clone(),
traces_deserializer,
);
log_filters
Expand Down Expand Up @@ -257,12 +260,14 @@ fn build_warp_log_filter(
}
fn build_warp_metrics_filter(
acknowledgements: bool,
log_namespace: LogNamespace,
source_sender: SourceSender,
bytes_received: Registered<BytesReceived>,
events_received: Registered<EventsReceived>,
headers_cfg: Vec<HttpConfigParamKind>,
deserializer: Option<OtlpDeserializer>,
) -> BoxedFilter<(Response,)> {
let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
decompress_body(encoding_header.as_deref(), body)
.inspect_err(|err| {
// Other status codes are already handled by `sources::util::decompress_body` (tech debt).
Expand All @@ -276,15 +281,14 @@ fn build_warp_metrics_filter(
.and_then(|decoded_body| {
bytes_received.emit(ByteSize(decoded_body.len()));
if let Some(d) = deserializer.as_ref() {
parse_with_deserializer(
d,
decoded_body,
LogNamespace::default(),
&events_received,
)
parse_with_deserializer(d, decoded_body, log_namespace, &events_received)
} else {
decode_metrics_body(decoded_body, &events_received)
}
.map(|mut events| {
enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
events
})
})
};

Expand All @@ -301,9 +305,10 @@ fn build_warp_trace_filter(
source_sender: SourceSender,
bytes_received: Registered<BytesReceived>,
events_received: Registered<EventsReceived>,
headers_cfg: Vec<HttpConfigParamKind>,
deserializer: Option<OtlpDeserializer>,
) -> BoxedFilter<(Response,)> {
let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
decompress_body(encoding_header.as_deref(), body)
.inspect_err(|err| {
// Other status codes are already handled by `sources::util::decompress_body` (tech debt).
Expand All @@ -326,6 +331,10 @@ fn build_warp_trace_filter(
} else {
decode_trace_body(decoded_body, &events_received)
}
.map(|mut events| {
enrich_events(&mut events, &headers_cfg, &headers, LogNamespace::default());
events
})
})
};

Expand Down
Loading
Loading