1+ use crate :: callback_utils:: RemoveItemCallbackHolder ;
2+ use crate :: filesystem_store:: FilesystemStore ;
3+ use crate :: memory_store:: MemoryStore ;
14use async_trait:: async_trait;
25use nativelink_error:: Error ;
36use nativelink_metric:: MetricsComponent ;
47use nativelink_util:: buf_channel:: { DropCloserReadHalf , DropCloserWriteHalf } ;
58use nativelink_util:: health_utils:: { HealthStatus , HealthStatusIndicator } ;
6- use nativelink_util:: metrics:: { StoreMetricAttrs , StoreType , STORE_METRICS } ;
9+ use nativelink_util:: metrics:: { STORE_METRICS , StoreMetricAttrs , StoreType } ;
710use nativelink_util:: store_trait:: {
811 RemoveItemCallback , Store , StoreDriver , StoreKey , StoreLike , UploadSizeInfo ,
912} ;
@@ -15,15 +18,31 @@ use std::time::Instant;
1518#[ derive( MetricsComponent , Debug ) ]
1619pub struct MetricsStore {
1720 inner : Arc < Store > ,
18- attrs : StoreMetricAttrs ,
21+ attrs : Arc < StoreMetricAttrs > ,
1922}
2023
2124impl MetricsStore {
2225 #[ must_use]
2326 pub fn new ( inner : Arc < Store > , name : & str , store_type : StoreType ) -> Arc < Self > {
27+ let attrs = Arc :: new ( StoreMetricAttrs :: new_with_name ( store_type, name) ) ;
28+ if should_add_remove_callback ( inner. clone ( ) ) {
29+ #[ derive( Debug ) ]
30+ struct EvictionCallback {
31+ attrs : Arc < StoreMetricAttrs > ,
32+ }
33+ impl RemoveItemCallback for EvictionCallback {
34+ fn callback < ' a > ( & ' a self , store_key : StoreKey < ' a > ) -> Pin < Box < dyn Future < Output =( ) > + Send + ' a > > {
35+ Box :: pin ( async { STORE_METRICS . eviction_count . add ( 1 , self . attrs . eviction ( ) ) } )
36+ }
37+ }
38+ if let Err ( e) = inner. register_remove_callback ( Arc :: new ( EvictionCallback { attrs : attrs. clone ( ) } ) ) {
39+ tracing:: error!( "Failed to register remove callback: {:?}" , e) ;
40+ }
41+ }
42+
2443 Arc :: new ( Self {
2544 inner : inner. clone ( ) ,
26- attrs : StoreMetricAttrs :: new_with_name ( store_type , name ) ,
45+ attrs : attrs . clone ( ) ,
2746 } )
2847 }
2948}
@@ -76,7 +95,9 @@ impl StoreDriver for MetricsStore {
7695 . store_operation_duration
7796 . record ( duration_ms as f64 , & self . attrs . write_success ( ) ) ;
7897 } else {
79- STORE_METRICS . store_operations . add ( 1 , & self . attrs . write_error ( ) ) ;
98+ STORE_METRICS
99+ . store_operations
100+ . add ( 1 , & self . attrs . write_error ( ) ) ;
80101 STORE_METRICS
81102 . store_operation_duration
82103 . record ( duration_ms as f64 , & self . attrs . write_error ( ) ) ;
@@ -144,3 +165,8 @@ impl HealthStatusIndicator for MetricsStore {
144165 self . inner . check_health ( _namespace) . await
145166 }
146167}
168+
169+ fn should_add_remove_callback ( store : Arc < Store > ) -> bool {
170+ store. downcast_ref :: < FilesystemStore > ( None ) . is_some ( )
171+ || store. downcast_ref :: < MemoryStore > ( None ) . is_some ( )
172+ }
0 commit comments