@@ -19,6 +19,7 @@ export default class extends AdminForthPlugin {
1919 private taskHandlers : Record < string , taskHandlerType > = { } ;
2020 private jobCustomComponents : Record < string , AdminForthComponentDeclarationFull > = { } ;
2121 private jobParallelLimits : Record < string , number > = { } ;
22+ private levelDbInstances : Record < string , Level > = { } ;
2223
2324 constructor ( options : PluginOptions ) {
2425 super ( options , import . meta. url ) ;
@@ -154,6 +155,7 @@ export default class extends AdminForthPlugin {
154155
155156 //create a level db instance for the job with name as jobId
156157 const jobLevelDb = new Level ( `${ this . options . levelDbPath || './background-jobs-dbs/' } job_${ jobId } ` , { valueEncoding : 'json' } ) ;
158+ this . levelDbInstances [ jobId ] = jobLevelDb ;
157159
158160 const limit2 = pLimit ( parrallelLimit ) ;
159161 const createTaskRecordsPromises = tasks . map ( ( task , index ) => {
@@ -186,11 +188,10 @@ export default class extends AdminForthPlugin {
186188 afLogger . info ( `Job ${ jobId } was cancelled. Skipping task ${ taskIndex } .` ) ;
187189 return ;
188190 }
189- const currentJobRecord = await this . adminforth . resource ( this . getResourceId ( ) ) . get ( Filters . EQ ( this . getResourcePk ( ) , jobId ) ) ;
190- const currentJobStatus = currentJobRecord [ this . options . statusField ] ;
191+ const currentJobStatus = await this . getLastJobStatus ( jobId ) ;
191192
192193 if ( currentJobStatus === 'CANCELLED' ) {
193- lastJobStatus = 'CANCELLED' ;
194+ lastJobStatus = currentJobStatus ;
194195 afLogger . info ( `Job ${ jobId } was cancelled. Skipping task ${ taskIndex } .` ) ;
195196 return ;
196197 }
@@ -204,21 +205,31 @@ export default class extends AdminForthPlugin {
204205 }
205206
206207 await this . setLevelDbTaskStatusField ( jobLevelDb , taskIndex . toString ( ) , 'IN_PROGRESS' ) ;
208+ this . adminforth . websocket . publish ( `/background-jobs-task-update/${ jobId } ` , { taskIndex, status : "IN_PROGRESS" } ) ;
207209
208210 //handling the task
209211 try {
210212 await handleTask ( { setTaskStateField, getTaskStateField } ) ;
211213
212214 //Set task status to completed in level db
213215 await this . setLevelDbTaskStatusField ( jobLevelDb , taskIndex . toString ( ) , 'DONE' ) ;
216+ this . adminforth . websocket . publish ( `/background-jobs-task-update/${ jobId } ` , { taskIndex, status : "DONE" } ) ;
214217 } catch ( error ) {
215218 afLogger . error ( `Error in handling task ${ taskIndex } of job ${ jobId } : ${ error } ` , ) ;
216219 await this . setLevelDbTaskStatusField ( jobLevelDb , taskIndex . toString ( ) , 'FAILED' ) ;
220+ this . adminforth . websocket . publish ( `/background-jobs-task-update/${ jobId } ` , { taskIndex, status : "FAILED" } ) ;
217221 failedTasks ++ ;
218222 return ;
219223 } finally {
220224 //Update progress
221- completedTasks = await this . handleFinishTask ( completedTasks , totalTasks , jobId ) ;
225+ const currentJobStatus = await this . getLastJobStatus ( jobId ) ;
226+ if ( currentJobStatus === 'CANCELLED' ) {
227+ lastJobStatus = currentJobStatus ;
228+ afLogger . debug ( `Job ${ jobId } was cancelled during processing of task ${ taskIndex } . Progress will not be updated.` ) ;
229+ return ;
230+ }
231+
232+ completedTasks = await this . handleFinishTask ( completedTasks , totalTasks , jobId ) ;
222233 }
223234 }
224235
@@ -241,6 +252,11 @@ export default class extends AdminForthPlugin {
241252 }
242253 }
243254
255+ private async getLastJobStatus ( jobId : string ) : Promise < string > {
256+ const currentJobRecord = await this . adminforth . resource ( this . getResourceId ( ) ) . get ( Filters . EQ ( this . getResourcePk ( ) , jobId ) ) ;
257+ const currentJobStatus = currentJobRecord [ this . options . statusField ] ;
258+ return currentJobStatus ;
259+ }
244260
245261 private async handleFinishTask ( completedTasks : number , totalTasks : number , jobId : string , wasTaskSkipped : boolean = false ) {
246262 completedTasks ++ ;
@@ -254,11 +270,14 @@ export default class extends AdminForthPlugin {
254270 this . adminforth . websocket . publish ( '/background-jobs' , { jobId, progress } ) ;
255271 return completedTasks ;
256272 }
273+
274+
257275 private async runProcessingUnfinishedTasks (
258276 job : Record < string , any >
259277 ) {
260278 const levelDbPath = `${ this . options . levelDbPath || './background-jobs-dbs/' } job_${ job [ this . getResourcePk ( ) ] } ` ;
261279 const jobLevelDb = new Level ( levelDbPath , { valueEncoding : 'json' } ) ;
280+ this . levelDbInstances [ job [ this . getResourcePk ( ) ] ] = jobLevelDb ;
262281 const jobHandlerName = job [ this . options . jobHandlerField ] ;
263282 const handleTask : taskHandlerType = this . taskHandlers [ jobHandlerName ] ;
264283 if ( ! handleTask ) {
@@ -394,6 +413,47 @@ export default class extends AdminForthPlugin {
394413 }
395414 }
396415 } ) ;
416+
417+ server . endpoint ( {
418+ method : 'POST' ,
419+ path : `/plugin/${ this . pluginInstanceId } /get-tasks` ,
420+ handler : async ( { body } ) => {
421+ const { jobId, limit, offset } = body ;
422+ const levelDbPath = `${ this . options . levelDbPath || './background-jobs-dbs/' } job_${ jobId } ` ;
423+ let jobLevelDb : Level ;
424+ if ( this . levelDbInstances [ jobId ] ) {
425+ jobLevelDb = this . levelDbInstances [ jobId ] ;
426+ } else {
427+ try {
428+ jobLevelDb = new Level ( levelDbPath , { valueEncoding : 'json' } ) ;
429+ } catch ( error ) {
430+ return { ok : false , message : `Failed to access tasks for job with id ${ jobId } .` } ;
431+ }
432+ }
433+ const tasks = [ ] ;
434+ let taskIndex = 0 + offset ;
435+ while ( true ) {
436+ if ( limit && tasks . length >= limit ) {
437+ break ;
438+ }
439+ const taskData = await jobLevelDb . get ( taskIndex . toString ( ) ) ;
440+ if ( ! taskData ) {
441+ break ;
442+ }
443+ let parsedTaskData : { state : Record < string , any > , status : TaskStatus } ;
444+ try {
445+ parsedTaskData = JSON . parse ( taskData ) ;
446+ } catch ( error ) {
447+ afLogger . error ( `Error parsing task data for task ${ taskIndex } of job ${ jobId } : ${ error } ` ) ;
448+ taskIndex ++ ;
449+ continue ;
450+ }
451+ tasks . push ( parsedTaskData ) ;
452+ taskIndex ++ ;
453+ }
454+ return { ok : true , tasks } ;
455+ }
456+ } ) ;
397457 }
398458
399459}
0 commit comments