Skip to content

Commit c1ba6a8

Browse files
committed
FastSlowStoreMetrics as OTEL.
1 parent d623eb2 commit c1ba6a8

File tree

2 files changed

+56
-33
lines changed

2 files changed

+56
-33
lines changed

nativelink-store/src/fast_slow_store.rs

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@ use core::borrow::BorrowMut;
1616
use core::cmp::{max, min};
1717
use core::ops::Range;
1818
use core::pin::Pin;
19-
use core::sync::atomic::{AtomicU64, Ordering};
2019
use std::collections::HashMap;
2120
use std::ffi::OsString;
22-
use std::sync::{Arc, Weak};
21+
use std::sync::{Arc, LazyLock, Weak};
2322

2423
use async_trait::async_trait;
2524
use futures::{FutureExt, join};
@@ -35,10 +34,11 @@ use nativelink_util::store_trait::{
3534
RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations,
3635
UploadSizeInfo, slow_update_store_with_file,
3736
};
37+
use opentelemetry::{InstrumentationScope, global, metrics};
3838
use parking_lot::Mutex;
3939
use tokio::sync::OnceCell;
4040
use tracing::trace;
41-
41+
use nativelink_util::metrics::FAST_SLOW_STORE_METRICS;
4242
// TODO(palfrey) This store needs to be evaluated for more efficient memory usage,
4343
// there are many copies happening internally.
4444

@@ -57,8 +57,6 @@ pub struct FastSlowStore {
5757
slow_store: Store,
5858
slow_direction: StoreDirection,
5959
weak_self: Weak<Self>,
60-
#[metric]
61-
metrics: FastSlowStoreMetrics,
6260
// De-duplicate requests for the fast store, only the first streams, others
6361
// are blocked. This may feel like it's causing a slow down of tasks, but
6462
// actually it's faster because we're not downloading the file multiple
@@ -123,7 +121,6 @@ impl FastSlowStore {
123121
slow_store,
124122
slow_direction: spec.slow_direction,
125123
weak_self: weak_self.clone(),
126-
metrics: FastSlowStoreMetrics::default(),
127124
populating_digests: Mutex::new(HashMap::new()),
128125
})
129126
}
@@ -219,17 +216,15 @@ impl FastSlowStore {
219216
}
220217

221218
if !counted_hit {
222-
self.metrics
223-
.slow_store_hit_count
224-
.fetch_add(1, Ordering::Acquire);
219+
FAST_SLOW_STORE_METRICS.slow_store_hit_count.add(1, &[]);
225220
counted_hit = true;
226221
}
227222

228223
let output_buf_len = u64::try_from(output_buf.len())
229224
.err_tip(|| "Could not output_buf.len() to u64")?;
230-
self.metrics
225+
FAST_SLOW_STORE_METRICS
231226
.slow_store_downloaded_bytes
232-
.fetch_add(output_buf_len, Ordering::Acquire);
227+
.add(output_buf_len, &[]);
233228

