Skip to content

Commit 9376329

Browse files
committed
fix: OTel trace propagation, /readyz health wiring, description update
OTel trace context (W3C traceparent) is now auto-propagated through gRPC (PushRequest metadata), Kafka (message headers), and HTTP (request headers). All propagation code is gated behind #[cfg(feature = "otel")], so there is zero overhead when the feature is disabled. Shared propagation helpers live in transport/propagation.rs. The /readyz endpoint now aggregates from HealthRegistry when the health feature is enabled, and /health/detailed returns per-component JSON status. MetricsManager readiness also checks HealthRegistry. Fixes a test-ordering issue caused by HealthRegistry global state. Updates the crate description. 671 tests pass.
1 parent 0e18ca9 commit 9376329

8 files changed

Lines changed: 290 additions & 10 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ name = "hyperi-rustlib"
1111
version = "1.17.0-dev.16"
1212
edition = "2024"
1313
rust-version = "1.94"
14-
description = "Shared utility library for HyperI Rust applications"
14+
description = "Opinionated Rust framework for high-throughput data pipelines at PB scale. Auto-wiring config, logging, metrics, tracing, health, and graceful shutdown — built from many years of production infrastructure experience."
1515
license = "FSL-1.1-ALv2"
1616
repository = "https://github.com/hyperi-io/hyperi-rustlib"
1717
publish = true

src/http_server/server.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,11 @@ impl HttpServer {
196196
);
197197
}
198198

