Skip to content

Commit 21efaa2

Browse files
committed
fix: add health registry, shutdown manager, and wire all modules
HealthRegistry: global singleton with HealthStatus (Healthy/Degraded/ Unhealthy). Modules register health check closures at construction. is_healthy(), is_ready(), to_json() for aggregated status. Shutdown: global CancellationToken via OnceLock. install_signal_handler() listens for SIGTERM/SIGINT. Modules listen on token.cancelled(). Wiring: - KafkaTransport, GrpcTransport: register with health, Arc<AtomicBool> - CircuitBreaker: register with health, maps Open->Unhealthy - HttpServer: register with health, use global shutdown token - TieredSink drainer: listen on global shutdown token - ConfigReloader: listen on global shutdown token Logging + config cascade added to all new transports (file, pipe, HTTP, Redis). 663 tests pass.
1 parent 67ca272 commit 21efaa2

15 files changed

Lines changed: 903 additions & 24 deletions

File tree

Cargo.toml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ categories = ["development-tools"]
2020
exclude = [".claude/", ".github/", "ci/", "ai/", "docs/", "examples/", "benches/", "scripts/"]
2121

2222
[features]
23-
default = ["config", "logger", "metrics", "runtime"]
23+
default = ["config", "logger", "metrics", "runtime", "shutdown", "health"]
2424

2525
# Core features
26+
health = []
27+
shutdown = ["tokio", "tokio-util"]
2628
runtime = ["dirs"]
2729
config = ["figment", "dotenvy", "serde_yaml_ng", "serde_json", "toml", "dirs", "tracing"]
2830
logger = ["tracing", "tracing-subscriber", "owo-colors", "serde_json", "tracing-throttle"]
@@ -104,7 +106,7 @@ secrets-aws = ["secrets", "aws-config", "aws-sdk-secretsmanager"]
104106
secrets-all = ["secrets-vault", "secrets-aws"]
105107

106108
# Full feature set
107-
full = ["config", "config-reload", "logger", "metrics", "metrics-dfe", "otel", "otel-metrics", "runtime", "http", "http-server", "spool", "tiered-sink", "resilience", "database", "cache", "transport-all", "transport-grpc-vector-compat", "secrets-all", "directory-config", "directory-config-git", "deployment", "version-check", "scaling", "memory", "cli", "io", "dlq", "dlq-kafka", "output-file", "expression"]
109+
full = ["config", "config-reload", "logger", "metrics", "metrics-dfe", "otel", "otel-metrics", "runtime", "shutdown", "health", "http", "http-server", "spool", "tiered-sink", "resilience", "database", "cache", "transport-all", "transport-grpc-vector-compat", "secrets-all", "directory-config", "directory-config-git", "deployment", "version-check", "scaling", "memory", "cli", "io", "dlq", "dlq-kafka", "output-file", "expression"]
108110

109111
[dependencies]
110112
# Serialisation (always needed)
@@ -158,8 +160,9 @@ metrics-util = { version = ">=0.20.1, <0.21", optional = true }
158160
metrics-exporter-opentelemetry = { version = ">=0.2.1, <0.3", optional = true }
159161
sysinfo = { version = ">=0.38.0, <0.39", optional = true }
160162

161-
# Async runtime (for metrics server, http-server)
163+
# Async runtime (for metrics server, http-server, shutdown)
162164
tokio = { version = ">=1.50.0, <2", features = ["rt-multi-thread", "net", "sync", "time", "macros", "signal", "fs", "io-std", "io-util"], optional = true }
165+
tokio-util = { version = ">=0.7.14, <0.8", optional = true }
163166

164167
# HTTP client — pinned to reqwest 0.12 until vaultrs and opentelemetry-otlp
165168
# support 0.13. reqwest-middleware 0.4 and reqwest-retry 0.7 target 0.12.

src/config/reloader.rs

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,9 @@ impl<T: Clone + Send + Sync + 'static> ConfigReloader<T> {
277277

278278
/// Main reload loop — waits for any trigger, then attempts reload.
279279
async fn run_loop(self) {
280+
#[cfg(feature = "shutdown")]
281+
let shutdown_token = crate::shutdown::token();
282+
280283
// File polling state
281284
let mut last_modified: Option<SystemTime> =
282285
self.config.config_path.as_ref().and_then(|p| file_mtime(p));
@@ -308,15 +311,46 @@ impl<T: Clone + Send + Sync + 'static> ConfigReloader<T> {
308311
};
309312

310313
loop {
311-
let trigger = self
312-
.wait_for_trigger(
313-
&mut poll_timer,
314-
&mut periodic_timer,
315-
#[cfg(unix)]
316-
&mut sighup,
317-
&mut last_modified,
318-
)
319-
.await;
314+
// Check for global shutdown before waiting for next trigger
315+
#[cfg(feature = "shutdown")]
316+
if shutdown_token.is_cancelled() {
317+
info!("Config reloader stopping (shutdown)");
318+
return;
319+
}
320+
321+
let trigger_result = {
322+
#[cfg(feature = "shutdown")]
323+
{
324+
tokio::select! {
325+
trigger = self.wait_for_trigger(
326+
&mut poll_timer,
327+
&mut periodic_timer,
328+
#[cfg(unix)]
329+
&mut sighup,
330+
&mut last_modified,
331+
) => Some(trigger),
332+
() = shutdown_token.cancelled() => None,
333+
}
334+
}
335+
#[cfg(not(feature = "shutdown"))]
336+
{
337+
Some(
338+
self.wait_for_trigger(
339+
&mut poll_timer,
340+
&mut periodic_timer,
341+
#[cfg(unix)]
342+
&mut sighup,
343+
&mut last_modified,
344+
)
345+
.await,
346+
)
347+
}
348+
};
349+
350+
let Some(trigger) = trigger_result else {
351+
info!("Config reloader stopping (shutdown)");
352+
return;
353+
};
320354

321355
// Debounce check
322356
if last_reload.elapsed() < self.config.debounce {

src/health/mod.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Project: hyperi-rustlib
2+
// File: src/health/mod.rs
3+
// Purpose: Unified health registry for service health state
4+
// Language: Rust
5+
//
6+
// License: FSL-1.1-ALv2
7+
// Copyright: (c) 2026 HYPERI PTY LIMITED
8+
9+
//! Unified health registry for service readiness and liveness.
10+
//!
11+
//! Provides a global singleton [`HealthRegistry`] that modules register
12+
//! into at construction. The `/readyz` endpoint (or any health check)
13+
//! queries the registry to determine overall service health.
14+
//!
15+
//! # Usage
16+
//!
17+
//! ```rust
18+
//! use hyperi_rustlib::health::{HealthRegistry, HealthStatus};
19+
//!
20+
//! // Register a component health check at construction
21+
//! HealthRegistry::register("kafka_consumer", || HealthStatus::Healthy);
22+
//!
23+
//! // Query overall health
24+
//! assert!(HealthRegistry::is_ready());
25+
//! ```
26+
27+
pub mod registry;
28+
29+
pub use registry::{HealthRegistry, HealthStatus};

0 commit comments

Comments
 (0)