Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.DS_Store
target/
/models/
/config.toml
Expand Down
77 changes: 77 additions & 0 deletions atoma-bin/atoma_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl NodeConfig {
let service = AtomaServiceConfig::from_file_path(path);
let state = AtomaStateManagerConfig::from_file_path(path);
let daemon = AtomaDaemonConfig::from_file_path(path);

Self {
sui,
p2p,
Expand Down Expand Up @@ -165,6 +166,13 @@ async fn main() -> Result<()> {
let (event_subscriber_sender, event_subscriber_receiver) = flume::unbounded();
let (state_manager_sender, state_manager_receiver) = flume::unbounded();
let (p2p_event_sender, p2p_event_receiver) = flume::unbounded();

// Start the heartbeat service
start_heartbeat_service(
shutdown_receiver.clone(),
config.service.heartbeat_url.clone(),
);

info!(
target = "atoma-node-service",
event = "keystore_path",
Expand Down Expand Up @@ -491,3 +499,72 @@ fn handle_tasks_results(
)?;
Ok(())
}

/// Starts a heartbeat service that pings a health check endpoint every minute.
///
/// This function spawns a background task that sends a GET request to a health check
/// service at regular intervals to indicate the daemon is still running.
///
/// # Arguments
/// * `shutdown_receiver` - A receiver that signals when the service should shut down
/// * `heartbeat_url` - The URL of the heartbeat service
#[allow(clippy::redundant_pub_crate)]
fn start_heartbeat_service(mut shutdown_receiver: watch::Receiver<bool>, heartbeat_url: String) {
tokio::spawn(async move {
let client = reqwest::Client::new();
let interval = std::time::Duration::from_secs(60);

tracing::info!(
target = "atoma_daemon",
event = "heartbeat-service-start",
url = %heartbeat_url.clone(),
interval_secs = %interval.as_secs(),
"Starting heartbeat service"
);

loop {
tokio::select! {
() = tokio::time::sleep(interval) => {
// Send heartbeat ping
match client.get(heartbeat_url.clone()).send().await {
Ok(response) => {
if response.status().is_success() {
tracing::debug!(
target = "atoma_daemon",
event = "heartbeat-ping",
status = %response.status(),
"Sent heartbeat ping successfully"
);
} else {
tracing::warn!(
target = "atoma_daemon",
event = "heartbeat-ping-failed",
status = %response.status(),
"Heartbeat ping returned non-success status"
);
}
},
Err(e) => {
tracing::error!(
target = "atoma_daemon",
event = "heartbeat-ping-error",
error = %e,
"Failed to send heartbeat ping"
);
}
}
}
result = shutdown_receiver.changed() => {
if result.is_err() || *shutdown_receiver.borrow() {
tracing::info!(
target = "atoma_daemon",
event = "heartbeat-service-shutdown",
"Heartbeat service shutting down"
);
break;
}
}
}
}
});
}
3 changes: 3 additions & 0 deletions atoma-service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub struct AtomaServiceConfig {
///
/// This field specifies the address and port on which the Atoma Service will bind.
pub service_bind_address: String,

/// The URL of the heartbeat service.
pub heartbeat_url: String,
}

impl AtomaServiceConfig {
Expand Down
1 change: 1 addition & 0 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ chat_completions_service_urls = { "Infermatic/Llama-3.3-70B-Instruct-FP8-Dynamic
embeddings_service_url = "http://embeddings:80"
image_generations_service_url = "http://image-generations:80"
# List of models to be used by the service, the current value here is just a placeholder, please change it to the models you want to deploy
heartbeat_url = "my-heartbeat-url"
models = [ "Infermatic/Llama-3.3-70B-Instruct-FP8-Dynamic" ]
revisions = [ "main" ]
service_bind_address = "0.0.0.0:3000"
Expand Down