Skip to content

Commit 803e964

Browse files
authored
Merge pull request dmnd-pool#137 from Priceless-P/feat/worker-activity
Log worker activity
2 parents 4e3459b + 8c8acf7 commit 803e964

4 files changed

Lines changed: 118 additions & 6 deletions

File tree

src/monitor/mod.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,19 @@ use tracing::{debug, error};
44

55
use crate::{
66
config::Configuration,
7-
monitor::{logs::ProxyLog, shares::ShareInfo},
7+
monitor::{logs::ProxyLog, shares::ShareInfo, worker_activity::WorkerActivity},
88
shared::error::Error,
99
LOCAL_URL, PRODUCTION_URL, STAGING_URL, TESTNET3_URL,
1010
};
1111

1212
pub mod logs;
1313
pub mod shares;
14-
14+
pub mod worker_activity;
1515
pub struct MonitorAPI {
1616
pub url: Url,
1717
pub client: reqwest::Client,
1818
}
19+
1920
fn proxy_log_server_endpoint() -> String {
2021
match Configuration::environment().as_str() {
2122
"staging" => format!("{}/api/proxy/logs", STAGING_URL),
@@ -36,6 +37,17 @@ fn shares_server_endpoint() -> String {
3637
}
3738
}
3839

40+
fn worker_activity_server_endpoint() -> String {
41+
// Determine the monitoring server URL based on the environment
42+
match Configuration::environment().as_str() {
43+
"staging" => format!("{}/api/worker/activity", LOCAL_URL),
44+
"testnet3" => format!("{}/api/worker/activity", TESTNET3_URL),
45+
"local" => format!("{}/api/worker/activity", LOCAL_URL),
46+
"production" => format!("{}/api/worker/activity", PRODUCTION_URL),
47+
_ => unreachable!(),
48+
}
49+
}
50+
3951
impl MonitorAPI {
4052
pub fn new(url: String) -> Self {
4153
let client = reqwest::Client::new();
@@ -86,4 +98,24 @@ impl MonitorAPI {
8698
}
8799
}
88100
}
101+
102+
/// Sends a worker activity log to the monitoring server.
103+
pub async fn send_worker_activity(&self, activity: WorkerActivity) -> Result<(), Error> {
104+
let token = crate::config::Configuration::token().expect("Token is not set");
105+
debug!("Sending worker activity to API: {:?}", activity);
106+
let response = self
107+
.client
108+
.post(worker_activity_server_endpoint())
109+
.json(&json!({ "data": activity, "token": token }))
110+
.send()
111+
.await?;
112+
113+
match response.error_for_status() {
114+
Ok(_) => Ok(()),
115+
Err(err) => {
116+
error!("Failed to send worker activity: {}", err);
117+
Err(err.into())
118+
}
119+
}
120+
}
89121
}

src/monitor/worker_activity.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use crate::monitor::{worker_activity_server_endpoint, MonitorAPI};
2+
3+
#[derive(serde::Serialize, Debug)]
4+
pub enum WorkerActivityType {
5+
Connected,
6+
Disconnected,
7+
}
8+
9+
#[derive(serde::Serialize, Debug)]
10+
pub struct WorkerActivity {
11+
user_agent: String,
12+
worker_name: String,
13+
activity: WorkerActivityType,
14+
}
15+
16+
impl WorkerActivity {
17+
pub fn new(user_agent: String, worker_name: String, activity: WorkerActivityType) -> Self {
18+
WorkerActivity {
19+
user_agent,
20+
worker_name,
21+
activity,
22+
}
23+
}
24+
25+
pub fn monitor_api(&self) -> MonitorAPI {
26+
MonitorAPI::new(worker_activity_server_endpoint())
27+
}
28+
}

