Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- **Job-queue worker no longer leaks unhandled promise rejections on DB errors.** `JobQueueService` runs `executeJob` and `pollWorker` fire-and-forget from three callsites, but only `pollAll` isolated rejections (via `allSettled`). A transient DB failure while finalizing a job (`repo.complete`/`repo.fail`) or claiming one (`repo.claimJob`) therefore surfaced as an unhandled rejection — a misleading "Unhandled Promise Rejection" log with no job context. `executeJob` now catches and logs its own persistence errors, so it never rejects, and the immediate-start poll and the per-job re-poll go through a `safePoll` wrapper that catches and logs `pollWorker` rejections. The worker still frees the slot and re-polls in every case.

## [0.7.0] - 2026-06-05

### Added
Expand Down
70 changes: 70 additions & 0 deletions packages/gateway/src/services/job-queue-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,73 @@ describe('JobQueueService pollWorker concurrency', () => {
await new Promise((r) => setTimeout(r, 0));
});
});

/**
 * Regression tests: persistence/claim failures inside the fire-and-forget
 * worker paths must not escape as rejected promises or unhandled rejections.
 *
 * NOTE(review): `mockRepo`, `available` and `JobQueueService` are defined
 * outside this block; their setup is not visible here.
 */
describe('JobQueueService error resilience', () => {
  /** Minimal structural view of a running worker, as consumed by the private methods under test. */
  interface RunningWorkerLike {
    id: string;
    queue: string;
    concurrency: number;
    handler: () => Promise<Record<string, unknown>>;
    activeJobs: Set<string>;
    stopped: boolean;
    polling: boolean;
  }

  /** Test-only view exposing the service's private methods so they can be invoked directly. */
  type Svc = {
    executeJob: (w: RunningWorkerLike, j: Record<string, unknown>) => Promise<void>;
    pollWorker: (w: RunningWorkerLike) => Promise<void>;
  };

  /** Builds an idle worker with a handler that succeeds; `over` overrides individual fields. */
  const makeWorker = (over: Partial<RunningWorkerLike> = {}): RunningWorkerLike => ({
    id: 'w',
    queue: 'q',
    concurrency: 2,
    handler: async () => ({}),
    activeJobs: new Set<string>(),
    stopped: false,
    polling: false,
    ...over,
  });

  const job = { id: 'job-x', name: 'test', queue: 'q', payload: {} };

  it('executeJob does not reject when repo.complete throws', async () => {
    mockRepo.complete.mockRejectedValueOnce(new Error('db down'));
    const svc = new JobQueueService() as unknown as Svc;
    // Resolves (not rejects); falls back to repo.fail for retry.
    await expect(svc.executeJob(makeWorker(), job)).resolves.toBeUndefined();
    expect(mockRepo.fail).toHaveBeenCalledWith('job-x', 'db down');
  });

  it('executeJob does not reject when repo.fail also throws (no unhandled rejection)', async () => {
    mockRepo.complete.mockRejectedValueOnce(new Error('complete failed'));
    mockRepo.fail.mockRejectedValueOnce(new Error('fail failed'));
    const svc = new JobQueueService() as unknown as Svc;
    await expect(svc.executeJob(makeWorker(), job)).resolves.toBeUndefined();
  });

  // The handler below succeeds; only persisting its result (repo.complete)
  // fails, so the title describes exactly that scenario.
  it('frees the slot via finally even when persisting the job result throws', async () => {
    // presumably the number of jobs the claimJob mock will hand out — confirm against the mock setup
    available = 1;
    mockRepo.complete.mockRejectedValueOnce(new Error('complete failed'));
    const svc = new JobQueueService() as unknown as Svc;
    const worker = makeWorker({ concurrency: 2, handler: async () => ({}) });
    await svc.pollWorker(worker);
    // Let executeJob + its finally settle.
    await new Promise((r) => setTimeout(r, 20));
    expect(worker.activeJobs.size).toBe(0);
  });

  it('startWorker immediate poll does not emit an unhandled rejection when claimJob throws', async () => {
    const unhandled: unknown[] = [];
    const onUnhandled = (reason: unknown) => unhandled.push(reason);
    process.on('unhandledRejection', onUnhandled);
    try {
      mockRepo.claimJob.mockRejectedValueOnce(new Error('claim db error'));
      const svc = new JobQueueService();
      const stop = svc.startWorker(async () => ({}), { queue: 'q', name: 'w-claimthrow' });
      // Drain microtasks + a macrotask so any rejection would have surfaced.
      await new Promise((r) => setTimeout(r, 20));
      stop();
      expect(unhandled).toHaveLength(0);
    } finally {
      // Always detach the listener so it cannot leak into other tests.
      process.off('unhandledRejection', onUnhandled);
    }
  });
});
36 changes: 32 additions & 4 deletions packages/gateway/src/services/job-queue-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

import { randomUUID } from 'node:crypto';
import { getErrorMessage } from '@ownpilot/core';
import { getLog } from './log.js';
import { getJobsRepository, type CreateJobInput, type JobRecord } from '../db/repositories/jobs.js';

Expand Down Expand Up @@ -134,7 +135,7 @@ export class JobQueueService {
}

// Trigger first poll immediately
this.pollWorker(worker);
this.safePoll(worker);

// Return stop function
return () => this.stopWorker(workerId);
Expand All @@ -160,6 +161,19 @@ export class JobQueueService {
);
}

/**
 * Starts a poll for `worker` without awaiting it and without letting a
 * failure escape.
 *
 * Callers that trigger a poll and move on (the immediate poll on worker start
 * and the re-poll after each job) have nowhere to handle a rejected
 * pollWorker promise. Any rejection is therefore caught here and logged with
 * the worker id instead of surfacing as an unhandled promise rejection.
 */
private safePoll(worker: RunningWorker): void {
  const logRejection = (reason) => {
    const error = getErrorMessage(reason);
    log.error('pollWorker error', { workerId: worker.id, error });
  };
  // Rejection handler only; a fulfilled poll needs no follow-up.
  void this.pollWorker(worker).then(undefined, logRejection);
}

private async pollWorker(worker: RunningWorker): Promise<void> {
if (worker.stopped || worker.polling) return;
if (worker.activeJobs.size >= worker.concurrency) return;
Expand All @@ -176,9 +190,11 @@ export class JobQueueService {
if (!job) break;

worker.activeJobs.add(job.id);
// executeJob is guaranteed not to reject (it swallows + logs its own
// errors); the finally frees the slot and re-polls via safePoll.
this.executeJob(worker, job).finally(() => {
worker.activeJobs.delete(job.id);
this.pollWorker(worker);
this.safePoll(worker);
});
}
} finally {
Expand All @@ -195,9 +211,21 @@ export class JobQueueService {
await this.repo.complete(job.id, result);
log.debug('Job completed', { ...logCtx });
} catch (err) {
const error = err instanceof Error ? err.message : String(err);
const error = getErrorMessage(err);
log.warn('Job failed', { ...logCtx, error });
await this.repo.fail(job.id, error);
// repo.fail is itself a DB write that can throw (e.g. transient
// connection loss). executeJob runs fire-and-forget from pollWorker, so
// a throw here would surface as an unhandled rejection. Swallow + log so
// executeJob never rejects and the slot is still freed by the caller's
// finally.
try {
await this.repo.fail(job.id, error);
} catch (failErr) {
log.error('Failed to persist job failure', {
...logCtx,
error: getErrorMessage(failErr),
});
}
}
}

Expand Down
Loading