Skip to content

Commit ea9d3b9

Browse files
authored
Merge pull request dmnd-pool#122 from jbesraa/2025-08-13/shares-monitor-fixes
Monitoring and other fixes before release
2 parents 8ec67c2 + cbd829f commit ea9d3b9

4 files changed

Lines changed: 54 additions & 70 deletions

File tree

src/config.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use serde_json::json;
55
use std::{
66
net::{SocketAddr, ToSocketAddrs},
77
path::PathBuf,
8+
time::Duration,
89
};
910
use tracing::{debug, error, info, warn};
1011

@@ -337,6 +338,7 @@ impl Configuration {
337338

338339
/// Parses a hashrate string (e.g., "10T", "2.5P", "500E") into an f32 value in h/s.
339340
fn parse_hashrate(hashrate_str: &str) -> Result<f32, String> {
341+
info!("Received hashrate: '{}'", hashrate_str);
340342
let hashrate_str = hashrate_str.trim();
341343
if hashrate_str.is_empty() {
342344
return Err("Hashrate cannot be empty. Expected format: '<number><unit>' (e.g., '10T', '2.5P', '5E'".to_string());
@@ -364,7 +366,7 @@ fn parse_hashrate(hashrate_str: &str) -> Result<f32, String> {
364366
if hashrate.is_infinite() || hashrate.is_nan() {
365367
return Err("Hashrate too large or invalid".to_string());
366368
}
367-
369+
info!("Parsed hashrate: {} h/s", hashrate);
368370
Ok(hashrate)
369371
}
370372

@@ -387,37 +389,41 @@ fn parse_address(addr: String) -> Option<SocketAddr> {
387389
/// Fetches pool URLs from the server based on the environment.
388390
async fn fetch_pool_urls() -> Result<Vec<SocketAddr>, Error> {
389391
if CONFIG.local {
392+
info!("Running in local mode, using hardcoded address 127.0.0.1:20000");
390393
return Ok(vec![
391394
parse_address("127.0.0.1:20000".to_string()).expect("Invalid local address")
392395
]);
393396
};
394-
395397
let url = if CONFIG.staging {
396-
info!("Fetching pool URLs from staging server: {}", STAGING_URL);
397398
STAGING_URL
398399
} else if CONFIG.testnet3 {
399-
info!("Fetching pool URLs from testnet server: {}", TESTNET3_URL);
400400
TESTNET3_URL
401401
} else {
402-
info!(
403-
"Fetching pool URLs from production server: {}",
404-
PRODUCTION_URL
405-
);
406402
PRODUCTION_URL
407403
};
408404
let endpoint = format!("{}/api/pool/urls", url);
405+
info!("Fetching pool URLs from: {}", endpoint);
409406
let token = Configuration::token().expect("TOKEN is not set");
407+
let mut retries = 8;
408+
let client = reqwest::Client::new();
410409

411-
let response = match reqwest::Client::new()
412-
.post(endpoint)
413-
.json(&json!({"token": token}))
414-
.send()
415-
.await
416-
{
417-
Ok(resp) => resp,
418-
Err(e) => {
419-
error!("Failed to fetch pool urls: {}", e);
420-
return Err(Error::from(e));
410+
let response = loop {
411+
let request = client
412+
.post(endpoint.clone())
413+
.json(&json!({"token": token}))
414+
.timeout(Duration::from_secs(15));
415+
416+
match request.send().await {
417+
Ok(resp) => break resp,
418+
Err(e) => {
419+
error!("Failed to fetch pool urls: {}", e);
420+
if retries == 0 {
421+
return Err(Error::from(e));
422+
}
423+
retries -= 1;
424+
info!("Retrying in 3 seconds...");
425+
tokio::time::sleep(Duration::from_secs(3)).await;
426+
}
421427
}
422428
};
423429

@@ -436,9 +442,10 @@ async fn fetch_pool_urls() -> Result<Vec<SocketAddr>, Error> {
436442
.filter_map(|addr| {
437443
let address = format!("{}:{}", addr.host, addr.port);
438444
parse_address(address)
439-
}) // Filter out any None values
445+
}) // Filter out any None values, i.e., invalid addresses
440446
.collect();
441-
debug!("Pool addresses: {:?}", socket_addrs);
447+
info!("Found {} pool addresses", socket_addrs.len());
448+
info!("Pool addresses: {:?}", &socket_addrs);
442449
Ok(socket_addrs)
443450
}
444451

src/main.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ const DEFAULT_SV1_HASHPOWER: f32 = 100_000_000_000_000.0;
3636
const SHARE_PER_MIN: f32 = 10.0;
3737
const CHANNEL_DIFF_UPDTATE_INTERVAL: u32 = 10;
3838
const MAX_LEN_DOWN_MSG: u32 = 10000;
39-
const MAIN_AUTH_PUB_KEY: &str = "9bQHWXsQ2J9TRFTaxRh3KjoxdyLRfWVEy25YHtKF8y8gotLoCZZ";
39+
const MAIN_AUTH_PUB_KEY: &str = "9c44K6QVizyPWb9xfeqhckFRosxWwB3EfytGa4CfTdD526qb2QV";
4040
const TEST_AUTH_PUB_KEY: &str = "9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72";
4141
const DEFAULT_LISTEN_ADDRESS: &str = "0.0.0.0:32767";
4242
const REPO_OWNER: &str = "demand-open-source";
@@ -147,7 +147,6 @@ async fn initialize_proxy(
147147
);
148148
let secs = 5;
149149
tokio::time::sleep(Duration::from_secs(secs)).await;
150-
// Restart loop, esentially restarting proxy
151150
continue;
152151
}
153152
};

src/monitor/shares.rs

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use roles_logic_sv2::utils::Mutex;
22
use std::sync::Arc;
3-
use tracing::{debug, error};
4-
const BATCH_SIZE: u32 = 20; // Default batch size for sending shares
3+
use tracing::{error, info, warn};
54

65
use crate::{
76
monitor::{shares_server_endpoint, MonitorAPI},
@@ -13,7 +12,8 @@ pub struct ShareInfo {
1312
worker_name: String,
1413
difficulty: Option<f32>,
1514
job_id: i64,
16-
rejection_reason: Option<RejectionReason>, // if None, the share was accepted
15+
// if None, the share was accepted
16+
rejection_reason: Option<RejectionReason>,
1717
timestamp: u64,
1818
}
1919

@@ -39,21 +39,19 @@ impl ShareInfo {
3939

4040
#[derive(Debug, Clone)]
4141
pub struct SharesMonitor {
42-
pending_shares: Arc<Mutex<Vec<ShareInfo>>>,
43-
batch_size: u32,
42+
shares: Arc<Mutex<Vec<ShareInfo>>>,
4443
}
4544

4645
impl SharesMonitor {
4746
pub fn new() -> Self {
4847
SharesMonitor {
49-
pending_shares: Arc::new(Mutex::new(Vec::new())),
50-
batch_size: BATCH_SIZE,
48+
shares: Arc::new(Mutex::new(Vec::new())),
5149
}
5250
}
5351

5452
/// Inserts a new share into the pending shares list.
5553
pub fn insert_share(&self, share: ShareInfo) {
56-
self.pending_shares
54+
self.shares
5755
.safe_lock(|event| {
5856
event.push(share);
5957
})
@@ -64,8 +62,8 @@ impl SharesMonitor {
6462
}
6563

6664
/// Retrieves the list of pending shares.
67-
fn get_pending_shares(&self) -> Vec<ShareInfo> {
68-
self.pending_shares
65+
fn get_next_shares(&self) -> Vec<ShareInfo> {
66+
self.shares
6967
.safe_lock(|event| event.clone())
7068
.unwrap_or_else(|e| {
7169
error!("Failed to lock pending shares: {:?}", e);
@@ -75,8 +73,8 @@ impl SharesMonitor {
7573
}
7674

7775
/// Clears the list of pending shares.
78-
fn clear_pending_shares(&self) {
79-
self.pending_shares
76+
fn clear_next_shares(&self) {
77+
self.shares
8078
.safe_lock(|event| {
8179
event.clear();
8280
})
@@ -86,34 +84,29 @@ impl SharesMonitor {
8684
});
8785
}
8886

89-
/// Monitors the pending shares and sends them to the monitoring server in batches.
9087
pub async fn monitor(&self) {
9188
let api = MonitorAPI::new(shares_server_endpoint());
92-
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); // Check every 60 seconds
93-
interval.tick().await; // Skip the first tick to avoid unnecessary error log
89+
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
90+
// First tick completes immediately
91+
interval.tick().await;
9492
loop {
9593
interval.tick().await;
96-
let shares_to_send = self.get_pending_shares();
94+
let shares_to_send = self.get_next_shares();
9795
if !shares_to_send.is_empty() {
98-
if shares_to_send.len() >= self.batch_size as usize {
99-
match api.send_shares(shares_to_send.clone()).await {
100-
Ok(_) => {
101-
debug!("Successfully sent Shares: {:?} to API", &shares_to_send);
102-
}
103-
Err(err) => {
104-
error!("Failed to send shares: {}", err);
105-
}
96+
match api.send_shares(shares_to_send.clone()).await {
97+
Ok(_) => {
98+
info!(
99+
"Saved {} shares to the monitoring server",
100+
shares_to_send.len()
101+
);
102+
self.clear_next_shares();
103+
}
104+
Err(err) => {
105+
warn!("Failed to send shares, this does not affect mining but may cause issues with monitoring: {:?}", err);
106106
}
107-
self.clear_pending_shares(); // Clear after sending
108-
} else {
109-
debug!(
110-
"Current shares count ({}) is less than batch size ({}), waiting for more",
111-
shares_to_send.len(),
112-
self.batch_size
113-
);
114107
}
115108
} else {
116-
error!("No pending shares to send");
109+
warn!("No pending shares to send. If this happens frequently, check your miner.");
117110
}
118111
}
119112
}

src/router/mod.rs

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ impl Router {
196196
let auth_pub_key = self.auth_pub_k;
197197

198198
tokio::time::timeout(
199-
Duration::from_secs(15),
199+
Duration::from_secs(8),
200200
PoolLatency::get_mining_setup_latencies(
201201
&mut pool,
202202
setup_connection_msg.cloned(),
@@ -212,21 +212,6 @@ impl Router {
212212
);
213213
})??;
214214

215-
if (PoolLatency::get_mining_setup_latencies(
216-
&mut pool,
217-
setup_connection_msg.cloned(),
218-
timer.cloned(),
219-
auth_pub_key,
220-
)
221-
.await)
222-
.is_err()
223-
{
224-
error!(
225-
"Failed to get mining setup latencies for: {:?}",
226-
pool_address
227-
);
228-
return Err(());
229-
}
230215
if (PoolLatency::get_jd_latencies(&mut pool, auth_pub_key).await).is_err() {
231216
error!("Failed to get jd setup latencies for: {:?}", pool_address);
232217
return Err(());

0 commit comments

Comments
 (0)