Skip to content

Commit e35c5e2

Browse files
committed
using atomic counter for more accurate metric
1 parent 957cca0 commit e35c5e2

1 file changed

Lines changed: 11 additions & 5 deletions

File tree

src/topology/builder.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use std::{
22
collections::HashMap,
33
future::ready,
44
num::NonZeroUsize,
5-
sync::{Arc, LazyLock, Mutex},
5+
sync::{
6+
Arc, LazyLock, Mutex,
7+
atomic::{AtomicU64, Ordering},
8+
},
69
time::Instant,
710
};
811

@@ -1209,15 +1212,16 @@ impl Runner {
12091212

12101213
let mut in_flight = FuturesOrdered::new();
12111214
let mut shutting_down = false;
1212-
let mut blocked_completions: u64 = 0;
1215+
let completed_count = Arc::new(AtomicU64::new(0));
1216+
let mut yielded_since_last_record: u64 = 0;
12131217

12141218
self.timer_tx.try_send_start_wait();
12151219
loop {
12161220
tokio::select! {
12171221
biased;
12181222

12191223
result = in_flight.next(), if !in_flight.is_empty() => {
1220-
blocked_completions += 1;
1224+
yielded_since_last_record += 1;
12211225
match result {
12221226
Some(Ok(mut outputs_buf)) => {
12231227
self.send_outputs(&mut outputs_buf).await
@@ -1228,12 +1232,13 @@ impl Runner {
12281232
}
12291233

12301234
input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
1235+
let blocked_completions = completed_count.fetch_sub(yielded_since_last_record, Ordering::Relaxed);
1236+
yielded_since_last_record = 0;
12311237
histogram!(
12321238
"transform_concurrent_scheduling_pressure",
12331239
"component_id" => self.component_id.clone()
12341240
)
12351241
.record((blocked_completions as f64 / *TRANSFORM_CONCURRENCY_LIMIT as f64).min(1.0));
1236-
blocked_completions = 0;
12371242
match input_arrays {
12381243
Some(input_arrays) => {
12391244
let mut len = 0;
@@ -1244,10 +1249,12 @@ impl Runner {
12441249

12451250
let mut t = self.transform.clone();
12461251
let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
1252+
let completed = Arc::clone(&completed_count);
12471253
let task = tokio::spawn(async move {
12481254
for events in input_arrays {
12491255
t.transform_all(events, &mut outputs_buf);
12501256
}
1257+
completed.fetch_add(1, Ordering::Relaxed);
12511258
outputs_buf
12521259
}.in_current_span());
12531260
in_flight.push_back(task);
@@ -1260,7 +1267,6 @@ impl Runner {
12601267
}
12611268

12621269
else => {
1263-
blocked_completions = 0;
12641270
if shutting_down {
12651271
break
12661272
}

0 commit comments

Comments
 (0)