Skip to content

Commit aaa5f77

Browse files
adarsh0728 and vigith authored
feat: add reduce metrics for rust data plane (#3364)
Signed-off-by: adarsh0728 <gooneriitk@gmail.com> Signed-off-by: Vigith Maurice <vigith@gmail.com> Co-authored-by: Vigith Maurice <vigith@gmail.com>
1 parent ce2ac17 commit aaa5f77

11 files changed

Lines changed: 321 additions & 5 deletions

File tree

docs/operations/metrics/metrics.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,21 @@ These metrics are specific to map/UDF vertices.
5555
| `forwarder_udf_error_total` | Counter | | Total number of UDF errors |
5656
| `forwarder_udf_processing_time` | Histogram | | Processing times of User-Defined Functions (UDFs), in microseconds |
5757

58+
### Reduce Metrics
59+
60+
These metrics are specific to reduce (windowed aggregation) vertices.
61+
62+
| Metric name | Metric type | Additional Labels | Description |
63+
|--------------------------------------|-------------|-------------------|----------------------------------------------------------------------|
64+
| `reduce_active_windows` | Gauge | | Number of currently open reduce windows |
65+
| `reduce_closed_windows` | Gauge | | Number of closed windows awaiting GC |
66+
| `reduce_watermark_lag` | Gauge | | Difference between wall clock and watermark, in milliseconds |
67+
| `reduce_window_processing_time` | Histogram | | Time from window open until the window's results are written, in microseconds |
68+
| `reduce_pnf_process_time` | Histogram | | UDF reduce function execution time per window, in microseconds |
69+
| `reduce_pbq_write_total` | Counter | | Total data messages written to PBQ |
70+
71+
> **Note:** All reduce metrics carry the standard pipeline common labels (`pipeline`, `vertex`, `replica`) with `vertex_type` always set to `ReduceUDF`.
72+
5873
### Fallback Sink Metrics
5974

6075
These metrics are specific to sink vertices with a fallback sink configured.

rust/numaflow-core/src/metrics/mod.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ const MVTX_REGISTRY_GLOBAL_PREFIX: &str = "monovtx";
6262
// Prefixes for the sub-registries
6363
const SINK_REGISTRY_PREFIX: &str = "sink";
6464
const FALLBACK_SINK_REGISTRY_PREFIX: &str = "fallback_sink";
65+
const REDUCE_REGISTRY_PREFIX: &str = "reduce";
6566
const ON_SUCCESS_SINK_REGISTRY_PREFIX: &str = "onsuccess_sink";
6667
const TRANSFORMER_REGISTRY_PREFIX: &str = "transformer";
6768
const UDF_REGISTRY_PREFIX: &str = "udf";
@@ -149,6 +150,14 @@ const SINK_TIME: &str = "time";
149150
const FALLBACK_SINK_TIME: &str = "time";
150151
const ON_SUCCESS_SINK_TIME: &str = "time";
151152

153+
// Reduce-specific metric names. These are the names passed to the registry in
// `register_reduce_metrics`, which registers them in the sub-registry created
// with `REDUCE_REGISTRY_PREFIX`.
154+
const REDUCE_ACTIVE_WINDOWS: &str = "active_windows";
155+
const REDUCE_CLOSED_WINDOWS: &str = "closed_windows";
156+
const REDUCE_WINDOW_PROCESSING_TIME: &str = "window_processing_time";
157+
const REDUCE_PNF_PROCESS_TIME: &str = "pnf_process_time";
158+
const REDUCE_WATERMARK_LAG: &str = "watermark_lag";
159+
// NOTE(review): the identifier ends in `_TOTAL` but the value does not; presumably
// the metrics encoder appends the `_total` suffix for counters — confirm.
const REDUCE_PBQ_WRITE_TOTAL: &str = "pbq_write";
160+
152161
// jetstream isb processing times
153162
const JETSTREAM_ISB_READ_TIME_TOTAL: &str = "read_time_total";
154163
const JETSTREAM_ISB_WRITE_TIME_TOTAL: &str = "write_time_total";
@@ -307,6 +316,8 @@ pub(crate) struct PipelineMetrics {
307316
pub(crate) sink_forwarder: SinkForwarderMetrics,
308317
pub(crate) jetstream_isb: JetStreamISBMetrics,
309318
pub(crate) pending_raw: Family<Vec<(String, String)>, Gauge>,
319+
// reduce specific metrics
320+
pub(crate) reduce: ReduceMetrics,
310321
}
311322

312323
/// Family of metrics for the sink
@@ -342,6 +353,39 @@ pub(crate) struct UDFMetrics {
342353
pub(crate) errors_total: Family<Vec<(String, String)>, Counter>,
343354
}
344355

356+
/// Family of metrics for the Reduce vertex.
///
/// Each field is a metric family keyed by a `Vec<(String, String)>` label set.
357+
pub(crate) struct ReduceMetrics {
358+
// gauges
359+
/// Number of currently open reduce windows.
pub(crate) active_windows: Family<Vec<(String, String)>, Gauge>,
360+
/// Number of closed reduce windows awaiting GC.
pub(crate) closed_windows: Family<Vec<(String, String)>, Gauge>,
361+
/// Difference between current wall clock and watermark, in milliseconds
/// (stored as an `f64` gauge).
pub(crate) watermark_lag: Family<Vec<(String, String)>, Gauge<f64, AtomicU64>>,
362+
// histograms
363+
/// Time from window open to window close, in microseconds.
pub(crate) window_processing_time: Family<Vec<(String, String)>, Histogram>,
364+
/// Time for the UDF reduce function to complete per window, in microseconds.
pub(crate) pnf_process_time: Family<Vec<(String, String)>, Histogram>,
365+
// counters
366+
/// Total number of messages written to PBQ.
pub(crate) pbq_write_total: Family<Vec<(String, String)>, Counter>,
367+
}
368+
369+
impl ReduceMetrics {
370+
/// Builds every reduce metric family.
///
/// The gauges and the counter use the family's `default()` constructor. The two
/// histograms use `new_with_constructor` so that each histogram created for a
/// label set gets the bucket layout given below.
pub(crate) fn new() -> Self {
371+
Self {
372+
active_windows: Family::<Vec<(String, String)>, Gauge>::default(),
373+
closed_windows: Family::<Vec<(String, String)>, Gauge>::default(),
374+
watermark_lag: Family::<Vec<(String, String)>, Gauge<f64, AtomicU64>>::default(),
375+
window_processing_time:
376+
Family::<Vec<(String, String)>, Histogram>::new_with_constructor(
377+
// 1ms to 60 minutes in microseconds
// NOTE(review): arguments are presumably (min, max, bucket count) — confirm
// against `exponential_buckets_range`.
378+
|| Histogram::new(exponential_buckets_range(1000.0, 3_600_000_000.0, 10)),
379+
),
380+
pnf_process_time: Family::<Vec<(String, String)>, Histogram>::new_with_constructor(
381+
// 1ms to 20 minutes in microseconds
382+
|| Histogram::new(exponential_buckets_range(1000.0, 1_200_000_000.0, 10)),
383+
),
384+
pbq_write_total: Family::<Vec<(String, String)>, Counter>::default(),
385+
}
386+
}
387+
}
388+
345389
/// Generic forwarder metrics
346390
pub(crate) struct PipelineForwarderMetrics {
347391
// read counters
@@ -763,13 +807,15 @@ impl PipelineMetrics {
763807
sink_forwarder: SinkForwarderMetrics::new(),
764808
jetstream_isb: JetStreamISBMetrics::new(),
765809
pending_raw: Family::<Vec<(String, String)>, Gauge>::default(),
810+
reduce: ReduceMetrics::new(),
766811
};
767812
let mut registry = global_registry().registry.lock();
768813
Self::register_forwarder_metrics(&metrics, &mut registry);
769814
Self::register_source_forwarder_metrics(&metrics, &mut registry);
770815
Self::register_sink_forwarder_metrics(&metrics, &mut registry);
771816
Self::register_jetstream_isb_metrics(&metrics, &mut registry);
772817
Self::register_vertex_metrics(&metrics, &mut registry);
818+
Self::register_reduce_metrics(&metrics, &mut registry);
773819
metrics
774820
}
775821

@@ -1047,6 +1093,40 @@ impl PipelineMetrics {
10471093
metrics.pending_raw.clone(),
10481094
);
10491095
}
1096+
1097+
/// Registers the reduce metric families of `metrics` with `registry`.
///
/// All six families are registered in a sub-registry created with
/// `REDUCE_REGISTRY_PREFIX`; a clone of each family is handed to the registry
/// together with its name constant and help text.
fn register_reduce_metrics(metrics: &Self, registry: &mut Registry) {
1098+
let reduce_registry = registry.sub_registry_with_prefix(REDUCE_REGISTRY_PREFIX);
1099+
// gauges
reduce_registry.register(
1100+
REDUCE_ACTIVE_WINDOWS,
1101+
"Number of currently open reduce windows",
1102+
metrics.reduce.active_windows.clone(),
1103+
);
1104+
reduce_registry.register(
1105+
REDUCE_CLOSED_WINDOWS,
1106+
"Number of closed reduce windows awaiting GC",
1107+
metrics.reduce.closed_windows.clone(),
1108+
);
1109+
reduce_registry.register(
1110+
REDUCE_WATERMARK_LAG,
1111+
"Difference between current wall clock and watermark in milliseconds",
1112+
metrics.reduce.watermark_lag.clone(),
1113+
);
1114+
// histograms
reduce_registry.register(
1115+
REDUCE_WINDOW_PROCESSING_TIME,
1116+
"Time from window open to window close in microseconds (1ms to 60 minutes)",
1117+
metrics.reduce.window_processing_time.clone(),
1118+
);
1119+
reduce_registry.register(
1120+
REDUCE_PNF_PROCESS_TIME,
1121+
"Time for UDF reduce function to complete per window in microseconds (1ms to 20 minutes)",
1122+
metrics.reduce.pnf_process_time.clone(),
1123+
);
1124+
// counters
reduce_registry.register(
1125+
REDUCE_PBQ_WRITE_TOTAL,
1126+
"Total number of messages written to PBQ",
1127+
metrics.reduce.pbq_write_total.clone(),
1128+
);
1129+
}
10501130
}
10511131

