Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 7892822

Browse files
authored
Fix: multiple group assignments (#327)
* improve redis tests, fix node group assignments * add groups to api endpoint
1 parent d5af5da commit 7892822

7 files changed

Lines changed: 352 additions & 46 deletions

File tree

crates/orchestrator/src/api/routes/nodes.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use std::str::FromStr;
1111
use std::time::Duration;
1212
// Timeout for node operations in seconds
1313
const NODE_REQUEST_TIMEOUT: u64 = 30;
14-
1514
async fn get_nodes(app_state: Data<AppState>) -> HttpResponse {
1615
let nodes = app_state.store_context.node_store.get_nodes();
1716

@@ -29,11 +28,34 @@ async fn get_nodes(app_state: Data<AppState>) -> HttpResponse {
2928
}
3029
}
3130

32-
HttpResponse::Ok().json(json!({
31+
let mut response = json!({
3332
"success": true,
3433
"nodes": nodes,
3534
"counts": status_counts
36-
}))
35+
});
36+
37+
// If node groups plugin exists, add group information to each node
38+
if let Some(node_groups_plugin) = &app_state.node_groups_plugin {
39+
let mut nodes_with_groups = Vec::new();
40+
41+
for node in &nodes {
42+
let mut node_json = json!(node);
43+
44+
if let Ok(Some(group)) = node_groups_plugin.get_node_group(&node.address.to_string()) {
45+
node_json["group"] = json!({
46+
"id": group.id,
47+
"size": group.nodes.len(),
48+
"created_at": group.created_at
49+
});
50+
}
51+
52+
nodes_with_groups.push(node_json);
53+
}
54+
55+
response["nodes"] = json!(nodes_with_groups);
56+
}
57+
58+
HttpResponse::Ok().json(response)
3759
}
3860

3961
async fn restart_node_task(node_id: web::Path<String>, app_state: Data<AppState>) -> HttpResponse {

crates/orchestrator/src/api/server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::api::routes::task::tasks_routes;
44
use crate::api::routes::{heartbeat::heartbeat_routes, metrics::metrics_routes};
55
use crate::models::node::NodeStatus;
66
use crate::scheduler::Scheduler;
7+
use crate::status_update::plugins::node_groups::NodeGroupsPlugin;
78
use crate::store::core::{RedisStore, StoreContext};
89
use crate::utils::loop_heartbeats::LoopHeartbeats;
910
use crate::ServerMode;
@@ -30,6 +31,7 @@ pub struct AppState {
3031
pub contracts: Option<Arc<Contracts>>,
3132
pub pool_id: u32,
3233
pub scheduler: Scheduler,
34+
pub node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
3335
}
3436

3537
#[allow(clippy::too_many_arguments)]
@@ -48,6 +50,7 @@ pub async fn start_server(
4850
pool_id: u32,
4951
server_mode: ServerMode,
5052
scheduler: Scheduler,
53+
node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
5154
) -> Result<(), Error> {
5255
info!("Starting server at http://{}:{}", host, port);
5356
let app_state = Data::new(AppState {
@@ -61,6 +64,7 @@ pub async fn start_server(
6164
contracts,
6265
pool_id,
6366
scheduler,
67+
node_groups_plugin,
6468
});
6569
let node_store = app_state.store_context.node_store.clone();
6670
let node_store_clone = node_store.clone();

crates/orchestrator/src/api/tests/helper.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub async fn create_test_app_state() -> Data<AppState> {
5252
hourly_upload_limit: 12,
5353
redis_store: store.clone(),
5454
scheduler,
55+
node_groups_plugin: None,
5556
})
5657
}
5758

crates/orchestrator/src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,16 @@ async fn main() -> Result<()> {
201201
let group_store_context = store_context.clone();
202202
let mut scheduler_plugins: Vec<Box<dyn SchedulerPlugin>> = Vec::new();
203203
let mut status_update_plugins: Vec<Box<dyn StatusUpdatePlugin>> = vec![];
204+
let mut node_groups_plugin: Option<Arc<NodeGroupsPlugin>> = None;
204205

205206
// Add group plugin if enabled
206207
if args.with_basic_group_plugin {
207208
let group_size: usize = args.group_size as usize;
208209
let group_plugin =
209210
NodeGroupsPlugin::new(group_size, group_size, store.clone(), group_store_context);
210211
let status_group_plugin = group_plugin.clone();
212+
let group_plugin_for_server = group_plugin.clone();
213+
node_groups_plugin = Some(Arc::new(group_plugin_for_server));
211214
scheduler_plugins.push(Box::new(group_plugin));
212215
status_update_plugins.push(Box::new(status_group_plugin));
213216
}
@@ -295,6 +298,7 @@ async fn main() -> Result<()> {
295298
compute_pool_id,
296299
server_mode,
297300
scheduler,
301+
node_groups_plugin,
298302
) => {
299303
if let Err(e) = res {
300304
error!("Server error: {}", e);

0 commit comments

Comments
 (0)