Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/sinks/util/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,9 @@ mod tests {
};

use futures::{FutureExt, SinkExt, StreamExt, future, stream};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Duration;
use tower::{Service, ServiceExt, service_fn};
use vector_lib::json_size::JsonSize;

use super::*;
Expand Down Expand Up @@ -594,4 +596,80 @@ mod tests {
RetryAction::Successful
}
}

/// Test retry policy that classifies every error as non-retriable.
#[derive(Clone, Copy, Debug)]
struct RetryNever;

impl RetryLogic for RetryNever {
    type Request = usize;
    type Response = ();
    type Error = std::io::Error;

    /// Returns `false` for any error, so a failed request is never reported as retriable.
    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
        false
    }
}

/// Test health policy that always produces a verdict derived from the response outcome.
#[derive(Clone, Copy, Debug)]
struct AlwaysHealthy;

impl HealthLogic for AlwaysHealthy {
    type Response = ();
    type Error = crate::Error;

    /// Yields `Some(true)` for an `Ok` response and `Some(false)` for an `Err`; never `None`.
    fn is_healthy(&self, response: &Result<Self::Response, Self::Error>) -> Option<bool> {
        let succeeded = matches!(response, Ok(_));
        Some(succeeded)
    }
}

#[tokio::test]
async fn distributed_service_prefers_less_loaded_endpoint() {
let settings = TowerRequestConfig::<GlobalTowerRequestConfigDefaults> {
concurrency: Concurrency::Fixed(1),
..TowerRequestConfig::default()
}
.into_settings();

let (requests_tx, mut requests_rx) =
mpsc::unbounded_channel::<(&'static str, usize, oneshot::Sender<()>)>();

let make_service = |id| {
let requests_tx = requests_tx.clone();
service_fn(move |request: usize| {
let requests_tx = requests_tx.clone();
async move {
let (done_tx, done_rx) = oneshot::channel();
requests_tx.send((id, request, done_tx)).unwrap();
let _ = done_rx.await;
Ok::<(), std::io::Error>(())
}
})
};

let mut service = settings.distributed_service(
RetryNever,
vec![
("endpoint-a".to_owned(), make_service("a")),
("endpoint-b".to_owned(), make_service("b")),
],
HealthConfig::default(),
AlwaysHealthy,
1,
);

let first = tokio::spawn(service.ready().await.unwrap().call(1));
let (first_id, first_request, first_done) = requests_rx.recv().await.unwrap();
assert_eq!(first_request, 1);

let second = tokio::spawn(service.ready().await.unwrap().call(2));
let (second_id, second_request, second_done) = requests_rx.recv().await.unwrap();
assert_eq!(second_request, 2);
assert_ne!(first_id, second_id);

first_done.send(()).unwrap();
second_done.send(()).unwrap();

first.await.unwrap().unwrap();
second.await.unwrap().unwrap();
}
}
207 changes: 172 additions & 35 deletions src/sinks/vector/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use hyper_openssl::HttpsConnector;
use hyper_proxy::ProxyConnector;
use tonic::body::BoxBody;
use tower::ServiceBuilder;
use tower::timeout::error::Elapsed;
use vector_lib::configurable::configurable_component;

