Skip to content

Commit 4e95802

Browse files
jpds, claude, and pront
authored
fix(vector source): implement standard gRPC health checking protocol (#24916)
* fix(vector source): implement standard gRPC health checking protocol

  Adds support for the standard gRPC health checking protocol (grpc.health.v1.Health) to the vector source alongside the existing custom health check endpoint. This enables compatibility with standard health checking tools like grpc-health-probe, which is commonly used in Kubernetes and other orchestration systems.

  Changes:
  - Added tonic-health dependency to support standard gRPC health protocol
  - Implemented grpc.health.v1.Health service in vector source
  - Maintained backward compatibility with existing custom health check
  - Added tests for both standard and custom health check endpoints
  - Both health services run on the same port using RoutesBuilder

  The custom vector.Vector/HealthCheck endpoint continues to work for existing Vector-to-Vector communication, while the new grpc.health.v1.Health/Check endpoint provides compatibility with standard tooling.

  Fixes: #23657

  Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* update licenses

* cargo fmt

* changelog edits

---------

Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
1 parent 891643e commit 4e95802

6 files changed

Lines changed: 139 additions & 7 deletions

File tree

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ tokio-tungstenite = { version = "0.20.1", default-features = false }
204204
toml = { version = "0.9.8", default-features = false, features = ["serde", "display", "parse"] }
205205
tonic = { version = "0.11", default-features = false, features = ["transport", "codegen", "prost", "tls", "tls-roots", "gzip"] }
206206
tonic-build = { version = "0.11", default-features = false, features = ["transport", "prost"] }
207+
tonic-health = { version = "0.11", default-features = false }
207208
tracing = { version = "0.1.44", default-features = false }
208209
tracing-subscriber = { version = "0.3.22", default-features = false, features = ["fmt"] }
209210
url = { version = "2.5.4", default-features = false, features = ["serde"] }
@@ -436,6 +437,7 @@ tokio-tungstenite = { workspace = true, features = ["connect"], optional = true
436437
toml.workspace = true
437438
hickory-proto = { workspace = true, optional = true }
438439
tonic = { workspace = true, optional = true }
440+
tonic-health = { workspace = true, optional = true }
439441
thread_local = { version = "1.1.9", default-features = false, optional = true }
440442
typetag = { version = "0.2.20", default-features = false }
441443
url.workspace = true
@@ -735,7 +737,7 @@ sources-websocket = ["dep:tokio-tungstenite"]
735737
sources-windows_event_log = ["dep:windows", "dep:quick-xml", "dep:governor"]
736738
sources-windows_event_log-integration-tests = ["sources-windows_event_log"]
737739

738-
sources-vector = ["dep:prost", "dep:tonic", "protobuf-build"]
740+
sources-vector = ["dep:prost", "dep:tonic", "dep:tonic-health", "protobuf-build"]
739741

740742
# Transforms
741743
transforms = ["transforms-logs", "transforms-metrics"]

LICENSE-3rdparty.csv

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,7 @@ toml_parser,https://github.com/toml-rs/toml,MIT OR Apache-2.0,The toml_parser Au
796796
toml_write,https://github.com/toml-rs/toml,MIT OR Apache-2.0,The toml_write Authors
797797
toml_writer,https://github.com/toml-rs/toml,MIT OR Apache-2.0,The toml_writer Authors
798798
tonic,https://github.com/hyperium/tonic,MIT,Lucio Franco <luciofranco14@gmail.com>
799+
tonic-health,https://github.com/hyperium/tonic,MIT,James Nugent <james@jen20.com>
799800
tower,https://github.com/tower-rs/tower,MIT,Tower Maintainers <team@tower-rs.com>
800801
tower-http,https://github.com/tower-rs/tower-http,MIT,Tower Maintainers <team@tower-rs.com>
801802
tower-layer,https://github.com/tower-rs/tower,MIT,Tower Maintainers <team@tower-rs.com>
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
`vector` source: Implement standard gRPC health checking protocol (`grpc.health.v1.Health`)
2+
alongside the existing custom health check endpoint. This enables compatibility with standard
3+
tools like `grpc-health-probe` for Kubernetes and other orchestration systems.
4+
5+
Issue: https://github.com/vectordotdev/vector/issues/23657
6+
7+
authors: jpds

src/sinks/vector/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ async fn healthcheck(
159159
return Ok(());
160160
}
161161

162+
// Use the custom Vector health check
163+
// Note: Both custom and standard health checks behave identically - they just
164+
// return serving status without actual health validation. The Vector source
165+
// implements both protocols now for compatibility.
162166
let request = service.client.health_check(proto::HealthCheckRequest {});
163167
match request.await {
164168
Ok(response) => match proto::ServingStatus::try_from(response.into_inner().status) {

src/sources/vector/mod.rs

Lines changed: 110 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use std::net::SocketAddr;
33

44
use chrono::Utc;
55
use futures::TryFutureExt;
6-
use tonic::{Request, Response, Status};
6+
use tonic::{Request, Response, Status, transport::server::RoutesBuilder};
7+
use tonic_health::server::health_reporter;
78
use vector_lib::{
89
EstimatedJsonEncodedSizeOf,
910
codecs::NativeDeserializerConfig,
@@ -22,7 +23,7 @@ use crate::{
2223
internal_events::{EventsReceived, StreamClosedError},
2324
proto::vector as proto,
2425
serde::bool_or_struct,
25-
sources::{Source, util::grpc::run_grpc_server},
26+
sources::{Source, util::grpc::run_grpc_server_with_routes},
2627
tls::{MaybeTlsSettings, TlsEnableableConfig},
2728
};
2829

@@ -176,7 +177,8 @@ impl SourceConfig for VectorConfig {
176177
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
177178
let log_namespace = cx.log_namespace(self.log_namespace);
178179

179-
let service = proto::Server::new(Service {
180+
// Create the custom Vector service (existing)
181+
let vector_service = proto::Server::new(Service {
180182
pipeline: cx.out,
181183
acknowledgements,
182184
log_namespace,
@@ -185,10 +187,25 @@ impl SourceConfig for VectorConfig {
185187
// Tonic added a default of 4MB in 0.9. This replaces the old behavior.
186188
.max_decoding_message_size(usize::MAX);
187189

190+
// Create the standard gRPC health service
191+
let (mut health_reporter, health_service) = health_reporter();
192+
193+
// Register the Vector service as serving in the health reporter
194+
health_reporter
195+
.set_service_status("vector.Vector", tonic_health::ServingStatus::Serving)
196+
.await;
197+
198+
// Combine both services using RoutesBuilder
199+
let mut builder = RoutesBuilder::default();
200+
builder
201+
.add_service(health_service)
202+
.add_service(vector_service);
203+
188204
let source =
189-
run_grpc_server(self.address, tls_settings, service, cx.shutdown).map_err(|error| {
190-
error!(message = "Source future failed.", %error);
191-
});
205+
run_grpc_server_with_routes(self.address, tls_settings, builder.routes(), cx.shutdown)
206+
.map_err(|error| {
207+
error!(message = "Source future failed.", %error);
208+
});
192209

193210
Ok(Box::pin(source))
194211
}
@@ -337,4 +354,91 @@ mod tests {
337354
);
338355
run_test(&config, addr).await;
339356
}
357+
358+
#[tokio::test]
359+
async fn custom_health_check_works() {
360+
use tonic::transport::Channel;
361+
362+
let (_guard, addr) = test_util::addr::next_addr();
363+
364+
let config = format!(r#"address = "{addr}""#);
365+
let source: VectorConfig = toml::from_str(&config).unwrap();
366+
367+
let (tx, _rx) = SourceSender::new_test();
368+
let server = source
369+
.build(SourceContext::new_test(tx, None))
370+
.await
371+
.unwrap();
372+
tokio::spawn(server);
373+
test_util::wait_for_tcp(addr).await;
374+
375+
// Test the custom Vector health check endpoint
376+
let endpoint = format!("http://{addr}");
377+
let channel = Channel::from_shared(endpoint)
378+
.unwrap()
379+
.connect()
380+
.await
381+
.unwrap();
382+
383+
let mut client = proto::Client::new(channel);
384+
let response = client
385+
.health_check(proto::HealthCheckRequest {})
386+
.await
387+
.unwrap();
388+
389+
assert_eq!(
390+
response.into_inner().status,
391+
proto::ServingStatus::Serving as i32
392+
);
393+
}
394+
395+
#[tokio::test]
396+
async fn standard_grpc_health_check_works() {
397+
use tonic::transport::Channel;
398+
use tonic_health::pb::{HealthCheckRequest, health_client::HealthClient};
399+
400+
let (_guard, addr) = test_util::addr::next_addr();
401+
402+
let config = format!(r#"address = "{addr}""#);
403+
let source: VectorConfig = toml::from_str(&config).unwrap();
404+
405+
let (tx, _rx) = SourceSender::new_test();
406+
let server = source
407+
.build(SourceContext::new_test(tx, None))
408+
.await
409+
.unwrap();
410+
tokio::spawn(server);
411+
test_util::wait_for_tcp(addr).await;
412+
413+
// Test the standard gRPC health check protocol
414+
let endpoint = format!("http://{addr}");
415+
let channel = Channel::from_shared(endpoint)
416+
.unwrap()
417+
.connect()
418+
.await
419+
.unwrap();
420+
421+
let mut client = HealthClient::new(channel);
422+
423+
// Check aggregate server health (empty service string)
424+
let response = client
425+
.check(HealthCheckRequest {
426+
service: String::new(),
427+
})
428+
.await
429+
.unwrap();
430+
431+
use tonic_health::pb::health_check_response::ServingStatus;
432+
assert_eq!(response.into_inner().status, ServingStatus::Serving as i32);
433+
434+
// Check the named Vector service health
435+
let response = client
436+
.check(HealthCheckRequest {
437+
service: "vector.Vector".to_string(),
438+
})
439+
.await
440+
.unwrap();
441+
442+
assert_eq!(response.into_inner().status, ServingStatus::Serving as i32);
443+
}
340444
}

0 commit comments

Comments
 (0)