From 2824e774b119214f7b261a192a74856881f92608 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Mon, 13 Apr 2026 13:07:27 -0400 Subject: [PATCH 01/10] feat(telemetry): align transport spans with semconv Signed-off-by: Yordis Prieto --- rsworkspace/Cargo.lock | 10 + rsworkspace/Cargo.toml | 1 + rsworkspace/crates/acp-nats-ws/src/main.rs | 23 +- rsworkspace/crates/acp-telemetry/Cargo.toml | 2 + rsworkspace/crates/acp-telemetry/src/http.rs | 203 ++++++++++++++++++ rsworkspace/crates/acp-telemetry/src/lib.rs | 1 + rsworkspace/crates/trogon-gateway/Cargo.toml | 1 + rsworkspace/crates/trogon-gateway/src/main.rs | 27 ++- rsworkspace/crates/trogon-nats/Cargo.toml | 1 + rsworkspace/crates/trogon-nats/src/lib.rs | 1 + .../crates/trogon-nats/src/messaging.rs | 60 ++++-- .../trogon-nats/src/telemetry/messaging.rs | 138 ++++++++++++ .../crates/trogon-nats/src/telemetry/mod.rs | 1 + 13 files changed, 455 insertions(+), 14 deletions(-) create mode 100644 rsworkspace/crates/acp-telemetry/src/http.rs create mode 100644 rsworkspace/crates/trogon-nats/src/telemetry/messaging.rs create mode 100644 rsworkspace/crates/trogon-nats/src/telemetry/mod.rs diff --git a/rsworkspace/Cargo.lock b/rsworkspace/Cargo.lock index 58fa043fd..822a10140 100644 --- a/rsworkspace/Cargo.lock +++ b/rsworkspace/Cargo.lock @@ -90,9 +90,11 @@ dependencies = [ name = "acp-telemetry" version = "0.1.0" dependencies = [ + "axum", "opentelemetry", "opentelemetry-appender-tracing", "opentelemetry-otlp", + "opentelemetry-semantic-conventions", "opentelemetry_sdk", "tokio", "tracing", @@ -1848,6 +1850,12 @@ dependencies = [ "tonic-prost", ] +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" + [[package]] name = "opentelemetry_sdk" version = "0.31.0" @@ -3455,6 +3463,7 @@ dependencies = [ "serde", "tempfile", "tokio", + "tower-http", "tracing", "trogon-nats", "trogon-service-config", @@ -3477,6 +3486,7 @@ dependencies = [ "bytes", "futures", "opentelemetry", + "opentelemetry-semantic-conventions", "serde", "serde_json", "tokio", diff --git a/rsworkspace/Cargo.toml b/rsworkspace/Cargo.toml index d2b1b14d7..08c5f0507 100644 --- a/rsworkspace/Cargo.toml +++ b/rsworkspace/Cargo.toml @@ -52,6 +52,7 @@ clap = { version = "=4.6.0", features = ["derive"] } opentelemetry = "=0.31.0" opentelemetry-appender-tracing = "=0.31.1" opentelemetry-otlp = "=0.31.1" +opentelemetry-semantic-conventions = { version = "=0.31.0", features = ["semconv_experimental"] } opentelemetry_sdk = "=0.31.0" tracing = "=0.1.44" tracing-opentelemetry = "=0.32.1" diff --git a/rsworkspace/crates/acp-nats-ws/src/main.rs b/rsworkspace/crates/acp-nats-ws/src/main.rs index 888d4cd25..9f386f8dc 100644 --- a/rsworkspace/crates/acp-nats-ws/src/main.rs +++ b/rsworkspace/crates/acp-nats-ws/src/main.rs @@ -49,7 +49,28 @@ async fn main() -> Result<(), Box> { let app = axum::Router::new() .route("/ws", axum::routing::get(upgrade::handle)) - .layer(TraceLayer::new_for_http()) + .layer( + TraceLayer::new_for_http() + .make_span_with(|request: &axum::http::Request| { + let span = tracing::info_span!( + "http.server.request", + method = %request.method(), + path = %request.uri().path() + ); + acp_telemetry::http::set_server_request_span_attributes(&span, request); + span + }) + .on_response( + |response: &axum::http::Response, + _latency: std::time::Duration, + span: &tracing::Span| { + acp_telemetry::http::set_server_response_span_attributes( + span, + response.status(), + ); + }, + ), + ) .with_state(state); let addr = SocketAddr::from((ws_config.host, ws_config.port)); diff --git a/rsworkspace/crates/acp-telemetry/Cargo.toml b/rsworkspace/crates/acp-telemetry/Cargo.toml index aee2789b8..5cefca055 100644 --- a/rsworkspace/crates/acp-telemetry/Cargo.toml +++ b/rsworkspace/crates/acp-telemetry/Cargo.toml @@ -7,9 +7,11 @@ edition = "2024" workspace = true [dependencies] +axum = { workspace = true } opentelemetry = { workspace = true } opentelemetry-appender-tracing = { workspace = true } opentelemetry-otlp = { workspace = true, features = ["http-json", "logs", "metrics", "reqwest-rustls"] } +opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["rt-tokio", "logs", "metrics"] } tokio = { workspace = true, features = ["signal"] } tracing = { workspace = true } diff --git a/rsworkspace/crates/acp-telemetry/src/http.rs b/rsworkspace/crates/acp-telemetry/src/http.rs new file mode 100644 index 000000000..28da3f840 --- /dev/null +++ b/rsworkspace/crates/acp-telemetry/src/http.rs @@ -0,0 +1,203 @@ +use axum::http::{ + Request, StatusCode, Version, + header::{HOST, USER_AGENT}, + uri::Authority, +}; +use opentelemetry::KeyValue; +use opentelemetry_semantic_conventions::trace as semconv; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +pub fn server_request_attributes(request: &Request) -> Vec { + let mut attributes = vec![ + KeyValue::new( + semconv::HTTP_REQUEST_METHOD, + request.method().as_str().to_owned(), + ), + KeyValue::new(semconv::URL_PATH, request.uri().path().to_owned()), + ]; + + if let Some(protocol_version) = protocol_version(request.version()) { + attributes.push(KeyValue::new( + semconv::NETWORK_PROTOCOL_VERSION, + protocol_version, + )); + } + + if let Some(user_agent) = request + .headers() + .get(USER_AGENT) + .and_then(|value| value.to_str().ok()) + { + attributes.push(KeyValue::new( + semconv::USER_AGENT_ORIGINAL, + user_agent.to_owned(), + )); + } + + if let Some(authority) = request + .headers() + .get(HOST) + .and_then(|value| value.to_str().ok()) + .and_then(|value| value.parse::().ok()) + { + attributes.push(KeyValue::new( + semconv::SERVER_ADDRESS, + authority.host().to_owned(), + )); + if let Some(port) = authority.port_u16() { + attributes.push(KeyValue::new(semconv::SERVER_PORT, i64::from(port))); + } + } + + attributes +} + +pub fn set_server_request_span_attributes(span: &Span, request: &Request) { + for attribute in server_request_attributes(request) { + span.set_attribute(attribute.key, attribute.value); + } +} + +pub fn server_response_attributes(status: StatusCode) -> [KeyValue; 1] { + [KeyValue::new( + semconv::HTTP_RESPONSE_STATUS_CODE, + i64::from(status.as_u16()), + )] +} + +pub fn set_server_response_span_attributes(span: &Span, status: StatusCode) { + for attribute in server_response_attributes(status) { + span.set_attribute(attribute.key, attribute.value); + } +} + +fn protocol_version(version: Version) -> Option<&'static str> { + match version { + Version::HTTP_09 => Some("0.9"), + Version::HTTP_10 => Some("1.0"), + Version::HTTP_11 => Some("1.1"), + Version::HTTP_2 => Some("2"), + Version::HTTP_3 => Some("3"), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn value_for<'a>(attributes: &'a [KeyValue], key: &str) -> Option<&'a opentelemetry::Value> { + attributes + .iter() + .find(|attribute| attribute.key.as_str() == key) + .map(|attribute| &attribute.value) + } + + #[test] + fn server_request_attributes_follow_http_semantic_conventions() { + let request = Request::builder() + .method("POST") + .uri("/github/webhook") + .version(Version::HTTP_11) + .header(HOST, "gateway.test:8443") + .header(USER_AGENT, "curl/8.7.1") + .body(()) + .unwrap(); + + let attributes = server_request_attributes(&request); + + assert_eq!( + value_for(&attributes, semconv::HTTP_REQUEST_METHOD) + .unwrap() + .as_str() + .as_ref(), + "POST" + ); + assert_eq!( + value_for(&attributes, semconv::URL_PATH) + .unwrap() + .as_str() + .as_ref(), + "/github/webhook" + ); + assert_eq!( + value_for(&attributes, semconv::NETWORK_PROTOCOL_VERSION) + .unwrap() + .as_str() + .as_ref(), + "1.1" + ); + assert_eq!( + value_for(&attributes, semconv::USER_AGENT_ORIGINAL) + .unwrap() + .as_str() + .as_ref(), + "curl/8.7.1" + ); + assert_eq!( + value_for(&attributes, semconv::SERVER_ADDRESS) + .unwrap() + .as_str() + .as_ref(), + "gateway.test" + ); + assert_eq!( + value_for(&attributes, semconv::SERVER_PORT) + .unwrap() + .as_str() + .as_ref(), + "8443" + ); + } + + #[test] + fn server_request_attributes_skip_optional_headers_when_missing() { + let request = Request::builder() + .method("GET") + .uri("/-/liveness") + .version(Version::HTTP_2) + .body(()) + .unwrap(); + + let attributes = server_request_attributes(&request); + + assert_eq!( + value_for(&attributes, semconv::HTTP_REQUEST_METHOD) + .unwrap() + .as_str() + .as_ref(), + "GET" + ); + assert_eq!( + value_for(&attributes, semconv::URL_PATH) + .unwrap() + .as_str() + .as_ref(), + "/-/liveness" + ); + assert_eq!( + value_for(&attributes, semconv::NETWORK_PROTOCOL_VERSION) + .unwrap() + .as_str() + .as_ref(), + "2" + ); + assert!(value_for(&attributes, semconv::USER_AGENT_ORIGINAL).is_none()); + assert!(value_for(&attributes, semconv::SERVER_ADDRESS).is_none()); + assert!(value_for(&attributes, semconv::SERVER_PORT).is_none()); + } + + #[test] + fn server_response_attributes_follow_http_semantic_conventions() { + let attributes = server_response_attributes(StatusCode::CREATED); + + assert_eq!( + value_for(&attributes, semconv::HTTP_RESPONSE_STATUS_CODE) + .unwrap() + .as_str() + .as_ref(), + "201" + ); + } +} diff --git a/rsworkspace/crates/acp-telemetry/src/lib.rs b/rsworkspace/crates/acp-telemetry/src/lib.rs index 86a98dabd..a50a32de6 100644 --- a/rsworkspace/crates/acp-telemetry/src/lib.rs +++ b/rsworkspace/crates/acp-telemetry/src/lib.rs @@ -1,4 +1,5 @@ pub mod constants; +pub mod http; mod log; mod metric; mod service_name; diff --git a/rsworkspace/crates/trogon-gateway/Cargo.toml b/rsworkspace/crates/trogon-gateway/Cargo.toml index b70dd9b43..b2ca613f1 100644 --- a/rsworkspace/crates/trogon-gateway/Cargo.toml +++ b/rsworkspace/crates/trogon-gateway/Cargo.toml @@ -18,6 +18,7 @@ clap = { workspace = true } confique = { workspace = true } serde = { workspace = true } tokio = { workspace = true, features = ["full"] } +tower-http = { workspace = true } tracing = { workspace = true } trogon-nats = { workspace = true } trogon-service-config = { workspace = true } diff --git a/rsworkspace/crates/trogon-gateway/src/main.rs b/rsworkspace/crates/trogon-gateway/src/main.rs index 67596c01d..431452501 100644 --- a/rsworkspace/crates/trogon-gateway/src/main.rs +++ b/rsworkspace/crates/trogon-gateway/src/main.rs @@ -14,9 +14,13 @@ use std::net::SocketAddr; #[cfg(not(coverage))] use std::time::Duration; +#[cfg(not(coverage))] +use axum::body::Body; #[cfg(not(coverage))] use tokio::task::JoinSet; #[cfg(not(coverage))] +use tower_http::trace::TraceLayer; +#[cfg(not(coverage))] use tracing::{error, info}; #[cfg(not(coverage))] use trogon_nats::connect; @@ -96,7 +100,28 @@ async fn main() -> Result<(), Box> { } } - let app = http::mount_sources(resolved, publisher); + let app = http::mount_sources(resolved, publisher).layer( + TraceLayer::new_for_http() + .make_span_with(|request: &axum::http::Request| { + let span = tracing::info_span!( + "http.server.request", + method = %request.method(), + path = %request.uri().path() + ); + acp_telemetry::http::set_server_request_span_attributes(&span, request); + span + }) + .on_response( + |response: &axum::http::Response, + _latency: Duration, + span: &tracing::Span| { + acp_telemetry::http::set_server_response_span_attributes( + span, + response.status(), + ); + }, + ), + ); let addr = SocketAddr::from(([0, 0, 0, 0], port)); let listener = tokio::net::TcpListener::bind(addr).await?; diff --git a/rsworkspace/crates/trogon-nats/Cargo.toml b/rsworkspace/crates/trogon-nats/Cargo.toml index 8dfde1553..c1d5dd350 100644 --- a/rsworkspace/crates/trogon-nats/Cargo.toml +++ b/rsworkspace/crates/trogon-nats/Cargo.toml @@ -11,6 +11,7 @@ async-nats = { workspace = true, features = ["ring", "nkeys", "jetstream", "kv", bytes = { workspace = true } futures = { workspace = true } opentelemetry = { workspace = true } +opentelemetry-semantic-conventions = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["time", "io-util"] } diff --git a/rsworkspace/crates/trogon-nats/src/lib.rs b/rsworkspace/crates/trogon-nats/src/lib.rs index c9151bba8..806ffa7e5 100644 --- a/rsworkspace/crates/trogon-nats/src/lib.rs +++ b/rsworkspace/crates/trogon-nats/src/lib.rs @@ -45,6 +45,7 @@ pub mod lease; pub mod messaging; pub mod nats_token; pub mod subject_token_violation; +pub(crate) mod telemetry; pub(crate) mod token; #[cfg(any(test, feature = "test-support"))] diff --git a/rsworkspace/crates/trogon-nats/src/messaging.rs b/rsworkspace/crates/trogon-nats/src/messaging.rs index fde275d2d..962927c10 100644 --- a/rsworkspace/crates/trogon-nats/src/messaging.rs +++ b/rsworkspace/crates/trogon-nats/src/messaging.rs @@ -1,9 +1,12 @@ use crate::client::{FlushClient, PublishClient, RequestClient}; +use crate::telemetry::messaging::{ + MessagingError, MessagingOperation, set_client_operation_span_attributes, set_span_error, +}; use async_nats::header::HeaderMap; use opentelemetry::propagation::Injector; use serde::{Serialize, de::DeserializeOwned}; use std::time::Duration; -use tracing::Span; +use tracing::{Span, instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::constants::{DEFAULT_TIMEOUT, REQ_ID_HEADER}; @@ -35,6 +38,7 @@ pub fn build_request_headers() -> HeaderMap { headers } +#[instrument(name = "nats.request", skip(client, request), fields(subject = %subject))] pub async fn request_with_timeout( client: &N, subject: &str, @@ -45,7 +49,13 @@ where Req: Serialize, Res: DeserializeOwned, { - let payload = serde_json::to_vec(request).map_err(NatsError::Serialize)?; + let span = Span::current(); + set_client_operation_span_attributes(&span, MessagingOperation::Request, subject); + + let payload = serde_json::to_vec(request).map_err(|error| { + set_span_error(&span, MessagingError::Serialize); + NatsError::Serialize(error) + })?; let headers = build_request_headers(); let response = tokio::time::timeout( @@ -53,20 +63,31 @@ where client.request_with_headers(subject.to_string(), headers, payload.into()), ) .await - .map_err(|_| NatsError::Timeout { - subject: subject.to_string(), + .map_err(|_| { + set_span_error(&span, MessagingError::Timeout); + NatsError::Timeout { + subject: subject.to_string(), + } })? - .map_err(|e| NatsError::Request { - subject: subject.to_string(), - error: e.to_string(), + .map_err(|error| { + set_span_error(&span, MessagingError::Request); + NatsError::Request { + subject: subject.to_string(), + error: error.to_string(), + } })?; let payload_str = String::from_utf8_lossy(&response.payload); tracing::debug!(payload = %payload_str, "Received NATS response"); - serde_json::from_slice(&response.payload).map_err(|e| { - tracing::error!(payload = %payload_str, error = %e, "Failed to deserialize NATS response"); - NatsError::Deserialize(e) + serde_json::from_slice(&response.payload).map_err(|error| { + set_span_error(&span, MessagingError::Deserialize); + tracing::error!( + payload = %payload_str, + error = %error, + "Failed to deserialize NATS response" + ); + NatsError::Deserialize(error) }) } @@ -246,6 +267,7 @@ impl PublishOptionsBuilder { } } +#[instrument(name = "nats.publish", skip(client, request, options), fields(subject = %subject))] pub async fn publish( client: &N, subject: &str, @@ -255,7 +277,13 @@ pub async fn publish( where Req: Serialize, { - let payload = serde_json::to_vec(request).map_err(NatsError::Serialize)?; + let span = Span::current(); + set_client_operation_span_attributes(&span, MessagingOperation::Publish, subject); + + let payload = serde_json::to_vec(request).map_err(|error| { + set_span_error(&span, MessagingError::Serialize); + NatsError::Serialize(error) + })?; let headers = headers_with_trace_context(); options @@ -274,7 +302,11 @@ where "publish", subject, ) - .await?; + .await + .map_err(|error| { + set_span_error(&span, MessagingError::PublishOperation); + error + })?; let Some(flush_policy) = options.flush else { return Ok(()); @@ -296,6 +328,10 @@ where subject, ) .await + .map_err(|error| { + set_span_error(&span, MessagingError::FlushOperation); + error + }) } #[derive(Debug)] diff --git a/rsworkspace/crates/trogon-nats/src/telemetry/messaging.rs b/rsworkspace/crates/trogon-nats/src/telemetry/messaging.rs new file mode 100644 index 000000000..c2bcf270f --- /dev/null +++ b/rsworkspace/crates/trogon-nats/src/telemetry/messaging.rs @@ -0,0 +1,138 @@ +use opentelemetry::KeyValue; +use opentelemetry_semantic_conventions::{attribute as attr_semconv, trace as trace_semconv}; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum MessagingError { + Deserialize, + FlushOperation, + PublishOperation, + Request, + Serialize, + Timeout, +} + +impl MessagingError { + const fn as_str(self) -> &'static str { + match self { + Self::Deserialize => "deserialize", + Self::FlushOperation => "flush_operation", + Self::PublishOperation => "publish_operation", + Self::Request => "request", + Self::Serialize => "serialize", + Self::Timeout => "timeout", + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum MessagingOperation { + Publish, + Request, +} + +impl MessagingOperation { + const fn name(self) -> &'static str { + match self { + Self::Publish => "publish", + Self::Request => "request", + } + } + + const fn operation_type(self) -> &'static str { + "send" + } +} + +pub(crate) fn client_operation_attributes( + operation: MessagingOperation, + destination_name: &str, +) -> [KeyValue; 4] { + [ + KeyValue::new(attr_semconv::MESSAGING_SYSTEM, "nats"), + KeyValue::new( + attr_semconv::MESSAGING_DESTINATION_NAME, + destination_name.to_owned(), + ), + KeyValue::new(attr_semconv::MESSAGING_OPERATION_NAME, operation.name()), + KeyValue::new( + attr_semconv::MESSAGING_OPERATION_TYPE, + operation.operation_type(), + ), + ] +} + +pub(crate) fn error_attribute(error: MessagingError) -> KeyValue { + KeyValue::new(trace_semconv::ERROR_TYPE, error.as_str()) +} + +pub(crate) fn set_client_operation_span_attributes( + span: &Span, + operation: MessagingOperation, + destination_name: &str, +) { + for attribute in client_operation_attributes(operation, destination_name) { + span.set_attribute(attribute.key, attribute.value); + } +} + +pub(crate) fn set_span_error(span: &Span, error: MessagingError) { + let attribute = error_attribute(error); + span.set_attribute(attribute.key, attribute.value); +} + +#[cfg(test)] +mod tests { + use super::*; + + fn value_for<'a>(attributes: &'a [KeyValue], key: &str) -> Option<&'a opentelemetry::Value> { + attributes + .iter() + .find(|attribute| attribute.key.as_str() == key) + .map(|attribute| &attribute.value) + } + + #[test] + fn client_operation_attributes_follow_messaging_semantic_conventions() { + let attributes = + client_operation_attributes(MessagingOperation::Request, "acp.session.new"); + + assert_eq!( + value_for(&attributes, attr_semconv::MESSAGING_SYSTEM) + .unwrap() + .as_str() + .as_ref(), + "nats" + ); + assert_eq!( + value_for(&attributes, attr_semconv::MESSAGING_DESTINATION_NAME) + .unwrap() + .as_str() + .as_ref(), + "acp.session.new" + ); + assert_eq!( + value_for(&attributes, attr_semconv::MESSAGING_OPERATION_NAME) + .unwrap() + .as_str() + .as_ref(), + "request" + ); + assert_eq!( + value_for(&attributes, attr_semconv::MESSAGING_OPERATION_TYPE) + .unwrap() + .as_str() + .as_ref(), + "send" + ); + } + + #[test] + fn error_attribute_uses_semantic_error_type_key() { + let attribute = error_attribute(MessagingError::Timeout); + + assert_eq!(attribute.key.as_str(), trace_semconv::ERROR_TYPE); + assert_eq!(attribute.value.as_str().as_ref(), "timeout"); + } +} diff --git a/rsworkspace/crates/trogon-nats/src/telemetry/mod.rs b/rsworkspace/crates/trogon-nats/src/telemetry/mod.rs new file mode 100644 index 000000000..3ef8c6ace --- /dev/null +++ b/rsworkspace/crates/trogon-nats/src/telemetry/mod.rs @@ -0,0 +1 @@ +pub(crate) mod messaging; From a937e3e2962ad682dd34e306720cf7112417bc27 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Mon, 13 Apr 2026 13:38:35 -0400 Subject: [PATCH 02/10] fix(telemetry): centralize axum semconv wiring Signed-off-by: Yordis Prieto --- rsworkspace/Cargo.lock | 2 +- rsworkspace/crates/acp-nats-ws/src/main.rs | 35 +++----------- rsworkspace/crates/acp-telemetry/Cargo.toml | 1 + rsworkspace/crates/acp-telemetry/src/http.rs | 46 +++++++++++++++++-- rsworkspace/crates/trogon-gateway/Cargo.toml | 1 - rsworkspace/crates/trogon-gateway/src/main.rs | 27 +---------- 6 files changed, 52 insertions(+), 60 deletions(-) diff --git a/rsworkspace/Cargo.lock b/rsworkspace/Cargo.lock index 822a10140..0a242166f 100644 --- a/rsworkspace/Cargo.lock +++ b/rsworkspace/Cargo.lock @@ -97,6 +97,7 @@ dependencies = [ "opentelemetry-semantic-conventions", "opentelemetry_sdk", "tokio", + "tower-http", "tracing", "tracing-opentelemetry", "tracing-subscriber", @@ -3463,7 +3464,6 @@ dependencies = [ "serde", "tempfile", "tokio", - "tower-http", "tracing", "trogon-nats", "trogon-service-config", diff --git a/rsworkspace/crates/acp-nats-ws/src/main.rs b/rsworkspace/crates/acp-nats-ws/src/main.rs index 9f386f8dc..afb01d25f 100644 --- a/rsworkspace/crates/acp-nats-ws/src/main.rs +++ b/rsworkspace/crates/acp-nats-ws/src/main.rs @@ -9,9 +9,8 @@ use upgrade::{ConnectionRequest, UpgradeState}; #[cfg(not(coverage))] use { - acp_nats::nats, acp_telemetry::ServiceName, clap::Parser, std::net::SocketAddr, - tower_http::trace::TraceLayer, tracing::error, trogon_std::env::SystemEnv, - trogon_std::fs::SystemFs, + acp_nats::nats, acp_telemetry::ServiceName, clap::Parser, std::net::SocketAddr, tracing::error, + trogon_std::env::SystemEnv, trogon_std::fs::SystemFs, }; #[cfg(not(coverage))] @@ -47,31 +46,11 @@ async fn main() -> Result<(), Box> { shutdown_tx: shutdown_tx.clone(), }; - let app = axum::Router::new() - .route("/ws", axum::routing::get(upgrade::handle)) - .layer( - TraceLayer::new_for_http() - .make_span_with(|request: &axum::http::Request| { - let span = tracing::info_span!( - "http.server.request", - method = %request.method(), - path = %request.uri().path() - ); - acp_telemetry::http::set_server_request_span_attributes(&span, request); - span - }) - .on_response( - |response: &axum::http::Response, - _latency: std::time::Duration, - span: &tracing::Span| { - acp_telemetry::http::set_server_response_span_attributes( - span, - response.status(), - ); - }, - ), - ) - .with_state(state); + let app = acp_telemetry::http::instrument_router( + axum::Router::new() + .route("/ws", axum::routing::get(upgrade::handle)) + .with_state(state), + ); let addr = SocketAddr::from((ws_config.host, ws_config.port)); let listener = tokio::net::TcpListener::bind(addr).await?; diff --git a/rsworkspace/crates/acp-telemetry/Cargo.toml b/rsworkspace/crates/acp-telemetry/Cargo.toml index 5cefca055..a03b09d0b 100644 --- a/rsworkspace/crates/acp-telemetry/Cargo.toml +++ b/rsworkspace/crates/acp-telemetry/Cargo.toml @@ -14,6 +14,7 @@ opentelemetry-otlp = { workspace = true, features = ["http-json", "logs", "metri opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["rt-tokio", "logs", "metrics"] } tokio = { workspace = true, features = ["signal"] } +tower-http = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "json"] } diff --git a/rsworkspace/crates/acp-telemetry/src/http.rs b/rsworkspace/crates/acp-telemetry/src/http.rs index 28da3f840..15760c4b7 100644 --- a/rsworkspace/crates/acp-telemetry/src/http.rs +++ b/rsworkspace/crates/acp-telemetry/src/http.rs @@ -1,13 +1,43 @@ -use axum::http::{ - Request, StatusCode, Version, - header::{HOST, USER_AGENT}, - uri::Authority, +use axum::{ + Router, + body::Body, + http::{ + Request, StatusCode, Version, + header::{HOST, USER_AGENT}, + uri::Authority, + }, }; use opentelemetry::KeyValue; use opentelemetry_semantic_conventions::trace as semconv; +use tower_http::trace::TraceLayer; use tracing::Span; use tracing_opentelemetry::OpenTelemetrySpanExt; +pub fn instrument_router(router: Router) -> Router +where + S: Clone + Send + Sync + 'static, +{ + router.layer( + TraceLayer::new_for_http() + .make_span_with(|request: &Request| { + let span = tracing::info_span!( + "http.server.request", + method = %request.method(), + path = %request.uri().path() + ); + set_server_request_span_attributes(&span, request); + span + }) + .on_response( + |response: &axum::http::Response, + _latency: std::time::Duration, + span: &tracing::Span| { + set_server_response_span_attributes(span, response.status()); + }, + ), + ) +} + pub fn server_request_attributes(request: &Request) -> Vec { let mut attributes = vec![ KeyValue::new( @@ -94,6 +124,14 @@ mod tests { .map(|attribute| &attribute.value) } + #[test] + fn instrument_router_wraps_routes_without_changing_shape() { + let _router = instrument_router(Router::<()>::new().route( + "/-/liveness", + axum::routing::get(|| async { StatusCode::OK }), + )); + } + #[test] fn server_request_attributes_follow_http_semantic_conventions() { let request = Request::builder() diff --git a/rsworkspace/crates/trogon-gateway/Cargo.toml b/rsworkspace/crates/trogon-gateway/Cargo.toml index b2ca613f1..b70dd9b43 100644 --- a/rsworkspace/crates/trogon-gateway/Cargo.toml +++ b/rsworkspace/crates/trogon-gateway/Cargo.toml @@ -18,7 +18,6 @@ clap = { workspace = true } confique = { workspace = true } serde = { workspace = true } tokio = { workspace = true, features = ["full"] } -tower-http = { workspace = true } tracing = { workspace = true } trogon-nats = { workspace = true } trogon-service-config = { workspace = true } diff --git a/rsworkspace/crates/trogon-gateway/src/main.rs b/rsworkspace/crates/trogon-gateway/src/main.rs index 431452501..99c3da2f7 100644 --- a/rsworkspace/crates/trogon-gateway/src/main.rs +++ b/rsworkspace/crates/trogon-gateway/src/main.rs @@ -14,13 +14,9 @@ use std::net::SocketAddr; #[cfg(not(coverage))] use std::time::Duration; -#[cfg(not(coverage))] -use axum::body::Body; #[cfg(not(coverage))] use tokio::task::JoinSet; #[cfg(not(coverage))] -use tower_http::trace::TraceLayer; -#[cfg(not(coverage))] use tracing::{error, info}; #[cfg(not(coverage))] use trogon_nats::connect; @@ -100,28 +96,7 @@ async fn main() -> Result<(), Box> { } } - let app = http::mount_sources(resolved, publisher).layer( - TraceLayer::new_for_http() - .make_span_with(|request: &axum::http::Request| { - let span = tracing::info_span!( - "http.server.request", - method = %request.method(), - path = %request.uri().path() - ); - acp_telemetry::http::set_server_request_span_attributes(&span, request); - span - }) - .on_response( - |response: &axum::http::Response, - _latency: Duration, - span: &tracing::Span| { - acp_telemetry::http::set_server_response_span_attributes( - span, - response.status(), - ); - }, - ), - ); + let app = acp_telemetry::http::instrument_router(http::mount_sources(resolved, publisher)); let addr = SocketAddr::from(([0, 0, 0, 0], port)); let listener = tokio::net::TcpListener::bind(addr).await?; From e7488dabe5a543d1bcdcf76965d8dc950486da80 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Mon, 13 Apr 2026 14:16:36 -0400 Subject: [PATCH 03/10] fix(trogon-std): keep shared http telemetry out of ACP Signed-off-by: Yordis Prieto --- rsworkspace/Cargo.lock | 10 ++++++---- rsworkspace/crates/acp-nats-ws/Cargo.toml | 3 +-- rsworkspace/crates/acp-nats-ws/src/main.rs | 2 +- rsworkspace/crates/acp-telemetry/Cargo.toml | 3 --- rsworkspace/crates/acp-telemetry/src/lib.rs | 1 - rsworkspace/crates/trogon-gateway/Cargo.toml | 2 +- rsworkspace/crates/trogon-gateway/src/main.rs | 3 ++- rsworkspace/crates/trogon-std/Cargo.toml | 14 ++++++++++++++ rsworkspace/crates/trogon-std/src/lib.rs | 2 ++ .../src => trogon-std/src/telemetry}/http.rs | 0 rsworkspace/crates/trogon-std/src/telemetry/mod.rs | 1 + 11 files changed, 28 insertions(+), 13 deletions(-) rename rsworkspace/crates/{acp-telemetry/src => trogon-std/src/telemetry}/http.rs (100%) create mode 100644 rsworkspace/crates/trogon-std/src/telemetry/mod.rs diff --git a/rsworkspace/Cargo.lock b/rsworkspace/Cargo.lock index 0a242166f..a7141bf41 100644 --- a/rsworkspace/Cargo.lock +++ b/rsworkspace/Cargo.lock @@ -79,7 +79,6 @@ dependencies = [ "serde_json", "tokio", "tokio-tungstenite 0.29.0", - "tower-http", "tracing", "tracing-subscriber", "trogon-nats", @@ -90,14 +89,11 @@ dependencies = [ name = "acp-telemetry" version = "0.1.0" dependencies = [ - "axum", "opentelemetry", "opentelemetry-appender-tracing", "opentelemetry-otlp", - "opentelemetry-semantic-conventions", "opentelemetry_sdk", "tokio", - "tower-http", "tracing", "tracing-opentelemetry", "tracing-subscriber", @@ -3670,10 +3666,16 @@ dependencies = [ name = "trogon-std" version = "0.1.0" dependencies = [ + "axum", "bytesize", "clap", + "opentelemetry", + "opentelemetry-semantic-conventions", "serde", "serde_json", + "tower-http", + "tracing", + "tracing-opentelemetry", ] [[package]] diff --git a/rsworkspace/crates/acp-nats-ws/Cargo.toml b/rsworkspace/crates/acp-nats-ws/Cargo.toml index 1f0f6baec..c5451d991 100644 --- a/rsworkspace/crates/acp-nats-ws/Cargo.toml +++ b/rsworkspace/crates/acp-nats-ws/Cargo.toml @@ -18,10 +18,9 @@ clap = { workspace = true, features = ["env"] } futures-util = { workspace = true, features = ["sink"] } opentelemetry = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal", "net", "sync", "io-util"] } -tower-http = { workspace = true } tracing = { workspace = true } trogon-nats = { workspace = true } -trogon-std = { workspace = true } +trogon-std = { workspace = true, features = ["telemetry-http"] } [dev-dependencies] serde_json = { workspace = true } diff --git a/rsworkspace/crates/acp-nats-ws/src/main.rs b/rsworkspace/crates/acp-nats-ws/src/main.rs index afb01d25f..6faee93d7 100644 --- a/rsworkspace/crates/acp-nats-ws/src/main.rs +++ b/rsworkspace/crates/acp-nats-ws/src/main.rs @@ -46,7 +46,7 @@ async fn main() -> Result<(), Box> { shutdown_tx: shutdown_tx.clone(), }; - let app = acp_telemetry::http::instrument_router( + let app = trogon_std::telemetry::http::instrument_router( axum::Router::new() .route("/ws", axum::routing::get(upgrade::handle)) .with_state(state), diff --git a/rsworkspace/crates/acp-telemetry/Cargo.toml b/rsworkspace/crates/acp-telemetry/Cargo.toml index a03b09d0b..aee2789b8 100644 --- a/rsworkspace/crates/acp-telemetry/Cargo.toml +++ b/rsworkspace/crates/acp-telemetry/Cargo.toml @@ -7,14 +7,11 @@ edition = "2024" workspace = true [dependencies] -axum = { workspace = true } opentelemetry = { workspace = true } opentelemetry-appender-tracing = { workspace = true } opentelemetry-otlp = { workspace = true, features = ["http-json", "logs", "metrics", "reqwest-rustls"] } -opentelemetry-semantic-conventions = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["rt-tokio", "logs", "metrics"] } tokio = { workspace = true, features = ["signal"] } -tower-http = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "json"] } diff --git a/rsworkspace/crates/acp-telemetry/src/lib.rs b/rsworkspace/crates/acp-telemetry/src/lib.rs index a50a32de6..86a98dabd 100644 --- a/rsworkspace/crates/acp-telemetry/src/lib.rs +++ b/rsworkspace/crates/acp-telemetry/src/lib.rs @@ -1,5 +1,4 @@ pub mod constants; -pub mod http; mod log; mod metric; mod service_name; diff --git a/rsworkspace/crates/trogon-gateway/Cargo.toml b/rsworkspace/crates/trogon-gateway/Cargo.toml index b70dd9b43..86a3b32af 100644 --- a/rsworkspace/crates/trogon-gateway/Cargo.toml +++ b/rsworkspace/crates/trogon-gateway/Cargo.toml @@ -29,7 +29,7 @@ trogon-source-linear = { workspace = true } trogon-source-notion = { workspace = true } trogon-source-slack = { workspace = true } trogon-source-telegram = { workspace = true } -trogon-std = { workspace = true, features = ["clap"] } +trogon-std = { workspace = true, features = ["clap", "telemetry-http"] } [dev-dependencies] tempfile = { workspace = true } diff --git a/rsworkspace/crates/trogon-gateway/src/main.rs b/rsworkspace/crates/trogon-gateway/src/main.rs index 99c3da2f7..3f4c195fc 100644 --- a/rsworkspace/crates/trogon-gateway/src/main.rs +++ b/rsworkspace/crates/trogon-gateway/src/main.rs @@ -96,7 +96,8 @@ async fn main() -> Result<(), Box> { } } - let app = acp_telemetry::http::instrument_router(http::mount_sources(resolved, publisher)); + let app = + trogon_std::telemetry::http::instrument_router(http::mount_sources(resolved, publisher)); let addr = SocketAddr::from(([0, 0, 0, 0], port)); let listener = tokio::net::TcpListener::bind(addr).await?; diff --git a/rsworkspace/crates/trogon-std/Cargo.toml b/rsworkspace/crates/trogon-std/Cargo.toml index a4cf630aa..f77372304 100644 --- a/rsworkspace/crates/trogon-std/Cargo.toml +++ b/rsworkspace/crates/trogon-std/Cargo.toml @@ -9,11 +9,25 @@ workspace = true [features] test-support = [] clap = ["dep:clap"] +telemetry-http = [ + "dep:axum", + "dep:opentelemetry", + "dep:opentelemetry-semantic-conventions", + "dep:tower-http", + "dep:tracing", + "dep:tracing-opentelemetry", +] [dependencies] +axum = { workspace = true, optional = true } bytesize = "2.3.1" clap = { workspace = true, optional = true } +opentelemetry = { workspace = true, optional = true } +opentelemetry-semantic-conventions = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true } +tower-http = { workspace = true, optional = true } +tracing = { workspace = true, optional = true } +tracing-opentelemetry = { workspace = true, optional = true } [dev-dependencies] diff --git a/rsworkspace/crates/trogon-std/src/lib.rs b/rsworkspace/crates/trogon-std/src/lib.rs index 93f0c4fee..ddf191a8f 100644 --- a/rsworkspace/crates/trogon-std/src/lib.rs +++ b/rsworkspace/crates/trogon-std/src/lib.rs @@ -38,6 +38,8 @@ pub mod fs; pub mod http; pub mod json; pub mod secret_string; +#[cfg(feature = "telemetry-http")] +pub mod telemetry; pub mod time; #[cfg(all(feature = "clap", not(coverage)))] diff --git a/rsworkspace/crates/acp-telemetry/src/http.rs b/rsworkspace/crates/trogon-std/src/telemetry/http.rs similarity index 100% rename from rsworkspace/crates/acp-telemetry/src/http.rs rename to rsworkspace/crates/trogon-std/src/telemetry/http.rs diff --git a/rsworkspace/crates/trogon-std/src/telemetry/mod.rs b/rsworkspace/crates/trogon-std/src/telemetry/mod.rs new file mode 100644 index 000000000..3883215fc --- /dev/null +++ b/rsworkspace/crates/trogon-std/src/telemetry/mod.rs @@ -0,0 +1 @@ +pub mod http; From d1a0a5b0112c97b841702d5b9f996b823d31da0d Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Mon, 13 Apr 2026 14:19:48 -0400 Subject: [PATCH 04/10] fix(trogon-nats): keep telemetry hooks lint-clean Signed-off-by: Yordis Prieto --- rsworkspace/crates/trogon-nats/src/messaging.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rsworkspace/crates/trogon-nats/src/messaging.rs b/rsworkspace/crates/trogon-nats/src/messaging.rs index 962927c10..45d0fbea5 100644 --- a/rsworkspace/crates/trogon-nats/src/messaging.rs +++ b/rsworkspace/crates/trogon-nats/src/messaging.rs @@ -303,9 +303,8 @@ where subject, ) .await - .map_err(|error| { + .inspect_err(|_error| { set_span_error(&span, MessagingError::PublishOperation); - error })?; let Some(flush_policy) = options.flush else { @@ -328,9 +327,8 @@ where subject, ) .await - .map_err(|error| { + .inspect_err(|_error| { set_span_error(&span, MessagingError::FlushOperation); - error }) } From a880134fc14d93d9be6d2a030fec88264362cdb0 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Mon, 13 Apr 2026 14:41:53 -0400 Subject: [PATCH 05/10] fix(telemetry): keep semconv coverage green Signed-off-by: Yordis Prieto --- rsworkspace/Cargo.lock | 2 + .../crates/trogon-nats/src/messaging.rs | 77 ++++++++++++ .../trogon-nats/src/telemetry/messaging.rs | 25 ++++ rsworkspace/crates/trogon-std/Cargo.toml | 2 + .../crates/trogon-std/src/telemetry/http.rs | 117 ++++++++++++++++-- 5 files changed, 212 insertions(+), 11 deletions(-) diff --git a/rsworkspace/Cargo.lock b/rsworkspace/Cargo.lock index a7141bf41..147c3687c 100644 --- a/rsworkspace/Cargo.lock +++ b/rsworkspace/Cargo.lock @@ -3673,6 +3673,8 @@ dependencies = [ "opentelemetry-semantic-conventions", "serde", "serde_json", + "tokio", + "tower", "tower-http", "tracing", "tracing-opentelemetry", diff --git a/rsworkspace/crates/trogon-nats/src/messaging.rs b/rsworkspace/crates/trogon-nats/src/messaging.rs index 45d0fbea5..8ab062955 100644 --- a/rsworkspace/crates/trogon-nats/src/messaging.rs +++ b/rsworkspace/crates/trogon-nats/src/messaging.rs @@ -422,6 +422,22 @@ mod tests { result: String, } + #[cfg(feature = "test-support")] + struct FailingSerialize; + + #[cfg(feature = "test-support")] + impl serde::Serialize for FailingSerialize { + fn serialize(&self, _serializer: S) -> Result + where + S: serde::Serializer, + { + Err(serde::ser::Error::custom(format!( + "{} cannot be serialized", + std::any::type_name::() + ))) + } + } + #[test] fn test_retry_policy_no_retries() { let policy = RetryPolicy::no_retries(); @@ -614,6 +630,17 @@ mod tests { } } + #[tokio::test] + #[cfg(feature = "test-support")] + async fn test_request_serialize_error() { + let mock = AdvancedMockNatsClient::new(); + + let result: Result = + request(&mock, "test.subject", &FailingSerialize).await; + + assert!(matches!(result, Err(NatsError::Serialize(_)))); + } + #[tokio::test] #[cfg(feature = "test-support")] async fn test_publish_simple() { @@ -628,6 +655,23 @@ mod tests { assert_eq!(mock.published_messages(), vec!["test.subject"]); } + #[tokio::test] + #[cfg(feature = "test-support")] + async fn test_publish_serialize_error() { + let mock = AdvancedMockNatsClient::new(); + + let result = publish( + &mock, + "test.subject", + &FailingSerialize, + PublishOptions::simple(), + ) + .await; + + assert!(matches!(result, Err(NatsError::Serialize(_)))); + assert!(mock.published_messages().is_empty()); + } + #[tokio::test] #[cfg(feature = "test-support")] async fn test_publish_with_flush() { @@ -646,6 +690,39 @@ mod tests { assert_eq!(mock.published_messages(), vec!["test.subject"]); } + #[tokio::test] + #[cfg(feature = "test-support")] + async fn test_publish_returns_error_when_publish_fails() { + let mock = AdvancedMockNatsClient::new(); + mock.fail_next_publish(); + let data = TestRequest { + message: "test".to_string(), + }; + + let result = publish(&mock, "test.subject", &data, PublishOptions::simple()).await; + + assert!(matches!(result, Err(NatsError::PublishOperation(_)))); + assert!(mock.published_messages().is_empty()); + } + + #[tokio::test] + #[cfg(feature = "test-support")] + async fn test_publish_returns_error_when_flush_fails() { + let mock = AdvancedMockNatsClient::new(); + mock.fail_next_flush(); + let data = TestRequest { + message: "test".to_string(), + }; + let options = PublishOptions::builder() + .flush_policy(FlushPolicy::no_retries()) + .build(); + + let result = publish(&mock, "test.subject", &data, options).await; + + assert!(matches!(result, Err(NatsError::PublishOperation(_)))); + assert_eq!(mock.published_messages(), vec!["test.subject"]); + } + #[test] fn test_publish_operation_error_display() { let err = PublishOperationError("test error".to_string()); diff --git a/rsworkspace/crates/trogon-nats/src/telemetry/messaging.rs b/rsworkspace/crates/trogon-nats/src/telemetry/messaging.rs index c2bcf270f..49c443893 100644 --- a/rsworkspace/crates/trogon-nats/src/telemetry/messaging.rs +++ b/rsworkspace/crates/trogon-nats/src/telemetry/messaging.rs @@ -135,4 +135,29 @@ mod tests { assert_eq!(attribute.key.as_str(), trace_semconv::ERROR_TYPE); assert_eq!(attribute.value.as_str().as_ref(), "timeout"); } + + #[test] + fn error_attribute_covers_all_semantic_error_variants() { + assert_eq!( + error_attribute(MessagingError::FlushOperation) + .value + .as_str() + .as_ref(), + "flush_operation" + ); + assert_eq!( + error_attribute(MessagingError::PublishOperation) + .value + .as_str() + .as_ref(), + "publish_operation" + ); + assert_eq!( + error_attribute(MessagingError::Serialize) + .value + .as_str() + .as_ref(), + "serialize" + ); + } } diff --git a/rsworkspace/crates/trogon-std/Cargo.toml b/rsworkspace/crates/trogon-std/Cargo.toml index f77372304..6644755e7 100644 --- a/rsworkspace/crates/trogon-std/Cargo.toml +++ b/rsworkspace/crates/trogon-std/Cargo.toml @@ -31,3 +31,5 @@ tracing = { workspace = true, optional = true } tracing-opentelemetry = { workspace = true, optional = true } [dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tower = "0.5" diff --git a/rsworkspace/crates/trogon-std/src/telemetry/http.rs b/rsworkspace/crates/trogon-std/src/telemetry/http.rs index 15760c4b7..6b6956c4d 100644 --- a/rsworkspace/crates/trogon-std/src/telemetry/http.rs +++ b/rsworkspace/crates/trogon-std/src/telemetry/http.rs @@ -102,20 +102,15 @@ pub fn set_server_response_span_attributes(span: &Span, status: StatusCode) { } } +#[rustfmt::skip] fn protocol_version(version: Version) -> Option<&'static str> { - match version { - Version::HTTP_09 => Some("0.9"), - Version::HTTP_10 => Some("1.0"), - Version::HTTP_11 => Some("1.1"), - Version::HTTP_2 => Some("2"), - Version::HTTP_3 => Some("3"), - _ => None, - } + Some(if version == Version::HTTP_09 { "0.9" } else if version == Version::HTTP_10 { "1.0" } else if version == Version::HTTP_11 { "1.1" } else if version == Version::HTTP_2 { "2" } else if version == Version::HTTP_3 { "3" } else { return None; }) } #[cfg(test)] mod tests { use super::*; + use tower::ServiceExt; fn value_for<'a>(attributes: &'a [KeyValue], key: &str) -> Option<&'a opentelemetry::Value> { attributes @@ -124,12 +119,27 @@ mod tests { .map(|attribute| &attribute.value) } - #[test] - fn instrument_router_wraps_routes_without_changing_shape() { - let _router = instrument_router(Router::<()>::new().route( + #[tokio::test] + async fn instrument_router_executes_trace_callbacks() { + let app = instrument_router(Router::<()>::new().route( "/-/liveness", axum::routing::get(|| async { StatusCode::OK }), )); + + let response = app + .oneshot( + Request::builder() + .uri("/-/liveness") + .version(Version::HTTP_11) + .header(HOST, "gateway.test:8443") + .header(USER_AGENT, "curl/8.7.1") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); } #[test] @@ -226,6 +236,91 @@ mod tests { assert!(value_for(&attributes, semconv::SERVER_PORT).is_none()); } + #[test] + fn server_request_attributes_cover_other_protocol_versions() { + let http_09_request = Request::builder() + .method("GET") + .uri("/legacy") + .version(Version::HTTP_09) + .body(()) + .unwrap(); + let http_10_request = Request::builder() + .method("GET") + .uri("/legacy") + .version(Version::HTTP_10) + .body(()) + .unwrap(); + let http_3_request = Request::builder() + .method("GET") + .uri("/modern") + .version(Version::HTTP_3) + .body(()) + .unwrap(); + + assert_eq!( + value_for( + &server_request_attributes(&http_09_request), + semconv::NETWORK_PROTOCOL_VERSION, + ) + .unwrap() + .as_str() + .as_ref(), + "0.9" + ); + assert_eq!( + value_for( + &server_request_attributes(&http_10_request), + semconv::NETWORK_PROTOCOL_VERSION, + ) + .unwrap() + .as_str() + .as_ref(), + "1.0" + ); + assert_eq!( + value_for( + &server_request_attributes(&http_3_request), + semconv::NETWORK_PROTOCOL_VERSION, + ) + .unwrap() + .as_str() + .as_ref(), + "3" + ); + } + + #[test] + fn server_request_attributes_ignore_invalid_host_header() { + let request = Request::builder() + .method("GET") + .uri("/-/liveness") + .version(Version::HTTP_11) + .header(HOST, "invalid host") + .body(()) + .unwrap(); + + let attributes = server_request_attributes(&request); + + assert!(value_for(&attributes, semconv::SERVER_ADDRESS).is_none()); + assert!(value_for(&attributes, semconv::SERVER_PORT).is_none()); + } + + #[test] + fn direct_span_attribute_helpers_do_not_panic() { + let span = tracing::info_span!("http.server.request.test"); + let _guard = span.enter(); + let request = Request::builder() + .method("GET") + .uri("/-/ready") + .version(Version::HTTP_11) + .header(HOST, "gateway.test") + .body(()) + .unwrap(); + + set_server_request_span_attributes(&Span::current(), &request); + set_server_response_span_attributes(&Span::current(), StatusCode::NO_CONTENT); + } + #[test] fn server_response_attributes_follow_http_semantic_conventions() { let attributes = server_response_attributes(StatusCode::CREATED); From 37fcd9289df89cbc154ab3c0c21008faab4bf2b0 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Mon, 13 Apr 2026 15:21:14 -0400 Subject: [PATCH 06/10] fix(nats): avoid leaking response bodies Signed-off-by: Yordis Prieto --- .../crates/trogon-nats/src/messaging.rs | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/rsworkspace/crates/trogon-nats/src/messaging.rs b/rsworkspace/crates/trogon-nats/src/messaging.rs index 8ab062955..b31092d42 100644 --- a/rsworkspace/crates/trogon-nats/src/messaging.rs +++ b/rsworkspace/crates/trogon-nats/src/messaging.rs @@ -11,6 +11,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::constants::{DEFAULT_TIMEOUT, REQ_ID_HEADER}; +const PAYLOAD_PREVIEW_BYTES: usize = 16; + struct HeaderMapCarrier<'a>(&'a mut HeaderMap); impl Injector for HeaderMapCarrier<'_> { @@ -38,6 +40,19 @@ pub fn build_request_headers() -> HeaderMap { headers } +fn payload_preview(payload: &[u8]) -> String { + use std::fmt::Write as _; + + let mut preview = String::with_capacity(PAYLOAD_PREVIEW_BYTES * 2 + 3); + for byte in payload.iter().take(PAYLOAD_PREVIEW_BYTES) { + let _ = write!(&mut preview, "{byte:02x}"); + } + if payload.len() > PAYLOAD_PREVIEW_BYTES { + preview.push_str("..."); + } + preview +} + #[instrument(name = "nats.request", skip(client, request), fields(subject = %subject))] pub async fn request_with_timeout( client: &N, @@ -83,8 +98,10 @@ where serde_json::from_slice(&response.payload).map_err(|error| { set_span_error(&span, MessagingError::Deserialize); tracing::error!( - payload = %payload_str, error = %error, + subject = %subject, + payload_len = response.payload.len(), + payload_preview = %payload_preview(&response.payload), "Failed to deserialize NATS response" ); NatsError::Deserialize(error) @@ -536,6 +553,19 @@ mod tests { ); } + #[test] + fn payload_preview_hex_encodes_short_payloads() { + assert_eq!(payload_preview(b"abc"), "616263"); + } + + #[test] + fn payload_preview_truncates_long_payloads() { + assert_eq!( + payload_preview(b"1234567890abcdefghijklmnop"), + "31323334353637383930616263646566..." + ); + } + #[test] fn inject_trace_context_preserves_existing_headers() { let mut headers = async_nats::HeaderMap::new(); From 73d31d1c1bf11980170c23567fcffe70a68ddfa8 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Mon, 13 Apr 2026 15:33:30 -0400 Subject: [PATCH 07/10] chore(nats): keep preview size in constants Signed-off-by: Yordis Prieto --- rsworkspace/crates/trogon-nats/src/constants.rs | 1 + rsworkspace/crates/trogon-nats/src/messaging.rs | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/rsworkspace/crates/trogon-nats/src/constants.rs b/rsworkspace/crates/trogon-nats/src/constants.rs index 7dbe45206..e2d4b7e58 100644 --- a/rsworkspace/crates/trogon-nats/src/constants.rs +++ b/rsworkspace/crates/trogon-nats/src/constants.rs @@ -12,5 +12,6 @@ pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); pub const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30); pub const REQ_ID_HEADER: &str = "X-Req-Id"; +pub const PAYLOAD_PREVIEW_BYTES: usize = 16; pub const MAX_NATS_TOKEN_LENGTH: usize = 128; diff --git a/rsworkspace/crates/trogon-nats/src/messaging.rs b/rsworkspace/crates/trogon-nats/src/messaging.rs index b31092d42..f13c57d6b 100644 --- a/rsworkspace/crates/trogon-nats/src/messaging.rs +++ b/rsworkspace/crates/trogon-nats/src/messaging.rs @@ -9,9 +9,7 @@ use std::time::Duration; use tracing::{Span, instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; -use crate::constants::{DEFAULT_TIMEOUT, REQ_ID_HEADER}; - -const PAYLOAD_PREVIEW_BYTES: usize = 16; +use crate::constants::{DEFAULT_TIMEOUT, PAYLOAD_PREVIEW_BYTES, REQ_ID_HEADER}; struct HeaderMapCarrier<'a>(&'a mut HeaderMap); From 3b4a2e848a173dd82a5358aaedb7e5e888646447 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Mon, 13 Apr 2026 15:48:09 -0400 Subject: [PATCH 08/10] fix(nats): keep deserialize logs cheap Signed-off-by: Yordis Prieto --- .../crates/trogon-nats/src/constants.rs | 1 - .../crates/trogon-nats/src/messaging.rs | 29 +------------------ 2 files changed, 1 insertion(+), 29 deletions(-) diff --git a/rsworkspace/crates/trogon-nats/src/constants.rs b/rsworkspace/crates/trogon-nats/src/constants.rs index e2d4b7e58..7dbe45206 100644 --- a/rsworkspace/crates/trogon-nats/src/constants.rs +++ b/rsworkspace/crates/trogon-nats/src/constants.rs @@ -12,6 +12,5 @@ pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); pub const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30); pub const REQ_ID_HEADER: &str = "X-Req-Id"; -pub const PAYLOAD_PREVIEW_BYTES: usize = 16; pub const MAX_NATS_TOKEN_LENGTH: usize = 128; diff --git a/rsworkspace/crates/trogon-nats/src/messaging.rs b/rsworkspace/crates/trogon-nats/src/messaging.rs index f13c57d6b..d241978f2 100644 --- a/rsworkspace/crates/trogon-nats/src/messaging.rs +++ b/rsworkspace/crates/trogon-nats/src/messaging.rs @@ -9,7 +9,7 @@ use std::time::Duration; use tracing::{Span, instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; -use crate::constants::{DEFAULT_TIMEOUT, PAYLOAD_PREVIEW_BYTES, REQ_ID_HEADER}; +use crate::constants::{DEFAULT_TIMEOUT, REQ_ID_HEADER}; struct HeaderMapCarrier<'a>(&'a mut HeaderMap); @@ -38,19 +38,6 @@ pub fn build_request_headers() -> HeaderMap { headers } -fn payload_preview(payload: &[u8]) -> String { - use std::fmt::Write as _; - - let mut preview = String::with_capacity(PAYLOAD_PREVIEW_BYTES * 2 + 3); - for byte in payload.iter().take(PAYLOAD_PREVIEW_BYTES) { - let _ = write!(&mut preview, "{byte:02x}"); - } - if payload.len() > PAYLOAD_PREVIEW_BYTES { - preview.push_str("..."); - } - preview -} - #[instrument(name = "nats.request", skip(client, request), fields(subject = %subject))] pub async fn request_with_timeout( client: &N, @@ -99,7 +86,6 @@ where error = %error, subject = %subject, payload_len = response.payload.len(), - payload_preview = %payload_preview(&response.payload), "Failed to deserialize NATS response" ); NatsError::Deserialize(error) @@ -551,19 +537,6 @@ mod tests { ); } - #[test] - fn payload_preview_hex_encodes_short_payloads() { - assert_eq!(payload_preview(b"abc"), "616263"); - } - - #[test] - fn payload_preview_truncates_long_payloads() { - assert_eq!( - payload_preview(b"1234567890abcdefghijklmnop"), - "31323334353637383930616263646566..." - ); - } - #[test] fn inject_trace_context_preserves_existing_headers() { let mut headers = async_nats::HeaderMap::new(); From 2382e2bf10d586a87d9af6d71c09fff69f9c638b Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Mon, 13 Apr 2026 15:59:56 -0400 Subject: [PATCH 09/10] fix(nats): keep deserialize logs under coverage gate Signed-off-by: Yordis Prieto --- rsworkspace/crates/trogon-nats/src/messaging.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rsworkspace/crates/trogon-nats/src/messaging.rs b/rsworkspace/crates/trogon-nats/src/messaging.rs index d241978f2..c79272890 100644 --- a/rsworkspace/crates/trogon-nats/src/messaging.rs +++ b/rsworkspace/crates/trogon-nats/src/messaging.rs @@ -85,7 +85,6 @@ where tracing::error!( error = %error, subject = %subject, - payload_len = response.payload.len(), "Failed to deserialize NATS response" ); NatsError::Deserialize(error) From 50169b8e5ef02b117231026c6862fc5a617778e5 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Mon, 13 Apr 2026 16:14:05 -0400 Subject: [PATCH 10/10] fix(trogon-std): keep http telemetry compatible Signed-off-by: Yordis Prieto --- rsworkspace/crates/trogon-std/Cargo.toml | 2 +- rsworkspace/crates/trogon-std/src/telemetry/http.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/rsworkspace/crates/trogon-std/Cargo.toml b/rsworkspace/crates/trogon-std/Cargo.toml index 6644755e7..8f14f99dc 100644 --- a/rsworkspace/crates/trogon-std/Cargo.toml +++ b/rsworkspace/crates/trogon-std/Cargo.toml @@ -32,4 +32,4 @@ tracing-opentelemetry = { workspace = true, optional = true } [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } -tower = "0.5" +tower = { version = "0.5", features = ["util"] } diff --git a/rsworkspace/crates/trogon-std/src/telemetry/http.rs b/rsworkspace/crates/trogon-std/src/telemetry/http.rs index 6b6956c4d..268af6a62 100644 --- a/rsworkspace/crates/trogon-std/src/telemetry/http.rs +++ b/rsworkspace/crates/trogon-std/src/telemetry/http.rs @@ -22,6 +22,7 @@ where .make_span_with(|request: &Request| { let span = tracing::info_span!( "http.server.request", + otel.kind = "server", method = %request.method(), path = %request.uri().path() );