Skip to content

Commit 2f092ff

Browse files
committed
send errors logs to server
1 parent 1d5a394 commit 2f092ff

5 files changed

Lines changed: 187 additions & 63 deletions

File tree

src/main.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
#[cfg(not(target_os = "windows"))]
22
use jemallocator::Jemalloc;
33
use router::Router;
4-
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
4+
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
55
#[cfg(not(target_os = "windows"))]
66
#[global_allocator]
77
static GLOBAL: Jemalloc = Jemalloc;
88

9-
use crate::shared::utils::AbortOnDrop;
9+
use crate::{monitor::logs::SendLogLayer, shared::utils::AbortOnDrop};
1010
use config::Configuration;
1111
use key_utils::Secp256k1PublicKey;
1212
use lazy_static::lazy_static;
@@ -21,11 +21,11 @@ mod config;
2121
mod ingress;
2222
pub mod jd_client;
2323
mod minin_pool_connection;
24+
mod monitor;
2425
mod proxy_state;
2526
mod router;
2627
mod share_accounter;
2728
mod shared;
28-
mod shares_monitor;
2929
mod translator;
3030

3131
const TRANSLATOR_BUFFER_SIZE: usize = 32;
@@ -74,15 +74,18 @@ async fn main() {
7474
let log_level = Configuration::loglevel();
7575
let noise_connection_log_level = Configuration::nc_loglevel();
7676

77+
let remote_layer = SendLogLayer::new();
78+
let console_layer =
79+
tracing_subscriber::fmt::layer().with_filter(tracing_subscriber::EnvFilter::new(format!(
80+
"{},demand_sv2_connection::noise_connection_tokio={}",
81+
log_level, noise_connection_log_level
82+
)));
7783
//Disable noise_connection error (for now) because:
7884
// 1. It produce logs that are not very user friendly and also bloat the logs
7985
// 2. The errors resulting from noise_connection are handled. E.g if unrecoverable error from noise connection occurs during Pool connection: We either retry connecting immediatley or we update Proxy state to Pool Down
8086
tracing_subscriber::registry()
81-
.with(tracing_subscriber::fmt::layer())
82-
.with(tracing_subscriber::EnvFilter::new(format!(
83-
"{},demand_sv2_connection::noise_connection_tokio={}",
84-
log_level, noise_connection_log_level
85-
)))
87+
.with(console_layer)
88+
.with(remote_layer)
8689
.init();
8790

8891
Configuration::token().expect("TOKEN is not set");

src/monitor/logs.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use crate::monitor::{proxy_log_server_endpoint, MonitorAPI};
2+
use serde::Serialize;
3+
use std::sync::Arc;
4+
use tracing::error;
5+
6+
/// A custom tracing Layer that sends error logs to the server.
7+
///
8+
/// This helps centralize error reporting by automatically forwarding error logs
9+
#[derive(Clone)]
10+
pub struct SendLogLayer {
11+
api: Arc<MonitorAPI>,
12+
content: String,
13+
}
14+
15+
impl SendLogLayer {
16+
pub fn new() -> Self {
17+
let server_url = proxy_log_server_endpoint();
18+
let api = Arc::new(MonitorAPI::new(server_url));
19+
SendLogLayer {
20+
api,
21+
content: String::new(),
22+
}
23+
}
24+
}
25+
26+
/// Implements the `Visit` trait to collect log fields into a string.
27+
impl tracing::field::Visit for SendLogLayer {
28+
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
29+
self.content
30+
.push_str(&format!("{}: {:?}, ", field.name(), value));
31+
}
32+
}
33+
34+
impl<S> tracing_subscriber::Layer<S> for SendLogLayer
35+
where
36+
S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
37+
{
38+
/// Called when an event is recorded.
39+
/// If the event is an error, we sends it to the server.
40+
fn on_event(
41+
&self,
42+
event: &tracing::Event<'_>,
43+
_ctx: tracing_subscriber::layer::Context<'_, S>,
44+
) {
45+
if *event.metadata().level() == tracing::Level::ERROR {
46+
let mut visitor = self.clone();
47+
event.record(&mut visitor);
48+
let content = format!(
49+
"{}: {}",
50+
event.metadata().target(),
51+
visitor.content.trim_end_matches(", ")
52+
);
53+
let payload = ProxyLog::new(Severity::Error, content);
54+
let api = Arc::clone(&self.api);
55+
let payload = Arc::new(payload);
56+
// Spawn a new task to send the log asynchronously
57+
tokio::spawn(async move {
58+
if let Err(e) = api.send_log(payload.as_ref().clone()).await {
59+
error!("Failed to send log to API: {}", e);
60+
}
61+
});
62+
}
63+
}
64+
}
65+
66+
/// Represent the log to be sent to the API server
67+
#[derive(Serialize, Debug, Clone)]
68+
pub struct ProxyLog {
69+
severity: Severity,
70+
content: String,
71+
}
72+
impl ProxyLog {
73+
fn new(severity: Severity, content: String) -> Self {
74+
ProxyLog { severity, content }
75+
}
76+
}
77+
78+
#[derive(Debug, Clone, Serialize)]
79+
enum Severity {
80+
_Info,
81+
_Warning,
82+
Error,
83+
}

