diff --git a/changelog.d/vector_sink_multiple_backends.feature.md b/changelog.d/vector_sink_multiple_backends.feature.md new file mode 100644 index 0000000000000..242b9d22bdf6a --- /dev/null +++ b/changelog.d/vector_sink_multiple_backends.feature.md @@ -0,0 +1,3 @@ +Add support for configuring multiple endpoints in the `vector` sink via the new `addresses` option, enabling built-in `load_balance`, `failover`, and `failover_primary` endpoint strategies across downstream Vector instances. + +authors: fpytloun diff --git a/src/sinks/util/service/health.rs b/src/sinks/util/service/health.rs index b764ebcad64b5..f87b73f90d9d4 100644 --- a/src/sinks/util/service/health.rs +++ b/src/sinks/util/service/health.rs @@ -29,7 +29,7 @@ const UNHEALTHY_AMOUNT_OF_ERRORS: usize = 5; /// Options for determining the health of an endpoint. #[serde_as] #[configurable_component] -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] #[serde(rename_all = "snake_case")] pub struct HealthConfig { /// Initial delay between attempts to reactivate endpoints once they become unhealthy. @@ -46,6 +46,15 @@ pub struct HealthConfig { pub retry_max_duration_secs: Duration, } +impl Default for HealthConfig { + fn default() -> Self { + Self { + retry_initial_backoff_secs: default_retry_initial_backoff_secs(), + retry_max_duration_secs: default_retry_max_duration_secs(), + } + } +} + const fn default_retry_initial_backoff_secs() -> u64 { RETRY_INITIAL_BACKOFF_SECONDS_DEFAULT } @@ -329,4 +338,12 @@ mod tests { counters.inc_healthy(); assert!(counters.healthy(snapshot).is_ok()); } + + #[test] + fn default_health_config_matches_documented_defaults() { + let config = HealthConfig::default(); + + assert_eq!(config.retry_initial_backoff_secs, 1); + assert_eq!(config.retry_max_duration_secs, Duration::from_secs(3_600)); + } } diff --git a/src/sinks/vector/config.rs b/src/sinks/vector/config.rs index ae2847073a649..1b6bbe5c6880c 100644 --- a/src/sinks/vector/config.rs +++ b/src/sinks/vector/config.rs @@ -1,9 +1,18 @@ +use std::{ + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + task::{Context, Poll}, +}; + +use futures::{FutureExt, TryFutureExt, future::BoxFuture}; use http::Uri; use hyper::client::HttpConnector; use hyper_openssl::HttpsConnector; use hyper_proxy::ProxyConnector; use tonic::body::BoxBody; -use tower::ServiceBuilder; +use tower::{Service, ServiceBuilder}; use vector_lib::configurable::configurable_component; use super::{ @@ -22,8 +31,9 @@ use crate::{ sinks::{ Healthcheck, VectorSink as VectorSinkType, util::{ - BatchConfig, RealtimeEventBasedDefaultBatchSettings, ServiceBuilderExt, - TowerRequestConfig, retries::RetryLogic, + BatchConfig, RealtimeEventBasedDefaultBatchSettings, TowerRequestConfig, + retries::RetryLogic, + service::{HealthConfig, HealthLogic, ServiceBuilderExt}, }, }, tls::{MaybeTlsSettings, TlsEnableableConfig}, @@ -45,10 +55,28 @@ pub struct VectorConfig { /// Both IP address and hostname are accepted formats. /// /// The address _must_ include a port. + /// + /// This option is mutually exclusive with `addresses`. Set exactly one of + /// `address` or `addresses`. + #[configurable(validation(format = "uri"))] + #[configurable(metadata(docs::examples = "92.12.333.224:6000"))] + #[configurable(metadata(docs::examples = "https://somehost:6000"))] + #[serde(default)] + address: Option, + + /// The downstream Vector addresses to which to connect. + /// + /// Both IP addresses and hostnames are accepted formats. + /// + /// Each address _must_ include a port. + /// + /// This option is mutually exclusive with `address`. Set exactly one of + /// `address` or `addresses`. #[configurable(validation(format = "uri"))] #[configurable(metadata(docs::examples = "92.12.333.224:6000"))] #[configurable(metadata(docs::examples = "https://somehost:6000"))] - address: String, + #[serde(default)] + addresses: Vec, /// Compression algorithm for requests. /// @@ -72,6 +100,17 @@ pub struct VectorConfig { #[serde(default)] pub request: TowerRequestConfig, + /// Options for determining the health of Vector endpoints. + #[serde(default)] + #[configurable(derived)] + pub endpoint_health: Option, + + /// Strategy for routing requests across multiple configured addresses. + /// + /// This option is only used when `addresses` is configured. + #[serde(default)] + pub endpoint_strategy: EndpointStrategy, + #[configurable(derived)] #[serde(default)] tls: Option, @@ -102,10 +141,13 @@ impl GenerateConfig for VectorConfig { fn default_config(address: &str) -> VectorConfig { VectorConfig { version: None, - address: address.to_owned(), + address: Some(address.to_owned()), + addresses: Vec::new(), compression: VectorCompression::None, batch: BatchConfig::default(), request: TowerRequestConfig::default(), + endpoint_health: None, + endpoint_strategy: EndpointStrategy::default(), tls: None, acknowledgements: Default::default(), } @@ -116,36 +158,85 @@ fn default_config(address: &str) -> VectorConfig { impl SinkConfig for VectorConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSinkType, Healthcheck)> { let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?; - let uri = with_default_scheme(&self.address, tls.is_tls())?; + let uris = self.uris(tls.is_tls())?; let client = new_client(&tls, cx.proxy())?; - let healthcheck_uri = cx - .healthcheck - .uri - .clone() - .map(|uri| uri.uri) - .unwrap_or_else(|| uri.clone()); - let healthcheck_client = - VectorService::new(client.clone(), healthcheck_uri, VectorCompression::None); - let healthcheck = healthcheck(healthcheck_client, cx.healthcheck); - let service = VectorService::new(client, uri, self.compression); + let healthcheck = healthchecks(client.clone(), &uris, cx.healthcheck); let request_settings = self.request.into_settings(); let batch_settings = self.batch.into_batcher_settings()?; - let service = ServiceBuilder::new() - .settings(request_settings, VectorGrpcRetryLogic) - .service(service); + let services = uris + .into_iter() + .map(|uri| { + let endpoint = uri.to_string(); + let service = VectorService::new(client.clone(), uri, self.compression); + (endpoint, service) + }) + .collect::>(); + + let sink = match self.endpoint_strategy { + _ if services.len() == 1 => { + let service = ServiceBuilder::new() + .settings(request_settings, VectorGrpcRetryLogic) + .service(services.into_iter().next().expect("one service").1); + + VectorSinkType::from_event_streamsink(VectorSink { + batch_settings, + service, + }) + } + EndpointStrategy::LoadBalance => { + let service = request_settings.distributed_service( + VectorGrpcRetryLogic, + services, + self.endpoint_health.clone().unwrap_or_default(), + VectorGrpcHealthLogic, + 1, + ); + + VectorSinkType::from_event_streamsink(VectorSink { + batch_settings, + service, + }) + } + EndpointStrategy::Failover | EndpointStrategy::FailoverPrimary => { + let endpoint_timeout = request_settings.timeout; + let max_endpoint_attempts = match self.endpoint_strategy { + EndpointStrategy::Failover => services.len(), + EndpointStrategy::FailoverPrimary => services.len() + 1, + EndpointStrategy::LoadBalance => { + unreachable!("load balancing uses a different service") + } + }; + let mut failover_request_settings = request_settings; + // The outer Tower timeout wraps the whole failover loop. Add one + // endpoint timeout of slack so the final endpoint attempt is not + // aborted by scheduling overhead after earlier attempts consume + // their per-endpoint timeouts. + failover_request_settings.timeout = endpoint_timeout + .checked_mul((max_endpoint_attempts + 1) as u32) + .unwrap_or(endpoint_timeout); + + let service = ServiceBuilder::new() + .settings(failover_request_settings, VectorGrpcRetryLogic) + .service(FailoverVectorService::new( + services + .into_iter() + .map(|(_endpoint, service)| service) + .collect(), + endpoint_timeout, + self.endpoint_strategy, + )); - let sink = VectorSink { - batch_settings, - service, + VectorSinkType::from_event_streamsink(VectorSink { + batch_settings, + service, + }) + } }; - Ok(( - VectorSinkType::from_event_streamsink(sink), - Box::pin(healthcheck), - )) + Ok((sink, Box::pin(healthcheck))) } fn input(&self) -> Input { @@ -157,6 +248,271 @@ impl SinkConfig for VectorConfig { } } +/// Strategy for routing requests across multiple Vector endpoints. +#[configurable_component] +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum EndpointStrategy { + /// Distribute requests across healthy endpoints. + #[default] + LoadBalance, + /// Use one endpoint at a time. When the active endpoint fails, continue + /// through the configured addresses from the next endpoint. + /// + /// This mode keeps using the last successful endpoint until it fails. Use + /// `failover_primary` instead when retriable failures should re-check the + /// first configured address before trying secondary endpoints. + Failover, + /// Use one endpoint at a time. When the active endpoint fails, retry from + /// the configured address order so the sink can return to its configured + /// primary endpoint. + /// + /// This is useful when receiver-side connection recycling, such as + /// `max_connection_age_secs`, should converge the sink back to the first + /// configured address when it is available. + FailoverPrimary, +} + +#[derive(Clone)] +struct FailoverVectorService { + services: Vec, + state: Arc, + endpoint_timeout: std::time::Duration, + endpoint_strategy: EndpointStrategy, +} + +impl FailoverVectorService { + fn new( + services: Vec, + endpoint_timeout: std::time::Duration, + endpoint_strategy: EndpointStrategy, + ) -> Self { + Self { + services, + state: Arc::new(AtomicUsize::new(0)), + endpoint_timeout, + endpoint_strategy, + } + } +} + +impl Service for FailoverVectorService { + type Response = VectorResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: VectorRequest) -> Self::Future { + let services = self.services.clone(); + let state = Arc::clone(&self.state); + let endpoint_timeout = self.endpoint_timeout; + let endpoint_strategy = self.endpoint_strategy; + + Box::pin(async move { + let mut expected_state = state.load(Ordering::Acquire); + let start = failover_state_index(expected_state, services.len()); + let mut last_error = None; + let mut attempts = failover_attempt_indices(endpoint_strategy, start, services.len()); + let mut attempt = 0; + let mut remaining_attempts = attempts.len(); + let mut tried = Vec::new(); + + while remaining_attempts > 0 && attempt < attempts.len() { + let index = attempts[attempt]; + let mut service = services[index].clone(); + tried.push(index); + remaining_attempts -= 1; + + match tokio::time::timeout(endpoint_timeout, service.call(request.clone())).await { + Ok(Ok(response)) => { + return Ok(response); + } + Ok(Err(error)) => { + if !is_retriable_vector_error(&error) { + return Err(error); + } + + let advance = failover_advance_if_current( + &state, + expected_state, + index, + attempts.get(attempt + 1).copied(), + services.len(), + ); + expected_state = failover_next_attempts( + endpoint_strategy, + services.len(), + attempts.as_mut(), + &mut attempt, + expected_state, + advance, + &tried, + ); + last_error = Some(error); + } + Err(_elapsed) => { + let advance = failover_advance_if_current( + &state, + expected_state, + index, + attempts.get(attempt + 1).copied(), + services.len(), + ); + expected_state = failover_next_attempts( + endpoint_strategy, + services.len(), + attempts.as_mut(), + &mut attempt, + expected_state, + advance, + &tried, + ); + last_error = Some(Box::new(VectorSinkError::Request { + source: tonic::Status::deadline_exceeded( + "vector endpoint request timed out", + ), + }) as crate::Error); + } + } + } + + Err(last_error.expect("failover service should have at least one endpoint")) + }) + } +} + +fn failover_attempt_indices( + endpoint_strategy: EndpointStrategy, + start: usize, + endpoints: usize, +) -> Vec { + match endpoint_strategy { + EndpointStrategy::Failover => failover_ring_attempt_indices(start, endpoints), + EndpointStrategy::FailoverPrimary => failover_primary_attempt_indices(start, endpoints), + EndpointStrategy::LoadBalance => unreachable!("load balancing uses a different service"), + } +} + +const fn failover_state_index(state: usize, endpoints: usize) -> usize { + state % endpoints +} + +const fn failover_next_state(state: usize, next_index: usize, endpoints: usize) -> usize { + let generation = state / endpoints; + (generation + 1) * endpoints + next_index +} + +fn failover_primary_attempt_indices(start: usize, endpoints: usize) -> Vec { + std::iter::once(start).chain(0..endpoints).collect() +} + +fn failover_ring_attempt_indices(start: usize, endpoints: usize) -> Vec { + (0..endpoints) + .map(|offset| (start + offset) % endpoints) + .collect() +} + +#[derive(Debug, Eq, PartialEq)] +struct FailoverAdvance { + state: usize, + advanced: bool, +} + +fn failover_next_attempts( + endpoint_strategy: EndpointStrategy, + endpoints: usize, + attempts: &mut Vec, + attempt: &mut usize, + expected_state: usize, + advance: FailoverAdvance, + tried: &[usize], +) -> usize { + if advance.advanced + || advance.state == expected_state + || failover_state_index(advance.state, endpoints) + == failover_state_index(expected_state, endpoints) + { + *attempt += 1; + } else { + *attempts = failover_attempt_indices( + endpoint_strategy, + failover_state_index(advance.state, endpoints), + endpoints, + ) + .into_iter() + .filter(|index| !tried.contains(index)) + .collect(); + *attempt = 0; + } + + advance.state +} + +fn failover_advance_if_current( + state: &AtomicUsize, + expected_state: usize, + index: usize, + next_index: Option, + endpoints: usize, +) -> FailoverAdvance { + let Some(next_index) = next_index else { + return FailoverAdvance { + state: state.load(Ordering::Acquire), + advanced: false, + }; + }; + + if failover_state_index(expected_state, endpoints) != index { + return FailoverAdvance { + state: state.load(Ordering::Acquire), + advanced: false, + }; + } + + let next_state = failover_next_state(expected_state, next_index, endpoints); + match state.compare_exchange( + expected_state, + next_state, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => FailoverAdvance { + state: next_state, + advanced: true, + }, + Err(actual) => FailoverAdvance { + state: actual, + advanced: false, + }, + } +} + +fn is_retriable_vector_error(error: &crate::Error) -> bool { + error + .downcast_ref::() + .is_none_or(|error| VectorGrpcRetryLogic.is_retriable_error(error)) +} + +impl VectorConfig { + fn uris(&self, tls: bool) -> crate::Result> { + match (self.address.as_ref(), self.addresses.as_slice()) { + (Some(_), [_first, ..]) => Err( + "`address` and `addresses` options are mutually exclusive. Please use `addresses` for multiple Vector endpoints." + .into(), + ), + (None, []) => Err("No Vector endpoint configured. Please set `address` or `addresses`.".into()), + (Some(address), []) => Ok(vec![with_default_scheme(address, tls)?]), + (None, addresses) => addresses + .iter() + .map(|address| with_default_scheme(address, tls)) + .collect(), + } + } +} + /// Check to see if the remote service accepts new events. async fn healthcheck( mut service: VectorService, @@ -183,6 +539,39 @@ async fn healthcheck( } } +fn healthchecks( + client: hyper::Client>, BoxBody>, + uris: &[Uri], + options: SinkHealthcheckOptions, +) -> Healthcheck { + if !options.enabled { + return Box::pin(futures::future::ok(())); + } + + let healthcheck_uris = options + .uri + .clone() + .map(|uri| vec![uri.uri]) + .unwrap_or_else(|| uris.to_vec()); + + Box::pin( + futures::future::select_ok(healthcheck_uris.into_iter().map(move |uri| { + let service = VectorService::new(client.clone(), uri, VectorCompression::None); + let timeout = options.timeout; + healthcheck( + service, + SinkHealthcheckOptions { + enabled: true, + uri: None, + timeout, + }, + ) + .boxed() + })) + .map_ok(|((), _)| ()), + ) +} + /// grpc doesn't like an address without a scheme, so we default to http or https if one isn't /// specified in the address. pub fn with_default_scheme(address: &str, tls: bool) -> crate::Result { @@ -256,3 +645,194 @@ impl RetryLogic for VectorGrpcRetryLogic { } } } + +#[derive(Debug, Clone)] +struct VectorGrpcHealthLogic; + +impl HealthLogic for VectorGrpcHealthLogic { + type Error = crate::Error; + type Response = VectorResponse; + + fn is_healthy(&self, response: &Result) -> Option { + match response { + Ok(_) => Some(true), + Err(error) if is_retriable_vector_error(error) => Some(false), + Err(_) => None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn health_logic_ignores_non_retriable_vector_errors() { + let response = Err(Box::new(VectorSinkError::Request { + source: tonic::Status::data_loss("batch rejected"), + }) as crate::Error); + + assert_eq!(VectorGrpcHealthLogic.is_healthy(&response), None); + } + + #[test] + fn health_logic_marks_retriable_vector_errors_unhealthy() { + let response = Err(Box::new(VectorSinkError::Request { + source: tonic::Status::unavailable("endpoint unavailable"), + }) as crate::Error); + + assert_eq!(VectorGrpcHealthLogic.is_healthy(&response), Some(false)); + } + + #[test] + fn failover_advance_ignores_stale_generation() { + let endpoints = 2; + let state = AtomicUsize::new(failover_next_state( + failover_next_state(0, 1, endpoints), + 0, + endpoints, + )); + + let observed = failover_advance_if_current(&state, 0, 0, Some(1), endpoints); + + assert_eq!( + observed, + FailoverAdvance { + state: 4, + advanced: false, + } + ); + assert_eq!(state.load(Ordering::Acquire), 4); + } + + #[test] + fn failover_advance_ignores_stale_mismatched_state() { + let endpoints = 3; + let shared_state = failover_next_state(failover_next_state(0, 1, endpoints), 0, endpoints); + let stale_state = 1; + let state = AtomicUsize::new(shared_state); + + let observed = failover_advance_if_current(&state, stale_state, 0, Some(1), endpoints); + + assert_eq!( + observed, + FailoverAdvance { + state: shared_state, + advanced: false, + } + ); + assert_eq!(state.load(Ordering::Acquire), shared_state); + } + + #[test] + fn failover_primary_attempts_current_then_configured_order() { + assert_eq!(failover_primary_attempt_indices(1, 3), vec![1, 0, 1, 2]); + } + + #[test] + fn failover_attempts_current_then_ring_order() { + assert_eq!(failover_ring_attempt_indices(1, 3), vec![1, 2, 0]); + } + + #[test] + fn failover_advance_ignores_current_non_matching_endpoint() { + let endpoints = 3; + let state = AtomicUsize::new(5); + + let observed = failover_advance_if_current(&state, 0, 0, Some(1), endpoints); + + assert_eq!( + observed, + FailoverAdvance { + state: 5, + advanced: false, + } + ); + assert_eq!(state.load(Ordering::Acquire), 5); + } + + #[test] + fn failover_advance_ignores_missing_next_endpoint() { + let state = AtomicUsize::new(0); + + let observed = failover_advance_if_current(&state, 0, 0, None, 2); + + assert_eq!( + observed, + FailoverAdvance { + state: 0, + advanced: false, + } + ); + assert_eq!(state.load(Ordering::Acquire), 0); + } + + #[test] + fn failover_next_attempts_recomputes_after_concurrent_advance() { + let mut attempts = failover_ring_attempt_indices(0, 3); + let mut attempt = 0; + + let observed_state = failover_next_attempts( + EndpointStrategy::Failover, + 3, + &mut attempts, + &mut attempt, + 0, + FailoverAdvance { + state: 5, + advanced: false, + }, + &[0], + ); + + assert_eq!(observed_state, 5); + assert_eq!(attempt, 0); + assert_eq!(attempts, vec![2, 1]); + } + + #[test] + fn failover_next_attempts_continues_after_stale_same_endpoint_generation() { + let mut attempts = failover_ring_attempt_indices(0, 2); + let mut attempt = 0; + + let observed_state = failover_next_attempts( + EndpointStrategy::Failover, + 2, + &mut attempts, + &mut attempt, + 0, + FailoverAdvance { + state: 4, + advanced: false, + }, + &[0], + ); + + assert_eq!(observed_state, 4); + assert_eq!(attempt, 1); + assert_eq!(attempts, vec![0, 1]); + } + + #[test] + fn failover_next_attempts_continues_after_local_advance() { + let mut attempts = failover_primary_attempt_indices(1, 3); + let mut attempt = 0; + + let observed_state = failover_next_attempts( + EndpointStrategy::FailoverPrimary, + 3, + &mut attempts, + &mut attempt, + 1, + FailoverAdvance { + state: 3, + advanced: true, + }, + &[1], + ); + + assert_eq!(observed_state, 3); + assert_eq!(attempt, 1); + assert_eq!(attempts, vec![1, 0, 1, 2]); + } +} diff --git a/src/sinks/vector/mod.rs b/src/sinks/vector/mod.rs index c3abd90d6b629..5b188768d1a8d 100644 --- a/src/sinks/vector/mod.rs +++ b/src/sinks/vector/mod.rs @@ -43,8 +43,15 @@ mod tests { use bytes::{BufMut, Bytes, BytesMut}; use futures::{StreamExt, channel::mpsc}; use http::request::Parts; - use hyper::Method; + use hyper::{ + Method, Response, Server, + service::{make_service_fn, service_fn}, + }; use prost::Message; + use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }; use vector_lib::{ config::{Tags, Telemetry, init_telemetry}, event::{BatchNotifier, BatchStatus}, @@ -74,6 +81,79 @@ mod tests { crate::test_util::test_generate_config::(); } + #[tokio::test] + async fn build_rejects_missing_address() { + let config: VectorConfig = toml::from_str("").unwrap(); + + let err = match config.build(SinkContext::default()).await { + Ok(_) => panic!("missing address should fail"), + Err(err) => err, + }; + + assert!( + err.to_string() + .contains("No Vector endpoint configured. Please set `address` or `addresses`."), + "{err}" + ); + } + + #[tokio::test] + async fn build_rejects_address_and_addresses() { + let config: VectorConfig = toml::from_str( + r#" + address = "http://127.0.0.1:6000" + addresses = ["http://127.0.0.1:6001"] + "#, + ) + .unwrap(); + + let err = match config.build(SinkContext::default()).await { + Ok(_) => panic!("address and addresses should be mutually exclusive"), + Err(err) => err, + }; + + assert!( + err.to_string() + .contains("`address` and `addresses` options are mutually exclusive"), + "{err}" + ); + } + + #[test] + fn parse_addresses_config() { + let config: Result = toml::from_str( + r#" + addresses = ["http://127.0.0.1:6000", "http://127.0.0.1:6001"] + "#, + ); + + assert!(config.is_ok()); + } + + #[test] + fn parse_failover_endpoint_strategy() { + let config: Result = toml::from_str( + r#" + addresses = ["http://127.0.0.1:6000", "http://127.0.0.1:6001"] + endpoint_strategy = "failover" + "#, + ); + + assert!(config.is_ok()); + } + + #[test] + fn parse_failover_primary_endpoint_strategy() { + let config: Result = toml::from_str( + r#" + addresses = ["http://127.0.0.1:6000", "http://127.0.0.1:6001"] + endpoint_strategy = "failover_primary" + "#, + ); + + assert!(config.is_ok()); + } + enum TestType { Normal, DataVolume, @@ -163,6 +243,562 @@ mod tests { run_sink_test(TestType::Normal).await; } + #[tokio::test] + async fn deliver_message_to_multiple_addresses() { + let num_lines = 10; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/"] + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let cx = SinkContext::default(); + + let (sink, _) = config.build(cx).await.unwrap(); + let (rx1, trigger1, server1) = build_test_server_generic(addr1, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + let (rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server1); + tokio::spawn(server2); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (mut input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + drop(trigger1); + drop(trigger2); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let (mut output_lines, mut output_lines2) = + futures::future::join(get_received(rx1, |_| {}), get_received(rx2, |_| {})).await; + + output_lines.append(&mut output_lines2); + + input_lines.sort(); + output_lines.sort(); + + assert_eq!(num_lines, output_lines.len()); + assert_eq!(input_lines, output_lines); + } + + #[tokio::test] + async fn failover_strategy_prefers_first_address() { + let num_lines = 10; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/"] + endpoint_strategy = "failover" + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + let (rx1, trigger1, server1) = build_test_server_generic(addr1, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + let (rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server1); + tokio::spawn(server2); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + drop(trigger1); + drop(trigger2); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let (output_lines, output_lines2) = + futures::future::join(get_received(rx1, |_| {}), get_received(rx2, |_| {})).await; + + assert_eq!(num_lines, output_lines.len()); + assert_eq!(input_lines, output_lines); + assert!(output_lines2.is_empty()); + } + + #[tokio::test] + async fn failover_strategy_uses_next_address_when_first_fails() { + let num_lines = 10; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/"] + endpoint_strategy = "failover" + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + let (rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server2); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + drop(trigger2); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let output_lines = get_received(rx2, |_| {}).await; + + assert_eq!(num_lines, output_lines.len()); + assert_eq!(input_lines, output_lines); + } + + #[tokio::test] + async fn failover_primary_strategy_retries_primary_before_secondary() { + let num_lines = 10; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/"] + endpoint_strategy = "failover_primary" + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let primary_attempts = Arc::new(AtomicUsize::new(0)); + let primary_service_attempts = Arc::clone(&primary_attempts); + let (rx1, trigger1, server1) = build_test_server_generic(addr1, move || { + if primary_service_attempts.fetch_add(1, Ordering::AcqRel) == 0 { + hyper::Response::builder() + .header("grpc-status", "14") // unavailable + .header("content-type", "application/grpc") + .body(hyper::Body::empty()) + .unwrap() + } else { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + } + }); + let (rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server1); + tokio::spawn(server2); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + drop(trigger1); + drop(trigger2); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + assert_eq!( + primary_attempts.load(Ordering::Acquire), + 2, + "retriable primary failure should retry configured primary before secondary" + ); + + let output_lines = get_received(rx1, |_| {}).await; + let secondary_output_lines = get_received(rx2, |_| {}).await; + + assert_eq!(num_lines * 2, output_lines.len()); + assert!( + output_lines + .chunks(num_lines) + .all(|lines| lines == input_lines) + ); + assert!( + secondary_output_lines.is_empty(), + "secondary must not receive traffic when configured primary succeeds on retry" + ); + } + + #[tokio::test] + async fn failover_strategy_continues_in_ring_order_after_active_failure() { + let num_lines = 2; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + let (_guard3, addr3) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/", "http://{addr3}/"] + endpoint_strategy = "failover" + batch.max_events = 1 + request.concurrency = 1 + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let primary_attempts = Arc::new(AtomicUsize::new(0)); + let primary_service_attempts = Arc::clone(&primary_attempts); + let (_rx1, trigger1, server1) = build_test_server_generic(addr1, move || { + primary_service_attempts.fetch_add(1, Ordering::AcqRel); + hyper::Response::builder() + .header("grpc-status", "14") // unavailable + .header("content-type", "application/grpc") + .body(hyper::Body::empty()) + .unwrap() + }); + + let active_attempts = Arc::new(AtomicUsize::new(0)); + let active_service_attempts = Arc::clone(&active_attempts); + let (_rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + if active_service_attempts.fetch_add(1, Ordering::AcqRel) == 0 { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + } else { + hyper::Response::builder() + .header("grpc-status", "14") // unavailable + .header("content-type", "application/grpc") + .body(hyper::Body::empty()) + .unwrap() + } + }); + + let next_attempts = Arc::new(AtomicUsize::new(0)); + let next_service_attempts = Arc::clone(&next_attempts); + let (_rx3, trigger3, server3) = build_test_server_generic(addr3, move || { + next_service_attempts.fetch_add(1, Ordering::AcqRel); + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server1); + tokio::spawn(server2); + tokio::spawn(server3); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (_input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + drop(trigger1); + drop(trigger2); + drop(trigger3); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + assert_eq!(primary_attempts.load(Ordering::Acquire), 1); + assert_eq!(active_attempts.load(Ordering::Acquire), 2); + assert_eq!(next_attempts.load(Ordering::Acquire), 1); + } + + #[tokio::test] + async fn failover_primary_strategy_retries_configured_primary_after_active_failure() { + let num_lines = 2; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + let (_guard3, addr3) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/", "http://{addr3}/"] + endpoint_strategy = "failover_primary" + batch.max_events = 1 + request.concurrency = 1 + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let primary_attempts = Arc::new(AtomicUsize::new(0)); + let primary_service_attempts = Arc::clone(&primary_attempts); + let (_rx1, trigger1, server1) = build_test_server_generic(addr1, move || { + if primary_service_attempts.fetch_add(1, Ordering::AcqRel) < 2 { + hyper::Response::builder() + .header("grpc-status", "14") // unavailable + .header("content-type", "application/grpc") + .body(hyper::Body::empty()) + .unwrap() + } else { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + } + }); + + let active_attempts = Arc::new(AtomicUsize::new(0)); + let active_service_attempts = Arc::clone(&active_attempts); + let (_rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + if active_service_attempts.fetch_add(1, Ordering::AcqRel) == 0 { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + } else { + hyper::Response::builder() + .header("grpc-status", "14") // unavailable + .header("content-type", "application/grpc") + .body(hyper::Body::empty()) + .unwrap() + } + }); + + let next_attempts = Arc::new(AtomicUsize::new(0)); + let next_service_attempts = Arc::clone(&next_attempts); + let (_rx3, trigger3, server3) = build_test_server_generic(addr3, move || { + next_service_attempts.fetch_add(1, Ordering::AcqRel); + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server1); + tokio::spawn(server2); + tokio::spawn(server3); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (_input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + drop(trigger1); + drop(trigger2); + drop(trigger3); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + assert_eq!(primary_attempts.load(Ordering::Acquire), 3); + assert_eq!(active_attempts.load(Ordering::Acquire), 2); + assert_eq!(next_attempts.load(Ordering::Acquire), 0); + } + + #[tokio::test] + async fn failover_strategy_does_not_resend_non_retriable_errors() { + let num_lines = 10; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/"] + endpoint_strategy = "failover" + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + let (_rx1, trigger1, server1) = build_test_server_generic(addr1, move || { + hyper::Response::builder() + .header("grpc-status", "15") // data loss + .header("content-type", "application/grpc") + .body(tonic::body::empty_body()) + .unwrap() + }); + let (rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server1); + tokio::spawn(server2); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (_, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + sink.run(events).await.expect("Running sink failed"); + + drop(trigger1); + drop(trigger2); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected)); + assert!( + get_received(rx2, |_| {}).await.is_empty(), + "non-retriable primary rejection must not be resent to secondary endpoint" + ); + } + + #[tokio::test] + async fn failover_strategy_uses_next_address_when_first_times_out() { + let num_lines = 10; + + let (_guard1, addr1) = next_addr(); + let (_guard2, addr2) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr1}/", "http://{addr2}/"] + endpoint_strategy = "failover" + + [request] + timeout_secs = 1 + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let hanging_service = make_service_fn(|_| async { + Ok::<_, crate::Error>(service_fn(|_req| async { + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + Ok::<_, crate::Error>( + Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap(), + ) + })) + }); + let hanging_server = tokio::spawn(Server::bind(&addr1).serve(hanging_service)); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + let (rx2, trigger2, server2) = build_test_server_generic(addr2, move || { + hyper::Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap() + }); + + tokio::spawn(server2); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + hanging_server.abort(); + drop(trigger2); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let output_lines = get_received(rx2, |_| {}).await; + + assert_eq!(num_lines, output_lines.len()); + assert_eq!(input_lines, output_lines); + } + + #[tokio::test] + async fn failover_strategy_retries_after_all_endpoints_time_out() { + let num_lines = 10; + + let (_guard, addr) = next_addr(); + + let config = format!( + r#" + addresses = ["http://{addr}/"] + endpoint_strategy = "failover" + + [request] + timeout_secs = 1 + retry_initial_backoff_secs = 1 + retry_max_duration_secs = 5 + "# + ); + let config: VectorConfig = toml::from_str(&config).unwrap(); + + let attempts = Arc::new(AtomicUsize::new(0)); + let service_attempts = Arc::clone(&attempts); + let service = make_service_fn(move |_| { + let service_attempts = Arc::clone(&service_attempts); + async move { + Ok::<_, crate::Error>(service_fn(move |_req| { + let attempt = service_attempts.fetch_add(1, Ordering::AcqRel); + async move { + if attempt == 0 { + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + } + + Ok::<_, crate::Error>( + Response::builder() + .header("grpc-status", "0") // OK + .header("content-type", "application/grpc") + .body(hyper::Body::from(encode_body(proto::PushEventsResponse {}))) + .unwrap(), + ) + } + })) + } + }); + let server = tokio::spawn(Server::bind(&addr).serve(service)); + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (_input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch)); + + run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; + + server.abort(); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + assert!( + attempts.load(Ordering::Acquire) > 1, + "sink should retry after endpoint timeout" + ); + } + #[tokio::test] async fn deliver_message_gzip() { run_sink_test_with_compression(TestType::Normal, Some("gzip")).await; diff --git a/website/cue/reference/components/sinks/generated/vector.cue b/website/cue/reference/components/sinks/generated/vector.cue index c1bc55fcfe0fe..bbf4bac0554df 100644 --- a/website/cue/reference/components/sinks/generated/vector.cue +++ b/website/cue/reference/components/sinks/generated/vector.cue @@ -34,10 +34,30 @@ generated: components: sinks: vector: configuration: { Both IP address and hostname are accepted formats. The address _must_ include a port. + + This option is mutually exclusive with `addresses`. Set exactly one of + `address` or `addresses`. """ - required: true + required: false type: string: examples: ["92.12.333.224:6000", "https://somehost:6000"] } + addresses: { + description: """ + The downstream Vector addresses to which to connect. + + Both IP addresses and hostnames are accepted formats. + + Each address _must_ include a port. + + This option is mutually exclusive with `address`. Set exactly one of + `address` or `addresses`. + """ + required: false + type: array: { + default: [] + items: type: string: examples: ["92.12.333.224:6000", "https://somehost:6000"] + } + } batch: { description: "Event batching behavior." required: false @@ -98,6 +118,59 @@ generated: components: sinks: vector: configuration: { } } } + endpoint_health: { + description: "Options for determining the health of Vector endpoints." + required: false + type: object: options: { + retry_initial_backoff_secs: { + description: "Initial delay between attempts to reactivate endpoints once they become unhealthy." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_max_duration_secs: { + description: "Maximum delay between attempts to reactivate endpoints once they become unhealthy." + required: false + type: uint: { + default: 3600 + unit: "seconds" + } + } + } + } + endpoint_strategy: { + description: """ + Strategy for routing requests across multiple configured addresses. + + This option is only used when `addresses` is configured. + """ + required: false + type: string: { + default: "load_balance" + enum: { + failover: """ + Use one endpoint at a time. When the active endpoint fails, continue + through the configured addresses from the next endpoint. + + This mode keeps using the last successful endpoint until it fails. Use + `failover_primary` instead when retriable failures should re-check the + first configured address before trying secondary endpoints. + """ + failover_primary: """ + Use one endpoint at a time. When the active endpoint fails, retry from + the configured address order so the sink can return to its configured + primary endpoint. + + This is useful when receiver-side connection recycling, such as + `max_connection_age_secs`, should converge the sink back to the first + configured address when it is available. + """ + load_balance: "Distribute requests across healthy endpoints." + } + } + } request: { description: """ Middleware settings for outbound requests.