Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit f3825b5

Browse files
authored
Feature: Advanced Webhooks (#369)
* introduce advanced webhook plugin - node group changes, node health changes, metrics changes
1 parent 0f38f96 commit f3825b5

18 files changed

Lines changed: 462 additions & 79 deletions

File tree

.env.example

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -37,4 +37,8 @@ WORK_VALIDATION_CONTRACT=
3737
# Example: [{"server_url": "toploc url", "auth_token": "toploc auth token", "file_prefix_filter": "optional prefix filter"}]
3838
TOPLOC_CONFIGS=
3939
S3_CREDENTIALS=
40-
BUCKET_NAME=
40+
BUCKET_NAME=
41+
42+
# Webhook configurations as JSON array string
43+
# Example: [{"url": "webhook url", "bearer_token": "webhook bearer token"}]
44+
WEBHOOK_CONFIGS=

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@ nalgebra = "0.33.2"
3535
redis = "0.28.1"
3636
redis-test = "0.8.0"
3737
stun = "0.7.0"
38+
mockito = "1.7.0"
3839

3940
[workspace.package]
4041
version = "0.2.11"

crates/orchestrator/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,3 +29,6 @@ shared = { workspace = true }
2929
tokio = { workspace = true }
3030
url = { workspace = true }
3131
uuid = { workspace = true }
32+
33+
[dev-dependencies]
34+
mockito = { workspace = true }

crates/orchestrator/Dockerfile

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -21,10 +21,10 @@ ENV S3_CREDENTIALS=""
2121
ENV BUCKET_NAME=""
2222
ENV LOG_LEVEL=""
2323
ENV HOURLY_S3_UPLOAD_LIMIT="2"
24-
ENV WEBHOOK_URLS=""
2524
ENV MODE="full"
2625
ENV NODE_GROUP_CONFIGS=""
2726
ENV NODE_GROUP_MANAGEMENT_INTERVAL="10"
27+
ENV WEBHOOK_CONFIGS=""
2828

2929
RUN echo '#!/bin/sh\n\
3030
exec /usr/local/bin/orchestrator \
@@ -44,7 +44,6 @@ $([ "$DISABLE_EJECTION" = "true" ] && echo "--disable-ejection") \
4444
$([ ! -z "$BUCKET_NAME" ] && echo "--bucket-name $BUCKET_NAME") \
4545
$([ ! -z "$LOG_LEVEL" ] && echo "--log-level $LOG_LEVEL") \
4646
$([ ! -z "$HOURLY_S3_UPLOAD_LIMIT" ] && echo "--hourly-s3-upload-limit $HOURLY_S3_UPLOAD_LIMIT") \
47-
$([ ! -z "$WEBHOOK_URLS" ] && echo "--webhook-urls $WEBHOOK_URLS") \
4847
$([ ! -z "$NODE_GROUP_MANAGEMENT_INTERVAL" ] && echo "--node-group-management-interval $NODE_GROUP_MANAGEMENT_INTERVAL") \
4948
"$@"' > /entrypoint.sh && \
5049
chmod +x /entrypoint.sh

crates/orchestrator/src/api/routes/storage.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -426,6 +426,7 @@ mod tests {
426426
app_state.redis_store.clone(),
427427
app_state.store_context.clone(),
428428
None,
429+
None,
429430
);
430431

431432
let _ = plugin

crates/orchestrator/src/api/tests/helper.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -107,6 +107,7 @@ pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
107107
store.clone(),
108108
store_context.clone(),
109109
None,
110+
None,
110111
)));
111112

112113
let mock_storage = MockStorageProvider::new();

crates/orchestrator/src/events/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,6 @@
11
use anyhow::Result;
22
use shared::models::task::Task;
33

