diff --git a/Cargo.lock b/Cargo.lock index c178c169c1f4f..0ef63125f6457 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16527,8 +16527,8 @@ dependencies = [ [[package]] name = "watcher" -version = "0.4.1" -source = "git+https://github.com/databendlabs/watcher?tag=v0.4.1#70d512a0beeccc7d7e1ce588a6e90b8ad6b875db" +version = "0.4.2" +source = "git+https://github.com/databendlabs/watcher?tag=v0.4.2#38e5109dd07a0fd496a48646dddd7de43242d75d" dependencies = [ "futures", "log", diff --git a/Cargo.toml b/Cargo.toml index 6e5289203b0f1..0975c032a9b22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -535,7 +535,7 @@ url = "2.5.4" uuid = { version = "1.10.0", features = ["std", "serde", "v4", "v7"] } volo-thrift = "0.10" walkdir = "2.3.2" -watcher = { version = "0.4.1" } +watcher = { version = "0.4.2" } wiremock = "0.6" wkt = "0.11.1" xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"] } @@ -661,5 +661,5 @@ sub-cache = { git = "https://github.com/databendlabs/sub-cache", tag = "v0.2.1" tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" } tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370", package = "tantivy-common" } tantivy-jieba = { git = "https://github.com/datafuse-extras/tantivy-jieba", rev = "0e300e9" } -watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.4.1" } +watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.4.2" } xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", tag = "databend-alpha.4" } diff --git a/src/meta/raft-store/src/leveled_store/leveled_map/compacting_data.rs b/src/meta/raft-store/src/leveled_store/leveled_map/compacting_data.rs index 58d6c6e2c0433..15aa18862f0f6 100644 --- a/src/meta/raft-store/src/leveled_store/leveled_map/compacting_data.rs +++ b/src/meta/raft-store/src/leveled_store/leveled_map/compacting_data.rs @@ -33,6 +33,7 @@ use crate::leveled_store::rotbl_codec::RotblCodec; use crate::leveled_store::util; use crate::marked::Marked; use crate::state_machine::ExpireKey; +use crate::utils::add_cooperative_yielding; /// The data to compact. /// @@ -141,6 +142,8 @@ impl<'a> CompactingData<'a> { // Filter out tombstone let normal_strm = coalesce.try_filter(|(_k, v)| future::ready(v.is_normal())); + let normal_strm = add_cooperative_yielding(normal_strm, "compact"); + Ok((sys_data, normal_strm.boxed())) } } diff --git a/src/meta/raft-store/src/state_machine_api_ext.rs b/src/meta/raft-store/src/state_machine_api_ext.rs index 42d8e9136f621..fb8e4694be87a 100644 --- a/src/meta/raft-store/src/state_machine_api_ext.rs +++ b/src/meta/raft-store/src/state_machine_api_ext.rs @@ -39,6 +39,7 @@ use crate::leveled_store::map_api::MarkedOf; use crate::marked::Marked; use crate::state_machine::ExpireKey; use crate::state_machine_api::StateMachineApi; +use crate::utils::add_cooperative_yielding; use crate::utils::prefix_right_bound; #[async_trait::async_trait] @@ -111,7 +112,9 @@ pub trait StateMachineApiExt: StateMachineApi { let strm = strm // Return only keys with the expected prefix - .try_take_while(move |(k, _)| future::ready(Ok(k.starts_with(&p)))) + .try_take_while(move |(k, _)| future::ready(Ok(k.starts_with(&p)))); + + let strm = add_cooperative_yielding(strm, format!("list_kv: {prefix}")) // Skip tombstone .try_filter_map(|(k, marked)| future::ready(Ok(marked_to_seqv(k, marked)))); @@ -121,10 +124,15 @@ pub trait StateMachineApiExt: StateMachineApi { /// Return a range of kv entries. async fn range_kv(&self, rng: R) -> Result, io::Error> where R: RangeBounds + Send + Sync + Clone + 'static { - let strm = self.map_ref().str_map().range(rng).await?; + let left = rng.start_bound().cloned(); + let right = rng.end_bound().cloned(); - // Skip tombstone - let strm = strm.try_filter_map(|(k, marked)| future::ready(Ok(marked_to_seqv(k, marked)))); + let leveled_map = self.map_ref(); + let strm = leveled_map.str_map().range(rng).await?; + + let strm = add_cooperative_yielding(strm, format!("range_kv: {left:?} to {right:?}")) + // Skip tombstone + .try_filter_map(|(k, marked)| future::ready(Ok(marked_to_seqv(k, marked)))); Ok(strm.boxed()) } @@ -181,12 +189,13 @@ pub trait StateMachineApiExt: StateMachineApi { let strm = self.map_ref().expire_map().range(start..end).await?; - let strm = strm - // Return only non-deleted records - .try_filter_map(|(k, marked)| { - let expire_entry = marked.unpack().map(|(v, _v_meta)| (k, v)); - future::ready(Ok(expire_entry)) - }); + let strm = + add_cooperative_yielding(strm, format!("list_expire_index since {start} to {end}")) + // Return only non-deleted records + .try_filter_map(|(k, marked)| { + let expire_entry = marked.unpack().map(|(v, _v_meta)| (k, v)); + future::ready(Ok(expire_entry)) + }); Ok(strm.boxed()) } diff --git a/src/meta/raft-store/src/utils.rs b/src/meta/raft-store/src/utils.rs index 803ee071dc1a0..73c7d6245a06e 100644 --- a/src/meta/raft-store/src/utils.rs +++ b/src/meta/raft-store/src/utils.rs @@ -12,6 +12,44 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt; + +use futures::Stream; +use futures_util::StreamExt; +use log::info; + +/// Add cooperative yielding to a stream to prevent task starvation. +/// +/// This yields control back to the async runtime every 100 items to prevent +/// blocking other concurrent tasks when processing large streams. +pub(crate) fn add_cooperative_yielding( + stream: S, + stream_name: impl fmt::Display + Send, +) -> impl Stream +where + S: Stream, + T: Send + 'static, +{ + stream.enumerate().then(move |(index, item)| { + // Yield control every 100 items to prevent blocking other tasks + let to_yield = if index % 100 == 0 { + if index % 5000 == 0 { + info!("{stream_name} yield control to allow other tasks to run: index={index}"); + } + true + } else { + false + }; + + async move { + if to_yield { + tokio::task::yield_now().await; + } + item + } + }) +} + /// Return the right bound of the prefix, so that `p..right` will cover all strings with prefix `p`. /// /// If the right bound can not be built, return None. diff --git a/src/meta/service/src/api/grpc/grpc_service.rs b/src/meta/service/src/api/grpc/grpc_service.rs index 24dec581d5581..88334f51e6361 100644 --- a/src/meta/service/src/api/grpc/grpc_service.rs +++ b/src/meta/service/src/api/grpc/grpc_service.rs @@ -53,7 +53,6 @@ use databend_common_meta_types::LogEntry; use databend_common_meta_types::TxnReply; use databend_common_meta_types::TxnRequest; use databend_common_metrics::count::Count; -use display_more::DisplayOptionExt; use fastrace::func_name; use fastrace::func_path; use fastrace::prelude::*; @@ -449,8 +448,11 @@ impl MetaService for MetaServiceImpl { let sm = &mn.raft_store.state_machine; let sm = sm.write().await; - let weak_sender = mn.add_watcher(watch, tx.clone()).await?; - let sender_str = weak_sender.upgrade().map(|s| s.to_string()); + info!("enter sm write lock for watch {}", watch); + + let sender = mn.new_watch_sender(watch, tx.clone())?; + let sender_str = sender.to_string(); + let weak_sender = mn.insert_watch_sender(sender); // Build a closure to remove the stream tx from Dispatcher when the stream is dropped. let on_drop = { @@ -467,9 +469,15 @@ impl MetaService for MetaServiceImpl { let snk = new_initialization_sink::(tx.clone(), ctx); let strm = sm.range_kv(key_range).await?; + info!("created initialization stream for {}", sender_str); + + let sndr = sender_str.clone(); + let fu = async move { try_forward(strm, snk, ctx).await; + info!("initialization flush complete for watcher {}", sndr); + // Send an empty message with `is_initialization=false` to indicate // the end of the initialization flush. tx.send(Ok(WatchResponse::new_initialization_complete())) @@ -478,12 +486,17 @@ impl MetaService for MetaServiceImpl { error!("failed to send flush complete message: {}", e); }) .ok(); + + info!( + "finished sending initialization complete flag for watcher {}", + sndr + ); }; let fu = Box::pin(fu); info!( "sending initial flush Future to watcher {} via Dispatcher", - sender_str.display() + sender_str ); mn.dispatcher_handle.send_future(fu); diff --git a/src/meta/service/src/meta_service/meta_node.rs b/src/meta/service/src/meta_service/meta_node.rs index 81a131b03fbda..65a413630f0d1 100644 --- a/src/meta/service/src/meta_service/meta_node.rs +++ b/src/meta/service/src/meta_service/meta_node.rs @@ -1174,26 +1174,34 @@ impl MetaNode { } } - pub(crate) async fn add_watcher( + pub(crate) fn insert_watch_sender( + &self, + sender: Arc>, + ) -> Weak> { + let weak = Arc::downgrade(&sender); + + self.dispatcher_handle + .request(move |dispatcher: &mut Dispatcher| { + dispatcher.insert_watch_stream_sender(sender); + }); + + weak + } + + pub(crate) fn new_watch_sender( &self, request: WatchRequest, tx: mpsc::Sender>, - ) -> Result>, Status> { - let stream_sender = self - .dispatcher_handle - .request_blocking(move |dispatcher: &mut Dispatcher| { - let key_range = match build_key_range(&request.key, &request.key_end) { - Ok(kr) => kr, - Err(e) => return Err(Status::invalid_argument(e.to_string())), - }; - - let interested = event_filter_from_filter_type(request.filter_type()); - Ok(dispatcher.add_watcher(key_range, interested, tx)) - }) - .await - .map_err(|_e| Status::internal("watch-event-Dispatcher closed"))??; + ) -> Result>, Status> { + let key_range = match build_key_range(&request.key, &request.key_end) { + Ok(kr) => kr, + Err(e) => return Err(Status::invalid_argument(e.to_string())), + }; + + let interested = event_filter_from_filter_type(request.filter_type()); - Ok(stream_sender) + let sender = Dispatcher::new_watch_stream_sender(key_range.clone(), interested, tx); + Ok(sender) } /// Get a kvapi::KVApi implementation.