Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 09d2278

Browse files
authored
imp: Orchestrator Details about docker status on machine (#514)
* add more detail on running docker container via heartbeat to orchestrator
1 parent efcd364 commit 09d2278

8 files changed

Lines changed: 110 additions & 5 deletions

File tree

crates/orchestrator/src/api/routes/heartbeat.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ async fn heartbeat(
6161
node_address,
6262
task_info.task_id.clone(),
6363
task_info.task_state.clone(),
64+
task_info.task_details,
6465
)
6566
.await
6667
{
@@ -255,6 +256,7 @@ mod tests {
255256
version: None,
256257
timestamp: None,
257258
p2p_id: None,
259+
task_details: None,
258260
})
259261
);
260262

@@ -426,6 +428,7 @@ mod tests {
426428

427429
let heartbeat = HeartbeatRequest {
428430
address: "0x0000000000000000000000000000000000000000".to_string(),
431+
task_details: None,
429432
..Default::default()
430433
};
431434
assert_eq!(value, Some(heartbeat));

crates/orchestrator/src/models/node.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use alloy::primitives::Address;
22
use chrono::{DateTime, Utc};
33
use serde::{Deserialize, Serialize};
4+
use shared::models::heartbeat::TaskDetails;
45
use shared::models::node::{ComputeSpecs, DiscoveryNode};
56
use shared::models::task::TaskState;
67
use std::fmt::{self, Display};
@@ -15,6 +16,8 @@ pub struct OrchestratorNode {
1516

1617
pub task_id: Option<String>,
1718
pub task_state: Option<TaskState>,
19+
#[serde(default)]
20+
pub task_details: Option<TaskDetails>,
1821
pub version: Option<String>,
1922
pub p2p_id: Option<String>,
2023
pub last_status_change: Option<DateTime<Utc>>,
@@ -49,6 +52,7 @@ impl From<DiscoveryNode> for OrchestratorNode {
4952
p2p_id: None,
5053
last_status_change: None,
5154
first_seen: None,
55+
task_details: None,
5256
compute_specs: discovery_node.compute_specs.clone(),
5357
worker_p2p_id: discovery_node.worker_p2p_id.clone(),
5458
worker_p2p_addresses: discovery_node.worker_p2p_addresses.clone(),

crates/orchestrator/src/status_update/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ mod tests {
371371
version: Some(env!("CARGO_PKG_VERSION").to_string()),
372372
timestamp: None,
373373
p2p_id: None,
374+
task_details: None,
374375
};
375376
if let Err(e) = app_state
376377
.store_context
@@ -617,6 +618,7 @@ mod tests {
617618
let heartbeat = HeartbeatRequest {
618619
address: node.address.to_string(),
619620
version: Some(env!("CARGO_PKG_VERSION").to_string()),
621+
task_details: None,
620622
..Default::default()
621623
};
622624
if let Err(e) = app_state
@@ -837,6 +839,7 @@ mod tests {
837839
let heartbeat = HeartbeatRequest {
838840
address: node.address.to_string(),
839841
version: Some(env!("CARGO_PKG_VERSION").to_string()),
842+
task_details: None,
840843
..Default::default()
841844
};
842845
if let Err(e) = app_state

crates/orchestrator/src/store/domains/node_store.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use anyhow::Result;
66
use log::info;
77
use redis::AsyncCommands;
88
use redis::Value;
9+
use shared::models::heartbeat::TaskDetails;
910
use shared::models::task::TaskState;
1011
use std::sync::Arc;
1112

@@ -22,6 +23,11 @@ impl NodeStore {
2223
pub async fn get_nodes(&self) -> Result<Vec<OrchestratorNode>> {
2324
let mut con = self.redis.client.get_multiplexed_async_connection().await?;
2425
let keys: Vec<String> = con.keys(format!("{}:*", ORCHESTRATOR_BASE_KEY)).await?;
26+
27+
if keys.is_empty() {
28+
return Ok(Vec::new());
29+
}
30+
2531
let mut nodes: Vec<OrchestratorNode> = Vec::new();
2632

2733
for node in keys {
@@ -132,6 +138,7 @@ impl NodeStore {
132138
node_address: Address,
133139
current_task: Option<String>,
134140
task_state: Option<String>,
141+
task_details: Option<TaskDetails>,
135142
) -> Result<()> {
136143
let mut con = self.redis.client.get_multiplexed_async_connection().await?;
137144

@@ -152,15 +159,17 @@ impl NodeStore {
152159
})
153160
.unwrap();
154161
let task_state = task_state.map(|state| TaskState::from(state.as_str()));
155-
let details = (current_task, task_state);
162+
let details = (current_task, task_state, task_details);
156163
match details {
157-
(Some(task), Some(task_state)) => {
164+
(Some(task), Some(task_state), task_details) => {
158165
node.task_state = Some(task_state);
159166
node.task_id = Some(task);
167+
node.task_details = task_details;
160168
}
161169
_ => {
162170
node.task_state = None;
163171
node.task_id = None;
172+
node.task_details = None;
164173
}
165174
}
166175
let node_string = node.to_string();

crates/shared/src/models/heartbeat.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@ impl From<HeartbeatResponse> for HttpResponse {
2020
}
2121
}
2222

23+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
24+
pub struct TaskDetails {
25+
pub docker_image_id: Option<String>,
26+
pub container_id: Option<String>,
27+
pub container_status: Option<String>,
28+
pub container_created_at: Option<i64>,
29+
pub container_exit_code: Option<i64>,
30+
}
31+
2332
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
2433
pub struct HeartbeatRequest {
2534
pub address: String,
@@ -31,4 +40,6 @@ pub struct HeartbeatRequest {
3140
pub timestamp: Option<u64>,
3241
#[serde(default)]
3342
pub p2p_id: Option<String>,
43+
#[serde(default)]
44+
pub task_details: Option<TaskDetails>,
3445
}

crates/worker/src/docker/docker_manager.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,4 +818,13 @@ impl DockerManager {
818818
debug!("Successfully retrieved logs for container {}", container_id);
819819
Ok(logs)
820820
}
821+
822+
pub async fn inspect_container(
823+
&self,
824+
container_id: &str,
825+
) -> Result<bollard::models::ContainerInspectResponse, DockerError> {
826+
self.docker
827+
.inspect_container(container_id, Some(InspectContainerOptions { size: false }))
828+
.await
829+
}
821830
}

crates/worker/src/docker/service.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::console::Console;
55
use bollard::models::ContainerStateStatusEnum;
66
use chrono::{DateTime, Utc};
77
use log::debug;
8+
use shared::models::heartbeat::TaskDetails;
89
use shared::models::node::GpuSpecs;
910
use shared::models::task::Task;
1011
use shared::models::task::TaskState;
@@ -347,6 +348,65 @@ impl DockerService {
347348
None => Ok(()),
348349
}
349350
}
351+
352+
pub async fn get_task_details(&self, task: &Task) -> Option<TaskDetails> {
353+
let config_hash = task.generate_config_hash();
354+
let container_name = format!("{}-{}-{:x}", TASK_PREFIX, task.id, config_hash);
355+
356+
match self.docker_manager.list_containers(true).await {
357+
Ok(containers) => {
358+
let container = containers
359+
.iter()
360+
.find(|c| c.names.contains(&format!("/{}", container_name)));
361+
362+
if let Some(container) = container {
363+
match self
364+
.docker_manager
365+
.get_container_details(&container.id)
366+
.await
367+
{
368+
Ok(details) => {
369+
let docker_image_id = if let Ok(inspect_result) =
370+
self.docker_manager.inspect_container(&container.id).await
371+
{
372+
inspect_result.image
373+
} else {
374+
Some(container.image.clone())
375+
};
376+
377+
Some(TaskDetails {
378+
docker_image_id,
379+
container_id: Some(container.id.clone()),
380+
container_status: details.status.map(|s| format!("{:?}", s)),
381+
container_created_at: Some(container.created),
382+
container_exit_code: details.status_code,
383+
})
384+
}
385+
Err(e) => {
386+
debug!("Failed to get container details: {}", e);
387+
Some(TaskDetails {
388+
docker_image_id: Some(container.image.clone()),
389+
container_id: Some(container.id.clone()),
390+
container_status: None,
391+
container_created_at: Some(container.created),
392+
container_exit_code: None,
393+
})
394+
}
395+
}
396+
} else {
397+
debug!(
398+
"Container {} not found for task {}",
399+
container_name, task.id
400+
);
401+
None
402+
}
403+
}
404+
Err(e) => {
405+
debug!("Failed to list containers: {}", e);
406+
None
407+
}
408+
}
409+
}
350410
}
351411

352412
#[cfg(test)]

crates/worker/src/operations/heartbeat/service.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,13 @@ async fn send_heartbeat(
154154
.duration_since(UNIX_EPOCH)
155155
.unwrap()
156156
.as_secs();
157+
158+
let task_details = if let Some(task) = &current_task_state {
159+
docker_service.get_task_details(task).await
160+
} else {
161+
None
162+
};
163+
157164
let request = if let Some(task) = current_task_state {
158165
let metrics_for_task = metrics_store
159166
.get_metrics_for_task(task.id.to_string())
@@ -170,20 +177,19 @@ async fn send_heartbeat(
170177
),
171178
timestamp: Some(ts),
172179
p2p_id,
180+
task_details,
173181
}
174182
} else {
175183
HeartbeatRequest {
176184
address: wallet.address().to_string(),
177-
task_id: None,
178-
task_state: None,
179-
metrics: None,
180185
version: Some(
181186
option_env!("WORKER_VERSION")
182187
.unwrap_or(env!("CARGO_PKG_VERSION"))
183188
.to_string(),
184189
),
185190
timestamp: Some(ts),
186191
p2p_id,
192+
..Default::default()
187193
}
188194
};
189195

0 commit comments

Comments
 (0)