Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/http_source_decompression_bomb.security.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
HTTP-based sources (`http_server`, `prometheus_pushgateway`, `prometheus_remote_write`, `heroku_logs`, `opentelemetry`) now cap decompressed request bodies at 100 MiB. Previously, a single unauthenticated request carrying a compressed payload (e.g. a gzip bomb) could allocate unbounded memory and OOM-kill the Vector process. Decompressed payloads exceeding the cap are rejected with HTTP 413, as are requests whose declared `Content-Length` exceeds the same limit.

authors: pront
33 changes: 33 additions & 0 deletions src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,39 @@ mod tests {
}
}

/// A gzip body that inflates past the default decompression cap must be answered with
/// HTTP 413 instead of being fully decompressed.
#[tokio::test]
async fn http_rejects_gzip_bomb_with_413() {
    // Decompressed size of the bomb: comfortably past the default 100 MiB cap.
    const BOMB_SIZE: u64 = 200 * 1024 * 1024;

    // Stream zeros straight into the encoder so the test never holds the 200 MiB
    // plaintext in memory; only the (tiny) compressed output is buffered.
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    let mut zeros = std::io::Read::take(std::io::repeat(0), BOMB_SIZE);
    std::io::copy(&mut zeros, &mut encoder).unwrap();
    let body = encoder.finish().unwrap();

    let mut headers = HeaderMap::new();
    headers.insert("Content-Encoding", "gzip".parse().unwrap());

    components::init_test();
    // `_rx` is bound (not discarded with `_`) so the receiving half stays alive for the
    // duration of the request.
    let (_rx, addr) = source(
        vec![],
        vec![],
        "http_path",
        "remote_ip",
        "/",
        "POST",
        StatusCode::OK,
        None,
        true,
        EventStatus::Delivered,
        true,
        None,
        None,
    )
    .await;

    assert_eq!(413, send_bytes(addr, body, headers).await);
}

#[tokio::test]
async fn http_path() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
Expand Down
170 changes: 150 additions & 20 deletions src/sources/util/http/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,48 @@ use warp::http::StatusCode;

use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError};

/// Upper bound, in bytes, on the decompressed body that [`decompress_body`] will produce
/// (100 MiB).
///
/// Bounding the output keeps a compressed "bomb" payload from growing memory without limit.
pub(crate) const DEFAULT_MAX_DECOMPRESSED_BODY_SIZE: usize = 100 * (1 << 20);

/// Decompresses the body based on the Content-Encoding header.
///
/// Supports gzip, deflate, snappy, zstd, and identity (no compression).
pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
///
/// Caps the decompressed output at 100 MiB to mitigate decompression-bomb DoS attacks.
pub fn decompress_body(header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
decompress_body_with_limit(header, body, Some(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE))
}

