Skip to content

Commit 00c7c9a

Browse files
committed
Track successful db roundtrip with vss_service_up
We add `is_healthy` to the `KvStore` API to allow callers to check whether the backing store passes a simple health check. The `vss_service_up` gauge metric now tracks `is_healthy`. Commit written with codex.
1 parent 922123e commit 00c7c9a

4 files changed

Lines changed: 93 additions & 8 deletions

File tree

rust/api/src/kv_store.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,7 @@ pub trait KvStore: Send + Sync {
3333
async fn list_key_versions(
3434
&self, user_token: String, request: ListKeyVersionsRequest,
3535
) -> Result<ListKeyVersionsResponse, VssError>;
36+
37+
/// Returns whether the backing store can complete a lightweight health-check round trip.
38+
async fn is_healthy(&self) -> bool;
3639
}

rust/api/src/kv_store_tests.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ macro_rules! define_kv_store_tests {
3737
};
3838
}
3939

40+
create_test!(is_healthy_should_return_true);
4041
create_test!(put_should_succeed_when_single_object_put_operation);
4142
create_test!(put_should_succeed_when_multi_object_put_operation);
4243
create_test!(put_should_fail_when_key_version_mismatched);
@@ -68,6 +69,13 @@ pub trait KvStoreTestSuite {
6869
/// Creates and returns a new instance of the store to be tested.
6970
async fn create_store() -> Self::Store;
7071

72+
async fn is_healthy_should_return_true() -> Result<(), VssError> {
73+
let kv_store = Self::create_store().await;
74+
assert!(kv_store.is_healthy().await);
75+
76+
Ok(())
77+
}
78+
7179
async fn put_should_succeed_when_single_object_put_operation() -> Result<(), VssError> {
7280
let kv_store = Self::create_store().await;
7381
let ctx = TestContext::new(&kv_store);

rust/impls/src/postgres_store.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const KEY_COLUMN: &str = "key";
3434
const VALUE_COLUMN: &str = "value";
3535
const VERSION_COLUMN: &str = "version";
3636
const SORT_ORDER_COLUMN: &str = "sort_order";
37+
const HEALTH_CHECK_STMT: &str = "SELECT 1";
3738

3839
/// Page token is the `sort_order` value of the last item in the previous page,
3940
/// encoded as a decimal string.
@@ -739,6 +740,24 @@ where
739740

740741
Ok(ListKeyVersionsResponse { key_versions, next_page_token, global_version })
741742
}
743+
744+
async fn is_healthy(&self) -> bool {
745+
let conn = match self.pool.get().await {
746+
Ok(conn) => conn,
747+
Err(e) => {
748+
debug!("Postgres health check failed while getting connection: {}", e);
749+
return false;
750+
},
751+
};
752+
753+
match conn.query_one(HEALTH_CHECK_STMT, &[]).await {
754+
Ok(row) => row.get::<_, i32>(0) == 1,
755+
Err(e) => {
756+
debug!("Postgres health check query failed: {}", e);
757+
false
758+
},
759+
}
760+
}
742761
}
743762

744763
#[cfg(test)]

