diff --git a/changelog.d/http_source_decompression_bomb.security.md b/changelog.d/http_source_decompression_bomb.security.md new file mode 100644 index 0000000000000..9b7b39ec750c0 --- /dev/null +++ b/changelog.d/http_source_decompression_bomb.security.md @@ -0,0 +1,3 @@ +HTTP-based sources (`http_server`, `prometheus_pushgateway`, `prometheus_remote_write`, `heroku_logs`, `opentelemetry`) now cap decompressed request bodies at 100 MiB. Previously, a single unauthenticated request carrying a compressed payload (e.g. a gzip bomb) could allocate unbounded memory and OOM-kill the Vector process. Decompressed payloads exceeding the cap are rejected with HTTP 413, as are requests whose declared `Content-Length` exceeds the same limit. + +authors: pront diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 7c444284adc6c..a4db54f2f77ed 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -1270,6 +1270,39 @@ mod tests { } } + #[tokio::test] + async fn http_rejects_gzip_bomb_with_413() { + // A modestly-sized gzipped blob of zeros that would expand past the default + // 100 MiB cap if decompression were unbounded. + let plaintext = vec![0u8; 200 * 1024 * 1024]; + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&plaintext).unwrap(); + let body = encoder.finish().unwrap(); + + let mut headers = HeaderMap::new(); + headers.insert("Content-Encoding", "gzip".parse().unwrap()); + + components::init_test(); + let (_rx, addr) = source( + vec![], + vec![], + "http_path", + "remote_ip", + "/", + "POST", + StatusCode::OK, + None, + true, + EventStatus::Delivered, + true, + None, + None, + ) + .await; + + assert_eq!(413, send_bytes(addr, body, headers).await); + } + #[tokio::test] async fn http_path() { let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { diff --git a/src/sources/util/http/encoding.rs b/src/sources/util/http/encoding.rs index c71a1891d604e..1f9a3a9e350c1 100644 --- a/src/sources/util/http/encoding.rs +++ b/src/sources/util/http/encoding.rs @@ -7,35 +7,48 @@ use warp::http::StatusCode; use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError}; +/// Default cap on the decompressed body size produced by [`decompress_body`]. +/// +/// Prevents a compressed "bomb" payload from causing unbounded memory growth. +pub(crate) const DEFAULT_MAX_DECOMPRESSED_BODY_SIZE: usize = 100 * 1024 * 1024; + /// Decompresses the body based on the Content-Encoding header. /// /// Supports gzip, deflate, snappy, zstd, and identity (no compression). -pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result { +/// +/// Caps the decompressed output at 100 MiB to mitigate decompression-bomb DoS attacks. +pub fn decompress_body(header: Option<&str>, body: Bytes) -> Result { + decompress_body_with_limit(header, body, Some(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE)) +} + +/// Like [`decompress_body`], but allows the caller to control the decompressed size cap. +/// +/// `max_decompressed_size = None` disables the cap (not recommended for unauthenticated input). +pub(crate) fn decompress_body_with_limit( + header: Option<&str>, + mut body: Bytes, + max_decompressed_size: Option, +) -> Result { if let Some(encodings) = header { for encoding in encodings.rsplit(',').map(str::trim) { body = match encoding { "identity" => body, - "gzip" => { - let mut decoded = Vec::new(); - MultiGzDecoder::new(body.reader()) - .read_to_end(&mut decoded) + "gzip" => decompress_reader( + MultiGzDecoder::new(body.reader()), + encoding, + max_decompressed_size, + )?, + "deflate" => decompress_reader( + ZlibDecoder::new(body.reader()), + encoding, + max_decompressed_size, + )?, + "snappy" => decompress_snappy(&body, max_decompressed_size)?, + "zstd" => { + let decoder = zstd::stream::read::Decoder::new(body.reader()) .map_err(|error| emit_decompress_error(encoding, error))?; - decoded.into() + decompress_reader(decoder, encoding, max_decompressed_size)? } - "deflate" => { - let mut decoded = Vec::new(); - ZlibDecoder::new(body.reader()) - .read_to_end(&mut decoded) - .map_err(|error| emit_decompress_error(encoding, error))?; - decoded.into() - } - "snappy" => SnappyDecoder::new() - .decompress_vec(&body) - .map_err(|error| emit_decompress_error(encoding, error))? - .into(), - "zstd" => zstd::decode_all(body.reader()) - .map_err(|error| emit_decompress_error(encoding, error))? - .into(), encoding => { return Err(ErrorMessage::new( StatusCode::UNSUPPORTED_MEDIA_TYPE, @@ -49,6 +62,60 @@ pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result( + reader: R, + encoding: &str, + max_decompressed_size: Option, +) -> Result { + let mut decoded = Vec::new(); + match max_decompressed_size { + Some(max) => { + // Read one byte beyond the cap so we can detect overflow without ambiguity. + let limit = u64::try_from(max).unwrap_or(u64::MAX).saturating_add(1); + reader + .take(limit) + .read_to_end(&mut decoded) + .map_err(|error| emit_decompress_error(encoding, error))?; + if decoded.len() > max { + return Err(decompressed_too_large_error(encoding, max)); + } + } + None => { + let mut reader = reader; + reader + .read_to_end(&mut decoded) + .map_err(|error| emit_decompress_error(encoding, error))?; + } + } + Ok(decoded.into()) +} + +fn decompress_snappy( + body: &Bytes, + max_decompressed_size: Option, +) -> Result { + // Snappy stores the decompressed length in the frame header, so reject oversized + // payloads before allocating the output buffer. + if let Some(max) = max_decompressed_size { + let len = snap::raw::decompress_len(body) + .map_err(|error| emit_decompress_error("snappy", error))?; + if len > max { + return Err(decompressed_too_large_error("snappy", max)); + } + } + let decoded = SnappyDecoder::new() + .decompress_vec(body) + .map_err(|error| emit_decompress_error("snappy", error))?; + Ok(decoded.into()) +} + +fn decompressed_too_large_error(encoding: &str, max: usize) -> ErrorMessage { + ErrorMessage::new( + StatusCode::PAYLOAD_TOO_LARGE, + format!("Decompressed {encoding} body exceeds limit of {max} bytes."), + ) +} + pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage { emit!(HttpDecompressError { encoding, @@ -59,3 +126,66 @@ pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> E format!("Failed decompressing payload with {encoding} decoder."), ) } + +#[cfg(test)] +mod tests { + use std::io::Write; + + use flate2::{Compression, write::GzEncoder}; + + use super::*; + + fn gzip_payload(plaintext: &[u8]) -> Bytes { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(plaintext).unwrap(); + encoder.finish().unwrap().into() + } + + #[test] + fn gzip_within_limit_succeeds() { + let plaintext = vec![0u8; 10_000]; + let body = gzip_payload(&plaintext); + + let decoded = decompress_body_with_limit(Some("gzip"), body, Some(100_000)).unwrap(); + assert_eq!(decoded.len(), plaintext.len()); + } + + #[test] + fn gzip_exceeding_limit_returns_413() { + // Compress 1 MB of zeros, then cap at 1 KB. + let plaintext = vec![0u8; 1_000_000]; + let body = gzip_payload(&plaintext); + + let err = + decompress_body_with_limit(Some("gzip"), body, Some(1024)).expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn default_limit_protects_against_decompression_bomb() { + // A small input that would expand far past 100 MB if we let it run unbounded. + let plaintext = vec![0u8; 200 * 1024 * 1024]; + let body = gzip_payload(&plaintext); + + let err = decompress_body(Some("gzip"), body).expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn snappy_exceeding_limit_returns_413_before_allocating() { + // 2 MB of zeros. Snappy keeps the embedded length in the frame header. + let plaintext = vec![0u8; 2 * 1024 * 1024]; + let compressed = snap::raw::Encoder::new().compress_vec(&plaintext).unwrap(); + + let err = decompress_body_with_limit(Some("snappy"), compressed.into(), Some(1024)) + .expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn identity_passes_through() { + let body: Bytes = Bytes::from_static(b"hello world"); + let decoded = decompress_body(Some("identity"), body.clone()).unwrap(); + assert_eq!(decoded, body); + } +} diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 3414ea00264b0..cfcc31cfb242f 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -21,7 +21,7 @@ use warp::{ reject::Rejection, }; -use super::encoding::decompress_body; +use super::encoding::{DEFAULT_MAX_DECOMPRESSED_BODY_SIZE, decompress_body}; use crate::{ SourceSender, common::http::{ErrorMessage, server_auth::HttpServerAuthConfig}, @@ -99,6 +99,28 @@ pub trait HttpSource: Clone + Send + Sync + 'static { for s in path.split('/').filter(|&x| !x.is_empty()) { filter = filter.and(warp::path(s.to_string())).boxed() } + // Defense-in-depth: reject oversized requests up front based on the declared + // `Content-Length`, before reading or decompressing the body. Mirrors the + // decompressed-body cap applied in `decompress_body`. + const MAX_REQUEST_BODY_SIZE: u64 = DEFAULT_MAX_DECOMPRESSED_BODY_SIZE as u64; + let body_filter: BoxedFilter<(Bytes,)> = warp::header::optional::( + "content-length", + ) + .and_then(|declared: Option| async move { + match declared { + Some(len) if len > MAX_REQUEST_BODY_SIZE => { + Err(warp::reject::custom(ErrorMessage::new( + StatusCode::PAYLOAD_TOO_LARGE, + format!("Request body exceeds limit of {MAX_REQUEST_BODY_SIZE} bytes."), + ))) + } + _ => Ok(()), + } + }) + .untuple_one() + .and(warp::body::bytes()) + .boxed(); + let svc = filter .and(warp::path::tail()) .and_then(move |tail: Tail| async move { @@ -118,7 +140,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static { .and(warp::path::full()) .and(warp::header::optional::("content-encoding")) .and(warp::header::headers_cloned()) - .and(warp::body::bytes()) + .and(body_filter) .and(warp::query::>()) .and(warp::filters::ext::optional()) .and_then(