Skip to content

Commit 3eb66dd

Browse files
authored
Merge pull request dmnd-pool#108 from Priceless-P/feat/send-worker-shares
Send workers shares
2 parents e3bb07a + 2f172ed commit 3eb66dd

6 files changed

Lines changed: 433 additions & 98 deletions

File tree

src/config.rs

Lines changed: 121 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
use clap::Parser;
22
use lazy_static::lazy_static;
33
use serde::{Deserialize, Serialize};
4+
use serde_json::json;
45
use std::{
56
net::{SocketAddr, ToSocketAddrs},
67
path::PathBuf,
78
};
89
use tracing::{debug, error, info, warn};
910

10-
use crate::{HashUnit, DEFAULT_SV1_HASHPOWER};
11+
use crate::{shared::error::Error, HashUnit, DEFAULT_SV1_HASHPOWER, PRODUCTION_URL, STAGING_URL};
1112
lazy_static! {
1213
pub static ref CONFIG: Configuration = Configuration::load_config();
1314
}
1415
#[derive(Parser)]
1516
struct Args {
1617
#[clap(long)]
17-
test: bool,
18+
staging: bool,
19+
#[clap(long)]
20+
local: bool,
1821
#[clap(long = "d", short = 'd', value_parser = parse_hashrate)]
1922
downstream_hashrate: Option<f32>,
2023
#[clap(long = "loglevel", short = 'l')]
@@ -27,10 +30,6 @@ struct Args {
2730
delay: Option<u64>,
2831
#[clap(long = "interval", short = 'i')]
2932
adjustment_interval: Option<u64>,
30-
#[clap(long = "pool", short = 'p', value_delimiter = ',')]
31-
pool_addresses: Option<Vec<String>>,
32-
#[clap(long = "test-pool", value_delimiter = ',')]
33-
test_pool_addresses: Option<Vec<String>>,
3433
#[clap(long)]
3534
token: Option<String>,
3635
#[clap(long)]
@@ -51,33 +50,52 @@ struct Args {
5150
struct ConfigFile {
5251
token: Option<String>,
5352
tp_address: Option<String>,
54-
pool_addresses: Option<Vec<String>>,
55-
test_pool_addresses: Option<Vec<String>>,
5653
interval: Option<u64>,
5754
delay: Option<u64>,
5855
downstream_hashrate: Option<String>,
5956
loglevel: Option<String>,
6057
nc_loglevel: Option<String>,
6158
sv1_log: Option<bool>,
62-
test: Option<bool>,
59+
staging: Option<bool>,
60+
local: Option<bool>,
6361
listening_addr: Option<String>,
6462
api_server_port: Option<String>,
6563
monitor: Option<bool>,
6664
auto_update: Option<bool>,
6765
}
6866

67+
impl ConfigFile {
68+
pub fn default() -> Self {
69+
ConfigFile {
70+
token: None,
71+
tp_address: None,
72+
interval: None,
73+
delay: None,
74+
downstream_hashrate: None,
75+
loglevel: None,
76+
nc_loglevel: None,
77+
sv1_log: None,
78+
staging: None,
79+
local: None,
80+
listening_addr: None,
81+
api_server_port: None,
82+
monitor: None,
83+
auto_update: None,
84+
}
85+
}
86+
}
87+
6988
pub struct Configuration {
7089
token: Option<String>,
7190
tp_address: Option<String>,
72-
pool_addresses: Option<Vec<SocketAddr>>,
73-
test_pool_addresses: Option<Vec<SocketAddr>>,
7491
interval: u64,
7592
delay: u64,
7693
downstream_hashrate: f32,
7794
loglevel: String,
7895
nc_loglevel: String,
7996
sv1_log: bool,
80-
test: bool,
97+
staging: bool,
98+
local: bool,
8199
listening_addr: Option<String>,
82100
api_server_port: String,
83101
monitor: bool,
@@ -92,11 +110,13 @@ impl Configuration {
92110
CONFIG.tp_address.clone()
93111
}
94112

95-
pub fn pool_address() -> Option<Vec<SocketAddr>> {
96-
if CONFIG.test {
97-
CONFIG.test_pool_addresses.clone() // Return test pool addresses in test mode
98-
} else {
99-
CONFIG.pool_addresses.clone()
113+
pub async fn pool_address() -> Option<Vec<SocketAddr>> {
114+
match fetch_pool_urls().await {
115+
Ok(addresses) => Some(addresses),
116+
Err(e) => {
117+
error!("Failed to fetch pool addresses: {}", e);
118+
None
119+
}
100120
}
101121
}
102122

@@ -149,8 +169,25 @@ impl Configuration {
149169
CONFIG.sv1_log
150170
}
151171

152-
pub fn test() -> bool {
153-
CONFIG.test
172+
pub fn staging() -> bool {
173+
CONFIG.staging
174+
}
175+
176+
pub fn local() -> bool {
177+
CONFIG.local
178+
}
179+
180+
/// Returns the environment based on the configuration.
181+
/// Possible values: "staging", "local", "production".
182+
/// If no environment is set, it defaults to "production".
183+
pub fn environment() -> String {
184+
if CONFIG.staging {
185+
"staging".to_string()
186+
} else if CONFIG.local {
187+
"local".to_string()
188+
} else {
189+
"production".to_string()
190+
}
154191
}
155192

156193
pub fn monitor() -> bool {
@@ -168,23 +205,7 @@ impl Configuration {
168205
let config: ConfigFile = std::fs::read_to_string(&config_path)
169206
.ok()
170207
.and_then(|content| toml::from_str(&content).ok())
171-
.unwrap_or(ConfigFile {
172-
token: None,
173-
tp_address: None,
174-
pool_addresses: None,
175-
test_pool_addresses: None,
176-
interval: None,
177-
delay: None,
178-
downstream_hashrate: None,
179-
loglevel: None,
180-
nc_loglevel: None,
181-
sv1_log: None,
182-
test: None,
183-
listening_addr: None,
184-
api_server_port: None,
185-
monitor: None,
186-
auto_update: None,
187-
});
208+
.unwrap_or(ConfigFile::default());
188209

189210
let token = args
190211
.token
@@ -197,54 +218,6 @@ impl Configuration {
197218
.or(config.tp_address)
198219
.or_else(|| std::env::var("TP_ADDRESS").ok());
199220

200-
let pool_addresses: Option<Vec<SocketAddr>> = args
201-
.pool_addresses
202-
.map(|addresses| {
203-
addresses
204-
.into_iter()
205-
.map(parse_address)
206-
.collect::<Vec<SocketAddr>>()
207-
})
208-
.or_else(|| {
209-
config.pool_addresses.map(|addresses| {
210-
addresses
211-
.into_iter()
212-
.map(parse_address)
213-
.collect::<Vec<SocketAddr>>()
214-
})
215-
})
216-
.or_else(|| {
217-
std::env::var("POOL_ADDRESSES").ok().map(|s| {
218-
s.split(',')
219-
.map(|s| parse_address(s.trim().to_string()))
220-
.collect::<Vec<SocketAddr>>()
221-
})
222-
});
223-
224-
let test_pool_addresses: Option<Vec<SocketAddr>> = args
225-
.test_pool_addresses
226-
.map(|addresses| {
227-
addresses
228-
.into_iter()
229-
.map(parse_address)
230-
.collect::<Vec<SocketAddr>>()
231-
})
232-
.or_else(|| {
233-
config.test_pool_addresses.map(|addresses| {
234-
addresses
235-
.into_iter()
236-
.map(parse_address)
237-
.collect::<Vec<SocketAddr>>()
238-
})
239-
})
240-
.or_else(|| {
241-
std::env::var("TEST_POOL_ADDRESSES").ok().map(|s| {
242-
s.split(',')
243-
.map(|s| parse_address(s.trim().to_string()))
244-
.collect::<Vec<SocketAddr>>()
245-
})
246-
});
247-
248221
let interval = args
249222
.adjustment_interval
250223
.or(config.interval)
@@ -316,8 +289,9 @@ impl Configuration {
316289
|| config.sv1_log.unwrap_or(false)
317290
|| std::env::var("SV1_LOGLEVEL").is_ok();
318291

319-
let test = args.test || config.test.unwrap_or(false) || std::env::var("TEST").is_ok();
320-
292+
let staging =
293+
args.staging || config.staging.unwrap_or(false) || std::env::var("STAGING").is_ok();
294+
let local = args.local || config.local.unwrap_or(false) || std::env::var("LOCAL").is_ok();
321295
let monitor =
322296
args.monitor || config.monitor.unwrap_or(false) || std::env::var("MONITOR").is_ok();
323297

@@ -328,15 +302,14 @@ impl Configuration {
328302
Configuration {
329303
token,
330304
tp_address,
331-
pool_addresses,
332-
test_pool_addresses,
333305
interval,
334306
delay,
335307
downstream_hashrate,
336308
loglevel,
337309
nc_loglevel,
338310
sv1_log,
339-
test,
311+
staging,
312+
local,
340313
listening_addr,
341314
api_server_port,
342315
monitor,
@@ -385,3 +358,62 @@ fn parse_address(addr: String) -> SocketAddr {
385358
.next()
386359
.expect("No socket address resolved")
387360
}
361+
362+
/// Fetches pool URLs from the server based on the environment.
363+
async fn fetch_pool_urls() -> Result<Vec<SocketAddr>, Error> {
364+
if CONFIG.local {
365+
return Ok(vec![parse_address("127.0.0.1:20000".to_string())]);
366+
};
367+
368+
let url = if CONFIG.staging {
369+
info!("Fetching pool URLs from staging server: {}", STAGING_URL);
370+
STAGING_URL
371+
} else {
372+
info!(
373+
"Fetching pool URLs from production server: {}",
374+
PRODUCTION_URL
375+
);
376+
PRODUCTION_URL
377+
};
378+
let endpoint = format!("{}/api/pool/urls", url);
379+
let token = Configuration::token().expect("TOKEN is not set");
380+
381+
let response = match reqwest::Client::new()
382+
.post(endpoint)
383+
.json(&json!({"token": token}))
384+
.send()
385+
.await
386+
{
387+
Ok(resp) => resp,
388+
Err(e) => {
389+
error!("Failed to fetch pool urls: {}", e);
390+
return Err(Error::from(e));
391+
}
392+
};
393+
394+
debug!("Response status: {}", response.status());
395+
let addresses: Vec<PoolAddress> = match response.json().await {
396+
Ok(addrs) => addrs,
397+
Err(e) => {
398+
error!("Failed to parse pool urls: {}", e);
399+
return Err(Error::from(e));
400+
}
401+
};
402+
403+
// Parse the addresses into SocketAddr
404+
let socket_addrs: Vec<SocketAddr> = addresses
405+
.into_iter()
406+
.map(|addr| {
407+
let address = format!("{}:{}", addr.host, addr.port);
408+
parse_address(address)
409+
})
410+
.collect();
411+
debug!("Pool addresses: {:?}", socket_addrs);
412+
Ok(socket_addrs)
413+
}
414+
415+
#[derive(Debug, Deserialize)]
416+
struct PoolAddress {
417+
host: String,
418+
port: u16,
419+
}

src/main.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ mod proxy_state;
2525
mod router;
2626
mod share_accounter;
2727
mod shared;
28+
mod shares_monitor;
2829
mod translator;
2930

3031
const TRANSLATOR_BUFFER_SIZE: usize = 32;
@@ -41,6 +42,9 @@ const DEFAULT_LISTEN_ADDRESS: &str = "0.0.0.0:32767";
4142
const REPO_OWNER: &str = "demand-open-source";
4243
const REPO_NAME: &str = "demand-cli";
4344
const BIN_NAME: &str = "demand-cli";
45+
const STAGING_URL: &str = "https://staging-user-dashboard-server.dmnd.work";
46+
const LOCAL_URL: &str = "http://localhost:8787/api";
47+
const PRODUCTION_URL: &str = "https://production-user-dashboard-server.dmnd.work";
4448

4549
lazy_static! {
4650
static ref SV1_DOWN_LISTEN_ADDR: String =
@@ -54,7 +58,10 @@ lazy_static! {
5458
}
5559

5660
lazy_static! {
57-
pub static ref AUTH_PUB_KEY: &'static str = if Configuration::test() {
61+
62+
// for staging and local environments, use the test auth public key
63+
// for production, use the main auth public key
64+
pub static ref AUTH_PUB_KEY: &'static str = if Configuration::staging() || Configuration::local() {
5865
TEST_AUTH_PUB_KEY
5966
} else {
6067
MAIN_AUTH_PUB_KEY
@@ -87,20 +94,23 @@ async fn main() {
8794
};
8895
}
8996

90-
if Configuration::test() {
91-
info!("Package is running in test mode");
97+
if Configuration::staging() {
98+
info!("Package is running in staging mode");
99+
}
100+
if Configuration::local() {
101+
info!("Package is running in local mode");
92102
}
93103

94104
let auth_pub_k: Secp256k1PublicKey = AUTH_PUB_KEY.parse().expect("Invalid public key");
95105

96106
let pool_addresses = Configuration::pool_address()
107+
.await
97108
.filter(|p| !p.is_empty())
98-
.unwrap_or_else(|| {
99-
if Configuration::test() {
100-
panic!("Test pool address is missing");
101-
} else {
102-
panic!("Pool address is missing");
103-
}
109+
.unwrap_or_else(|| match Configuration::environment().as_str() {
110+
"staging" => panic!("Staging pool address is missing"),
111+
"local" => panic!("Local pool address is missing"),
112+
"production" => panic!("Pool address is missing"),
113+
_ => unreachable!(),
104114
});
105115

106116
let mut router = router::Router::new(pool_addresses, auth_pub_k, None, None);

0 commit comments

Comments
 (0)