From e7a78f9d68cf2028df722e21baacf48c1ede1a7e Mon Sep 17 00:00:00 2001 From: chad Date: Mon, 21 Apr 2025 15:10:56 -0500 Subject: [PATCH 1/4] feat: add heartbeat service --- .gitignore | 1 + atoma-bin/atoma_node.rs | 76 +++++++++++++++++++++++++++++++++++++ atoma-service/src/config.rs | 3 ++ config.example.toml | 1 + 4 files changed, 81 insertions(+) diff --git a/.gitignore b/.gitignore index 2547c8b9..a5a59636 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.DS_Store target/ /models/ /config.toml diff --git a/atoma-bin/atoma_node.rs b/atoma-bin/atoma_node.rs index 2b075838..912f13ae 100644 --- a/atoma-bin/atoma_node.rs +++ b/atoma-bin/atoma_node.rs @@ -66,6 +66,7 @@ impl NodeConfig { let service = AtomaServiceConfig::from_file_path(path); let state = AtomaStateManagerConfig::from_file_path(path); let daemon = AtomaDaemonConfig::from_file_path(path); + Self { sui, p2p, @@ -165,6 +166,13 @@ async fn main() -> Result<()> { let (event_subscriber_sender, event_subscriber_receiver) = flume::unbounded(); let (state_manager_sender, state_manager_receiver) = flume::unbounded(); let (p2p_event_sender, p2p_event_receiver) = flume::unbounded(); + + // Start the heartbeat service + start_heartbeat_service( + shutdown_receiver.clone(), + config.service.heartbeat_url.clone(), + ); + info!( target = "atoma-node-service", event = "keystore_path", @@ -491,3 +499,71 @@ fn handle_tasks_results( )?; Ok(()) } + +/// Starts a heartbeat service that pings a health check endpoint every minute. +/// +/// This function spawns a background task that sends a GET request to a health check +/// service at regular intervals to indicate the daemon is still running. +/// +/// # Arguments +/// * `shutdown_receiver` - A receiver that signals when the service should shut down +/// * `heartbeat_url` - The URL of the heartbeat service +fn start_heartbeat_service(mut shutdown_receiver: watch::Receiver, heartbeat_url: String) { + tokio::spawn(async move { + let client = reqwest::Client::new(); + let interval = std::time::Duration::from_secs(60); + + tracing::info!( + target = "atoma_daemon", + event = "heartbeat-service-start", + url = %heartbeat_url.clone(), + interval_secs = %interval.as_secs(), + "Starting heartbeat service" + ); + + loop { + tokio::select! { + _ = tokio::time::sleep(interval) => { + // Send heartbeat ping + match client.get(heartbeat_url.clone()).send().await { + Ok(response) => { + if response.status().is_success() { + tracing::debug!( + target = "atoma_daemon", + event = "heartbeat-ping", + status = %response.status(), + "Sent heartbeat ping successfully" + ); + } else { + tracing::warn!( + target = "atoma_daemon", + event = "heartbeat-ping-failed", + status = %response.status(), + "Heartbeat ping returned non-success status" + ); + } + }, + Err(e) => { + tracing::error!( + target = "atoma_daemon", + event = "heartbeat-ping-error", + error = %e, + "Failed to send heartbeat ping" + ); + } + } + } + result = shutdown_receiver.changed() => { + if result.is_err() || *shutdown_receiver.borrow() { + tracing::info!( + target = "atoma_daemon", + event = "heartbeat-service-shutdown", + "Heartbeat service shutting down" + ); + break; + } + } + } + } + }); +} diff --git a/atoma-service/src/config.rs b/atoma-service/src/config.rs index 105323f0..d23f6249 100644 --- a/atoma-service/src/config.rs +++ b/atoma-service/src/config.rs @@ -44,6 +44,9 @@ pub struct AtomaServiceConfig { /// /// This field specifies the address and port on which the Atoma Service will bind. pub service_bind_address: String, + + /// The URL of the heartbeat service. + pub heartbeat_url: String, } impl AtomaServiceConfig { diff --git a/config.example.toml b/config.example.toml index d97450cb..4c29029f 100644 --- a/config.example.toml +++ b/config.example.toml @@ -39,6 +39,7 @@ image_generations_service_url = "http://image-generations:80" models = [ "Infermatic/Llama-3.3-70B-Instruct-FP8-Dynamic" ] revisions = [ "main" ] service_bind_address = "0.0.0.0:3000" +heartbeat_url = "my-heartbeat-url" [atoma_sui] atoma_db = "0x02920289f426dd1f3c2572d613f7dc92be95041720864a73d44d65585530efc5" # Current ATOMA DB object ID for testnet From 03bf8e523f2f9ba12c33e24d6427242d55da9ecf Mon Sep 17 00:00:00 2001 From: chad Date: Mon, 21 Apr 2025 15:12:03 -0500 Subject: [PATCH 2/4] chore: format taplo --- config.example.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.example.toml b/config.example.toml index 4c29029f..6d474924 100644 --- a/config.example.toml +++ b/config.example.toml @@ -36,10 +36,10 @@ chat_completions_service_urls = { "Infermatic/Llama-3.3-70B-Instruct-FP8-Dynamic embeddings_service_url = "http://embeddings:80" image_generations_service_url = "http://image-generations:80" # List of models to be used by the service, the current value here is just a placeholder, please change it to the models you want to deploy +heartbeat_url = "my-heartbeat-url" models = [ "Infermatic/Llama-3.3-70B-Instruct-FP8-Dynamic" ] revisions = [ "main" ] service_bind_address = "0.0.0.0:3000" -heartbeat_url = "my-heartbeat-url" [atoma_sui] atoma_db = "0x02920289f426dd1f3c2572d613f7dc92be95041720864a73d44d65585530efc5" # Current ATOMA DB object ID for testnet From 6994e14b0612f1707b7a78f9b0c6b64af71999bf Mon Sep 17 00:00:00 2001 From: chad Date: Mon, 21 Apr 2025 15:17:55 -0500 Subject: [PATCH 3/4] lint: formatting --- atoma-bin/atoma_node.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/atoma-bin/atoma_node.rs b/atoma-bin/atoma_node.rs index 912f13ae..a416d42a 100644 --- a/atoma-bin/atoma_node.rs +++ b/atoma-bin/atoma_node.rs @@ -508,6 +508,7 @@ fn handle_tasks_results( /// # Arguments /// * `shutdown_receiver` - A receiver that signals when the service should shut down /// * `heartbeat_url` - The URL of the heartbeat service +#[allow(clippy::redundant_pub_crate)] fn start_heartbeat_service(mut shutdown_receiver: watch::Receiver, heartbeat_url: String) { tokio::spawn(async move { let client = reqwest::Client::new(); From 2e00465cac8054a80a68dcd2f643d4b8f443e489 Mon Sep 17 00:00:00 2001 From: chad Date: Mon, 21 Apr 2025 15:33:31 -0500 Subject: [PATCH 4/4] lint: clippy issue --- atoma-bin/atoma_node.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/atoma-bin/atoma_node.rs b/atoma-bin/atoma_node.rs index a416d42a..bfb99827 100644 --- a/atoma-bin/atoma_node.rs +++ b/atoma-bin/atoma_node.rs @@ -524,7 +524,7 @@ fn start_heartbeat_service(mut shutdown_receiver: watch::Receiver, heartbe loop { tokio::select! { - _ = tokio::time::sleep(interval) => { + () = tokio::time::sleep(interval) => { // Send heartbeat ping match client.get(heartbeat_url.clone()).send().await { Ok(response) => {