Skip to content

Commit 6957e46

Browse files
committed
add global option to loosen ordering guarantees in stateless transforms in exchange for potential performance benefits
1 parent 17c9244 commit 6957e46

4 files changed

Lines changed: 93 additions & 3 deletions

File tree

lib/vector-core/src/config/global_options.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,15 @@ pub struct GlobalOptions {
190190
/// `find_vector_metrics`, and `aggregate_vector_metrics` functions.
191191
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
192192
pub metrics_storage_refresh_period: Option<f64>,
193+
194+
/// Whether to preserve event ordering for concurrent stateless transforms.
195+
///
196+
/// When `true` (the default), concurrent stateless transforms use `FuturesOrdered`,
197+
/// guaranteeing output order matches input order. When `false`, `FuturesUnordered`
198+
/// is used, which may improve throughput at the cost of ordering guarantees.
199+
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
200+
#[configurable(metadata(docs::common = false, docs::required = false))]
201+
pub preserve_ordering_stateless_transforms: Option<bool>,
193202
}
194203

195204
impl_generate_config_from_default!(GlobalOptions);
@@ -295,6 +304,16 @@ impl GlobalOptions {
295304
errors.push("conflicting values for 'expire_metrics_secs' found".to_owned());
296305
}
297306

307+
if conflicts(
308+
self.preserve_ordering_stateless_transforms.as_ref(),
309+
with.preserve_ordering_stateless_transforms.as_ref(),
310+
) {
311+
errors.push(
312+
"conflicting values for 'preserve_ordering_stateless_transforms' found"
313+
.to_owned(),
314+
);
315+
}
316+
298317
let data_dir = if self.data_dir.is_none() || self.data_dir == default_data_dir() {
299318
with.data_dir
300319
} else if with.data_dir != default_data_dir() && self.data_dir != with.data_dir {
@@ -345,6 +364,9 @@ impl GlobalOptions {
345364
metrics_storage_refresh_period: self
346365
.metrics_storage_refresh_period
347366
.or(with.metrics_storage_refresh_period),
367+
preserve_ordering_stateless_transforms: self
368+
.preserve_ordering_stateless_transforms
369+
.or(with.preserve_ordering_stateless_transforms),
348370
})
349371
} else {
350372
Err(errors)

src/topology/builder.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::{
99
time::Instant,
1010
};
1111

12-
use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
12+
use futures::{FutureExt, StreamExt, TryStreamExt};
1313
use futures_util::stream::FuturesUnordered;
1414
use metrics::{gauge, histogram};
1515
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
@@ -751,13 +751,19 @@ impl<'a> Builder<'a> {
751751
let sender = self
752752
.utilization_registry
753753
.add_component(node.key.clone(), gauge!("utilization"));
754+
let preserve_ordering = self
755+
.config
756+
.global
757+
.preserve_ordering_stateless_transforms
758+
.unwrap_or(true);
754759
let mut runner = Runner::new(
755760
t,
756761
input_rx,
757762
sender,
758763
node.input_details.data_type(),
759764
outputs,
760765
LatencyRecorder::new(self.config.global.latency_ewma_alpha),
766+
preserve_ordering,
761767
);
762768
runner.component_id = node.key.id().to_owned();
763769
let transform = if node.enable_concurrency {
@@ -1135,6 +1141,7 @@ struct Runner {
11351141
latency_recorder: LatencyRecorder,
11361142
events_received: Registered<EventsReceived>,
11371143
component_id: String,
1144+
preserve_ordering: bool,
11381145
}
11391146

11401147
impl Runner {
@@ -1145,6 +1152,7 @@ impl Runner {
11451152
input_type: DataType,
11461153
outputs: TransformOutputs,
11471154
latency_recorder: LatencyRecorder,
1155+
preserve_ordering: bool,
11481156
) -> Self {
11491157
Self {
11501158
transform,
@@ -1155,6 +1163,7 @@ impl Runner {
11551163
latency_recorder,
11561164
events_received: register!(EventsReceived),
11571165
component_id: String::new(),
1166+
preserve_ordering,
11581167
}
11591168
}
11601169

@@ -1210,7 +1219,8 @@ impl Runner {
12101219
let mut input_rx =
12111220
super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);
12121221

1213-
let mut in_flight = FuturesOrdered::new();
1222+
let mut in_flight =
1223+
super::in_flight_queue::InFlightQueue::new(self.preserve_ordering);
12141224
let mut shutting_down = false;
12151225
let completed_count = Arc::new(AtomicU64::new(0));
12161226
let mut yielded_since_last_record: u64 = 0;
@@ -1257,7 +1267,7 @@ impl Runner {
12571267
completed.fetch_add(1, Ordering::Relaxed);
12581268
outputs_buf
12591269
}.in_current_span());
1260-
in_flight.push_back(task);
1270+
in_flight.push(task);
12611271
}
12621272
None => {
12631273
shutting_down = true;

src/topology/in_flight_queue.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use std::{future::Future, pin::Pin};
2+
3+
use futures::{
4+
Stream,
5+
stream::{FuturesOrdered, FuturesUnordered},
6+
task::{Context, Poll},
7+
};
8+
9+
/// Wraps either a [`FuturesOrdered`] or [`FuturesUnordered`] with a unified
10+
/// interface, allowing the caller to choose between ordering and throughput
11+
/// at runtime without duplicating the polling loop.
12+
pub enum InFlightQueue<Fut: Future> {
13+
Ordered(FuturesOrdered<Fut>),
14+
Unordered(FuturesUnordered<Fut>),
15+
}
16+
17+
impl<Fut: Future> InFlightQueue<Fut> {
18+
pub fn new(preserve_ordering: bool) -> Self {
19+
if preserve_ordering {
20+
Self::Ordered(FuturesOrdered::new())
21+
} else {
22+
Self::Unordered(FuturesUnordered::new())
23+
}
24+
}
25+
26+
pub fn push(&mut self, fut: Fut) {
27+
match self {
28+
Self::Ordered(q) => q.push_back(fut),
29+
Self::Unordered(q) => q.push(fut),
30+
}
31+
}
32+
33+
pub fn len(&self) -> usize {
34+
match self {
35+
Self::Ordered(q) => q.len(),
36+
Self::Unordered(q) => q.len(),
37+
}
38+
}
39+
40+
pub fn is_empty(&self) -> bool {
41+
match self {
42+
Self::Ordered(q) => q.is_empty(),
43+
Self::Unordered(q) => q.is_empty(),
44+
}
45+
}
46+
}
47+
48+
impl<Fut: Future> Stream for InFlightQueue<Fut> {
49+
type Item = Fut::Output;
50+
51+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52+
match self.get_mut() {
53+
Self::Ordered(q) => Pin::new(q).poll_next(cx),
54+
Self::Unordered(q) => Pin::new(q).poll_next(cx),
55+
}
56+
}
57+
}

src/topology/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod schema;
1212

1313
pub mod builder;
1414
mod controller;
15+
mod in_flight_queue;
1516
mod ready_arrays;
1617
mod running;
1718
mod task;

0 commit comments

Comments
 (0)