Skip to content

Commit cc4a165

Browse files
Shubham8287bfops
authored andcommitted
fix #3174 (#3179)
# Description of Changes fixes #3174 . During initialization, entries were added to the `DelayQueue` but not to `key_map`. ### Detailed Explanation: 1. `DelayQueue` is **not set-semantic**, so we track uniqueness with a `key_map: FxHashMap` but that wasn't updated during initiliaziation. 2. `Scheduler::schedule` is **not transactional**: it enqueues reducers even if the DB transaction later fails (abort, duplicate row, etc.). On yield, `SchedulerActor` checks the DB before execution. **Combined Effect**: A transaction that does not actually change a scheduled entry but still calls `Scheduler::schedule` after a module update will cause a duplicate entry in the `DelayQueue`, since `key_map` does not yet contain that entry. **Why It Didn’t Show Earlier**: When a repeating reducer executes, we re-schedule it by updating both `DelayQueue` and `key_map` correctly. The bug only appears in the window after updating module but before the first execution, if a transaction calls schedule without actually modifying the DB row. Which was indeed happening as per discord chat: > but yeah most likely order of event was modue was updated > and then update_scheduled_timers_from_static_data was called window between update module and first execution is 1 hour for this case. ## Repo steps: 1. publish this module, it makes `send_scheduled_message` reducer to be called every 10 secs. ```rust #[spacetimedb::table(name = scheduled_message, public, scheduled(send_scheduled_message))] pub struct ScheduledMessage { #[primary_key] #[auto_inc] scheduled_id: u64, scheduled_at: ScheduleAt, } #[spacetimedb::reducer] fn send_scheduled_message(ctx: &ReducerContext, sched: ScheduledMessage) -> Result<(), String> { info!("Sending scheduled message: {:?}", ctx.timestamp); Ok(()) } #[spacetimedb::reducer(init)] pub fn init(ctx: &ReducerContext) { ctx.db.scheduled_message().insert(ScheduledMessage { scheduled_id: 0, scheduled_at: Duration::from_secs(10).into(), }); } #[spacetimedb::reducer] pub fn update_timer(ctx: &ReducerContext) { for mut timer in ctx.db.scheduled_message().iter() { timer.scheduled_at = Duration::from_secs(10).into(); ctx.db.scheduled_message().scheduled_id().update(timer); log::info!("building decay agent timer was updated"); } } ``` 2. Update module to support automigration (add a table) and re-publish it. 3. Call reducer `update_timer` and do it before first execution of `send_scheduled_message` after updating module. 4. As `update_timer` doesn't change the existing scheduler but calls `Scheduler::schedule` it will cause duplicate entry in `DelayQueue`. # API and ABI breaking changes N/A # Expected complexity level and risk 1, pretty obvious fix. # Testing manually. The code fix is straightforward, but the issue only becomes visible under specific conditions.
1 parent f7d94ef commit cc4a165

1 file changed

Lines changed: 13 additions & 2 deletions

File tree

crates/core/src/host/scheduler.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ impl SchedulerStarter {
9191
// time to make it better right now.
9292
pub fn start(mut self, module_host: &ModuleHost) -> anyhow::Result<()> {
9393
let mut queue: DelayQueue<QueueItem> = DelayQueue::new();
94+
let mut key_map = FxHashMap::default();
9495

9596
let tx = self.db.begin_tx(Workload::Internal);
9697

@@ -126,15 +127,25 @@ impl SchedulerStarter {
126127
id_column,
127128
at_column,
128129
};
129-
queue.insert_at(QueueItem::Id { id, at }, now_instant + duration);
130+
let key = queue.insert_at(QueueItem::Id { id, at }, now_instant + duration);
131+
132+
// This should never happen as duplicate entries should be gated by unique
133+
// constraint voilation in scheduled tables.
134+
if key_map.insert(id, key).is_some() {
135+
return Err(anyhow!(
136+
"Duplicate key found in scheduler queue: table_id {}, schedule_id {}",
137+
id.table_id,
138+
id.schedule_id
139+
));
140+
}
130141
}
131142
}
132143

133144
tokio::spawn(
134145
SchedulerActor {
135146
rx: self.rx,
136147
queue,
137-
key_map: FxHashMap::default(),
148+
key_map,
138149
module_host: module_host.downgrade(),
139150
}
140151
.run(),

0 commit comments

Comments
 (0)