From 27d967bf573253f5ff202899af3ae11daea83d56 Mon Sep 17 00:00:00 2001 From: Jared Patterson Date: Thu, 2 Apr 2026 12:37:34 +1300 Subject: [PATCH 1/7] feat(http_server source): add configuration for custom HTTP responses Adds a `response_source` configuration option to the `http_server` source that accepts a VRL program to generate a custom HTTP response for each request. Closes: #21013 --- ...ttp_server_response_program.enhancement.md | 3 + src/sources/http_server.rs | 471 +++++++++++++++++- src/sources/util/http/prelude.rs | 294 ++++++++++- .../components/sources/generated/http.cue | 75 +++ .../sources/generated/http_server.cue | 75 +++ 5 files changed, 895 insertions(+), 23 deletions(-) create mode 100644 changelog.d/21013_http_server_response_program.enhancement.md diff --git a/changelog.d/21013_http_server_response_program.enhancement.md b/changelog.d/21013_http_server_response_program.enhancement.md new file mode 100644 index 0000000000000..2265b128a5c04 --- /dev/null +++ b/changelog.d/21013_http_server_response_program.enhancement.md @@ -0,0 +1,3 @@ +Add the `response_source` config option to the `http_server` source, allowing a VRL program to generate a custom HTTP response. The program receives the decoded events as input (`.` is an array of event objects) and can return a string body or an object with `status`, `body`, and `headers` fields. 
+ +authors: stigglor diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 7c444284adc6c..d54b759a32274 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, net::SocketAddr}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use bytes::{Bytes, BytesMut}; use chrono::Utc; @@ -11,12 +11,16 @@ use vector_lib::{ NewlineDelimitedDecoderConfig, decoding::{DeserializerConfig, FramingConfig}, }, + compile_vrl, config::{DataType, LegacyKey, LogNamespace}, configurable::configurable_component, lookup::{lookup_v2::OptionalValuePath, owned_value_path, path}, schema::Definition, }; -use vrl::value::{Kind, kind::Collection}; +use vrl::{ + compiler::{CompilationResult, CompileConfig, Program, TypeState}, + value::{Kind, kind::Collection}, +}; use warp::http::HeaderMap; use crate::{ @@ -27,6 +31,7 @@ use crate::{ SourceOutput, }, event::Event, + format_vrl_diagnostics, http::KeepaliveConfig, serde::{bool_or_struct, default_decoding}, sources::util::{ @@ -154,6 +159,65 @@ pub struct SimpleHttpConfig { #[serde(default = "default_http_response_code")] response_code: StatusCode, + /// The HTTP status code returned to the client when the `response_source` VRL program calls + /// `abort`. Use `abort "message"` in your VRL program to suppress event forwarding and return + /// this status code immediately, bypassing sink acknowledgements entirely. + /// + /// The appropriate code depends on the reason for rejection: for example, `400` for invalid + /// input, `403` for an unauthorized source, or `429` for a rate limit. Defaults to `400`. + /// + /// To vary the status code per `abort` call rather than using a single fixed value, pass a + /// JSON-encoded object as the abort message instead of a plain string: + /// `abort encode_json({ "status": 403, "body": "forbidden" })`. 
The object accepts the same + /// optional fields as the `response_source` return value: `status`, `body`, and `headers`. + /// When `status` is omitted from the object, this `reject_code` is used as the fallback. + #[configurable(metadata(docs::examples = 400))] + #[configurable(metadata(docs::examples = 403))] + #[configurable(metadata(docs::numeric_type = "uint"))] + #[serde(with = "http_serde::status_code")] + #[serde(default = "default_http_reject_code")] + reject_code: StatusCode, + + /// A [VRL] program to generate the HTTP response sent back to the client. + /// + /// The program receives the decoded, enriched events where `.` is an array of event objects. + /// It must return either: + /// - A string, used directly as the response body with the configured `response_code`. + /// - An object with optional fields: `status` (integer), `body` (string), `headers` (object). + /// + /// ## Aborting and early rejection + /// + /// Call `abort` in the program to suppress event forwarding entirely and respond immediately, + /// bypassing sink acknowledgements. The abort message becomes the response body and + /// [`reject_code`][Self::reject_code] is used as the status code. + /// + /// To control both the status code and body per `abort` call, pass a JSON-encoded object as + /// the message: `abort encode_json({ "status": 403, "body": "forbidden" })`. The object + /// accepts the same optional fields as the normal return value: `status`, `body`, and + /// `headers`. Any field that is omitted falls back to the same default as the normal path + /// (`status` defaults to `reject_code`, `body` defaults to empty, `headers` to none). + /// + /// ## Acknowledgements and sink failures + /// + /// When [acknowledgements] are enabled, the response defined by this program is only sent to + /// the client if the downstream sink successfully delivers the events (`Delivered`). 
If the + /// sink reports a failure (`Errored` or `Rejected`), the configured response is discarded and + /// the client receives a `500 Internal Server Error` or `400 Bad Request` instead. + /// + /// Responses returned via `abort` are not subject to this — they are sent immediately before + /// any events reach the sink, so acknowledgement results can never override them. + /// + /// [acknowledgements]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/ + /// [VRL]: https://vector.dev/docs/reference/vrl + #[configurable(metadata( + docs::examples = "encode_json({ \"ids\": map_values(.) -> |e| { e.id } })", + docs::examples = "parsed, err = parse_json(.[0].body)\nif err != null {\n { \"status\": 400, \"body\": \"invalid JSON\" }\n} else {\n { \"status\": 202, \"body\": encode_json(parsed), \"headers\": { \"x-request-id\": .[0].request_id } }\n}", + docs::examples = "row, err = get_enrichment_table_record(\"ip_allowlist\", { \"ip\": .source_ip })\nif err != null {\n abort encode_json({ \"status\": 403, \"body\": \"source address not permitted\" })\n} else {\n { \"status\": 202, \"body\": encode_json({ \"accepted\": true }) }\n}", + docs::syntax_override = "remap_program" + ))] + #[serde(default)] + response_source: Option, + #[configurable(derived)] tls: Option, @@ -280,6 +344,8 @@ impl Default for SimpleHttpConfig { host_key: default_host_key(), method: default_http_method(), response_code: default_http_response_code(), + reject_code: default_http_reject_code(), + response_source: None, strict_path: true, framing: None, decoding: Some(default_decoding()), @@ -312,6 +378,34 @@ const fn default_http_response_code() -> StatusCode { StatusCode::OK } +const fn default_http_reject_code() -> StatusCode { + StatusCode::BAD_REQUEST +} + +fn compile_response_source( + source: &str, + enrichment_tables: &vector_lib::enrichment::TableRegistry, + metrics_storage: &vector_vrl_metrics::MetricsStorage, +) -> crate::Result> { + let functions = 
vector_vrl_functions::all(); + let state = TypeState::default(); + let mut config = CompileConfig::default(); + config.set_custom(enrichment_tables.clone()); + config.set_custom(metrics_storage.clone()); + let CompilationResult { + program, warnings, .. + } = compile_vrl(source, &functions, &state, config) + .map_err(|diags| format_vrl_diagnostics(source, diags))?; + if !warnings.is_empty() { + let warnings_str = format_vrl_diagnostics(source, warnings); + warn!( + message = "VRL response source compilation warning.", + warnings = %warnings_str + ); + } + Ok(Arc::new(program)) +} + /// Removes duplicates from the list, and logs a `warn!()` for each duplicate removed. pub fn remove_duplicates(mut list: Vec, list_name: &str) -> Vec { list.sort(); @@ -363,6 +457,12 @@ impl SourceConfig for SimpleHttpConfig { .build()? .with_log_namespace(log_namespace); + let response_source = self + .response_source + .as_deref() + .map(|s| compile_response_source(s, &cx.enrichment_tables, &cx.metrics_storage)) + .transpose()?; + let source = SimpleHttpSource { headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?, query_parameters: build_param_matcher(&remove_duplicates( @@ -373,6 +473,8 @@ impl SourceConfig for SimpleHttpConfig { host_key: self.host_key.clone(), decoder, log_namespace, + response_source, + reject_code: self.reject_code, }; source.run( self.address, @@ -421,9 +523,19 @@ struct SimpleHttpSource { host_key: OptionalValuePath, decoder: Decoder, log_namespace: LogNamespace, + response_source: Option>, + reject_code: StatusCode, } impl HttpSource for SimpleHttpSource { + fn response_source(&self) -> Option> { + self.response_source.clone() + } + + fn reject_code(&self) -> StatusCode { + self.reject_code + } + /// Enriches the log events with metadata for the `request_path` and for each of the headers. /// Non-log events are skipped. 
fn enrich_events( @@ -559,6 +671,7 @@ mod tests { sources::http_server::HttpMethod, test_util::{ addr::next_addr, + collect_ready, components::{self, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance}, spawn_collect_n, wait_for_tcp, }, @@ -604,6 +717,8 @@ mod tests { encoding: None, query_parameters, response_code, + reject_code: StatusCode::TOO_MANY_REQUESTS, + response_source: None, tls: None, auth, strict_path, @@ -684,6 +799,358 @@ mod tests { .as_u16() } + /// Spawn an `http_server` source with a `response_source` and return the bound address. + async fn source_with_program( + response_source: &str, + decoding: Option, + ) -> (impl Stream, SocketAddr) { + let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered); + let (_guard, address) = next_addr(); + let context = SourceContext::new_test(sender, None); + let program = response_source.to_owned(); + tokio::spawn(async move { + SimpleHttpConfig { + address, + response_source: Some(program), + decoding, + ..SimpleHttpConfig::default() + } + .build(context) + .await + .unwrap() + .await + .unwrap(); + }); + wait_for_tcp(address).await; + (recv, address) + } + + /// Spawn an `http_server` source with a `response_source` and a custom `reject_code`. + async fn source_with_program_and_reject_code( + response_source: &str, + reject_code: StatusCode, + ) -> (impl Stream, SocketAddr) { + let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered); + let (_guard, address) = next_addr(); + let context = SourceContext::new_test(sender, None); + let program = response_source.to_owned(); + tokio::spawn(async move { + SimpleHttpConfig { + address, + response_source: Some(program), + reject_code, + ..SimpleHttpConfig::default() + } + .build(context) + .await + .unwrap() + .await + .unwrap(); + }); + wait_for_tcp(address).await; + (recv, address) + } + + /// POST `body` and return the full reqwest Response (status + headers + body text). 
+ async fn post_raw(address: SocketAddr, body: &str) -> reqwest::Response { + reqwest::Client::new() + .post(format!("http://{address}/")) + .body(body.to_owned()) + .send() + .await + .unwrap() + } + + #[tokio::test] + async fn response_source_string_return() { + assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { + let (rx, addr) = source_with_program(r#" "request received" "#, None).await; + + spawn_collect_n( + async move { + let resp = post_raw(addr, "hello\n").await; + assert_eq!(resp.status().as_u16(), 200); + assert_eq!(resp.text().await.unwrap(), "request received"); + }, + rx, + 1, + ) + .await; + }) + .await; + } + + #[tokio::test] + async fn response_source_object_custom_status_and_body() { + assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { + let program = r#" + { + "status": 202, + "body": encode_json({ "count": length(.) }) + } + "#; + let (rx, addr) = + source_with_program(program, Some(JsonDeserializerConfig::default().into())).await; + + spawn_collect_n( + async move { + let resp = post_raw(addr, "{\"id\":1}\n{\"id\":2}\n").await; + assert_eq!(resp.status().as_u16(), 202); + let body: serde_json::Value = + serde_json::from_str(&resp.text().await.unwrap()).unwrap(); + assert_eq!(body["count"], 2); + }, + rx, + 2, + ) + .await; + }) + .await; + } + + #[tokio::test] + async fn response_source_custom_response_headers() { + assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { + let program = r#" + { + "body": encode_json({ "id": .[0].id }), + "headers": { "X-Event-Id": to_string(.[0].id) } + } + "#; + let (rx, addr) = + source_with_program(program, Some(JsonDeserializerConfig::default().into())).await; + + spawn_collect_n( + async move { + let resp = post_raw(addr, "{\"id\": 42}\n").await; + assert_eq!(resp.status().as_u16(), 200); + assert_eq!(resp.headers()["x-event-id"], "42"); + }, + rx, + 1, + ) + .await; + }) + .await; + } + + #[tokio::test] + async fn response_source_invalid_vrl_fails_at_build() { + let result = SimpleHttpConfig 
{ + address: "0.0.0.0:0".parse().unwrap(), + response_source: Some("this is not valid vrl !!!".to_owned()), + ..SimpleHttpConfig::default() + } + .build(SourceContext::new_test( + SourceSender::new_test_finalize(EventStatus::Delivered).0, + None, + )) + .await; + + assert!(result.is_err(), "invalid VRL should fail at build time"); + } + + /// Spawn an `http_server` source with a `response_source`, a specific `EventStatus` finalizer, + /// and acknowledgements enabled. This lets us simulate sink failure while also having a custom + /// VRL response configured. + async fn source_with_program_and_status( + response_source: &str, + status: EventStatus, + ) -> (impl Stream, SocketAddr) { + let (sender, recv) = SourceSender::new_test_finalize(status); + let (_guard, address) = next_addr(); + let context = SourceContext::new_test(sender, None); + let program = response_source.to_owned(); + tokio::spawn(async move { + SimpleHttpConfig { + address, + response_source: Some(program), + acknowledgements: true.into(), + ..SimpleHttpConfig::default() + } + .build(context) + .await + .unwrap() + .await + .unwrap(); + }); + wait_for_tcp(address).await; + (recv, address) + } + + /// When `response_source` calls `abort` with a plain string message, events are dropped + /// immediately, the client receives `reject_code` as the status, and the message as the body. + #[tokio::test] + async fn response_source_abort_suppresses_events_and_returns_reject_code() { + // Program that always aborts with a plain string message. + let program = r#"abort "request rejected""#; + + let (rx, addr) = + source_with_program_and_reject_code(program, StatusCode::BAD_REQUEST).await; + + let resp = post_raw(addr, "hello\n").await; + + // Client receives reject_code (400) with the abort message as the body. + assert_eq!(resp.status().as_u16(), 400); + assert_eq!(resp.text().await.unwrap(), "request rejected"); + + // No events should have been forwarded to the sink. 
+ // Give a brief window for any (erroneous) event to arrive, then assert the stream is empty. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let events = collect_ready(rx).await; + assert!( + events.is_empty(), + "abort should suppress event forwarding, but {n} event(s) reached the sink", + n = events.len() + ); + } + + /// When `response_source` calls `abort` with a JSON-encoded object containing both `status` + /// and `body`, the client receives that exact status and body — not `reject_code`. + #[tokio::test] + async fn response_source_abort_json_object_overrides_status_and_body() { + let program = r#"abort encode_json({ "status": 403, "body": "forbidden" })"#; + + let (rx, addr) = + source_with_program_and_reject_code(program, StatusCode::BAD_REQUEST).await; + + let resp = post_raw(addr, "hello\n").await; + + // Client receives the status and body from the JSON object, not reject_code. + assert_eq!(resp.status().as_u16(), 403); + assert_eq!(resp.text().await.unwrap(), "forbidden"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let events = collect_ready(rx).await; + assert!(events.is_empty(), "abort must not forward events to sink"); + } + + /// When the JSON object in `abort` omits `status`, `reject_code` is used as the fallback. + #[tokio::test] + async fn response_source_abort_json_object_status_falls_back_to_reject_code() { + let program = r#"abort encode_json({ "body": "rejected" })"#; + + let (rx, addr) = + source_with_program_and_reject_code(program, StatusCode::UNPROCESSABLE_ENTITY).await; + + let resp = post_raw(addr, "hello\n").await; + + // No status in the object — reject_code (422) is used. 
+ assert_eq!(resp.status().as_u16(), 422); + assert_eq!(resp.text().await.unwrap(), "rejected"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let events = collect_ready(rx).await; + assert!(events.is_empty(), "abort must not forward events to sink"); + } + + /// When `abort` is called with a JSON object containing `headers`, those headers are present + /// in the response. + #[tokio::test] + async fn response_source_abort_json_object_includes_headers() { + let program = r#"abort encode_json({ "status": 429, "body": "rate limited", "headers": { "Retry-After": "60" } })"#; + + let (rx, addr) = + source_with_program_and_reject_code(program, StatusCode::BAD_REQUEST).await; + + let resp = post_raw(addr, "hello\n").await; + + assert_eq!(resp.status().as_u16(), 429); + assert_eq!(resp.headers()["retry-after"], "60"); + assert_eq!(resp.text().await.unwrap(), "rate limited"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let events = collect_ready(rx).await; + assert!(events.is_empty(), "abort must not forward events to sink"); + } + + /// When `response_source` calls `abort` and acknowledgements are enabled, the response is + /// returned immediately without ever touching the sink — so ack results cannot override it. + #[tokio::test] + async fn response_source_abort_not_overridden_by_ack_failure() { + // Program that always aborts. + let program = r#"abort "request rejected""#; + + // Use EventStatus::Errored + acknowledgements to prove that even a failing sink + // cannot override the abort response, because we never reach handle_batch_status. 
+ let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Errored); + let (_guard, address) = next_addr(); + let context = SourceContext::new_test(sender, None); + let prog = program.to_owned(); + tokio::spawn(async move { + SimpleHttpConfig { + address, + response_source: Some(prog), + acknowledgements: true.into(), + reject_code: StatusCode::BAD_REQUEST, + ..SimpleHttpConfig::default() + } + .build(context) + .await + .unwrap() + .await + .unwrap(); + }); + wait_for_tcp(address).await; + + let resp = post_raw(address, "hello\n").await; + + // Must be 400 from the abort, not 500 from the ack failure path. + assert_eq!( + resp.status().as_u16(), + 400, + "abort response must not be overridden by ack failure" + ); + assert_eq!(resp.text().await.unwrap(), "request rejected"); + + // No events were forwarded — the stream should be empty. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let events = collect_ready(recv).await; + assert!(events.is_empty(), "abort must not forward events to sink"); + } + + /// When a `response_source` is configured to return `{ "status": 200, "body": "ok" }` but the + /// sink reports `Errored`, the acknowledgement path overrides the custom VRL response and the + /// client receives a `500 Internal Server Error` instead. + /// + /// This test documents the current behaviour: a user-configured successful response is + /// silently discarded whenever the downstream sink fails and acknowledgements are enabled. + /// + /// The `BatchNotifier` attached to each event only resolves (and thus unblocks + /// `handle_batch_status`) once every cloned notifier is dropped — which happens when + /// `new_test_finalize` finalizes the event on the consumer side. That means the HTTP response + /// cannot complete until the event is actually pulled off the stream, so we must drive the + /// stream concurrently with the HTTP request using `spawn_collect_n`. 
+ #[tokio::test] + async fn response_source_overridden_by_ack_failure_on_sink_error() { + // VRL program that unconditionally asks for a 200 OK with body "ok". + let program = r#"{ "status": 200, "body": "ok" }"#; + + let (rx, addr) = source_with_program_and_status(program, EventStatus::Errored).await; + + // Drive the stream concurrently: the HTTP request is the "sender" future and we collect + // exactly 1 event from the stream. The stream consumer finalizing the event fires the + // BatchNotifier (with EventStatus::Errored), which unblocks handle_batch_status and lets + // the HTTP response complete. + // + // The configured response_source returns { "status": 200, "body": "ok" }, but because + // acknowledgements are enabled and the sink reports Errored, handle_batch_status discards + // the VRL-built response entirely and returns a 500 rejection instead. + spawn_collect_n( + async move { + let status = post_raw(addr, "hello\n").await.status().as_u16(); + assert_eq!( + status, 500, + "sink error with acks enabled overrides the configured response_source (bug: \ + expected 200 from response_source but got 500 from ack failure path)" + ); + }, + rx, + 1, + ) + .await; + } + async fn send_bytes(address: SocketAddr, body: Vec, headers: HeaderMap) -> u16 { reqwest::Client::new() .post(format!("http://{address}/")) diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 3414ea00264b0..cd36aba1b28cc 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, convert::Infallible, fmt, net::SocketAddr, time::Duration}; +use std::{ + collections::HashMap, convert::Infallible, fmt, net::SocketAddr, sync::Arc, time::Duration, +}; + +use serde_json::Value as JsonValue; use bytes::Bytes; use futures::{FutureExt, TryFutureExt}; @@ -7,9 +11,16 @@ use tokio::net::TcpStream; use tower::ServiceBuilder; use tracing::Span; use vector_lib::{ - EstimatedJsonEncodedSizeOf, + 
EstimatedJsonEncodedSizeOf, TimeZone, config::SourceAcknowledgementsConfig, - event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event}, + event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, EventMetadata, VrlTarget}, +}; +use vrl::{ + compiler::{ + Program, + runtime::{Runtime, Terminate}, + }, + value::Value, }; use warp::{ Filter, @@ -19,6 +30,7 @@ use warp::{ }, http::{HeaderMap, StatusCode}, reject::Rejection, + reply::Reply, }; use super::encoding::decompress_body; @@ -34,7 +46,34 @@ use crate::{ tls::{MaybeTlsIncomingStream, MaybeTlsSettings, TlsEnableableConfig}, }; +/// The decision returned by `build_vrl_response` after running the VRL `response_source` program. +/// +/// - `Forward(response)` — the program returned normally. Send events to the sink and, once the +/// sink acknowledges them, send `response` back to the HTTP client. +/// - `Reject(response)` — the program called `abort`. Drop the decoded events without forwarding +/// them to the sink and send `response` back to the HTTP client immediately. Acknowledgements +/// are never involved, so the response can never be overridden by a sink failure. +enum VrlResponseDecision { + Forward(warp::reply::Response), + Reject(warp::reply::Response), +} + pub trait HttpSource: Clone + Send + Sync + 'static { + /// Optional compiled VRL program used to generate the HTTP response body. + /// The default returns `None`, meaning the source responds with the configured status code + /// and an empty body. Override this in implementations that support `response_source`. + fn response_source(&self) -> Option> { + None + } + + /// The HTTP status code returned to the client when the `response_source` VRL program calls + /// `abort`. Signals that the request was intentionally rejected by the program before any + /// events were forwarded. Override this in implementations that expose a configurable + /// `reject_code`. 
+ fn reject_code(&self) -> StatusCode { + StatusCode::BAD_REQUEST + } + // This function can be defined to enrich events with additional HTTP // metadata. This function should be used rather than internal enrichment so // that accurate byte count metrics can be emitted. @@ -169,7 +208,16 @@ pub trait HttpSource: Clone + Send + Sync + 'static { events }); - handle_request(events, acknowledgements, response_code, cx.out.clone()) + let response_source = self.response_source(); + let reject_code = self.reject_code(); + handle_request( + events, + acknowledgements, + response_code, + reject_code, + response_source, + cx.out.clone(), + ) }, ); @@ -255,22 +303,40 @@ async fn handle_request( events: Result, ErrorMessage>, acknowledgements: bool, response_code: StatusCode, + reject_code: StatusCode, + response_source: Option>, mut out: SourceSender, -) -> Result { +) -> Result { match events { Ok(mut events) => { - let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events); - - let count = events.len(); - out.send_batch(events) - .map_err(|_| { - // can only fail if receiving end disconnected, so we are shutting down, - // probably not gracefully. - emit!(StreamClosedError { count }); - warp::reject::custom(RejectShuttingDown) - }) - .and_then(|_| handle_batch_status(response_code, receiver)) - .await + let decision = match response_source { + Some(ref program) => { + build_vrl_response(&events, program, response_code, reject_code)? + } + None => VrlResponseDecision::Forward(response_code.into_response()), + }; + + match decision { + // The VRL program called `abort` — drop events, respond immediately. + // Acknowledgements are not involved, so the response is never overridden. + VrlResponseDecision::Reject(response) => Ok(response), + + // The VRL program returned normally — forward events to the sink and wait + // for the acknowledgement before responding to the HTTP client. 
+ VrlResponseDecision::Forward(response) => { + let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events); + let count = events.len(); + out.send_batch(events) + .map_err(|_| { + // can only fail if receiving end disconnected, so we are shutting down, + // probably not gracefully. + emit!(StreamClosedError { count }); + warp::reject::custom(RejectShuttingDown) + }) + .and_then(|_| handle_batch_status(response, receiver)) + .await + } + } } Err(error) => { emit!(HttpBadRequest::new(error.code(), error.message())); @@ -280,13 +346,13 @@ async fn handle_request( } async fn handle_batch_status( - success_response_code: StatusCode, + success_response: warp::reply::Response, receiver: Option, -) -> Result { +) -> Result { match receiver { - None => Ok(success_response_code), + None => Ok(success_response), Some(receiver) => match receiver.await { - BatchStatus::Delivered => Ok(success_response_code), + BatchStatus::Delivered => Ok(success_response), BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new( StatusCode::INTERNAL_SERVER_ERROR, "Error delivering contents to sink".into(), @@ -298,3 +364,189 @@ async fn handle_batch_status( }, } } + +fn build_vrl_response( + events: &[Event], + program: &Program, + default_status: StatusCode, + reject_code: StatusCode, +) -> Result { + let event_values: Vec = events + .iter() + .filter_map(|e| e.maybe_as_log()) + .map(|log| log.value().clone()) + .collect(); + + let target_value = Value::Array(event_values); + let mut target = VrlTarget::LogEvent(target_value, EventMetadata::default()); + + match Runtime::default().resolve(&mut target, program, &TimeZone::default()) { + // The program called `abort`. Suppress event forwarding and respond immediately. + // + // The abort message can be either: + // - A plain string — used directly as the response body with `reject_code` as the status. 
+ // - A JSON-encoded object with the same shape as the normal return path: + // `{ "status": , "body": , "headers": }`. + // When a JSON object is provided, `status`, `body`, and `headers` are each optional; + // `status` defaults to `reject_code` if omitted. + // + // This lets the VRL program vary both the status code and body per `abort` call without + // requiring any changes to the upstream VRL crate. + Err(Terminate::Abort(err)) => { + let message = match err { + vrl::compiler::expression::ExpressionError::Abort { message, .. } => message, + _ => None, + }; + let response = match message { + Some(msg) => match serde_json::from_str::(&msg) { + Ok(JsonValue::Object(obj)) => build_response_from_json_obj(&obj, reject_code)?, + // Not a JSON object (plain string, or JSON but not an object) — use as-is. + _ => build_plain_reject_response(reject_code, msg)?, + }, + None => warp::http::Response::builder() + .status(reject_code) + .body(hyper::Body::empty()) + .map_err(|err| { + warp::reject::custom(ErrorMessage::new( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to build reject response: {err}"), + )) + })?, + }; + Ok(VrlResponseDecision::Reject(response)) + } + Err(Terminate::Error(err)) => { + emit!(HttpInternalError { + message: &format!("VRL response program failed: {err}") + }); + Err(warp::reject::custom(ErrorMessage::new( + StatusCode::INTERNAL_SERVER_ERROR, + format!("VRL response program failed: {err}"), + ))) + } + Ok(Value::Bytes(body)) => { + let response = warp::http::Response::builder() + .status(default_status) + .body(hyper::Body::from(body)) + .map_err(|err| { + warp::reject::custom(ErrorMessage::new( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to build response: {err}"), + )) + })?; + Ok(VrlResponseDecision::Forward(response)) + } + Ok(Value::Object(obj)) => { + let response = build_response_from_vrl_obj(&obj, default_status)?; + Ok(VrlResponseDecision::Forward(response)) + } + Ok(other) => { + emit!(HttpInternalError { + 
message: "VRL response program returned unexpected type" + }); + Err(warp::reject::custom(ErrorMessage::new( + StatusCode::INTERNAL_SERVER_ERROR, + format!("VRL response program must return a string or object, got: {other:?}"), + ))) + } + } +} + +/// Builds an HTTP response from a VRL `Value::Object` returned by the `response_source` program. +/// Used by the normal return path; the abort path uses `build_response_from_json_obj` instead. +/// +/// The object may contain: +/// - `status` — integer HTTP status code; falls back to `default_status` if absent or invalid. +/// - `body` — response body; non-string values are stringified; empty if absent. +/// - `headers` — object of string header name → string value pairs; ignored if absent. +fn build_response_from_vrl_obj( + obj: &vrl::value::ObjectMap, + default_status: StatusCode, +) -> Result { + let status = obj + .get("status") + .and_then(|v| v.as_integer()) + .and_then(|n| StatusCode::from_u16(u16::try_from(n).ok()?).ok()) + .unwrap_or(default_status); + + let body = obj + .get("body") + .map(|v| match v { + Value::Bytes(b) => hyper::Body::from(b.clone()), + other => hyper::Body::from(other.to_string()), + }) + .unwrap_or_else(hyper::Body::empty); + + let mut builder = warp::http::Response::builder().status(status); + + if let Some(Value::Object(headers)) = obj.get("headers") { + for (k, v) in headers { + if let Value::Bytes(v) = v { + builder = builder.header(k.as_str(), v.as_ref()); + } + } + } + + builder.body(body).map_err(|err| { + warp::reject::custom(ErrorMessage::new( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to build response: {err}"), + )) + }) +} + +/// Builds an HTTP response from a JSON object encoded in an `abort` message string. +/// +/// The object may contain: +/// - `status` — integer HTTP status code; falls back to `default_status` if absent or invalid. +/// - `body` — string response body; empty if absent. +/// - `headers` — object of string header name → string value pairs; ignored if absent.
+fn build_response_from_json_obj( + obj: &serde_json::Map, + default_status: StatusCode, +) -> Result { + let status = obj + .get("status") + .and_then(|v| v.as_u64()) + .and_then(|n| StatusCode::from_u16(u16::try_from(n).ok()?).ok()) + .unwrap_or(default_status); + + let body = obj + .get("body") + .and_then(|v| v.as_str()) + .map(|s| hyper::Body::from(s.to_owned())) + .unwrap_or_else(hyper::Body::empty); + + let mut builder = warp::http::Response::builder().status(status); + + if let Some(JsonValue::Object(headers)) = obj.get("headers") { + for (k, v) in headers { + if let Some(v) = v.as_str() { + builder = builder.header(k.as_str(), v); + } + } + } + + builder.body(body).map_err(|err| { + warp::reject::custom(ErrorMessage::new( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to build reject response: {err}"), + )) + }) +} + +/// Builds a plain-text reject response using `reject_code` as the status and `msg` as the body. +fn build_plain_reject_response( + reject_code: StatusCode, + msg: String, +) -> Result { + warp::http::Response::builder() + .status(reject_code) + .body(hyper::Body::from(msg)) + .map_err(|err| { + warp::reject::custom(ErrorMessage::new( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to build reject response: {err}"), + )) + }) +} diff --git a/website/cue/reference/components/sources/generated/http.cue b/website/cue/reference/components/sources/generated/http.cue index 259c800829438..298f9b744a8fd 100644 --- a/website/cue/reference/components/sources/generated/http.cue +++ b/website/cue/reference/components/sources/generated/http.cue @@ -696,6 +696,27 @@ generated: components: sources: http: configuration: { items: type: string: examples: ["application", "source", "param*", "*"] } } + reject_code: { + description: """ + The HTTP status code returned to the client when the `response_source` VRL program calls + `abort`.
Use `abort "message"` in your VRL program to suppress event forwarding and return + this status code immediately, bypassing sink acknowledgements entirely. + + The appropriate code depends on the reason for rejection: for example, `400` for invalid + input, `403` for an unauthorized source, or `429` for a rate limit. Defaults to `400`. + + To vary the status code per `abort` call rather than using a single fixed value, pass a + JSON-encoded object as the abort message instead of a plain string: + `abort encode_json({ "status": 403, "body": "forbidden" })`. The object accepts the same + optional fields as the `response_source` return value: `status`, `body`, and `headers`. + When `status` is omitted from the object, this `reject_code` is used as the fallback. + """ + required: false + type: uint: { + default: 400 + examples: [400, 403] + } + } response_code: { description: "Specifies the HTTP response status code that will be returned on successful requests." required: false @@ -706,6 +727,60 @@ generated: components: sources: http: configuration: { ] } } + response_source: { + description: """ + A [VRL] program to generate the HTTP response sent back to the client. + + The program receives the decoded, enriched events where `.` is an array of event objects. + It must return either: + - A string, used directly as the response body with the configured `response_code`. + - An object with optional fields: `status` (integer), `body` (string), `headers` (object). + + ## Aborting and early rejection + + Call `abort` in the program to suppress event forwarding entirely and respond immediately, + bypassing sink acknowledgements. The abort message becomes the response body and + [`reject_code`][Self::reject_code] is used as the status code. + + To control both the status code and body per `abort` call, pass a JSON-encoded object as + the message: `abort encode_json({ "status": 403, "body": "forbidden" })`. 
The object + accepts the same optional fields as the normal return value: `status`, `body`, and + `headers`. Any field that is omitted falls back to the same default as the normal path + (`status` defaults to `reject_code`, `body` defaults to empty, `headers` to none). + + ## Acknowledgements and sink failures + + When [acknowledgements] are enabled, the response defined by this program is only sent to + the client if the downstream sink successfully delivers the events (`Delivered`). If the + sink reports a failure (`Errored` or `Rejected`), the configured response is discarded and + the client receives a `500 Internal Server Error` or `400 Bad Request` instead. + + Responses returned via `abort` are not subject to this — they are sent immediately before + any events reach the sink, so acknowledgement results can never override them. + + [acknowledgements]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/ + [VRL]: https://vector.dev/docs/reference/vrl + """ + required: false + type: string: { + examples: ["encode_json({ \"ids\": map_values(.) -> |e| { e.id } })", """ + parsed, err = parse_json(.[0].body) + if err != null { + { "status": 400, "body": "invalid JSON" } + } else { + { "status": 202, "body": encode_json(parsed), "headers": { "x-request-id": .[0].request_id } } + } + """, """ + row, err = get_enrichment_table_record("ip_allowlist", { "ip": .source_ip }) + if err != null { + abort encode_json({ "status": 403, "body": "source address not permitted" }) + } else { + { "status": 202, "body": encode_json({ "accepted": true }) } + } + """] + syntax: "remap_program" + } + } strict_path: { description: """ Whether or not to treat the configured `path` as an absolute path. 
diff --git a/website/cue/reference/components/sources/generated/http_server.cue b/website/cue/reference/components/sources/generated/http_server.cue index c84425dd8db2c..594cb2481bddc 100644 --- a/website/cue/reference/components/sources/generated/http_server.cue +++ b/website/cue/reference/components/sources/generated/http_server.cue @@ -696,6 +696,27 @@ generated: components: sources: http_server: configuration: { items: type: string: examples: ["application", "source", "param*", "*"] } } + reject_code: { + description: """ + The HTTP status code returned to the client when the `response_source` VRL program calls + `abort`. Use `abort "message"` in your VRL program to suppress event forwarding and return + this status code immediately, bypassing sink acknowledgements entirely. + + The appropriate code depends on the reason for rejection: for example, `400` for invalid + input, `403` for an unauthorized source, or `429` for a rate limit. Defaults to `400`. + + To vary the status code per `abort` call rather than using a single fixed value, pass a + JSON-encoded object as the abort message instead of a plain string: + `abort encode_json({ "status": 403, "body": "forbidden" })`. The object accepts the same + optional fields as the `response_source` return value: `status`, `body`, and `headers`. + When `status` is omitted from the object, this `reject_code` is used as the fallback. + """ + required: false + type: uint: { + default: 400 + examples: [400, 403] + } + } response_code: { description: "Specifies the HTTP response status code that will be returned on successful requests." required: false @@ -706,6 +727,60 @@ generated: components: sources: http_server: configuration: { ] } } + response_source: { + description: """ + A [VRL] program to generate the HTTP response sent back to the client. + + The program receives the decoded, enriched events where `.` is an array of event objects. 
+ It must return either: + - A string, used directly as the response body with the configured `response_code`. + - An object with optional fields: `status` (integer), `body` (string), `headers` (object). + + ## Aborting and early rejection + + Call `abort` in the program to suppress event forwarding entirely and respond immediately, + bypassing sink acknowledgements. The abort message becomes the response body and + [`reject_code`][Self::reject_code] is used as the status code. + + To control both the status code and body per `abort` call, pass a JSON-encoded object as + the message: `abort encode_json({ "status": 403, "body": "forbidden" })`. The object + accepts the same optional fields as the normal return value: `status`, `body`, and + `headers`. Any field that is omitted falls back to the same default as the normal path + (`status` defaults to `reject_code`, `body` defaults to empty, `headers` to none). + + ## Acknowledgements and sink failures + + When [acknowledgements] are enabled, the response defined by this program is only sent to + the client if the downstream sink successfully delivers the events (`Delivered`). If the + sink reports a failure (`Errored` or `Rejected`), the configured response is discarded and + the client receives a `500 Internal Server Error` or `400 Bad Request` instead. + + Responses returned via `abort` are not subject to this — they are sent immediately before + any events reach the sink, so acknowledgement results can never override them. + + [acknowledgements]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/ + [VRL]: https://vector.dev/docs/reference/vrl + """ + required: false + type: string: { + examples: ["encode_json({ \"ids\": map_values(.) 
-> |e| { e.id } })", """ + parsed, err = parse_json(.[0].body) + if err != null { + { "status": 400, "body": "invalid JSON" } + } else { + { "status": 202, "body": encode_json(parsed), "headers": { "x-request-id": .[0].request_id } } + } + """, """ + row, err = get_enrichment_table_record("ip_allowlist", { "ip": .source_ip }) + if err != null { + abort encode_json({ "status": 403, "body": "source address not permitted" }) + } else { + { "status": 202, "body": encode_json({ "accepted": true }) } + } + """] + syntax: "remap_program" + } + } strict_path: { description: """ Whether or not to treat the configured `path` as an absolute path. From 689682438b9878f78fbd61b8e1b73b353810f0c9 Mon Sep 17 00:00:00 2001 From: Jared Patterson Date: Thu, 9 Apr 2026 15:05:34 +1200 Subject: [PATCH 2/7] Address PR Comments - Added a note for Non-log events being dropped - Added a note for `EventMetadata::default()` loses vector-namespace metadata - Added fix for u16 truncation wraps out-of-range status codes - Added fix for JSON abort message without control keys silently drops body --- src/sources/http_server.rs | 56 ++++++++++++++++++++++++++++++++ src/sources/util/http/prelude.rs | 18 +++++++--- 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index d54b759a32274..92f3c418b0c1e 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -207,6 +207,14 @@ pub struct SimpleHttpConfig { /// Responses returned via `abort` are not subject to this — they are sent immediately before /// any events reach the sink, so acknowledgement results can never override them. /// + /// ## Log namespace + /// + /// When [`log_namespace`][Self::log_namespace] is set to `vector`, request metadata such as + /// the request path, headers, query parameters, and source IP are stored in event metadata + /// rather than in the event fields that the program can access via `.`.
Programs that rely + /// on inspecting this context should use the default `legacy` log namespace, where all + /// metadata is written directly into the event fields before the program runs. + /// /// [acknowledgements]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/ /// [VRL]: https://vector.dev/docs/reference/vrl #[configurable(metadata( @@ -1064,6 +1072,54 @@ mod tests { assert!(events.is_empty(), "abort must not forward events to sink"); } + /// When `abort` is called with a JSON object whose `status` value is out of the valid u16 + /// range, the cast must not wrap — `reject_code` is used as the fallback instead. + #[tokio::test] + async fn response_source_abort_json_out_of_range_status_falls_back_to_reject_code() { + // 65736 wraps to 200 with `as u16` — with `u16::try_from` it is rejected and + // `reject_code` (422) must be used instead. + let program = r#"abort encode_json({ "status": 65736, "body": "wrapped" })"#; + + let (rx, addr) = + source_with_program_and_reject_code(program, StatusCode::UNPROCESSABLE_ENTITY).await; + + let resp = post_raw(addr, "hello\n").await; + + assert_eq!( + resp.status().as_u16(), + 422, + "out-of-range status must not wrap via as u16 — reject_code should be used" + ); + assert_eq!(resp.text().await.unwrap(), "wrapped"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let events = collect_ready(rx).await; + assert!(events.is_empty(), "abort must not forward events to sink"); + } + + /// When `abort` is called with a JSON object that contains no recognised control keys + /// (`status`, `body`, `headers`), the entire message string is used as the response body + /// rather than silently dropping its content. + #[tokio::test] + async fn response_source_abort_json_without_control_keys_used_as_plain_body() { + // This object has no control keys — it should be treated as a plain string body, + // not as a (empty) control object that discards the content. 
+ let program = r#"abort encode_json({ "error": "bad request" })"#; + + let (rx, addr) = + source_with_program_and_reject_code(program, StatusCode::BAD_REQUEST).await; + + let resp = post_raw(addr, "hello\n").await; + + assert_eq!(resp.status().as_u16(), 400); + // The full JSON string must be the body — content must not be silently dropped. + assert_eq!(resp.text().await.unwrap(), r#"{"error":"bad request"}"#); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let events = collect_ready(rx).await; + assert!(events.is_empty(), "abort must not forward events to sink"); + } + /// When `response_source` calls `abort` and acknowledgements are enabled, the response is /// returned immediately without ever touching the sink — so ack results cannot override it. #[tokio::test] diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index cd36aba1b28cc..a2daa4afaa7f5 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -399,8 +399,16 @@ fn build_vrl_response( }; let response = match message { Some(msg) => match serde_json::from_str::(&msg) { - Ok(JsonValue::Object(obj)) => build_response_from_json_obj(&obj, reject_code)?, - // Not a JSON object (plain string, or JSON but not an object) — use as-is. + Ok(JsonValue::Object(obj)) + if obj.contains_key("status") + || obj.contains_key("body") + || obj.contains_key("headers") => + { + build_response_from_json_obj(&obj, reject_code)? + } + // Not a JSON object, or a JSON object with no recognised control keys + // (e.g. `abort encode_json({"error": "bad"})`) — treat the whole message + // string as the response body so the content is not silently dropped. 
_ => build_plain_reject_response(reject_code, msg)?, }, None => warp::http::Response::builder() @@ -466,7 +474,8 @@ fn build_response_from_vrl_obj( let status = obj .get("status") .and_then(|v| v.as_integer()) - .and_then(|n| StatusCode::from_u16(n as u16).ok()) + .and_then(|n| u16::try_from(n).ok()) + .and_then(|n| StatusCode::from_u16(n).ok()) .unwrap_or(default_status); let body = obj @@ -508,7 +517,8 @@ fn build_response_from_json_obj( let status = obj .get("status") .and_then(|v| v.as_u64()) - .and_then(|n| StatusCode::from_u16(n as u16).ok()) + .and_then(|n| u16::try_from(n).ok()) + .and_then(|n| StatusCode::from_u16(n).ok()) .unwrap_or(default_status); let body = obj From a7c0cea50a083509294a9fdc47a9d48297b39d72 Mon Sep 17 00:00:00 2001 From: Jared Patterson Date: Wed, 24 Jun 2026 20:48:53 +1200 Subject: [PATCH 3/7] chore(http_server source): reconcile tests and docs after master merge The master merge added response_source and reject_code to SimpleHttpSource. This sets both fields in the three enrich_events tests so the test build compiles, fixes import ordering in the http prelude, and regenerates the http and http_server component docs. --- src/sources/http_server.rs | 28 +++++++++++-------- src/sources/util/http/prelude.rs | 26 ++++++++--------- .../components/sources/generated/http.cue | 10 ++++++- .../sources/generated/http_server.cue | 10 ++++++- 4 files changed, 48 insertions(+), 26 deletions(-) diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 7dd88c2a26ee9..b9dd736ce11a5 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -213,7 +213,7 @@ pub struct SimpleHttpConfig { /// sink reports a failure (`Errored` or `Rejected`), the configured response is discarded and /// the client receives a `500 Internal Server Error` or `400 Bad Request` instead. 
/// - /// Responses returned via `abort` are not subject to this — they are sent immediately before + /// Responses returned via `abort` are not subject to this. They are sent immediately before /// any events reach the sink, so acknowledgement results can never override them. /// /// ## Log namespace @@ -1070,7 +1070,7 @@ mod tests { } /// When `response_source` calls `abort` with a JSON-encoded object containing both `status` - /// and `body`, the client receives that exact status and body — not `reject_code`. + /// and `body`, the client receives that exact status and body, not `reject_code`. #[tokio::test] async fn response_source_abort_json_object_overrides_status_and_body() { let program = r#"abort encode_json({ "status": 403, "body": "forbidden" })"#; @@ -1099,7 +1099,7 @@ mod tests { let resp = post_raw(addr, "hello\n").await; - // No status in the object — reject_code (422) is used. + // No status in the object, so reject_code (422) is used. assert_eq!(resp.status().as_u16(), 422); assert_eq!(resp.text().await.unwrap(), "rejected"); @@ -1129,10 +1129,10 @@ mod tests { } /// When `abort` is called with a JSON object whose `status` value is out of the valid u16 - /// range, the cast must not wrap — `reject_code` is used as the fallback instead. + /// range, the cast must not wrap, so `reject_code` is used as the fallback instead. #[tokio::test] async fn response_source_abort_json_out_of_range_status_falls_back_to_reject_code() { - // 65736 wraps to 200 with `as u16` — with `u16::try_from` it is rejected and + // 65736 wraps to 200 with `as u16`. With `u16::try_from` it is rejected and // `reject_code` (422) must be used instead. 
let program = r#"abort encode_json({ "status": 65736, "body": "wrapped" })"#; @@ -1144,7 +1144,7 @@ mod tests { assert_eq!( resp.status().as_u16(), 422, - "out-of-range status must not wrap via as u16 — reject_code should be used" + "out-of-range status must not wrap via as u16, reject_code should be used" ); assert_eq!(resp.text().await.unwrap(), "wrapped"); @@ -1158,7 +1158,7 @@ mod tests { /// rather than silently dropping its content. #[tokio::test] async fn response_source_abort_json_without_control_keys_used_as_plain_body() { - // This object has no control keys — it should be treated as a plain string body, + // This object has no control keys, so it should be treated as a plain string body, // not as a (empty) control object that discards the content. let program = r#"abort encode_json({ "error": "bad request" })"#; @@ -1168,7 +1168,7 @@ mod tests { let resp = post_raw(addr, "hello\n").await; assert_eq!(resp.status().as_u16(), 400); - // The full JSON string must be the body — content must not be silently dropped. + // The full JSON string must be the body, content must not be silently dropped. assert_eq!(resp.text().await.unwrap(), r#"{"error":"bad request"}"#); tokio::time::sleep(std::time::Duration::from_millis(100)).await; @@ -1177,7 +1177,7 @@ mod tests { } /// When `response_source` calls `abort` and acknowledgements are enabled, the response is - /// returned immediately without ever touching the sink — so ack results cannot override it. + /// returned immediately without ever touching the sink, so ack results cannot override it. #[tokio::test] async fn response_source_abort_not_overridden_by_ack_failure() { // Program that always aborts. @@ -1215,7 +1215,7 @@ mod tests { ); assert_eq!(resp.text().await.unwrap(), "request rejected"); - // No events were forwarded — the stream should be empty. + // No events were forwarded, so the stream should be empty. 
tokio::time::sleep(std::time::Duration::from_millis(100)).await; let events = collect_ready(recv).await; assert!(events.is_empty(), "abort must not forward events to sink"); @@ -1229,7 +1229,7 @@ mod tests { /// silently discarded whenever the downstream sink fails and acknowledgements are enabled. /// /// The `BatchNotifier` attached to each event only resolves (and thus unblocks - /// `handle_batch_status`) once every cloned notifier is dropped — which happens when + /// `handle_batch_status`) once every cloned notifier is dropped, which happens when /// `new_test_finalize` finalizes the event on the consumer side. That means the HTTP response /// cannot complete until the event is actually pulled off the stream, so we must drive the /// stream concurrently with the HTTP request using `spawn_collect_n`. @@ -2326,6 +2326,8 @@ mod tests { host_key: OptionalValuePath::none(), decoder, log_namespace: LogNamespace::Vector, + response_source: None, + reject_code: StatusCode::BAD_REQUEST, }; let mut log = LogEvent::default(); @@ -2389,6 +2391,8 @@ mod tests { host_key: OptionalValuePath::none(), decoder, log_namespace: LogNamespace::Vector, + response_source: None, + reject_code: StatusCode::BAD_REQUEST, }; let mut log = LogEvent::default(); @@ -2445,6 +2449,8 @@ mod tests { host_key: OptionalValuePath::none(), decoder, log_namespace: LogNamespace::Vector, + response_source: None, + reject_code: StatusCode::BAD_REQUEST, }; let metric = Metric::new( diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 541d8278341f2..7a3894d72257b 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -15,6 +15,7 @@ use vector_lib::{ config::SourceAcknowledgementsConfig, event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, EventMetadata, VrlTarget}, }; +use vrl::value::ObjectMap; use vrl::{ compiler::{ Program, @@ -22,7 +23,6 @@ use vrl::{ }, value::Value, }; -use vrl::value::ObjectMap; use warp::{ Filter, 
filters::{ @@ -49,9 +49,9 @@ use crate::{ /// The decision returned by `build_vrl_response` after running the VRL `response_source` program. /// -/// - `Forward(response)` — the program returned normally. Send events to the sink and, once the +/// - `Forward(response)`: the program returned normally. Send events to the sink and, once the /// sink acknowledges them, send `response` back to the HTTP client. -/// - `Reject(response)` — the program called `abort`. Drop the decoded events without forwarding +/// - `Reject(response)`: the program called `abort`. Drop the decoded events without forwarding /// them to the sink and send `response` back to the HTTP client immediately. Acknowledgements /// are never involved, so the response can never be overridden by a sink failure. enum VrlResponseDecision { @@ -338,11 +338,11 @@ async fn handle_request( }; match decision { - // The VRL program called `abort` — drop events, respond immediately. + // The VRL program called `abort`, so drop events and respond immediately. // Acknowledgements are not involved, so the response is never overridden. VrlResponseDecision::Reject(response) => Ok(response), - // The VRL program returned normally — forward events to the sink and wait + // The VRL program returned normally, so forward events to the sink and wait // for the acknowledgement before responding to the HTTP client. VrlResponseDecision::Forward(response) => { let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events); @@ -405,7 +405,7 @@ fn build_vrl_response( // The program called `abort`. Suppress event forwarding and respond immediately. // // The abort message can be either: - // - A plain string — used directly as the response body with `reject_code` as the status. + // - A plain string, used directly as the response body with `reject_code` as the status. // - A JSON-encoded object with the same shape as the normal return path: // `{ "status": , "body": , "headers": }`. 
// When a JSON object is provided, `status`, `body`, and `headers` are each optional; @@ -428,7 +428,7 @@ fn build_vrl_response( build_response_from_json_obj(&obj, reject_code)? } // Not a JSON object, or a JSON object with no recognised control keys - // (e.g. `abort encode_json({"error": "bad"})`) — treat the whole message + // (e.g. `abort encode_json({"error": "bad"})`), treat the whole message // string as the response body so the content is not silently dropped. _ => build_plain_reject_response(reject_code, msg)?, }, @@ -485,9 +485,9 @@ fn build_vrl_response( /// Used by both the normal return path and the JSON-encoded abort path. /// /// The object may contain: -/// - `status` — integer HTTP status code; falls back to `default_status` if absent or invalid. -/// - `body` — string response body; empty if absent. -/// - `headers` — object of string header name → string value pairs; ignored if absent. +/// - `status`: integer HTTP status code; falls back to `default_status` if absent or invalid. +/// - `body`: string response body; empty if absent. +/// - `headers`: object of string header name to string value pairs; ignored if absent. fn build_response_from_vrl_obj( obj: &vrl::value::ObjectMap, default_status: StatusCode, @@ -528,9 +528,9 @@ fn build_response_from_vrl_obj( /// Builds an HTTP response from a JSON object encoded in an `abort` message string. /// /// The object may contain: -/// - `status` — integer HTTP status code; falls back to `default_status` if absent or invalid. -/// - `body` — string response body; empty if absent. -/// - `headers` — object of string header name → string value pairs; ignored if absent. +/// - `status`: integer HTTP status code; falls back to `default_status` if absent or invalid. +/// - `body`: string response body; empty if absent. +/// - `headers`: object of string header name to string value pairs; ignored if absent. 
fn build_response_from_json_obj( obj: &serde_json::Map, default_status: StatusCode, diff --git a/website/cue/reference/components/sources/generated/http.cue b/website/cue/reference/components/sources/generated/http.cue index 8a6195f54f320..abc9805997b0d 100644 --- a/website/cue/reference/components/sources/generated/http.cue +++ b/website/cue/reference/components/sources/generated/http.cue @@ -756,9 +756,17 @@ generated: components: sources: http: configuration: { sink reports a failure (`Errored` or `Rejected`), the configured response is discarded and the client receives a `500 Internal Server Error` or `400 Bad Request` instead. - Responses returned via `abort` are not subject to this — they are sent immediately before + Responses returned via `abort` are not subject to this. They are sent immediately before any events reach the sink, so acknowledgement results can never override them. + ## Log namespace + + When [`log_namespace`][Self::log_namespace] is set to `vector`, request metadata such as + the request path, headers, query parameters, and source IP are stored in event metadata + rather than in the event fields that the program can access via `.`. Programs that rely + on inspecting this context should use the default `legacy` log namespace, where all + metadata is written directly into the event fields before the program runs. 
+ [acknowledgements]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/ [VRL]: https://vector.dev/docs/reference/vrl """ diff --git a/website/cue/reference/components/sources/generated/http_server.cue b/website/cue/reference/components/sources/generated/http_server.cue index 17a39a2815dd1..b2b090e0084b8 100644 --- a/website/cue/reference/components/sources/generated/http_server.cue +++ b/website/cue/reference/components/sources/generated/http_server.cue @@ -756,9 +756,17 @@ generated: components: sources: http_server: configuration: { sink reports a failure (`Errored` or `Rejected`), the configured response is discarded and the client receives a `500 Internal Server Error` or `400 Bad Request` instead. - Responses returned via `abort` are not subject to this — they are sent immediately before + Responses returned via `abort` are not subject to this. They are sent immediately before any events reach the sink, so acknowledgement results can never override them. + ## Log namespace + + When [`log_namespace`][Self::log_namespace] is set to `vector`, request metadata such as + the request path, headers, query parameters, and source IP are stored in event metadata + rather than in the event fields that the program can access via `.`. Programs that rely + on inspecting this context should use the default `legacy` log namespace, where all + metadata is written directly into the event fields before the program runs. 
+ [acknowledgements]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/ [VRL]: https://vector.dev/docs/reference/vrl """ From 081956b8841215fd2bf99c393e7aa1ad5cb5cacd Mon Sep 17 00:00:00 2001 From: Jared Patterson Date: Fri, 26 Jun 2026 11:11:41 +1200 Subject: [PATCH 4/7] fix(http_server source): skip invalid response_source headers instead of dropping events A response_source program builds response headers from event data, so an invalid header name or value (for example a value containing CR/LF) put the response builder into an error state. build_vrl_response then returned a 500 before send_batch ran, turning a malformed optional header into dropped input events. Validate each header name and value up front and insert via headers_mut, skipping any invalid header with a warning. The accepted batch is always forwarded, and the rest of the response is returned intact. --- src/sources/http_server.rs | 57 ++++++++++++++++++++++++++++++++ src/sources/util/http/prelude.rs | 32 ++++++++++++++++-- 2 files changed, 86 insertions(+), 3 deletions(-) diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index b9dd736ce11a5..0c8dce1b28e17 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -998,6 +998,63 @@ mod tests { .await; } + /// An invalid header name (spaces are not allowed) is skipped rather than failing the whole + /// response, so the valid header and body are returned and the events are still forwarded. 
+ #[tokio::test] + async fn response_source_invalid_header_name_skipped_and_events_forwarded() { + assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { + let program = r#" + { + "body": "ok", + "headers": { "Bad Header Name": "v", "X-Good": "yes" } + } + "#; + let (rx, addr) = source_with_program(program, None).await; + + spawn_collect_n( + async move { + let resp = post_raw(addr, "hello\n").await; + assert_eq!(resp.status().as_u16(), 200); + assert!(resp.headers().get("bad header name").is_none()); + assert_eq!(resp.headers()["x-good"], "yes"); + assert_eq!(resp.text().await.unwrap(), "ok"); + }, + rx, + 1, + ) + .await; + }) + .await; + } + + /// An invalid header value (CR/LF is not allowed) is skipped rather than failing the whole + /// response, so the body is still returned and the events are still forwarded. + #[tokio::test] + async fn response_source_invalid_header_value_skipped_and_events_forwarded() { + assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { + let program = r#" + { + "body": "ok", + "headers": { "X-Test": "line1\nline2" } + } + "#; + let (rx, addr) = source_with_program(program, None).await; + + spawn_collect_n( + async move { + let resp = post_raw(addr, "hello\n").await; + assert_eq!(resp.status().as_u16(), 200); + assert!(resp.headers().get("x-test").is_none()); + assert_eq!(resp.text().await.unwrap(), "ok"); + }, + rx, + 1, + ) + .await; + }) + .await; + } + #[tokio::test] async fn response_source_invalid_vrl_fails_at_build() { let result = SimpleHttpConfig { diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 7a3894d72257b..32b25be66f147 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -29,7 +29,7 @@ use warp::{ BoxedFilter, path::{FullPath, Tail}, }, - http::{HeaderMap, StatusCode}, + http::{HeaderMap, HeaderName, HeaderValue, StatusCode}, reject::Rejection, reply::Reply, }; @@ -512,7 +512,7 @@ fn build_response_from_vrl_obj( if let 
Some(Value::Object(headers)) = obj.get("headers") { for (k, v) in headers { if let Value::Bytes(v) = v { - builder = builder.header(k.as_str(), v.as_ref()); + insert_validated_header(&mut builder, k.as_str(), v.as_ref()); } } } @@ -525,6 +525,32 @@ fn build_response_from_vrl_obj( }) } +/// Inserts a single response header after validating its name and value. +/// +/// Header data can be built from arbitrary event content, so an invalid name or value (for +/// example a value containing CR/LF) is skipped with a warning rather than failing the whole +/// response. Validating up front and inserting via `headers_mut` avoids latching an error on the +/// builder, which would otherwise make `body()` fail and drop the already-accepted events. +fn insert_validated_header(builder: &mut warp::http::response::Builder, name: &str, value: &[u8]) { + let header_name = match HeaderName::from_bytes(name.as_bytes()) { + Ok(name) => name, + Err(error) => { + warn!(message = "Skipping invalid response header name.", header = %name, %error); + return; + } + }; + let header_value = match HeaderValue::from_bytes(value) { + Ok(value) => value, + Err(error) => { + warn!(message = "Skipping invalid response header value.", header = %name, %error); + return; + } + }; + if let Some(headers) = builder.headers_mut() { + headers.insert(header_name, header_value); + } +} + /// Builds an HTTP response from a JSON object encoded in an `abort` message string. 
/// /// The object may contain: @@ -553,7 +579,7 @@ fn build_response_from_json_obj( if let Some(JsonValue::Object(headers)) = obj.get("headers") { for (k, v) in headers { if let Some(v) = v.as_str() { - builder = builder.header(k.as_str(), v); + insert_validated_header(&mut builder, k.as_str(), v.as_bytes()); } } } From 701eb254e438cce7d44dbb17ac88a381224ea99b Mon Sep 17 00:00:00 2001 From: Jared Patterson Date: Fri, 26 Jun 2026 11:17:07 +1200 Subject: [PATCH 5/7] fix(http_server source): reject invalid response_source return types at build compile_response_source only checked that the VRL program compiled, so a program like `response_source = "1"` was accepted even though the runtime can only turn a string or object into a response. Such a value hit the `unexpected type` branch at request time, returned a 500, and dropped the batch. Validate the program's result type at build, rejecting anything that cannot be a string or object. Programs that always `abort` have a `never` result type and are still allowed. --- src/sources/http_server.rs | 50 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 0c8dce1b28e17..2c35e784a7ec6 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -427,6 +427,17 @@ fn compile_response_source( warnings = %warnings_str ); } + + // The runtime only knows how to turn a string (body) or an object (`status`/`body`/`headers`) + // into a response; any other return type hits the `unexpected type` branch at request time, + // returns a 500, and drops the batch. Reject those statically so a config mistake fails fast + // instead of losing data at runtime. A program that always `abort`s has a `never` result and + // is allowed, since it never reaches the normal return path. 
+ let result = program.final_type_info().result; + if !(result.is_never() || result.contains_bytes() || result.contains_object()) { + return Err("`response_source` must return a string or object.".into()); + } + Ok(Arc::new(program)) } @@ -1071,6 +1082,45 @@ mod tests { assert!(result.is_err(), "invalid VRL should fail at build time"); } + /// A program that compiles but returns a type the runtime cannot turn into a response (here an + /// integer) is rejected at build time rather than producing a 500 and dropping events. + #[tokio::test] + async fn response_source_non_string_object_return_fails_at_build() { + let result = SimpleHttpConfig { + address: "0.0.0.0:0".parse().unwrap(), + response_source: Some("1".to_owned()), + ..SimpleHttpConfig::default() + } + .build(SourceContext::new_test( + SourceSender::new_test_finalize(EventStatus::Delivered).0, + None, + )) + .await; + + assert!( + result.is_err(), + "a non-string/object return type should fail at build time" + ); + } + + /// A program that unconditionally `abort`s has a `never` return type and must still build, + /// so the return-type check does not reject valid reject-only programs. + #[tokio::test] + async fn response_source_abort_only_program_builds() { + let result = SimpleHttpConfig { + address: "0.0.0.0:0".parse().unwrap(), + response_source: Some(r#"abort "rejected""#.to_owned()), + ..SimpleHttpConfig::default() + } + .build(SourceContext::new_test( + SourceSender::new_test_finalize(EventStatus::Delivered).0, + None, + )) + .await; + + assert!(result.is_ok(), "an abort-only program should build"); + } + /// Spawn an `http_server` source with a `response_source`, a specific `EventStatus` finalizer, /// and acknowledgements enabled. This lets us simulate sink failure while also having a custom /// VRL response configured. 
From 966bff655c07af2f9d3b103fbb27b8302b7dd753 Mon Sep 17 00:00:00 2001 From: Jared Patterson Date: Fri, 26 Jun 2026 11:29:52 +1200 Subject: [PATCH 6/7] docs(http_server source): note response_source only sees log events The response_source program builds its input from log events only. When a decoder that emits metric or trace events (native, native_json, otlp) is configured, those events are still forwarded to the sink but are absent from `.`, so logic such as `length(.)` accounts only for log events. Document this so the limitation is explicit. --- src/sources/http_server.rs | 7 +++++++ .../cue/reference/components/sources/generated/http.cue | 7 +++++++ .../reference/components/sources/generated/http_server.cue | 7 +++++++ 3 files changed, 21 insertions(+) diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 2c35e784a7ec6..da1c4350327cd 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -194,6 +194,13 @@ pub struct SimpleHttpConfig { /// - A string, used directly as the response body with the configured `response_code`. /// - An object with optional fields: `status` (integer), `body` (string), `headers` (object). /// + /// ## Event types + /// + /// The program only sees log events. When a decoder that emits metric or trace events is + /// configured (for example `native`, `native_json`, or `otlp`), those events are still + /// forwarded to the sink but are not included in `.`. Programs that count or inspect events + /// (for example `length(.)`) therefore account only for log events. 
+ /// /// ## Aborting and early rejection /// /// Call `abort` in the program to suppress event forwarding entirely and respond immediately, diff --git a/website/cue/reference/components/sources/generated/http.cue b/website/cue/reference/components/sources/generated/http.cue index abc9805997b0d..a05a4d8830433 100644 --- a/website/cue/reference/components/sources/generated/http.cue +++ b/website/cue/reference/components/sources/generated/http.cue @@ -737,6 +737,13 @@ generated: components: sources: http: configuration: { - A string, used directly as the response body with the configured `response_code`. - An object with optional fields: `status` (integer), `body` (string), `headers` (object). + ## Event types + + The program only sees log events. When a decoder that emits metric or trace events is + configured (for example `native`, `native_json`, or `otlp`), those events are still + forwarded to the sink but are not included in `.`. Programs that count or inspect events + (for example `length(.)`) therefore account only for log events. + ## Aborting and early rejection Call `abort` in the program to suppress event forwarding entirely and respond immediately, diff --git a/website/cue/reference/components/sources/generated/http_server.cue b/website/cue/reference/components/sources/generated/http_server.cue index b2b090e0084b8..560a4dd7e1fd8 100644 --- a/website/cue/reference/components/sources/generated/http_server.cue +++ b/website/cue/reference/components/sources/generated/http_server.cue @@ -737,6 +737,13 @@ generated: components: sources: http_server: configuration: { - A string, used directly as the response body with the configured `response_code`. - An object with optional fields: `status` (integer), `body` (string), `headers` (object). + ## Event types + + The program only sees log events. 
When a decoder that emits metric or trace events is + configured (for example `native`, `native_json`, or `otlp`), those events are still + forwarded to the sink but are not included in `.`. Programs that count or inspect events + (for example `length(.)`) therefore account only for log events. + ## Aborting and early rejection Call `abort` in the program to suppress event forwarding entirely and respond immediately, From fb71a472d6bb01bee72e33932c0d3cac284b1d06 Mon Sep 17 00:00:00 2001 From: Jared Patterson Date: Fri, 26 Jun 2026 11:51:45 +1200 Subject: [PATCH 7/7] fix(http_server source): reject mixed response_source return kinds at build The return-type check accepted any program whose result contained a supported kind, so a union like `if cond { "ok" } else { 1 }` passed and then dropped the batch with a 500 on the integer branch. Require the result to be a subset of the supported kinds (string, object, or an abort-only `never`). --- src/sources/http_server.rs | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index da1c4350327cd..e1f031c9a8ffc 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -437,11 +437,15 @@ fn compile_response_source( // The runtime only knows how to turn a string (body) or an object (`status`/`body`/`headers`) // into a response; any other return type hits the `unexpected type` branch at request time, - // returns a 500, and drops the batch. Reject those statically so a config mistake fails fast - // instead of losing data at runtime. A program that always `abort`s has a `never` result and - // is allowed, since it never reaches the normal return path. - let result = program.final_type_info().result; - if !(result.is_never() || result.contains_bytes() || result.contains_object()) { + // returns a 500, and drops the batch. 
Require the result to be a subset of those supported + // kinds, not merely to contain one of them, so a mixed union such as `if cond { "ok" } else + // { 1 }` is rejected rather than dropping the batch on the unsupported branch. A program that + // always `abort`s has a `never` result, which is a subset of any kind and so is allowed. + let allowed = Kind::bytes().or_object(Collection::any()); + if allowed + .is_superset(&program.final_type_info().result) + .is_err() + { return Err("`response_source` must return a string or object.".into()); } @@ -1110,6 +1114,28 @@ mod tests { ); } + /// A program whose result is a union of a supported and an unsupported kind (here string or + /// integer) is rejected at build time. The integer branch would otherwise reach the runtime + /// `unexpected type` path, return a 500, and drop the batch. + #[tokio::test] + async fn response_source_mixed_return_kinds_fails_at_build() { + let result = SimpleHttpConfig { + address: "0.0.0.0:0".parse().unwrap(), + response_source: Some(r#"if exists(.[0].ok) { "ok" } else { 1 }"#.to_owned()), + ..SimpleHttpConfig::default() + } + .build(SourceContext::new_test( + SourceSender::new_test_finalize(EventStatus::Delivered).0, + None, + )) + .await; + + assert!( + result.is_err(), + "a union with an unsupported return kind should fail at build time" + ); + } + /// A program that unconditionally `abort`s has a `never` return type and must still build, /// so the return-type check does not reject valid reject-only programs. #[tokio::test]