Skip to content

Commit 6b46789

Browse files
committed
using atomic counter for more accurate metric
1 parent a417eeb commit 6b46789

1 file changed

Lines changed: 11 additions & 5 deletions

File tree

src/topology/builder.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use std::{
22
collections::HashMap,
33
future::ready,
44
num::NonZeroUsize,
5-
sync::{Arc, LazyLock, Mutex},
5+
sync::{
6+
Arc, LazyLock, Mutex,
7+
atomic::{AtomicU64, Ordering},
8+
},
69
time::Instant,
710
};
811

@@ -1204,15 +1207,16 @@ impl Runner {
12041207

12051208
let mut in_flight = FuturesOrdered::new();
12061209
let mut shutting_down = false;
1207-
let mut blocked_completions: u64 = 0;
1210+
let completed_count = Arc::new(AtomicU64::new(0));
1211+
let mut yielded_since_last_record: u64 = 0;
12081212

12091213
self.timer_tx.try_send_start_wait();
12101214
loop {
12111215
tokio::select! {
12121216
biased;
12131217

12141218
result = in_flight.next(), if !in_flight.is_empty() => {
1215-
blocked_completions += 1;
1219+
yielded_since_last_record += 1;
12161220
match result {
12171221
Some(Ok(mut outputs_buf)) => {
12181222
self.send_outputs(&mut outputs_buf).await
@@ -1223,12 +1227,13 @@ impl Runner {
12231227
}
12241228

12251229
input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
1230+
let blocked_completions = completed_count.fetch_sub(yielded_since_last_record, Ordering::Relaxed);
1231+
yielded_since_last_record = 0;
12261232
histogram!(
12271233
"transform_concurrent_scheduling_pressure",
12281234
"component_id" => self.component_id.clone()
12291235
)
12301236
.record((blocked_completions as f64 / *TRANSFORM_CONCURRENCY_LIMIT as f64).min(1.0));
1231-
blocked_completions = 0;
12321237
match input_arrays {
12331238
Some(input_arrays) => {
12341239
let mut len = 0;
@@ -1239,10 +1244,12 @@ impl Runner {
12391244

12401245
let mut t = self.transform.clone();
12411246
let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
1247+
let completed = Arc::clone(&completed_count);
12421248
let task = tokio::spawn(async move {
12431249
for events in input_arrays {
12441250
t.transform_all(events, &mut outputs_buf);
12451251
}
1252+
completed.fetch_add(1, Ordering::Relaxed);
12461253
outputs_buf
12471254
}.in_current_span());
12481255
in_flight.push_back(task);
@@ -1255,7 +1262,6 @@ impl Runner {
12551262
}
12561263

12571264
else => {
1258-
blocked_completions = 0;
12591265
if shutting_down {
12601266
break
12611267
}

0 commit comments

Comments
 (0)