From d680a313d7af5d256aeb217ff36cc36f62920bb7 Mon Sep 17 00:00:00 2001 From: Robert Blafford Date: Fri, 22 May 2026 11:56:39 -0400 Subject: [PATCH 1/6] fix(fanout): drop empty event batches before they reach downstream buffers --- lib/vector-core/src/fanout.rs | 66 ++++++++++++++++++++++-- lib/vector-core/src/transform/outputs.rs | 2 +- src/topology/builder.rs | 4 +- 3 files changed, 65 insertions(+), 7 deletions(-) diff --git a/lib/vector-core/src/fanout.rs b/lib/vector-core/src/fanout.rs index 97e21bcc8db17..6dbd644f126b6 100644 --- a/lib/vector-core/src/fanout.rs +++ b/lib/vector-core/src/fanout.rs @@ -7,7 +7,10 @@ use tokio::sync::mpsc; use tokio_util::sync::ReusableBoxFuture; use vector_buffers::topology::channel::BufferSender; -use crate::{config::ComponentKey, event::EventArray}; +use crate::{ + config::ComponentKey, + event::{EventArray, EventContainer}, +}; pub enum ControlMessage { /// Adds a new sink to the fanout. @@ -45,15 +48,17 @@ pub type ControlChannel = mpsc::UnboundedSender; pub struct Fanout { senders: IndexMap>, control_channel: mpsc::UnboundedReceiver, + upstream_component: ComponentKey, } impl Fanout { - pub fn new() -> (Self, ControlChannel) { + pub fn new(upstream_component: ComponentKey) -> (Self, ControlChannel) { let (control_tx, control_rx) = mpsc::unbounded_channel(); let fanout = Self { senders: Default::default(), control_channel: control_rx, + upstream_component, }; (fanout, control_tx) @@ -220,6 +225,28 @@ impl Fanout { // Wait for any senders that are paused to be replaced first before continuing with the send. self.wait_for_replacements().await; + // Drop empty event batches before they reach any downstream buffer, this is technically + // programmer error. In debug/test builds the `debug_assert!` makes the underlying bug fail + // loudly; in release builds the batch is dropped with a rate-limited warning naming the + // upstream component. + debug_assert!( + !events.is_empty(), + "Fanout received empty event batch from upstream component '{}'", + self.upstream_component, + ); + // TODO: Wrap the conditional below with `std::hint::unlikely` once it stabilizes. This is an + // applicable situation to use it in since the following conditional should never evaluate to + // true. + #[cfg(not(debug_assertions))] + if events.is_empty() { + warn!( + message = "Dropping empty event batch emitted by upstream component. This is likely a bug in that component.", + component_id = %self.upstream_component, + downstream_count = self.senders.len(), + ); + return Ok(()); + } + // Nothing to send if we have no sender. if self.senders.is_empty() { trace!("No senders present."); @@ -525,7 +552,7 @@ mod tests { UnboundedSender, Vec>, ) { - let (mut fanout, control) = Fanout::new(); + let (mut fanout, control) = Fanout::new(ComponentKey::from("test_upstream")); let pairs = build_sender_pairs(capacities); let mut receivers = Vec::new(); @@ -827,7 +854,7 @@ mod tests { #[tokio::test] async fn fanout_no_sinks() { - let (mut fanout, _) = Fanout::new(); + let (mut fanout, _) = Fanout::new(ComponentKey::from("test_upstream")); let events = make_events(2); fanout @@ -840,6 +867,37 @@ mod tests { .expect("should not fail"); } + // The empty-batch guard panics via `debug_assert!` in debug builds and silently drops in + // release builds. The two tests below cover each half of that behavior. + #[tokio::test] + #[cfg(not(debug_assertions))] + async fn fanout_drops_empty_event_array_in_release_builds() { + let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]); + let empty: EventArray = Vec::::new().into(); + + fanout + .send(empty, None) + .await + .expect("empty batch should be dropped, not errored"); + + for receiver in receivers { + assert!( + collect_ready(receiver.into_stream()).is_empty(), + "no downstream receiver should observe the empty batch", + ); + } + } + + #[tokio::test] + #[cfg(debug_assertions)] + #[should_panic(expected = "Fanout received empty event batch from upstream component")] + async fn fanout_panics_on_empty_event_array_in_debug_builds() { + let (mut fanout, _, _receivers) = fanout_from_senders(&[2, 2]); + let empty: EventArray = Vec::::new().into(); + + let _ = fanout.send(empty, None).await; + } + #[tokio::test] async fn fanout_replace() { let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]); diff --git a/lib/vector-core/src/transform/outputs.rs b/lib/vector-core/src/transform/outputs.rs index 7918fb008f764..113f5f2b3418f 100644 --- a/lib/vector-core/src/transform/outputs.rs +++ b/lib/vector-core/src/transform/outputs.rs @@ -42,7 +42,7 @@ impl TransformOutputs { let mut controls = HashMap::new(); for output in outputs_in { - let (fanout, control) = Fanout::new(); + let (fanout, control) = Fanout::new(component_key.clone()); let log_schema_definitions = output .log_schema_definitions diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 8be9a0ce73d7a..6ed0206446597 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -300,7 +300,7 @@ impl<'a> Builder<'a> { for output in source_outputs.into_iter() { let rx = builder.add_source_output(output.clone(), key.clone()); - let (fanout, control) = Fanout::new(); + let (fanout, control) = Fanout::new(key.clone()); let source_type = source.inner.get_component_name(); let source = Arc::new(key.clone()); @@ -832,7 +832,7 @@ impl<'a> Builder<'a> { key: &ComponentKey, outputs: &[TransformOutput], ) -> (Task, HashMap) { - let (mut fanout, control) = Fanout::new(); + let (mut fanout, control) = Fanout::new(key.clone()); let sender = self .utilization_registry From aed3fa115e2f8b534a0191847d9e0c5bfd31c84c Mon Sep 17 00:00:00 2001 From: Robert Blafford Date: Fri, 22 May 2026 12:01:18 -0400 Subject: [PATCH 2/6] Add changelog fragment --- changelog.d/drop_empty_batches_fanout.fix.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog.d/drop_empty_batches_fanout.fix.md diff --git a/changelog.d/drop_empty_batches_fanout.fix.md b/changelog.d/drop_empty_batches_fanout.fix.md new file mode 100644 index 0000000000000..ae1c34ac1861f --- /dev/null +++ b/changelog.d/drop_empty_batches_fanout.fix.md @@ -0,0 +1,5 @@ +Drops empty event batches before they reach downstream buffers. +Defense against possible programmer errors in new components, +this prevents Vector from crashing and instead logs a warning. + +authors: graphcareful From 44699fad329b24401b4a707c938db66d722baf26 Mon Sep 17 00:00:00 2001 From: Rob Blafford Date: Fri, 22 May 2026 16:54:41 -0400 Subject: [PATCH 3/6] Remove conditional compilation release build for test case --- lib/vector-core/src/fanout.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/vector-core/src/fanout.rs b/lib/vector-core/src/fanout.rs index 6dbd644f126b6..dee7979b25918 100644 --- a/lib/vector-core/src/fanout.rs +++ b/lib/vector-core/src/fanout.rs @@ -870,7 +870,6 @@ mod tests { // The empty-batch guard panics via `debug_assert!` in debug builds and silently drops in // release builds. The two tests below cover each half of that behavior. #[tokio::test] - #[cfg(not(debug_assertions))] async fn fanout_drops_empty_event_array_in_release_builds() { let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]); let empty: EventArray = Vec::::new().into(); From cf685dd3e27471ce29f6bf007611bb827a863e08 Mon Sep 17 00:00:00 2001 From: Robert Blafford Date: Tue, 26 May 2026 08:47:52 -0400 Subject: [PATCH 4/6] fix(fanout): Remove release_build variant of test --- lib/vector-core/src/fanout.rs | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/lib/vector-core/src/fanout.rs b/lib/vector-core/src/fanout.rs index dee7979b25918..2a3015ac7910f 100644 --- a/lib/vector-core/src/fanout.rs +++ b/lib/vector-core/src/fanout.rs @@ -867,28 +867,7 @@ mod tests { .expect("should not fail"); } - // The empty-batch guard panics via `debug_assert!` in debug builds and silently drops in - // release builds. The two tests below cover each half of that behavior. #[tokio::test] - async fn fanout_drops_empty_event_array_in_release_builds() { - let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]); - let empty: EventArray = Vec::::new().into(); - - fanout - .send(empty, None) - .await - .expect("empty batch should be dropped, not errored"); - - for receiver in receivers { - assert!( - collect_ready(receiver.into_stream()).is_empty(), - "no downstream receiver should observe the empty batch", - ); - } - } - - #[tokio::test] - #[cfg(debug_assertions)] #[should_panic(expected = "Fanout received empty event batch from upstream component")] async fn fanout_panics_on_empty_event_array_in_debug_builds() { let (mut fanout, _, _receivers) = fanout_from_senders(&[2, 2]); From d57a417cccbf0cb1c672c4901fcae7babbffb3b2 Mon Sep 17 00:00:00 2001 From: Rob Blafford Date: Tue, 26 May 2026 09:55:56 -0400 Subject: [PATCH 5/6] Update changelog.d/drop_empty_batches_fanout.fix.md Co-authored-by: Pavlos Rontidis --- changelog.d/drop_empty_batches_fanout.fix.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/changelog.d/drop_empty_batches_fanout.fix.md b/changelog.d/drop_empty_batches_fanout.fix.md index ae1c34ac1861f..12ebfcd682555 100644 --- a/changelog.d/drop_empty_batches_fanout.fix.md +++ b/changelog.d/drop_empty_batches_fanout.fix.md @@ -1,5 +1,4 @@ -Drops empty event batches before they reach downstream buffers. -Defense against possible programmer errors in new components, -this prevents Vector from crashing and instead logs a warning. +Fixed a crash that could occur when a source or transform emitted an empty event batch into a topology with downstream buffers. Vector now +drops empty batches before they reach those buffers and logs a warning identifying the upstream component. authors: graphcareful From a520c52e9b90d78bd800951fa25ad129a35cc778 Mon Sep 17 00:00:00 2001 From: Rob Blafford Date: Tue, 26 May 2026 09:56:20 -0400 Subject: [PATCH 6/6] Update comment in lib/vector-core/src/fanout.rs Co-authored-by: Pavlos Rontidis --- lib/vector-core/src/fanout.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/vector-core/src/fanout.rs b/lib/vector-core/src/fanout.rs index 2a3015ac7910f..33ff30ae3e72a 100644 --- a/lib/vector-core/src/fanout.rs +++ b/lib/vector-core/src/fanout.rs @@ -227,8 +227,7 @@ impl Fanout { // Drop empty event batches before they reach any downstream buffer, this is technically // programmer error. In debug/test builds the `debug_assert!` makes the underlying bug fail - // loudly; in release builds the batch is dropped with a rate-limited warning naming the - // upstream component. + // loudly. debug_assert!( !events.is_empty(), "Fanout received empty event batch from upstream component '{}'",