Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit aa081ed

Browse files
authored
imp(orchestrator): cleanup mutex handling to proper async (#530)
* cleanup mutex handling to proper async
1 parent 3f69c78 commit aa081ed

8 files changed

Lines changed: 380 additions & 184 deletions

File tree

crates/orchestrator/src/api/routes/storage.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ fn generate_file_name(template: &str, original_name: &str) -> String {
335335
mod tests {
336336

337337
use std::collections::HashMap;
338+
use std::sync::Arc;
338339

339340
use super::*;
340341
use crate::plugins::StatusUpdatePlugin;
@@ -580,13 +581,14 @@ mod tests {
580581
compute_requirements: None,
581582
};
582583

583-
let plugin = NodeGroupsPlugin::new(
584+
let plugin = Arc::new(NodeGroupsPlugin::new(
584585
vec![config],
585586
app_state.redis_store.clone(),
586587
app_state.store_context.clone(),
587588
None,
588589
None,
589-
);
590+
));
591+
let _ = plugin.clone().register_observer().await;
590592

591593
let _ = plugin
592594
.handle_status_change(&node, &NodeStatus::Healthy)

crates/orchestrator/src/main.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,14 @@ async fn main() -> Result<()> {
259259

260260
let status_group_plugin = group_plugin.clone();
261261
let group_plugin_for_server = group_plugin.clone();
262-
node_groups_plugin = Some(Arc::new(group_plugin_for_server));
262+
let group_plugin_arc = Arc::new(group_plugin_for_server);
263+
264+
// Register the plugin as a task observer
265+
if let Err(e) = group_plugin_arc.clone().register_observer().await {
266+
error!("Failed to register node groups plugin as observer: {}", e);
267+
}
268+
269+
node_groups_plugin = Some(group_plugin_arc);
263270
scheduler_plugins.push(Box::new(group_plugin));
264271
status_update_plugins.push(Box::new(status_group_plugin));
265272
info!("Plugin: Node group plugin initialized");

crates/orchestrator/src/plugins/node_groups/mod.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -166,22 +166,24 @@ impl NodeGroupsPlugin {
166166
}
167167
});
168168

169-
let plugin = Self {
169+
Self {
170170
configuration_templates: sorted_configs,
171171
store,
172172
store_context,
173173
node_groups_heartbeats,
174174
webhook_plugins,
175175
task_switching_policy,
176176
proximity_optimization_policy,
177-
};
177+
}
178+
}
178179

179-
plugin
180-
.store_context
180+
/// Register this plugin as a task observer (async)
181+
pub async fn register_observer(self: Arc<Self>) -> Result<()> {
182+
self.store_context
181183
.task_store
182-
.add_observer(Arc::new(plugin.clone()));
183-
184-
plugin
184+
.add_observer(self.clone())
185+
.await;
186+
Ok(())
185187
}
186188

187189
/// Check if a node is compatible with a configuration's compute requirements

0 commit comments

Comments
 (0)