diff --git a/.gitignore b/.gitignore
index 2547c8b9..a5a59636 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
+.DS_Store
 target/
 /models/
 /config.toml
diff --git a/atoma-bin/atoma_node.rs b/atoma-bin/atoma_node.rs
index 2b075838..bfb99827 100644
--- a/atoma-bin/atoma_node.rs
+++ b/atoma-bin/atoma_node.rs
@@ -66,6 +66,7 @@ impl NodeConfig {
         let service = AtomaServiceConfig::from_file_path(path);
         let state = AtomaStateManagerConfig::from_file_path(path);
         let daemon = AtomaDaemonConfig::from_file_path(path);
+
         Self {
             sui,
             p2p,
@@ -165,6 +166,13 @@ async fn main() -> Result<()> {
     let (event_subscriber_sender, event_subscriber_receiver) = flume::unbounded();
     let (state_manager_sender, state_manager_receiver) = flume::unbounded();
     let (p2p_event_sender, p2p_event_receiver) = flume::unbounded();
+
+    // Start the heartbeat service
+    start_heartbeat_service(
+        shutdown_receiver.clone(),
+        config.service.heartbeat_url.clone(),
+    );
+
     info!(
         target = "atoma-node-service",
         event = "keystore_path",
@@ -491,3 +499,94 @@ fn handle_tasks_results(
     )?;
     Ok(())
 }
+
+/// Starts a heartbeat service that pings a health check endpoint every minute.
+///
+/// This function spawns a background task that sends a GET request to a health check
+/// service at regular intervals to indicate the daemon is still running. The first ping
+/// is sent one full interval after start-up, and failed pings are logged without
+/// stopping the task.
+///
+/// The task exits once the shutdown channel yields `true` or its sender is dropped.
+///
+/// # Arguments
+/// * `shutdown_receiver` - A receiver that signals when the service should shut down
+/// * `heartbeat_url` - The URL of the heartbeat service
+// NOTE(review): presumably required because `tokio::select!` expands to `pub(crate)` items.
+#[allow(clippy::redundant_pub_crate)]
+fn start_heartbeat_service(mut shutdown_receiver: watch::Receiver<bool>, heartbeat_url: String) {
+    // Time between two consecutive heartbeat pings.
+    const HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
+    // Upper bound for a single ping, so a hung endpoint cannot delay shutdown any longer.
+    const HEARTBEAT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
+
+    tokio::spawn(async move {
+        let client = reqwest::Client::new();
+        // A fixed-period ticker keeps the cadence stable: unlike a fresh `sleep` per loop
+        // iteration, it is not restarted when the shutdown channel changes without
+        // requesting a shutdown. The first tick fires after one full interval.
+        let mut ticker = tokio::time::interval_at(
+            tokio::time::Instant::now() + HEARTBEAT_INTERVAL,
+            HEARTBEAT_INTERVAL,
+        );
+        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+        tracing::info!(
+            target = "atoma_daemon",
+            event = "heartbeat-service-start",
+            url = %heartbeat_url,
+            interval_secs = %HEARTBEAT_INTERVAL.as_secs(),
+            "Starting heartbeat service"
+        );
+
+        loop {
+            tokio::select! {
+                _ = ticker.tick() => {
+                    // Send heartbeat ping; the request is bounded by HEARTBEAT_TIMEOUT.
+                    let ping = client
+                        .get(heartbeat_url.as_str())
+                        .timeout(HEARTBEAT_TIMEOUT)
+                        .send()
+                        .await;
+                    match ping {
+                        Ok(response) if response.status().is_success() => {
+                            tracing::debug!(
+                                target = "atoma_daemon",
+                                event = "heartbeat-ping",
+                                status = %response.status(),
+                                "Sent heartbeat ping successfully"
+                            );
+                        }
+                        Ok(response) => {
+                            tracing::warn!(
+                                target = "atoma_daemon",
+                                event = "heartbeat-ping-failed",
+                                status = %response.status(),
+                                "Heartbeat ping returned non-success status"
+                            );
+                        }
+                        Err(e) => {
+                            tracing::error!(
+                                target = "atoma_daemon",
+                                event = "heartbeat-ping-error",
+                                error = %e,
+                                "Failed to send heartbeat ping"
+                            );
+                        }
+                    }
+                }
+                result = shutdown_receiver.changed() => {
+                    // `Err` means the sender was dropped; `true` is an explicit shutdown.
+                    if result.is_err() || *shutdown_receiver.borrow() {
+                        tracing::info!(
+                            target = "atoma_daemon",
+                            event = "heartbeat-service-shutdown",
+                            "Heartbeat service shutting down"
+                        );
+                        break;
+                    }
+                }
+            }
+        }
+    });
+}
diff --git a/atoma-service/src/config.rs b/atoma-service/src/config.rs
index 105323f0..d23f6249 100644
--- a/atoma-service/src/config.rs
+++ b/atoma-service/src/config.rs
@@ -44,6 +44,9 @@ pub struct AtomaServiceConfig {
     ///
     /// This field specifies the address and port on which the Atoma Service will bind.
     pub service_bind_address: String,
+
+    /// The URL of the heartbeat service.
+    pub heartbeat_url: String,
 }
 
 impl AtomaServiceConfig {
diff --git a/config.example.toml b/config.example.toml
index d97450cb..6d474924 100644
--- a/config.example.toml
+++ b/config.example.toml
@@ -36,6 +36,8 @@ chat_completions_service_urls = { "Infermatic/Llama-3.3-70B-Instruct-FP8-Dynamic
 embeddings_service_url = "http://embeddings:80"
 image_generations_service_url = "http://image-generations:80"
+# URL of the health check endpoint the node pings once per minute; the value here is just a placeholder, please replace it with a full URL
+heartbeat_url = "my-heartbeat-url"
 # List of models to be used by the service, the current value here is just a placeholder, please change it to the models you want to deploy
 models = [ "Infermatic/Llama-3.3-70B-Instruct-FP8-Dynamic" ]
 revisions = [ "main" ]
 service_bind_address = "0.0.0.0:3000"