Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 31dbebd

Browse files
authored
feat(orchestrator): add first_seen timestamp to track node discovery … (#427)
* feat(orchestrator): add first_seen timestamp to track node discovery time
1 parent cd599b5 commit 31dbebd

6 files changed

Lines changed: 152 additions & 1 deletion

File tree

crates/orchestrator/src/api/routes/nodes.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ mod tests {
469469
version: None,
470470
last_status_change: None,
471471
p2p_id: None,
472+
first_seen: None,
472473
compute_specs: None,
473474
};
474475
app_state

crates/orchestrator/src/discovery/monitor.rs

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::utils::loop_heartbeats::LoopHeartbeats;
55
use alloy::primitives::Address;
66
use anyhow::Error;
77
use anyhow::Result;
8+
use chrono::Utc;
89
use log::{error, info};
910
use serde_json;
1011
use shared::models::api::ApiResponse;
@@ -225,7 +226,8 @@ impl DiscoveryMonitor {
225226
}
226227
Ok(None) => {
227228
info!("Discovered new validated node: {}", node_address);
228-
let node = OrchestratorNode::from(discovery_node.clone());
229+
let mut node = OrchestratorNode::from(discovery_node.clone());
230+
node.first_seen = Some(Utc::now());
229231
let _ = self.store_context.node_store.add_node(node.clone()).await;
230232
}
231233
Err(e) => {
@@ -292,6 +294,7 @@ mod tests {
292294
let mut orchestrator_node = OrchestratorNode::from(discovery_node.clone());
293295
orchestrator_node.status = NodeStatus::Ejected;
294296
orchestrator_node.address = discovery_node.node.id.parse::<Address>().unwrap();
297+
orchestrator_node.first_seen = Some(Utc::now());
295298
orchestrator_node.compute_specs = Some(ComputeSpecs {
296299
gpu: None,
297300
cpu: None,
@@ -364,4 +367,133 @@ mod tests {
364367
assert_eq!(node.status, NodeStatus::Dead);
365368
}
366369
}
370+
371+
#[tokio::test]
372+
async fn test_first_seen_timestamp_set_on_new_node() {
373+
let node_address = "0x2234567890123456789012345678901234567890";
374+
let discovery_node = DiscoveryNode {
375+
is_validated: true,
376+
is_provider_whitelisted: true,
377+
is_active: true,
378+
node: Node {
379+
id: node_address.to_string(),
380+
provider_address: node_address.to_string(),
381+
ip_address: "192.168.1.100".to_string(),
382+
port: 8080,
383+
compute_pool_id: 1,
384+
compute_specs: None,
385+
},
386+
is_blacklisted: false,
387+
last_updated: None,
388+
created_at: None,
389+
};
390+
391+
let store = Arc::new(RedisStore::new_test());
392+
let mut con = store
393+
.client
394+
.get_connection()
395+
.expect("Should connect to test Redis instance");
396+
397+
redis::cmd("PING")
398+
.query::<String>(&mut con)
399+
.expect("Redis should be responsive");
400+
redis::cmd("FLUSHALL")
401+
.query::<String>(&mut con)
402+
.expect("Redis should be flushed");
403+
404+
let store_context = Arc::new(StoreContext::new(store.clone()));
405+
406+
let fake_wallet = Wallet::new(
407+
"0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97",
408+
Url::parse("http://localhost:8545").unwrap(),
409+
)
410+
.unwrap();
411+
412+
let mode = ServerMode::Full;
413+
414+
let discovery_monitor = DiscoveryMonitor::new(
415+
fake_wallet,
416+
1,
417+
10,
418+
"http://localhost:8080".to_string(),
419+
store_context.clone(),
420+
Arc::new(LoopHeartbeats::new(&mode)),
421+
);
422+
423+
let time_before = Utc::now();
424+
425+
// Sync a new node that doesn't exist in the store
426+
discovery_monitor
427+
.sync_single_node_with_discovery(&discovery_node)
428+
.await
429+
.unwrap();
430+
431+
let time_after = Utc::now();
432+
433+
// Verify the node was added with first_seen timestamp
434+
let node_from_store = store_context
435+
.node_store
436+
.get_node(&discovery_node.node.id.parse::<Address>().unwrap())
437+
.await
438+
.unwrap();
439+
440+
assert!(node_from_store.is_some());
441+
let node = node_from_store.unwrap();
442+
443+
// Verify first_seen is set
444+
assert!(node.first_seen.is_some());
445+
let first_seen = node.first_seen.unwrap();
446+
447+
// Verify the timestamp is within the expected range
448+
assert!(first_seen >= time_before && first_seen <= time_after);
449+
450+
// Verify other fields are set correctly
451+
assert_eq!(node.status, NodeStatus::Discovered);
452+
assert_eq!(node.ip_address, "192.168.1.100");
453+
454+
// Test case: Sync the same node again to verify first_seen is preserved
455+
// Simulate some time passing
456+
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
457+
458+
// Update discovery data to simulate a change (e.g., IP address change)
459+
let updated_discovery_node = DiscoveryNode {
460+
is_validated: true,
461+
is_provider_whitelisted: true,
462+
is_active: true,
463+
node: Node {
464+
id: node_address.to_string(),
465+
provider_address: node_address.to_string(),
466+
ip_address: "192.168.1.101".to_string(), // Changed IP
467+
port: 8080,
468+
compute_pool_id: 1,
469+
compute_specs: None,
470+
},
471+
is_blacklisted: false,
472+
last_updated: Some(Utc::now()),
473+
created_at: None,
474+
};
475+
476+
// Sync the node again
477+
discovery_monitor
478+
.sync_single_node_with_discovery(&updated_discovery_node)
479+
.await
480+
.unwrap();
481+
482+
// Verify the node was updated but first_seen is preserved
483+
let node_after_resync = store_context
484+
.node_store
485+
.get_node(&discovery_node.node.id.parse::<Address>().unwrap())
486+
.await
487+
.unwrap()
488+
.unwrap();
489+
490+
// Verify first_seen is still the same (preserved)
491+
assert_eq!(node_after_resync.first_seen, Some(first_seen));
492+
493+
// Verify IP was updated
494+
assert_eq!(node_after_resync.ip_address, "192.168.1.101");
495+
496+
// Status should remain the same
497+
assert_eq!(node_after_resync.status, NodeStatus::Discovered);
498+
}
367499
}

crates/orchestrator/src/models/node.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ pub struct OrchestratorNode {
1818
pub version: Option<String>,
1919
pub p2p_id: Option<String>,
2020
pub last_status_change: Option<DateTime<Utc>>,
21+
#[serde(default)]
22+
pub first_seen: Option<DateTime<Utc>>,
2123

2224
#[serde(default)]
2325
pub compute_specs: Option<ComputeSpecs>,
@@ -42,6 +44,7 @@ impl From<DiscoveryNode> for OrchestratorNode {
4244
version: None,
4345
p2p_id: None,
4446
last_status_change: None,
47+
first_seen: None,
4548
compute_specs: discovery_node.compute_specs.clone(),
4649
}
4750
}

crates/orchestrator/src/plugins/node_groups/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ fn create_test_node(
4040
task_state: None,
4141
version: None,
4242
last_status_change: None,
43+
first_seen: None,
4344
p2p_id: Some("test_p2p_id".to_string()),
4445
compute_specs,
4546
}

crates/orchestrator/src/status_update/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ mod tests {
357357
version: None,
358358
last_status_change: None,
359359
p2p_id: None,
360+
first_seen: None,
360361
compute_specs: None,
361362
};
362363

@@ -438,6 +439,7 @@ mod tests {
438439
version: None,
439440
last_status_change: None,
440441
p2p_id: None,
442+
first_seen: None,
441443
compute_specs: None,
442444
};
443445

@@ -495,6 +497,7 @@ mod tests {
495497
version: None,
496498
last_status_change: None,
497499
p2p_id: None,
500+
first_seen: None,
498501
compute_specs: None,
499502
};
500503

@@ -559,6 +562,7 @@ mod tests {
559562
version: None,
560563
last_status_change: None,
561564
p2p_id: None,
565+
first_seen: None,
562566
compute_specs: None,
563567
};
564568

@@ -628,6 +632,7 @@ mod tests {
628632
version: None,
629633
last_status_change: None,
630634
p2p_id: None,
635+
first_seen: None,
631636
compute_specs: None,
632637
};
633638
if let Err(e) = app_state
@@ -720,6 +725,7 @@ mod tests {
720725
version: None,
721726
last_status_change: None,
722727
p2p_id: None,
728+
first_seen: None,
723729
compute_specs: None,
724730
};
725731
if let Err(e) = app_state
@@ -749,6 +755,7 @@ mod tests {
749755
version: None,
750756
last_status_change: None,
751757
p2p_id: None,
758+
first_seen: None,
752759
compute_specs: None,
753760
};
754761

@@ -829,6 +836,7 @@ mod tests {
829836
version: None,
830837
last_status_change: None,
831838
p2p_id: None,
839+
first_seen: None,
832840
compute_specs: None,
833841
};
834842

@@ -923,6 +931,7 @@ mod tests {
923931
version: None,
924932
last_status_change: None,
925933
p2p_id: None,
934+
first_seen: None,
926935
compute_specs: None,
927936
};
928937

crates/orchestrator/src/store/domains/node_store.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ mod tests {
198198
version: None,
199199
last_status_change: None,
200200
p2p_id: None,
201+
first_seen: None,
201202
compute_specs: None,
202203
};
203204

@@ -211,6 +212,7 @@ mod tests {
211212
version: None,
212213
last_status_change: None,
213214
p2p_id: None,
215+
first_seen: None,
214216
compute_specs: None,
215217
};
216218

@@ -238,6 +240,7 @@ mod tests {
238240
version: None,
239241
p2p_id: None,
240242
last_status_change: None,
243+
first_seen: None,
241244
compute_specs: None,
242245
},
243246
OrchestratorNode {
@@ -250,6 +253,7 @@ mod tests {
250253
version: None,
251254
p2p_id: None,
252255
last_status_change: None,
256+
first_seen: None,
253257
compute_specs: None,
254258
},
255259
OrchestratorNode {
@@ -262,6 +266,7 @@ mod tests {
262266
version: None,
263267
p2p_id: None,
264268
last_status_change: None,
269+
first_seen: None,
265270
compute_specs: None,
266271
},
267272
];

0 commit comments

Comments
 (0)