Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 38bd92b

Browse files
authored
Improvement/plugin folder structure (#339)
* restructure plugin approach and move all plugins to base plugins folder
1 parent 712168b commit 38bd92b

16 files changed

Lines changed: 1547 additions & 1531 deletions

File tree

crates/orchestrator/src/api/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use crate::api::routes::storage::storage_routes;
33
use crate::api::routes::task::tasks_routes;
44
use crate::api::routes::{heartbeat::heartbeat_routes, metrics::metrics_routes};
55
use crate::models::node::NodeStatus;
6+
use crate::plugins::node_groups::NodeGroupsPlugin;
67
use crate::scheduler::Scheduler;
7-
use crate::status_update::plugins::node_groups::NodeGroupsPlugin;
88
use crate::store::core::{RedisStore, StoreContext};
99
use crate::utils::loop_heartbeats::LoopHeartbeats;
1010
use crate::ServerMode;

crates/orchestrator/src/main.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ mod api;
22
mod discovery;
33
mod models;
44
mod node;
5-
mod prelude;
5+
mod plugins;
66
mod scheduler;
77
mod status_update;
88
mod store;
@@ -11,7 +11,6 @@ use crate::api::server::start_server;
1111
use crate::discovery::monitor::DiscoveryMonitor;
1212
use crate::node::invite::NodeInviter;
1313
use crate::scheduler::Scheduler;
14-
use crate::status_update::plugins::webhook::WebhookPlugin;
1514
use crate::status_update::NodeStatusUpdater;
1615
use crate::store::core::RedisStore;
1716
use crate::store::core::StoreContext;
@@ -24,13 +23,14 @@ use log::debug;
2423
use log::error;
2524
use log::info;
2625
use log::LevelFilter;
27-
use scheduler::plugins::SchedulerPlugin;
26+
use plugins::node_groups::NodeGroupConfiguration;
27+
use plugins::node_groups::NodeGroupsPlugin;
28+
use plugins::webhook::WebhookPlugin;
29+
use plugins::SchedulerPlugin;
30+
use plugins::StatusUpdatePlugin;
2831
use shared::web3::contracts::core::builder::ContractBuilder;
2932
use shared::web3::contracts::structs::compute_pool::PoolStatus;
3033
use shared::web3::wallet::Wallet;
31-
use status_update::plugins::node_groups::NodeGroupConfiguration;
32-
use status_update::plugins::node_groups::NodeGroupsPlugin;
33-
use status_update::plugins::StatusUpdatePlugin;
3434
use std::sync::Arc;
3535
use tokio::task::JoinSet;
3636
use url::Url;
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
mod traits;
2+
pub use traits::*;
3+
4+
pub mod node_groups;
5+
6+
pub mod newest_task;
7+
8+
pub mod webhook;
File renamed without changes.
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
use super::{Plugin, SchedulerPlugin};
2+
use crate::store::core::{RedisStore, StoreContext};
3+
use anyhow::Error;
4+
use anyhow::Result;
5+
use log::warn;
6+
use redis::{Commands, Script};
7+
use serde::{Deserialize, Serialize};
8+
use shared::models::node::ComputeRequirements;
9+
use shared::models::task::Task;
10+
use std::{collections::BTreeSet, sync::Arc};
11+
use std::{collections::HashSet, str::FromStr};
12+
13+
pub mod scheduler_impl;
14+
pub mod status_update_impl;
15+
#[cfg(test)]
16+
mod tests;
17+
18+
const GROUP_KEY_PREFIX: &str = "node_group:";
19+
const NODE_GROUP_MAP_KEY: &str = "node_to_group";
20+
const GROUP_TASK_KEY_PREFIX: &str = "group_task:";
21+
22+
#[derive(Debug, Serialize, Deserialize, Clone)]
23+
pub struct NodeGroupConfiguration {
24+
name: String,
25+
min_group_size: usize,
26+
max_group_size: usize,
27+
#[serde(deserialize_with = "deserialize_compute_requirements")]
28+
compute_requirements: Option<ComputeRequirements>,
29+
}
30+
31+
fn deserialize_compute_requirements<'de, D>(
32+
deserializer: D,
33+
) -> Result<Option<ComputeRequirements>, D::Error>
34+
where
35+
D: serde::Deserializer<'de>,
36+
{
37+
let s: Option<String> = Option::deserialize(deserializer)?;
38+
match s {
39+
Some(s) => ComputeRequirements::from_str(&s)
40+
.map(Some)
41+
.map_err(serde::de::Error::custom),
42+
None => Ok(None),
43+
}
44+
}
45+
46+
impl NodeGroupConfiguration {
47+
pub fn is_valid(&self) -> bool {
48+
if self.max_group_size < self.min_group_size {
49+
return false;
50+
}
51+
true
52+
}
53+
}
54+
55+
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
56+
pub struct NodeGroup {
57+
pub id: String,
58+
pub nodes: BTreeSet<String>,
59+
pub created_at: chrono::DateTime<chrono::Utc>,
60+
pub configuration_name: String,
61+
}
62+
63+
#[derive(Clone)]
64+
pub struct NodeGroupsPlugin {
65+
configurations: Vec<NodeGroupConfiguration>,
66+
store: Arc<RedisStore>,
67+
store_context: Arc<StoreContext>,
68+
}
69+
70+
impl NodeGroupsPlugin {
71+
pub fn new(
72+
configurations: Vec<NodeGroupConfiguration>,
73+
store: Arc<RedisStore>,
74+
store_context: Arc<StoreContext>,
75+
) -> Self {
76+
let mut sorted_configs = configurations;
77+
78+
// Check for duplicate configuration names
79+
let mut seen_names = HashSet::new();
80+
for config in &sorted_configs {
81+
if !seen_names.insert(config.name.clone()) {
82+
panic!("Configuration names must be unique");
83+
}
84+
if !config.is_valid() {
85+
panic!("Plugin configuration is invalid");
86+
}
87+
}
88+
89+
sorted_configs.sort_by(|a, b| b.min_group_size.cmp(&a.min_group_size));
90+
91+
Self {
92+
configurations: sorted_configs,
93+
store,
94+
store_context,
95+
}
96+
}
97+
98+
fn generate_group_id() -> String {
99+
use rand::Rng;
100+
let mut rng = rand::rng();
101+
format!("group_{}", rng.random::<u64>())
102+
}
103+
104+
fn get_group_key(group_id: &str) -> String {
105+
format!("{}{}", GROUP_KEY_PREFIX, group_id)
106+
}
107+
108+
pub fn get_node_group(&self, node_addr: &str) -> Result<Option<NodeGroup>, Error> {
109+
let mut conn = self.store.client.get_connection()?;
110+
111+
let group_id: Option<String> = conn.hget(NODE_GROUP_MAP_KEY, node_addr)?;
112+
if let Some(group_id) = group_id {
113+
let group_key = Self::get_group_key(&group_id);
114+
let group_data: Option<String> = conn.get(&group_key)?;
115+
if let Some(group_data) = group_data {
116+
return Ok(Some(serde_json::from_str(&group_data)?));
117+
}
118+
}
119+
120+
Ok(None)
121+
}
122+
123+
fn get_current_group_task(&self, group_id: &str) -> Result<Option<Task>, Error> {
124+
let mut conn = self.store.client.get_connection()?;
125+
let task_key = format!("{}{}", GROUP_TASK_KEY_PREFIX, group_id);
126+
let task_id: Option<String> = conn.get(&task_key)?;
127+
128+
if let Some(task_id) = task_id {
129+
if let Some(task) = self.store_context.task_store.get_task(&task_id) {
130+
return Ok(Some(task));
131+
}
132+
133+
warn!("Task id set but task not found");
134+
let script = Script::new(
135+
r#"
136+
local task_key = KEYS[1]
137+
local expected_task_id = ARGV[1]
138+
139+
local current_task_id = redis.call('GET', task_key)
140+
if current_task_id == expected_task_id then
141+
redis.call('DEL', task_key)
142+
return 1
143+
else
144+
return 0
145+
end
146+
"#,
147+
);
148+
149+
let _: () = script.key(&task_key).arg(task_id).invoke(&mut conn)?;
150+
}
151+
Ok(None)
152+
}
153+
154+
fn assign_task_to_group(&self, group_id: &str, task_id: &str) -> Result<bool, Error> {
155+
let mut conn = self.store.client.get_connection()?;
156+
let task_key = format!("{}{}", GROUP_TASK_KEY_PREFIX, group_id);
157+
let result: bool = conn.set_nx::<_, _, bool>(&task_key, task_id)?;
158+
Ok(result)
159+
}
160+
}
161+
162+
impl Plugin for NodeGroupsPlugin {}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
use super::{NodeGroupsPlugin, SchedulerPlugin};
2+
use alloy::primitives::Address;
3+
use log::{error, info};
4+
use rand::seq::IndexedRandom;
5+
use shared::models::task::Task;
6+
use std::str::FromStr;
7+
8+
impl SchedulerPlugin for NodeGroupsPlugin {
9+
fn filter_tasks(&self, tasks: &[Task], node_address: &Address) -> Vec<Task> {
10+
if let Ok(Some(group)) = self.get_node_group(&node_address.to_string()) {
11+
info!(
12+
"Node {} is in group {} with {} nodes",
13+
node_address,
14+
group.id,
15+
group.nodes.len()
16+
);
17+
18+
let node_group_index = group
19+
.nodes
20+
.iter()
21+
.position(|n| n == &node_address.to_string())
22+
.unwrap();
23+
24+
let mut current_task: Option<Task> = None;
25+
match self.get_current_group_task(&group.id) {
26+
Ok(Some(task)) => {
27+
current_task = Some(task);
28+
}
29+
Ok(None) => {
30+
if tasks.is_empty() {
31+
return vec![];
32+
}
33+
34+
let applicable_tasks: Vec<Task> = tasks
35+
.iter()
36+
.filter(|&task| match &task.scheduling_config {
37+
None => true,
38+
Some(config) => {
39+
match config.plugins.as_ref().and_then(|p| p.get("node_groups")) {
40+
None => true,
41+
Some(node_config) => {
42+
match node_config.get("allowed_topologies") {
43+
None => true,
44+
Some(topologies) => {
45+
topologies.contains(&group.configuration_name)
46+
}
47+
}
48+
}
49+
}
50+
}
51+
})
52+
.cloned()
53+
.collect();
54+
if applicable_tasks.is_empty() {
55+
return vec![];
56+
}
57+
58+
if let Some(new_task) = applicable_tasks.choose(&mut rand::rng()) {
59+
let task_id = new_task.id.to_string();
60+
match self.assign_task_to_group(&group.id, &task_id) {
61+
Ok(true) => {
62+
// Successfully assigned the task
63+
current_task = Some(new_task.clone());
64+
}
65+
Ok(false) => {
66+
// Another node already assigned a task, try to get it
67+
if let Ok(Some(task)) = self.get_current_group_task(&group.id) {
68+
current_task = Some(task);
69+
}
70+
}
71+
Err(e) => {
72+
error!("Failed to assign task to group: {}", e);
73+
}
74+
}
75+
}
76+
}
77+
_ => {}
78+
}
79+
80+
if let Some(t) = current_task {
81+
let mut task_clone = t.clone();
82+
83+
let next_node_idx = (node_group_index + 1) % group.nodes.len();
84+
let next_node_addr = group.nodes.iter().nth(next_node_idx).unwrap();
85+
86+
// Get p2p_id for next node from node store
87+
let next_p2p_id = if let Some(next_node) = self
88+
.store_context
89+
.node_store
90+
.get_node(&Address::from_str(next_node_addr).unwrap())
91+
{
92+
next_node.p2p_id.unwrap_or_default()
93+
} else {
94+
String::new()
95+
};
96+
97+
let mut env_vars = task_clone.env_vars.unwrap_or_default();
98+
env_vars.insert("GROUP_INDEX".to_string(), node_group_index.to_string());
99+
for (_, value) in env_vars.iter_mut() {
100+
let new_value = value
101+
.replace("${GROUP_INDEX}", &node_group_index.to_string())
102+
.replace("${GROUP_SIZE}", &group.nodes.len().to_string())
103+
.replace("${NEXT_P2P_ADDRESS}", &next_p2p_id)
104+
.replace("${GROUP_ID}", &group.id);
105+
106+
*value = new_value;
107+
}
108+
task_clone.env_vars = Some(env_vars);
109+
task_clone.args = task_clone.args.map(|args| {
110+
args.into_iter()
111+
.map(|arg| {
112+
arg.replace("${GROUP_INDEX}", &node_group_index.to_string())
113+
.replace("${GROUP_SIZE}", &group.nodes.len().to_string())
114+
.replace("${NEXT_P2P_ADDRESS}", &next_p2p_id)
115+
.replace("${GROUP_ID}", &group.id)
116+
})
117+
.collect::<Vec<String>>()
118+
});
119+
return vec![task_clone];
120+
}
121+
}
122+
info!(
123+
"Node {} is not in a group, skipping all tasks",
124+
node_address
125+
);
126+
vec![]
127+
}
128+
}

0 commit comments

Comments
 (0)