11use crate :: store:: core:: RedisStore ;
22use alloy:: primitives:: Address ;
33use anyhow:: { anyhow, Result } ;
4- use log:: { error, info } ;
4+ use log:: error;
55use redis:: AsyncCommands ;
66use shared:: models:: metric:: MetricEntry ;
77use std:: collections:: HashMap ;
88use std:: sync:: Arc ;
99
1010const ORCHESTRATOR_NODE_METRICS_STORE : & str = "orchestrator:node_metrics" ;
11- const ORCHESTRATOR_METRICS_STORE : & str = "orchestrator:metrics" ;
1211
1312pub struct MetricsStore {
1413 redis : Arc < RedisStore > ,
@@ -23,77 +22,6 @@ impl MetricsStore {
2322 label. replace ( ':' , "" )
2423 }
2524
26- pub async fn migrate_node_metrics_if_needed ( & self , node_address : Address ) -> Result < ( ) > {
27- let mut con = self . redis . client . get_multiplexed_async_connection ( ) . await ?;
28- let new_key = format ! ( "{}:{}" , ORCHESTRATOR_NODE_METRICS_STORE , node_address) ;
29-
30- // Check if the new node-centric key already exists
31- let exists: bool = con. exists ( & new_key) . await ?;
32- if exists {
33- info ! ( "Migration already complete for this node: {}" , node_address) ;
34- // Migration already complete for this node
35- return Ok ( ( ) ) ;
36- }
37-
38- // Perform the slow SCAN to find all metrics for this node in the old data structure
39- let pattern = format ! ( "{}:*:*" , ORCHESTRATOR_METRICS_STORE ) ;
40- let mut iter: redis:: AsyncIter < String > = con. scan_match ( & pattern) . await ?;
41- let mut old_keys_to_migrate = Vec :: new ( ) ;
42-
43- while let Some ( key) = iter. next_item ( ) . await {
44- old_keys_to_migrate. push ( key) ;
45- }
46- drop ( iter) ;
47-
48- // Collect all metrics for this node from the old structure
49- let mut node_metrics = HashMap :: new ( ) ;
50- let mut keys_to_clean = Vec :: new ( ) ;
51-
52- for old_key in old_keys_to_migrate {
53- if let Some ( value_str) = con
54- . hget :: < _ , _ , Option < String > > ( & old_key, node_address. to_string ( ) )
55- . await ?
56- {
57- if let Ok ( val) = value_str. parse :: < f64 > ( ) {
58- let parts: Vec < & str > = old_key. split ( ':' ) . collect ( ) ;
59- if parts. len ( ) >= 4 {
60- let task_id = parts[ 2 ] ;
61- let metric_name = parts[ 3 ] ;
62- let new_metric_key = format ! ( "{}:{}" , task_id, metric_name) ;
63- node_metrics. insert ( new_metric_key, val) ;
64- keys_to_clean. push ( old_key) ;
65- }
66- }
67- }
68- }
69-
70- // If we have metrics for this node, perform the atomic migration
71- if !node_metrics. is_empty ( ) {
72- // Use Redis MULTI/EXEC transaction for atomicity
73- let mut pipe = redis:: pipe ( ) ;
74- pipe. atomic ( ) ;
75-
76- // Set all metrics in the new node-centric key
77- for ( metric_key, value) in & node_metrics {
78- pipe. hset ( & new_key, metric_key, value) ;
79- }
80-
81- // Clean up the old data structure by removing this node's fields
82- for old_key in & keys_to_clean {
83- pipe. hdel ( old_key, node_address. to_string ( ) ) ;
84- }
85-
86- pipe. query_async :: < ( ) > ( & mut con) . await ?;
87- }
88-
89- // Always create the key to mark migration as complete, even if no metrics exist
90- // This prevents future migration attempts for nodes without data
91- let _: ( ) = con. hset ( & new_key, "_migrated" , "true" ) . await ?;
92- let _: ( ) = con. hdel ( & new_key, "_migrated" ) . await ?;
93-
94- Ok ( ( ) )
95- }
96-
9725 pub async fn store_metrics (
9826 & self ,
9927 metrics : Option < Vec < MetricEntry > > ,
0 commit comments