diff --git a/Cargo.lock b/Cargo.lock index 377bfb8d..9035713b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -59,6 +59,16 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -370,6 +380,24 @@ dependencies = [ "typenum", ] +[[package]] +name = "deadpool" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be2b1d1d6ec8d846f05e137292d0b89133caf95ef33695424c09568bdd39b1b" +dependencies = [ + "deadpool-runtime", + "lazy_static", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "deranged" version = "0.5.5" @@ -512,6 +540,21 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -519,6 +562,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -527,6 +571,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -562,6 +617,7 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -717,6 +773,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "hyper" version = "1.8.1" @@ -731,6 +793,7 @@ dependencies = [ "http", "http-body", "httparse", + "httpdate", "itoa", "pin-project-lite", "pin-utils", @@ -2219,6 +2282,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "wiremock", ] [[package]] @@ -3186,6 +3250,29 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" +[[package]] +name = "wiremock" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08db1edfb05d9b3c1542e521aea074442088292f00b5f28e435c714a98f85031" +dependencies = [ + "assert-json-diff", + "base64 0.22.1", + "deadpool", + "futures", + "http", + "http-body-util", + "hyper", + "hyper-util", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" diff --git a/openfeature-provider/rust/Cargo.toml b/openfeature-provider/rust/Cargo.toml index 607c1357..5c11e689 100644 --- a/openfeature-provider/rust/Cargo.toml +++ b/openfeature-provider/rust/Cargo.toml @@ -63,6 +63,7 @@ prost-build = "0.12" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } num_cpus = "1.16" +wiremock = "0.6" [[example]] name = "demo" diff --git a/openfeature-provider/rust/README.md b/openfeature-provider/rust/README.md index febeb40c..da6e97f5 100644 --- a/openfeature-provider/rust/README.md +++ b/openfeature-provider/rust/README.md @@ -9,7 +9,7 @@ A high-performance OpenFeature provider for [Confidence](https://confidence.spot - **Local Resolution**: Evaluates feature flags locally using the native Rust resolver - **Low Latency**: No network calls during flag evaluation - **Automatic Sync**: Periodically syncs flag configurations from Confidence -- **Exposure Logging**: Fully supported exposure logging and resolve analytics +- **Exposure Logging**: Fully supported exposure logging and resolve analytics with automatic retry on transient failures - **OpenFeature Compatible**: Works with the standard OpenFeature Rust SDK - **Async/Await**: Built on Tokio for efficient async operations @@ -220,10 +220,12 @@ fn main() { ``` The provider logs at different levels: -- `DEBUG`: Flag resolution details, state updates +- `DEBUG`: Flag resolution details, state updates, individual retry attempts - `INFO`: Provider initialization, configuration -- `WARN`: Non-critical issues, fallbacks -- `ERROR`: Failures, network errors +- `WARN`: Non-critical issues, fallbacks, log delivery failures after retries exhausted +- `ERROR`: Failures, non-retryable HTTP errors + +Flag log delivery retries up to 3 times on transient failures (5xx, 408, 429) with exponential backoff (500ms base, 2x multiplier, ±10% jitter). Server `Retry-After` headers are respected when present. ## Shutdown diff --git a/openfeature-provider/rust/src/logger.rs b/openfeature-provider/rust/src/logger.rs index 18615d87..9005866f 100644 --- a/openfeature-provider/rust/src/logger.rs +++ b/openfeature-provider/rust/src/logger.rs @@ -1,6 +1,10 @@ //! Log management for sending flag logs to the Confidence API. +use std::time::Duration; + use prost::Message; +use rand::Rng; +use reqwest::StatusCode; use reqwest_middleware::ClientWithMiddleware; use confidence_resolver::assign_logger::AssignLogger; @@ -16,10 +20,43 @@ const FLAG_LOGS_URL: &str = "https://resolver.confidence.dev/v1/clientFlagLogs:w /// Target size for log batches (4 MB). const LOG_TARGET_BYTES: usize = 4 * 1024 * 1024; +/// Maximum number of send attempts before giving up. +const MAX_ATTEMPTS: u32 = 3; + +/// Initial delay between retry attempts. +const RETRY_BASE_DELAY: Duration = Duration::from_millis(500); + +/// Multiplier applied to the delay after each failed attempt. +const RETRY_BACKOFF_MULTIPLIER: u32 = 2; + +/// Jitter factor applied to retry delays (±10%). +const RETRY_JITTER: f64 = 0.1; + +fn is_retryable_status(status: StatusCode) -> bool { + status.is_server_error() + || status == StatusCode::REQUEST_TIMEOUT + || status == StatusCode::TOO_MANY_REQUESTS +} + +fn apply_jitter(delay: Duration) -> Duration { + let mut rng = rand::rng(); + let factor = 1.0 + rng.random_range(-RETRY_JITTER..RETRY_JITTER); + delay.mul_f64(factor) +} + +fn parse_retry_after(header: Option<&str>) -> Option { + let value = header?.trim(); + if let Ok(seconds) = value.parse::() { + return Some(Duration::from_secs(seconds)); + } + None +} + /// Log sender that sends flag logs to the Confidence API. pub struct LogSender { client: ClientWithMiddleware, client_secret: String, + url: String, } impl LogSender { @@ -28,32 +65,89 @@ impl LogSender { Self { client, client_secret, + url: FLAG_LOGS_URL.to_string(), } } - /// Send encoded flag logs to the API. + /// Send encoded flag logs to the API, retrying on transient failures. pub async fn send(&self, logs: &[u8]) -> Result<()> { if logs.is_empty() { return Ok(()); } - let response = self - .client - .post(FLAG_LOGS_URL) - .header("Content-Type", "application/x-protobuf") - .header( - "Authorization", - format!("ClientSecret {}", self.client_secret), - ) - .body(logs.to_vec()) - .send() - .await?; - - if !response.status().is_success() { - let status = response.status(); - let body = response.text().await.unwrap_or_default(); - tracing::error!("Failed to send flag logs: {} - {}", status, body); - // Don't return error for log sending failures to avoid disrupting flag evaluation + let mut delay = RETRY_BASE_DELAY; + + for attempt in 1..=MAX_ATTEMPTS { + let result = self + .client + .post(&self.url) + .header("Content-Type", "application/x-protobuf") + .header( + "Authorization", + format!("ClientSecret {}", self.client_secret), + ) + .body(logs.to_vec()) + .send() + .await; + + match result { + Ok(response) if response.status().is_success() => return Ok(()), + Ok(response) if is_retryable_status(response.status()) => { + let status = response.status(); + if attempt < MAX_ATTEMPTS { + let server_delay = parse_retry_after( + response + .headers() + .get("retry-after") + .and_then(|v| v.to_str().ok()), + ); + let sleep_dur = server_delay.unwrap_or_else(|| apply_jitter(delay)); + tracing::debug!( + "Flag log send attempt {}/{} failed with {}, retrying in {:?}", + attempt, + MAX_ATTEMPTS, + status, + sleep_dur + ); + tokio::time::sleep(sleep_dur).await; + delay *= RETRY_BACKOFF_MULTIPLIER; + } else { + let body = response.text().await.unwrap_or_default(); + tracing::warn!( + "Failed to send flag logs after {} attempts: {} - {}", + MAX_ATTEMPTS, + status, + body + ); + } + } + Ok(response) => { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + tracing::error!("Failed to send flag logs: {} - {}", status, body); + return Ok(()); + } + Err(e) => { + if attempt < MAX_ATTEMPTS { + let sleep_dur = apply_jitter(delay); + tracing::debug!( + "Flag log send attempt {}/{} failed with {}, retrying in {:?}", + attempt, + MAX_ATTEMPTS, + e, + sleep_dur + ); + tokio::time::sleep(sleep_dur).await; + delay *= RETRY_BACKOFF_MULTIPLIER; + } else { + tracing::warn!( + "Failed to send flag logs after {} attempts: {}", + MAX_ATTEMPTS, + e + ); + } + } + } } Ok(()) @@ -116,3 +210,180 @@ fn has_logs(request: &WriteFlagLogsRequest) -> bool { || !request.flag_resolve_info.is_empty() || request.telemetry_data.is_some() } + +#[cfg(test)] +mod tests { + use super::*; + use reqwest::Client; + use reqwest_middleware::ClientBuilder; + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + fn test_sender(url: &str) -> LogSender { + let client = ClientBuilder::new(Client::new()).build(); + LogSender { + client, + client_secret: "test-secret".to_string(), + url: url.to_string(), + } + } + + #[tokio::test] + async fn send_empty_logs_is_noop() { + let server = MockServer::start().await; + let sender = test_sender(&format!("{}/v1/clientFlagLogs:write", server.uri())); + sender.send(&[]).await.unwrap(); + assert_eq!(server.received_requests().await.unwrap().len(), 0); + } + + #[tokio::test] + async fn send_succeeds_on_first_attempt() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/clientFlagLogs:write")) + .respond_with(ResponseTemplate::new(200)) + .expect(1) + .mount(&server) + .await; + + let sender = test_sender(&format!("{}/v1/clientFlagLogs:write", server.uri())); + sender.send(b"test-payload").await.unwrap(); + } + + #[tokio::test] + async fn retries_on_503_up_to_max_attempts() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/clientFlagLogs:write")) + .respond_with(ResponseTemplate::new(503)) + .expect(3) + .mount(&server) + .await; + + let sender = test_sender(&format!("{}/v1/clientFlagLogs:write", server.uri())); + sender.send(b"test-payload").await.unwrap(); + } + + #[tokio::test] + async fn retries_on_429() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/clientFlagLogs:write")) + .respond_with(ResponseTemplate::new(429)) + .expect(3) + .mount(&server) + .await; + + let sender = test_sender(&format!("{}/v1/clientFlagLogs:write", server.uri())); + sender.send(b"test-payload").await.unwrap(); + } + + #[tokio::test] + async fn retries_on_408() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/clientFlagLogs:write")) + .respond_with(ResponseTemplate::new(408)) + .expect(3) + .mount(&server) + .await; + + let sender = test_sender(&format!("{}/v1/clientFlagLogs:write", server.uri())); + sender.send(b"test-payload").await.unwrap(); + } + + #[tokio::test] + async fn does_not_retry_on_400() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/clientFlagLogs:write")) + .respond_with(ResponseTemplate::new(400)) + .expect(1) + .mount(&server) + .await; + + let sender = test_sender(&format!("{}/v1/clientFlagLogs:write", server.uri())); + sender.send(b"test-payload").await.unwrap(); + } + + #[tokio::test] + async fn does_not_retry_on_403() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/clientFlagLogs:write")) + .respond_with(ResponseTemplate::new(403)) + .expect(1) + .mount(&server) + .await; + + let sender = test_sender(&format!("{}/v1/clientFlagLogs:write", server.uri())); + sender.send(b"test-payload").await.unwrap(); + } + + #[tokio::test] + async fn respects_retry_after_header() { + let server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/v1/clientFlagLogs:write")) + .respond_with(ResponseTemplate::new(429).insert_header("retry-after", "1")) + .up_to_n_times(2) + .mount(&server) + .await; + + Mock::given(method("POST")) + .and(path("/v1/clientFlagLogs:write")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + + let sender = test_sender(&format!("{}/v1/clientFlagLogs:write", server.uri())); + let start = tokio::time::Instant::now(); + sender.send(b"test-payload").await.unwrap(); + let elapsed = start.elapsed(); + + assert_eq!(server.received_requests().await.unwrap().len(), 3); + // 2 retries with Retry-After: 1 second each + assert!(elapsed >= Duration::from_secs(2)); + } + + #[test] + fn parse_retry_after_seconds() { + assert_eq!(parse_retry_after(Some("5")), Some(Duration::from_secs(5))); + assert_eq!(parse_retry_after(Some("0")), Some(Duration::from_secs(0))); + assert_eq!(parse_retry_after(Some(" 3 ")), Some(Duration::from_secs(3))); + } + + #[test] + fn parse_retry_after_invalid() { + assert_eq!(parse_retry_after(None), None); + assert_eq!(parse_retry_after(Some("not-a-number")), None); + assert_eq!(parse_retry_after(Some("")), None); + } + + #[test] + fn jitter_stays_within_bounds() { + let base = Duration::from_millis(500); + for _ in 0..100 { + let jittered = apply_jitter(base); + assert!(jittered >= Duration::from_millis(450)); + assert!(jittered <= Duration::from_millis(550)); + } + } + + #[test] + fn retryable_status_codes() { + assert!(is_retryable_status(StatusCode::INTERNAL_SERVER_ERROR)); + assert!(is_retryable_status(StatusCode::BAD_GATEWAY)); + assert!(is_retryable_status(StatusCode::SERVICE_UNAVAILABLE)); + assert!(is_retryable_status(StatusCode::GATEWAY_TIMEOUT)); + assert!(is_retryable_status(StatusCode::REQUEST_TIMEOUT)); + assert!(is_retryable_status(StatusCode::TOO_MANY_REQUESTS)); + + assert!(!is_retryable_status(StatusCode::OK)); + assert!(!is_retryable_status(StatusCode::BAD_REQUEST)); + assert!(!is_retryable_status(StatusCode::UNAUTHORIZED)); + assert!(!is_retryable_status(StatusCode::FORBIDDEN)); + assert!(!is_retryable_status(StatusCode::NOT_FOUND)); + } +}