diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index af1edc75a25..c816e9a492b 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7898,6 +7898,8 @@ dependencies = [ "itertools 0.14.0", "mime_guess", "mockall", + "opentelemetry", + "opentelemetry_sdk", "percent-encoding", "pprof", "prost 0.14.3", @@ -7942,6 +7944,8 @@ dependencies = [ "tower 0.5.3", "tower-http", "tracing", + "tracing-opentelemetry", + "tracing-subscriber", "utoipa", "warp", "zstd", @@ -10734,6 +10738,7 @@ dependencies = [ "tower 0.5.3", "tower-layer", "tower-service", + "tracing", ] [[package]] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 95cfbb5eac2..1211e0d598a 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -285,6 +285,7 @@ tower-http = { version = "0.6", features = [ "compression-gzip", "compression-zstd", "cors", + "trace", ] } tracing = "0.1" tracing-opentelemetry = "0.32" diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index ba028ce71d5..4a8e05a2d3d 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -29,6 +29,7 @@ humantime = { workspace = true } hyper-util = {workspace = true} itertools = { workspace = true } mime_guess = { workspace = true } +opentelemetry = { workspace = true } percent-encoding = { workspace = true } pprof = { workspace = true, optional = true } prost = { workspace = true } @@ -52,6 +53,7 @@ tonic-reflection = { workspace = true } tower = { workspace = true, features = ["limit"] } tower-http = { workspace = true } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } utoipa = { workspace = true } warp = { workspace = true, features = ["server"] } zstd = { workspace = true } @@ -84,10 +86,14 @@ assert-json-diff = { workspace = true } http = { workspace = true } itertools = { workspace = true } mockall = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true } tempfile = { workspace 
= true } tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } +tracing-opentelemetry = { workspace = true } +tracing-subscriber = { workspace = true } quickwit-actors = { workspace = true, features = ["testsuite"] } quickwit-cluster = { workspace = true, features = ["testsuite"] } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index c8e2e2f24e2..e0551b44d4b 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -35,6 +35,7 @@ mod openapi; mod otlp_api; mod rate_modulator; mod rest; +mod rest_api_request_span; mod rest_api_response; mod search_api; pub(crate) mod simple_list; diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 3f193783b04..ac9cbd727e0 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -30,6 +30,7 @@ use tower::ServiceBuilder; use tower_http::compression::CompressionLayer; use tower_http::compression::predicate::{NotForContentType, Predicate, SizeAbove}; use tower_http::cors::{AllowOrigin, CorsLayer}; +use tower_http::trace::TraceLayer; use tracing::{error, info}; use warp::filters::log::Info; use warp::hyper::http::HeaderValue; @@ -49,6 +50,7 @@ use crate::jaeger_api::jaeger_api_handlers; use crate::metrics_api::metrics_handler; use crate::node_info_handler::node_info_handler; use crate::otlp_api::otlp_ingest_api_handlers; +use crate::rest_api_request_span::{make_http_request_span, set_status_code_on_request_span}; use crate::rest_api_response::{RestApiError, RestApiResponse}; use crate::search_api::{ search_get_handler, search_plan_get_handler, search_plan_post_handler, search_post_handler, @@ -208,7 +210,15 @@ pub(crate) async fn start_rest_server( let compression_predicate = CompressionPredicate::from_env().and(NotForContentType::IMAGES); let cors = build_cors(&quickwit_services.node_config.rest_config.cors_allow_origins); + let trace_layer = 
TraceLayer::new_for_http() + .make_span_with(make_http_request_span as fn(&http::Request<_>) -> tracing::Span) + .on_response( + set_status_code_on_request_span + as fn(&http::Response<_>, std::time::Duration, &tracing::Span), + ); + let service = ServiceBuilder::new() + .layer(trace_layer) .layer( CompressionLayer::new() .zstd(true) diff --git a/quickwit/quickwit-serve/src/rest_api_request_span.rs b/quickwit/quickwit-serve/src/rest_api_request_span.rs new file mode 100644 index 00000000000..f9f2ce7ccd1 --- /dev/null +++ b/quickwit/quickwit-serve/src/rest_api_request_span.rs @@ -0,0 +1,104 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use ::opentelemetry::global; +use ::opentelemetry::propagation::Extractor; +use tracing::Level; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +/// `HeaderMap` extracts OpenTelemetry tracing keys from HTTP headers. +struct HeaderMap<'a>(&'a http::HeaderMap); + +impl Extractor for HeaderMap<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|metadata| metadata.to_str().ok()) + } + + fn keys(&self) -> Vec<&str> { + self.0.keys().map(|key| key.as_str()).collect() + } +} + +/// Extracts an OpenTelemetry context from HTTP [`http::HeaderMap`]. 
+fn extract_context_from_request_headers(headers: &http::HeaderMap) -> ::opentelemetry::Context {
+    global::get_text_map_propagator(|propagator| propagator.extract(&HeaderMap(headers)))
+}
+
+/// Creates the [`tracing::Span`] that tracks an incoming HTTP request.
+///
+/// The span is named `http_request` and is created at [`Level::INFO`] with the
+/// `otel.kind`, `http.request.method` and `url.path` fields. The `url.scheme`,
+/// `url.query`, `server.address`, `server.port` and `user_agent.original` attributes
+/// are only added when the request carries the corresponding information.
+///
+/// The context extracted from the request headers by the globally registered text map
+/// propagator is set as the parent of the span.
+///
+/// The function is generic over the body type `B`: only the method, URI and headers of
+/// the request are read.
+pub(crate) fn make_http_request_span<B>(request: &http::Request<B>) -> tracing::Span {
+    let uri = request.uri();
+    let span = tracing::span!(
+        Level::INFO,
+        "http_request",
+        otel.kind = "Server",
+        http.request.method = %request.method(),
+        url.path = %uri.path()
+    );
+    if let Some(scheme) = uri.scheme_str() {
+        span.set_attribute("url.scheme", scheme.to_string());
+    }
+    if let Some(query) = uri.query() {
+        span.set_attribute("url.query", query.to_string());
+    }
+    // Best effort attempt to extract the server address from the incoming HTTP request
+    // See: https://opentelemetry.io/docs/specs/semconv/http/http-spans/#setting-serveraddress-and-serverport-attributes
+    if let Some(authority) = uri.authority() {
+        span.set_attribute("server.address", authority.host().to_string());
+        if let Some(port) = authority.port_u16() {
+            span.set_attribute("server.port", i64::from(port));
+        }
+    }
+    // The user agent is skipped when the header is absent or `to_str` rejects its value.
+    if let Some(user_agent) = request.headers().get(http::header::USER_AGENT)
+        && let Ok(user_agent_str) = user_agent.to_str()
+    {
+        span.set_attribute("user_agent.original", user_agent_str.to_string());
+    }
+    let parent_context = extract_context_from_request_headers(request.headers());
+    // Linking to the remote parent is best effort: the outcome of `set_parent` is discarded.
+    let _ = span.set_parent(parent_context);
+    span
+}
+
+/// Records the HTTP status code of `response` on the request `span`.
+///
+/// The code is stored as the `http.response.status_code` attribute. `_latency` is unused;
+/// the parameter is kept so that the function matches the three-argument function pointer
+/// type it is cast to when registered as the `on_response` callback of the trace layer.
+///
+/// The function is generic over the body type `B`: only the status of the response is read.
+pub(crate) fn set_status_code_on_request_span<B>(
+    response: &http::Response<B>,
+    _latency: std::time::Duration,
+    span: &tracing::Span,
+) {
+    span.set_attribute(
+        "http.response.status_code",
+        i64::from(response.status().as_u16()),
+    );
+}
+
+#[cfg(test)]
+mod tests {
+    use warp::hyper::{Method, http};
+
+    use super::*;
+
+    #[test]
+    fn test_make_http_request_span() {
+        // Install a subscriber for the duration of the test so that the `INFO` span
+        // created below is enabled.
+        let _guard = tracing::subscriber::set_default(
+            tracing_subscriber::fmt()
+                .with_max_level(Level::INFO)
+                .finish(),
+        );
+        let request = http::Request::builder()
+            .method(Method::POST)
+            .uri("http://quickwit:7280/api/v1/_elastic/otel-traces-v0_9/_search")
+            .header(http::header::USER_AGENT, "test-agent/1.0")
+            .body(r#"{"query":{"match_all":{}},"size":1}"#)
+            .unwrap();
+        let span = make_http_request_span(&request);
+        assert!(!span.is_disabled());
+    }
+}