Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/meta/service/src/api/grpc/grpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,13 @@ impl MetaService for MetaServiceImpl {

let stream = WatchStream::new(rx, Box::new(on_drop));

let stream = stream.map(move |item| {
if let Ok(ref resp) = item {
network_metrics::incr_watch_sent(resp);
}
item
});

if flush {
let ctx = "watch-Dispatcher";
let snk = new_initialization_sink::<WatchTypes>(tx.clone(), ctx);
Expand Down
40 changes: 40 additions & 0 deletions src/meta/service/src/metrics/meta_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ pub mod network_metrics {
use std::sync::LazyLock;
use std::time::Duration;

use databend_common_meta_types::protobuf::WatchResponse;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::Histogram;
Expand All @@ -710,6 +711,12 @@ pub mod network_metrics {
req_inflights: Gauge,
req_success: Counter,
req_failed: Counter,

/// Number of items sent during watch stream initialization.
watch_initialization_item_sent: Counter,

/// Number of items sent when data changes in a watch stream.
watch_change_item_sent: Counter,
}

impl NetworkMetrics {
Expand All @@ -733,6 +740,9 @@ pub mod network_metrics {
req_inflights: Gauge::default(),
req_success: Counter::default(),
req_failed: Counter::default(),

watch_initialization_item_sent: Counter::default(),
watch_change_item_sent: Counter::default(),
};

let mut registry = load_global_registry();
Expand Down Expand Up @@ -760,6 +770,17 @@ pub mod network_metrics {
);
registry.register(key!("req_failed"), "req failed", metrics.req_failed.clone());

registry.register(
key!("watch_initialization"),
"Number of items sent during watch stream initialization",
metrics.watch_initialization_item_sent.clone(),
);
registry.register(
key!("watch_change"),
"Number of items sent when data changes in a watch stream",
metrics.watch_change_item_sent.clone(),
);

metrics
}
}
Expand Down Expand Up @@ -790,6 +811,25 @@ pub mod network_metrics {
NETWORK_METRICS.req_failed.inc();
}
}

/// Increment the number of items sent in a watch response.
///
/// It determines the type of item based on the response type.
pub fn incr_watch_sent(resp: &WatchResponse) {
if resp.is_initialization {
incr_watch_sent_initialization_item();
} else {
incr_watch_sent_change_item();
}
}

pub fn incr_watch_sent_initialization_item() {
NETWORK_METRICS.watch_initialization_item_sent.inc();
}

pub fn incr_watch_sent_change_item() {
NETWORK_METRICS.watch_change_item_sent.inc();
}
}

/// RAII metrics counter of in-flight requests count and delay.
Expand Down
4 changes: 4 additions & 0 deletions src/meta/service/tests/it/api/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,5 +281,9 @@ async fn test_metrics() -> anyhow::Result<()> {
// Raft storage metrics
assert!(metric_keys.contains("metasrv_raft_storage_raft_store_write_failed_total"));

// Watch
assert!(metric_keys.contains("metasrv_meta_network_watch_initialization_total"));
assert!(metric_keys.contains("metasrv_meta_network_watch_change_total"));

Ok(())
}
Loading