Skip to content

Commit eec8031

Browse files
committed
perf(redis): pipeline listSchedules to avoid N+1 queries
1 parent c9b731d commit eec8031

File tree

2 files changed

+64
-8
lines changed

2 files changed

+64
-8
lines changed

src/drivers/redis_adapter.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -757,17 +757,36 @@ export class RedisAdapter implements Adapter {
757757

758758
async listSchedules(options?: ScheduleListOptions): Promise<ScheduleData[]> {
759759
const ids = await this.#connection.smembers(schedulesIndexKey)
760-
const schedules: ScheduleData[] = []
760+
if (ids.length === 0) {
761+
return []
762+
}
763+
764+
const pipeline = this.#connection.pipeline()
761765

762766
for (const id of ids) {
763-
const schedule = await this.getSchedule(id)
764-
if (schedule) {
765-
// Filter by status if provided
766-
if (options?.status && schedule.status !== options.status) {
767-
continue
768-
}
769-
schedules.push(schedule)
767+
pipeline.hgetall(`${schedulesKey}::${id}`)
768+
}
769+
770+
const results = await pipeline.exec()
771+
if (!results) {
772+
return []
773+
}
774+
775+
const schedules: ScheduleData[] = []
776+
777+
for (const [, data] of results) {
778+
if (!data || Object.keys(data).length === 0) {
779+
continue
780+
}
781+
782+
const schedule = this.#hashToScheduleData(data as Record<string, string>)
783+
784+
// Filter by status if provided
785+
if (options?.status && schedule.status !== options.status) {
786+
continue
770787
}
788+
789+
schedules.push(schedule)
771790
}
772791

773792
return schedules

tests/adapter.spec.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,43 @@ test.group('Adapter | Redis', (group) => {
4949
test,
5050
createAdapter: () => new RedisAdapter(connection),
5151
})
52+
53+
test('listSchedules should use bounded network round-trips as schedule count grows', async ({
54+
assert,
55+
}) => {
56+
const adapter = new RedisAdapter(connection)
57+
58+
for (let i = 0; i < 50; i++) {
59+
await adapter.upsertSchedule({
60+
id: `perf-schedule-${i}`,
61+
name: 'PerfJob',
62+
payload: { i },
63+
everyMs: 60_000,
64+
timezone: 'UTC',
65+
})
66+
}
67+
68+
const stream = (connection as any).stream as { write: (...args: any[]) => any }
69+
const originalWrite = stream.write.bind(stream)
70+
let writes = 0
71+
72+
stream.write = ((...args: any[]) => {
73+
writes++
74+
return originalWrite(...args)
75+
}) as typeof stream.write
76+
77+
try {
78+
const schedules = await adapter.listSchedules()
79+
assert.lengthOf(schedules, 50)
80+
assert.isAtMost(
81+
writes,
82+
4,
83+
`Expected bounded write count with pipelining, got ${writes} writes for 50 schedules`
84+
)
85+
} finally {
86+
stream.write = originalWrite
87+
}
88+
})
5289
})
5390

5491
test.group('Adapter | Knex (SQLite)', (group) => {

0 commit comments

Comments
 (0)