@@ -7,7 +7,7 @@ import { Level } from 'level';
77import { resolve } from "node:dns" ;
88import { set } from "@vueuse/core" ;
99
10- type JobStatus = 'SCHEDULED' | 'IN_PROGRESS' | 'DONE' | 'FAILED' ;
10+ type TaskStatus = 'SCHEDULED' | 'IN_PROGRESS' | 'DONE' | 'FAILED' ;
1111type setStateFieldParams = ( state : Record < string , any > ) => void ;
1212type getStateFieldParams = ( ) => any ;
1313
@@ -65,7 +65,7 @@ export default class extends AdminForthPlugin {
6565 await levelDb . put ( taskId , JSON . stringify ( { state, status } ) ) ;
6666 }
6767
68- private async setLevelDbTaskStatusField ( levelDb : Level , taskId : string , status : JobStatus ) {
68+ private async setLevelDbTaskStatusField ( levelDb : Level , taskId : string , status : TaskStatus ) {
6969 const state = await this . getLevelDbTaskStateField ( levelDb , taskId ) ;
7070 await levelDb . del ( taskId ) ;
7171 await levelDb . put ( taskId , JSON . stringify ( { state, status } ) ) ;
@@ -81,7 +81,7 @@ export default class extends AdminForthPlugin {
8181 return Promise . resolve ( null ) ;
8282 }
8383
84- private async getLevelDbTaskStatusField ( levelDb : Level , taskId : string ) : Promise < JobStatus > {
84+ private async getLevelDbTaskStatusField ( levelDb : Level , taskId : string ) : Promise < TaskStatus > {
8585 const state = await levelDb . get ( taskId ) ;
8686 if ( state ) {
8787 const parsedState = JSON . parse ( state ) ;
@@ -141,8 +141,23 @@ export default class extends AdminForthPlugin {
141141
142142 const totalTasks = tasks . length ;
143143 let completedTasks = 0 ;
144+ let failedTasks = 0 ;
145+ let lastJobStatus = 'IN_PROGRESS' ;
144146
145147 const taskHandler = async ( taskIndex : number , taskState ) => {
148+ if ( lastJobStatus === 'CANCELLED' ) {
149+ afLogger . info ( `Job ${ jobId } was cancelled. Skipping task ${ taskIndex } .` ) ;
150+ return ;
151+ }
152+
153+ const currentJobRecord = await this . adminforth . resource ( this . getResourceId ( ) ) . get ( Filters . EQ ( this . getResourcePk ( ) , jobId ) ) ;
154+ const currentJobStatus = currentJobRecord [ this . options . statusField ] ;
155+
156+ if ( currentJobStatus === 'CANCELLED' ) {
157+ lastJobStatus = 'CANCELLED' ;
158+ afLogger . info ( `Job ${ jobId } was cancelled. Skipping task ${ taskIndex } .` ) ;
159+ return ;
160+ }
146161
147162 //define the setTaskStateField and getTaskStateField functions to pass to the task
148163 const setTaskStateField = async ( state : Record < string , any > ) => {
@@ -163,6 +178,7 @@ export default class extends AdminForthPlugin {
163178 } catch ( error ) {
164179 afLogger . error ( `Error in handling task ${ taskIndex } of job ${ jobId } : ${ error } ` , ) ;
165180 await this . setLevelDbTaskStatusField ( jobLevelDb , taskIndex . toString ( ) , 'FAILED' ) ;
181+ failedTasks ++ ;
166182 return ;
167183 } finally {
168184 //Update progress
@@ -181,11 +197,17 @@ export default class extends AdminForthPlugin {
181197 } ) ;
182198
183199 await Promise . all ( tasksToExecute ) ;
184-
185- await this . adminforth . resource ( this . getResourceId ( ) ) . update ( jobId , {
186- [ this . options . statusField ] : 'DONE' ,
187- } )
188- this . adminforth . websocket . publish ( '/background-jobs' , { jobId, status : 'DONE' } ) ;
200+ if ( lastJobStatus !== 'CANCELLED' && failedTasks === 0 ) {
201+ await this . adminforth . resource ( this . getResourceId ( ) ) . update ( jobId , {
202+ [ this . options . statusField ] : 'DONE' ,
203+ } )
204+ this . adminforth . websocket . publish ( '/background-jobs' , { jobId, status : 'DONE' } ) ;
205+ } else if ( failedTasks > 0 ) {
206+ await this . adminforth . resource ( this . getResourceId ( ) ) . update ( jobId , {
207+ [ this . options . statusField ] : 'DONE_WITH_ERRORS' ,
208+ } )
209+ this . adminforth . websocket . publish ( '/background-jobs' , { jobId, status : 'DONE_WITH_ERRORS' } ) ;
210+ }
189211 }
190212
191213 private async runProcessingUnfinishedTasks (
@@ -268,6 +290,26 @@ export default class extends AdminForthPlugin {
268290 return { jobs : jobsToReturn } ;
269291 }
270292 } ) ;
293+
294+ server . endpoint ( {
295+ method : 'POST' ,
296+ path : `/plugin/${ this . pluginInstanceId } /cancel-job` ,
297+ handler : async ( { body } ) => {
298+ const jobId = body . jobId ;
299+ try {
300+ await this . adminforth . resource ( this . getResourceId ( ) ) . update ( jobId , {
301+ [ this . options . statusField ] : 'CANCELLED' ,
302+ } ) ;
303+ this . adminforth . websocket . publish ( '/background-jobs' , {
304+ jobId,
305+ status : 'CANCELLED' ,
306+ } ) ;
307+ return { ok : true } ;
308+ } catch ( error ) {
309+ return { ok : false }
310+ }
311+ }
312+ } ) ;
271313 }
272314
273315}
0 commit comments