From 038b3131f567e44896791b0b4a002f46659cd639 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 22 May 2026 13:22:21 -0400 Subject: [PATCH 1/5] fix(sources): cap decompressed body to prevent OOM The shared decompress_body helper used by http_server, prometheus_remote_write, prometheus_pushgateway, heroku_logs, and the opentelemetry HTTP receiver previously read the gzip/deflate/zstd/snappy output into an unbounded Vec. A single unauthenticated POST with a small compressed bomb could OOM-kill the Vector process. Cap the decompressed payload at 100 MiB and return HTTP 413 when exceeded. gzip/deflate/zstd are wrapped with io::Take; snappy is rejected up front via snap::raw::decompress_len before allocating. Also short-circuit at the warp layer when the declared Content-Length already exceeds the cap. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...http_source_decompression_bomb.security.md | 3 + src/sources/http_server.rs | 33 ++++ src/sources/util/http/encoding.rs | 171 ++++++++++++++++-- src/sources/util/http/prelude.rs | 26 ++- 4 files changed, 211 insertions(+), 22 deletions(-) create mode 100644 changelog.d/http_source_decompression_bomb.security.md diff --git a/changelog.d/http_source_decompression_bomb.security.md b/changelog.d/http_source_decompression_bomb.security.md new file mode 100644 index 0000000000000..9b7b39ec750c0 --- /dev/null +++ b/changelog.d/http_source_decompression_bomb.security.md @@ -0,0 +1,3 @@ +HTTP-based sources (`http_server`, `prometheus_pushgateway`, `prometheus_remote_write`, `heroku_logs`, `opentelemetry`) now cap request bodies at 100 MiB, both as received and after decompression. Previously, a single unauthenticated request carrying a compressed payload (e.g. a gzip bomb) could allocate unbounded memory and OOM-kill the Vector process. Bodies exceeding the cap at either stage are rejected with HTTP 413; this includes uncompressed bodies and requests whose declared `Content-Length` exceeds the same limit. 
+ +authors: pront diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 7c444284adc6c..a4db54f2f77ed 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -1270,6 +1270,39 @@ mod tests { } } + #[tokio::test] + async fn http_rejects_gzip_bomb_with_413() { + // A modestly-sized gzipped blob of zeros that would expand past the default + // 100 MiB cap if decompression were unbounded. + let plaintext = vec![0u8; 200 * 1024 * 1024]; + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&plaintext).unwrap(); + let body = encoder.finish().unwrap(); + + let mut headers = HeaderMap::new(); + headers.insert("Content-Encoding", "gzip".parse().unwrap()); + + components::init_test(); + let (_rx, addr) = source( + vec![], + vec![], + "http_path", + "remote_ip", + "/", + "POST", + StatusCode::OK, + None, + true, + EventStatus::Delivered, + true, + None, + None, + ) + .await; + + assert_eq!(413, send_bytes(addr, body, headers).await); + } + #[tokio::test] async fn http_path() { let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { diff --git a/src/sources/util/http/encoding.rs b/src/sources/util/http/encoding.rs index c71a1891d604e..06b5dbe0882a1 100644 --- a/src/sources/util/http/encoding.rs +++ b/src/sources/util/http/encoding.rs @@ -7,35 +7,49 @@ use warp::http::StatusCode; use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError}; +/// Default cap on the decompressed body size produced by [`decompress_body`]. +/// +/// Prevents a compressed "bomb" payload from causing unbounded memory growth. +pub(crate) const DEFAULT_MAX_DECOMPRESSED_BODY_SIZE: usize = 100 * 1024 * 1024; + /// Decompresses the body based on the Content-Encoding header. /// /// Supports gzip, deflate, snappy, zstd, and identity (no compression). 
-pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result { +/// +/// Caps the decompressed output at [`DEFAULT_MAX_DECOMPRESSED_BODY_SIZE`] to mitigate +/// decompression-bomb DoS attacks. For a custom limit, use [`decompress_body_with_limit`]. +pub fn decompress_body(header: Option<&str>, body: Bytes) -> Result { + decompress_body_with_limit(header, body, Some(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE)) +} + +/// Like [`decompress_body`], but allows the caller to control the decompressed size cap. +/// +/// `max_decompressed_size = None` disables the cap (not recommended for unauthenticated input). +pub(crate) fn decompress_body_with_limit( + header: Option<&str>, + mut body: Bytes, + max_decompressed_size: Option, +) -> Result { if let Some(encodings) = header { for encoding in encodings.rsplit(',').map(str::trim) { body = match encoding { "identity" => body, - "gzip" => { - let mut decoded = Vec::new(); - MultiGzDecoder::new(body.reader()) - .read_to_end(&mut decoded) + "gzip" => decompress_reader( + MultiGzDecoder::new(body.reader()), + encoding, + max_decompressed_size, + )?, + "deflate" => decompress_reader( + ZlibDecoder::new(body.reader()), + encoding, + max_decompressed_size, + )?, + "snappy" => decompress_snappy(&body, max_decompressed_size)?, + "zstd" => { + let decoder = zstd::stream::read::Decoder::new(body.reader()) .map_err(|error| emit_decompress_error(encoding, error))?; - decoded.into() + decompress_reader(decoder, encoding, max_decompressed_size)? } - "deflate" => { - let mut decoded = Vec::new(); - ZlibDecoder::new(body.reader()) - .read_to_end(&mut decoded) - .map_err(|error| emit_decompress_error(encoding, error))?; - decoded.into() - } - "snappy" => SnappyDecoder::new() - .decompress_vec(&body) - .map_err(|error| emit_decompress_error(encoding, error))? - .into(), - "zstd" => zstd::decode_all(body.reader()) - .map_err(|error| emit_decompress_error(encoding, error))? 
- .into(), encoding => { return Err(ErrorMessage::new( StatusCode::UNSUPPORTED_MEDIA_TYPE, @@ -49,6 +63,60 @@ pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result( + reader: R, + encoding: &str, + max_decompressed_size: Option, +) -> Result { + let mut decoded = Vec::new(); + match max_decompressed_size { + Some(max) => { + // Read one byte beyond the cap so we can detect overflow without ambiguity. + let limit = u64::try_from(max).unwrap_or(u64::MAX).saturating_add(1); + reader + .take(limit) + .read_to_end(&mut decoded) + .map_err(|error| emit_decompress_error(encoding, error))?; + if decoded.len() > max { + return Err(decompressed_too_large_error(encoding, max)); + } + } + None => { + let mut reader = reader; + reader + .read_to_end(&mut decoded) + .map_err(|error| emit_decompress_error(encoding, error))?; + } + } + Ok(decoded.into()) +} + +fn decompress_snappy( + body: &Bytes, + max_decompressed_size: Option, +) -> Result { + // Snappy stores the decompressed length in the frame header, so reject oversized + // payloads before allocating the output buffer. 
+ if let Some(max) = max_decompressed_size { + let len = snap::raw::decompress_len(body) + .map_err(|error| emit_decompress_error("snappy", error))?; + if len > max { + return Err(decompressed_too_large_error("snappy", max)); + } + } + let decoded = SnappyDecoder::new() + .decompress_vec(body) + .map_err(|error| emit_decompress_error("snappy", error))?; + Ok(decoded.into()) +} + +fn decompressed_too_large_error(encoding: &str, max: usize) -> ErrorMessage { + ErrorMessage::new( + StatusCode::PAYLOAD_TOO_LARGE, + format!("Decompressed {encoding} body exceeds limit of {max} bytes."), + ) +} + pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage { emit!(HttpDecompressError { encoding, @@ -59,3 +127,66 @@ pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> E format!("Failed decompressing payload with {encoding} decoder."), ) } + +#[cfg(test)] +mod tests { + use std::io::Write; + + use flate2::{Compression, write::GzEncoder}; + + use super::*; + + fn gzip_payload(plaintext: &[u8]) -> Bytes { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(plaintext).unwrap(); + encoder.finish().unwrap().into() + } + + #[test] + fn gzip_within_limit_succeeds() { + let plaintext = vec![0u8; 10_000]; + let body = gzip_payload(&plaintext); + + let decoded = decompress_body_with_limit(Some("gzip"), body, Some(100_000)).unwrap(); + assert_eq!(decoded.len(), plaintext.len()); + } + + #[test] + fn gzip_exceeding_limit_returns_413() { + // Compress 1 MB of zeros, then cap at 1 KB. + let plaintext = vec![0u8; 1_000_000]; + let body = gzip_payload(&plaintext); + + let err = + decompress_body_with_limit(Some("gzip"), body, Some(1024)).expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn default_limit_protects_against_decompression_bomb() { + // A small input that would expand far past 100 MB if we let it run unbounded. 
+ let plaintext = vec![0u8; 200 * 1024 * 1024]; + let body = gzip_payload(&plaintext); + + let err = decompress_body(Some("gzip"), body).expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn snappy_exceeding_limit_returns_413_before_allocating() { + // 2 MB of zeros. Snappy keeps the embedded length in the frame header. + let plaintext = vec![0u8; 2 * 1024 * 1024]; + let compressed = snap::raw::Encoder::new().compress_vec(&plaintext).unwrap(); + + let err = decompress_body_with_limit(Some("snappy"), compressed.into(), Some(1024)) + .expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn identity_passes_through() { + let body: Bytes = Bytes::from_static(b"hello world"); + let decoded = decompress_body(Some("identity"), body.clone()).unwrap(); + assert_eq!(decoded, body); + } +} diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 3414ea00264b0..cfcc31cfb242f 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -21,7 +21,7 @@ use warp::{ reject::Rejection, }; -use super::encoding::decompress_body; +use super::encoding::{DEFAULT_MAX_DECOMPRESSED_BODY_SIZE, decompress_body}; use crate::{ SourceSender, common::http::{ErrorMessage, server_auth::HttpServerAuthConfig}, @@ -99,6 +99,28 @@ pub trait HttpSource: Clone + Send + Sync + 'static { for s in path.split('/').filter(|&x| !x.is_empty()) { filter = filter.and(warp::path(s.to_string())).boxed() } + // Defense-in-depth: reject oversized requests up front based on the declared + // `Content-Length`, before reading or decompressing the body. Mirrors the + // decompressed-body cap applied in `decompress_body`. 
+ const MAX_REQUEST_BODY_SIZE: u64 = DEFAULT_MAX_DECOMPRESSED_BODY_SIZE as u64; + let body_filter: BoxedFilter<(Bytes,)> = warp::header::optional::( + "content-length", + ) + .and_then(|declared: Option| async move { + match declared { + Some(len) if len > MAX_REQUEST_BODY_SIZE => { + Err(warp::reject::custom(ErrorMessage::new( + StatusCode::PAYLOAD_TOO_LARGE, + format!("Request body exceeds limit of {MAX_REQUEST_BODY_SIZE} bytes."), + ))) + } + _ => Ok(()), + } + }) + .untuple_one() + .and(warp::body::bytes()) + .boxed(); + let svc = filter .and(warp::path::tail()) .and_then(move |tail: Tail| async move { @@ -118,7 +140,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static { .and(warp::path::full()) .and(warp::header::optional::("content-encoding")) .and(warp::header::headers_cloned()) - .and(warp::body::bytes()) + .and(body_filter) .and(warp::query::>()) .and(warp::filters::ext::optional()) .and_then( From 4d41014f6ac7716f92037705b555aa018bb24765 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 22 May 2026 14:36:12 -0400 Subject: [PATCH 2/5] fix: drop private intra-doc links from decompress_body decompress_body is pub, but DEFAULT_MAX_DECOMPRESSED_BODY_SIZE and decompress_body_with_limit are pub(crate). The intra-doc link lint fails CI under #![deny(warnings)] when public docs reference private items. Inline the 100 MiB number in the doc comment instead. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/sources/util/http/encoding.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sources/util/http/encoding.rs b/src/sources/util/http/encoding.rs index 06b5dbe0882a1..1f9a3a9e350c1 100644 --- a/src/sources/util/http/encoding.rs +++ b/src/sources/util/http/encoding.rs @@ -16,8 +16,7 @@ pub(crate) const DEFAULT_MAX_DECOMPRESSED_BODY_SIZE: usize = 100 * 1024 * 1024; /// /// Supports gzip, deflate, snappy, zstd, and identity (no compression). 
/// -/// Caps the decompressed output at [`DEFAULT_MAX_DECOMPRESSED_BODY_SIZE`] to mitigate -/// decompression-bomb DoS attacks. For a custom limit, use [`decompress_body_with_limit`]. +/// Caps the decompressed output at 100 MiB to mitigate decompression-bomb DoS attacks. pub fn decompress_body(header: Option<&str>, body: Bytes) -> Result { decompress_body_with_limit(header, body, Some(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE)) } From 7d1980efc46f2453c5366a2132fb25cac8d011fd Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 26 May 2026 10:12:12 -0400 Subject: [PATCH 3/5] fix: tighten HTTP decompression limits --- src/sources/util/http/encoding.rs | 81 ++++++++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 1 deletion(-) diff --git a/src/sources/util/http/encoding.rs b/src/sources/util/http/encoding.rs index 1f9a3a9e350c1..c7b53e91d4672 100644 --- a/src/sources/util/http/encoding.rs +++ b/src/sources/util/http/encoding.rs @@ -45,8 +45,15 @@ pub(crate) fn decompress_body_with_limit( )?, "snappy" => decompress_snappy(&body, max_decompressed_size)?, "zstd" => { - let decoder = zstd::stream::read::Decoder::new(body.reader()) + let mut decoder = zstd::stream::read::Decoder::new(body.reader()) .map_err(|error| emit_decompress_error(encoding, error))?; + if let Some(max) = max_decompressed_size + && let Some(window_log_max) = zstd_window_log_max(max) + { + decoder + .window_log_max(window_log_max) + .map_err(|error| emit_decompress_error(encoding, error))?; + } decompress_reader(decoder, encoding, max_decompressed_size)? 
} encoding => { @@ -59,6 +66,7 @@ pub(crate) fn decompress_body_with_limit( } } + ensure_body_within_limit(&body, "identity", max_decompressed_size)?; Ok(body) } @@ -109,6 +117,30 @@ fn decompress_snappy( Ok(decoded.into()) } +fn ensure_body_within_limit( + body: &Bytes, + encoding: &str, + max_decompressed_size: Option, +) -> Result<(), ErrorMessage> { + if let Some(max) = max_decompressed_size + && body.len() > max + { + return Err(decompressed_too_large_error(encoding, max)); + } + Ok(()) +} + +fn zstd_window_log_max(max_decompressed_size: usize) -> Option { + const MIN_ZSTD_WINDOW_LOG: u32 = 10; + const MAX_ZSTD_WINDOW_LOG: u32 = 31; + + // `window_log_max` is expressed as a power-of-two log. Use the smallest zstd + // window capable of representing the configured byte budget. + max_decompressed_size.checked_sub(1).map(|max_index| { + (usize::BITS - max_index.leading_zeros()).clamp(MIN_ZSTD_WINDOW_LOG, MAX_ZSTD_WINDOW_LOG) + }) +} + fn decompressed_too_large_error(encoding: &str, max: usize) -> ErrorMessage { ErrorMessage::new( StatusCode::PAYLOAD_TOO_LARGE, @@ -132,6 +164,7 @@ mod tests { use std::io::Write; use flate2::{Compression, write::GzEncoder}; + use zstd::stream::Encoder as ZstdEncoder; use super::*; @@ -141,6 +174,13 @@ mod tests { encoder.finish().unwrap().into() } + fn zstd_payload_with_window_log(plaintext: &[u8], window_log: u32) -> Bytes { + let mut encoder = ZstdEncoder::new(Vec::new(), 0).unwrap(); + encoder.window_log(window_log).unwrap(); + encoder.write_all(plaintext).unwrap(); + encoder.finish().unwrap().into() + } + #[test] fn gzip_within_limit_succeeds() { let plaintext = vec![0u8; 10_000]; @@ -182,10 +222,49 @@ mod tests { assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); } + #[test] + fn zstd_exceeding_limit_returns_413() { + let plaintext = vec![0u8; 10_000]; + let compressed = zstd_payload_with_window_log(plaintext.as_slice(), 10); + + let err = decompress_body_with_limit(Some("zstd"), compressed, Some(1024)) + 
.expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + #[test] fn identity_passes_through() { let body: Bytes = Bytes::from_static(b"hello world"); let decoded = decompress_body(Some("identity"), body.clone()).unwrap(); assert_eq!(decoded, body); } + + #[test] + fn identity_exceeding_limit_returns_413() { + let body = Bytes::from_static(b"hello world"); + + let err = + decompress_body_with_limit(Some("identity"), body, Some(5)).expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn missing_content_encoding_exceeding_limit_returns_413() { + let body = Bytes::from_static(b"hello world"); + + let err = decompress_body_with_limit(None, body, Some(5)).expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn zstd_window_log_tracks_limit() { + assert_eq!(zstd_window_log_max(0), None); + assert_eq!(zstd_window_log_max(1), Some(10)); + assert_eq!(zstd_window_log_max(1024), Some(10)); + assert_eq!(zstd_window_log_max(1025), Some(11)); + assert_eq!( + zstd_window_log_max(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE), + Some(27) + ); + } } From 47b9ada3e89c7129c463a4acbedfc04b125b42a8 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 26 May 2026 12:45:19 -0400 Subject: [PATCH 4/5] fix: bound HTTP request body aggregation --- src/sources/http_server.rs | 6 +- src/sources/opentelemetry/http.rs | 9 ++- src/sources/util/http/encoding.rs | 98 +++++++++++++++++++++++++++---- src/sources/util/http/mod.rs | 2 + src/sources/util/http/prelude.rs | 24 +------- 5 files changed, 101 insertions(+), 38 deletions(-) diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index a4db54f2f77ed..975a92c6108ad 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -1274,9 +1274,11 @@ mod tests { async fn http_rejects_gzip_bomb_with_413() { // A modestly-sized gzipped blob of zeros that would expand past the 
default // 100 MiB cap if decompression were unbounded. - let plaintext = vec![0u8; 200 * 1024 * 1024]; let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); - encoder.write_all(&plaintext).unwrap(); + let chunk = [0u8; 8 * 1024]; + for _ in 0..(200 * 1024 * 1024 / chunk.len()) { + encoder.write_all(&chunk).unwrap(); + } let body = encoder.finish().unwrap(); let mut headers = HeaderMap::new(); diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index 42be681f1cb90..4590481608652 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -39,7 +39,10 @@ use crate::{ sources::{ http_server::HttpConfigParamKind, opentelemetry::config::{LOGS, METRICS, OpentelemetryConfig, TRACES}, - util::{add_headers, decompress_body}, + util::{ + add_headers, decompress_body, + http::{DEFAULT_MAX_DECOMPRESSED_BODY_SIZE, limited_body}, + }, }, tls::MaybeTlsSettings, }; @@ -191,6 +194,8 @@ where + 'static + Fn(Option, HeaderMap, Bytes) -> Result, ErrorMessage>, { + let body_filter = limited_body(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE); + warp::post() .and(warp::path("v1")) .and(warp::path(telemetry_type)) @@ -201,7 +206,7 @@ where )) .and(warp::header::optional::("content-encoding")) .and(warp::header::headers_cloned()) - .and(warp::body::bytes()) + .and(body_filter) .and_then( move |encoding_header: Option, headers: HeaderMap, body: Bytes| { let events = make_events(encoding_header, headers, body); diff --git a/src/sources/util/http/encoding.rs b/src/sources/util/http/encoding.rs index c7b53e91d4672..e6297fe2dd1db 100644 --- a/src/sources/util/http/encoding.rs +++ b/src/sources/util/http/encoding.rs @@ -1,9 +1,10 @@ use std::io::Read; -use bytes::{Buf, Bytes}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use flate2::read::{MultiGzDecoder, ZlibDecoder}; +use futures_util::StreamExt; use snap::raw::Decoder as SnappyDecoder; -use warp::http::StatusCode; +use warp::{Filter, filters::BoxedFilter, http::StatusCode}; 
use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError}; @@ -12,6 +13,31 @@ use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError}; /// Prevents a compressed "bomb" payload from causing unbounded memory growth. pub(crate) const DEFAULT_MAX_DECOMPRESSED_BODY_SIZE: usize = 100 * 1024 * 1024; +/// Collects a request body into [`Bytes`] while enforcing an in-memory size cap. +#[allow(dead_code)] +pub(crate) fn limited_body(max_body_size: usize) -> BoxedFilter<(Bytes,)> { + let max_body_size_header = u64::try_from(max_body_size).unwrap_or(u64::MAX); + + warp::header::optional::("content-length") + .and_then(move |declared: Option| async move { + if declared.is_some_and(|len| len > max_body_size_header) { + Err(warp::reject::custom(request_body_too_large_error( + max_body_size, + ))) + } else { + Ok(()) + } + }) + .untuple_one() + .and(warp::body::stream()) + .and_then(move |body| async move { + collect_body_with_limit(body, max_body_size) + .await + .map_err(warp::reject::custom) + }) + .boxed() +} + /// Decompresses the body based on the Content-Encoding header. /// /// Supports gzip, deflate, snappy, zstd, and identity (no compression). 
@@ -117,6 +143,25 @@ fn decompress_snappy( Ok(decoded.into()) } +async fn collect_body_with_limit(body: S, max_body_size: usize) -> Result +where + S: futures_util::Stream>, + B: Buf, +{ + futures_util::pin_mut!(body); + + let mut bytes = BytesMut::new(); + while let Some(chunk) = body.next().await { + let chunk = chunk.map_err(body_read_error)?; + if chunk.remaining() > max_body_size.saturating_sub(bytes.len()) { + return Err(request_body_too_large_error(max_body_size)); + } + bytes.put(chunk); + } + + Ok(bytes.freeze()) +} + fn ensure_body_within_limit( body: &Bytes, encoding: &str, @@ -141,6 +186,13 @@ fn zstd_window_log_max(max_decompressed_size: usize) -> Option { }) } +fn request_body_too_large_error(max: usize) -> ErrorMessage { + ErrorMessage::new( + StatusCode::PAYLOAD_TOO_LARGE, + format!("Request body exceeds limit of {max} bytes."), + ) +} + fn decompressed_too_large_error(encoding: &str, max: usize) -> ErrorMessage { ErrorMessage::new( StatusCode::PAYLOAD_TOO_LARGE, @@ -148,6 +200,13 @@ fn decompressed_too_large_error(encoding: &str, max: usize) -> ErrorMessage { ) } +fn body_read_error(error: warp::Error) -> ErrorMessage { + ErrorMessage::new( + StatusCode::BAD_REQUEST, + format!("Failed reading request body: {error}"), + ) +} + pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage { emit!(HttpDecompressError { encoding, @@ -164,6 +223,7 @@ mod tests { use std::io::Write; use flate2::{Compression, write::GzEncoder}; + use futures_util::stream; use zstd::stream::Encoder as ZstdEncoder; use super::*; @@ -201,16 +261,6 @@ mod tests { assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); } - #[test] - fn default_limit_protects_against_decompression_bomb() { - // A small input that would expand far past 100 MB if we let it run unbounded. 
- let plaintext = vec![0u8; 200 * 1024 * 1024]; - let body = gzip_payload(&plaintext); - - let err = decompress_body(Some("gzip"), body).expect_err("should reject"); - assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); - } - #[test] fn snappy_exceeding_limit_returns_413_before_allocating() { // 2 MB of zeros. Snappy keeps the embedded length in the frame header. @@ -267,4 +317,28 @@ mod tests { Some(27) ); } + + #[tokio::test] + async fn collect_body_with_limit_succeeds_within_limit() { + let body = stream::iter([ + Ok::<_, warp::Error>(Bytes::from_static(b"hello")), + Ok::<_, warp::Error>(Bytes::from_static(b" world")), + ]); + + let collected = collect_body_with_limit(body, 11).await.unwrap(); + assert_eq!(collected, Bytes::from_static(b"hello world")); + } + + #[tokio::test] + async fn collect_body_with_limit_rejects_oversized_stream() { + let body = stream::iter([ + Ok::<_, warp::Error>(Bytes::from_static(b"hello")), + Ok::<_, warp::Error>(Bytes::from_static(b" world")), + ]); + + let err = collect_body_with_limit(body, 5) + .await + .expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } } diff --git a/src/sources/util/http/mod.rs b/src/sources/util/http/mod.rs index 22c75b925f14a..67734286e2f79 100644 --- a/src/sources/util/http/mod.rs +++ b/src/sources/util/http/mod.rs @@ -16,6 +16,8 @@ mod prelude; ))] mod query; +#[cfg(feature = "sources-opentelemetry")] +pub(crate) use encoding::{DEFAULT_MAX_DECOMPRESSED_BODY_SIZE, limited_body}; #[cfg(feature = "sources-utils-http-encoding")] pub use encoding::{decompress_body, emit_decompress_error}; #[cfg(feature = "sources-utils-http-headers")] diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index cfcc31cfb242f..f52add6932904 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -21,7 +21,7 @@ use warp::{ reject::Rejection, }; -use super::encoding::{DEFAULT_MAX_DECOMPRESSED_BODY_SIZE, 
decompress_body}; +use super::encoding::{DEFAULT_MAX_DECOMPRESSED_BODY_SIZE, decompress_body, limited_body}; use crate::{ SourceSender, common::http::{ErrorMessage, server_auth::HttpServerAuthConfig}, @@ -99,27 +99,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static { for s in path.split('/').filter(|&x| !x.is_empty()) { filter = filter.and(warp::path(s.to_string())).boxed() } - // Defense-in-depth: reject oversized requests up front based on the declared - // `Content-Length`, before reading or decompressing the body. Mirrors the - // decompressed-body cap applied in `decompress_body`. - const MAX_REQUEST_BODY_SIZE: u64 = DEFAULT_MAX_DECOMPRESSED_BODY_SIZE as u64; - let body_filter: BoxedFilter<(Bytes,)> = warp::header::optional::( - "content-length", - ) - .and_then(|declared: Option| async move { - match declared { - Some(len) if len > MAX_REQUEST_BODY_SIZE => { - Err(warp::reject::custom(ErrorMessage::new( - StatusCode::PAYLOAD_TOO_LARGE, - format!("Request body exceeds limit of {MAX_REQUEST_BODY_SIZE} bytes."), - ))) - } - _ => Ok(()), - } - }) - .untuple_one() - .and(warp::body::bytes()) - .boxed(); + let body_filter = limited_body(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE); let svc = filter .and(warp::path::tail()) From 3c10614e89deb80db7520ecf1d594bf85b568fb4 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 29 May 2026 13:07:16 -0400 Subject: [PATCH 5/5] chore: remove spurious allow(dead_code) from limited_body Co-Authored-By: Claude Sonnet 4.6 --- src/sources/util/http/encoding.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sources/util/http/encoding.rs b/src/sources/util/http/encoding.rs index e6297fe2dd1db..ebaf8a5a4de32 100644 --- a/src/sources/util/http/encoding.rs +++ b/src/sources/util/http/encoding.rs @@ -14,7 +14,6 @@ use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError}; pub(crate) const DEFAULT_MAX_DECOMPRESSED_BODY_SIZE: usize = 100 * 1024 * 1024; /// Collects a request body into [`Bytes`] while enforcing 
an in-memory size cap. -#[allow(dead_code)] pub(crate) fn limited_body(max_body_size: usize) -> BoxedFilter<(Bytes,)> { let max_body_size_header = u64::try_from(max_body_size).unwrap_or(u64::MAX);