src/monitor/mod.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
use reqwest::Url;
2+
use serde_json::json;
3+
use tracing::{debug, error};
4+
5+
use crate::{
6+
config::Configuration,
7+
monitor::{logs::ProxyLog, shares::ShareInfo},
8+
shared::error::Error,
9+
LOCAL_URL, PRODUCTION_URL, STAGING_URL, TESTNET3_URL,
10+
};
11+
12+
pub mod logs;
13+
pub mod shares;
14+
15+
pub struct MonitorAPI {
16+
pub url: Url,
17+
pub client: reqwest::Client,
18+
}
19+
fn proxy_log_server_endpoint() -> String {
20+
match Configuration::environment().as_str() {
21+
"staging" => format!("{}/api/proxy/logs", STAGING_URL),
22+
"testnet3" => format!("{}/api/proxy/logs", TESTNET3_URL),
23+
"local" => format!("{}/api/proxy/logs", LOCAL_URL),
24+
"production" => format!("{}/api/proxy/logs", PRODUCTION_URL),
25+
_ => unreachable!(),
26+
}
27+
}
28+
fn shares_server_endpoint() -> String {
29+
// Determine the monitoring server URL based on the environment
30+
match Configuration::environment().as_str() {
31+
"staging" => format!("{}/api/share/save", STAGING_URL),
32+
"testnet3" => format!("{}/api/share/save", TESTNET3_URL),
33+
"local" => format!("{}/api/share/save", LOCAL_URL),
34+
"production" => format!("{}/api/share/save", PRODUCTION_URL),
35+
_ => unreachable!(),
36+
}
37+
}
38+
39+
impl MonitorAPI {
40+
pub fn new(url: String) -> Self {
41+
let client = reqwest::Client::new();
42+
MonitorAPI {
43+
url: url.parse().expect("Invalid URL"),
44+
client,
45+
}
46+
}
47+
48+
/// Sends a batch of shares to the monitoring server.
49+
async fn send_shares(&self, shares: Vec<ShareInfo>) -> Result<(), Error> {
50+
let token = crate::config::Configuration::token().expect("Token is not set");
51+
52+
debug!("Sending batch of {} shares to API", shares.len());
53+
let response = self
54+
.client
55+
.post(self.url.clone())
56+
.json(&json!({ "shares": shares, "token": token }))
57+
.send()
58+
.await?;
59+
60+
match response.error_for_status() {
61+
Ok(_) => Ok(()),
62+
Err(err) => {
63+
error!("Failed to send shares: {}", err);
64+
Err(err.into())
65+
}
66+
}
67+
}
68+
69+
/// Sends a log to the monitoring server.
70+
pub async fn send_log(&self, log: ProxyLog) -> Result<(), Error> {
71+
let token = crate::config::Configuration::token().expect("Token is not set");
72+
73+
debug!("Sending log to API: {:?}", log);
74+
let response = self
75+
.client
76+
.post(self.url.clone())
77+
.json(&json!({ "log": log, "token": token }))
78+
.send()
79+
.await?;
80+
81+
match response.error_for_status() {
82+
Ok(_) => Ok(()),
83+
Err(err) => {
84+
error!("Failed to send log: {}", err);
85+
Err(err.into())
86+
}
87+
}
88+
}
89+
}
Lines changed: 2 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,13 @@
1-
use reqwest::Url;
21
use roles_logic_sv2::utils::Mutex;
3-
use serde_json::json;
42
use std::sync::Arc;
53
use tracing::{debug, error};
64
const BATCH_SIZE: u32 = 20; // Default batch size for sending shares
75

