@@ -15,17 +15,17 @@ use tokio::sync::mpsc;
1515use tokio:: time:: Instant ;
1616use tokio_util:: time:: delay_queue:: { self , DelayQueue , Expired } ;
1717
18- use crate :: db:: datastore:: locking_tx_datastore:: MutTxId ;
19- use crate :: db:: datastore:: system_tables:: { StFields , StScheduledFields , ST_SCHEDULED_ID } ;
20- use crate :: db:: datastore:: traits:: IsolationLevel ;
21- use crate :: db:: relational_db:: RelationalDB ;
22- use crate :: execution_context:: Workload ;
23-
2418use super :: module_host:: ModuleEvent ;
2519use super :: module_host:: ModuleFunctionCall ;
2620use super :: module_host:: { CallReducerParams , WeakModuleHost } ;
2721use super :: module_host:: { DatabaseUpdate , EventStatus } ;
2822use super :: { ModuleHost , ReducerArgs , ReducerCallError } ;
23+ use crate :: db:: datastore:: locking_tx_datastore:: MutTxId ;
24+ use crate :: db:: datastore:: system_tables:: { StFields , StScheduledFields , ST_SCHEDULED_ID } ;
25+ use crate :: db:: datastore:: traits:: IsolationLevel ;
26+ use crate :: db:: relational_db:: RelationalDB ;
27+ use crate :: execution_context:: Workload ;
28+ use crate :: util:: asyncify;
2929
3030#[ derive( Copy , Clone , Eq , PartialEq , Hash ) ]
3131pub struct ScheduledReducerId {
@@ -398,16 +398,50 @@ impl SchedulerActor {
398398 } ;
399399 }
400400
401- /// Handle repeated schedule by adding it back to queue
402- /// return true if it is repeated schedule
403- fn handle_repeated_schedule (
401+ async fn delete_scheduled_reducer_row (
404402 & mut self ,
403+ db : & RelationalDB ,
405404 id : ScheduledReducerId ,
406- schedule_row : & RowRef < ' _ > ,
407- ) -> Result < bool , anyhow:: Error > {
408- let schedule_at = read_schedule_at ( schedule_row, id. at_column ) ?;
405+ module_host : ModuleHost ,
406+ ) {
407+ let host_clone = module_host. clone ( ) ;
408+ let db = db. clone ( ) ;
409+ let schedule_at = asyncify ( move || {
410+ let mut tx = db. begin_mut_tx ( IsolationLevel :: Serializable , Workload :: Internal ) ;
411+
412+ match get_schedule_row_mut ( & tx, & db, id) {
413+ Ok ( schedule_row) => {
414+ if let Ok ( schedule_at) = read_schedule_at ( & schedule_row, id. at_column ) {
415+ // If the schedule is an interval, we handle it as a repeated schedule
416+ if let ScheduleAt :: Interval ( _) = schedule_at {
417+ return Some ( schedule_at) ;
418+ }
419+ let row_ptr = schedule_row. pointer ( ) ;
420+ db. delete ( & mut tx, id. table_id , [ row_ptr] ) ;
421+
422+ commit_and_broadcast_deletion_event ( tx, host_clone) ;
423+ } else {
424+ log:: debug!(
425+ "Failed to read 'scheduled_at' from row: table_id {}, schedule_id {}" ,
426+ id. table_id,
427+ id. schedule_id
428+ ) ;
429+ }
430+ }
431+ Err ( _) => {
432+ log:: debug!(
433+ "Table row corresponding to yield scheduler ID not found: table_id {}, scheduler_id {}" ,
434+ id. table_id,
435+ id. schedule_id
436+ ) ;
437+ }
438+ }
439+ None
440+ } )
441+ . await ;
409442
410- if let ScheduleAt :: Interval ( dur) = schedule_at {
443+ // If this was repeated, we need to add it back to the queue.
444+ if let Some ( ScheduleAt :: Interval ( dur) ) = schedule_at {
411445 let key = self . queue . insert (
412446 QueueItem :: Id {
413447 id,
@@ -416,40 +450,6 @@ impl SchedulerActor {
416450 dur. to_duration ( ) . unwrap_or ( Duration :: ZERO ) ,
417451 ) ;
418452 self . key_map . insert ( id, key) ;
419- Ok ( true )
420- } else {
421- Ok ( false )
422- }
423- }
424-
425- async fn delete_scheduled_reducer_row (
426- & mut self ,
427- db : & RelationalDB ,
428- id : ScheduledReducerId ,
429- module_host : ModuleHost ,
430- ) {
431- let mut tx = db. begin_mut_tx ( IsolationLevel :: Serializable , Workload :: Internal ) ;
432-
433- match get_schedule_row_mut ( & tx, db, id) {
434- Ok ( schedule_row) => {
435- if let Ok ( is_repeated) = self . handle_repeated_schedule ( id, & schedule_row) {
436- if is_repeated {
437- return ; // Do not delete entry for repeated reducer
438- }
439-
440- let row_ptr = schedule_row. pointer ( ) ;
441- db. delete ( & mut tx, id. table_id , [ row_ptr] ) ;
442-
443- commit_and_broadcast_deletion_event ( tx, module_host) ;
444- }
445- }
446- Err ( _) => {
447- log:: debug!(
448- "Table row corresponding to yield scheduler ID not found: table_id {}, scheduler_id {}" ,
449- id. table_id,
450- id. schedule_id
451- ) ;
452- }
453453 }
454454 }
455455}
0 commit comments