Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit efcd364

Browse files
authored
improve webhook resiliency (#515)
1 parent a042abf commit efcd364

1 file changed

Lines changed: 199 additions & 44 deletions

File tree

  • crates/orchestrator/src/plugins/webhook

crates/orchestrator/src/plugins/webhook/mod.rs

Lines changed: 199 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
66
use crate::models::node::{NodeStatus, OrchestratorNode};
77

88
use super::{Plugin, StatusUpdatePlugin};
9-
use log::{debug, error};
9+
use log::{error, info, warn};
1010

1111
#[derive(Debug, Clone, Serialize, Deserialize)]
1212
#[serde(tag = "event", content = "data")]
@@ -70,20 +70,16 @@ pub struct WebhookPlugin {
7070
}
7171

7272
impl WebhookPlugin {
73+
const MAX_RETRIES: u32 = 5;
74+
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
75+
const BASE_RETRY_DELAY_MS: u64 = 500;
76+
const MAX_RETRY_DELAY_MS: u64 = 10000;
77+
7378
pub fn new(webhook_config: WebhookConfig) -> Self {
7479
let client = Arc::new(
7580
reqwest::Client::builder()
76-
.default_headers({
77-
let mut headers = reqwest::header::HeaderMap::new();
78-
if let Some(token) = &webhook_config.bearer_token {
79-
headers.insert(
80-
reqwest::header::AUTHORIZATION,
81-
reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token))
82-
.expect("Invalid token"),
83-
);
84-
}
85-
headers
86-
})
81+
.default_headers(Self::build_headers(&webhook_config.bearer_token))
82+
.timeout(Self::REQUEST_TIMEOUT)
8783
.build()
8884
.expect("Failed to build HTTP client"),
8985
);
@@ -94,44 +90,145 @@ impl WebhookPlugin {
9490
}
9591
}
9692

93+
/// Builds the default header map for the webhook HTTP client.
///
/// When `bearer_token` is `Some`, the map contains a single
/// `Authorization: Bearer <token>` header; otherwise it is empty.
///
/// # Panics
///
/// Panics with "Invalid bearer token" if the token cannot be turned into a
/// valid HTTP header value.
fn build_headers(bearer_token: &Option<String>) -> reqwest::header::HeaderMap {
    let mut headers = reqwest::header::HeaderMap::new();
    if let Some(token) = bearer_token {
        let mut auth_value =
            reqwest::header::HeaderValue::from_str(&format!("Bearer {}", token))
                .expect("Invalid bearer token");
        // The value carries a credential: flag it as sensitive so that
        // header-aware code can avoid exposing it.
        auth_value.set_sensitive(true);
        headers.insert(reqwest::header::AUTHORIZATION, auth_value);
    }
    headers
}
104+
105+
fn calculate_retry_delay(attempt: u32) -> Duration {
106+
let delay_ms = std::cmp::min(
107+
Self::BASE_RETRY_DELAY_MS * (2_u64.pow(attempt)),
108+
Self::MAX_RETRY_DELAY_MS,
109+
);
110+
Duration::from_millis(delay_ms)
111+
}
112+
113+
fn get_event_name(event: &WebhookEvent) -> &'static str {
114+
match event {
115+
WebhookEvent::NodeStatusChanged { .. } => "node.status_changed",
116+
WebhookEvent::GroupCreated { .. } => "group.created",
117+
WebhookEvent::GroupDestroyed { .. } => "group.destroyed",
118+
WebhookEvent::MetricsUpdated { .. } => "metrics.updated",
119+
}
120+
}
121+
122+
async fn send_webhook_request(&self, payload: &WebhookPayload) -> Result<(), Error> {
123+
let start_time = std::time::Instant::now();
124+
125+
let response = self
126+
.client
127+
.post(&self.webhook_url)
128+
.json(payload)
129+
.send()
130+
.await?;
131+
132+
let duration = start_time.elapsed();
133+
let status = response.status();
134+
135+
if response.status().is_success() {
136+
info!(
137+
"Webhook '{}' sent successfully to {} (HTTP {}, took {:?})",
138+
Self::get_event_name(&payload.event),
139+
self.webhook_url,
140+
status,
141+
duration
142+
);
143+
Ok(())
144+
} else {
145+
let error_text = response
146+
.text()
147+
.await
148+
.unwrap_or_else(|_| "Failed to read error response".to_string());
149+
Err(anyhow::anyhow!(
150+
"HTTP {} after {:?}: {}",
151+
status,
152+
duration,
153+
error_text
154+
))
155+
}
156+
}
157+
158+
async fn send_with_retry(&self, payload: WebhookPayload) -> Result<(), Error> {
159+
let event_name = Self::get_event_name(&payload.event);
160+
let mut last_error = None;
161+
162+
info!(
163+
"Sending webhook '{}' to {} (max {} retries)",
164+
event_name,
165+
self.webhook_url,
166+
Self::MAX_RETRIES
167+
);
168+
169+
for attempt in 0..=Self::MAX_RETRIES {
170+
match self.send_webhook_request(&payload).await {
171+
Ok(()) => {
172+
if attempt > 0 {
173+
info!(
174+
"Webhook '{}' succeeded on attempt {} after retries",
175+
event_name,
176+
attempt + 1
177+
);
178+
}
179+
return Ok(());
180+
}
181+
Err(e) => {
182+
if attempt < Self::MAX_RETRIES {
183+
let delay = Self::calculate_retry_delay(attempt);
184+
warn!(
185+
"Webhook '{}' attempt {} failed: {}, retrying in {:?}",
186+
event_name,
187+
attempt + 1,
188+
e,
189+
delay
190+
);
191+
tokio::time::sleep(delay).await;
192+
}
193+
last_error = Some(e);
194+
}
195+
}
196+
}
197+
198+
let error = last_error.unwrap_or_else(|| anyhow::anyhow!("Unknown error"));
199+
error!(
200+
"Failed to send webhook '{}' to {} after {} attempts: {}",
201+
event_name,
202+
self.webhook_url,
203+
Self::MAX_RETRIES + 1,
204+
error
205+
);
206+
Err(error)
207+
}
208+
97209
async fn send_event(&self, event: WebhookEvent) -> Result<(), Error> {
98210
let payload = WebhookPayload::new(event);
99-
let webhook_url = self.webhook_url.clone();
100-
let client = self.client.clone();
101211

102212
#[cfg(not(test))]
103213
{
214+
let webhook_url = self.webhook_url.clone();
215+
let client = self.client.clone();
104216
tokio::spawn(async move {
105-
if let Err(e) = client
106-
.post(&webhook_url)
107-
.json(&payload)
108-
.timeout(Duration::from_secs(5))
109-
.send()
110-
.await
111-
{
112-
error!("Failed to send webhook to {}: {}", webhook_url, e);
113-
} else {
114-
debug!("Webhook to {} triggered successfully", webhook_url);
217+
let plugin = WebhookPlugin {
218+
webhook_url,
219+
client,
220+
};
221+
if let Err(e) = plugin.send_with_retry(payload).await {
222+
// Error already logged in send_with_retry
223+
let _ = e;
115224
}
116225
});
117226
Ok(())
118227
}
119228

120229
#[cfg(test)]
121230
{
122-
if let Err(e) = client
123-
.post(&webhook_url)
124-
.json(&payload)
125-
.timeout(Duration::from_secs(5))
126-
.send()
127-
.await
128-
{
129-
error!("Failed to send webhook to {}: {}", webhook_url, e);
130-
Err(e.into())
131-
} else {
132-
debug!("Webhook to {} triggered successfully", webhook_url);
133-
Ok(())
134-
}
231+
self.send_with_retry(payload).await
135232
}
136233
}
137234

@@ -250,17 +347,16 @@ mod tests {
250347
"old_status": "Dead",
251348
"new_status": "Healthy"
252349
},
350+
"timestamp": "2024-01-01T00:00:00Z"
253351
})))
254352
.create();
255353

