Skip to content

Commit c6574fd

Browse files
prontclaudethomasqueirozb
authored
enhancement(api)!: replace custom Health RPC with standard gRPC health service (#25139)
* enhancement(api)!: replace custom Health RPC with standard gRPC health service Replace the custom `ObservabilityService/Health` RPC with the standard `grpc.health.v1.Health` service on the observability API (port 8686). This enables native Kubernetes gRPC health probes, `grpc-health-probe`, and other standard tooling to work out of the box. The empty service name (`""`) is used for whole-server health, matching what Kubernetes probes and `grpc-health-probe` query by default. Key changes: - Remove Health RPC, HealthRequest, HealthResponse from the proto - Add tonic_health standard health service to the gRPC server - Switch vector-api-client to use tonic_health HealthClient; the client checks ServingStatus and returns NotServing error if not SERVING - Flip health to NOT_SERVING in TopologyController::stop before draining the topology, so Kubernetes removes the pod from endpoints early - Update docs, changelog, and add integration test Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix gate * cleanup * Update src/topology/controller.rs Co-authored-by: Thomas <thomas.schneider@datadoghq.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Thomas <thomas.schneider@datadoghq.com>
1 parent 43f6204 commit c6574fd

16 files changed

Lines changed: 119 additions & 77 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,7 @@ docker = ["dep:bollard", "dep:dirs-next"]
561561
api = [
562562
"dep:base64",
563563
"dep:tonic",
564+
"dep:tonic-health",
564565
"dep:tonic-reflection",
565566
"dep:prost",
566567
"dep:prost-types",

changelog.d/graphql_to_grpc_api.breaking.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ vector top --url http://localhost:8686
2727
[grpcurl](https://github.com/fullstorydev/grpcurl):
2828

2929
```bash
30-
# Check health
31-
grpcurl -plaintext localhost:8686 vector.observability.v1.ObservabilityService/Health
30+
# Check health (standard gRPC health check, compatible with Kubernetes gRPC probes)
31+
grpcurl -plaintext localhost:8686 grpc.health.v1.Health/Check
3232

3333
# List components
3434
grpcurl -plaintext localhost:8686 vector.observability.v1.ObservabilityService/GetComponents

lib/vector-api-client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ license = "MPL-2.0"
99
[dependencies]
1010
# gRPC
1111
tonic.workspace = true
12+
tonic-health.workspace = true
1213
prost.workspace = true
1314
prost-types.workspace = true
1415

lib/vector-api-client/src/client.rs

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,28 @@
11
use http::Uri;
22
use tokio_stream::{Stream, StreamExt};
33
use tonic::transport::{Channel, Endpoint};
4+
use tonic_health::pb::{
5+
HealthCheckRequest, health_check_response::ServingStatus, health_client::HealthClient,
6+
};
47

58
use crate::{
69
error::{Error, Result},
710
proto::{
811
GetAllocationTracingStatusRequest, GetAllocationTracingStatusResponse,
9-
GetComponentsRequest, GetComponentsResponse, GetMetaRequest, GetMetaResponse,
10-
HealthRequest, HealthResponse, MetricName, StreamComponentAllocatedBytesRequest,
11-
StreamComponentAllocatedBytesResponse, StreamComponentMetricsRequest,
12-
StreamComponentMetricsResponse, StreamHeartbeatRequest, StreamHeartbeatResponse,
13-
StreamOutputEventsRequest, StreamOutputEventsResponse, StreamUptimeRequest,
14-
StreamUptimeResponse, observability_service_client::ObservabilityServiceClient,
12+
GetComponentsRequest, GetComponentsResponse, GetMetaRequest, GetMetaResponse, MetricName,
13+
StreamComponentAllocatedBytesRequest, StreamComponentAllocatedBytesResponse,
14+
StreamComponentMetricsRequest, StreamComponentMetricsResponse, StreamHeartbeatRequest,
15+
StreamHeartbeatResponse, StreamOutputEventsRequest, StreamOutputEventsResponse,
16+
StreamUptimeRequest, StreamUptimeResponse,
17+
observability_service_client::ObservabilityServiceClient,
1518
},
1619
};
1720

1821
/// gRPC client for the Vector observability API
1922
#[derive(Debug, Clone)]
2023
pub struct Client {
2124
endpoint: Endpoint,
25+
channel: Option<Channel>,
2226
client: Option<ObservabilityServiceClient<Channel>>,
2327
}
2428

@@ -33,14 +37,16 @@ impl Client {
3337
pub fn new(uri: Uri) -> Self {
3438
Self {
3539
endpoint: Endpoint::from(uri),
40+
channel: None,
3641
client: None,
3742
}
3843
}
3944

4045
/// Connect to the gRPC server
4146
pub async fn connect(&mut self) -> Result<()> {
4247
let channel = self.endpoint.connect().await?;
43-
self.client = Some(ObservabilityServiceClient::new(channel));
48+
self.client = Some(ObservabilityServiceClient::new(channel.clone()));
49+
self.channel = Some(channel);
4450
Ok(())
4551
}
4652

@@ -49,13 +55,34 @@ impl Client {
4955
self.client.as_mut().ok_or(Error::NotConnected)
5056
}
5157

58+
/// Get the underlying channel
59+
fn channel(&self) -> Result<&Channel> {
60+
self.channel.as_ref().ok_or(Error::NotConnected)
61+
}
62+
5263
// ========== Unary RPCs ==========
5364

54-
/// Check if the API server is healthy
55-
pub async fn health(&mut self) -> Result<HealthResponse> {
56-
let client = self.ensure_connected()?;
57-
let response = client.health(HealthRequest {}).await?;
58-
Ok(response.into_inner())
65+
/// Check if the API server is healthy using the standard gRPC health check
66+
/// protocol (grpc.health.v1.Health/Check).
67+
///
68+
/// Queries the empty service name (`""`), which represents whole-server
69+
/// health. This is the default used by Kubernetes gRPC probes and
70+
/// `grpc-health-probe`.
71+
///
72+
/// Returns `Ok(())` if the server is `SERVING`, or an error otherwise.
73+
pub async fn health(&mut self) -> Result<()> {
74+
let channel = self.channel()?.clone();
75+
let mut health_client = HealthClient::new(channel);
76+
let response = health_client
77+
.check(HealthCheckRequest {
78+
service: String::new(),
79+
})
80+
.await?;
81+
let status = response.into_inner().status;
82+
if status != ServingStatus::Serving as i32 {
83+
return Err(Error::NotServing { status });
84+
}
85+
Ok(())
5986
}
6087

6188
/// Get metadata about the Vector instance

lib/vector-api-client/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ pub enum Error {
1616
#[snafu(display("Not connected to gRPC server"))]
1717
NotConnected,
1818

19+
#[snafu(display("Server is not serving (status: {})", status))]
20+
NotServing { status: i32 },
21+
1922
#[snafu(display("Stream error: {}", message))]
2023
Stream { message: String },
2124
}

lib/vector-api-client/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
//! let mut client = Client::new("http://localhost:9999".parse().unwrap());
1212
//! client.connect().await?;
1313
//!
14-
//! // Check health
15-
//! let health = client.health().await?;
16-
//! println!("Healthy: {}", health.healthy);
14+
//! // Check health (standard gRPC health check)
15+
//! client.health().await?;
16+
//! println!("Server is healthy");
1717
//!
1818
//! // Get components
1919
//! let components = client.get_components(0).await?;

proto/vector/observability.proto

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ import "event.proto";
88
service ObservabilityService {
99
// ========== Simple Queries ==========
1010

11-
// Check if the API server is healthy and responding
12-
rpc Health(HealthRequest) returns (HealthResponse);
13-
1411
// Get metadata about the Vector instance (version, hostname)
1512
rpc GetMeta(GetMetaRequest) returns (GetMetaResponse);
1613

@@ -42,13 +39,7 @@ service ObservabilityService {
4239
rpc StreamOutputEvents(StreamOutputEventsRequest) returns (stream StreamOutputEventsResponse);
4340
}
4441

45-
// ========== Health & Meta Messages ==========
46-
47-
message HealthRequest {}
48-
49-
message HealthResponse {
50-
bool healthy = 1;
51-
}
42+
// ========== Meta Messages ==========
5243

5344
message GetMetaRequest {}
5445

src/api/grpc/service.rs

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@ use std::pin::Pin;
44
// (only used in synchronous map updates inside IntervalStream closures), so the
55
// cheaper std mutex is correct here. tokio::sync::Mutex is only needed when the
66
// critical section itself contains .await.
7-
use std::sync::{
8-
Arc, Mutex,
9-
atomic::{AtomicBool, Ordering},
10-
};
7+
use std::sync::{Arc, Mutex};
118
use std::time::Duration;
129

1310
use futures::{StreamExt as FuturesStreamExt, stream};
@@ -399,30 +396,18 @@ fn ports_to_proto_outputs(
399396
/// gRPC observability service implementation.
400397
pub struct ObservabilityService {
401398
watch_rx: WatchRx,
402-
running: Arc<AtomicBool>,
403399
}
404400

405401
impl ObservabilityService {
406-
pub const fn new(watch_rx: WatchRx, running: Arc<AtomicBool>) -> Self {
407-
Self { watch_rx, running }
402+
pub const fn new(watch_rx: WatchRx) -> Self {
403+
Self { watch_rx }
408404
}
409405
}
410406

411407
#[tonic::async_trait]
412408
impl observability::Service for ObservabilityService {
413409
// ========== Simple Queries ==========
414410

415-
async fn health(
416-
&self,
417-
_request: Request<HealthRequest>,
418-
) -> Result<Response<HealthResponse>, Status> {
419-
if self.running.load(Ordering::Relaxed) {
420-
Ok(Response::new(HealthResponse { healthy: true }))
421-
} else {
422-
Err(Status::unavailable("Vector is shutting down"))
423-
}
424-
}
425-
426411
async fn get_meta(
427412
&self,
428413
_request: Request<GetMetaRequest>,

src/api/grpc_server.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
use std::{
2-
error::Error as StdError,
3-
net::SocketAddr,
4-
sync::{Arc, atomic::AtomicBool},
5-
};
1+
use std::{error::Error as StdError, net::SocketAddr};
62
use tokio::sync::oneshot;
73
use tokio_stream::wrappers::TcpListenerStream;
84
use tonic::transport::Server as TonicServer;
5+
use tonic_health::server::{HealthReporter, health_reporter};
96
use vector_lib::tap::topology::WatchRx;
107

118
use super::grpc::ObservabilityService;
@@ -14,6 +11,7 @@ use crate::{config::Config, proto::observability::Server as ObservabilityServer}
1411
/// gRPC API server for Vector observability.
1512
pub struct GrpcServer {
1613
_shutdown: oneshot::Sender<()>,
14+
health_reporter: HealthReporter,
1715
addr: SocketAddr,
1816
}
1917

@@ -25,11 +23,7 @@ impl GrpcServer {
2523
/// is dropped.
2624
///
2725
/// Returns an error if the server fails to bind to the configured address.
28-
pub async fn start(
29-
config: &Config,
30-
watch_rx: WatchRx,
31-
running: Arc<AtomicBool>,
32-
) -> crate::Result<Self> {
26+
pub async fn start(config: &Config, watch_rx: WatchRx) -> crate::Result<Self> {
3327
let addr = config.api.address.ok_or_else(|| {
3428
crate::Error::from("API address not configured in config.api.address")
3529
})?;
@@ -46,7 +40,11 @@ impl GrpcServer {
4640

4741
info!("GRPC API server bound to {}.", actual_addr);
4842

49-
let service = ObservabilityService::new(watch_rx, running);
43+
let service = ObservabilityService::new(watch_rx);
44+
45+
// Create the standard gRPC health service (grpc.health.v1.Health).
46+
// The empty service ("") is registered as SERVING by default.
47+
let (health_reporter, health_service) = health_reporter();
5048

5149
let (_shutdown, rx) = oneshot::channel();
5250

@@ -59,10 +57,12 @@ impl GrpcServer {
5957
.register_encoded_file_descriptor_set(
6058
crate::proto::observability::FILE_DESCRIPTOR_SET,
6159
)
60+
.register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
6261
.build()
6362
.expect("Failed to build reflection service");
6463

6564
let result = TonicServer::builder()
65+
.add_service(health_service)
6666
.add_service(ObservabilityServer::new(service))
6767
.add_service(reflection_service)
6868
.serve_with_incoming_shutdown(incoming, async {
@@ -85,10 +85,22 @@ impl GrpcServer {
8585

8686
Ok(Self {
8787
_shutdown,
88+
health_reporter,
8889
addr: actual_addr,
8990
})
9091
}
9192

93+
/// Signal that the server is no longer serving.
94+
///
95+
/// Call this **before** draining the topology so that Kubernetes gRPC
96+
/// readiness probes fail early and the pod is removed from endpoints
97+
/// before the process exits.
98+
pub async fn set_not_serving(&mut self) {
99+
self.health_reporter
100+
.set_service_status("", tonic_health::ServingStatus::NotServing)
101+
.await;
102+
}
103+
92104
/// Get the address the server is listening on
93105
pub const fn addr(&self) -> SocketAddr {
94106
self.addr

0 commit comments

Comments
 (0)