@@ -35,7 +35,7 @@ use datafusion::{
3535 functions_aggregate:: expr_fn:: last_value,
3636 logical_expr:: { col, lit, ExprFunctionExt as _} ,
3737 physical_plan:: stream:: RecordBatchStreamAdapter ,
38- prelude:: { wildcard, SessionContext } ,
38+ prelude:: { wildcard, Expr , SessionContext } ,
3939} ;
4040use futures:: TryStreamExt as _;
4141use shutdown:: { Shutdown , ShutdownSignal } ;
@@ -47,7 +47,7 @@ use crate::{
4747 cache_table:: CacheTable ,
4848 metrics:: Metrics ,
4949 schemas,
50- since:: { rows_since, FeedTable , FeedTableSource } ,
50+ since:: { gt_expression , rows_since, FeedTable , FeedTableSource , RowsSinceInput } ,
5151 PipelineContext , Result , SessionContextRef ,
5252} ;
5353// Use the SubscribeSinceMsg so its clear its a message for this actor
@@ -310,7 +310,7 @@ async fn aggregator_subscription(
310310 let mut rx = aggregator
311311 . send ( SubscribeSinceMsg {
312312 projection : None ,
313- offset,
313+ filters : offset. map ( |o| vec ! [ gt_expression ( "event_state_order" , o ) ] ) ,
314314 limit : None ,
315315 } )
316316 . await ??;
@@ -354,40 +354,42 @@ impl Handler<SubscribeSinceMsg> for Resolver {
354354 ) -> <SubscribeSinceMsg as Message >:: Result {
355355 let subscription = self . broadcast_tx . subscribe ( ) ;
356356 let ctx = self . ctx . clone ( ) ;
357- rows_since (
358- schemas:: stream_states ( ) ,
359- "stream_state_order" ,
360- message. projection ,
361- message. offset ,
362- message. limit ,
363- Box :: pin ( RecordBatchStreamAdapter :: new (
357+ rows_since ( RowsSinceInput {
358+ session_context : & ctx,
359+ schema : schemas:: stream_states ( ) ,
360+ order_col : "stream_state_order" ,
361+ projection : message. projection ,
362+ filters : message. filters . clone ( ) ,
363+ limit : message. limit ,
364+ subscription : Box :: pin ( RecordBatchStreamAdapter :: new (
364365 schemas:: stream_states ( ) ,
365366 tokio_stream:: wrappers:: BroadcastStream :: new ( subscription)
366367 . map_err ( |err| exec_datafusion_err ! ( "{err}" ) ) ,
367368 ) ) ,
368369 // Future Optimization can be to send the projection and limit into the events_since call.
369- stream_states_since ( & ctx, message. offset ) . await ?,
370- )
370+ since : stream_states_since ( & ctx, message. filters ) . await ?,
371+ } )
371372 }
372373}
373374
374375async fn stream_states_since (
375376 ctx : & SessionContext ,
376- offset : Option < u64 > ,
377+ filters : Option < Vec < Expr > > ,
377378) -> Result < SendableRecordBatchStream > {
378379 let mut stream_states = ctx
379380 . table ( STREAM_STATES_TABLE )
380381 . await ?
381382 . select ( vec ! [ wildcard( ) ] ) ?
382383 // Do not return the partition columns
383- . drop_columns ( & [ "stream_cid_partition" ] ) ?;
384- if let Some ( offset) = offset {
385- stream_states = stream_states. filter ( col ( "stream_state_order" ) . gt ( lit ( offset) ) ) ?;
384+ . drop_columns ( & [ "stream_cid_partition" ] ) ?
385+ . sort ( vec ! [ col( "stream_state_order" ) . sort( true , true ) ] ) ?;
386+
387+ if let Some ( filters) = filters {
388+ for filter in filters {
389+ stream_states = stream_states. filter ( filter) ?;
390+ }
386391 }
387- Ok ( stream_states
388- . sort ( vec ! [ col( "stream_state_order" ) . sort( true , true ) ] ) ?
389- . execute_stream ( )
390- . await ?)
392+ Ok ( stream_states. execute_stream ( ) . await ?)
391393}
392394
393395/// Inform the resolver about new event states.
@@ -581,13 +583,13 @@ impl FeedTableSource for ResolverHandle {
581583 async fn subscribe_since (
582584 & self ,
583585 projection : Option < Vec < usize > > ,
584- offset : Option < u64 > ,
586+ filters : Option < Vec < Expr > > ,
585587 limit : Option < usize > ,
586588 ) -> anyhow:: Result < SendableRecordBatchStream > {
587589 Ok ( self
588590 . send ( SubscribeSinceMsg {
589591 projection,
590- offset ,
592+ filters ,
591593 limit,
592594 } )
593595 . await ??)
0 commit comments