diff --git a/src/meta/client/src/grpc_action.rs b/src/meta/client/src/grpc_action.rs index 08f4a19014a61..b601797fbc065 100644 --- a/src/meta/client/src/grpc_action.rs +++ b/src/meta/client/src/grpc_action.rs @@ -152,6 +152,14 @@ impl MetaGrpcReadReq { Ok(raft_request) } + + pub fn type_name(&self) -> &'static str { + match self { + MetaGrpcReadReq::GetKV(_) => "get", + MetaGrpcReadReq::MGetKV(_) => "mget", + MetaGrpcReadReq::ListKV(_) => "list", + } + } } impl RequestFor for GetKVReq { diff --git a/src/meta/service/src/api/grpc/grpc_service.rs b/src/meta/service/src/api/grpc/grpc_service.rs index ce43b0b400b99..3ad35652c63f2 100644 --- a/src/meta/service/src/api/grpc/grpc_service.rs +++ b/src/meta/service/src/api/grpc/grpc_service.rs @@ -53,6 +53,7 @@ use databend_common_meta_types::LogEntry; use databend_common_meta_types::TxnReply; use databend_common_meta_types::TxnRequest; use databend_common_metrics::count::Count; +use databend_common_tracing::start_trace_for_remote_request; use fastrace::func_name; use fastrace::func_path; use fastrace::prelude::*; @@ -160,10 +161,8 @@ impl MetaServiceImpl { #[fastrace::trace] async fn handle_kv_read_v1( &self, - request: Request, + req: MetaGrpcReadReq, ) -> Result<(Option, BoxStream), Status> { - let req: MetaGrpcReadReq = GrpcHelper::parse_req(request)?; - debug!("{}: Received ReadRequest: {:?}", func_name!(), req); let req = ForwardRequest::new(1, req); @@ -296,8 +295,7 @@ impl MetaService for MetaServiceImpl { network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64); let _guard = RequestInFlight::guard(); - let root = - databend_common_tracing::start_trace_for_remote_request(func_path!(), &request); + let root = start_trace_for_remote_request(func_path!(), &request); let reply = self.handle_kv_api(request).in_span(root).await?; network_metrics::incr_sent_bytes(reply.encoded_len() as u64); @@ -316,12 +314,23 @@ impl MetaService for MetaServiceImpl { self.check_token(request.metadata())?; let _guard = thread_tracking_guard(&request); + + network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64); + + let root = start_trace_for_remote_request(func_path!(), &request); + + let req: MetaGrpcReadReq = GrpcHelper::parse_req(request)?; + let req_typ = req.type_name(); + ThreadTracker::tracking_future(async move { - network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64); - let root = - databend_common_tracing::start_trace_for_remote_request(func_path!(), &request); + let (endpoint, strm) = self.handle_kv_read_v1(req).in_span(root).await?; - let (endpoint, strm) = self.handle_kv_read_v1(request).in_span(root).await?; + let strm = strm + .map(move |item| { + network_metrics::incr_stream_sent_item(req_typ); + item + }) + .boxed(); let mut resp = Response::new(strm); GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref()); @@ -343,8 +352,7 @@ impl MetaService for MetaServiceImpl { network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64); let _guard = RequestInFlight::guard(); - let root = - databend_common_tracing::start_trace_for_remote_request(func_path!(), &request); + let root = start_trace_for_remote_request(func_path!(), &request); let (endpoint, reply) = self.handle_txn(request).in_span(root).await?; network_metrics::incr_sent_bytes(reply.encoded_len() as u64); diff --git a/src/meta/service/src/metrics/meta_metrics.rs b/src/meta/service/src/metrics/meta_metrics.rs index ac0d00460cae5..909153eb6f881 100644 --- a/src/meta/service/src/metrics/meta_metrics.rs +++ b/src/meta/service/src/metrics/meta_metrics.rs @@ -690,6 +690,7 @@ pub mod network_metrics { use std::time::Duration; use databend_common_meta_types::protobuf::WatchResponse; + use log::error; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::gauge::Gauge; use prometheus_client::metrics::histogram::Histogram; @@ -717,6 +718,15 @@ pub mod network_metrics { /// Number of items sent when data changes in a watch stream. watch_change_item_sent: Counter, + + /// Number of items sent in a stream get response. + stream_get_item_sent: Counter, + + /// Number of items sent in a stream mget response. + stream_mget_item_sent: Counter, + + /// Number of items sent in a stream list response. + stream_list_item_sent: Counter, } impl NetworkMetrics { @@ -743,6 +753,10 @@ pub mod network_metrics { watch_initialization_item_sent: Counter::default(), watch_change_item_sent: Counter::default(), + + stream_get_item_sent: Counter::default(), + stream_mget_item_sent: Counter::default(), + stream_list_item_sent: Counter::default(), }; let mut registry = load_global_registry(); @@ -781,6 +795,22 @@ pub mod network_metrics { metrics.watch_change_item_sent.clone(), ); + registry.register( + key!("stream_get_item_sent"), + "Number of items sent in a stream get response", + metrics.stream_get_item_sent.clone(), + ); + registry.register( + key!("stream_mget_item_sent"), + "Number of items sent in a stream mget response", + metrics.stream_mget_item_sent.clone(), + ); + registry.register( + key!("stream_list_item_sent"), + "Number of items sent in a stream list response", + metrics.stream_list_item_sent.clone(), + ); + metrics } } @@ -830,6 +860,23 @@ pub mod network_metrics { pub fn incr_watch_sent_change_item() { NETWORK_METRICS.watch_change_item_sent.inc(); } + + pub fn incr_stream_sent_item(typ: &'static str) { + match typ { + "get" => { + NETWORK_METRICS.stream_get_item_sent.inc(); + } + "mget" => { + NETWORK_METRICS.stream_mget_item_sent.inc(); + } + "list" => { + NETWORK_METRICS.stream_list_item_sent.inc(); + } + _ => { + error!("Unknown stream item type: {}", typ); + } + } + } } /// RAII metrics counter of in-flight requests count and delay. diff --git a/src/meta/service/tests/it/api/http/metrics.rs b/src/meta/service/tests/it/api/http/metrics.rs index 1b3b497baacf3..cf189ec8a395d 100644 --- a/src/meta/service/tests/it/api/http/metrics.rs +++ b/src/meta/service/tests/it/api/http/metrics.rs @@ -285,5 +285,10 @@ async fn test_metrics() -> anyhow::Result<()> { assert!(metric_keys.contains("metasrv_meta_network_watch_initialization_total")); assert!(metric_keys.contains("metasrv_meta_network_watch_change_total")); + // Stream metrics + assert!(metric_keys.contains("metasrv_meta_network_stream_get_item_sent_total")); + assert!(metric_keys.contains("metasrv_meta_network_stream_mget_item_sent_total")); + assert!(metric_keys.contains("metasrv_meta_network_stream_list_item_sent_total")); + Ok(()) }