Skip to content

Commit 25ceca6

Browse files
committed
feat(vector sink): add multiple endpoint strategies
1 parent 5d41252 commit 25ceca6

4 files changed

Lines changed: 719 additions & 28 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Add support for configuring multiple endpoints in the `vector` sink via the new `addresses` option, enabling built-in load balancing and failover across downstream Vector instances.
2+
3+
authors: fpytloun

src/sinks/vector/config.rs

Lines changed: 268 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
1+
use std::{
2+
sync::{
3+
Arc,
4+
atomic::{AtomicUsize, Ordering},
5+
},
6+
task::{Context, Poll},
7+
};
8+
9+
use futures::{FutureExt, TryFutureExt, future::BoxFuture};
110
use http::Uri;
211
use hyper::client::HttpConnector;
312
use hyper_openssl::HttpsConnector;
413
use hyper_proxy::ProxyConnector;
514
use tonic::body::BoxBody;
6-
use tower::ServiceBuilder;
15+
use tower::{Service, ServiceBuilder};
716
use vector_lib::configurable::configurable_component;
817

918
use super::{
@@ -22,8 +31,9 @@ use crate::{
2231
sinks::{
2332
Healthcheck, VectorSink as VectorSinkType,
2433
util::{
25-
BatchConfig, RealtimeEventBasedDefaultBatchSettings, ServiceBuilderExt,
26-
TowerRequestConfig, retries::RetryLogic,
34+
BatchConfig, RealtimeEventBasedDefaultBatchSettings, TowerRequestConfig,
35+
retries::RetryLogic,
36+
service::{HealthConfig, HealthLogic, ServiceBuilderExt},
2737
},
2838
},
2939
tls::{MaybeTlsSettings, TlsEnableableConfig},
@@ -45,10 +55,28 @@ pub struct VectorConfig {
4555
/// Both IP address and hostname are accepted formats.
4656
///
4757
/// The address _must_ include a port.
58+
///
59+
/// This option is mutually exclusive with `addresses`. Set exactly one of
60+
/// `address` or `addresses`.
4861
#[configurable(validation(format = "uri"))]
4962
#[configurable(metadata(docs::examples = "92.12.333.224:6000"))]
5063
#[configurable(metadata(docs::examples = "https://somehost:6000"))]
51-
address: String,
64+
#[serde(default)]
65+
address: Option<String>,
66+
67+
/// The downstream Vector addresses to which to connect.
68+
///
69+
/// Both IP addresses and hostnames are accepted formats.
70+
///
71+
/// Each address _must_ include a port.
72+
///
73+
/// This option is mutually exclusive with `address`. Set exactly one of
74+
/// `address` or `addresses`.
75+
#[configurable(validation(format = "uri"))]
76+
#[configurable(metadata(docs::examples = "92.12.333.224:6000"))]
77+
#[configurable(metadata(docs::examples = "https://somehost:6000"))]
78+
#[serde(default)]
79+
addresses: Vec<String>,
5280

5381
/// Compression algorithm for requests.
5482
///
@@ -72,6 +100,17 @@ pub struct VectorConfig {
72100
#[serde(default)]
73101
pub request: TowerRequestConfig,
74102

103+
/// Options for determining the health of Vector endpoints.
104+
#[serde(default)]
105+
#[configurable(derived)]
106+
pub endpoint_health: Option<HealthConfig>,
107+
108+
/// Strategy for routing requests across multiple configured addresses.
109+
///
110+
/// This option only takes effect when more than one endpoint is configured via `addresses`; with a single endpoint, all requests go to that endpoint.
111+
#[serde(default)]
112+
pub endpoint_strategy: EndpointStrategy,
113+
75114
#[configurable(derived)]
76115
#[serde(default)]
77116
tls: Option<TlsEnableableConfig>,
@@ -102,10 +141,13 @@ impl GenerateConfig for VectorConfig {
102141
fn default_config(address: &str) -> VectorConfig {
103142
VectorConfig {
104143
version: None,
105-
address: address.to_owned(),
144+
address: Some(address.to_owned()),
145+
addresses: Vec::new(),
106146
compression: VectorCompression::None,
107147
batch: BatchConfig::default(),
108148
request: TowerRequestConfig::default(),
149+
endpoint_health: None,
150+
endpoint_strategy: EndpointStrategy::default(),
109151
tls: None,
110152
acknowledgements: Default::default(),
111153
}
@@ -116,36 +158,73 @@ fn default_config(address: &str) -> VectorConfig {
116158
impl SinkConfig for VectorConfig {
117159
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSinkType, Healthcheck)> {
118160
let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
119-
let uri = with_default_scheme(&self.address, tls.is_tls())?;
161+
let uris = self.uris(tls.is_tls())?;
120162

121163
let client = new_client(&tls, cx.proxy())?;
122164

123-
let healthcheck_uri = cx
124-
.healthcheck
125-
.uri
126-
.clone()
127-
.map(|uri| uri.uri)
128-
.unwrap_or_else(|| uri.clone());
129-
let healthcheck_client =
130-
VectorService::new(client.clone(), healthcheck_uri, VectorCompression::None);
131-
let healthcheck = healthcheck(healthcheck_client, cx.healthcheck);
132-
let service = VectorService::new(client, uri, self.compression);
165+
let healthcheck = healthchecks(client.clone(), &uris, cx.healthcheck);
133166
let request_settings = self.request.into_settings();
134167
let batch_settings = self.batch.into_batcher_settings()?;
135168

136-
let service = ServiceBuilder::new()
137-
.settings(request_settings, VectorGrpcRetryLogic)
138-
.service(service);
169+
let services = uris
170+
.into_iter()
171+
.map(|uri| {
172+
let endpoint = uri.to_string();
173+
let service = VectorService::new(client.clone(), uri, self.compression);
174+
(endpoint, service)
175+
})
176+
.collect::<Vec<_>>();
177+
178+
let sink = match self.endpoint_strategy {
179+
_ if services.len() == 1 => {
180+
let service = ServiceBuilder::new()
181+
.settings(request_settings, VectorGrpcRetryLogic)
182+
.service(services.into_iter().next().expect("one service").1);
183+
184+
VectorSinkType::from_event_streamsink(VectorSink {
185+
batch_settings,
186+
service,
187+
})
188+
}
189+
EndpointStrategy::LoadBalance => {
190+
let service = request_settings.distributed_service(
191+
VectorGrpcRetryLogic,
192+
services,
193+
self.endpoint_health.clone().unwrap_or_default(),
194+
VectorGrpcHealthLogic,
195+
1,
196+
);
139197

140-
let sink = VectorSink {
141-
batch_settings,
142-
service,
198+
VectorSinkType::from_event_streamsink(VectorSink {
199+
batch_settings,
200+
service,
201+
})
202+
}
203+
EndpointStrategy::Failover => {
204+
let endpoint_timeout = request_settings.timeout;
205+
let mut failover_request_settings = request_settings;
206+
failover_request_settings.timeout = endpoint_timeout
207+
.checked_mul((services.len() + 1) as u32)
208+
.unwrap_or(endpoint_timeout);
209+
210+
let service = ServiceBuilder::new()
211+
.settings(failover_request_settings, VectorGrpcRetryLogic)
212+
.service(FailoverVectorService::new(
213+
services
214+
.into_iter()
215+
.map(|(_endpoint, service)| service)
216+
.collect(),
217+
endpoint_timeout,
218+
));
219+
220+
VectorSinkType::from_event_streamsink(VectorSink {
221+
batch_settings,
222+
service,
223+
})
224+
}
143225
};
144226

145-
Ok((
146-
VectorSinkType::from_event_streamsink(sink),
147-
Box::pin(healthcheck),
148-
))
227+
Ok((sink, Box::pin(healthcheck)))
149228
}
150229

151230
fn input(&self) -> Input {
@@ -157,6 +236,124 @@ impl SinkConfig for VectorConfig {
157236
}
158237
}
159238

239+
/// Strategy for routing requests across multiple Vector endpoints.
240+
#[configurable_component]
241+
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
242+
#[serde(rename_all = "snake_case")]
243+
pub enum EndpointStrategy {
244+
/// Distribute requests across healthy endpoints.
245+
#[default]
246+
LoadBalance,
247+
/// Use endpoints in configured order, moving to the next endpoint only when
248+
/// the current endpoint fails.
249+
Failover,
250+
}
251+
252+
#[derive(Clone)]
253+
struct FailoverVectorService {
254+
services: Vec<VectorService>,
255+
active: Arc<AtomicUsize>,
256+
endpoint_timeout: std::time::Duration,
257+
}
258+
259+
impl FailoverVectorService {
260+
fn new(services: Vec<VectorService>, endpoint_timeout: std::time::Duration) -> Self {
261+
Self {
262+
services,
263+
active: Arc::new(AtomicUsize::new(0)),
264+
endpoint_timeout,
265+
}
266+
}
267+
}
268+
269+
impl Service<VectorRequest> for FailoverVectorService {
270+
type Response = VectorResponse;
271+
type Error = crate::Error;
272+
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
273+
274+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
275+
Poll::Ready(Ok(()))
276+
}
277+
278+
fn call(&mut self, request: VectorRequest) -> Self::Future {
279+
let services = self.services.clone();
280+
let active = Arc::clone(&self.active);
281+
let endpoint_timeout = self.endpoint_timeout;
282+
283+
Box::pin(async move {
284+
let start = active.load(Ordering::Acquire);
285+
let mut last_error = None;
286+
287+
for offset in 0..services.len() {
288+
let index = (start + offset) % services.len();
289+
let next_index = (index + 1) % services.len();
290+
let mut service = services[index].clone();
291+
292+
match tokio::time::timeout(endpoint_timeout, service.call(request.clone())).await {
293+
Ok(Ok(response)) => {
294+
let current = active.load(Ordering::Acquire);
295+
if current == start || current == index {
296+
active.store(index, Ordering::Release);
297+
}
298+
return Ok(response);
299+
}
300+
Ok(Err(error)) => {
301+
if !is_retriable_vector_error(&error) {
302+
return Err(error);
303+
}
304+
305+
let _ = active.compare_exchange(
306+
index,
307+
next_index,
308+
Ordering::AcqRel,
309+
Ordering::Acquire,
310+
);
311+
last_error = Some(error);
312+
}
313+
Err(_elapsed) => {
314+
let _ = active.compare_exchange(
315+
index,
316+
next_index,
317+
Ordering::AcqRel,
318+
Ordering::Acquire,
319+
);
320+
last_error = Some(Box::new(VectorSinkError::Request {
321+
source: tonic::Status::deadline_exceeded(
322+
"vector endpoint request timed out",
323+
),
324+
}) as crate::Error);
325+
}
326+
}
327+
}
328+
329+
Err(last_error.expect("failover service should have at least one endpoint"))
330+
})
331+
}
332+
}
333+
334+
fn is_retriable_vector_error(error: &crate::Error) -> bool {
335+
error
336+
.downcast_ref::<VectorSinkError>()
337+
.is_none_or(|error| VectorGrpcRetryLogic.is_retriable_error(error))
338+
}
339+
340+
impl VectorConfig {
341+
fn uris(&self, tls: bool) -> crate::Result<Vec<Uri>> {
342+
match (self.address.as_ref(), self.addresses.as_slice()) {
343+
(Some(_), [_first, ..]) => Err(
344+
"`address` and `addresses` options are mutually exclusive. Please use `addresses` for multiple Vector endpoints."
345+
.into(),
346+
),
347+
(None, []) => Err("No Vector endpoint configured. Please set `address` or `addresses`.".into()),
348+
(Some(address), []) => Ok(vec![with_default_scheme(address, tls)?]),
349+
(None, addresses) => addresses
350+
.iter()
351+
.map(|address| with_default_scheme(address, tls))
352+
.collect(),
353+
}
354+
}
355+
}
356+
160357
/// Check to see if the remote service accepts new events.
161358
async fn healthcheck(
162359
mut service: VectorService,
@@ -183,6 +380,39 @@ async fn healthcheck(
183380
}
184381
}
185382

383+
fn healthchecks(
384+
client: hyper::Client<ProxyConnector<HttpsConnector<HttpConnector>>, BoxBody>,
385+
uris: &[Uri],
386+
options: SinkHealthcheckOptions,
387+
) -> Healthcheck {
388+
if !options.enabled {
389+
return Box::pin(futures::future::ok(()));
390+
}
391+
392+
let healthcheck_uris = options
393+
.uri
394+
.clone()
395+
.map(|uri| vec![uri.uri])
396+
.unwrap_or_else(|| uris.to_vec());
397+
398+
Box::pin(
399+
futures::future::select_ok(healthcheck_uris.into_iter().map(move |uri| {
400+
let service = VectorService::new(client.clone(), uri, VectorCompression::None);
401+
let timeout = options.timeout;
402+
healthcheck(
403+
service,
404+
SinkHealthcheckOptions {
405+
enabled: true,
406+
uri: None,
407+
timeout,
408+
},
409+
)
410+
.boxed()
411+
}))
412+
.map_ok(|((), _)| ()),
413+
)
414+
}
415+
186416
/// grpc doesn't like an address without a scheme, so we default to http or https if one isn't
187417
/// specified in the address.
188418
pub fn with_default_scheme(address: &str, tls: bool) -> crate::Result<Uri> {
@@ -256,3 +486,15 @@ impl RetryLogic for VectorGrpcRetryLogic {
256486
}
257487
}
258488
}
489+
490+
#[derive(Debug, Clone)]
491+
struct VectorGrpcHealthLogic;
492+
493+
impl HealthLogic for VectorGrpcHealthLogic {
494+
type Error = crate::Error;
495+
type Response = VectorResponse;
496+
497+
fn is_healthy(&self, response: &Result<Self::Response, Self::Error>) -> Option<bool> {
498+
Some(response.is_ok())
499+
}
500+
}

0 commit comments

Comments
 (0)