234229
let writer_fut = Self::calculate_range(
235230
&(bytes_received..bytes_received + output_buf_len),
@@ -538,15 +533,13 @@ impl StoreDriver for FastSlowStore {
538533
// TODO(palfrey) Investigate if we should maybe ignore errors here instead of
539534
// forwarding them up.
540535
if self.fast_store.has(key.borrow()).await?.is_some() {
541-
self.metrics
542-
.fast_store_hit_count
543-
.fetch_add(1, Ordering::Acquire);
536+
FAST_SLOW_STORE_METRICS.fast_store_hit_count.add(1, &[]);
544537
self.fast_store
545538
.get_part(key, writer.borrow_mut(), offset, length)
546539
.await?;
547-
self.metrics
540+
FAST_SLOW_STORE_METRICS
548541
.fast_store_downloaded_bytes
549-
.fetch_add(writer.get_bytes_written(), Ordering::Acquire);
542+
.add(writer.get_bytes_written(), &[]);
550543
return Ok(());
551544
}
552545

@@ -558,15 +551,13 @@ impl StoreDriver for FastSlowStore {
558551
|| self.fast_direction == StoreDirection::ReadOnly
559552
|| self.fast_direction == StoreDirection::Update
560553
{
561-
self.metrics
562-
.slow_store_hit_count
563-
.fetch_add(1, Ordering::Acquire);
554+
FAST_SLOW_STORE_METRICS.slow_store_hit_count.add(1, &[]);
564555
self.slow_store
565556
.get_part(key, writer.borrow_mut(), offset, length)
566557
.await?;
567-
self.metrics
558+
FAST_SLOW_STORE_METRICS
568559
.slow_store_downloaded_bytes
569-
.fetch_add(writer.get_bytes_written(), Ordering::Acquire);
560+
.add(writer.get_bytes_written(), &[]);
570561
return Ok(());
571562
}
572563

@@ -612,16 +603,4 @@ impl StoreDriver for FastSlowStore {
612603
}
613604
}
614605

615-
#[derive(Debug, Default, MetricsComponent)]
616-
struct FastSlowStoreMetrics {
617-
#[metric(help = "Hit count for the fast store")]
618-
fast_store_hit_count: AtomicU64,
619-
#[metric(help = "Downloaded bytes from the fast store")]
620-
fast_store_downloaded_bytes: AtomicU64,
621-
#[metric(help = "Hit count for the slow store")]
622-
slow_store_hit_count: AtomicU64,
623-
#[metric(help = "Downloaded bytes from the slow store")]
624-
slow_store_downloaded_bytes: AtomicU64,
625-
}
626-
627606
default_health_status_indicator!(FastSlowStore);

nativelink-util/src/metrics.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1514,3 +1514,47 @@ pub struct RunningActionsMetrics {
15141514
// Other counters
15151515
pub task_timeouts: metrics::Counter<u64>,
15161516
}
1517+
1518+
/// Global fast/slow store metrics instruments.
1519+
pub static FAST_SLOW_STORE_METRICS: LazyLock<FastSlowStoreMetrics> = LazyLock::new(|| {
1520+
let meter = global::meter_with_scope(InstrumentationScope::builder("nativelink").build());
1521+
1522+
FastSlowStoreMetrics {
1523+
fast_store_hit_count: meter
1524+
.u64_counter("fast_slow_store.fast_store.hit_count")
1525+
.with_description("Hit count for the fast store")
1526+
.with_unit("{hit}")
1527+
.build(),
1528+
1529+
fast_store_downloaded_bytes: meter
1530+
.u64_counter("fast_slow_store.fast_store.downloaded_bytes")
1531+
.with_description("Downloaded bytes from the fast store")
1532+
.with_unit("By")
1533+
.build(),
1534+
1535+
slow_store_hit_count: meter
1536+
.u64_counter("fast_slow_store.slow_store.hit_count")
1537+
.with_description("Hit count for the slow store")
1538+
.with_unit("{hit}")
1539+
.build(),
1540+
1541+
slow_store_downloaded_bytes: meter
1542+
.u64_counter("fast_slow_store.slow_store.downloaded_bytes")
1543+
.with_description("Downloaded bytes from the slow store")
1544+
.with_unit("By")
1545+
.build(),
1546+
}
1547+
});
1548+
1549+
/// OpenTelemetry metrics instruments for fast/slow store monitoring.
1550+
#[derive(Debug)]
1551+
pub struct FastSlowStoreMetrics {
1552+
/// Counter of cache hits on the fast store
1553+
pub fast_store_hit_count: metrics::Counter<u64>,
1554+
/// Counter of bytes downloaded from the fast store
1555+
pub fast_store_downloaded_bytes: metrics::Counter<u64>,
1556+
/// Counter of cache hits on the slow store
1557+
pub slow_store_hit_count: metrics::Counter<u64>,
1558+
/// Counter of bytes downloaded from the slow store
1559+
pub slow_store_downloaded_bytes: metrics::Counter<u64>,
1560+
}

0 commit comments

Comments
 (0)