/// Like [`decompress_body`], but allows the caller to control the decompressed size cap.
///
/// `max_decompressed_size = None` disables the cap (not recommended for unauthenticated input).
pub(crate) fn decompress_body_with_limit(
header: Option<&str>,
mut body: Bytes,
max_decompressed_size: Option<usize>,
) -> Result<Bytes, ErrorMessage> {
if let Some(encodings) = header {
for encoding in encodings.rsplit(',').map(str::trim) {
body = match encoding {
"identity" => body,
"gzip" => {
let mut decoded = Vec::new();
MultiGzDecoder::new(body.reader())
.read_to_end(&mut decoded)
"gzip" => decompress_reader(
MultiGzDecoder::new(body.reader()),
encoding,
max_decompressed_size,
)?,
"deflate" => decompress_reader(
ZlibDecoder::new(body.reader()),
encoding,
max_decompressed_size,
)?,
"snappy" => decompress_snappy(&body, max_decompressed_size)?,
"zstd" => {
let decoder = zstd::stream::read::Decoder::new(body.reader())
.map_err(|error| emit_decompress_error(encoding, error))?;
decoded.into()
decompress_reader(decoder, encoding, max_decompressed_size)?
}
"deflate" => {
let mut decoded = Vec::new();
ZlibDecoder::new(body.reader())
.read_to_end(&mut decoded)
.map_err(|error| emit_decompress_error(encoding, error))?;
decoded.into()
}
"snappy" => SnappyDecoder::new()
.decompress_vec(&body)
.map_err(|error| emit_decompress_error(encoding, error))?
.into(),
"zstd" => zstd::decode_all(body.reader())
.map_err(|error| emit_decompress_error(encoding, error))?
.into(),
encoding => {
return Err(ErrorMessage::new(
StatusCode::UNSUPPORTED_MEDIA_TYPE,
Expand All @@ -49,6 +62,60 @@ pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result<Bytes, E
Ok(body)
}

fn decompress_reader<R: Read>(
reader: R,
encoding: &str,
max_decompressed_size: Option<usize>,
) -> Result<Bytes, ErrorMessage> {
let mut decoded = Vec::new();
match max_decompressed_size {
Some(max) => {
// Read one byte beyond the cap so we can detect overflow without ambiguity.
let limit = u64::try_from(max).unwrap_or(u64::MAX).saturating_add(1);
reader
.take(limit)
.read_to_end(&mut decoded)
.map_err(|error| emit_decompress_error(encoding, error))?;
if decoded.len() > max {
return Err(decompressed_too_large_error(encoding, max));
}
}
None => {
let mut reader = reader;
reader
.read_to_end(&mut decoded)
.map_err(|error| emit_decompress_error(encoding, error))?;
}
}
Ok(decoded.into())
}

fn decompress_snappy(
body: &Bytes,
max_decompressed_size: Option<usize>,
) -> Result<Bytes, ErrorMessage> {
// Snappy stores the decompressed length in the frame header, so reject oversized
// payloads before allocating the output buffer.
if let Some(max) = max_decompressed_size {
let len = snap::raw::decompress_len(body)
.map_err(|error| emit_decompress_error("snappy", error))?;
if len > max {
return Err(decompressed_too_large_error("snappy", max));
}
}
let decoded = SnappyDecoder::new()
.decompress_vec(body)
.map_err(|error| emit_decompress_error("snappy", error))?;
Ok(decoded.into())
}

/// Builds the HTTP 413 (Payload Too Large) error returned when a body decoded with
/// `encoding` would exceed `max` bytes.
fn decompressed_too_large_error(encoding: &str, max: usize) -> ErrorMessage {
    let message = format!("Decompressed {encoding} body exceeds limit of {max} bytes.");
    ErrorMessage::new(StatusCode::PAYLOAD_TOO_LARGE, message)
}

pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
emit!(HttpDecompressError {
encoding,
Expand All @@ -59,3 +126,66 @@ pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> E
format!("Failed decompressing payload with {encoding} decoder."),
)
}

#[cfg(test)]
mod tests {
    use std::io::Write;

    use flate2::{Compression, write::GzEncoder};

    use super::*;

    /// Gzip-compresses `plaintext` in memory.
    fn gzip_payload(plaintext: &[u8]) -> Bytes {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(plaintext).unwrap();
        encoder.finish().unwrap().into()
    }

    /// Gzip-compresses `len` zero bytes without ever materializing the plaintext, so very
    /// large "bomb" payloads can be built with constant memory.
    fn gzip_zeros(len: u64) -> Bytes {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        let mut zeros = std::io::Read::take(std::io::repeat(0), len);
        std::io::copy(&mut zeros, &mut encoder).unwrap();
        encoder.finish().unwrap().into()
    }

    #[test]
    fn gzip_within_limit_succeeds() {
        let plaintext = vec![0u8; 10_000];
        let body = gzip_payload(&plaintext);

        let decoded = decompress_body_with_limit(Some("gzip"), body, Some(100_000)).unwrap();
        // Compare contents, not just the length, so a wrong-but-same-size output fails.
        assert_eq!(&decoded[..], &plaintext[..]);
    }

    #[test]
    fn gzip_exactly_at_limit_succeeds() {
        // The cap is inclusive: a payload of exactly `max` bytes must be accepted.
        let plaintext = vec![0u8; 1024];
        let body = gzip_payload(&plaintext);

        let decoded = decompress_body_with_limit(Some("gzip"), body, Some(1024)).unwrap();
        assert_eq!(decoded.len(), plaintext.len());
    }

    #[test]
    fn gzip_exceeding_limit_returns_413() {
        // Compress 1 MB of zeros, then cap at 1 KB.
        let plaintext = vec![0u8; 1_000_000];
        let body = gzip_payload(&plaintext);

        let err =
            decompress_body_with_limit(Some("gzip"), body, Some(1024)).expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn default_limit_protects_against_decompression_bomb() {
        // A small input that would expand far past 100 MB if we let it run unbounded.
        let body = gzip_zeros(200 * 1024 * 1024);

        let err = decompress_body(Some("gzip"), body).expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn snappy_exceeding_limit_returns_413_before_allocating() {
        // 2 MB of zeros. The raw Snappy encoding declares the decompressed length up front,
        // which is what the size check reads.
        let plaintext = vec![0u8; 2 * 1024 * 1024];
        let compressed = snap::raw::Encoder::new().compress_vec(&plaintext).unwrap();

        let err = decompress_body_with_limit(Some("snappy"), compressed.into(), Some(1024))
            .expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn identity_passes_through() {
        let body: Bytes = Bytes::from_static(b"hello world");
        let decoded = decompress_body(Some("identity"), body.clone()).unwrap();
        assert_eq!(decoded, body);
    }
}
26 changes: 24 additions & 2 deletions src/sources/util/http/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use warp::{
reject::Rejection,
};

use super::encoding::decompress_body;
use super::encoding::{DEFAULT_MAX_DECOMPRESSED_BODY_SIZE, decompress_body};
use crate::{
SourceSender,
common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
Expand Down Expand Up @@ -99,6 +99,28 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
for s in path.split('/').filter(|&x| !x.is_empty()) {
filter = filter.and(warp::path(s.to_string())).boxed()
}
// Defense-in-depth: reject oversized requests up front based on the declared
// `Content-Length`, before reading or decompressing the body. Mirrors the
// decompressed-body cap applied in `decompress_body`.
const MAX_REQUEST_BODY_SIZE: u64 = DEFAULT_MAX_DECOMPRESSED_BODY_SIZE as u64;
let body_filter: BoxedFilter<(Bytes,)> = warp::header::optional::<u64>(
"content-length",
)
.and_then(|declared: Option<u64>| async move {
match declared {
Some(len) if len > MAX_REQUEST_BODY_SIZE => {
Err(warp::reject::custom(ErrorMessage::new(
StatusCode::PAYLOAD_TOO_LARGE,
format!("Request body exceeds limit of {MAX_REQUEST_BODY_SIZE} bytes."),
)))
}
_ => Ok(()),
}
})
.untuple_one()
.and(warp::body::bytes())
.boxed();

let svc = filter
.and(warp::path::tail())
.and_then(move |tail: Tail| async move {
Expand All @@ -118,7 +140,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
.and(warp::path::full())
.and(warp::header::optional::<String>("content-encoding"))
.and(warp::header::headers_cloned())
.and(warp::body::bytes())
.and(body_filter)
.and(warp::query::<HashMap<String, String>>())
.and(warp::filters::ext::optional())
.and_then(
Expand Down
Loading