diff --git a/rsworkspace/Cargo.lock b/rsworkspace/Cargo.lock index 58fa043fd..147c3687c 100644 --- a/rsworkspace/Cargo.lock +++ b/rsworkspace/Cargo.lock @@ -79,7 +79,6 @@ dependencies = [ "serde_json", "tokio", "tokio-tungstenite 0.29.0", - "tower-http", "tracing", "tracing-subscriber", "trogon-nats", @@ -1848,6 +1847,12 @@ dependencies = [ "tonic-prost", ] +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" + [[package]] name = "opentelemetry_sdk" version = "0.31.0" @@ -3477,6 +3482,7 @@ dependencies = [ "bytes", "futures", "opentelemetry", + "opentelemetry-semantic-conventions", "serde", "serde_json", "tokio", @@ -3660,10 +3666,18 @@ dependencies = [ name = "trogon-std" version = "0.1.0" dependencies = [ + "axum", "bytesize", "clap", + "opentelemetry", + "opentelemetry-semantic-conventions", "serde", "serde_json", + "tokio", + "tower", + "tower-http", + "tracing", + "tracing-opentelemetry", ] [[package]] diff --git a/rsworkspace/Cargo.toml b/rsworkspace/Cargo.toml index d2b1b14d7..08c5f0507 100644 --- a/rsworkspace/Cargo.toml +++ b/rsworkspace/Cargo.toml @@ -52,6 +52,7 @@ clap = { version = "=4.6.0", features = ["derive"] } opentelemetry = "=0.31.0" opentelemetry-appender-tracing = "=0.31.1" opentelemetry-otlp = "=0.31.1" +opentelemetry-semantic-conventions = { version = "=0.31.0", features = ["semconv_experimental"] } opentelemetry_sdk = "=0.31.0" tracing = "=0.1.44" tracing-opentelemetry = "=0.32.1" diff --git a/rsworkspace/crates/acp-nats-ws/Cargo.toml b/rsworkspace/crates/acp-nats-ws/Cargo.toml index 1f0f6baec..c5451d991 100644 --- a/rsworkspace/crates/acp-nats-ws/Cargo.toml +++ b/rsworkspace/crates/acp-nats-ws/Cargo.toml @@ -18,10 +18,9 @@ clap = { workspace = true, features = ["env"] } futures-util = { workspace = true, features = ["sink"] } opentelemetry = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal", "net", "sync", "io-util"] } -tower-http = { workspace = true } tracing = { workspace = true } trogon-nats = { workspace = true } -trogon-std = { workspace = true } +trogon-std = { workspace = true, features = ["telemetry-http"] } [dev-dependencies] serde_json = { workspace = true } diff --git a/rsworkspace/crates/acp-nats-ws/src/main.rs b/rsworkspace/crates/acp-nats-ws/src/main.rs index 888d4cd25..6faee93d7 100644 --- a/rsworkspace/crates/acp-nats-ws/src/main.rs +++ b/rsworkspace/crates/acp-nats-ws/src/main.rs @@ -9,9 +9,8 @@ use upgrade::{ConnectionRequest, UpgradeState}; #[cfg(not(coverage))] use { - acp_nats::nats, acp_telemetry::ServiceName, clap::Parser, std::net::SocketAddr, - tower_http::trace::TraceLayer, tracing::error, trogon_std::env::SystemEnv, - trogon_std::fs::SystemFs, + acp_nats::nats, acp_telemetry::ServiceName, clap::Parser, std::net::SocketAddr, tracing::error, + trogon_std::env::SystemEnv, trogon_std::fs::SystemFs, }; #[cfg(not(coverage))] @@ -47,10 +46,11 @@ async fn main() -> Result<(), Box> { shutdown_tx: shutdown_tx.clone(), }; - let app = axum::Router::new() - .route("/ws", axum::routing::get(upgrade::handle)) - .layer(TraceLayer::new_for_http()) - .with_state(state); + let app = trogon_std::telemetry::http::instrument_router( + axum::Router::new() + .route("/ws", axum::routing::get(upgrade::handle)) + .with_state(state), + ); let addr = SocketAddr::from((ws_config.host, ws_config.port)); let listener = tokio::net::TcpListener::bind(addr).await?; diff --git a/rsworkspace/crates/trogon-gateway/Cargo.toml b/rsworkspace/crates/trogon-gateway/Cargo.toml index b70dd9b43..86a3b32af 100644 --- a/rsworkspace/crates/trogon-gateway/Cargo.toml +++ b/rsworkspace/crates/trogon-gateway/Cargo.toml @@ -29,7 +29,7 @@ trogon-source-linear = { workspace = true } trogon-source-notion = { workspace = true } trogon-source-slack = { workspace = true } trogon-source-telegram = { workspace = true } -trogon-std = { workspace = true, features = ["clap"] } +trogon-std = { workspace = true, features = ["clap", "telemetry-http"] } [dev-dependencies] tempfile = { workspace = true } diff --git a/rsworkspace/crates/trogon-gateway/src/main.rs b/rsworkspace/crates/trogon-gateway/src/main.rs index 67596c01d..3f4c195fc 100644 --- a/rsworkspace/crates/trogon-gateway/src/main.rs +++ b/rsworkspace/crates/trogon-gateway/src/main.rs @@ -96,7 +96,8 @@ async fn main() -> Result<(), Box> { } } - let app = http::mount_sources(resolved, publisher); + let app = + trogon_std::telemetry::http::instrument_router(http::mount_sources(resolved, publisher)); let addr = SocketAddr::from(([0, 0, 0, 0], port)); let listener = tokio::net::TcpListener::bind(addr).await?; diff --git a/rsworkspace/crates/trogon-nats/Cargo.toml b/rsworkspace/crates/trogon-nats/Cargo.toml index 8dfde1553..c1d5dd350 100644 --- a/rsworkspace/crates/trogon-nats/Cargo.toml +++ b/rsworkspace/crates/trogon-nats/Cargo.toml @@ -11,6 +11,7 @@ async-nats = { workspace = true, features = ["ring", "nkeys", "jetstream", "kv", bytes = { workspace = true } futures = { workspace = true } opentelemetry = { workspace = true } +opentelemetry-semantic-conventions = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["time", "io-util"] } diff --git a/rsworkspace/crates/trogon-nats/src/lib.rs b/rsworkspace/crates/trogon-nats/src/lib.rs index c9151bba8..806ffa7e5 100644 --- a/rsworkspace/crates/trogon-nats/src/lib.rs +++ b/rsworkspace/crates/trogon-nats/src/lib.rs @@ -45,6 +45,7 @@ pub mod lease; pub mod messaging; pub mod nats_token; pub mod subject_token_violation; +pub(crate) mod telemetry; pub(crate) mod token; #[cfg(any(test, feature = "test-support"))] diff --git a/rsworkspace/crates/trogon-nats/src/messaging.rs b/rsworkspace/crates/trogon-nats/src/messaging.rs index fde275d2d..c79272890 100644 --- a/rsworkspace/crates/trogon-nats/src/messaging.rs +++ b/rsworkspace/crates/trogon-nats/src/messaging.rs @@ -1,9 +1,12 @@ use crate::client::{FlushClient, PublishClient, RequestClient}; +use crate::telemetry::messaging::{ + MessagingError, MessagingOperation, set_client_operation_span_attributes, set_span_error, +}; use async_nats::header::HeaderMap; use opentelemetry::propagation::Injector; use serde::{Serialize, de::DeserializeOwned}; use std::time::Duration; -use tracing::Span; +use tracing::{Span, instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::constants::{DEFAULT_TIMEOUT, REQ_ID_HEADER}; @@ -35,6 +38,7 @@ pub fn build_request_headers() -> HeaderMap { headers } +#[instrument(name = "nats.request", skip(client, request), fields(subject = %subject))] pub async fn request_with_timeout( client: &N, subject: &str, @@ -45,7 +49,13 @@ where Req: Serialize, Res: DeserializeOwned, { - let payload = serde_json::to_vec(request).map_err(NatsError::Serialize)?; + let span = Span::current(); + set_client_operation_span_attributes(&span, MessagingOperation::Request, subject); + + let payload = serde_json::to_vec(request).map_err(|error| { + set_span_error(&span, MessagingError::Serialize); + NatsError::Serialize(error) + })?; let headers = build_request_headers(); let response = tokio::time::timeout( @@ -53,20 +63,31 @@ where client.request_with_headers(subject.to_string(), headers, payload.into()), ) .await - .map_err(|_| NatsError::Timeout { - subject: subject.to_string(), + .map_err(|_| { + set_span_error(&span, MessagingError::Timeout); + NatsError::Timeout { + subject: subject.to_string(), + } })? - .map_err(|e| NatsError::Request { - subject: subject.to_string(), - error: e.to_string(), + .map_err(|error| { + set_span_error(&span, MessagingError::Request); + NatsError::Request { + subject: subject.to_string(), + error: error.to_string(), + } })?; let payload_str = String::from_utf8_lossy(&response.payload); tracing::debug!(payload = %payload_str, "Received NATS response"); - serde_json::from_slice(&response.payload).map_err(|e| { - tracing::error!(payload = %payload_str, error = %e, "Failed to deserialize NATS response"); - NatsError::Deserialize(e) + serde_json::from_slice(&response.payload).map_err(|error| { + set_span_error(&span, MessagingError::Deserialize); + tracing::error!( + error = %error, + subject = %subject, + "Failed to deserialize NATS response" + ); + NatsError::Deserialize(error) }) } @@ -246,6 +267,7 @@ impl PublishOptionsBuilder { } } +#[instrument(name = "nats.publish", skip(client, request, options), fields(subject = %subject))] pub async fn publish( client: &N, subject: &str, @@ -255,7 +277,13 @@ pub async fn publish( where Req: Serialize, { - let payload = serde_json::to_vec(request).map_err(NatsError::Serialize)?; + let span = Span::current(); + set_client_operation_span_attributes(&span, MessagingOperation::Publish, subject); + + let payload = serde_json::to_vec(request).map_err(|error| { + set_span_error(&span, MessagingError::Serialize); + NatsError::Serialize(error) + })?; let headers = headers_with_trace_context(); options @@ -274,7 +302,10 @@ where "publish", subject, ) - .await?; + .await + .inspect_err(|_error| { + set_span_error(&span, MessagingError::PublishOperation); + })?; let Some(flush_policy) = options.flush else { return Ok(()); @@ -296,6 +327,9 @@ where subject, ) .await + .inspect_err(|_error| { + set_span_error(&span, MessagingError::FlushOperation); + }) } #[derive(Debug)] @@ -388,6 +422,22 @@ mod tests { result: String, } + #[cfg(feature = "test-support")] + struct FailingSerialize; + + #[cfg(feature = "test-support")] + impl serde::Serialize for FailingSerialize { + fn serialize(&self, _serializer: S) -> Result + where + S: serde::Serializer, + { + Err(serde::ser::Error::custom(format!( + "{} cannot be serialized", + std::any::type_name::() + ))) + } + } + #[test] fn test_retry_policy_no_retries() { let policy = RetryPolicy::no_retries(); @@ -580,6 +630,17 @@ mod tests { } } + #[tokio::test] + #[cfg(feature = "test-support")] + async fn test_request_serialize_error() { + let mock = AdvancedMockNatsClient::new(); + + let result: Result = + request(&mock, "test.subject", &FailingSerialize).await; + + assert!(matches!(result, Err(NatsError::Serialize(_)))); + } + #[tokio::test] #[cfg(feature = "test-support")] async fn test_publish_simple() { @@ -594,6 +655,23 @@ mod tests { assert_eq!(mock.published_messages(), vec!["test.subject"]); } + #[tokio::test] + #[cfg(feature = "test-support")] + async fn test_publish_serialize_error() { + let mock = AdvancedMockNatsClient::new(); + + let result = publish( + &mock, + "test.subject", + &FailingSerialize, + PublishOptions::simple(), + ) + .await; + + assert!(matches!(result, Err(NatsError::Serialize(_)))); + assert!(mock.published_messages().is_empty()); + } + #[tokio::test] #[cfg(feature = "test-support")] async fn test_publish_with_flush() { @@ -612,6 +690,39 @@ mod tests { assert_eq!(mock.published_messages(), vec!["test.subject"]); } + #[tokio::test] + #[cfg(feature = "test-support")] + async fn test_publish_returns_error_when_publish_fails() { + let mock = AdvancedMockNatsClient::new(); + mock.fail_next_publish(); + let data = TestRequest { + message: "test".to_string(), + }; + + let result = publish(&mock, "test.subject", &data, PublishOptions::simple()).await; + + assert!(matches!(result, Err(NatsError::PublishOperation(_)))); + assert!(mock.published_messages().is_empty()); + } + + #[tokio::test] + #[cfg(feature = "test-support")] + async fn test_publish_returns_error_when_flush_fails() { + let mock = AdvancedMockNatsClient::new(); + mock.fail_next_flush(); + let data = TestRequest { + message: "test".to_string(), + }; + let options = PublishOptions::builder() + .flush_policy(FlushPolicy::no_retries()) + .build(); + + let result = publish(&mock, "test.subject", &data, options).await; + + assert!(matches!(result, Err(NatsError::PublishOperation(_)))); + assert_eq!(mock.published_messages(), vec!["test.subject"]); + } + #[test] fn test_publish_operation_error_display() { let err = PublishOperationError("test error".to_string()); diff --git a/rsworkspace/crates/trogon-nats/src/telemetry/messaging.rs b/rsworkspace/crates/trogon-nats/src/telemetry/messaging.rs new file mode 100644 index 000000000..49c443893 --- /dev/null +++ b/rsworkspace/crates/trogon-nats/src/telemetry/messaging.rs @@ -0,0 +1,163 @@ +use opentelemetry::KeyValue; +use opentelemetry_semantic_conventions::{attribute as attr_semconv, trace as trace_semconv}; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum MessagingError { + Deserialize, + FlushOperation, + PublishOperation, + Request, + Serialize, + Timeout, +} + +impl MessagingError { + const fn as_str(self) -> &'static str { + match self { + Self::Deserialize => "deserialize", + Self::FlushOperation => "flush_operation", + Self::PublishOperation => "publish_operation", + Self::Request => "request", + Self::Serialize => "serialize", + Self::Timeout => "timeout", + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum MessagingOperation { + Publish, + Request, +} + +impl MessagingOperation { + const fn name(self) -> &'static str { + match self { + Self::Publish => "publish", + Self::Request => "request", + } + } + + const fn operation_type(self) -> &'static str { + "send" + } +} + +pub(crate) fn client_operation_attributes( + operation: MessagingOperation, + destination_name: &str, +) -> [KeyValue; 4] { + [ + KeyValue::new(attr_semconv::MESSAGING_SYSTEM, "nats"), + KeyValue::new( + attr_semconv::MESSAGING_DESTINATION_NAME, + destination_name.to_owned(), + ), + KeyValue::new(attr_semconv::MESSAGING_OPERATION_NAME, operation.name()), + KeyValue::new( + attr_semconv::MESSAGING_OPERATION_TYPE, + operation.operation_type(), + ), + ] +} + +pub(crate) fn error_attribute(error: MessagingError) -> KeyValue { + KeyValue::new(trace_semconv::ERROR_TYPE, error.as_str()) +} + +pub(crate) fn set_client_operation_span_attributes( + span: &Span, + operation: MessagingOperation, + destination_name: &str, +) { + for attribute in client_operation_attributes(operation, destination_name) { + span.set_attribute(attribute.key, attribute.value); + } +} + +pub(crate) fn set_span_error(span: &Span, error: MessagingError) { + let attribute = error_attribute(error); + span.set_attribute(attribute.key, attribute.value); +} + +#[cfg(test)] +mod tests { + use super::*; + + fn value_for<'a>(attributes: &'a [KeyValue], key: &str) -> Option<&'a opentelemetry::Value> { + attributes + .iter() + .find(|attribute| attribute.key.as_str() == key) + .map(|attribute| &attribute.value) + } + + #[test] + fn client_operation_attributes_follow_messaging_semantic_conventions() { + let attributes = + client_operation_attributes(MessagingOperation::Request, "acp.session.new"); + + assert_eq!( + value_for(&attributes, attr_semconv::MESSAGING_SYSTEM) + .unwrap() + .as_str() + .as_ref(), + "nats" + ); + assert_eq!( + value_for(&attributes, attr_semconv::MESSAGING_DESTINATION_NAME) + .unwrap() + .as_str() + .as_ref(), + "acp.session.new" + ); + assert_eq!( + value_for(&attributes, attr_semconv::MESSAGING_OPERATION_NAME) + .unwrap() + .as_str() + .as_ref(), + "request" + ); + assert_eq!( + value_for(&attributes, attr_semconv::MESSAGING_OPERATION_TYPE) + .unwrap() + .as_str() + .as_ref(), + "send" + ); + } + + #[test] + fn error_attribute_uses_semantic_error_type_key() { + let attribute = error_attribute(MessagingError::Timeout); + + assert_eq!(attribute.key.as_str(), trace_semconv::ERROR_TYPE); + assert_eq!(attribute.value.as_str().as_ref(), "timeout"); + } + + #[test] + fn error_attribute_covers_all_semantic_error_variants() { + assert_eq!( + error_attribute(MessagingError::FlushOperation) + .value + .as_str() + .as_ref(), + "flush_operation" + ); + assert_eq!( + error_attribute(MessagingError::PublishOperation) + .value + .as_str() + .as_ref(), + "publish_operation" + ); + assert_eq!( + error_attribute(MessagingError::Serialize) + .value + .as_str() + .as_ref(), + "serialize" + ); + } +} diff --git a/rsworkspace/crates/trogon-nats/src/telemetry/mod.rs b/rsworkspace/crates/trogon-nats/src/telemetry/mod.rs new file mode 100644 index 000000000..3ef8c6ace --- /dev/null +++ b/rsworkspace/crates/trogon-nats/src/telemetry/mod.rs @@ -0,0 +1 @@ +pub(crate) mod messaging; diff --git a/rsworkspace/crates/trogon-std/Cargo.toml b/rsworkspace/crates/trogon-std/Cargo.toml index a4cf630aa..8f14f99dc 100644 --- a/rsworkspace/crates/trogon-std/Cargo.toml +++ b/rsworkspace/crates/trogon-std/Cargo.toml @@ -9,11 +9,27 @@ workspace = true [features] test-support = [] clap = ["dep:clap"] +telemetry-http = [ + "dep:axum", + "dep:opentelemetry", + "dep:opentelemetry-semantic-conventions", + "dep:tower-http", + "dep:tracing", + "dep:tracing-opentelemetry", +] [dependencies] +axum = { workspace = true, optional = true } bytesize = "2.3.1" clap = { workspace = true, optional = true } +opentelemetry = { workspace = true, optional = true } +opentelemetry-semantic-conventions = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true } +tower-http = { workspace = true, optional = true } +tracing = { workspace = true, optional = true } +tracing-opentelemetry = { workspace = true, optional = true } [dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tower = { version = "0.5", features = ["util"] } diff --git a/rsworkspace/crates/trogon-std/src/lib.rs b/rsworkspace/crates/trogon-std/src/lib.rs index 93f0c4fee..ddf191a8f 100644 --- a/rsworkspace/crates/trogon-std/src/lib.rs +++ b/rsworkspace/crates/trogon-std/src/lib.rs @@ -38,6 +38,8 @@ pub mod fs; pub mod http; pub mod json; pub mod secret_string; +#[cfg(feature = "telemetry-http")] +pub mod telemetry; pub mod time; #[cfg(all(feature = "clap", not(coverage)))] diff --git a/rsworkspace/crates/trogon-std/src/telemetry/http.rs b/rsworkspace/crates/trogon-std/src/telemetry/http.rs new file mode 100644 index 000000000..268af6a62 --- /dev/null +++ b/rsworkspace/crates/trogon-std/src/telemetry/http.rs @@ -0,0 +1,337 @@ +use axum::{ + Router, + body::Body, + http::{ + Request, StatusCode, Version, + header::{HOST, USER_AGENT}, + uri::Authority, + }, +}; +use opentelemetry::KeyValue; +use opentelemetry_semantic_conventions::trace as semconv; +use tower_http::trace::TraceLayer; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +pub fn instrument_router(router: Router) -> Router +where + S: Clone + Send + Sync + 'static, +{ + router.layer( + TraceLayer::new_for_http() + .make_span_with(|request: &Request| { + let span = tracing::info_span!( + "http.server.request", + otel.kind = "server", + method = %request.method(), + path = %request.uri().path() + ); + set_server_request_span_attributes(&span, request); + span + }) + .on_response( + |response: &axum::http::Response, + _latency: std::time::Duration, + span: &tracing::Span| { + set_server_response_span_attributes(span, response.status()); + }, + ), + ) +} + +pub fn server_request_attributes(request: &Request) -> Vec { + let mut attributes = vec![ + KeyValue::new( + semconv::HTTP_REQUEST_METHOD, + request.method().as_str().to_owned(), + ), + KeyValue::new(semconv::URL_PATH, request.uri().path().to_owned()), + ]; + + if let Some(protocol_version) = protocol_version(request.version()) { + attributes.push(KeyValue::new( + semconv::NETWORK_PROTOCOL_VERSION, + protocol_version, + )); + } + + if let Some(user_agent) = request + .headers() + .get(USER_AGENT) + .and_then(|value| value.to_str().ok()) + { + attributes.push(KeyValue::new( + semconv::USER_AGENT_ORIGINAL, + user_agent.to_owned(), + )); + } + + if let Some(authority) = request + .headers() + .get(HOST) + .and_then(|value| value.to_str().ok()) + .and_then(|value| value.parse::().ok()) + { + attributes.push(KeyValue::new( + semconv::SERVER_ADDRESS, + authority.host().to_owned(), + )); + if let Some(port) = authority.port_u16() { + attributes.push(KeyValue::new(semconv::SERVER_PORT, i64::from(port))); + } + } + + attributes +} + +pub fn set_server_request_span_attributes(span: &Span, request: &Request) { + for attribute in server_request_attributes(request) { + span.set_attribute(attribute.key, attribute.value); + } +} + +pub fn server_response_attributes(status: StatusCode) -> [KeyValue; 1] { + [KeyValue::new( + semconv::HTTP_RESPONSE_STATUS_CODE, + i64::from(status.as_u16()), + )] +} + +pub fn set_server_response_span_attributes(span: &Span, status: StatusCode) { + for attribute in server_response_attributes(status) { + span.set_attribute(attribute.key, attribute.value); + } +} + +#[rustfmt::skip] +fn protocol_version(version: Version) -> Option<&'static str> { + Some(if version == Version::HTTP_09 { "0.9" } else if version == Version::HTTP_10 { "1.0" } else if version == Version::HTTP_11 { "1.1" } else if version == Version::HTTP_2 { "2" } else if version == Version::HTTP_3 { "3" } else { return None; }) +} + +#[cfg(test)] +mod tests { + use super::*; + use tower::ServiceExt; + + fn value_for<'a>(attributes: &'a [KeyValue], key: &str) -> Option<&'a opentelemetry::Value> { + attributes + .iter() + .find(|attribute| attribute.key.as_str() == key) + .map(|attribute| &attribute.value) + } + + #[tokio::test] + async fn instrument_router_executes_trace_callbacks() { + let app = instrument_router(Router::<()>::new().route( + "/-/liveness", + axum::routing::get(|| async { StatusCode::OK }), + )); + + let response = app + .oneshot( + Request::builder() + .uri("/-/liveness") + .version(Version::HTTP_11) + .header(HOST, "gateway.test:8443") + .header(USER_AGENT, "curl/8.7.1") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } + + #[test] + fn server_request_attributes_follow_http_semantic_conventions() { + let request = Request::builder() + .method("POST") + .uri("/github/webhook") + .version(Version::HTTP_11) + .header(HOST, "gateway.test:8443") + .header(USER_AGENT, "curl/8.7.1") + .body(()) + .unwrap(); + + let attributes = server_request_attributes(&request); + + assert_eq!( + value_for(&attributes, semconv::HTTP_REQUEST_METHOD) + .unwrap() + .as_str() + .as_ref(), + "POST" + ); + assert_eq!( + value_for(&attributes, semconv::URL_PATH) + .unwrap() + .as_str() + .as_ref(), + "/github/webhook" + ); + assert_eq!( + value_for(&attributes, semconv::NETWORK_PROTOCOL_VERSION) + .unwrap() + .as_str() + .as_ref(), + "1.1" + ); + assert_eq!( + value_for(&attributes, semconv::USER_AGENT_ORIGINAL) + .unwrap() + .as_str() + .as_ref(), + "curl/8.7.1" + ); + assert_eq!( + value_for(&attributes, semconv::SERVER_ADDRESS) + .unwrap() + .as_str() + .as_ref(), + "gateway.test" + ); + assert_eq!( + value_for(&attributes, semconv::SERVER_PORT) + .unwrap() + .as_str() + .as_ref(), + "8443" + ); + } + + #[test] + fn server_request_attributes_skip_optional_headers_when_missing() { + let request = Request::builder() + .method("GET") + .uri("/-/liveness") + .version(Version::HTTP_2) + .body(()) + .unwrap(); + + let attributes = server_request_attributes(&request); + + assert_eq!( + value_for(&attributes, semconv::HTTP_REQUEST_METHOD) + .unwrap() + .as_str() + .as_ref(), + "GET" + ); + assert_eq!( + value_for(&attributes, semconv::URL_PATH) + .unwrap() + .as_str() + .as_ref(), + "/-/liveness" + ); + assert_eq!( + value_for(&attributes, semconv::NETWORK_PROTOCOL_VERSION) + .unwrap() + .as_str() + .as_ref(), + "2" + ); + assert!(value_for(&attributes, semconv::USER_AGENT_ORIGINAL).is_none()); + assert!(value_for(&attributes, semconv::SERVER_ADDRESS).is_none()); + assert!(value_for(&attributes, semconv::SERVER_PORT).is_none()); + } + + #[test] + fn server_request_attributes_cover_other_protocol_versions() { + let http_09_request = Request::builder() + .method("GET") + .uri("/legacy") + .version(Version::HTTP_09) + .body(()) + .unwrap(); + let http_10_request = Request::builder() + .method("GET") + .uri("/legacy") + .version(Version::HTTP_10) + .body(()) + .unwrap(); + let http_3_request = Request::builder() + .method("GET") + .uri("/modern") + .version(Version::HTTP_3) + .body(()) + .unwrap(); + + assert_eq!( + value_for( + &server_request_attributes(&http_09_request), + semconv::NETWORK_PROTOCOL_VERSION, + ) + .unwrap() + .as_str() + .as_ref(), + "0.9" + ); + assert_eq!( + value_for( + &server_request_attributes(&http_10_request), + semconv::NETWORK_PROTOCOL_VERSION, + ) + .unwrap() + .as_str() + .as_ref(), + "1.0" + ); + assert_eq!( + value_for( + &server_request_attributes(&http_3_request), + semconv::NETWORK_PROTOCOL_VERSION, + ) + .unwrap() + .as_str() + .as_ref(), + "3" + ); + } + + #[test] + fn server_request_attributes_ignore_invalid_host_header() { + let request = Request::builder() + .method("GET") + .uri("/-/liveness") + .version(Version::HTTP_11) + .header(HOST, "invalid host") + .body(()) + .unwrap(); + + let attributes = server_request_attributes(&request); + + assert!(value_for(&attributes, semconv::SERVER_ADDRESS).is_none()); + assert!(value_for(&attributes, semconv::SERVER_PORT).is_none()); + } + + #[test] + fn direct_span_attribute_helpers_do_not_panic() { + let span = tracing::info_span!("http.server.request.test"); + let _guard = span.enter(); + let request = Request::builder() + .method("GET") + .uri("/-/ready") + .version(Version::HTTP_11) + .header(HOST, "gateway.test") + .body(()) + .unwrap(); + + set_server_request_span_attributes(&Span::current(), &request); + set_server_response_span_attributes(&Span::current(), StatusCode::NO_CONTENT); + } + + #[test] + fn server_response_attributes_follow_http_semantic_conventions() { + let attributes = server_response_attributes(StatusCode::CREATED); + + assert_eq!( + value_for(&attributes, semconv::HTTP_RESPONSE_STATUS_CODE) + .unwrap() + .as_str() + .as_ref(), + "201" + ); + } +} diff --git a/rsworkspace/crates/trogon-std/src/telemetry/mod.rs b/rsworkspace/crates/trogon-std/src/telemetry/mod.rs new file mode 100644 index 000000000..3883215fc --- /dev/null +++ b/rsworkspace/crates/trogon-std/src/telemetry/mod.rs @@ -0,0 +1 @@ +pub mod http;