use super::{
Expand All @@ -21,6 +22,7 @@ use crate::{
proto::vector as proto,
sinks::{
Healthcheck, VectorSink as VectorSinkType,
util::service::{HealthConfig, HealthLogic},
util::{
BatchConfig, RealtimeEventBasedDefaultBatchSettings, ServiceBuilderExt,
TowerRequestConfig, retries::RetryLogic,
Expand All @@ -29,6 +31,29 @@ use crate::{
tls::{MaybeTlsSettings, TlsEnableableConfig},
};

/// Default for `VectorConnectionConfig::concurrency`: a single connection.
const fn default_connection_concurrency() -> usize {
    const SINGLE_CONNECTION: usize = 1;
    SINGLE_CONNECTION
}

/// Connection settings for the `vector` sink.
// NOTE(review): this type is processed by `#[configurable_component]`, so the `///` text on the
// struct and its field is presumably surfaced in generated configuration docs — confirm before
// rewording any of it.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
// Keys other than the fields declared below are rejected when this struct is deserialized.
#[serde(deny_unknown_fields)]
pub struct VectorConnectionConfig {
/// The number of outbound gRPC connections to open to the configured endpoint.
// Falls back to `default_connection_concurrency()` when the key is omitted.
#[serde(default = "default_connection_concurrency")]
// Declares a lower bound of 1 for validation; `build` on `VectorConfig` also rejects 0 itself.
#[configurable(validation(range(min = 1)))]
pub concurrency: usize,
}

impl Default for VectorConnectionConfig {
    /// Returns the settings used when none are given explicitly, taking `concurrency` from
    /// `default_connection_concurrency()` so the serde default and `Default` cannot diverge.
    fn default() -> Self {
        let concurrency = default_connection_concurrency();
        Self { concurrency }
    }
}

/// Configuration for the `vector` sink.
#[configurable_component(sink("vector", "Relay observability data to a Vector instance."))]
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -72,6 +97,10 @@ pub struct VectorConfig {
#[serde(default)]
pub request: TowerRequestConfig,

#[configurable(derived)]
#[serde(default)]
pub connection: VectorConnectionConfig,

#[configurable(derived)]
#[serde(default)]
tls: Option<TlsEnableableConfig>,
Expand Down Expand Up @@ -106,6 +135,7 @@ fn default_config(address: &str) -> VectorConfig {
compression: VectorCompression::None,
batch: BatchConfig::default(),
request: TowerRequestConfig::default(),
connection: VectorConnectionConfig::default(),
tls: None,
acknowledgements: Default::default(),
}
Expand All @@ -115,37 +145,67 @@ fn default_config(address: &str) -> VectorConfig {
#[typetag::serde(name = "vector")]
impl SinkConfig for VectorConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSinkType, Healthcheck)> {
if self.connection.concurrency == 0 {
return Err(Box::new(VectorSinkError::InvalidConnectionConcurrency {
value: self.connection.concurrency,
}));
}

let proxy = cx.proxy().clone();
let healthcheck_options = cx.healthcheck.clone();
let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
let uri = with_default_scheme(&self.address, tls.is_tls())?;

let client = new_client(&tls, cx.proxy())?;

let healthcheck_uri = cx
.healthcheck
let healthcheck_uri = healthcheck_options
.uri
.clone()
.map(|uri| uri.uri)
.unwrap_or_else(|| uri.clone());
let healthcheck_client =
VectorService::new(client.clone(), healthcheck_uri, VectorCompression::None);
let healthcheck = healthcheck(healthcheck_client, cx.healthcheck);
let service = VectorService::new(client, uri, self.compression);
let healthcheck_client = VectorService::new(
new_client(&tls, &proxy)?,
healthcheck_uri,
VectorCompression::None,
);
let healthcheck = healthcheck(healthcheck_client, healthcheck_options);
let request_settings = self.request.into_settings();
let batch_settings = self.batch.into_batcher_settings()?;
let sink = if self.connection.concurrency == 1 {
let client = new_client(&tls, &proxy)?;
let service = VectorService::new(client, uri, self.compression);
let service = ServiceBuilder::new()
.settings(request_settings, VectorGrpcRetryLogic)
.service(service);

let service = ServiceBuilder::new()
.settings(request_settings, VectorGrpcRetryLogic)
.service(service);
VectorSinkType::from_event_streamsink(VectorSink {
batch_settings: self.batch.into_batcher_settings()?,
service,
})
} else {
let endpoint = uri.to_string();
let services = (0..self.connection.concurrency)
.map(|_| {
let client = new_client(&tls, &proxy)?;
Ok((
endpoint.clone(),
VectorService::new(client, uri.clone(), self.compression),
))
})
.collect::<crate::Result<Vec<_>>>()?;

let service = request_settings.distributed_service(
VectorGrpcRetryLogic,
services,
HealthConfig::default(),
VectorHealthLogic,
1,
);

let sink = VectorSink {
batch_settings,
service,
VectorSinkType::from_event_streamsink(VectorSink {
batch_settings: self.batch.into_batcher_settings()?,
service,
})
};

Ok((
VectorSinkType::from_event_streamsink(sink),
Box::pin(healthcheck),
))
Ok((sink, Box::pin(healthcheck)))
}

fn input(&self) -> Input {
Expand Down Expand Up @@ -229,30 +289,107 @@ fn new_client(
/// Retry policy for the `vector` sink's gRPC requests; its `RetryLogic` implementation
/// decides which `VectorSinkError`s are worth retrying.
#[derive(Debug, Clone)]
struct VectorGrpcRetryLogic;

fn is_permanent_grpc_status(code: tonic::Code) -> bool {
use tonic::Code::*;

matches!(
code,
// List taken from
//
// <https://github.com/grpc/grpc/blob/ed1b20777c69bd47e730a63271eafc1b299f6ca0/doc/statuscodes.md>
NotFound
| InvalidArgument
| AlreadyExists
| PermissionDenied
| OutOfRange
| Unimplemented
| Unauthenticated
| DataLoss
)
}

impl RetryLogic for VectorGrpcRetryLogic {
type Error = VectorSinkError;
type Request = VectorRequest;
type Response = VectorResponse;

fn is_retriable_error(&self, err: &Self::Error) -> bool {
use tonic::Code::*;

match err {
VectorSinkError::Request { source } => !matches!(
source.code(),
// List taken from
//
// <https://github.com/grpc/grpc/blob/ed1b20777c69bd47e730a63271eafc1b299f6ca0/doc/statuscodes.md>
NotFound
| InvalidArgument
| AlreadyExists
| PermissionDenied
| OutOfRange
| Unimplemented
| Unauthenticated
| DataLoss
),
VectorSinkError::Request { source } => !is_permanent_grpc_status(source.code()),
_ => true,
}
}
}

#[derive(Clone)]
struct VectorHealthLogic;

impl HealthLogic for VectorHealthLogic {
type Error = crate::Error;
type Response = VectorResponse;

fn is_healthy(&self, response: &Result<Self::Response, Self::Error>) -> Option<bool> {
match response {
Ok(_) => Some(true),
Err(error) => {
if error.downcast_ref::<Elapsed>().is_some() {
return Some(false);
}

error
.downcast_ref::<VectorSinkError>()
.and_then(|err| match err {
VectorSinkError::Request { source } => {
if is_permanent_grpc_status(source.code()) {
None
} else {
Some(false)
}
}
_ => None,
})
}
}
}
}

#[cfg(test)]
mod tests {
    use tonic::{Code, Status};

    use super::*;

    /// Wraps a gRPC status with the given code and message in the `Request` error variant.
    fn request_error(code: Code, message: &str) -> VectorSinkError {
        VectorSinkError::Request {
            source: Status::new(code, message),
        }
    }

    /// Permanent statuses must not be retried, transient ones must be.
    #[test]
    fn grpc_retry_logic_does_not_retry_permanent_statuses() {
        let logic = VectorGrpcRetryLogic;
        let permanent = request_error(Code::InvalidArgument, "invalid request");
        let transient = request_error(Code::Unavailable, "temporarily unavailable");

        assert!(!logic.is_retriable_error(&permanent));
        assert!(logic.is_retriable_error(&transient));
    }

    /// Only transient statuses and timeouts mark an endpoint unhealthy; permanent statuses
    /// give no verdict.
    #[test]
    fn grpc_health_logic_only_marks_transient_statuses_unhealthy() {
        let logic = VectorHealthLogic;

        let permanent = logic.is_healthy(&Err(Box::new(request_error(
            Code::InvalidArgument,
            "invalid request",
        ))));
        assert_eq!(permanent, None);

        let transient = logic.is_healthy(&Err(Box::new(request_error(
            Code::Unavailable,
            "temporarily unavailable",
        ))));
        assert_eq!(transient, Some(false));

        let timed_out = logic.is_healthy(&Err(Box::new(Elapsed::new())));
        assert_eq!(timed_out, Some(false));
    }
}
Loading
Loading