Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/drop_empty_batches_fanout.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fixed a crash that could occur when a source or transform emitted an empty event batch into a topology with downstream buffers. Vector now
drops empty batches before they reach those buffers and logs a warning identifying the upstream component.

authors: graphcareful
43 changes: 39 additions & 4 deletions lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use tokio::sync::mpsc;
use tokio_util::sync::ReusableBoxFuture;
use vector_buffers::topology::channel::BufferSender;

use crate::{config::ComponentKey, event::EventArray};
use crate::{
config::ComponentKey,
event::{EventArray, EventContainer},
};

pub enum ControlMessage {
/// Adds a new sink to the fanout.
Expand Down Expand Up @@ -45,15 +48,17 @@ pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;
pub struct Fanout {
senders: IndexMap<ComponentKey, Option<Sender>>,
control_channel: mpsc::UnboundedReceiver<ControlMessage>,
upstream_component: ComponentKey,
}

impl Fanout {
pub fn new() -> (Self, ControlChannel) {
pub fn new(upstream_component: ComponentKey) -> (Self, ControlChannel) {
let (control_tx, control_rx) = mpsc::unbounded_channel();

let fanout = Self {
senders: Default::default(),
control_channel: control_rx,
upstream_component,
};

(fanout, control_tx)
Expand Down Expand Up @@ -220,6 +225,27 @@ impl Fanout {
// Wait for any senders that are paused to be replaced first before continuing with the send.
self.wait_for_replacements().await;

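// Drop empty event batches before they reach any downstream buffer. An empty batch is
// technically a programmer error in the upstream component: in debug/test builds the
// `debug_assert!` below makes the underlying bug fail loudly, while in builds without debug
// assertions the batch is dropped with a warning.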
debug_assert!(
!events.is_empty(),
"Fanout received empty event batch from upstream component '{}'",
self.upstream_component,
);
// TODO: Wrap the conditional below with `std::hint::unlikely` once it stabilizes. The hint
// fits here because the condition should never evaluate to true.
#[cfg(not(debug_assertions))]
if events.is_empty() {
warn!(
message = "Dropping empty event batch emitted by upstream component. This is likely a bug in that component.",
component_id = %self.upstream_component,
downstream_count = self.senders.len(),
);
Comment thread
graphcareful marked this conversation as resolved.
return Ok(());
}

// Nothing to send if we have no sender.
if self.senders.is_empty() {
trace!("No senders present.");
Expand Down Expand Up @@ -525,7 +551,7 @@ mod tests {
UnboundedSender<ControlMessage>,
Vec<BufferReceiver<EventArray>>,
) {
let (mut fanout, control) = Fanout::new();
let (mut fanout, control) = Fanout::new(ComponentKey::from("test_upstream"));
let pairs = build_sender_pairs(capacities);

let mut receivers = Vec::new();
Expand Down Expand Up @@ -827,7 +853,7 @@ mod tests {

#[tokio::test]
async fn fanout_no_sinks() {
let (mut fanout, _) = Fanout::new();
let (mut fanout, _) = Fanout::new(ComponentKey::from("test_upstream"));
let events = make_events(2);

fanout
Expand All @@ -840,6 +866,15 @@ mod tests {
.expect("should not fail");
}

/// With debug assertions enabled, sending an empty batch through the fanout must panic.
///
/// The panic originates from a `debug_assert!` in `Fanout::send`, which is compiled out when
/// debug assertions are disabled. The test is therefore gated on `debug_assertions`; without
/// the gate, `cargo test --release` would fail because the expected panic never happens.
#[cfg(debug_assertions)]
#[tokio::test]
#[should_panic(expected = "Fanout received empty event batch from upstream component")]
async fn fanout_panics_on_empty_event_array_in_debug_builds() {
    // Bind the receivers to a named variable so they are not dropped before the send.
    let (mut fanout, _, _receivers) = fanout_from_senders(&[2, 2]);
    let empty: EventArray = Vec::<LogEvent>::new().into();

    let _ = fanout.send(empty, None).await;
}

/// With debug assertions disabled, sending an empty batch must not panic: `Fanout::send`
/// logs a warning and returns `Ok(())` early instead of forwarding the batch.
#[cfg(not(debug_assertions))]
#[tokio::test]
async fn fanout_drops_empty_event_array_in_release_builds() {
    // Bind the receivers to a named variable so they are not dropped before the send.
    let (mut fanout, _, _receivers) = fanout_from_senders(&[2, 2]);
    let empty: EventArray = Vec::<LogEvent>::new().into();

    assert!(
        fanout.send(empty, None).await.is_ok(),
        "empty batch should be dropped without returning an error",
    );
}

#[tokio::test]
async fn fanout_replace() {
let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]);
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/transform/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl TransformOutputs {
let mut controls = HashMap::new();

for output in outputs_in {
let (fanout, control) = Fanout::new();
let (fanout, control) = Fanout::new(component_key.clone());

let log_schema_definitions = output
.log_schema_definitions
Expand Down
4 changes: 2 additions & 2 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl<'a> Builder<'a> {
for output in source_outputs.into_iter() {
let rx = builder.add_source_output(output.clone(), key.clone());

let (fanout, control) = Fanout::new();
let (fanout, control) = Fanout::new(key.clone());
let source_type = source.inner.get_component_name();
let source = Arc::new(key.clone());

Expand Down Expand Up @@ -832,7 +832,7 @@ impl<'a> Builder<'a> {
key: &ComponentKey,
outputs: &[TransformOutput],
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (mut fanout, control) = Fanout::new();
let (mut fanout, control) = Fanout::new(key.clone());

let sender = self
.utilization_registry
Expand Down
Loading