@@ -10,7 +10,9 @@ use shared::models::heartbeat::TaskDetails;
1010use shared:: models:: task:: TaskState ;
1111use std:: sync:: Arc ;
1212
13- const ORCHESTRATOR_BASE_KEY : & str = "orchestrator:node:" ;
13+ const ORCHESTRATOR_BASE_KEY : & str = "orchestrator:node" ;
14+ const ORCHESTRATOR_NODE_INDEX : & str = "orchestrator:node_index" ;
15+
1416pub struct NodeStore {
1517 redis : Arc < RedisStore > ,
1618}
@@ -22,19 +24,24 @@ impl NodeStore {
2224
2325 pub async fn get_nodes ( & self ) -> Result < Vec < OrchestratorNode > > {
2426 let mut con = self . redis . client . get_multiplexed_async_connection ( ) . await ?;
25- let keys: Vec < String > = con. keys ( format ! ( "{}:*" , ORCHESTRATOR_BASE_KEY ) ) . await ?;
2627
27- if keys. is_empty ( ) {
28+ let addresses: Vec < String > = con. smembers ( ORCHESTRATOR_NODE_INDEX ) . await ?;
29+
30+ if addresses. is_empty ( ) {
2831 return Ok ( Vec :: new ( ) ) ;
2932 }
3033
31- let mut nodes: Vec < OrchestratorNode > = Vec :: new ( ) ;
34+ let keys: Vec < String > = addresses
35+ . iter ( )
36+ . map ( |addr| format ! ( "{}:{}" , ORCHESTRATOR_BASE_KEY , addr) )
37+ . collect ( ) ;
3238
33- for node in keys {
34- let node_string: String = con. get ( node) . await ?;
35- let node: OrchestratorNode = OrchestratorNode :: from_string ( & node_string) ;
36- nodes. push ( node) ;
37- }
39+ let node_values: Vec < Option < String > > = con. mget ( & keys) . await ?;
40+
41+ let mut nodes: Vec < OrchestratorNode > = node_values
42+ . into_iter ( )
43+ . filter_map ( |value| value. map ( |s| OrchestratorNode :: from_string ( & s) ) )
44+ . collect ( ) ;
3845
3946 nodes. sort_by ( |a, b| match ( & a. status , & b. status ) {
4047 ( NodeStatus :: Healthy , NodeStatus :: Healthy ) => std:: cmp:: Ordering :: Equal ,
@@ -51,15 +58,19 @@ impl NodeStore {
5158
5259 Ok ( nodes)
5360 }
54-
5561 pub async fn add_node ( & self , node : OrchestratorNode ) -> Result < ( ) > {
5662 let mut con = self . redis . client . get_multiplexed_async_connection ( ) . await ?;
57- let _: ( ) = con
63+
64+ // Use Redis transaction (MULTI/EXEC) to ensure atomic execution of both operations
65+ let mut pipe = redis:: pipe ( ) ;
66+ pipe. atomic ( )
67+ . sadd ( ORCHESTRATOR_NODE_INDEX , node. address . to_string ( ) )
5868 . set (
5969 format ! ( "{}:{}" , ORCHESTRATOR_BASE_KEY , node. address) ,
6070 node. to_string ( ) ,
61- )
62- . await ?;
71+ ) ;
72+
73+ let _: ( ) = pipe. query_async ( & mut con) . await ?;
6374 Ok ( ( ) )
6475 }
6576
@@ -79,19 +90,31 @@ impl NodeStore {
7990
8091 pub async fn get_uninvited_nodes ( & self ) -> Result < Vec < OrchestratorNode > > {
8192 let mut con = self . redis . client . get_multiplexed_async_connection ( ) . await ?;
82- let keys: Vec < String > = con. keys ( format ! ( "{}:*" , ORCHESTRATOR_BASE_KEY ) ) . await ?;
83- let mut nodes: Vec < OrchestratorNode > = Vec :: new ( ) ;
84-
85- for key in keys {
86- if let Ok ( node_string) = con. get :: < _ , String > ( & key) . await {
87- if let Ok ( node) = serde_json:: from_str :: < OrchestratorNode > ( & node_string) {
88- if matches ! ( node. status, NodeStatus :: Discovered ) {
89- nodes. push ( node) ;
90- }
91- }
92- }
93+
94+ let addresses: Vec < String > = con. smembers ( ORCHESTRATOR_NODE_INDEX ) . await ?;
95+
96+ if addresses. is_empty ( ) {
97+ return Ok ( Vec :: new ( ) ) ;
9398 }
9499
100+ let keys: Vec < String > = addresses
101+ . iter ( )
102+ . map ( |addr| format ! ( "{}:{}" , ORCHESTRATOR_BASE_KEY , addr) )
103+ . collect ( ) ;
104+
105+ let node_values: Vec < Option < String > > = con. mget ( & keys) . await ?;
106+
107+ let nodes: Vec < OrchestratorNode > = node_values
108+ . into_iter ( )
109+ . filter_map ( |value| {
110+ value. and_then ( |s| {
111+ serde_json:: from_str :: < OrchestratorNode > ( & s)
112+ . ok ( )
113+ . filter ( |node| matches ! ( node. status, NodeStatus :: Discovered ) )
114+ } )
115+ } )
116+ . collect ( ) ;
117+
95118 Ok ( nodes)
96119 }
97120
@@ -182,6 +205,70 @@ impl NodeStore {
182205 }
183206 Ok ( ( ) )
184207 }
208+ pub async fn migrate_existing_nodes ( & self ) -> Result < ( ) > {
209+ let mut con = self . redis . client . get_multiplexed_async_connection ( ) . await ?;
210+
211+ let mut cursor = 0 ;
212+ loop {
213+ let ( new_cursor, keys) : ( u64 , Vec < String > ) = redis:: cmd ( "SCAN" )
214+ . cursor_arg ( cursor)
215+ . arg ( "MATCH" )
216+ . arg ( format ! ( "{}:*" , ORCHESTRATOR_BASE_KEY ) )
217+ . arg ( "COUNT" )
218+ . arg ( 100 )
219+ . query_async ( & mut con)
220+ . await ?;
221+
222+ for key in keys {
223+ if let Some ( address) = key. strip_prefix ( & format ! ( "{}:" , ORCHESTRATOR_BASE_KEY ) ) {
224+ let _: ( ) = con. sadd ( ORCHESTRATOR_NODE_INDEX , address) . await ?;
225+ }
226+ }
227+
228+ if new_cursor == 0 {
229+ break ;
230+ }
231+ cursor = new_cursor;
232+ }
233+
234+ // Handle legacy keys with double colons (orchestrator:node::address)
235+ let mut cursor = 0 ;
236+ loop {
237+ let ( new_cursor, keys) : ( u64 , Vec < String > ) = redis:: cmd ( "SCAN" )
238+ . cursor_arg ( cursor)
239+ . arg ( "MATCH" )
240+ . arg ( "orchestrator:node::*" )
241+ . arg ( "COUNT" )
242+ . arg ( 100 )
243+ . query_async ( & mut con)
244+ . await ?;
245+
246+ for key in keys {
247+ if let Some ( address) = key. strip_prefix ( "orchestrator:node::" ) {
248+ // Add to index
249+ let _: ( ) = con. sadd ( ORCHESTRATOR_NODE_INDEX , address) . await ?;
250+
251+ // Get the value from the old key
252+ let value: Option < String > = con. get ( & key) . await ?;
253+ if let Some ( value) = value {
254+ // Set the value with the correct key format
255+ let new_key = format ! ( "{}:{}" , ORCHESTRATOR_BASE_KEY , address) ;
256+ let _: ( ) = con. set ( & new_key, value) . await ?;
257+
258+ // Delete the old key with double colons
259+ let _: ( ) = con. del ( & key) . await ?;
260+ }
261+ }
262+ }
263+
264+ if new_cursor == 0 {
265+ break ;
266+ }
267+ cursor = new_cursor;
268+ }
269+
270+ Ok ( ( ) )
271+ }
185272}
186273
187274#[ cfg( test) ]
@@ -190,6 +277,7 @@ mod tests {
190277 use crate :: models:: node:: NodeStatus ;
191278 use crate :: models:: node:: OrchestratorNode ;
192279 use alloy:: primitives:: Address ;
280+ use redis:: AsyncCommands ;
193281 use std:: str:: FromStr ;
194282
195283 #[ tokio:: test]
@@ -268,4 +356,62 @@ mod tests {
268356 Address :: from_str( "0x0000000000000000000000000000000000000003" ) . unwrap( )
269357 ) ;
270358 }
359+
360+ #[ tokio:: test]
361+ async fn test_migration_handles_double_colon_keys ( ) {
362+ let app_state = create_test_app_state ( ) . await ;
363+ let node_store = & app_state. store_context . node_store ;
364+ let mut con = app_state
365+ . store_context
366+ . node_store
367+ . redis
368+ . client
369+ . get_multiplexed_async_connection ( )
370+ . await
371+ . unwrap ( ) ;
372+
373+ let test_address = "0x66295E2B4A78d1Cb57Db16Ac0260024900A5BA9B" ;
374+ let test_node = OrchestratorNode {
375+ address : Address :: from_str ( test_address) . unwrap ( ) ,
376+ ip_address : "192.168.1.1" . to_string ( ) ,
377+ port : 8080 ,
378+ status : NodeStatus :: Healthy ,
379+ ..Default :: default ( )
380+ } ;
381+
382+ // Manually create a legacy key with double colons
383+ let legacy_key = format ! ( "orchestrator:node::{}" , test_address) ;
384+ let _: ( ) = con. set ( & legacy_key, test_node. to_string ( ) ) . await . unwrap ( ) ;
385+
386+ // Verify the legacy key exists
387+ let exists: bool = con. exists ( & legacy_key) . await . unwrap ( ) ;
388+ assert ! ( exists) ;
389+
390+ // Run migration
391+ node_store. migrate_existing_nodes ( ) . await . unwrap ( ) ;
392+
393+ // Verify the legacy key is removed
394+ let legacy_exists: bool = con. exists ( & legacy_key) . await . unwrap ( ) ;
395+ assert ! ( !legacy_exists) ;
396+
397+ // Verify the correct key exists
398+ let correct_key = format ! ( "orchestrator:node:{}" , test_address) ;
399+ let correct_exists: bool = con. exists ( & correct_key) . await . unwrap ( ) ;
400+ assert ! ( correct_exists) ;
401+
402+ // Verify the node is in the index
403+ let in_index: bool = con
404+ . sismember ( "orchestrator:node_index" , test_address)
405+ . await
406+ . unwrap ( ) ;
407+ assert ! ( in_index) ;
408+
409+ // Verify we can retrieve the node correctly
410+ let retrieved_node = node_store
411+ . get_node ( & Address :: from_str ( test_address) . unwrap ( ) )
412+ . await
413+ . unwrap ( ) ;
414+ assert ! ( retrieved_node. is_some( ) ) ;
415+ assert_eq ! ( retrieved_node. unwrap( ) . address, test_node. address) ;
416+ }
271417}
0 commit comments