10521132
/// MONOVTX_METRICS is the MonoVtxMetrics object which stores the metrics

rust/numaflow-core/src/reduce/pbq.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
use crate::config::pipeline::VERTEX_TYPE_REDUCE_UDF;
12
use crate::error::Result;
23
use crate::mark_success;
3-
use crate::message::Message;
4+
use crate::message::{Message, MessageType};
5+
use crate::metrics::{pipeline_metric_labels, pipeline_metrics};
46
use crate::pipeline::isb::reader::ISBReaderOrchestrator;
57
use crate::reduce::wal::WalMessage;
68
use crate::reduce::wal::segment::append::{AppendOnlyWal, SegmentWriteMessage};
@@ -114,9 +116,18 @@ impl<C: NumaflowTypeConfig> PBQ<C> {
114116
let msg: WalMessage = msg.try_into().map_err(|e| {
115117
crate::error::Error::Reduce(format!("Failed to parse WAL message: {e}"))
116118
})?;
117-
messages_tx.send(msg.into()).await.map_err(|_| {
119+
let message: Message = msg.into();
120+
let is_data = message.typ == MessageType::Data;
121+
messages_tx.send(message).await.map_err(|_| {
118122
crate::error::Error::Reduce("PBQ WAL replay: receiver dropped".to_string())
119123
})?;
124+
if is_data {
125+
pipeline_metrics()
126+
.reduce
127+
.pbq_write_total
128+
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
129+
.inc();
130+
}
120131
replayed_count += 1;
121132
}
122133

@@ -153,11 +164,19 @@ impl<C: NumaflowTypeConfig> PBQ<C> {
153164
})?;
154165

155166
// Send cloned message downstream
167+
let is_data = message.typ == MessageType::Data;
156168
tx.send(message).await.map_err(|_| {
157169
crate::error::Error::Reduce(
158170
"PBQ ISB reader: downstream receiver dropped".to_string(),
159171
)
160172
})?;
173+
if is_data {
174+
pipeline_metrics()
175+
.reduce
176+
.pbq_write_total
177+
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
178+
.inc();
179+
}
161180
}
162181

