Skip to content

Commit 245082e

Browse files
authored
Merge pull request #83 from ownpilot/fix/job-queue-unhandled-rejection
fix(jobs): stop JobQueueService leaking unhandled rejections on DB errors
2 parents 07612a6 + 33d7249 commit 245082e

3 files changed

Lines changed: 106 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Fixed
11+
12+
- **Job-queue worker no longer leaks unhandled promise rejections on DB errors.** `JobQueueService` runs `executeJob` and `pollWorker` fire-and-forget from three callsites; only `pollAll` isolated rejections (via `allSettled`). A transient DB failure while finalizing a job (`repo.complete`/`repo.fail`) or claiming one (`repo.claimJob`) therefore surfaced as an unhandled rejection — a misleading "Unhandled Promise Rejection" log with no job context. `executeJob` now swallows + logs its own persistence errors (never rejects), and the immediate-start and per-job re-poll go through a `safePoll` wrapper that catches `pollWorker` rejections. The worker still frees the slot and re-polls in every case.
13+
1014
## [0.7.0] - 2026-06-05
1115

1216
### Added

packages/gateway/src/services/job-queue-service.test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,73 @@ describe('JobQueueService pollWorker concurrency', () => {
129129
await new Promise((r) => setTimeout(r, 0));
130130
});
131131
});
132+
133+
describe('JobQueueService error resilience', () => {
134+
type Svc = {
135+
executeJob: (w: RunningWorkerLike, j: Record<string, unknown>) => Promise<void>;
136+
pollWorker: (w: RunningWorkerLike) => Promise<void>;
137+
};
138+
interface RunningWorkerLike {
139+
id: string;
140+
queue: string;
141+
concurrency: number;
142+
handler: () => Promise<Record<string, unknown>>;
143+
activeJobs: Set<string>;
144+
stopped: boolean;
145+
polling: boolean;
146+
}
147+
const makeWorker = (over: Partial<RunningWorkerLike> = {}): RunningWorkerLike => ({
148+
id: 'w',
149+
queue: 'q',
150+
concurrency: 2,
151+
handler: async () => ({}),
152+
activeJobs: new Set<string>(),
153+
stopped: false,
154+
polling: false,
155+
...over,
156+
});
157+
const job = { id: 'job-x', name: 'test', queue: 'q', payload: {} };
158+
159+
it('executeJob does not reject when repo.complete throws', async () => {
160+
mockRepo.complete.mockRejectedValueOnce(new Error('db down'));
161+
const svc = new JobQueueService() as unknown as Svc;
162+
// Resolves (not rejects); falls back to repo.fail for retry.
163+
await expect(svc.executeJob(makeWorker(), job)).resolves.toBeUndefined();
164+
expect(mockRepo.fail).toHaveBeenCalledWith('job-x', 'db down');
165+
});
166+
167+
it('executeJob does not reject when repo.fail also throws (no unhandled rejection)', async () => {
168+
mockRepo.complete.mockRejectedValueOnce(new Error('complete failed'));
169+
mockRepo.fail.mockRejectedValueOnce(new Error('fail failed'));
170+
const svc = new JobQueueService() as unknown as Svc;
171+
await expect(svc.executeJob(makeWorker(), job)).resolves.toBeUndefined();
172+
});
173+
174+
it('frees the slot via finally even when the job handler + persistence throw', async () => {
175+
available = 1;
176+
mockRepo.complete.mockRejectedValueOnce(new Error('complete failed'));
177+
const svc = new JobQueueService() as unknown as Svc;
178+
const worker = makeWorker({ concurrency: 2, handler: async () => ({}) });
179+
await svc.pollWorker(worker);
180+
// Let executeJob + its finally settle.
181+
await new Promise((r) => setTimeout(r, 20));
182+
expect(worker.activeJobs.size).toBe(0);
183+
});
184+
185+
it('startWorker immediate poll does not emit an unhandled rejection when claimJob throws', async () => {
186+
const unhandled: unknown[] = [];
187+
const onUnhandled = (reason: unknown) => unhandled.push(reason);
188+
process.on('unhandledRejection', onUnhandled);
189+
try {
190+
mockRepo.claimJob.mockRejectedValueOnce(new Error('claim db error'));
191+
const svc = new JobQueueService();
192+
const stop = svc.startWorker(async () => ({}), { queue: 'q', name: 'w-claimthrow' });
193+
// Drain microtasks + a macrotask so any rejection would have surfaced.
194+
await new Promise((r) => setTimeout(r, 20));
195+
stop();
196+
expect(unhandled).toHaveLength(0);
197+
} finally {
198+
process.off('unhandledRejection', onUnhandled);
199+
}
200+
});
201+
});

