diff --git a/changelog.d/drop_empty_batches_fanout.fix.md b/changelog.d/drop_empty_batches_fanout.fix.md new file mode 100644 index 0000000000000..12ebfcd682555 --- /dev/null +++ b/changelog.d/drop_empty_batches_fanout.fix.md @@ -0,0 +1,4 @@ +Fixed a crash that could occur when a source or transform emitted an empty event batch into a topology with downstream buffers. Vector now +drops empty batches before they reach those buffers and logs a warning identifying the upstream component. + +authors: graphcareful diff --git a/lib/vector-core/src/fanout.rs b/lib/vector-core/src/fanout.rs index 97e21bcc8db17..33ff30ae3e72a 100644 --- a/lib/vector-core/src/fanout.rs +++ b/lib/vector-core/src/fanout.rs @@ -7,7 +7,10 @@ use tokio::sync::mpsc; use tokio_util::sync::ReusableBoxFuture; use vector_buffers::topology::channel::BufferSender; -use crate::{config::ComponentKey, event::EventArray}; +use crate::{ + config::ComponentKey, + event::{EventArray, EventContainer}, +}; pub enum ControlMessage { /// Adds a new sink to the fanout. @@ -45,15 +48,17 @@ pub type ControlChannel = mpsc::UnboundedSender; pub struct Fanout { senders: IndexMap>, control_channel: mpsc::UnboundedReceiver, + upstream_component: ComponentKey, } impl Fanout { - pub fn new() -> (Self, ControlChannel) { + pub fn new(upstream_component: ComponentKey) -> (Self, ControlChannel) { let (control_tx, control_rx) = mpsc::unbounded_channel(); let fanout = Self { senders: Default::default(), control_channel: control_rx, + upstream_component, }; (fanout, control_tx) @@ -220,6 +225,27 @@ impl Fanout { // Wait for any senders that are paused to be replaced first before continuing with the send. self.wait_for_replacements().await; + // Drop empty event batches before they reach any downstream buffer; emitting one is a + // programmer error upstream. In debug/test builds the `debug_assert!` makes that bug fail + // loudly; release builds log a warning and drop the batch instead. 
+ debug_assert!( + !events.is_empty(), + "Fanout received empty event batch from upstream component '{}'", + self.upstream_component, + ); + // TODO: Wrap the conditional below with `std::hint::unlikely` once it stabilizes. This is an + // applicable situation to use it in since the following conditional should never evaluate to + // true. + #[cfg(not(debug_assertions))] + if events.is_empty() { + warn!( + message = "Dropping empty event batch emitted by upstream component. This is likely a bug in that component.", + component_id = %self.upstream_component, + downstream_count = self.senders.len(), + ); + return Ok(()); + } + // Nothing to send if we have no sender. if self.senders.is_empty() { trace!("No senders present."); @@ -525,7 +551,7 @@ mod tests { UnboundedSender, Vec>, ) { - let (mut fanout, control) = Fanout::new(); + let (mut fanout, control) = Fanout::new(ComponentKey::from("test_upstream")); let pairs = build_sender_pairs(capacities); let mut receivers = Vec::new(); @@ -827,7 +853,7 @@ mod tests { #[tokio::test] async fn fanout_no_sinks() { - let (mut fanout, _) = Fanout::new(); + let (mut fanout, _) = Fanout::new(ComponentKey::from("test_upstream")); let events = make_events(2); fanout @@ -840,6 +866,17 @@ mod tests { .expect("should not fail"); } + // Release builds compile out `debug_assert!`, so only run this with debug assertions on. + #[cfg(debug_assertions)] + #[tokio::test] + #[should_panic(expected = "Fanout received empty event batch from upstream component")] + async fn fanout_panics_on_empty_event_array_in_debug_builds() { + let (mut fanout, _, _receivers) = fanout_from_senders(&[2, 2]); + let empty: EventArray = Vec::::new().into(); + + let _ = fanout.send(empty, None).await; + } + #[tokio::test] async fn fanout_replace() { let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]); diff --git a/lib/vector-core/src/transform/outputs.rs b/lib/vector-core/src/transform/outputs.rs index 7918fb008f764..113f5f2b3418f 100644 --- a/lib/vector-core/src/transform/outputs.rs +++ b/lib/vector-core/src/transform/outputs.rs @@ -42,7 +42,7 @@ impl 
TransformOutputs { let mut controls = HashMap::new(); for output in outputs_in { - let (fanout, control) = Fanout::new(); + let (fanout, control) = Fanout::new(component_key.clone()); let log_schema_definitions = output .log_schema_definitions diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 8be9a0ce73d7a..6ed0206446597 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -300,7 +300,7 @@ impl<'a> Builder<'a> { for output in source_outputs.into_iter() { let rx = builder.add_source_output(output.clone(), key.clone()); - let (fanout, control) = Fanout::new(); + let (fanout, control) = Fanout::new(key.clone()); let source_type = source.inner.get_component_name(); let source = Arc::new(key.clone()); @@ -832,7 +832,7 @@ impl<'a> Builder<'a> { key: &ComponentKey, outputs: &[TransformOutput], ) -> (Task, HashMap) { - let (mut fanout, control) = Fanout::new(); + let (mut fanout, control) = Fanout::new(key.clone()); let sender = self .utilization_registry