Skip to content

Commit 0a690e4

Browse files
committed
fix: add metrics instrumentation to all modules
Universal auto-instrumentation via global singleton pattern: - tiered-sink: spool messages/bytes, circuit state, hot/cold path, disk available, circuit trips - spool: queue depth gauge on push/pop/recv/clear - dlq: entries total, entries written, write errors - cache: hits/misses by source, entry count - http-client: request count by method/status, duration histogram - secrets: fetch total, cache hits/misses All metrics #[cfg(feature = "metrics")] gated — zero cost when off. Apps with MetricsManager get full observability across every rustlib feature with zero additional wiring. Document core pillar design decision in CLAUDE.md: config, logging, and metrics are non-negotiable for every module.
1 parent ccfc9ad commit 0a690e4

6 files changed

Lines changed: 144 additions & 9 deletions

File tree

src/cache/mod.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,17 @@ impl Cache {
9191
/// Returns `None` if not found or expired.
9292
pub async fn get<T: serde::de::DeserializeOwned>(&self, source: &str, key: &str) -> Option<T> {
9393
let full_key = format!("{source}:{key}");
94-
let bytes = self.inner.get(&full_key).await?;
94+
let bytes = self.inner.get(&full_key).await;
95+
96+
#[cfg(feature = "metrics")]
97+
if bytes.is_some() {
98+
metrics::counter!("dfe_cache_hits_total", "source" => source.to_string()).increment(1);
99+
} else {
100+
metrics::counter!("dfe_cache_misses_total", "source" => source.to_string())
101+
.increment(1);
102+
}
103+
104+
let bytes = bytes?;
95105
serde_json::from_slice(&bytes).ok()
96106
}
97107

@@ -112,6 +122,9 @@ impl Cache {
112122

113123
self.inner.insert(full_key.clone(), bytes).await;
114124

125+
#[cfg(feature = "metrics")]
126+
metrics::gauge!("dfe_cache_entries").set(self.inner.entry_count() as f64);
127+
115128
// Track key for source-level invalidation
116129
if let Ok(mut keys) = self.source_keys.lock() {
117130
keys.entry(source.to_string()).or_default().push(full_key);
@@ -130,6 +143,9 @@ impl Cache {
130143
for key in keys {
131144
self.inner.invalidate(&key).await;
132145
}
146+
147+
#[cfg(feature = "metrics")]
148+
metrics::gauge!("dfe_cache_entries").set(self.inner.entry_count() as f64);
133149
}
134150

135151
/// Invalidate a single entry.

src/dlq/file.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,17 @@ impl DlqBackend for FileDlq {
9494
.map_err(|e| DlqError::Serialization(format!("failed to serialise DLQ entry: {e}")))?;
9595
line.push(b'\n');
9696

97-
self.writer.write_line(&line)?;
97+
if let Err(e) = self.writer.write_line(&line) {
98+
#[cfg(feature = "metrics")]
99+
metrics::counter!("dfe_dlq_write_errors_total").increment(1);
100+
return Err(e.into());
101+
}
102+
103+
#[cfg(feature = "metrics")]
104+
{
105+
metrics::counter!("dfe_dlq_entries_total").increment(1);
106+
metrics::gauge!("dfe_dlq_entries_written").set(self.writer.lines_written() as f64);
107+
}
98108

99109
debug!(
100110
service = %self.service_name,

src/http_client/mod.rs

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,20 @@ impl HttpClient {
8181

8282
/// Send a GET request.
8383
pub async fn get(&self, url: &str) -> Result<Response, reqwest_middleware::Error> {
84-
self.inner.get(url).send().await
84+
#[cfg(feature = "metrics")]
85+
let start = std::time::Instant::now();
86+
87+
let result = self.inner.get(url).send().await;
88+
89+
#[cfg(feature = "metrics")]
90+
{
91+
let status = if result.is_ok() { "success" } else { "error" };
92+
metrics::counter!("dfe_http_client_requests_total", "method" => "GET", "status" => status).increment(1);
93+
metrics::histogram!("dfe_http_client_duration_seconds", "method" => "GET")
94+
.record(start.elapsed().as_secs_f64());
95+
}
96+
97+
result
8598
}
8699

87100
/// Send a POST request with a JSON body.
@@ -90,12 +103,26 @@ impl HttpClient {
90103
url: &str,
91104
body: &T,
92105
) -> Result<Response, reqwest_middleware::Error> {
93-
self.inner
106+
#[cfg(feature = "metrics")]
107+
let start = std::time::Instant::now();
108+
109+
let result = self
110+
.inner
94111
.post(url)
95112
.header("content-type", "application/json")
96113
.body(serde_json::to_vec(body).unwrap_or_default())
97114
.send()
98-
.await
115+
.await;
116+
117+
#[cfg(feature = "metrics")]
118+
{
119+
let status = if result.is_ok() { "success" } else { "error" };
120+
metrics::counter!("dfe_http_client_requests_total", "method" => "POST", "status" => status).increment(1);
121+
metrics::histogram!("dfe_http_client_duration_seconds", "method" => "POST")
122+
.record(start.elapsed().as_secs_f64());
123+
}
124+
125+
result
99126
}
100127

101128
/// Send a PUT request with a JSON body.
@@ -104,17 +131,44 @@ impl HttpClient {
104131
url: &str,
105132
body: &T,
106133
) -> Result<Response, reqwest_middleware::Error> {
107-
self.inner
134+
#[cfg(feature = "metrics")]
135+
let start = std::time::Instant::now();
136+
137+
let result = self
138+
.inner
108139
.put(url)
109140
.header("content-type", "application/json")
110141
.body(serde_json::to_vec(body).unwrap_or_default())
111142
.send()
112-
.await
143+
.await;
144+
145+
#[cfg(feature = "metrics")]
146+
{
147+
let status = if result.is_ok() { "success" } else { "error" };
148+
metrics::counter!("dfe_http_client_requests_total", "method" => "PUT", "status" => status).increment(1);
149+
metrics::histogram!("dfe_http_client_duration_seconds", "method" => "PUT")
150+
.record(start.elapsed().as_secs_f64());
151+
}
152+
153+
result
113154
}
114155

115156
/// Send a DELETE request.
116157
pub async fn delete(&self, url: &str) -> Result<Response, reqwest_middleware::Error> {
117-
self.inner.delete(url).send().await
158+
#[cfg(feature = "metrics")]
159+
let start = std::time::Instant::now();
160+
161+
let result = self.inner.delete(url).send().await;
162+
163+
#[cfg(feature = "metrics")]
164+
{
165+
let status = if result.is_ok() { "success" } else { "error" };
166+
metrics::counter!("dfe_http_client_requests_total", "method" => "DELETE", "status" => status).increment(1);
167+
metrics::histogram!("dfe_http_client_duration_seconds", "method" => "DELETE")
168+
.record(start.elapsed().as_secs_f64());
169+
}
170+
171+
result
118172
}
119173

120174
/// Access the underlying middleware client for custom requests.

src/secrets/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,20 @@ impl SecretsManager {
158158
// Check cache first
159159
if let Some(cached) = self.cache.read().get(&cache_key) {
160160
debug!(path = %path, "Secret loaded from cache");
161+
#[cfg(feature = "metrics")]
162+
metrics::counter!("dfe_secrets_cache_hits_total").increment(1);
161163
return Ok(cached);
162164
}
163165

166+
#[cfg(feature = "metrics")]
167+
metrics::counter!("dfe_secrets_cache_misses_total").increment(1);
168+
164169
// Fetch from file
165170
let value = self.file_provider.get(path).await?;
166171

172+
#[cfg(feature = "metrics")]
173+
metrics::counter!("dfe_secrets_fetch_total").increment(1);
174+
167175
// Update cache
168176
if let Err(e) = self.cache.write().set(&cache_key, &value) {
169177
warn!(error = %e, "Failed to cache secret");
@@ -181,9 +189,14 @@ impl SecretsManager {
181189
// Check cache first
182190
if let Some(cached) = self.cache.read().get(cache_key) {
183191
debug!(key = %cache_key, "Secret loaded from cache");
192+
#[cfg(feature = "metrics")]
193+
metrics::counter!("dfe_secrets_cache_hits_total").increment(1);
184194
return Ok(cached);
185195
}
186196

197+
#[cfg(feature = "metrics")]
198+
metrics::counter!("dfe_secrets_cache_misses_total").increment(1);
199+
187200
// Fetch from provider
188201
let result = match source {
189202
SecretSource::File { path } => self.file_provider.get(path).await,
@@ -221,6 +234,9 @@ impl SecretsManager {
221234
}
222235
};
223236

237+
#[cfg(feature = "metrics")]
238+
metrics::counter!("dfe_secrets_fetch_total").increment(1);
239+
224240
match result {
225241
Ok(value) => {
226242
// Update cache

src/spool/queue.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ impl Spool {
109109
.map_err(|e| SpoolError::Queue(e.to_string()))?;
110110

111111
self.len += 1;
112+
#[cfg(feature = "metrics")]
113+
::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
112114
Ok(())
113115
}
114116

@@ -181,6 +183,8 @@ impl Spool {
181183
.commit()
182184
.map_err(|e| SpoolError::Queue(e.to_string()))?;
183185
self.len = self.len.saturating_sub(1);
186+
#[cfg(feature = "metrics")]
187+
::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
184188
Ok(Some(data))
185189
}
186190
Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
@@ -215,6 +219,8 @@ impl Spool {
215219
.commit()
216220
.map_err(|e| SpoolError::Queue(e.to_string()))?;
217221
self.len = self.len.saturating_sub(1);
222+
#[cfg(feature = "metrics")]
223+
::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
218224
Ok(data)
219225
}
220226

@@ -255,6 +261,8 @@ impl Spool {
255261
}
256262
}
257263
self.len = 0;
264+
#[cfg(feature = "metrics")]
265+
::metrics::gauge!("dfe_spool_queue_depth").set(0.0);
258266
Ok(())
259267
}
260268

src/tiered_sink/tiered.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ impl<S: Sink> TieredSink<S> {
146146
match self.try_hot_path(data).await {
147147
Ok(()) => {
148148
self.hot_path_count.fetch_add(1, AtomicOrdering::Relaxed);
149+
#[cfg(feature = "metrics")]
150+
::metrics::counter!("dfe_spool_hot_path_total").increment(1);
149151
return Ok(());
150152
}
151153
Err(TieredSinkError::Sink(_)) => {
@@ -161,13 +163,22 @@ impl<S: Sink> TieredSink<S> {
161163
// Cold path: spool to disk
162164
self.spool_message(data).await?;
163165
self.cold_path_count.fetch_add(1, AtomicOrdering::Relaxed);
166+
#[cfg(feature = "metrics")]
167+
::metrics::counter!("dfe_spool_cold_path_total").increment(1);
164168
Ok(())
165169
}
166170

167171
/// Determine if we should attempt the hot path.
168172
async fn should_use_hot_path(&self) -> bool {
169173
let circuit_state = self.circuit.state().await;
170174

175+
#[cfg(feature = "metrics")]
176+
::metrics::gauge!("dfe_spool_circuit_state").set(match circuit_state {
177+
CircuitState::Closed => 0.0,
178+
CircuitState::HalfOpen => 1.0,
179+
CircuitState::Open => 2.0,
180+
});
181+
171182
match circuit_state {
172183
CircuitState::Open => false,
173184
CircuitState::Closed | CircuitState::HalfOpen => match self.config.ordering {
@@ -195,6 +206,8 @@ impl<S: Sink> TieredSink<S> {
195206
}
196207
Ok(Err(SinkError::Unavailable)) => {
197208
self.circuit.record_failure().await;
209+
#[cfg(feature = "metrics")]
210+
::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
198211
Err(TieredSinkError::Spool("sink unavailable".into()))
199212
}
200213
Ok(Err(SinkError::Fatal(e))) => {
@@ -203,6 +216,8 @@ impl<S: Sink> TieredSink<S> {
203216
}
204217
Err(_timeout) => {
205218
self.circuit.record_failure().await;
219+
#[cfg(feature = "metrics")]
220+
::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
206221
Err(TieredSinkError::Spool("send timeout".into()))
207222
}
208223
}
@@ -250,6 +265,14 @@ impl<S: Sink> TieredSink<S> {
250265
self.spool_bytes
251266
.fetch_add(compressed_len, AtomicOrdering::Relaxed);
252267

268+
#[cfg(feature = "metrics")]
269+
{
270+
::metrics::gauge!("dfe_spool_messages")
271+
.set(self.spool_count.load(AtomicOrdering::Relaxed) as f64);
272+
::metrics::gauge!("dfe_spool_bytes")
273+
.set(self.spool_bytes.load(AtomicOrdering::Relaxed) as f64);
274+
}
275+
253276
#[cfg(feature = "logger")]
254277
tracing::debug!(
255278
spool_items = self.spool_count.load(AtomicOrdering::Relaxed),
@@ -453,7 +476,15 @@ async fn disk_capacity_poller(
453476
() = tokio::time::sleep(poll_interval) => {}
454477
}
455478

456-
let available = check_disk_space(&spool_path).is_none_or(|(total, avail)| {
479+
let disk_space = check_disk_space(&spool_path);
480+
481+
#[cfg(feature = "metrics")]
482+
if let Some((total, avail)) = disk_space {
483+
::metrics::gauge!("dfe_spool_disk_available_bytes").set(avail as f64);
484+
::metrics::gauge!("dfe_spool_disk_total_bytes").set(total as f64);
485+
}
486+
487+
let available = disk_space.is_none_or(|(total, avail)| {
457488
if total == 0 {
458489
return true;
459490
}

0 commit comments

Comments
 (0)