@@ -127,13 +127,55 @@ impl DiscoveryMonitor {
127127 Ok ( nodes)
128128 }
129129
130+ async fn has_healthy_node_with_same_endpoint (
131+ & self ,
132+ node_address : Address ,
133+ ip_address : & str ,
134+ port : u16 ,
135+ ) -> Result < bool , Error > {
136+ let nodes = self . store_context . node_store . get_nodes ( ) . await ?;
137+ Ok ( nodes. iter ( ) . any ( |other_node| {
138+ other_node. address != node_address
139+ && other_node. ip_address == ip_address
140+ && other_node. port == port
141+ && other_node. status == NodeStatus :: Healthy
142+ } ) )
143+ }
144+
130145 async fn sync_single_node_with_discovery (
131146 & self ,
132147 discovery_node : & DiscoveryNode ,
133148 ) -> Result < ( ) , Error > {
134149 let node_address = discovery_node. node . id . parse :: < Address > ( ) ?;
150+
151+ // Check if there's any healthy node with the same IP and port
152+ let has_healthy_node_same_endpoint = self
153+ . has_healthy_node_with_same_endpoint (
154+ node_address,
155+ & discovery_node. node . ip_address ,
156+ discovery_node. node . port ,
157+ )
158+ . await ?;
159+
135160 match self . store_context . node_store . get_node ( & node_address) . await {
136161 Ok ( Some ( existing_node) ) => {
162+ // If there's a healthy node with same IP and port, and this node isn't healthy, mark it dead
163+ if has_healthy_node_same_endpoint && existing_node. status != NodeStatus :: Healthy {
164+ info ! (
165+ "Node {} shares endpoint {}:{} with a healthy node, marking as dead" ,
166+ node_address, discovery_node. node. ip_address, discovery_node. node. port
167+ ) ;
168+ if let Err ( e) = self
169+ . store_context
170+ . node_store
171+ . update_node_status ( & node_address, NodeStatus :: Dead )
172+ . await
173+ {
174+ error ! ( "Error updating node status: {}" , e) ;
175+ }
176+ return Ok ( ( ) ) ;
177+ }
178+
137179 if discovery_node. is_validated && !discovery_node. is_provider_whitelisted {
138180 info ! (
139181 "Node {} is validated but not provider whitelisted, marking as ejected" ,
@@ -172,8 +214,8 @@ impl DiscoveryMonitor {
172214 // Node is active False but we have it in store and it is healthy
173215 // This means that the node likely got kicked by e.g. the validator
174216 // We simply remove it from the store now and will rediscover it later?
175- println ! (
176- "Node {} is no longer active on chain, marking as dead " ,
217+ info ! (
218+ "Node {} is no longer active on chain, marking as ejected " ,
177219 node_address
178220 ) ;
179221 if !discovery_node. is_provider_whitelisted {
@@ -225,6 +267,15 @@ impl DiscoveryMonitor {
225267 }
226268 }
227269 Ok ( None ) => {
270+ // Don't add new node if there's already a healthy node with same IP and port
271+ if has_healthy_node_same_endpoint {
272+ info ! (
273+ "Skipping new node {} as endpoint {}:{} is already used by a healthy node" ,
274+ node_address, discovery_node. node. ip_address, discovery_node. node. port
275+ ) ;
276+ return Ok ( ( ) ) ;
277+ }
278+
228279 info ! ( "Discovered new validated node: {}" , node_address) ;
229280 let mut node = OrchestratorNode :: from ( discovery_node. clone ( ) ) ;
230281 node. first_seen = Some ( Utc :: now ( ) ) ;
@@ -496,4 +547,119 @@ mod tests {
496547 // Status should remain the same
497548 assert_eq ! ( node_after_resync. status, NodeStatus :: Discovered ) ;
498549 }
550+
551+ #[ tokio:: test]
552+ async fn test_sync_node_with_same_endpoint ( ) {
553+ let store = Arc :: new ( RedisStore :: new_test ( ) ) ;
554+ let mut con = store
555+ . client
556+ . get_connection ( )
557+ . expect ( "Should connect to test Redis instance" ) ;
558+
559+ redis:: cmd ( "PING" )
560+ . query :: < String > ( & mut con)
561+ . expect ( "Redis should be responsive" ) ;
562+ redis:: cmd ( "FLUSHALL" )
563+ . query :: < String > ( & mut con)
564+ . expect ( "Redis should be flushed" ) ;
565+
566+ let store_context = Arc :: new ( StoreContext :: new ( store. clone ( ) ) ) ;
567+
568+ // Create first node (will be healthy)
569+ let node1_address = "0x1234567890123456789012345678901234567890" ;
570+ let node1 = DiscoveryNode {
571+ is_validated : true ,
572+ is_provider_whitelisted : true ,
573+ is_active : true ,
574+ node : Node {
575+ id : node1_address. to_string ( ) ,
576+ provider_address : node1_address. to_string ( ) ,
577+ ip_address : "127.0.0.1" . to_string ( ) ,
578+ port : 8080 ,
579+ compute_pool_id : 1 ,
580+ compute_specs : Some ( ComputeSpecs {
581+ gpu : None ,
582+ cpu : None ,
583+ ram_mb : Some ( 1024 ) ,
584+ storage_gb : Some ( 10 ) ,
585+ storage_path : None ,
586+ } ) ,
587+ } ,
588+ is_blacklisted : false ,
589+ last_updated : None ,
590+ created_at : None ,
591+ } ;
592+
593+ let mut orchestrator_node1 = OrchestratorNode :: from ( node1. clone ( ) ) ;
594+ orchestrator_node1. status = NodeStatus :: Healthy ;
595+ orchestrator_node1. address = node1. node . id . parse :: < Address > ( ) . unwrap ( ) ;
596+
597+ let _ = store_context
598+ . node_store
599+ . add_node ( orchestrator_node1. clone ( ) )
600+ . await ;
601+
602+ // Create second node with same IP and port
603+ let node2_address = "0x2234567890123456789012345678901234567890" ;
604+ let mut node2 = node1. clone ( ) ;
605+ node2. node . id = node2_address. to_string ( ) ;
606+ node2. node . provider_address = node2_address. to_string ( ) ;
607+
608+ let fake_wallet = Wallet :: new (
609+ "0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97" ,
610+ Url :: parse ( "http://localhost:8545" ) . unwrap ( ) ,
611+ )
612+ . unwrap ( ) ;
613+
614+ let mode = ServerMode :: Full ;
615+ let discovery_monitor = DiscoveryMonitor :: new (
616+ fake_wallet,
617+ 1 ,
618+ 10 ,
619+ "http://localhost:8080" . to_string ( ) ,
620+ store_context. clone ( ) ,
621+ Arc :: new ( LoopHeartbeats :: new ( & mode) ) ,
622+ ) ;
623+
624+ // Try to sync the second node
625+ discovery_monitor
626+ . sync_single_node_with_discovery ( & node2)
627+ . await
628+ . unwrap ( ) ;
629+
630+ // Verify second node was not added
631+ let node2_result = store_context
632+ . node_store
633+ . get_node ( & node2_address. parse :: < Address > ( ) . unwrap ( ) )
634+ . await
635+ . unwrap ( ) ;
636+ assert ! (
637+ node2_result. is_none( ) ,
638+ "Node with same endpoint should not be added"
639+ ) ;
640+
641+ // Create third node with same IP but different port (should be allowed)
642+ let node3_address = "0x3234567890123456789012345678901234567890" ;
643+ let mut node3 = node1. clone ( ) ;
644+ node3. node . id = node3_address. to_string ( ) ;
645+ node3. node . provider_address = node3_address. to_string ( ) ;
646+ node3. node . port = 8081 ; // Different port
647+
648+ // Try to sync the third node
649+ discovery_monitor
650+ . sync_single_node_with_discovery ( & node3)
651+ . await
652+ . unwrap ( ) ;
653+
654+ // Verify third node was added (different port)
655+ let node3_result = store_context
656+ . node_store
657+ . get_node ( & node3_address. parse :: < Address > ( ) . unwrap ( ) )
658+ . await
659+ . unwrap ( ) ;
660+ assert ! (
661+ node3_result. is_some( ) ,
662+ "Node with different port should be added"
663+ ) ;
664+ }
499665}
0 commit comments