199+
#[cfg(all(feature = "health", feature = "serde_json"))]
200+
if self.config.enable_health_endpoints {
201+
router = router.route("/health/detailed", get(health_detailed));
202+
}
203+
199204
#[cfg(feature = "config")]
200205
if self.config.enable_config_endpoint {
201206
router = router.route("/config", get(config_dump));
@@ -240,14 +245,35 @@ async fn health_live() -> impl IntoResponse {
240245
}
241246

242247
/// Readiness endpoint handler.
248+
///
249+
/// Checks the local ready flag AND (when the `health` feature is enabled)
250+
/// the global [`HealthRegistry`](crate::health::HealthRegistry). Both must
251+
/// be true for a 200 response; otherwise 503.
243252
async fn health_ready(ready: Arc<AtomicBool>) -> impl IntoResponse {
244-
if ready.load(Ordering::SeqCst) {
253+
let locally_ready = ready.load(Ordering::SeqCst);
254+
255+
#[cfg(feature = "health")]
256+
let registry_ready = crate::health::HealthRegistry::is_ready();
257+
#[cfg(not(feature = "health"))]
258+
let registry_ready = true;
259+
260+
if locally_ready && registry_ready {
245261
(StatusCode::OK, "OK")
246262
} else {
247263
(StatusCode::SERVICE_UNAVAILABLE, "NOT READY")
248264
}
249265
}
250266

267+
/// Detailed health endpoint returning per-component status as JSON.
268+
///
269+
/// Returns the output of [`HealthRegistry::to_json()`](crate::health::HealthRegistry::to_json),
270+
/// which includes overall status and each registered component's state.
271+
#[cfg(all(feature = "health", feature = "serde_json"))]
272+
async fn health_detailed() -> impl IntoResponse {
273+
let json = crate::health::HealthRegistry::to_json();
274+
axum::Json(json)
275+
}
276+
251277
/// Config registry dump endpoint handler (redacted).
252278
#[cfg(feature = "config")]
253279
async fn config_dump() -> impl IntoResponse {
@@ -332,6 +358,9 @@ mod tests {
332358

333359
#[tokio::test]
334360
async fn test_health_ready_when_ready() {
361+
#[cfg(feature = "health")]
362+
crate::health::HealthRegistry::reset();
363+
335364
let config = HttpServerConfig::default();
336365
let server = HttpServer::new(config);
337366
server.set_ready(true);

src/metrics/mod.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,14 @@ async fn handle_connection(
759759
} else if request_line.starts_with("GET /readyz")
760760
|| request_line.starts_with("GET /health/ready")
761761
{
762-
let ready = readiness_fn.as_ref().is_none_or(|f| f());
762+
let callback_ready = readiness_fn.as_ref().is_none_or(|f| f());
763+
764+
#[cfg(feature = "health")]
765+
let registry_ready = crate::health::HealthRegistry::is_ready();
766+
#[cfg(not(feature = "health"))]
767+
let registry_ready = true;
768+
769+
let ready = callback_ready && registry_ready;
763770
if ready {
764771
("200 OK", r#"{"status":"ready"}"#.to_string())
765772
} else {
@@ -787,11 +794,22 @@ async fn handle_connection(
787794
}
788795

789796
/// Readiness response helper for axum endpoints.
797+
///
798+
/// Checks the caller-supplied readiness callback AND (when the `health`
799+
/// feature is enabled) the global [`HealthRegistry`](crate::health::HealthRegistry).
800+
/// Both must be true for a 200 response.
790801
#[cfg(all(feature = "metrics", feature = "http-server"))]
791802
fn readiness_response(rf: Option<ReadinessFn>) -> axum::response::Response {
792803
use axum::response::IntoResponse;
793804

794-
let ready = rf.as_ref().is_none_or(|f| f());
805+
let callback_ready = rf.as_ref().is_none_or(|f| f());
806+
807+
#[cfg(feature = "health")]
808+
let registry_ready = crate::health::HealthRegistry::is_ready();
809+
#[cfg(not(feature = "health"))]
810+
let registry_ready = true;
811+
812+
let ready = callback_ready && registry_ready;
795813
if ready {
796814
(
797815
[(axum::http::header::CONTENT_TYPE, "application/json")],

src/transport/grpc/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,12 @@ impl TransportSender for GrpcTransport {
245245
metadata.insert("topic".to_string(), key.to_string());
246246
}
247247

248+
// Inject W3C traceparent into gRPC metadata for distributed tracing
249+
#[cfg(feature = "otel")]
250+
if let Some(tp) = super::propagation::current_traceparent() {
251+
metadata.insert(super::propagation::TRACEPARENT_HEADER.to_string(), tp);
252+
}
253+
248254
let request = proto::PushRequest {
249255
payload: payload.to_vec(),
250256
format: proto::Format::Auto.into(),
@@ -392,6 +398,14 @@ impl proto::dfe_transport_server::DfeTransport for DfeTransportServiceImpl {
392398
let req = request.into_inner();
393399
let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
394400

401+
// Extract W3C traceparent from incoming gRPC metadata for distributed tracing
402+
#[cfg(feature = "otel")]
403+
if let Some(tp) = req.metadata.get(super::propagation::TRACEPARENT_HEADER)
404+
&& super::propagation::is_valid_traceparent(tp)
405+
{
406+
tracing::Span::current().record("traceparent", tp.as_str());
407+
}
408+
395409
let format = PayloadFormat::detect(&req.payload);
396410
let key = req.metadata.get("topic").map(|s| Arc::from(s.as_str()));
397411

src/transport/http.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -307,12 +307,27 @@ struct ReceiverState {
307307
async fn ingest_handler(
308308
axum::extract::State(state): axum::extract::State<ReceiverState>,
309309
axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
310+
headers: axum::http::HeaderMap,
310311
body: axum::body::Bytes,
311312
) -> axum::http::StatusCode {
312313
if body.is_empty() {
313314
return axum::http::StatusCode::BAD_REQUEST;
314315
}
315316

317+
// Extract W3C traceparent from incoming HTTP headers for distributed tracing
318+
#[cfg(feature = "otel")]
319+
if let Some(tp) = headers
320+
.get(super::propagation::TRACEPARENT_HEADER)
321+
.and_then(|v| v.to_str().ok())
322+
&& super::propagation::is_valid_traceparent(tp)
323+
{
324+
tracing::Span::current().record("traceparent", tp);
325+
}
326+
327+
// Suppress unused variable warning when otel feature is disabled
328+
#[cfg(not(feature = "otel"))]
329+
let _ = &headers;
330+
316331
let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
317332
let format = PayloadFormat::detect(&body);
318333
let timestamp_ms = chrono::Utc::now().timestamp_millis();
@@ -384,14 +399,20 @@ impl TransportSender for HttpTransport {
384399
#[cfg(feature = "metrics")]
385400
let start = std::time::Instant::now();
386401

387-
let result = match self
402+
// Build request with optional W3C traceparent header for distributed tracing
403+
let request_builder = self
388404
.client
389405
.post(&url)
390-
.header("content-type", "application/octet-stream")
391-
.body(payload.to_vec())
392-
.send()
393-
.await
394-
{
406+
.header("content-type", "application/octet-stream");
407+
408+
#[cfg(feature = "otel")]
409+
let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
410+
request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
411+
} else {
412+
request_builder
413+
};
414+
415+
let result = match request_builder.body(payload.to_vec()).send().await {
395416
Ok(resp) if resp.status().is_success() => {
396417
#[cfg(feature = "logger")]
397418
tracing::debug!(url = %url, bytes = payload.len(), "HTTP transport: POST sent");

src/transport/kafka/mod.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,18 @@ impl TransportSender for KafkaTransport {
293293

294294
let record: FutureRecord<'_, str, [u8]> = FutureRecord::to(key).payload(payload);
295295

296+
// Inject W3C traceparent into Kafka message headers for distributed tracing
297+
#[cfg(feature = "otel")]
298+
let record = if let Some(tp) = super::propagation::current_traceparent() {
299+
let headers = rdkafka::message::OwnedHeaders::new().insert(rdkafka::message::Header {
300+
key: super::propagation::TRACEPARENT_HEADER,
301+
value: Some(tp.as_str()),
302+
});
303+
record.headers(headers)
304+
} else {
305+
record
306+
};
307+
296308
#[cfg(feature = "metrics")]
297309
let start = std::time::Instant::now();
298310

@@ -371,6 +383,26 @@ impl TransportReceiver for KafkaTransport {
371383
if let Some(result) = self.consumer.poll(timeout) {
372384
match result {
373385
Ok(msg) => {
386+
// Extract W3C traceparent from Kafka headers and record it on the current span.
387+
// NOTE(review): no first-message guard is visible here, so this runs for every polled message — confirm the intended batch semantics.
388+
#[cfg(feature = "otel")]
389+
if let Some(headers) = msg.headers() {
390+
use rdkafka::message::Headers;
391+
for idx in 0..headers.count() {
392+
if let Some(Ok(header)) = headers.try_get_as::<[u8]>(idx)
393+
&& header.key == super::propagation::TRACEPARENT_HEADER
394+
{
395+
if let Some(value) = header.value
396+
&& let Ok(tp) = std::str::from_utf8(value)
397+
&& super::propagation::is_valid_traceparent(tp)
398+
{
399+
tracing::Span::current().record("traceparent", tp);
400+
}
401+
break;
402+
}
403+
}
404+
}
405+
374406
let topic_str = msg.topic();
375407
let topic: Arc<str> = get_or_insert_topic(&mut local_cache, topic_str);
376408
let payload = msg.payload().map_or_else(Vec::new, |p| p.to_vec());

src/transport/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ mod detect;
5353
mod error;
5454
pub mod factory;
5555
mod payload;
56+
pub mod propagation;
5657
mod traits;
5758
mod types;
5859

0 commit comments

Comments
 (0)