Skip to content

Commit 1a4d60d

Browse files
committed
add global option to loosen ordering guarantees in stateless transforms in exchange for potential performance benefits
1 parent fbb1e4b commit 1a4d60d

4 files changed

Lines changed: 93 additions & 3 deletions

File tree

lib/vector-core/src/config/global_options.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,15 @@ pub struct GlobalOptions {
190190
/// `find_vector_metrics`, and `aggregate_vector_metrics` functions.
191191
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
192192
pub metrics_storage_refresh_period: Option<f64>,
193+
194+
/// Whether to preserve event ordering for concurrent stateless transforms.
195+
///
196+
/// When `true` (the default), concurrent stateless transforms use `FuturesOrdered`,
197+
/// guaranteeing output order matches input order. When `false`, `FuturesUnordered`
198+
/// is used, which may improve throughput at the cost of ordering guarantees.
199+
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
200+
#[configurable(metadata(docs::common = false, docs::required = false))]
201+
pub preserve_ordering_stateless_transforms: Option<bool>,
193202
}
194203

195204
impl_generate_config_from_default!(GlobalOptions);
@@ -295,6 +304,16 @@ impl GlobalOptions {
295304
errors.push("conflicting values for 'expire_metrics_secs' found".to_owned());
296305
}
297306

307+
if conflicts(
308+
self.preserve_ordering_stateless_transforms.as_ref(),
309+
with.preserve_ordering_stateless_transforms.as_ref(),
310+
) {
311+
errors.push(
312+
"conflicting values for 'preserve_ordering_stateless_transforms' found"
313+
.to_owned(),
314+
);
315+
}
316+
298317
let data_dir = if self.data_dir.is_none() || self.data_dir == default_data_dir() {
299318
with.data_dir
300319
} else if with.data_dir != default_data_dir() && self.data_dir != with.data_dir {
@@ -345,6 +364,9 @@ impl GlobalOptions {
345364
metrics_storage_refresh_period: self
346365
.metrics_storage_refresh_period
347366
.or(with.metrics_storage_refresh_period),
367+
preserve_ordering_stateless_transforms: self
368+
.preserve_ordering_stateless_transforms
369+
.or(with.preserve_ordering_stateless_transforms),
348370
})
349371
} else {
350372
Err(errors)

src/topology/builder.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
time::Instant,
77
};
88

9-
use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
9+
use futures::{FutureExt, StreamExt, TryStreamExt};
1010
use futures_util::stream::FuturesUnordered;
1111
use metrics::gauge;
1212
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
@@ -745,13 +745,19 @@ impl<'a> Builder<'a> {
745745
let sender = self
746746
.utilization_registry
747747
.add_component(node.key.clone(), gauge!("utilization"));
748+
let preserve_ordering = self
749+
.config
750+
.global
751+
.preserve_ordering_stateless_transforms
752+
.unwrap_or(true);
748753
let runner = Runner::new(
749754
t,
750755
input_rx,
751756
sender,
752757
node.input_details.data_type(),
753758
outputs,
754759
LatencyRecorder::new(self.config.global.latency_ewma_alpha),
760+
preserve_ordering,
755761
);
756762
let transform = if node.enable_concurrency {
757763
runner.run_concurrently().boxed()
@@ -1125,6 +1131,7 @@ struct Runner {
11251131
timer_tx: UtilizationComponentSender,
11261132
latency_recorder: LatencyRecorder,
11271133
events_received: Registered<EventsReceived>,
1134+
preserve_ordering: bool,
11281135
}
11291136

11301137
impl Runner {
@@ -1135,6 +1142,7 @@ impl Runner {
11351142
input_type: DataType,
11361143
outputs: TransformOutputs,
11371144
latency_recorder: LatencyRecorder,
1145+
preserve_ordering: bool,
11381146
) -> Self {
11391147
Self {
11401148
transform,
@@ -1144,6 +1152,7 @@ impl Runner {
11441152
timer_tx,
11451153
latency_recorder,
11461154
events_received: register!(EventsReceived),
1155+
preserve_ordering,
11471156
}
11481157
}
11491158

@@ -1199,7 +1208,8 @@ impl Runner {
11991208
let mut input_rx =
12001209
super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);
12011210

1202-
let mut in_flight = FuturesOrdered::new();
1211+
let mut in_flight =
1212+
super::in_flight_queue::InFlightQueue::new(self.preserve_ordering);
12031213
let mut shutting_down = false;
12041214

12051215
self.timer_tx.try_send_start_wait();
@@ -1234,7 +1244,7 @@ impl Runner {
12341244
}
12351245
outputs_buf
12361246
}.in_current_span());
1237-
in_flight.push_back(task);
1247+
in_flight.push(task);
12381248
}
12391249
None => {
12401250
shutting_down = true;

src/topology/in_flight_queue.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use std::{future::Future, pin::Pin};
2+
3+
use futures::{
4+
Stream,
5+
stream::{FuturesOrdered, FuturesUnordered},
6+
task::{Context, Poll},
7+
};
8+
9+
/// Wraps either a [`FuturesOrdered`] or [`FuturesUnordered`] with a unified
10+
/// interface, allowing the caller to choose between ordering and throughput
11+
/// at runtime without duplicating the polling loop.
12+
pub enum InFlightQueue<Fut: Future> {
13+
Ordered(FuturesOrdered<Fut>),
14+
Unordered(FuturesUnordered<Fut>),
15+
}
16+
17+
impl<Fut: Future> InFlightQueue<Fut> {
18+
pub fn new(preserve_ordering: bool) -> Self {
19+
if preserve_ordering {
20+
Self::Ordered(FuturesOrdered::new())
21+
} else {
22+
Self::Unordered(FuturesUnordered::new())
23+
}
24+
}
25+
26+
pub fn push(&mut self, fut: Fut) {
27+
match self {
28+
Self::Ordered(q) => q.push_back(fut),
29+
Self::Unordered(q) => q.push(fut),
30+
}
31+
}
32+
33+
pub fn len(&self) -> usize {
34+
match self {
35+
Self::Ordered(q) => q.len(),
36+
Self::Unordered(q) => q.len(),
37+
}
38+
}
39+
40+
pub fn is_empty(&self) -> bool {
41+
match self {
42+
Self::Ordered(q) => q.is_empty(),
43+
Self::Unordered(q) => q.is_empty(),
44+
}
45+
}
46+
}
47+
48+
impl<Fut: Future> Stream for InFlightQueue<Fut> {
49+
type Item = Fut::Output;
50+
51+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52+
match self.get_mut() {
53+
Self::Ordered(q) => Pin::new(q).poll_next(cx),
54+
Self::Unordered(q) => Pin::new(q).poll_next(cx),
55+
}
56+
}
57+
}

src/topology/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod schema;
1212

1313
pub mod builder;
1414
mod controller;
15+
mod in_flight_queue;
1516
mod ready_arrays;
1617
mod running;
1718
mod task;

0 commit comments

Comments
 (0)