Skip to content

Commit 5cd81ef

Browse files
evantahlerclaude
andauthored
Fix fire-and-forget async calls causing unhandled promise rejections during shutdown (#1261)
* Fix fire-and-forget async calls causing unhandled promise rejections during shutdown Add .catch() handlers to all fire-and-forget async calls in Worker and Scheduler that were previously discarding Promise return values from timer callbacks. This prevents unhandled rejections when a shared Redis connection is closed during shutdown. Fixes #1260. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Add tests for fire-and-forget error handling during shutdown Verify that Worker and Scheduler emit errors via the "error" event instead of causing unhandled promise rejections when Redis commands fail during ping/poll timer callbacks. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 086f808 commit 5cd81ef

4 files changed

Lines changed: 73 additions & 5 deletions

File tree

__tests__/core/scheduler.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,36 @@ describe("scheduler", () => {
8787
await queue.end();
8888
});
8989

90+
test("emits error instead of unhandled rejection when poll fails", async () => {
91+
const testScheduler = new Scheduler({
92+
connection: specHelper.cleanConnectionDetails(),
93+
timeout: specHelper.timeout,
94+
});
95+
await testScheduler.connect();
96+
await testScheduler.start();
97+
98+
const errorPromise = new Promise<Error>((resolve) => {
99+
testScheduler.on("error", (err) => resolve(err));
100+
});
101+
102+
// stub redis.set to simulate a closed connection during tryForLeader
103+
const originalSet = testScheduler.connection.redis.set.bind(
104+
testScheduler.connection.redis,
105+
);
106+
const redisError = new Error("Connection is closed");
107+
testScheduler.connection.redis.set = async () => {
108+
throw redisError;
109+
};
110+
111+
// wait for pollAgainLater to fire poll() which will fail
112+
const emittedError = await errorPromise;
113+
expect(emittedError).toBe(redisError);
114+
115+
// restore and clean up
116+
testScheduler.connection.redis.set = originalSet;
117+
await testScheduler.end();
118+
});
119+
90120
test("queues can see who the leader is", async () => {
91121
await scheduler.poll();
92122
const leader = await queue.leader();

__tests__/core/worker.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,3 +358,39 @@ describe("worker", () => {
358358
});
359359
});
360360
});
361+
362+
describe("worker error handling during shutdown", () => {
363+
test("emits error instead of unhandled rejection when ping fails", async () => {
364+
const testWorker = new Worker(
365+
{
366+
connection: specHelper.cleanConnectionDetails(),
367+
timeout: specHelper.timeout,
368+
queues: ["shutdown_test_queue"],
369+
},
370+
jobs,
371+
);
372+
await testWorker.connect();
373+
await testWorker.start();
374+
375+
const errorPromise = new Promise<Error>((resolve) => {
376+
testWorker.on("error", (err) => resolve(err));
377+
});
378+
379+
// stub redis.set to simulate a closed connection during ping
380+
const originalSet = testWorker.connection.redis.set.bind(
381+
testWorker.connection.redis,
382+
);
383+
const redisError = new Error("Connection is closed");
384+
testWorker.connection.redis.set = async () => {
385+
throw redisError;
386+
};
387+
388+
// wait for the ping interval to fire and fail
389+
const emittedError = await errorPromise;
390+
expect(emittedError).toBe(redisError);
391+
392+
// restore and clean up
393+
testWorker.connection.redis.set = originalSet;
394+
await testWorker.end();
395+
});
396+
});

src/core/scheduler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ export class Scheduler extends EventEmitter {
159159
private async pollAgainLater() {
160160
if (this.running === true) {
161161
this.timer = setTimeout(() => {
162-
this.poll();
162+
this.poll().catch((e) => this.emit("error", e));
163163
}, this.options.timeout);
164164
}
165165
}

src/core/worker.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ export class Worker extends EventEmitter {
148148
this.started = true;
149149
this.emit("start", new Date());
150150
await this.init();
151-
this.poll();
151+
this.poll().catch((e) => this.emit("error", e));
152152
}
153153
}
154154

@@ -159,7 +159,9 @@ export class Worker extends EventEmitter {
159159
Math.round(new Date().getTime() / 1000),
160160
);
161161
await this.ping();
162-
this.pingTimer = setInterval(this.ping.bind(this), this.options.timeout);
162+
this.pingTimer = setInterval(() => {
163+
this.ping().catch((e) => this.emit("error", e));
164+
}, this.options.timeout);
163165
}
164166

165167
async end(): Promise<void> {
@@ -399,7 +401,7 @@ export class Worker extends EventEmitter {
399401
this.job = null;
400402

401403
if (this.options.looping) {
402-
this.poll();
404+
this.poll().catch((e) => this.emit("error", e));
403405
}
404406
}
405407

@@ -443,7 +445,7 @@ export class Worker extends EventEmitter {
443445
this.emit("pause");
444446
await new Promise((resolve) => {
445447
this.pollTimer = setTimeout(() => {
446-
this.poll();
448+
this.poll().catch((e) => this.emit("error", e));
447449
resolve(null);
448450
}, this.options.timeout);
449451
});

0 commit comments

Comments
 (0)