86
use crate::{
9-
config::Configuration,
7+
monitor::{shares_server_endpoint, MonitorAPI},
108
proxy_state::{DownstreamType, ProxyState},
11-
shared::error::Error,
12-
LOCAL_URL, PRODUCTION_URL, STAGING_URL, TESTNET3_URL,
139
};
1410

15-
fn monitoring_server_url() -> String {
16-
// Determine the monitoring server URL based on the environment
17-
match Configuration::environment().as_str() {
18-
"staging" => format!("{}/api/share/save", STAGING_URL),
19-
"testnet3" => format!("{}/api/share/save", TESTNET3_URL),
20-
"local" => format!("{}/api/share/save", LOCAL_URL),
21-
"production" => format!("{}/api/share/save", PRODUCTION_URL),
22-
_ => unreachable!(),
23-
}
24-
}
25-
2611
#[derive(serde::Serialize, Clone, Debug)]
2712
pub struct ShareInfo {
2813
worker_name: String,
@@ -103,7 +88,7 @@ impl SharesMonitor {
10388

10489
/// Monitors the pending shares and sends them to the monitoring server in batches.
10590
pub async fn monitor(&self) {
106-
let api = MonitorAPI::new(monitoring_server_url());
91+
let api = MonitorAPI::new(shares_server_endpoint());
10792
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); // Check every 60 seconds
10893
interval.tick().await; // Skip the first tick to avoid unnecessary error log
10994
loop {
@@ -134,42 +119,6 @@ impl SharesMonitor {
134119
}
135120
}
136121

137-
struct MonitorAPI {
138-
url: Url,
139-
client: reqwest::Client,
140-
}
141-
142-
impl MonitorAPI {
143-
fn new(url: String) -> Self {
144-
let client = reqwest::Client::new();
145-
MonitorAPI {
146-
url: url.parse().expect("Invalid URL"),
147-
client,
148-
}
149-
}
150-
151-
/// Sends a batch of shares to the monitoring server.
152-
async fn send_shares(&self, shares: Vec<ShareInfo>) -> Result<(), Error> {
153-
let token = crate::config::Configuration::token().expect("Token is not set");
154-
155-
debug!("Sending batch of {} shares to API", shares.len());
156-
let response = self
157-
.client
158-
.post(self.url.clone())
159-
.json(&json!({ "shares": shares, "token": token }))
160-
.send()
161-
.await?;
162-
163-
match response.error_for_status() {
164-
Ok(_) => Ok(()),
165-
Err(err) => {
166-
error!("Failed to send shares: {}", err);
167-
Err(err.into())
168-
}
169-
}
170-
}
171-
}
172-
173122
#[derive(Debug, Clone, serde::Serialize)]
174123
pub enum RejectionReason {
175124
JobIdNotFound,

src/translator/downstream/downstream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use crate::{
22
api::stats::StatsSender,
3+
monitor::shares::{RejectionReason, ShareInfo, SharesMonitor},
34
proxy_state::{DownstreamType, ProxyState},
45
shared::utils::AbortOnDrop,
5-
shares_monitor::{RejectionReason, ShareInfo, SharesMonitor},
66
translator::{error::Error, utils::validate_share},
77
};
88

@@ -381,7 +381,7 @@ impl Downstream {
381381
stats_sender: StatsSender,
382382
first_job: Notify<'static>,
383383
) -> Self {
384-
use crate::shares_monitor::SharesMonitor;
384+
use crate::monitor::shares::SharesMonitor;
385385

386386
Downstream {
387387
connection_id,

0 commit comments

Comments
 (0)