Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 1614cab

Browse files
authored
improve node grouping and dissolution - link to task, speed up groups api, add config name to metrics (#445)
1 parent 27f1376 commit 1614cab

6 files changed

Lines changed: 435 additions & 90 deletions

File tree

crates/orchestrator/src/api/routes/task.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,4 +264,53 @@ mod tests {
264264
assert_eq!(tasks[1].image, "test2");
265265
assert_eq!(tasks[2].image, "test1");
266266
}
267+
268+
#[actix_web::test]
269+
async fn test_create_task_with_metadata() {
270+
let app_state = create_test_app_state().await;
271+
let app = test::init_service(
272+
App::new()
273+
.app_data(app_state.clone())
274+
.route("/tasks", post().to(create_task)),
275+
)
276+
.await;
277+
278+
let mut labels = std::collections::HashMap::new();
279+
labels.insert("model".to_string(), "qwen3-4b".to_string());
280+
labels.insert("dataset".to_string(), "intellect-2-rl-dataset".to_string());
281+
labels.insert("version".to_string(), "v1".to_string());
282+
283+
let payload = TaskRequest {
284+
image: "primeintellect/prime-rl:main".to_string(),
285+
name: "Qwen3-4B:INTELLECT-2-RL-Dataset".to_string(),
286+
metadata: Some(shared::models::task::TaskMetadata {
287+
labels: Some(labels),
288+
}),
289+
..Default::default()
290+
};
291+
292+
let req = test::TestRequest::post()
293+
.uri("/tasks")
294+
.set_json(payload)
295+
.to_request();
296+
let resp = test::call_service(&app, req).await;
297+
assert_eq!(resp.status(), StatusCode::OK);
298+
299+
let body = test::read_body(resp).await;
300+
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
301+
assert_eq!(json["success"], serde_json::Value::Bool(true));
302+
assert!(json["task"]["id"].is_string());
303+
assert_eq!(json["task"]["image"], "primeintellect/prime-rl:main");
304+
assert_eq!(json["task"]["name"], "Qwen3-4B:INTELLECT-2-RL-Dataset");
305+
306+
// Verify metadata is preserved
307+
assert!(json["task"]["metadata"].is_object());
308+
assert!(json["task"]["metadata"]["labels"].is_object());
309+
assert_eq!(json["task"]["metadata"]["labels"]["model"], "qwen3-4b");
310+
assert_eq!(
311+
json["task"]["metadata"]["labels"]["dataset"],
312+
"intellect-2-rl-dataset"
313+
);
314+
assert_eq!(json["task"]["metadata"]["labels"]["version"], "v1");
315+
}
267316
}

crates/orchestrator/src/metrics/mod.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod webhook_sender;
44