256354
let plugin = WebhookPlugin::new(WebhookConfig {
257-
url: server.url(),
355+
url: format!("{}/webhook", server.url()),
258356
bearer_token: None,
259357
});
260-
let node = create_test_node(NodeStatus::Dead);
261-
let result = plugin
262-
.handle_status_change(&node, &NodeStatus::Healthy)
263-
.await;
358+
let node = create_test_node(NodeStatus::Healthy);
359+
let result = plugin.handle_status_change(&node, &NodeStatus::Dead).await;
264360
assert!(result.is_ok());
265361
Ok(())
266362
}
@@ -277,12 +373,13 @@ mod tests {
277373
"group_id": "1234567890",
278374
"configuration_name": "test_configuration",
279375
"nodes": ["0x1234567890123456789012345678901234567890"]
280-
}
376+
},
377+
"timestamp": "2024-01-01T00:00:00Z"
281378
})))
282379
.create();
283380

284381
let plugin = WebhookPlugin::new(WebhookConfig {
285-
url: server.url(),
382+
url: format!("{}/webhook", server.url()),
286383
bearer_token: None,
287384
});
288385
let group_id = "1234567890";
@@ -356,4 +453,62 @@ mod tests {
356453
mock.assert_async().await;
357454
Ok(())
358455
}
456+
457+
#[tokio::test]
458+
async fn test_webhook_retry_logic() -> Result<()> {
459+
let mut server = Server::new_async().await;
460+
461+
// First two attempts fail, third succeeds
462+
let _mock1 = server
463+
.mock("POST", "/webhook")
464+
.with_status(500)
465+
.expect(1)
466+
.create();
467+
468+
let _mock2 = server
469+
.mock("POST", "/webhook")
470+
.with_status(502)
471+
.expect(1)
472+
.create();
473+
474+
let _mock3 = server
475+
.mock("POST", "/webhook")
476+
.with_status(200)
477+
.expect(1)
478+
.create();
479+
480+
let plugin = WebhookPlugin::new(WebhookConfig {
481+
url: format!("{}/webhook", server.url()),
482+
bearer_token: None,
483+
});
484+
485+
let mut metrics = std::collections::HashMap::new();
486+
metrics.insert("test_metric".to_string(), 1.0);
487+
let result = plugin.send_metrics_updated(1, metrics).await;
488+
assert!(result.is_ok());
489+
Ok(())
490+
}
491+
492+
#[tokio::test]
493+
async fn test_webhook_max_retries_exceeded() -> Result<()> {
494+
let mut server = Server::new_async().await;
495+
496+
// All attempts fail
497+
let _mock = server
498+
.mock("POST", "/webhook")
499+
.with_status(500)
500+
.expect(6) // 1 initial + 5 retries
501+
.create();
502+
503+
let plugin = WebhookPlugin::new(WebhookConfig {
504+
url: format!("{}/webhook", server.url()),
505+
bearer_token: None,
506+
});
507+
508+
let mut metrics = std::collections::HashMap::new();
509+
metrics.insert("test_metric".to_string(), 1.0);
510+
let result = plugin.send_metrics_updated(1, metrics).await;
511+
assert!(result.is_err());
512+
Ok(())
513+
}
359514
}

0 commit comments

Comments
 (0)