Skip to content

Commit f167d8a

Browse files
committed
feat: add updateJobFieldsAtomicly method
1 parent 8661b3c commit f167d8a

File tree

1 file changed

+35
-3
lines changed

1 file changed

+35
-3
lines changed

index.ts

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ import { afLogger } from "adminforth";
55
import pLimit from 'p-limit';
66
import { Level } from 'level';
77
import fs from 'fs/promises';
8-
import {Mutex, MutexInterface, Semaphore, SemaphoreInterface, withTimeout} from 'async-mutex';
9-
10-
const mutex = new Mutex();
8+
import { Mutex } from 'async-mutex';
119

1210
type TaskStatus = 'SCHEDULED' | 'IN_PROGRESS' | 'DONE' | 'FAILED';
1311
type setStateFieldParams = (state: Record<string, any>) => void;
@@ -24,6 +22,7 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
2422
private jobCustomComponents: Record<string, AdminForthComponentDeclarationFull> = {};
2523
private jobParallelLimits: Record<string, number> = {};
2624
private levelDbInstances: Record<string, Level> = {};
25+
private jobStateMutexes: Record<string, Mutex> = {};
2726

2827
constructor(options: PluginOptions) {
2928
super(options, import.meta.url);
@@ -73,6 +72,9 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
7372
delete this.levelDbInstances[recordId];
7473
}
7574

75+
// cleanup per-job mutex as well
76+
delete this.jobStateMutexes[recordId];
77+
7678
//delete level db folder for the job
7779
await fs.rm(levelDbPath, {
7880
recursive: true,
@@ -83,6 +85,13 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
8385
})
8486
}
8587

88+
private cleanupJobMutexIfTerminalStatus(jobId: string, status: string) {
89+
// Keep mutex while job is active to preserve atomicity between concurrent tasks.
90+
if (status === 'DONE' || status === 'DONE_WITH_ERRORS' || status === 'CANCELLED') {
91+
delete this.jobStateMutexes[jobId];
92+
}
93+
}
94+
8695
private checkIfFieldInResource(resourceConfig: AdminForthResource, fieldName: string, fieldString?: string) {
8796
if (!fieldName) {
8897
throw new Error(`Field name for ${fieldString} is not provided. Please check your plugin options.`);
@@ -279,12 +288,14 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
279288
[this.options.finishedAtField]: (new Date()).toISOString(),
280289
})
281290
this.adminforth.websocket.publish('/background-jobs', { jobId, status: 'DONE', finishedAt: (new Date()).toISOString() });
291+
this.cleanupJobMutexIfTerminalStatus(jobId, 'DONE');
282292
} else if (failedTasks > 0) {
283293
await this.adminforth.resource(this.getResourceId()).update(jobId, {
284294
[this.options.statusField]: 'DONE_WITH_ERRORS',
285295
[this.options.finishedAtField]: (new Date()).toISOString(),
286296
})
287297
this.adminforth.websocket.publish('/background-jobs', { jobId, status: 'DONE_WITH_ERRORS' });
298+
this.cleanupJobMutexIfTerminalStatus(jobId, 'DONE_WITH_ERRORS');
288299
}
289300
}
290301

@@ -372,6 +383,27 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
372383
return JSON.parse(state);
373384
}
374385

386+
public async updateJobFieldsAtomicly(jobId: string, updateFunction: () => Promise<void>) {
387+
if (!jobId) {
388+
throw new Error('updateJobFieldsAtomicly: jobId is required');
389+
}
390+
if (typeof updateFunction !== 'function') {
391+
throw new Error('updateJobFieldsAtomicly: updateFunction must be a function');
392+
}
393+
394+
// Ensure updates are atomic per jobId.
395+
// Different jobs are not blocked by each other.
396+
let mutex = this.jobStateMutexes[jobId];
397+
if (!mutex) {
398+
mutex = new Mutex();
399+
this.jobStateMutexes[jobId] = mutex;
400+
}
401+
402+
return mutex.runExclusive(async () => {
403+
await updateFunction();
404+
});
405+
}
406+
375407
private async processAllUnfinishedJobs() {
376408
const resourceId = this.getResourceId();
377409
const unprocessedJobs = await this.adminforth.resource(resourceId).list(Filters.EQ(this.options.statusField, 'IN_PROGRESS'));

0 commit comments

Comments
 (0)