55
pub struct MetricsContext {
66
pub compute_task_gauges: GaugeVec,
7+
pub task_info: GaugeVec,
78
pub pool_id: String,
89
pub registry: Registry,
910
pub file_upload_requests_total: CounterVec,
@@ -19,7 +20,21 @@ impl MetricsContext {
1920
// For current state/rate metrics
2021
let compute_task_gauges = GaugeVec::new(
2122
Opts::new("compute_gauges", "Compute task gauge metrics"),
22-
&["node_address", "task_id", "task_name", "label", "pool_id"],
23+
&[
24+
"node_address",
25+
"task_id",
26+
"task_name",
27+
"label",
28+
"pool_id",
29+
"group_id",
30+
"group_config_name",
31+
],
32+
)
33+
.unwrap();
34+
35+
let task_info = GaugeVec::new(
36+
Opts::new("task_info", "Task information with metadata"),
37+
&["task_id", "task_name", "pool_id", "metadata"],
2338
)
2439
.unwrap();
2540

@@ -77,6 +92,7 @@ impl MetricsContext {
7792

7893
let registry = Registry::new();
7994
let _ = registry.register(Box::new(compute_task_gauges.clone()));
95+
let _ = registry.register(Box::new(task_info.clone()));
8096
let _ = registry.register(Box::new(file_upload_requests_total.clone()));
8197
let _ = registry.register(Box::new(nodes_total.clone()));
8298
let _ = registry.register(Box::new(tasks_total.clone()));
@@ -86,6 +102,7 @@ impl MetricsContext {
86102

87103
Self {
88104
compute_task_gauges,
105+
task_info,
89106
pool_id,
90107
registry,
91108
file_upload_requests_total,
@@ -97,16 +114,29 @@ impl MetricsContext {
97114
}
98115
}
99116

117+
#[allow(clippy::too_many_arguments)]
100118
pub fn record_compute_task_gauge(
101119
&self,
102120
node_address: &str,
103121
task_id: &str,
104122
task_name: &str,
105123
label: &str,
106124
value: f64,
125+
group_id: Option<&str>,
126+
group_config_name: Option<&str>,
107127
) {
128+
let group_id_str = group_id.unwrap_or("none");
129+
let group_config_name_str = group_config_name.unwrap_or("none");
108130
self.compute_task_gauges
109-
.with_label_values(&[node_address, task_id, task_name, label, &self.pool_id])
131+
.with_label_values(&[
132+
node_address,
133+
task_id,
134+
task_name,
135+
label,
136+
&self.pool_id,
137+
group_id_str,
138+
group_config_name_str,
139+
])
110140
.set(value);
111141
}
112142

@@ -151,6 +181,12 @@ impl MetricsContext {
151181
.set(count);
152182
}
153183

184+
pub fn set_task_info(&self, task_id: &str, task_name: &str, metadata: &str) {
185+
self.task_info
186+
.with_label_values(&[task_id, task_name, &self.pool_id, metadata])
187+
.set(1.0);
188+
}
189+
154190
pub fn export_metrics(&self) -> Result<String, prometheus::Error> {
155191
let encoder = TextEncoder::new();
156192
let metric_families = self.registry.gather();
@@ -170,5 +206,6 @@ impl MetricsContext {
170206
self.tasks_total.reset();
171207
self.groups_total.reset();
172208
self.nodes_per_task.reset();
209+
self.task_info.reset();
173210
}
174211
}

crates/orchestrator/src/metrics/sync_service.rs

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::plugins::node_groups::NodeGroupsPlugin;
33
use crate::store::core::StoreContext;
44
use crate::ServerMode;
55
use log::{debug, error, info};
6+
use shared::models::task::Task;
67
use std::collections::HashMap;
78
use std::sync::Arc;
89
use std::time::Duration;
@@ -33,6 +34,70 @@ impl MetricsSyncService {
3334
}
3435
}
3536

37+
/// Format task metadata into a structured string for Prometheus labels
38+
/// Example: "model:qwen3-4b|dataset:intellect-2-rl|version:v1"
39+
fn format_task_metadata(task: &Task) -> String {
40+
if let Some(metadata) = &task.metadata {
41+
if let Some(labels) = &metadata.labels {
42+
if !labels.is_empty() {
43+
return labels
44+
.iter()
45+
.map(|(k, v)| format!("{}:{}", k, v))
46+
.collect::<Vec<_>>()
47+
.join("|");
48+
}
49+
}
50+
}
51+
"".to_string()
52+
}
53+
54+
/// Efficiently get all node-to-group mappings including both group_id and group_config_name
55+
/// Returns a HashMap where key is node_address and value is (group_id, group_config_name)
56+
async fn get_all_node_group_info(&self) -> anyhow::Result<HashMap<String, (String, String)>> {
57+
if let Some(node_groups_plugin) = &self.node_groups_plugin {
58+
// First get all node to group_id mappings
59+
let node_to_group_mappings =
60+
match node_groups_plugin.get_all_node_group_mappings().await {
61+
Ok(mappings) => mappings,
62+
Err(e) => {
63+
error!("Failed to get node group mappings: {}", e);
64+
return Ok(HashMap::new());
65+
}
66+
};
67+
68+
// Then get all groups to get their configuration names
69+
let groups = match node_groups_plugin.get_all_groups().await {
70+
Ok(groups) => groups,
71+
Err(e) => {
72+
error!("Failed to get all groups: {}", e);
73+
return Ok(HashMap::new());
74+
}
75+
};
76+
77+
// Create a mapping from group_id to configuration_name
78+
let group_id_to_config: HashMap<String, String> = groups
79+
.into_iter()
80+
.map(|group| (group.id, group.configuration_name))
81+
.collect();
82+
83+
// Combine the mappings to create node_address -> (group_id, group_config_name)
84+
let mut result = HashMap::new();
85+
for (node_address, group_id) in node_to_group_mappings {
86+
if let Some(config_name) = group_id_to_config.get(&group_id) {
87+
result.insert(node_address, (group_id, config_name.clone()));
88+
} else {
89+
// If we can't find the config name, still include the group_id
90+
debug!("No configuration name found for group_id: {}", group_id);
91+
result.insert(node_address, (group_id, "unknown".to_string()));
92+
}
93+
}
94+
95+
Ok(result)
96+
} else {
97+
Ok(HashMap::new())
98+
}
99+
}
100+
36101
pub async fn run(&self) -> anyhow::Result<()> {
37102
// Only run the sync service on ProcessorOnly or Full mode instances
38103
if !matches!(
@@ -86,6 +151,18 @@ impl MetricsSyncService {
86151
.map(|task| (task.id.to_string(), task.name.clone()))
87152
.collect();
88153

154+
let node_to_group_info = if self.node_groups_plugin.is_some() {
155+
match self.get_all_node_group_info().await {
156+
Ok(info) => info,
157+
Err(e) => {
158+
error!("Failed to get node group info: {}", e);
159+
HashMap::new()
160+
}
161+
}
162+
} else {
163+
HashMap::new()
164+
};
165+
89166
// Clear existing Prometheus metrics
90167
self.metrics_context.clear_compute_task_metrics();
91168

@@ -99,12 +176,19 @@ impl MetricsSyncService {
99176

100177
for (label, node_metrics) in task_metrics {
101178
for (node_address, value) in node_metrics {
179+
let (group_id, group_config_name) = node_to_group_info
180+
.get(&node_address)
181+
.map(|(id, config)| (Some(id.as_str()), Some(config.as_str())))
182+
.unwrap_or((None, None));
183+
102184
self.metrics_context.record_compute_task_gauge(
103185
&node_address,
104186
&task_id,
105187
&task_name,
106188
&label,
107189
value,
190+
group_id,
191+
group_config_name,
108192
);
109193
total_metrics += 1;
110194
}
@@ -150,6 +234,15 @@ impl MetricsSyncService {
150234
self.metrics_context.set_tasks_count(total_tasks);
151235
debug!("Synced task statistics: {} total tasks", total_tasks);
152236

237+
// Sync task info metrics with metadata
238+
for task in &tasks {
239+
let task_id = task.id.to_string();
240+
let metadata = Self::format_task_metadata(task);
241+
self.metrics_context
242+
.set_task_info(&task_id, &task.name, &metadata);
243+
}
244+
debug!("Synced task info metrics with metadata");
245+
153246
// Sync nodes per task based on node assignments
154247
// Create task name mapping
155248
let task_name_map: HashMap<String, String> = tasks
@@ -202,3 +295,86 @@ impl MetricsSyncService {
202295
Ok(())
203296
}
204297
}
298+
299+
#[cfg(test)]
300+
mod tests {
301+
use super::*;
302+
use shared::models::task::{Task, TaskMetadata, TaskState};
303+
use std::collections::HashMap;
304+
use uuid::Uuid;
305+
306+
#[test]
307+
fn test_format_task_metadata_with_labels() {
308+
let mut labels = HashMap::new();
309+
labels.insert("model".to_string(), "qwen3-4b".to_string());
310+
labels.insert("dataset".to_string(), "intellect-2-rl-dataset".to_string());
311+
labels.insert("version".to_string(), "v1".to_string());
312+
313+
let task = Task {
314+
id: Uuid::new_v4(),
315+
image: "test".to_string(),
316+
name: "test".to_string(),
317+
state: TaskState::PENDING,
318+
metadata: Some(TaskMetadata {
319+
labels: Some(labels),
320+
}),
321+
..Default::default()
322+
};
323+
324+
let formatted = MetricsSyncService::format_task_metadata(&task);
325+
326+
// The format should be key:value pairs separated by |
327+
// Order might vary due to HashMap iteration
328+
assert!(formatted.contains("model:qwen3-4b"));
329+
assert!(formatted.contains("dataset:intellect-2-rl-dataset"));
330+
assert!(formatted.contains("version:v1"));
331+
assert_eq!(formatted.matches('|').count(), 2); // Should have 2 separators for 3 labels
332+
}
333+
334+
#[test]
335+
fn test_format_task_metadata_empty() {
336+
let task = Task {
337+
id: Uuid::new_v4(),
338+
image: "test".to_string(),
339+
name: "test".to_string(),
340+
state: TaskState::PENDING,
341+
metadata: None,
342+
..Default::default()
343+
};
344+
345+
let formatted = MetricsSyncService::format_task_metadata(&task);
346+
assert_eq!(formatted, "");
347+
}
348+
349+
#[test]
350+
fn test_format_task_metadata_empty_labels() {
351+
let task = Task {
352+
id: Uuid::new_v4(),
353+
image: "test".to_string(),
354+
name: "test".to_string(),
355+
state: TaskState::PENDING,
356+
metadata: Some(TaskMetadata {
357+
labels: Some(HashMap::new()),
358+
}),
359+
..Default::default()
360+
};
361+
362+
let formatted = MetricsSyncService::format_task_metadata(&task);
363+
assert_eq!(formatted, "");
364+
}
365+
366+
#[test]
367+
fn test_format_task_metadata_no_labels() {
368+
let task = Task {
369+
id: Uuid::new_v4(),
370+
image: "test".to_string(),
371+
name: "test".to_string(),
372+
state: TaskState::PENDING,
373+
metadata: Some(TaskMetadata { labels: None }),
374+
..Default::default()
375+
};
376+
377+
let formatted = MetricsSyncService::format_task_metadata(&task);
378+
assert_eq!(formatted, "");
379+
}
380+
}

0 commit comments

Comments
 (0)