Skip to content

Commit f7a4037

Browse files
avi-starkwareclaude
andcommitted
starknet_transaction_prover: /health returns 503 when service is saturated
Adds `SaturationMonitor` (shared by `ProvingRpcServerImpl` and `HealthLayer`) that tracks whether the concurrency semaphore has been continuously rejecting proving requests. Once that has held for the configured window (`health_max_saturated_ms`, default 10s), `/health` returns 503 with an opaque body so load balancers can drain the pod before in-flight requests start failing. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9e0ab62 commit f7a4037

15 files changed

Lines changed: 468 additions & 39 deletions

crates/starknet_transaction_prover/src/main.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ fn main() {
1111
async fn main() -> anyhow::Result<()> {
1212
use std::net::SocketAddr;
1313
use std::sync::Arc;
14+
use std::time::Duration;
1415

1516
use anyhow::Context;
1617
use clap::Parser;
@@ -21,11 +22,13 @@ async fn main() -> anyhow::Result<()> {
2122
TransportMode,
2223
};
2324
use starknet_transaction_prover::server::cors::{build_cors_layer, cors_mode};
25+
use starknet_transaction_prover::server::health::HealthLayer;
2426
use starknet_transaction_prover::server::log_redact::redact_url_host;
2527
use starknet_transaction_prover::server::metrics::install_exporter;
2628
use starknet_transaction_prover::server::panic::install_panic_hook;
2729
use starknet_transaction_prover::server::rpc_api::ProvingRpcServer;
2830
use starknet_transaction_prover::server::rpc_impl::ProvingRpcServerImpl;
31+
use starknet_transaction_prover::server::saturation::SaturationMonitor;
2932
use starknet_transaction_prover::server::shutdown::spawn_signal_bridge;
3033
use starknet_transaction_prover::server::{
3134
start_server,
@@ -77,8 +80,16 @@ async fn main() -> anyhow::Result<()> {
7780
"Starting Starknet transaction prover."
7881
);
7982

83+
// Shared saturation tracker — writer is `ProvingRpcServerImpl` on every
84+
// permit acquire/reject; reader is `HealthLayer`. See `saturation.rs`.
85+
let saturation_monitor = SaturationMonitor::default();
86+
let health_layer = HealthLayer::new(
87+
saturation_monitor.clone(),
88+
Duration::from_millis(config.health_max_saturated_ms),
89+
);
90+
8091
// Build and start the JSON-RPC server.
81-
let rpc_impl = ProvingRpcServerImpl::from_config(&config);
92+
let rpc_impl = ProvingRpcServerImpl::from_config(&config, saturation_monitor);
8293
let addr = SocketAddr::new(config.ip, config.port);
8394
let cors_layer = build_cors_layer(&config.cors_allow_origin)?;
8495

@@ -110,6 +121,7 @@ async fn main() -> anyhow::Result<()> {
110121
cors_layer,
111122
ohttp_layer,
112123
metrics_layer,
124+
health_layer,
113125
)
114126
.await?;
115127

crates/starknet_transaction_prover/src/server.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ pub const OHTTP_JSONRPSEE_BODY_BUILDER: fn(Full<Bytes>) -> HttpBody = HttpBody::
5353
/// - `RequestSpanLayer` sits BELOW `OhttpLayer` so it spans the decapsulated inner request with a
5454
/// fresh, envelope-unlinkable id (see `request_span`).
5555
macro_rules! prover_http_middleware {
56-
($metrics_layer:expr, $cors_layer:expr, $ohttp_layer:expr $(,)?) => {
56+
($health_layer:expr, $metrics_layer:expr, $cors_layer:expr, $ohttp_layer:expr $(,)?) => {
5757
ServiceBuilder::new()
5858
.layer(RequestLogLayer)
59-
.layer(HealthLayer)
59+
.layer($health_layer)
6060
.option_layer($metrics_layer)
6161
.layer(HttpMetricsLayer)
6262
.option_layer($cors_layer)
@@ -84,6 +84,7 @@ pub mod request_log;
8484
pub mod request_span;
8585
pub mod rpc_api;
8686
pub mod rpc_impl;
87+
pub mod saturation;
8788
pub mod shutdown;
8889
#[cfg(test)]
8990
pub mod test_recorder;
@@ -94,6 +95,7 @@ pub use http_metrics::HttpMetricsLayer;
9495
pub use metrics::{MetricsLayer, METRICS_PATH};
9596
pub use request_log::{RequestLogLayer, REQUEST_ID_HEADER};
9697
pub use request_span::RequestSpanLayer;
98+
pub use saturation::SaturationMonitor;
9799

98100
#[cfg(test)]
99101
mod rpc_spec_test;
@@ -116,6 +118,7 @@ pub async fn start_server(
116118
cors_layer: Option<CorsLayer>,
117119
ohttp_layer: Option<OhttpJsonrpseeLayer>,
118120
metrics_layer: Option<MetricsLayer>,
121+
health_layer: HealthLayer,
119122
) -> anyhow::Result<(SocketAddr, ServerHandle)> {
120123
match transport {
121124
TransportMode::Http => {
@@ -126,7 +129,12 @@ pub async fn start_server(
126129
let server = ServerBuilder::default()
127130
.set_config(server_config)
128131
// See `prover_http_middleware!` for the full layer-order rationale.
129-
.set_http_middleware(prover_http_middleware!(metrics_layer, cors_layer, ohttp_layer))
132+
.set_http_middleware(prover_http_middleware!(
133+
health_layer,
134+
metrics_layer,
135+
cors_layer,
136+
ohttp_layer
137+
))
130138
.build(&addr)
131139
.await
132140
.context(format!("Failed to bind JSON-RPC server to {addr}"))?;
@@ -145,6 +153,7 @@ pub async fn start_server(
145153
cors_layer,
146154
ohttp_layer,
147155
metrics_layer,
156+
health_layer,
148157
)
149158
.await
150159
}

crates/starknet_transaction_prover/src/server/config.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ const DEFAULT_COMPILED_CLASS_CACHE_SIZE: usize = 600;
3636
/// 5 MiB — matches the convention used elsewhere in the sequencer.
3737
pub(crate) const DEFAULT_MAX_REQUEST_BODY_SIZE: u32 = 5 * 1024 * 1024;
3838
const DEFAULT_OHTTP_KEY_CACHE_MAX_AGE_SECS: u64 = 3600;
39+
/// Default saturation window before `/health` returns 503. 10 seconds
40+
/// matches "service is rejecting requests for a sustained period" without
41+
/// flipping on a single in-flight burst.
42+
const DEFAULT_HEALTH_MAX_SATURATED_MS: u64 = 10_000;
3943

4044
/// Transport mode for the JSON-RPC server.
4145
#[derive(Clone, Debug)]
@@ -103,6 +107,7 @@ struct RawServiceConfig {
103107
max_request_body_size: u32,
104108
ohttp_enabled: bool,
105109
ohttp_key_cache_max_age_secs: u64,
110+
health_max_saturated_ms: u64,
106111
}
107112

108113
impl Default for RawServiceConfig {
@@ -130,6 +135,7 @@ impl Default for RawServiceConfig {
130135
max_request_body_size: DEFAULT_MAX_REQUEST_BODY_SIZE,
131136
ohttp_enabled: false,
132137
ohttp_key_cache_max_age_secs: DEFAULT_OHTTP_KEY_CACHE_MAX_AGE_SECS,
138+
health_max_saturated_ms: DEFAULT_HEALTH_MAX_SATURATED_MS,
133139
}
134140
}
135141
}
@@ -167,6 +173,11 @@ pub struct ServiceConfig {
167173
pub ohttp_enabled: bool,
168174
/// Cache-Control max-age for the `GET /ohttp-keys` response (seconds).
169175
pub ohttp_key_cache_max_age_secs: u64,
176+
/// Saturation window (milliseconds) before `/health` flips to 503. The
177+
/// service is "saturated" when it has been continuously rejecting
178+
/// proving requests due to the concurrency limit; once that has held
179+
/// for this many milliseconds, load balancers should drain the pod.
180+
pub health_max_saturated_ms: u64,
170181
}
171182

172183
/// Applies an optional CLI override to a config field, logging `old -> new` when it changes.
@@ -370,6 +381,11 @@ impl ServiceConfig {
370381
&mut config.ohttp_key_cache_max_age_secs,
371382
args.ohttp_key_cache_max_age_secs,
372383
);
384+
override_field(
385+
"health_max_saturated_ms",
386+
&mut config.health_max_saturated_ms,
387+
args.health_max_saturated_ms,
388+
);
373389

374390
// Validate required fields.
375391
if config.rpc_node_url.is_empty() {
@@ -466,6 +482,7 @@ impl ServiceConfig {
466482
max_request_body_size: config.max_request_body_size,
467483
ohttp_enabled: config.ohttp_enabled,
468484
ohttp_key_cache_max_age_secs: config.ohttp_key_cache_max_age_secs,
485+
health_max_saturated_ms: config.health_max_saturated_ms,
469486
})
470487
}
471488
}
@@ -584,6 +601,14 @@ pub struct CliArgs {
584601
#[arg(long, value_enum, value_name = "FORMAT", env = "LOG_FORMAT", default_value_t = LogFormat::Text)]
585602
pub log_format: LogFormat,
586603

604+
/// Saturation window (milliseconds) before `/health` returns 503
605+
/// (default: 10000). The service is "saturated" when it has been
606+
/// continuously rejecting proving requests due to the concurrency
607+
/// limit; once that has held for this many milliseconds, load
608+
/// balancers should drain the pod.
609+
#[arg(long, value_name = "MILLIS", env = "HEALTH_MAX_SATURATED_MS")]
610+
pub health_max_saturated_ms: Option<u64>,
611+
587612
/// Hidden escape hatch: override the embedded bouncer config (block capacity limits) with a
588613
/// custom JSON file. Not advertised because the embedded defaults are tuned for this prover
589614
/// (including high `l1_gas` / `message_segment_length`: virtual OS output is not L1-bound; it

crates/starknet_transaction_prover/src/server/config_test.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ fn base_args() -> CliArgs {
6262
ohttp_enabled: false,
6363
ohttp_key_cache_max_age_secs: None,
6464
log_format: LogFormat::Text,
65+
health_max_saturated_ms: None,
6566
}
6667
}
6768

@@ -153,6 +154,7 @@ fn cors_allow_origin_rejects_non_array_in_config_file() {
153154
ohttp_enabled: false,
154155
ohttp_key_cache_max_age_secs: None,
155156
log_format: LogFormat::Text,
157+
health_max_saturated_ms: None,
156158
};
157159

158160
let error = ServiceConfig::from_args(args).unwrap_err();

crates/starknet_transaction_prover/src/server/health.rs

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
//! HTTP `/health` endpoint as a tower middleware layer.
22
//!
33
//! Short-circuits `GET /health` before the jsonrpsee service sees the request
4-
//! (which would 405 a GET). Any other request passes through unchanged.
4+
//! (which would 405 a GET). Returns 503 once its `SaturationMonitor` reports the
5+
//! service has been continuously rejecting requests for the configured threshold,
6+
//! so load balancers can drain the pod. Body is opaque (no timestamps, counters,
7+
//! or upstream URLs).
58
69
use std::task::{Context, Poll};
10+
use std::time::Duration;
711

812
use bytes::Bytes;
913
use futures::future::{ready, Either, Ready};
@@ -12,28 +16,66 @@ use http_body_util::Full;
1216
use jsonrpsee::server::HttpBody;
1317
use tower::{Layer, Service};
1418

19+
use crate::server::saturation::SaturationMonitor;
20+
1521
#[cfg(test)]
1622
#[path = "health_test.rs"]
1723
mod health_test;
1824

1925
pub const HEALTH_PATH: &str = "/health";
2026

2127
const HEALTHY_BODY: &[u8] = br#"{"status":"ok"}"#;
28+
/// Body returned by `GET /health` when saturated. Reason is an opaque code,
29+
/// no internal state included.
30+
const SATURATED_BODY: &[u8] = br#"{"status":"unhealthy","reason":"saturated"}"#;
2231

23-
#[derive(Clone, Copy, Default)]
24-
pub struct HealthLayer;
32+
/// Returns `503` once `saturation.saturated_for_at_least` crosses
33+
/// `saturation_threshold`, and `200` otherwise. Tests that only need the healthy
34+
/// path pass a fresh `SaturationMonitor::default()`, which never reports saturated.
35+
#[derive(Clone)]
36+
pub struct HealthLayer {
37+
saturation: SaturationMonitor,
38+
saturation_threshold: Duration,
39+
}
40+
41+
impl HealthLayer {
42+
pub fn new(monitor: SaturationMonitor, threshold: Duration) -> Self {
43+
Self { saturation: monitor, saturation_threshold: threshold }
44+
}
45+
}
2546

2647
impl<S> Layer<S> for HealthLayer {
2748
type Service = HealthService<S>;
2849

2950
fn layer(&self, inner: S) -> Self::Service {
30-
HealthService { inner }
51+
HealthService {
52+
inner,
53+
saturation: self.saturation.clone(),
54+
saturation_threshold: self.saturation_threshold,
55+
}
3156
}
3257
}
3358

3459
#[derive(Clone)]
3560
pub struct HealthService<S> {
3661
inner: S,
62+
saturation: SaturationMonitor,
63+
saturation_threshold: Duration,
64+
}
65+
66+
impl<S> HealthService<S> {
67+
fn health_response(&self) -> Response<HttpBody> {
68+
let (status, body) = if self.saturation.saturated_for_at_least(self.saturation_threshold) {
69+
(StatusCode::SERVICE_UNAVAILABLE, SATURATED_BODY)
70+
} else {
71+
(StatusCode::OK, HEALTHY_BODY)
72+
};
73+
Response::builder()
74+
.status(status)
75+
.header(header::CONTENT_TYPE, "application/json")
76+
.body(HttpBody::new(Full::new(Bytes::from_static(body))))
77+
.expect("response build with a static body is infallible")
78+
}
3779
}
3880

3981
impl<S, ReqB> Service<Request<ReqB>> for HealthService<S>
@@ -52,12 +94,7 @@ where
5294

5395
fn call(&mut self, request: Request<ReqB>) -> Self::Future {
5496
if request.method() == Method::GET && request.uri().path() == HEALTH_PATH {
55-
let response = Response::builder()
56-
.status(StatusCode::OK)
57-
.header(header::CONTENT_TYPE, "application/json")
58-
.body(HttpBody::new(Full::new(Bytes::from_static(HEALTHY_BODY))))
59-
.expect("response build with a static body is infallible");
60-
return Either::Left(ready(Ok(response)));
97+
return Either::Left(ready(Ok(self.health_response())));
6198
}
6299
Either::Right(self.inner.call(request))
63100
}

crates/starknet_transaction_prover/src/server/health_test.rs

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
use std::time::Duration;
2+
13
use bytes::Bytes;
24
use http::{Method, Request, Response, StatusCode};
35
use http_body_util::{BodyExt, Full};
46
use jsonrpsee::server::HttpBody;
57
use tower::{Layer, ServiceExt};
68

79
use crate::server::health::{HealthLayer, HEALTH_PATH};
10+
use crate::server::saturation::SaturationMonitor;
811

912
/// Inner stub returning 418 so we can tell whether `HealthLayer` short-circuited.
1013
fn fallthrough_service() -> impl tower::Service<
@@ -38,7 +41,8 @@ async fn read_body(response: Response<HttpBody>) -> (StatusCode, Vec<u8>, http::
3841

3942
#[tokio::test]
4043
async fn get_health_returns_200_with_json_body() {
41-
let svc = HealthLayer.layer(fallthrough_service());
44+
let svc = HealthLayer::new(SaturationMonitor::default(), Duration::from_millis(0))
45+
.layer(fallthrough_service());
4246

4347
let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap();
4448

@@ -50,7 +54,8 @@ async fn get_health_returns_200_with_json_body() {
5054

5155
#[tokio::test]
5256
async fn non_get_health_falls_through() {
53-
let svc = HealthLayer.layer(fallthrough_service());
57+
let svc = HealthLayer::new(SaturationMonitor::default(), Duration::from_millis(0))
58+
.layer(fallthrough_service());
5459

5560
let response = svc.oneshot(empty_request(Method::POST, HEALTH_PATH)).await.unwrap();
5661

@@ -60,10 +65,55 @@ async fn non_get_health_falls_through() {
6065

6166
#[tokio::test]
6267
async fn get_other_path_falls_through() {
63-
let svc = HealthLayer.layer(fallthrough_service());
68+
let svc = HealthLayer::new(SaturationMonitor::default(), Duration::from_millis(0))
69+
.layer(fallthrough_service());
6470

6571
let response = svc.oneshot(empty_request(Method::GET, "/")).await.unwrap();
6672

6773
let (status, _body, _) = read_body(response).await;
6874
assert_eq!(status, StatusCode::IM_A_TEAPOT);
6975
}
76+
77+
#[tokio::test]
78+
async fn unsaturated_health_returns_200_when_monitor_is_supplied() {
79+
let monitor = SaturationMonitor::default();
80+
let svc = HealthLayer::new(monitor, Duration::from_millis(0)).layer(fallthrough_service());
81+
82+
let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap();
83+
84+
let (status, body, _) = read_body(response).await;
85+
assert_eq!(status, StatusCode::OK);
86+
assert_eq!(body, br#"{"status":"ok"}"#);
87+
}
88+
89+
#[tokio::test]
90+
async fn saturated_for_at_least_threshold_returns_503_with_opaque_body() {
91+
let monitor = SaturationMonitor::default();
92+
monitor.mark_rejected();
93+
// Zero threshold so the saturation is immediately past it. Avoids
94+
// sleeping in the test.
95+
let svc = HealthLayer::new(monitor, Duration::from_millis(0)).layer(fallthrough_service());
96+
97+
let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap();
98+
99+
let (status, body, _) = read_body(response).await;
100+
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
101+
let body_text = std::str::from_utf8(&body).unwrap();
102+
assert!(body_text.contains("saturated"));
103+
// No internal state — no timestamps, no permits, no upstream URLs.
104+
assert!(!body_text.contains("Instant"));
105+
assert!(!body_text.chars().any(|c| c.is_ascii_digit()), "body had digits: {body_text}");
106+
}
107+
108+
#[tokio::test]
109+
async fn recovery_clears_saturation_and_health_returns_to_200() {
110+
let monitor = SaturationMonitor::default();
111+
monitor.mark_rejected();
112+
monitor.mark_accepted();
113+
let svc = HealthLayer::new(monitor, Duration::from_millis(0)).layer(fallthrough_service());
114+
115+
let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap();
116+
let (status, body, _) = read_body(response).await;
117+
assert_eq!(status, StatusCode::OK);
118+
assert_eq!(body, br#"{"status":"ok"}"#);
119+
}

0 commit comments

Comments
 (0)