Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 7a94d8a

Browse files
authored
Merge pull request #525 from PrimeIntellect-ai/release/v0.3.1
Release v.0.3.1
2 parents 789752f + f61357f commit 7a94d8a

22 files changed

Lines changed: 551 additions & 30 deletions

File tree

Cargo.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ iroh = "0.34.1"
4040
rand_v8 = { package = "rand", version = "0.8.5", features = ["std"] }
4141
rand_core_v6 = { package = "rand_core", version = "0.6.4", features = ["std"] }
4242
[workspace.package]
43-
version = "0.3.0"
43+
version = "0.3.1"
4444
edition = "2021"
4545

4646
[workspace.features]

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,16 @@ down:
6666
pkill -f "target/debug/orchestrator" 2>/dev/null || true
6767
pkill -f "target/debug/validator" 2>/dev/null || true
6868
pkill -f "target/debug/discovery" 2>/dev/null || true
69+
pkill -9 -f "cargo run --bin discovery" 2>/dev/null || true
70+
pkill -9 -f "cargo watch" 2>/dev/null || true
6971

7072
whitelist-provider:
7173
set -a; source ${ENV_FILE}; set +a; \
7274
cargo run -p dev-utils --example whitelist_provider -- --provider-address $${PROVIDER_ADDRESS} --key $${PRIVATE_KEY_VALIDATOR} --rpc-url $${RPC_URL}
7375

7476
watch-discovery:
7577
set -a; source .env; set +a; \
76-
cargo watch -w crates/discovery/src -x "run --bin discovery -- --rpc-url $${RPC_URL} --max-nodes-per-ip $${MAX_NODES_PER_IP:-2}"
78+
cargo watch -w crates/discovery/src -x "run --bin discovery -- --rpc-url $${RPC_URL} --max-nodes-per-ip $${MAX_NODES_PER_IP:-2} $${LOCATION_SERVICE_URL:+--location-service-url $${LOCATION_SERVICE_URL}} $${LOCATION_SERVICE_API_KEY:+--location-service-api-key $${LOCATION_SERVICE_API_KEY}}"
7779

7880
watch-worker:
7981
set -a; source ${ENV_FILE}; set +a; \

crates/discovery/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ futures = { workspace = true }
1313
log = { workspace = true }
1414
redis = { workspace = true, features = ["tokio-comp"] }
1515
redis-test = { workspace = true }
16+
reqwest = { workspace = true }
1617
serde = { workspace = true }
1718
serde_json = { workspace = true }
1819
shared = { workspace = true }

crates/discovery/Dockerfile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ ENV REDIS_URL="redis://localhost:6380"
1010
ENV PORT="8089"
1111
ENV MAX_NODES_PER_IP="3"
1212
ENV MODE="full"
13+
ENV LOCATION_SERVICE_URL=""
14+
ENV LOCATION_SERVICE_API_KEY=""
1315

1416
RUN echo '#!/bin/sh\n\
1517
exec /usr/local/bin/discovery \
@@ -19,6 +21,8 @@ exec /usr/local/bin/discovery \
1921
--port "$PORT" \
2022
--max-nodes-per-ip "$MAX_NODES_PER_IP" \
2123
--mode "$MODE" \
24+
$([ ! -z "$LOCATION_SERVICE_URL" ] && echo "--location-service-url $LOCATION_SERVICE_URL") \
25+
$([ ! -z "$LOCATION_SERVICE_API_KEY" ] && echo "--location-service-api-key $LOCATION_SERVICE_API_KEY") \
2226
"$@"' > /entrypoint.sh && \
2327
chmod +x /entrypoint.sh
2428

