1515use std:: any:: Any ;
1616use std:: sync:: Arc ;
1717use std:: time:: Duration ;
18+ use std:: time:: Instant ;
1819
20+ use databend_common_base:: base:: WatchNotify ;
1921use databend_common_catalog:: runtime_filter_info:: RuntimeFilterReady ;
2022use databend_common_catalog:: table_context:: TableContext ;
2123use databend_common_exception:: ErrorCode ;
@@ -26,7 +28,60 @@ use databend_common_pipeline::core::OutputPort;
2628use databend_common_pipeline:: core:: Processor ;
2729use databend_common_pipeline:: core:: ProcessorPtr ;
2830use databend_common_sql:: IndexType ;
29- use tokio:: time:: timeout;
31+
32+ const RUNTIME_FILTER_WAIT_TIMEOUT : Duration = Duration :: from_secs ( 30 ) ;
33+ const RUNTIME_FILTER_WAIT_POLL_INTERVAL : Duration = Duration :: from_millis ( 50 ) ;
34+
35+ async fn wait_runtime_filters (
36+ scan_id : IndexType ,
37+ input : & Arc < InputPort > ,
38+ output : & Arc < OutputPort > ,
39+ abort_notify : Arc < WatchNotify > ,
40+ runtime_filter_ready : & [ Arc < RuntimeFilterReady > ] ,
41+ ) -> Result < ( ) > {
42+ for runtime_filter_ready in runtime_filter_ready {
43+ let mut rx = runtime_filter_ready. runtime_filter_watcher . subscribe ( ) ;
44+ if ( * rx. borrow ( ) ) . is_some ( ) {
45+ continue ;
46+ }
47+
48+ let deadline = Instant :: now ( ) + RUNTIME_FILTER_WAIT_TIMEOUT ;
49+ loop {
50+ if output. is_finished ( ) {
51+ input. finish ( ) ;
52+ return Ok ( ( ) ) ;
53+ }
54+
55+ let now = Instant :: now ( ) ;
56+ if now >= deadline {
57+ log:: warn!(
58+ "Runtime filter wait timeout after {:?} for scan_id: {}" ,
59+ RUNTIME_FILTER_WAIT_TIMEOUT ,
60+ scan_id
61+ ) ;
62+ break ;
63+ }
64+
65+ let wait_duration = ( deadline - now) . min ( RUNTIME_FILTER_WAIT_POLL_INTERVAL ) ;
66+ tokio:: select! {
67+ changed = rx. changed( ) => {
68+ match changed {
69+ Ok ( ( ) ) => break ,
70+ Err ( _) => return Err ( ErrorCode :: TokioError ( "watcher's sender is dropped" ) ) ,
71+ }
72+ }
73+ _ = abort_notify. notified( ) => {
74+ return Err ( ErrorCode :: AbortedQuery (
75+ "query aborted while waiting for runtime filter" ,
76+ ) ) ;
77+ }
78+ _ = tokio:: time:: sleep( wait_duration) => { }
79+ }
80+ }
81+ }
82+
83+ Ok ( ( ) )
84+ }
3085
3186pub struct TransformRuntimeFilterWait {
3287 ctx : Arc < dyn TableContext > ,
@@ -111,30 +166,104 @@ impl Processor for TransformRuntimeFilterWait {
111166 self . runtime_filter_ready. len( )
112167 ) ;
113168
114- let timeout_duration = Duration :: from_secs ( 30 ) ;
115- for runtime_filter_ready in & self . runtime_filter_ready {
116- let mut rx = runtime_filter_ready. runtime_filter_watcher . subscribe ( ) ;
117- if ( * rx. borrow ( ) ) . is_some ( ) {
118- continue ;
119- }
120-
121- match timeout ( timeout_duration, rx. changed ( ) ) . await {
122- Ok ( Ok ( ( ) ) ) => { }
123- Ok ( Err ( _) ) => {
124- return Err ( ErrorCode :: TokioError ( "watcher's sender is dropped" ) ) ;
125- }
126- Err ( _) => {
127- log:: warn!(
128- "Runtime filter wait timeout after {:?} for scan_id: {}" ,
129- timeout_duration,
130- self . scan_id
131- ) ;
132- }
133- }
134- }
169+ wait_runtime_filters (
170+ self . scan_id ,
171+ & self . input ,
172+ & self . output ,
173+ self . ctx . get_abort_notify ( ) ,
174+ & self . runtime_filter_ready ,
175+ )
176+ . await ?;
135177
136178 self . runtime_filter_ready . clear ( ) ;
137179 self . wait_finished = true ;
138180 Ok ( ( ) )
139181 }
140182}
183+
184+ #[ cfg( test) ]
185+ mod tests {
186+ use std:: sync:: Arc ;
187+
188+ use databend_common_base:: base:: WatchNotify ;
189+ use databend_common_base:: runtime:: spawn;
190+ use databend_common_pipeline:: core:: InputPort ;
191+ use databend_common_pipeline:: core:: OutputPort ;
192+
193+ use super :: * ;
194+
195+ #[ tokio:: test]
196+ async fn test_wait_runtime_filters_returns_when_output_finished ( ) {
197+ let input = InputPort :: create ( ) ;
198+ let output = OutputPort :: create ( ) ;
199+ let ready = Arc :: new ( RuntimeFilterReady :: default ( ) ) ;
200+ let abort_notify = Arc :: new ( WatchNotify :: new ( ) ) ;
201+
202+ let output_cloned = output. clone ( ) ;
203+ spawn ( async move {
204+ tokio:: time:: sleep ( Duration :: from_millis ( 20 ) ) . await ;
205+ output_cloned. finish ( ) ;
206+ } ) ;
207+
208+ tokio:: time:: timeout (
209+ Duration :: from_millis ( 300 ) ,
210+ wait_runtime_filters ( 0 , & input, & output, abort_notify, & [ ready] ) ,
211+ )
212+ . await
213+ . expect ( "runtime filter wait should stop after branch finish" )
214+ . expect ( "branch finish should not return an error" ) ;
215+
216+ assert ! ( input. is_finished( ) ) ;
217+ }
218+
219+ #[ tokio:: test]
220+ async fn test_wait_runtime_filters_returns_when_query_aborted ( ) {
221+ let input = InputPort :: create ( ) ;
222+ let output = OutputPort :: create ( ) ;
223+ let ready = Arc :: new ( RuntimeFilterReady :: default ( ) ) ;
224+ let abort_notify = Arc :: new ( WatchNotify :: new ( ) ) ;
225+
226+ let abort_notify_cloned = abort_notify. clone ( ) ;
227+ spawn ( async move {
228+ tokio:: time:: sleep ( Duration :: from_millis ( 20 ) ) . await ;
229+ abort_notify_cloned. notify_waiters ( ) ;
230+ } ) ;
231+
232+ let err = tokio:: time:: timeout (
233+ Duration :: from_millis ( 300 ) ,
234+ wait_runtime_filters ( 0 , & input, & output, abort_notify, & [ ready] ) ,
235+ )
236+ . await
237+ . expect ( "runtime filter wait should stop after query abort" )
238+ . expect_err ( "query abort should propagate as an error" ) ;
239+
240+ assert_eq ! ( err. name( ) , "AbortedQuery" ) ;
241+ }
242+
243+ #[ tokio:: test]
244+ async fn test_wait_runtime_filters_returns_when_filter_notified ( ) {
245+ let input = InputPort :: create ( ) ;
246+ let output = OutputPort :: create ( ) ;
247+ let ready = Arc :: new ( RuntimeFilterReady :: default ( ) ) ;
248+ let abort_notify = Arc :: new ( WatchNotify :: new ( ) ) ;
249+
250+ let ready_cloned = ready. clone ( ) ;
251+ spawn ( async move {
252+ tokio:: time:: sleep ( Duration :: from_millis ( 20 ) ) . await ;
253+ ready_cloned
254+ . runtime_filter_watcher
255+ . send ( Some ( ( ) ) )
256+ . expect ( "watcher should stay open" ) ;
257+ } ) ;
258+
259+ tokio:: time:: timeout (
260+ Duration :: from_millis ( 300 ) ,
261+ wait_runtime_filters ( 0 , & input, & output, abort_notify, & [ ready] ) ,
262+ )
263+ . await
264+ . expect ( "runtime filter wait should stop after filter notification" )
265+ . expect ( "filter notification should not return an error" ) ;
266+
267+ assert ! ( !input. is_finished( ) ) ;
268+ }
269+ }
0 commit comments