Skip to content

Commit 4e3459b

Browse files
authored
Merge pull request dmnd-pool#130 from Priceless-P/fix/downstream-disconnect
fix
2 parents 258f10c + eb4b6f8 commit 4e3459b

2 files changed

Lines changed: 4 additions & 10 deletions

File tree

src/main.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,8 @@ async fn initialize_proxy(
239239
abort_handles.push((jdc_handle, "jdc".to_string()));
240240
}
241241
let server_handle = tokio::spawn(api::start(router.clone(), stats_sender));
242-
match monitor(router, abort_handles, epsilon, server_handle).await {
242+
abort_handles.push((server_handle.into(), "api_server".to_string()));
243+
match monitor(router, abort_handles, epsilon).await {
243244
Reconnect::NewUpstream(new_pool_addr) => {
244245
ProxyState::update_proxy_state_up();
245246
pool_addr = Some(new_pool_addr);
@@ -258,7 +259,6 @@ async fn monitor(
258259
router: &mut Router,
259260
abort_handles: Vec<(AbortOnDrop, std::string::String)>,
260261
epsilon: Duration,
261-
server_handle: tokio::task::JoinHandle<()>,
262262
) -> Reconnect {
263263
let mut should_check_upstreams_latency = 0;
264264
loop {
@@ -269,7 +269,6 @@ async fn monitor(
269269
if let Some(new_upstream) = router.monitor_upstream(epsilon).await {
270270
info!("Faster upstream detected. Reinitializing proxy...");
271271
drop(abort_handles);
272-
server_handle.abort(); // abort server
273272

274273
// Needs a little to time to drop
275274
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
@@ -288,7 +287,6 @@ async fn monitor(
288287
for (handle, _name) in abort_handles {
289288
drop(handle);
290289
}
291-
server_handle.abort(); // abort server
292290

293291
// Check if the proxy state is down, and if so, reinitialize the proxy.
294292
let is_proxy_down = ProxyState::is_proxy_down();
@@ -311,7 +309,6 @@ async fn monitor(
311309
is_proxy_down.1.unwrap_or("Proxy".to_string())
312310
);
313311
drop(abort_handles); // Drop all abort handles
314-
server_handle.abort(); // abort server
315312
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; // Needs a little to time to drop
316313
return Reconnect::NoUpstream;
317314
}

src/translator/downstream/receive_from_downstream.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
use super::{downstream::Downstream, task_manager::TaskManager};
2-
use crate::{
3-
proxy_state::{DownstreamType, ProxyState},
4-
translator::error::Error,
5-
};
2+
use crate::{proxy_state::ProxyState, translator::error::Error};
63
use roles_logic_sv2::utils::Mutex;
74
use std::sync::Arc;
85
use sv1_api::{client_to_server::Submit, json_rpc};
@@ -36,7 +33,7 @@ pub async fn start_receive_downstream(
3633
Downstream::handle_incoming_sv1(downstream.clone(), incoming).await
3734
{
3835
error!("Failed to handle incoming sv1 msg: {:?}", error);
39-
ProxyState::update_downstream_state(DownstreamType::TranslatorDownstream);
36+
break;
4037
};
4138
} else {
4239
// Message received could not be converted to rpc message

0 commit comments

Comments
 (0)