diff --git a/CHANGELOG.md b/CHANGELOG.md index f5b0712e..39446b0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- **Job-queue worker no longer leaks unhandled promise rejections on DB errors.** `JobQueueService` runs `executeJob` and `pollWorker` fire-and-forget from three callsites; only `pollAll` isolated rejections (via `allSettled`). A transient DB failure while finalizing a job (`repo.complete`/`repo.fail`) or claiming one (`repo.claimJob`) therefore surfaced as an unhandled rejection — a misleading "Unhandled Promise Rejection" log with no job context. `executeJob` now swallows + logs its own persistence errors (never rejects), and the immediate-start and per-job re-poll go through a `safePoll` wrapper that catches `pollWorker` rejections. The worker still frees the slot and re-polls in every case. + ## [0.7.0] - 2026-06-05 ### Added diff --git a/packages/gateway/src/services/job-queue-service.test.ts b/packages/gateway/src/services/job-queue-service.test.ts index e8722aac..9a2cb0b0 100644 --- a/packages/gateway/src/services/job-queue-service.test.ts +++ b/packages/gateway/src/services/job-queue-service.test.ts @@ -129,3 +129,73 @@ describe('JobQueueService pollWorker concurrency', () => { await new Promise((r) => setTimeout(r, 0)); }); }); + +describe('JobQueueService error resilience', () => { + type Svc = { + executeJob: (w: RunningWorkerLike, j: Record) => Promise; + pollWorker: (w: RunningWorkerLike) => Promise; + }; + interface RunningWorkerLike { + id: string; + queue: string; + concurrency: number; + handler: () => Promise>; + activeJobs: Set; + stopped: boolean; + polling: boolean; + } + const makeWorker = (over: Partial = {}): RunningWorkerLike => ({ + id: 'w', + queue: 'q', + concurrency: 2, + handler: async () => ({}), + activeJobs: new Set(), + stopped: false, + polling: false, + ...over, + }); + const job = { id: 'job-x', name: 'test', queue: 'q', payload: {} }; + + it('executeJob does not reject when repo.complete throws', async () => { + mockRepo.complete.mockRejectedValueOnce(new Error('db down')); + const svc = new JobQueueService() as unknown as Svc; + // Resolves (not rejects); falls back to repo.fail for retry. + await expect(svc.executeJob(makeWorker(), job)).resolves.toBeUndefined(); + expect(mockRepo.fail).toHaveBeenCalledWith('job-x', 'db down'); + }); + + it('executeJob does not reject when repo.fail also throws (no unhandled rejection)', async () => { + mockRepo.complete.mockRejectedValueOnce(new Error('complete failed')); + mockRepo.fail.mockRejectedValueOnce(new Error('fail failed')); + const svc = new JobQueueService() as unknown as Svc; + await expect(svc.executeJob(makeWorker(), job)).resolves.toBeUndefined(); + }); + + it('frees the slot via finally even when the job handler + persistence throw', async () => { + available = 1; + mockRepo.complete.mockRejectedValueOnce(new Error('complete failed')); + const svc = new JobQueueService() as unknown as Svc; + const worker = makeWorker({ concurrency: 2, handler: async () => ({}) }); + await svc.pollWorker(worker); + // Let executeJob + its finally settle. + await new Promise((r) => setTimeout(r, 20)); + expect(worker.activeJobs.size).toBe(0); + }); + + it('startWorker immediate poll does not emit an unhandled rejection when claimJob throws', async () => { + const unhandled: unknown[] = []; + const onUnhandled = (reason: unknown) => unhandled.push(reason); + process.on('unhandledRejection', onUnhandled); + try { + mockRepo.claimJob.mockRejectedValueOnce(new Error('claim db error')); + const svc = new JobQueueService(); + const stop = svc.startWorker(async () => ({}), { queue: 'q', name: 'w-claimthrow' }); + // Drain microtasks + a macrotask so any rejection would have surfaced. + await new Promise((r) => setTimeout(r, 20)); + stop(); + expect(unhandled).toHaveLength(0); + } finally { + process.off('unhandledRejection', onUnhandled); + } + }); +}); diff --git a/packages/gateway/src/services/job-queue-service.ts b/packages/gateway/src/services/job-queue-service.ts index 9df2ccf7..945c7e72 100644 --- a/packages/gateway/src/services/job-queue-service.ts +++ b/packages/gateway/src/services/job-queue-service.ts @@ -7,6 +7,7 @@ */ import { randomUUID } from 'node:crypto'; +import { getErrorMessage } from '@ownpilot/core'; import { getLog } from './log.js'; import { getJobsRepository, type CreateJobInput, type JobRecord } from '../db/repositories/jobs.js'; @@ -134,7 +135,7 @@ export class JobQueueService { } // Trigger first poll immediately - this.pollWorker(worker); + this.safePoll(worker); // Return stop function return () => this.stopWorker(workerId); @@ -160,6 +161,19 @@ export class JobQueueService { ); } + /** + * Fire-and-forget pollWorker invocation. pollWorker rejects if claimJob + * throws (a transient DB error). The immediate-start poll and the per-job + * re-poll don't await it, so without this wrapper that rejection surfaces as + * an unhandled promise rejection. (pollAll already isolates rejections via + * allSettled — this brings the other two callsites in line.) + */ + private safePoll(worker: RunningWorker): void { + this.pollWorker(worker).catch((err) => { + log.error('pollWorker error', { workerId: worker.id, error: getErrorMessage(err) }); + }); + } + private async pollWorker(worker: RunningWorker): Promise { if (worker.stopped || worker.polling) return; if (worker.activeJobs.size >= worker.concurrency) return; @@ -176,9 +190,11 @@ export class JobQueueService { if (!job) break; worker.activeJobs.add(job.id); + // executeJob is guaranteed not to reject (it swallows + logs its own + // errors); the finally frees the slot and re-polls via safePoll. this.executeJob(worker, job).finally(() => { worker.activeJobs.delete(job.id); - this.pollWorker(worker); + this.safePoll(worker); }); } } finally { @@ -195,9 +211,21 @@ export class JobQueueService { await this.repo.complete(job.id, result); log.debug('Job completed', { ...logCtx }); } catch (err) { - const error = err instanceof Error ? err.message : String(err); + const error = getErrorMessage(err); log.warn('Job failed', { ...logCtx, error }); - await this.repo.fail(job.id, error); + // repo.fail is itself a DB write that can throw (e.g. transient + // connection loss). executeJob runs fire-and-forget from pollWorker, so + // a throw here would surface as an unhandled rejection. Swallow + log so + // executeJob never rejects and the slot is still freed by the caller's + // finally. + try { + await this.repo.fail(job.id, error); + } catch (failErr) { + log.error('Failed to persist job failure', { + ...logCtx, + error: getErrorMessage(failErr), + }); + } } }