4-
// TODO: Shouldnt this be handled in a tokio trhead?
54
pub trait TaskObserver: Send + Sync {
65
fn on_task_created(&self, task: &Task) -> Result<()>;
76
fn on_task_deleted(&self, task: Option<Task>) -> Result<()>;

crates/orchestrator/src/main.rs

Lines changed: 38 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -25,9 +25,11 @@ use log::debug;
2525
use log::error;
2626
use log::info;
2727
use log::LevelFilter;
28+
use metrics::webhook_sender::MetricsWebhookSender;
2829
use metrics::MetricsContext;
2930
use plugins::node_groups::NodeGroupConfiguration;
3031
use plugins::node_groups::NodeGroupsPlugin;
32+
use plugins::webhook::WebhookConfig;
3133
use plugins::webhook::WebhookPlugin;
3234
use plugins::SchedulerPlugin;
3335
use plugins::StatusUpdatePlugin;
@@ -112,10 +114,6 @@ struct Args {
112114
#[arg(short = 'l', long, default_value = "info")]
113115
log_level: String,
114116

115-
/// Webhook urls (comma-separated string)
116-
#[arg(long, default_value = "")]
117-
webhook_urls: Option<String>,
118-
119117
/// Node group management interval
120118
#[arg(long, default_value = "10")]
121119
node_group_management_interval: u64,
@@ -199,6 +197,40 @@ async fn main() -> Result<()> {
199197
let mut scheduler_plugins: Vec<Box<dyn SchedulerPlugin>> = Vec::new();
200198
let mut status_update_plugins: Vec<Box<dyn StatusUpdatePlugin>> = vec![];
201199
let mut node_groups_plugin: Option<Arc<NodeGroupsPlugin>> = None;
200+
let mut webhook_plugins: Vec<WebhookPlugin> = vec![];
201+
202+
let configs = std::env::var("WEBHOOK_CONFIGS").unwrap_or_default();
203+
match serde_json::from_str::<Vec<WebhookConfig>>(&configs) {
204+
Ok(configs) if !configs.is_empty() => {
205+
for config in configs {
206+
let plugin = WebhookPlugin::new(config);
207+
let plugin_clone = plugin.clone();
208+
webhook_plugins.push(plugin_clone);
209+
status_update_plugins.push(Box::new(plugin));
210+
}
211+
}
212+
Ok(_) => {
213+
info!("No webhook configurations provided");
214+
}
215+
Err(e) => {
216+
error!("Failed to parse webhook configs from environment: {}", e);
217+
}
218+
}
219+
220+
let webhook_sender_store = store_context.clone();
221+
let webhook_plugins_clone = webhook_plugins.clone();
222+
if !webhook_plugins_clone.is_empty() {
223+
tasks.spawn(async move {
224+
let mut webhook_sender = MetricsWebhookSender::new(
225+
webhook_sender_store.clone(),
226+
webhook_plugins_clone.clone(),
227+
);
228+
if let Err(e) = webhook_sender.run().await {
229+
error!("Error running webhook sender: {}", e);
230+
}
231+
Ok(())
232+
});
233+
}
202234

203235
// Load node group configurations from environment variable
204236
if let Ok(configs_json) = std::env::var("NODE_GROUP_CONFIGS") {
@@ -210,13 +242,14 @@ async fn main() -> Result<()> {
210242
store.clone(),
211243
group_store_context.clone(),
212244
Some(node_groups_heartbeats.clone()),
245+
Some(webhook_plugins.clone()),
213246
);
214247
let status_group_plugin = group_plugin.clone();
215248
let group_plugin_for_server = group_plugin.clone();
216249
node_groups_plugin = Some(Arc::new(group_plugin_for_server));
217250
scheduler_plugins.push(Box::new(group_plugin));
218251
status_update_plugins.push(Box::new(status_group_plugin));
219-
info!("Node group plugin initialized",);
252+
info!("Node group plugin initialized");
220253
}
221254
Ok(_) => {
222255
info!(
@@ -277,17 +310,6 @@ async fn main() -> Result<()> {
277310
let status_update_store_context = store_context.clone();
278311
let status_update_heartbeats = heartbeats.clone();
279312
let status_update_contracts = contracts.clone();
280-
let webhook_urls: Vec<String> = args
281-
.webhook_urls
282-
.clone()
283-
.unwrap_or_default()
284-
.split(',')
285-
.map(|s| s.to_string())
286-
.collect();
287-
288-
for url in webhook_urls {
289-
status_update_plugins.push(Box::new(WebhookPlugin::new(url.to_string())));
290-
}
291313

292314
tasks.spawn(async move {
293315
let status_updater = NodeStatusUpdater::new(

crates/orchestrator/src/metrics/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
use prometheus::{GaugeVec, Opts, Registry, TextEncoder};
2+
pub mod webhook_sender;
23

34
pub struct MetricsContext {
45
pub compute_task_gauges: GaugeVec,

0 commit comments

Comments
 (0)