Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 4320638

Browse files
authored
feat(orchestrator): ability to sync location via discovery, location based group building (#523)
* ability to sync location via discovery, location based group building
1 parent c6538e7 commit 4320638

16 files changed

Lines changed: 460 additions & 21 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,16 @@ down:
6666
pkill -f "target/debug/orchestrator" 2>/dev/null || true
6767
pkill -f "target/debug/validator" 2>/dev/null || true
6868
pkill -f "target/debug/discovery" 2>/dev/null || true
69+
pkill -9 -f "cargo run --bin discovery" 2>/dev/null || true
70+
pkill -9 -f "cargo watch" 2>/dev/null || true
6971

7072
whitelist-provider:
7173
set -a; source ${ENV_FILE}; set +a; \
7274
cargo run -p dev-utils --example whitelist_provider -- --provider-address $${PROVIDER_ADDRESS} --key $${PRIVATE_KEY_VALIDATOR} --rpc-url $${RPC_URL}
7375

7476
watch-discovery:
7577
set -a; source .env; set +a; \
76-
cargo watch -w crates/discovery/src -x "run --bin discovery -- --rpc-url $${RPC_URL} --max-nodes-per-ip $${MAX_NODES_PER_IP:-2}"
78+
cargo watch -w crates/discovery/src -x "run --bin discovery -- --rpc-url $${RPC_URL} --max-nodes-per-ip $${MAX_NODES_PER_IP:-2} $${LOCATION_SERVICE_URL:+--location-service-url $${LOCATION_SERVICE_URL}} $${LOCATION_SERVICE_API_KEY:+--location-service-api-key $${LOCATION_SERVICE_API_KEY}}"
7779

7880
watch-worker:
7981
set -a; source ${ENV_FILE}; set +a; \

crates/discovery/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ futures = { workspace = true }
1313
log = { workspace = true }
1414
redis = { workspace = true, features = ["tokio-comp"] }
1515
redis-test = { workspace = true }
16+
reqwest = { workspace = true }
1617
serde = { workspace = true }
1718
serde_json = { workspace = true }
1819
shared = { workspace = true }

crates/discovery/Dockerfile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ ENV REDIS_URL="redis://localhost:6380"
1010
ENV PORT="8089"
1111
ENV MAX_NODES_PER_IP="3"
1212
ENV MODE="full"
13+
ENV LOCATION_SERVICE_URL=""
14+
ENV LOCATION_SERVICE_API_KEY=""
1315

1416
RUN echo '#!/bin/sh\n\
1517
exec /usr/local/bin/discovery \
@@ -19,6 +21,8 @@ exec /usr/local/bin/discovery \
1921
--port "$PORT" \
2022
--max-nodes-per-ip "$MAX_NODES_PER_IP" \
2123
--mode "$MODE" \
24+
$([ ! -z "$LOCATION_SERVICE_URL" ] && echo "--location-service-url $LOCATION_SERVICE_URL") \
25+
$([ ! -z "$LOCATION_SERVICE_API_KEY" ] && echo "--location-service-api-key $LOCATION_SERVICE_API_KEY") \
2226
"$@"' > /entrypoint.sh && \
2327
chmod +x /entrypoint.sh
2428

crates/discovery/src/api/routes/node.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ pub async fn register_node(
204204

205205
let node_store = data.node_store.clone();
206206

207-
match node_store.register_node(node.clone()).await {
207+
match node_store.register_node(node.into_inner()).await {
208208
Ok(_) => HttpResponse::Ok().json(ApiResponse::new(true, "Node registered successfully")),
209209
Err(_) => HttpResponse::InternalServerError()
210210
.json(ApiResponse::new(false, "Internal server error")),
@@ -360,6 +360,7 @@ mod tests {
360360
is_blacklisted: false,
361361
last_updated: None,
362362
created_at: None,
363+
location: None,
363364
};
364365

365366
match app_state.node_store.update_node(validated).await {

crates/discovery/src/chainsync/sync.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,25 @@ impl ChainSync {
103103
})?;
104104
n.is_blacklisted = is_blacklisted;
105105

106-
match node_store.update_node(n).await {
107-
Ok(_) => {
108-
debug!("Successfully updated node {}", node.id);
109-
Ok(())
110-
}
111-
Err(e) => {
112-
error!("Error updating node {}: {}", node.id, e);
113-
Err(anyhow::anyhow!("Failed to update node: {}", e))
106+
// Only update if the node has changed
107+
if n.is_active != node.is_active
108+
|| n.is_validated != node.is_validated
109+
|| n.is_provider_whitelisted != node.is_provider_whitelisted
110+
|| n.is_blacklisted != node.is_blacklisted
111+
{
112+
match node_store.update_node(n).await {
113+
Ok(_) => {
114+
debug!("Successfully updated node {}", node.id);
115+
Ok(())
116+
}
117+
Err(e) => {
118+
error!("Error updating node {}: {}", node.id, e);
119+
Err(anyhow::anyhow!("Failed to update node: {}", e))
120+
}
114121
}
122+
} else {
123+
debug!("Node {} unchanged, skipping update", node.id);
124+
Ok(())
115125
}
116126
}
117127

crates/discovery/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub mod api;
2+
pub mod chainsync;
3+
pub mod location_enrichment;
4+
pub mod location_service;
5+
pub mod store;
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use crate::location_service::LocationService;
2+
use crate::store::node_store::NodeStore;
3+
use anyhow::Result;
4+
use log::{error, info, warn};
5+
use redis::AsyncCommands;
6+
use std::sync::Arc;
7+
use std::time::Duration;
8+
use tokio::time::interval;
9+
10+
const LOCATION_RETRY_KEY: &str = "location:retries:";
11+
const MAX_RETRIES: u32 = 3;
12+
const BATCH_SIZE: usize = 10;
13+
14+
pub struct LocationEnrichmentService {
15+
node_store: Arc<NodeStore>,
16+
location_service: Arc<LocationService>,
17+
redis_client: redis::Client,
18+
}
19+
20+
impl LocationEnrichmentService {
21+
pub fn new(
22+
node_store: Arc<NodeStore>,
23+
location_service: Arc<LocationService>,
24+
redis_url: &str,
25+
) -> Result<Self> {
26+
let redis_client = redis::Client::open(redis_url)?;
27+
Ok(Self {
28+
node_store,
29+
location_service,
30+
redis_client,
31+
})
32+
}
33+
34+
pub async fn run(&self, interval_seconds: u64) -> Result<()> {
35+
let mut interval = interval(Duration::from_secs(interval_seconds));
36+
37+
loop {
38+
interval.tick().await;
39+
40+
if let Err(e) = self.enrich_nodes_without_location().await {
41+
error!("Location enrichment cycle failed: {}", e);
42+
}
43+
}
44+
}
45+
46+
async fn enrich_nodes_without_location(&self) -> Result<()> {
47+
let nodes = self.node_store.get_nodes().await?;
48+
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
49+
50+
let nodes_without_location: Vec<_> = nodes
51+
.into_iter()
52+
.filter(|node| node.location.is_none())
53+
.collect();
54+
55+
if nodes_without_location.is_empty() {
56+
return Ok(());
57+
}
58+
59+
info!(
60+
"Found {} nodes without location data",
61+
nodes_without_location.len()
62+
);
63+
64+
// Process in batches to respect rate limits
65+
for chunk in nodes_without_location.chunks(BATCH_SIZE) {
66+
for node in chunk {
67+
let retry_key = format!("{}{}", LOCATION_RETRY_KEY, node.id);
68+
let retries: u32 = conn.get(&retry_key).await.unwrap_or(0);
69+
70+
if retries >= MAX_RETRIES {
71+
continue; // Skip nodes that have exceeded retry limit
72+
}
73+
74+
match self.location_service.get_location(&node.ip_address).await {
75+
Ok(Some(location)) => {
76+
info!(
77+
"Successfully fetched location for node {}: {:?}",
78+
node.id, location
79+
);
80+
81+
let mut updated_node = node.clone();
82+
updated_node.location = Some(location);
83+
84+
if let Err(e) = self.node_store.update_node(updated_node).await {
85+
error!("Failed to update node {} with location: {}", node.id, e);
86+
} else {
87+
let _: () = conn.del(&retry_key).await?;
88+
}
89+
}
90+
Ok(None) => {
91+
// Location service is disabled
92+
break;
93+
}
94+
Err(e) => {
95+
warn!(
96+
"Failed to fetch location for node {} (attempt {}/{}): {}",
97+
node.id,
98+
retries + 1,
99+
MAX_RETRIES,
100+
e
101+
);
102+
103+
// Increment retry counter
104+
let _: () = conn.set_ex(&retry_key, retries + 1, 86400).await?;
105+
// Expire after 24h
106+
}
107+
}
108+
109+
// Rate limiting - wait between requests
110+
tokio::time::sleep(Duration::from_millis(100)).await;
111+
}
112+
113+
// Longer wait between batches
114+
tokio::time::sleep(Duration::from_secs(1)).await;
115+
}
116+
117+
Ok(())
118+
}
119+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use anyhow::{Context, Result};
2+
use reqwest::Client;
3+
use serde::{Deserialize, Serialize};
4+
use shared::models::node::NodeLocation;
5+
use std::time::Duration;
6+
7+
#[derive(Debug, Deserialize, Serialize)]
8+
struct IpApiResponse {
9+
ip: String,
10+
city: Option<String>,
11+
region: Option<String>,
12+
country: Option<String>,
13+
#[serde(default)]
14+
latitude: f64,
15+
#[serde(default)]
16+
longitude: f64,
17+
}
18+
19+
pub struct LocationService {
20+
client: Client,
21+
base_url: String,
22+
enabled: bool,
23+
api_key: String,
24+
}
25+
26+
impl LocationService {
27+
pub fn new(base_url: Option<String>, api_key: Option<String>) -> Self {
28+
let enabled = base_url.is_some();
29+
let base_url = base_url.unwrap_or_else(|| "https://ipapi.co".to_string());
30+
let api_key = api_key.unwrap_or_default();
31+
let client = Client::builder()
32+
.timeout(Duration::from_secs(5))
33+
.build()
34+
.expect("Failed to build HTTP client");
35+
36+
Self {
37+
client,
38+
base_url,
39+
enabled,
40+
api_key,
41+
}
42+
}
43+
44+
pub async fn get_location(&self, ip_address: &str) -> Result<Option<NodeLocation>> {
45+
if !self.enabled {
46+
return Ok(None);
47+
}
48+
49+
let url = format!(
50+
"{}/{}/json/?key={}",
51+
self.base_url, ip_address, self.api_key
52+
);
53+
54+
let response = self
55+
.client
56+
.get(&url)
57+
.send()
58+
.await
59+
.context("Failed to send request to location service")?;
60+
61+
let api_response: IpApiResponse = response
62+
.json()
63+
.await
64+
.context("Failed to parse location service response")?;
65+
66+
Ok(Some(NodeLocation {
67+
latitude: api_response.latitude,
68+
longitude: api_response.longitude,
69+
city: api_response.city,
70+
region: api_response.region,
71+
country: api_response.country,
72+
}))
73+
}
74+
}
75+
76+
impl Default for LocationService {
77+
fn default() -> Self {
78+
Self::new(None, None)
79+
}
80+
}

crates/discovery/src/main.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
mod api;
22
mod chainsync;
3+
mod location_enrichment;
4+
mod location_service;
35
mod store;
46
use crate::api::server::start_server;
57
use crate::chainsync::ChainSync;
8+
use crate::location_enrichment::LocationEnrichmentService;
9+
use crate::location_service::LocationService;
610
use crate::store::node_store::NodeStore;
711
use crate::store::redis::RedisStore;
812
use alloy::providers::RootProvider;
@@ -64,6 +68,14 @@ struct Args {
6468
/// Service mode: api, processor, or full
6569
#[arg(short = 'm', long, default_value = "full")]
6670
mode: ServiceMode,
71+
72+
/// Location service URL (e.g., https://ipapi.co). If not provided, location services are disabled.
73+
#[arg(long)]
74+
location_service_url: Option<String>,
75+
76+
/// Location service API key
77+
#[arg(long)]
78+
location_service_api_key: Option<String>,
6779
}
6880

6981
#[tokio::main]
@@ -110,6 +122,26 @@ async fn main() -> Result<()> {
110122
);
111123
chain_sync.run().await?;
112124

125+
// Start location enrichment service if enabled
126+
if let Some(location_url) = args.location_service_url.clone() {
127+
let location_service = Arc::new(LocationService::new(
128+
Some(location_url),
129+
args.location_service_api_key.clone(),
130+
));
131+
let location_enrichment = LocationEnrichmentService::new(
132+
node_store.clone(),
133+
location_service,
134+
&args.redis_url,
135+
)?;
136+
137+
info!("Starting location enrichment service");
138+
tokio::spawn(async move {
139+
if let Err(e) = location_enrichment.run(30).await {
140+
error!("Location enrichment service failed: {}", e);
141+
}
142+
});
143+
}
144+
113145
if let Err(err) = start_server(
114146
"0.0.0.0",
115147
args.port,

0 commit comments

Comments
 (0)