Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 0ad0f4e

Browse files
authored
imp(orchestrator): optimize orchestrator nodes / groups performance (#546)
* optimize orchestrator nodes / groups performance
1 parent 18d6883 commit 0ad0f4e

3 files changed

Lines changed: 215 additions & 27 deletions

File tree

crates/orchestrator/src/api/routes/nodes.rs

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,22 +68,34 @@ async fn get_nodes(query: Query<NodeQuery>, app_state: Data<AppState>) -> HttpRe
6868
if let Some(node_groups_plugin) = &app_state.node_groups_plugin {
6969
let mut nodes_with_groups = Vec::new();
7070

71-
for node in &nodes {
72-
let mut node_json = json!(node);
71+
let node_addresses: Vec<String> =
72+
nodes.iter().map(|node| node.address.to_string()).collect();
7373

74-
if let Ok(Some(group)) = node_groups_plugin
75-
.get_node_group(&node.address.to_string())
76-
.await
77-
{
78-
node_json["group"] = json!({
79-
"id": group.id,
80-
"size": group.nodes.len(),
81-
"created_at": group.created_at,
82-
"topology_config": group.configuration_name
83-
});
84-
}
74+
match node_groups_plugin
75+
.get_node_groups_batch(&node_addresses)
76+
.await
77+
{
78+
Ok(node_groups) => {
79+
for node in &nodes {
80+
let mut node_json = json!(node);
8581

86-
nodes_with_groups.push(node_json);
82+
if let Some(Some(group)) = node_groups.get(&node.address.to_string()) {
83+
node_json["group"] = json!({
84+
"id": group.id,
85+
"size": group.nodes.len(),
86+
"created_at": group.created_at,
87+
"topology_config": group.configuration_name
88+
});
89+
}
90+
91+
nodes_with_groups.push(node_json);
92+
}
93+
}
94+
Err(e) => {
95+
error!("Error getting node groups batch: {}", e);
96+
// Fall back to nodes without group information
97+
nodes_with_groups = nodes.iter().map(|node| json!(node)).collect();
98+
}
8799
}
88100

89101
response["nodes"] = json!(nodes_with_groups);

crates/orchestrator/src/main.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,25 @@ async fn main() -> Result<()> {
305305
Some(node_groups_heartbeats.clone()),
306306
Some(webhook_plugins.clone()),
307307
);
308+
309+
// Run groups index migration on startup
310+
match group_plugin.migrate_groups_index().await {
311+
Ok(count) => {
312+
if count > 0 {
313+
info!(
314+
"Groups index migration completed: {} groups migrated",
315+
count
316+
);
317+
} else {
318+
info!("Groups index migration: no groups to migrate");
319+
}
320+
}
321+
Err(e) => {
322+
error!("Groups index migration failed: {}", e);
323+
return Err(e);
324+
}
325+
}
326+
308327
let status_group_plugin = group_plugin.clone();
309328
let group_plugin_for_server = group_plugin.clone();
310329
node_groups_plugin = Some(Arc::new(group_plugin_for_server));

crates/orchestrator/src/plugins/node_groups/mod.rs

Lines changed: 170 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ mod tests;
2727
const GROUP_KEY_PREFIX: &str = "node_group:";
2828
const NODE_GROUP_MAP_KEY: &str = "node_to_group";
2929
const GROUP_TASK_KEY_PREFIX: &str = "group_task:";
30+
const GROUPS_INDEX_KEY: &str = "orchestrator:groups_index";
3031

3132
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
3233
pub struct NodeGroupConfiguration {
@@ -290,6 +291,9 @@ impl NodeGroupsPlugin {
290291
let group_data = serde_json::to_string(group)?;
291292
pipe.set(&group_key, group_data);
292293

294+
// Add group ID to groups index
295+
pipe.sadd(GROUPS_INDEX_KEY, &group.id);
296+
293297
// Map nodes to group
294298
for node in &group.nodes {
295299
pipe.hset(NODE_GROUP_MAP_KEY, node, &group.id);
@@ -324,6 +328,66 @@ impl NodeGroupsPlugin {
324328
Ok(None)
325329
}
326330

331+
pub async fn get_node_groups_batch(
332+
&self,
333+
node_addresses: &[String],
334+
) -> Result<HashMap<String, Option<NodeGroup>>, Error> {
335+
let mut conn = self.store.client.get_multiplexed_async_connection().await?;
336+
let mut result = HashMap::new();
337+
338+
if node_addresses.is_empty() {
339+
return Ok(result);
340+
}
341+
342+
let mut pipe = redis::pipe();
343+
for node_addr in node_addresses {
344+
pipe.hget(NODE_GROUP_MAP_KEY, node_addr);
345+
}
346+
let group_ids: Vec<Option<String>> = pipe.query_async(&mut conn).await?;
347+
348+
let unique_group_ids: HashSet<String> = group_ids
349+
.iter()
350+
.filter_map(|opt| opt.as_ref())
351+
.cloned()
352+
.collect();
353+
354+
// Step 3: Batch fetch all group data
355+
let group_data: HashMap<String, NodeGroup> = if !unique_group_ids.is_empty() {
356+
let group_keys: Vec<String> = unique_group_ids
357+
.iter()
358+
.map(|id| Self::get_group_key(id))
359+
.collect();
360+
361+
let group_values: Vec<Option<String>> = conn.mget(&group_keys).await?;
362+
363+
unique_group_ids
364+
.into_iter()
365+
.zip(group_values.into_iter())
366+
.filter_map(|(group_id, group_json)| {
367+
group_json.and_then(|json| {
368+
serde_json::from_str::<NodeGroup>(&json)
369+
.map_err(|e| {
370+
error!("Failed to parse group {} data: {}", group_id, e);
371+
e
372+
})
373+
.ok()
374+
.map(|group| (group_id, group))
375+
})
376+
})
377+
.collect()
378+
} else {
379+
HashMap::new()
380+
};
381+
382+
// Step 4: Build result mapping node addresses to their groups
383+
for (node_addr, group_id) in node_addresses.iter().zip(group_ids.iter()) {
384+
let group = group_id.as_ref().and_then(|id| group_data.get(id)).cloned();
385+
result.insert(node_addr.clone(), group);
386+
}
387+
388+
Ok(result)
389+
}
390+
327391
pub async fn get_available_configurations(&self) -> Vec<NodeGroupConfiguration> {
328392
let mut conn = match self.store.client.get_multiplexed_async_connection().await {
329393
Ok(conn) => conn,
@@ -790,6 +854,9 @@ impl NodeGroupsPlugin {
790854
}
791855
}
792856

857+
// Remove group ID from groups index
858+
pipe.srem(GROUPS_INDEX_KEY, group_id);
859+
793860
// Delete group task assignment
794861
let task_key = format!("{}{}", GROUP_TASK_KEY_PREFIX, group_id);
795862
pipe.del(&task_key);
@@ -803,6 +870,9 @@ impl NodeGroupsPlugin {
803870
let group_data = serde_json::to_string(&merged_group)?;
804871
pipe.set(&group_key, group_data);
805872

873+
// Add new group ID to groups index
874+
pipe.sadd(GROUPS_INDEX_KEY, &new_group_id);
875+
806876
// Map nodes to new group
807877
for node in merged_nodes {
808878
pipe.hset(NODE_GROUP_MAP_KEY, node, &new_group_id);
@@ -933,6 +1003,9 @@ impl NodeGroupsPlugin {
9331003
pipe.hdel(NODE_GROUP_MAP_KEY, node);
9341004
}
9351005

1006+
// Remove group ID from groups index
1007+
pipe.srem(GROUPS_INDEX_KEY, group_id);
1008+
9361009
// Delete group task assignment
9371010
let task_key = format!("{}{}", GROUP_TASK_KEY_PREFIX, group_id);
9381011
debug!("Deleting group task assignment from key: {}", task_key);
@@ -1011,31 +1084,33 @@ impl NodeGroupsPlugin {
10111084
debug!("Getting all groups");
10121085
let mut conn = self.store.client.get_multiplexed_async_connection().await?;
10131086

1014-
// Get all node-to-group mappings
1015-
let node_mappings: HashMap<String, String> = conn.hgetall(NODE_GROUP_MAP_KEY).await?;
1087+
// Use SMEMBERS to get all group IDs from the groups index
1088+
let group_ids: Vec<String> = conn.smembers(GROUPS_INDEX_KEY).await?;
10161089

1017-
if node_mappings.is_empty() {
1018-
debug!("No node mappings found");
1090+
if group_ids.is_empty() {
1091+
debug!("No groups found in index");
10191092
return Ok(Vec::new());
10201093
}
10211094

1022-
// Collect unique group IDs
1023-
let group_ids: HashSet<String> = node_mappings.values().cloned().collect();
1024-
debug!("Found {} unique group IDs", group_ids.len());
1095+
debug!("Found {} group IDs in index", group_ids.len());
10251096

1026-
// Fetch each group's data
1097+
// Use MGET to batch fetch all group data
1098+
let group_keys: Vec<String> = group_ids.iter().map(|id| Self::get_group_key(id)).collect();
1099+
1100+
let group_values: Vec<Option<String>> = conn.mget(&group_keys).await?;
1101+
1102+
// Parse the group data
10271103
let mut groups = Vec::new();
1028-
for group_id in group_ids {
1029-
let group_key = Self::get_group_key(&group_id);
1030-
if let Some(group_data) = conn.get::<_, Option<String>>(&group_key).await? {
1031-
match serde_json::from_str::<NodeGroup>(&group_data) {
1104+
for (group_id, group_data) in group_ids.iter().zip(group_values.iter()) {
1105+
if let Some(group_data) = group_data {
1106+
match serde_json::from_str::<NodeGroup>(group_data) {
10321107
Ok(group) => groups.push(group),
10331108
Err(e) => {
10341109
error!("Failed to parse group {} data: {}", group_id, e);
10351110
}
10361111
}
10371112
} else {
1038-
warn!("Group {} exists in mapping but has no data", group_id);
1113+
warn!("Group {} exists in index but has no data", group_id);
10391114
}
10401115
}
10411116

@@ -1063,6 +1138,88 @@ impl NodeGroupsPlugin {
10631138
Ok(mappings)
10641139
}
10651140

1141+
/// Migrate existing group data to populate the groups index
1142+
/// This method should be called once after deploying the new groups index feature
1143+
pub async fn migrate_groups_index(&self) -> Result<usize, Error> {
1144+
debug!("Starting groups index migration");
1145+
let mut conn = self.store.client.get_multiplexed_async_connection().await?;
1146+
1147+
// Check if migration is needed by seeing if groups index is empty
1148+
let existing_groups_in_index: Vec<String> = conn.smembers(GROUPS_INDEX_KEY).await?;
1149+
if !existing_groups_in_index.is_empty() {
1150+
info!(
1151+
"Groups index already contains {} groups, migration appears completed",
1152+
existing_groups_in_index.len()
1153+
);
1154+
return Ok(existing_groups_in_index.len());
1155+
}
1156+
1157+
// Get all node-to-group mappings using the old method
1158+
let node_mappings: HashMap<String, String> = conn.hgetall(NODE_GROUP_MAP_KEY).await?;
1159+
1160+
if node_mappings.is_empty() {
1161+
info!("No existing groups found to migrate");
1162+
return Ok(0);
1163+
}
1164+
1165+
// Collect unique group IDs from node mappings
1166+
let existing_group_ids: HashSet<String> = node_mappings.values().cloned().collect();
1167+
info!(
1168+
"Found {} unique group IDs to migrate",
1169+
existing_group_ids.len()
1170+
);
1171+
1172+
// Verify these groups actually exist by checking their keys
1173+
let group_keys: Vec<String> = existing_group_ids
1174+
.iter()
1175+
.map(|id| Self::get_group_key(id))
1176+
.collect();
1177+
1178+
let group_values: Vec<Option<String>> = conn.mget(&group_keys).await?;
1179+
1180+
let mut valid_group_ids = Vec::new();
1181+
for (group_id, group_data) in existing_group_ids.iter().zip(group_values.iter()) {
1182+
if group_data.is_some() {
1183+
valid_group_ids.push(group_id.clone());
1184+
} else {
1185+
warn!(
1186+
"Group {} exists in mappings but has no data, skipping",
1187+
group_id
1188+
);
1189+
}
1190+
}
1191+
1192+
if valid_group_ids.is_empty() {
1193+
info!("No valid groups found to migrate");
1194+
return Ok(0);
1195+
}
1196+
1197+
// Add all valid group IDs to the groups index in a single operation
1198+
let _: () = conn.sadd(GROUPS_INDEX_KEY, &valid_group_ids).await?;
1199+
1200+
info!(
1201+
"Successfully migrated {} groups to groups index",
1202+
valid_group_ids.len()
1203+
);
1204+
1205+
// Verify the migration by checking the index
1206+
let migrated_count: usize = conn.scard(GROUPS_INDEX_KEY).await?;
1207+
if migrated_count != valid_group_ids.len() {
1208+
error!(
1209+
"Migration verification failed: expected {} groups in index, found {}",
1210+
valid_group_ids.len(),
1211+
migrated_count
1212+
);
1213+
} else {
1214+
info!(
1215+
"Migration verification successful: {} groups in index",
1216+
migrated_count
1217+
);
1218+
}
1219+
1220+
Ok(valid_group_ids.len())
1221+
}
1222+
10661223
/// Get all groups assigned to a specific task
10671224
/// Returns a list of group IDs that are currently working on the given task
10681225
pub async fn get_groups_for_task(&self, task_id: &str) -> Result<Vec<String>, Error> {

0 commit comments

Comments
 (0)