Skip to content

Commit ca76c96

Browse files
committed
fix(redis): make deleteSchedule atomic with multi/exec
1 parent eec8031 commit ca76c96

File tree

2 files changed

+43
-2
lines changed

2 files changed

+43
-2
lines changed

src/drivers/redis_adapter.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -815,8 +815,7 @@ export class RedisAdapter implements Adapter {
815815

816816
async deleteSchedule(id: string): Promise<void> {
817817
const scheduleKey = `${schedulesKey}::${id}`
818-
await this.#connection.del(scheduleKey)
819-
await this.#connection.srem(schedulesIndexKey, id)
818+
await this.#connection.multi().del(scheduleKey).srem(schedulesIndexKey, id).exec()
820819
}
821820

822821
async claimDueSchedule(): Promise<ScheduleData | null> {

tests/adapter.spec.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,48 @@ test.group('Adapter | Redis', (group) => {
8686
stream.write = originalWrite
8787
}
8888
})
89+
90+
test('deleteSchedule should not leave ghost index under write-failure chaos', async ({ assert }) => {
91+
const adapter = new RedisAdapter(connection)
92+
const id = 'chaos-delete-schedule'
93+
94+
await adapter.upsertSchedule({
95+
id,
96+
name: 'ChaosJob',
97+
payload: {},
98+
everyMs: 60_000,
99+
timezone: 'UTC',
100+
})
101+
102+
const stream = (connection as any).stream as { write: (...args: any[]) => any }
103+
const originalWrite = stream.write.bind(stream)
104+
let writes = 0
105+
106+
stream.write = ((...args: any[]) => {
107+
writes++
108+
if (writes === 2) {
109+
throw new Error('chaos: second network write blocked')
110+
}
111+
return originalWrite(...args)
112+
}) as typeof stream.write
113+
114+
try {
115+
await adapter.deleteSchedule(id)
116+
} finally {
117+
stream.write = originalWrite
118+
}
119+
120+
const scheduleExists = await connection.exists(`schedules::${id}`)
121+
const indexContains = await connection.sismember('schedules::index', id)
122+
123+
assert.equal(scheduleExists, 0)
124+
assert.equal(indexContains, 0)
125+
assert.equal(
126+
writes,
127+
1,
128+
'deleteSchedule should be emitted in a single write window to avoid partial state'
129+
)
130+
})
89131
})
90132

91133
test.group('Adapter | Knex (SQLite)', (group) => {

0 commit comments

Comments
 (0)