packages/gateway/src/services/job-queue-service.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
*/
88

99
import { randomUUID } from 'node:crypto';
10+
import { getErrorMessage } from '@ownpilot/core';
1011
import { getLog } from './log.js';
1112
import { getJobsRepository, type CreateJobInput, type JobRecord } from '../db/repositories/jobs.js';
1213

@@ -134,7 +135,7 @@ export class JobQueueService {
134135
}
135136

136137
// Trigger first poll immediately
137-
this.pollWorker(worker);
138+
this.safePoll(worker);
138139

139140
// Return stop function
140141
return () => this.stopWorker(workerId);
@@ -160,6 +161,19 @@ export class JobQueueService {
160161
);
161162
}
162163

164+
/**
165+
* Fire-and-forget pollWorker invocation. pollWorker rejects if claimJob
166+
* throws (a transient DB error). The immediate-start poll and the per-job
167+
* re-poll don't await it, so without this wrapper that rejection surfaces as
168+
* an unhandled promise rejection. (pollAll already isolates rejections via
169+
* allSettled — this brings the other two callsites in line.)
170+
*/
171+
private safePoll(worker: RunningWorker): void {
172+
this.pollWorker(worker).catch((err) => {
173+
log.error('pollWorker error', { workerId: worker.id, error: getErrorMessage(err) });
174+
});
175+
}
176+
163177
private async pollWorker(worker: RunningWorker): Promise<void> {
164178
if (worker.stopped || worker.polling) return;
165179
if (worker.activeJobs.size >= worker.concurrency) return;
@@ -176,9 +190,11 @@ export class JobQueueService {
176190
if (!job) break;
177191

178192
worker.activeJobs.add(job.id);
193+
// executeJob is guaranteed not to reject (it swallows + logs its own
194+
// errors); the finally frees the slot and re-polls via safePoll.
179195
this.executeJob(worker, job).finally(() => {
180196
worker.activeJobs.delete(job.id);
181-
this.pollWorker(worker);
197+
this.safePoll(worker);
182198
});
183199
}
184200
} finally {
@@ -195,9 +211,21 @@ export class JobQueueService {
195211
await this.repo.complete(job.id, result);
196212
log.debug('Job completed', { ...logCtx });
197213
} catch (err) {
198-
const error = err instanceof Error ? err.message : String(err);
214+
const error = getErrorMessage(err);
199215
log.warn('Job failed', { ...logCtx, error });
200-
await this.repo.fail(job.id, error);
216+
// repo.fail is itself a DB write that can throw (e.g. transient
217+
// connection loss). executeJob runs fire-and-forget from pollWorker, so
218+
// a throw here would surface as an unhandled rejection. Swallow + log so
219+
// executeJob never rejects and the slot is still freed by the caller's
220+
// finally.
221+
try {
222+
await this.repo.fail(job.id, error);
223+
} catch (failErr) {
224+
log.error('Failed to persist job failure', {
225+
...logCtx,
226+
error: getErrorMessage(failErr),
227+
});
228+
}
201229
}
202230
}
203231

0 commit comments

Comments
 (0)