163182
isb_handle
@@ -194,11 +213,19 @@ impl<C: NumaflowTypeConfig> PBQ<C> {
194213
while let Some(read_msg) = isb_stream.next().await {
195214
// Extract the message and forward to output channel
196215
let message = read_msg.message().clone();
216+
let is_data = message.typ == MessageType::Data;
197217
tx.send(message).await.map_err(|_| {
198218
crate::error::Error::Reduce(
199219
"PBQ ISB reader: downstream receiver dropped".to_string(),
200220
)
201221
})?;
222+
if is_data {
223+
pipeline_metrics()
224+
.reduce
225+
.pbq_write_total
226+
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
227+
.inc();
228+
}
202229
// Mark the read message as success after processing
203230
mark_success!(read_msg);
204231
}

rust/numaflow-core/src/reduce/reducer/aligned/reducer.rs

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
use crate::config::pipeline::VERTEX_TYPE_REDUCE_UDF;
12
use crate::config::{get_vertex_name, get_vertex_replica};
23
use crate::error::Error;
34
use crate::message::{Message, MessageType};
4-
use crate::metrics::{pipeline_drop_metric_labels, pipeline_metrics};
5+
use crate::metrics::{pipeline_drop_metric_labels, pipeline_metric_labels, pipeline_metrics};
56
use crate::pipeline::isb::writer::ISBWriterOrchestrator;
67
use crate::reduce::reducer::aligned::user_defined::UserDefinedAlignedReduce;
78
use crate::reduce::reducer::aligned::windower::{
@@ -15,7 +16,7 @@ use numaflow_pb::objects::wal::GcEvent;
1516
use std::collections::HashMap;
1617
use std::ops::Sub;
1718
use std::sync::Arc;
18-
use std::time::Duration;
19+
use std::time::{Duration, Instant};
1920
use tokio::sync::mpsc;
2021
use tokio::task::JoinHandle;
2122
use tokio_stream::StreamExt;
@@ -44,6 +45,7 @@ struct ReduceTask<C: NumaflowTypeConfig> {
4445
error_tx: mpsc::Sender<Error>,
4546
window: Window,
4647
window_manager: AlignedWindowManager,
48+
window_open_time: Instant,
4749
}
4850

4951
impl<C: NumaflowTypeConfig> ReduceTask<C> {
@@ -63,6 +65,7 @@ impl<C: NumaflowTypeConfig> ReduceTask<C> {
6365
error_tx,
6466
window,
6567
window_manager,
68+
window_open_time: Instant::now(),
6669
}
6770
}
6871

@@ -92,10 +95,17 @@ impl<C: NumaflowTypeConfig> ReduceTask<C> {
9295
// Call the reduce function. This is a blocking call and will return only once the window
9396
// is closed, cancellation is detected, or on error. The output is sent to the result_tx
9497
// channel which is consumed by the writer task and published to JetStream.
98+
let udf_start = Instant::now();
9599
let result = self
96100
.client
97101
.reduce_fn(message_stream, result_tx, cln_token)
98102
.await;
103+
// recorded unconditionally; error/cancellation paths return below before window_processing_time
104+
pipeline_metrics()
105+
.reduce
106+
.pnf_process_time
107+
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
108+
.observe(udf_start.elapsed().as_micros() as f64);
99109

100110
if let Err(e) = result {
101111
// Check if this is a cancellation error
@@ -126,10 +136,24 @@ impl<C: NumaflowTypeConfig> ReduceTask<C> {
126136
.oldest_window()
127137
.expect("no oldest window found");
128138

139+
// Record window processing time (open to results-written)
140+
pipeline_metrics()
141+
.reduce
142+
.window_processing_time
143+
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
144+
.observe(self.window_open_time.elapsed().as_micros() as f64);
145+
129146
// we can safely delete the window from the window manager since the results are
130147
// successfully written to jetstream and watermark is published.
131148
self.window_manager.gc_window(self.window.clone());
132149

150+
// Update closed windows gauge after GC removes the window
151+
pipeline_metrics()
152+
.reduce
153+
.closed_windows
154+
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
155+
.set(self.window_manager.closed_window_count() as i64);
156+
133157
// now that the processing is done, we can add this window to the GC WAL.
134158
let Some(gc_wal_tx) = &self.gc_wal_tx else {
135159
// return if the GC WAL is not configured
@@ -281,6 +305,13 @@ impl<C: NumaflowTypeConfig> AlignedReduceActor<C> {
281305
},
282306
);
283307

308+
// Update active windows gauge
309+
pipeline_metrics()
310+
.reduce
311+
.active_windows
312+
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
313+
.set(self.window_manager.active_window_count() as i64);
314+
284315
// Send the open command with the first message
285316
if let Err(e) = message_tx.send(window_msg).await
286317
&& !self.cln_token.is_cancelled()
@@ -340,6 +371,18 @@ impl<C: NumaflowTypeConfig> AlignedReduceActor<C> {
340371
return;
341372
};
342373

374+
// Update active and closed windows gauges
375+
pipeline_metrics()
376+
.reduce
377+
.active_windows
378+
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
379+
.set(self.window_manager.active_window_count() as i64);
380+
pipeline_metrics()
381+
.reduce
382+
.closed_windows
383+
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
384+
.set(self.window_manager.closed_window_count() as i64);
385+
343386
// we don't need to write the close message to the client, stream closing
344387
// is considered as close for aligned windows.
345388
// Drop the sender to signal completion
@@ -475,6 +518,12 @@ impl<C: NumaflowTypeConfig> AlignedReducer<C> {
475518
// Only close windows if the idle watermark is greater than current watermark
476519
if idle_watermark > self.current_watermark {
477520
self.current_watermark = idle_watermark;
521+
let lag = (Utc::now().timestamp_millis() - self.current_watermark.timestamp_millis()).max(0);
522+
pipeline_metrics()
523+
.reduce
524+
.watermark_lag
525+
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
526+
.set(lag as f64);
478527
self.close_windows_with_watermark(idle_watermark, &actor_tx).await;
479528
}
480529
}
@@ -579,6 +628,17 @@ impl<C: NumaflowTypeConfig> AlignedReducer<C> {
579628
.unwrap_or(DateTime::from_timestamp_millis(-1).expect("Invalid timestamp")),
580629
);
581630

631+
// Update watermark lag gauge (skip -1 sentinel)
632+
if self.current_watermark.timestamp_millis() != -1 {
633+
let lag =
634+
(Utc::now().timestamp_millis() - self.current_watermark.timestamp_millis()).max(0);
635+
pipeline_metrics()
636+
.reduce
637+
.watermark_lag
638+
.get_or_create(pipeline_metric_labels(VERTEX_TYPE_REDUCE_UDF))
639+
.set(lag as f64);
640+
}
641+
582642
// Handle late messages
583643
if msg.is_late {
584644
if self.current_watermark.timestamp_millis() == -1 {

rust/numaflow-core/src/reduce/reducer/aligned/windower.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,22 @@ impl AlignedWindowManager {
6262
AlignedWindowManager::Sliding(manager) => manager.oldest_window(),
6363
}
6464
}
65+
66+
/// Returns the number of currently active windows.
///
/// Delegates to the `active_window_count` of whichever manager variant
/// (`Fixed` or `Sliding`) this value holds.
67+
pub(crate) fn active_window_count(&self) -> usize {
68+
match self {
69+
AlignedWindowManager::Fixed(manager) => manager.active_window_count(),
70+
AlignedWindowManager::Sliding(manager) => manager.active_window_count(),
71+
}
72+
}
73+
74+
/// Returns the number of closed windows awaiting GC.
///
/// Delegates to the `closed_window_count` of whichever manager variant
/// (`Fixed` or `Sliding`) this value holds.
75+
pub(crate) fn closed_window_count(&self) -> usize {
76+
match self {
77+
AlignedWindowManager::Fixed(manager) => manager.closed_window_count(),
78+
AlignedWindowManager::Sliding(manager) => manager.closed_window_count(),
79+
}
80+
}
6581
}
6682

6783
/// A Window is represented by its start and end time. All the data which event time falls within

0 commit comments

Comments
 (0)