Skip to content

Commit 360d240

Browse files
antiguru and claude authored
environmentd: make webhook max request size configurable (#37277)
### Motivation

The webhook source request body limit was a compile-time constant (5 MiB) applied to all HTTP routes, so operators could not adjust it without a rebuild.

### Description

Adds the `webhook_max_request_size_bytes` dyncfg (default 5 MiB, preserving prior behavior). It is read from the live shared persist `ConfigSet`, so there is no coordinator round-trip on the webhook hot path. The webhook route disables the global `DefaultBodyLimit` and the handler enforces the cap per request on the decompressed body via `to_bytes`: a body that exceeds the limit returns 413, while other body-read errors return 500. Enforcement is scoped to the webhook route only; SQL and other HTTP routes keep the static 5 MiB limit.

### Verification

New integration test `webhook_max_request_size` in `src/environmentd/tests/server.rs`: the 5 MiB default rejects a 6 MiB body, raising the dyncfg accepts it, and lowering it below the default re-rejects.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 4ea428c commit 360d240

10 files changed

Lines changed: 155 additions & 1 deletion

File tree

misc/python/materialize/mzcompose/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,7 @@ def get_default_system_parameters(
679679
"enable_mcp_developer_query_tool",
680680
"mcp_max_response_size",
681681
"user_id_pool_batch_size",
682+
"webhook_max_request_size_bytes",
682683
]
683684

684685

misc/python/materialize/parallel_workload/action.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1655,6 +1655,12 @@ def __init__(
16551655
BOOLEAN_FLAG_VALUES
16561656
)
16571657
self.flags_with_values["enable_upsert_paged_spill"] = BOOLEAN_FLAG_VALUES
1658+
self.flags_with_values["webhook_max_request_size_bytes"] = [
1659+
# 1 MiB, 5 MiB (default), 10 MiB
1660+
"1048576",
1661+
"5242880",
1662+
"10485760",
1663+
]
16581664

16591665
# If you are adding a new config flag in Materialize, consider using it
16601666
# here instead of just marking it as uninteresting to silence the

src/adapter-types/src/dyncfgs.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,17 @@ pub const MCP_MAX_RESPONSE_SIZE: Config<usize> = Config::new(
221221
"Maximum size in bytes of MCP tool response content. Responses exceeding this limit are rejected with an error telling the agent to narrow its query.",
222222
);
223223

224+
/// Maximum size (in bytes) of a webhook request body, measured after
225+
/// decompression. Requests whose body exceeds this limit are rejected with
226+
/// HTTP 413. Applies only to the webhook route; other HTTP routes use a
227+
/// separate static limit.
228+
pub const WEBHOOK_MAX_REQUEST_SIZE_BYTES: Config<usize> = Config::new(
229+
"webhook_max_request_size_bytes",
230+
// Matches `MAX_REQUEST_SIZE`, the static limit the other environmentd HTTP routes use.
231+
5 * 1024 * 1024,
232+
"The maximum size in bytes of a webhook request body, measured after decompression.",
233+
);
234+
224235
/// Number of user IDs to pre-allocate in a batch. Pre-allocating IDs avoids
225236
/// a persist write + oracle call per DDL statement.
226237
pub const USER_ID_POOL_BATCH_SIZE: Config<u32> = Config::new(
@@ -319,6 +330,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
319330
.add(&ENABLE_MCP_DEVELOPER_QUERY_TOOL)
320331
.add(&ENABLE_PUBLIC_METRICS_ENDPOINT)
321332
.add(&MCP_MAX_RESPONSE_SIZE)
333+
.add(&WEBHOOK_MAX_REQUEST_SIZE_BYTES)
322334
.add(&USER_ID_POOL_BATCH_SIZE)
323335
.add(&CONSOLE_OIDC_CLIENT_ID)
324336
.add(&CONSOLE_OIDC_SCOPES)

src/environmentd/src/environmentd/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,6 +1022,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {
10221022
});
10231023

10241024
let persist_clients = Arc::new(persist_clients);
1025+
let system_dyncfgs = Arc::clone(&persist_clients.cfg().configs);
10251026
let connection_context = ConnectionContext::from_cli_args(
10261027
args.environment_id.to_string(),
10271028
&args.tracing.startup_log_filter,
@@ -1101,6 +1102,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {
11011102
controller,
11021103
secrets_controller,
11031104
cloud_resource_controller,
1105+
system_dyncfgs,
11041106
// Storage options.
11051107
storage_usage_collection_interval: args.storage_usage_collection_interval_sec,
11061108
storage_usage_retention_period: args.storage_usage_retention_period,

src/environmentd/src/http.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ use mz_auth::Authenticated;
8080
use mz_auth::password::Password;
8181
use mz_authenticator::Authenticator;
8282
use mz_controller::ReplicaHttpLocator;
83+
use mz_dyncfg::ConfigSet;
8384
use mz_frontegg_auth::Error as FronteggError;
8485
use mz_http_util::DynamicFilterTarget;
8586
use mz_ore::cast::u64_to_usize;
@@ -174,6 +175,7 @@ pub struct HttpConfig {
174175
pub http_host_name: Option<String>,
175176
pub frontegg_oauth_issuer_url: Option<String>,
176177
pub concurrent_webhook_req: Arc<tokio::sync::Semaphore>,
178+
pub dyncfgs: Arc<ConfigSet>,
177179
pub metrics: Metrics,
178180
pub metrics_registry: MetricsRegistry,
179181
pub mcp_metrics: mcp_metrics::McpMetrics,
@@ -206,6 +208,7 @@ pub struct WsState {
206208
pub struct WebhookState {
207209
adapter_client_rx: Delayed<mz_adapter::Client>,
208210
webhook_cache: WebhookAppenderCache,
211+
dyncfgs: Arc<ConfigSet>,
209212
}
210213

211214
#[derive(Clone, Debug)]
@@ -233,6 +236,7 @@ impl HttpServer {
233236
http_host_name,
234237
frontegg_oauth_issuer_url,
235238
concurrent_webhook_req,
239+
dyncfgs,
236240
metrics,
237241
metrics_registry,
238242
mcp_metrics,
@@ -346,6 +350,7 @@ impl HttpServer {
346350
.with_state(WebhookState {
347351
adapter_client_rx: adapter_client_rx.clone(),
348352
webhook_cache,
353+
dyncfgs,
349354
})
350355
.layer(
351356
tower_http::decompression::RequestDecompressionLayer::new()
@@ -354,6 +359,11 @@ impl HttpServer {
354359
.br(true)
355360
.zstd(true),
356361
)
362+
// The webhook handler enforces WEBHOOK_MAX_REQUEST_SIZE_BYTES
363+
// itself via to_bytes on a raw Body. This disable is defense-in-depth:
364+
// it only matters if the handler ever returns to a Bytes extractor,
365+
// which would otherwise inherit the global 5 MiB limit.
366+
.layer(DefaultBodyLimit::disable())
357367
.layer(
358368
CorsLayer::new()
359369
.allow_methods(Method::POST)

src/environmentd/src/http/webhook.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ use mz_repr::{Datum, Diff, Row, RowPacker, SqlScalarType};
2121
use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
2222
use mz_storage_types::controller::StorageError;
2323

24+
use axum::body::Body;
2425
use axum::extract::{Path, State};
2526
use axum::response::IntoResponse;
2627
use bytes::Bytes;
2728
use http::StatusCode;
29+
use mz_adapter_types::dyncfgs::WEBHOOK_MAX_REQUEST_SIZE_BYTES;
2830
use thiserror::Error;
2931

3032
use crate::http::WebhookState;
@@ -33,11 +35,32 @@ pub async fn handle_webhook(
3335
State(WebhookState {
3436
adapter_client_rx,
3537
webhook_cache,
38+
dyncfgs,
3639
}): State<WebhookState>,
3740
Path((database, schema, name)): Path<(String, String, String)>,
3841
headers: http::HeaderMap,
39-
body: Bytes,
42+
body: Body,
4043
) -> impl IntoResponse {
44+
let max_request_size = WEBHOOK_MAX_REQUEST_SIZE_BYTES.get(&dyncfgs);
45+
let body = axum::body::to_bytes(body, max_request_size)
46+
.await
47+
.map_err(|err| {
48+
use std::error::Error;
49+
// axum::Error wraps the underlying cause as its source. If that source is a
50+
// LengthLimitError the body exceeded the configured limit (HTTP 413). Any other
51+
// cause (TCP reset, decompression failure, etc.) is an internal read error (HTTP 500)
52+
// and must not be reported as a size-limit violation.
53+
if err
54+
.source()
55+
.is_some_and(|s| s.is::<http_body_util::LengthLimitError>())
56+
{
57+
WebhookError::BodyTooLarge {
58+
max_bytes: max_request_size,
59+
}
60+
} else {
61+
WebhookError::Internal(anyhow::anyhow!(err))
62+
}
63+
})?;
4164
let adapter_client = adapter_client_rx.clone().await.expect("sender not dropped");
4265
// Collect headers into a map, while converting them into strings.
4366
let mut headers_s = BTreeMap::new();
@@ -332,6 +355,8 @@ pub enum WebhookError {
332355
Unavailable,
333356
#[error("internal storage failure! {0:?}")]
334357
InternalStorageError(StorageError),
358+
#[error("request body exceeds the maximum allowed size of {max_bytes} bytes")]
359+
BodyTooLarge { max_bytes: usize },
335360
#[error("internal failure! {0:?}")]
336361
Internal(#[from] anyhow::Error),
337362
}
@@ -387,6 +412,9 @@ impl IntoResponse for WebhookError {
387412
e @ WebhookError::InvalidHeaders(_) => {
388413
(StatusCode::UNAUTHORIZED, e.to_string()).into_response()
389414
}
415+
e @ WebhookError::BodyTooLarge { .. } => {
416+
(StatusCode::PAYLOAD_TOO_LARGE, e.to_string()).into_response()
417+
}
390418
e @ WebhookError::Unavailable => {
391419
(StatusCode::SERVICE_UNAVAILABLE, e.to_string()).into_response()
392420
}

src/environmentd/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use mz_catalog::config::ClusterReplicaSizeMap;
4343
use mz_catalog::durable::BootstrapArgs;
4444
use mz_cloud_resources::CloudResourceController;
4545
use mz_controller::ControllerConfig;
46+
use mz_dyncfg::ConfigSet;
4647
use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
4748
use mz_license_keys::ValidatedLicenseKey;
4849
use mz_ore::future::OreFutureExt;
@@ -137,6 +138,8 @@ pub struct Config {
137138
pub secrets_controller: Arc<dyn SecretsController>,
138139
/// VpcEndpoint controller configuration.
139140
pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
141+
/// The process-wide live system dyncfg set.
142+
pub system_dyncfgs: Arc<ConfigSet>,
140143

141144
// === Storage options. ===
142145
/// The interval at which to collect storage usage information.
@@ -412,6 +415,7 @@ impl Listeners {
412415
allowed_origin: config.cors_allowed_origin.clone(),
413416
allowed_origin_list: config.cors_allowed_origin_list.clone(),
414417
concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
418+
dyncfgs: Arc::clone(&config.system_dyncfgs),
415419
metrics: metrics.clone(),
416420
metrics_registry: metrics_registry.clone(),
417421
mcp_metrics: mcp_metrics.clone(),

src/environmentd/src/test_util.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,7 @@ impl Listeners {
806806
let persist_clients =
807807
PersistClientCache::new(persist_cfg, &metrics_registry, |_, _| persist_pubsub_client);
808808
let persist_clients = Arc::new(persist_clients);
809+
let system_dyncfgs = Arc::clone(&persist_clients.cfg().configs);
809810

810811
let secrets_controller = Arc::clone(&orchestrator);
811812
let connection_context = ConnectionContext::for_tests(orchestrator.reader());
@@ -884,6 +885,7 @@ impl Listeners {
884885
},
885886
secrets_controller,
886887
cloud_resource_controller: None,
888+
system_dyncfgs,
887889
tls: config.tls,
888890
frontegg: config.frontegg,
889891
frontegg_oauth_issuer_url: None,

src/environmentd/tests/server.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3911,6 +3911,93 @@ fn webhook_too_large_request() {
39113911
assert_eq!(payload_too_large, 1.0);
39123912
}
39133913

3914+
#[mz_ore::test]
3915+
#[cfg_attr(miri, ignore)] // too slow
3916+
#[allow(clippy::disallowed_methods)]
3917+
fn webhook_max_request_size() {
3918+
let server = test_util::TestHarness::default()
3919+
.unsafe_mode()
3920+
.start_blocking();
3921+
3922+
let mut mz_client = server
3923+
.pg_config_internal()
3924+
.user(&SYSTEM_USER.name)
3925+
.connect(postgres::NoTls)
3926+
.unwrap();
3927+
3928+
let mut client = server.connect(postgres::NoTls).unwrap();
3929+
3930+
client
3931+
.execute(
3932+
"CREATE CLUSTER webhook_cluster (SIZE 'scale=1,workers=1');",
3933+
&[],
3934+
)
3935+
.expect("failed to create cluster");
3936+
client
3937+
.execute(
3938+
"CREATE SOURCE webhook_bytes IN CLUSTER webhook_cluster \
3939+
FROM WEBHOOK BODY FORMAT BYTES",
3940+
&[],
3941+
)
3942+
.expect("failed to create source");
3943+
3944+
let http_client = reqwest::Client::new();
3945+
let webhook_url = format!(
3946+
"http://{}/api/webhook/materialize/public/webhook_bytes",
3947+
server.http_local_addr(),
3948+
);
3949+
3950+
// Helper: POST a body of `len` bytes, return the HTTP status.
3951+
let post = |len: usize| {
3952+
let body = vec![b'a'; len];
3953+
server.runtime().block_on(async {
3954+
http_client
3955+
.post(&webhook_url)
3956+
.body(body)
3957+
.send()
3958+
.await
3959+
.expect("request failed")
3960+
.status()
3961+
})
3962+
};
3963+
3964+
// Default is 5 MiB: a 6 MiB body must be rejected with 413.
3965+
assert_eq!(post(6 * 1024 * 1024).as_u16(), 413);
3966+
3967+
// Raise the limit to 10 MiB and confirm the 6 MiB body is now accepted.
3968+
mz_client
3969+
.batch_execute("ALTER SYSTEM SET webhook_max_request_size_bytes = 10485760")
3970+
.unwrap();
3971+
// The dyncfg propagates to the shared persist ConfigSet asynchronously,
3972+
// so retry briefly until the new limit takes effect.
3973+
Retry::default()
3974+
.max_duration(std::time::Duration::from_secs(30))
3975+
.retry(|_| {
3976+
if post(6 * 1024 * 1024).is_success() {
3977+
Ok(())
3978+
} else {
3979+
Err(())
3980+
}
3981+
})
3982+
.expect("6 MiB body accepted after raising the limit to 10 MiB");
3983+
3984+
// Lower the limit below the default and confirm enforcement is live: a
3985+
// body that would have been accepted at the default is now rejected.
3986+
mz_client
3987+
.batch_execute("ALTER SYSTEM SET webhook_max_request_size_bytes = 1024")
3988+
.unwrap();
3989+
Retry::default()
3990+
.max_duration(std::time::Duration::from_secs(30))
3991+
.retry(|_| {
3992+
if post(2048).as_u16() == 413 {
3993+
Ok(())
3994+
} else {
3995+
Err(())
3996+
}
3997+
})
3998+
.expect("2 KiB body rejected after lowering the limit to 1 KiB");
3999+
}
4000+
39144001
#[mz_ore::test]
39154002
#[cfg_attr(miri, ignore)] // too slow
39164003
#[allow(clippy::disallowed_methods)]

src/sqllogictest/src/runner.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,6 +1193,7 @@ impl<'a> RunnerInner<'a> {
11931193
persist_clients: Arc::clone(&persist_clients),
11941194
metrics: Arc::new(mz_catalog::durable::Metrics::new(&MetricsRegistry::new())),
11951195
};
1196+
let system_dyncfgs = Arc::clone(&persist_clients.cfg().configs);
11961197
let server_config = mz_environmentd::Config {
11971198
catalog_config,
11981199
timestamp_oracle_url: Some(timestamp_oracle_url),
@@ -1227,6 +1228,7 @@ impl<'a> RunnerInner<'a> {
12271228
},
12281229
secrets_controller,
12291230
cloud_resource_controller: None,
1231+
system_dyncfgs,
12301232
tls: None,
12311233
frontegg: None,
12321234
frontegg_oauth_issuer_url: None,

0 commit comments

Comments
 (0)