diff --git a/src/meta/service/src/api/grpc/grpc_service.rs b/src/meta/service/src/api/grpc/grpc_service.rs index 88334f51e6361..ce43b0b400b99 100644 --- a/src/meta/service/src/api/grpc/grpc_service.rs +++ b/src/meta/service/src/api/grpc/grpc_service.rs @@ -464,6 +464,13 @@ impl MetaService for MetaServiceImpl { let stream = WatchStream::new(rx, Box::new(on_drop)); + let stream = stream.map(move |item| { + if let Ok(ref resp) = item { + network_metrics::incr_watch_sent(resp); + } + item + }); + if flush { let ctx = "watch-Dispatcher"; let snk = new_initialization_sink::(tx.clone(), ctx); diff --git a/src/meta/service/src/metrics/meta_metrics.rs b/src/meta/service/src/metrics/meta_metrics.rs index 61860b89b9333..ac0d00460cae5 100644 --- a/src/meta/service/src/metrics/meta_metrics.rs +++ b/src/meta/service/src/metrics/meta_metrics.rs @@ -689,6 +689,7 @@ pub mod network_metrics { use std::sync::LazyLock; use std::time::Duration; + use databend_common_meta_types::protobuf::WatchResponse; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::gauge::Gauge; use prometheus_client::metrics::histogram::Histogram; @@ -710,6 +711,12 @@ pub mod network_metrics { req_inflights: Gauge, req_success: Counter, req_failed: Counter, + + /// Number of items sent during watch stream initialization. + watch_initialization_item_sent: Counter, + + /// Number of items sent when data changes in a watch stream. + watch_change_item_sent: Counter, } impl NetworkMetrics { @@ -733,6 +740,9 @@ pub mod network_metrics { req_inflights: Gauge::default(), req_success: Counter::default(), req_failed: Counter::default(), + + watch_initialization_item_sent: Counter::default(), + watch_change_item_sent: Counter::default(), }; let mut registry = load_global_registry(); @@ -760,6 +770,17 @@ pub mod network_metrics { ); registry.register(key!("req_failed"), "req failed", metrics.req_failed.clone()); + registry.register( + key!("watch_initialization"), + "Number of items sent during watch stream initialization", + metrics.watch_initialization_item_sent.clone(), + ); + registry.register( + key!("watch_change"), + "Number of items sent when data changes in a watch stream", + metrics.watch_change_item_sent.clone(), + ); + metrics } } @@ -790,6 +811,25 @@ pub mod network_metrics { NETWORK_METRICS.req_failed.inc(); } } + + /// Increment the number of items sent in a watch response. + /// + /// It determines the type of item based on the response type. + pub fn incr_watch_sent(resp: &WatchResponse) { + if resp.is_initialization { + incr_watch_sent_initialization_item(); + } else { + incr_watch_sent_change_item(); + } + } + + pub fn incr_watch_sent_initialization_item() { + NETWORK_METRICS.watch_initialization_item_sent.inc(); + } + + pub fn incr_watch_sent_change_item() { + NETWORK_METRICS.watch_change_item_sent.inc(); + } } /// RAII metrics counter of in-flight requests count and delay. diff --git a/src/meta/service/tests/it/api/http/metrics.rs b/src/meta/service/tests/it/api/http/metrics.rs index 4078306ac33ec..1b3b497baacf3 100644 --- a/src/meta/service/tests/it/api/http/metrics.rs +++ b/src/meta/service/tests/it/api/http/metrics.rs @@ -281,5 +281,9 @@ async fn test_metrics() -> anyhow::Result<()> { // Raft storage metrics assert!(metric_keys.contains("metasrv_raft_storage_raft_store_write_failed_total")); + // Watch + assert!(metric_keys.contains("metasrv_meta_network_watch_initialization_total")); + assert!(metric_keys.contains("metasrv_meta_network_watch_change_total")); + Ok(()) }