rust/server/src/vss_service.rs

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use api::types::{
1717
use std::future::Future;
1818
use std::pin::Pin;
1919
use std::sync::Arc;
20+
use std::time::Duration;
2021

2122
use log::{debug, trace};
2223

@@ -25,6 +26,7 @@ use crate::util::KeyValueVecKeyPrinter;
2526
const MAXIMUM_REQUEST_BODY_SIZE: usize = 1024 * 1024 * 1024;
2627
const PROTOCOL_VERSION_HEADER: &str = "vss-protocol-version";
2728
const PROTOCOL_VERSION: &str = "0";
29+
const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(2);
2830

2931
#[derive(Clone, Copy)]
3032
pub(crate) struct VssServiceConfig {
@@ -80,14 +82,8 @@ impl Service<Request<Incoming>> for VssService {
8082

8183
Box::pin(async move {
8284
if path == "/metrics" {
83-
let response = b"# HELP vss_service_up Is the vss service up?\n# TYPE vss_service_up gauge\nvss_service_up 1\n";
84-
return Ok(Response::builder()
85-
.status(StatusCode::OK)
86-
.header("Content-Type", "text/plain; version=0.0.4")
87-
.header(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION.as_bytes())
88-
.body(Full::new(Bytes::from_static(response)))
89-
// unwrap safety: body only errors when previous chained calls failed.
90-
.unwrap());
85+
let is_healthy = check_store_health(&store).await;
86+
return Ok(build_metrics_response(is_healthy));
9187
}
9288

9389
let prefix_stripped_path = path.strip_prefix(BASE_PATH_PREFIX).unwrap_or_default();
@@ -257,6 +253,33 @@ fn build_response(status_code: StatusCode, body: Bytes) -> Response<Full<Bytes>>
257253
.unwrap()
258254
}
259255

256+
async fn check_store_health(store: &Arc<dyn KvStore>) -> bool {
257+
match tokio::time::timeout(HEALTH_CHECK_TIMEOUT, store.is_healthy()).await {
258+
Ok(is_healthy) => is_healthy,
259+
Err(_) => false,
260+
}
261+
}
262+
263+
fn build_metrics_response(is_healthy: bool) -> Response<Full<Bytes>> {
264+
let response = if is_healthy {
265+
Bytes::from_static(
266+
b"# HELP vss_service_up Is the vss service up?\n# TYPE vss_service_up gauge\nvss_service_up 1\n",
267+
)
268+
} else {
269+
Bytes::from_static(
270+
b"# HELP vss_service_up Is the vss service up?\n# TYPE vss_service_up gauge\nvss_service_up 0\n",
271+
)
272+
};
273+
274+
Response::builder()
275+
.status(StatusCode::OK)
276+
.header("Content-Type", "text/plain; version=0.0.4")
277+
.header(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION.as_bytes())
278+
.body(Full::new(response))
279+
// unwrap safety: body only errors when previous chained calls failed.
280+
.unwrap()
281+
}
282+
260283
fn build_error_response(e: VssError) -> Response<Full<Bytes>> {
261284
let (status_code, error_response) = match e {
262285
VssError::NoSuchKeyError(msg) => {
@@ -326,4 +349,36 @@ mod tests {
326349
PROTOCOL_VERSION,
327350
);
328351
}
352+
353+
#[tokio::test]
354+
async fn build_metrics_response_reports_healthy_store() {
355+
let response = build_metrics_response(true);
356+
assert_eq!(response.status(), StatusCode::OK);
357+
assert_eq!(
358+
response.headers().get(PROTOCOL_VERSION_HEADER).unwrap().to_str().unwrap(),
359+
PROTOCOL_VERSION,
360+
);
361+
362+
let body = response.into_body().collect().await.unwrap().to_bytes();
363+
assert_eq!(
364+
body.as_ref(),
365+
b"# HELP vss_service_up Is the vss service up?\n# TYPE vss_service_up gauge\nvss_service_up 1\n",
366+
);
367+
}
368+
369+
#[tokio::test]
370+
async fn build_metrics_response_reports_unhealthy_store() {
371+
let response = build_metrics_response(false);
372+
assert_eq!(response.status(), StatusCode::OK);
373+
assert_eq!(
374+
response.headers().get(PROTOCOL_VERSION_HEADER).unwrap().to_str().unwrap(),
375+
PROTOCOL_VERSION,
376+
);
377+
378+
let body = response.into_body().collect().await.unwrap().to_bytes();
379+
assert_eq!(
380+
body.as_ref(),
381+
b"# HELP vss_service_up Is the vss service up?\n# TYPE vss_service_up gauge\nvss_service_up 0\n",
382+
);
383+
}
329384
}

0 commit comments

Comments
 (0)