crates/discovery/src/api/routes/node.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ pub async fn register_node(
204204

205205
let node_store = data.node_store.clone();
206206

207-
match node_store.register_node(node.clone()).await {
207+
match node_store.register_node(node.into_inner()).await {
208208
Ok(_) => HttpResponse::Ok().json(ApiResponse::new(true, "Node registered successfully")),
209209
Err(_) => HttpResponse::InternalServerError()
210210
.json(ApiResponse::new(false, "Internal server error")),
@@ -360,6 +360,7 @@ mod tests {
360360
is_blacklisted: false,
361361
last_updated: None,
362362
created_at: None,
363+
location: None,
363364
};
364365

365366
match app_state.node_store.update_node(validated).await {

crates/discovery/src/chainsync/sync.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,25 @@ impl ChainSync {
103103
})?;
104104
n.is_blacklisted = is_blacklisted;
105105

106-
match node_store.update_node(n).await {
107-
Ok(_) => {
108-
debug!("Successfully updated node {}", node.id);
109-
Ok(())
110-
}
111-
Err(e) => {
112-
error!("Error updating node {}: {}", node.id, e);
113-
Err(anyhow::anyhow!("Failed to update node: {}", e))
106+
// Only update if the node has changed
107+
if n.is_active != node.is_active
108+
|| n.is_validated != node.is_validated
109+
|| n.is_provider_whitelisted != node.is_provider_whitelisted
110+
|| n.is_blacklisted != node.is_blacklisted
111+
{
112+
match node_store.update_node(n).await {
113+
Ok(_) => {
114+
debug!("Successfully updated node {}", node.id);
115+
Ok(())
116+
}
117+
Err(e) => {
118+
error!("Error updating node {}: {}", node.id, e);
119+
Err(anyhow::anyhow!("Failed to update node: {}", e))
120+
}
114121
}
122+
} else {
123+
debug!("Node {} unchanged, skipping update", node.id);
124+
Ok(())
115125
}
116126
}
117127

crates/discovery/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub mod api;
2+
pub mod chainsync;
3+
pub mod location_enrichment;
4+
pub mod location_service;
5+
pub mod store;
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use crate::location_service::LocationService;
2+
use crate::store::node_store::NodeStore;
3+
use anyhow::Result;
4+
use log::{error, info, warn};
5+
use redis::AsyncCommands;
6+
use std::sync::Arc;
7+
use std::time::Duration;
8+
use tokio::time::interval;
9+
10+
const LOCATION_RETRY_KEY: &str = "location:retries:";
11+
const MAX_RETRIES: u32 = 3;
12+
const BATCH_SIZE: usize = 10;
13+
14+
pub struct LocationEnrichmentService {
15+
node_store: Arc<NodeStore>,
16+
location_service: Arc<LocationService>,
17+
redis_client: redis::Client,
18+
}
19+
20+
impl LocationEnrichmentService {
21+
pub fn new(
22+
node_store: Arc<NodeStore>,
23+
location_service: Arc<LocationService>,
24+
redis_url: &str,
25+
) -> Result<Self> {
26+
let redis_client = redis::Client::open(redis_url)?;
27+
Ok(Self {
28+
node_store,
29+
location_service,
30+
redis_client,
31+
})
32+
}
33+
34+
pub async fn run(&self, interval_seconds: u64) -> Result<()> {
35+
let mut interval = interval(Duration::from_secs(interval_seconds));
36+
37+
loop {
38+
interval.tick().await;
39+
40+
if let Err(e) = self.enrich_nodes_without_location().await {
41+
error!("Location enrichment cycle failed: {}", e);
42+
}
43+
}
44+
}
45+
46+
async fn enrich_nodes_without_location(&self) -> Result<()> {
47+
let nodes = self.node_store.get_nodes().await?;
48+
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
49+
50+
let nodes_without_location: Vec<_> = nodes
51+
.into_iter()
52+
.filter(|node| node.location.is_none())
53+
.collect();
54+
55+
if nodes_without_location.is_empty() {
56+
return Ok(());
57+
}
58+
59+
info!(
60+
"Found {} nodes without location data",
61+
nodes_without_location.len()
62+
);
63+
64+
// Process in batches to respect rate limits
65+
for chunk in nodes_without_location.chunks(BATCH_SIZE) {
66+
for node in chunk {
67+
let retry_key = format!("{}{}", LOCATION_RETRY_KEY, node.id);
68+
let retries: u32 = conn.get(&retry_key).await.unwrap_or(0);
69+
70+
if retries >= MAX_RETRIES {
71+
continue; // Skip nodes that have exceeded retry limit
72+
}
73+
74+
match self.location_service.get_location(&node.ip_address).await {
75+
Ok(Some(location)) => {
76+
info!(
77+
"Successfully fetched location for node {}: {:?}",
78+
node.id, location
79+
);
80+
81+
let mut updated_node = node.clone();
82+
updated_node.location = Some(location);
83+
84+
if let Err(e) = self.node_store.update_node(updated_node).await {
85+
error!("Failed to update node {} with location: {}", node.id, e);
86+
} else {
87+
let _: () = conn.del(&retry_key).await?;
88+
}
89+
}
90+
Ok(None) => {
91+
// Location service is disabled
92+
break;
93+
}
94+
Err(e) => {
95+
warn!(
96+
"Failed to fetch location for node {} (attempt {}/{}): {}",
97+
node.id,
98+
retries + 1,
99+
MAX_RETRIES,
100+
e
101+
);
102+
103+
// Increment retry counter
104+
let _: () = conn.set_ex(&retry_key, retries + 1, 86400).await?;
105+
// Expire after 24h
106+
}
107+
}
108+
109+
// Rate limiting - wait between requests
110+
tokio::time::sleep(Duration::from_millis(100)).await;
111+
}
112+
113+
// Longer wait between batches
114+
tokio::time::sleep(Duration::from_secs(1)).await;
115+
}
116+
117+
Ok(())
118+
}
119+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use anyhow::{Context, Result};
2+
use reqwest::Client;
3+
use serde::{Deserialize, Serialize};
4+
use shared::models::node::NodeLocation;
5+
use std::time::Duration;
6+
7+
#[derive(Debug, Deserialize, Serialize)]
8+
struct IpApiResponse {
9+
ip: String,
10+
city: Option<String>,
11+
region: Option<String>,
12+
country: Option<String>,
13+
#[serde(default)]
14+
latitude: f64,
15+
#[serde(default)]
16+
longitude: f64,
17+
}
18+
19+
pub struct LocationService {
20+
client: Client,
21+
base_url: String,
22+
enabled: bool,
23+
api_key: String,
24+
}
25+
26+
impl LocationService {
27+
pub fn new(base_url: Option<String>, api_key: Option<String>) -> Self {
28+
let enabled = base_url.is_some();
29+
let base_url = base_url.unwrap_or_else(|| "https://ipapi.co".to_string());
30+
let api_key = api_key.unwrap_or_default();
31+
let client = Client::builder()
32+
.timeout(Duration::from_secs(5))
33+
.build()
34+
.expect("Failed to build HTTP client");
35+
36+
Self {
37+
client,
38+
base_url,
39+
enabled,
40+
api_key,
41+
}
42+
}
43+
44+
pub async fn get_location(&self, ip_address: &str) -> Result<Option<NodeLocation>> {
45+
if !self.enabled {
46+
return Ok(None);
47+
}
48+
49+
let url = format!(
50+
"{}/{}/json/?key={}",
51+
self.base_url, ip_address, self.api_key
52+
);
53+
54+
let response = self
55+
.client
56+
.get(&url)
57+
.send()
58+
.await
59+
.context("Failed to send request to location service")?;
60+
61+
let api_response: IpApiResponse = response
62+
.json()
63+
.await
64+
.context("Failed to parse location service response")?;
65+
66+
Ok(Some(NodeLocation {
67+
latitude: api_response.latitude,
68+
longitude: api_response.longitude,
69+
city: api_response.city,
70+
region: api_response.region,
71+
country: api_response.country,
72+
}))
73+
}
74+
}
75+
76+
impl Default for LocationService {
77+
fn default() -> Self {
78+
Self::new(None, None)
79+
}
80+
}

0 commit comments

Comments
 (0)