src/translator/downstream/downstream.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use crate::{
22
api::stats::StatsSender,
3-
monitor::shares::{RejectionReason, ShareInfo, SharesMonitor},
3+
monitor::{
4+
shares::{RejectionReason, ShareInfo, SharesMonitor},
5+
worker_activity::{WorkerActivity, WorkerActivityType},
6+
},
47
proxy_state::{DownstreamType, ProxyState},
58
shared::utils::AbortOnDrop,
69
translator::{error::Error, utils::validate_share},
@@ -117,6 +120,7 @@ pub struct Downstream {
117120
pub recent_jobs: RecentJobs,
118121
pub first_job: Notify<'static>,
119122
pub share_monitor: SharesMonitor,
123+
pub user_agent: std::cell::RefCell<String>, // RefCell is used here because `handle_subscribe` and `handle_authorize` take &self not &mut self and we need to mutate user_agent
120124
}
121125

122126
impl Downstream {
@@ -197,6 +201,7 @@ impl Downstream {
197201
recent_jobs: RecentJobs::new(),
198202
first_job: last_notify.expect("we have an assertion at the beginning of this function"),
199203
share_monitor: SharesMonitor::new(),
204+
user_agent: std::cell::RefCell::new(String::new()),
200205
}));
201206

202207
if let Err(e) = start_receive_downstream(
@@ -399,6 +404,7 @@ impl Downstream {
399404
stats_sender,
400405
recent_jobs: RecentJobs::new(),
401406
share_monitor: SharesMonitor::new(),
407+
user_agent: std::cell::RefCell::new(String::new()),
402408
}
403409
}
404410
}
@@ -446,12 +452,29 @@ impl IsServer<'static> for Downstream {
446452
"mining.notify".to_string(),
447453
"ae6812eb4cd7735a302a8a9dd95cf71f".to_string(),
448454
);
449-
455+
self.user_agent.replace(request.agent_signature.clone());
450456
vec![set_difficulty_sub, notify_sub]
451457
}
452458

453-
fn handle_authorize(&self, _request: &client_to_server::Authorize) -> bool {
459+
fn handle_authorize(&self, request: &client_to_server::Authorize) -> bool {
454460
if self.authorized_names.is_empty() {
461+
let user_agent = self.user_agent.borrow().clone();
462+
let worker_activity = WorkerActivity::new(
463+
user_agent,
464+
request.name.clone(),
465+
WorkerActivityType::Connected,
466+
);
467+
468+
tokio::spawn(async move {
469+
if let Err(e) = worker_activity
470+
.monitor_api()
471+
.send_worker_activity(worker_activity)
472+
.await
473+
{
474+
error!("Failed to send worker activity: {}", e);
475+
}
476+
});
477+
455478
true
456479
} else {
457480
// when downstream is already authorized we do not want return an ok response otherwise

src/translator/downstream/receive_from_downstream.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
use super::{downstream::Downstream, task_manager::TaskManager};
2-
use crate::{proxy_state::ProxyState, translator::error::Error};
2+
use crate::{
3+
monitor::worker_activity::{WorkerActivity, WorkerActivityType},
4+
proxy_state::ProxyState,
5+
translator::error::Error,
6+
};
37
use roles_logic_sv2::utils::Mutex;
48
use std::sync::Arc;
59
use sv1_api::{client_to_server::Submit, json_rpc};
@@ -58,6 +62,31 @@ pub async fn start_receive_downstream(
5862
if let Err(e) = Downstream::remove_downstream_hashrate_from_channel(&downstream) {
5963
error!("Failed to remove downstream hashrate from channel: {}", e)
6064
};
65+
66+
let (worker_name, user_agent) = downstream
67+
.safe_lock(|d| {
68+
(
69+
d.authorized_names.first().cloned().unwrap_or_default(),
70+
d.user_agent.borrow().clone(),
71+
)
72+
})
73+
.unwrap_or_else(|e| {
74+
error!("Failed to lock downstream: {:?}", e);
75+
ProxyState::update_inconsistency(Some(1));
76+
("unknown".to_string(), "unknown".to_string())
77+
});
78+
79+
let worker_activity =
80+
WorkerActivity::new(user_agent, worker_name, WorkerActivityType::Disconnected);
81+
82+
worker_activity
83+
.monitor_api()
84+
.send_worker_activity(worker_activity)
85+
.await
86+
.unwrap_or_else(|e| {
87+
error!("Failed to send worker activity: {}", e);
88+
});
89+
6190
// Apparently there is no way to make the compiler happy without unwrapping here. But
6291
// is not an issue since:
6392
// 1. the mutex should never get poisioned and if it does will be very very rare

0 commit comments

Comments
 (0)