1- import { AdminForthPlugin , Filters } from "adminforth" ;
2- import type { IAdminForth , IHttpServer , AdminForthResourcePages , AdminForthResourceColumn , AdminForthDataTypes , AdminForthResource , AdminUser } from "adminforth" ;
1+ import { AdminForthPlugin , Filters , Sorts } from "adminforth" ;
2+ import type { IAdminForth , IHttpServer , AdminForthResourcePages , AdminForthResourceColumn , AdminForthDataTypes , AdminForthResource , AdminUser , AdminForthComponentDeclarationFull } from "adminforth" ;
33import type { PluginOptions } from './types.js' ;
44import { afLogger } from "adminforth" ;
55import pLimit from 'p-limit' ;
66import { Level } from 'level' ;
7- import { resolve } from "node:dns" ;
8- import { set } from "@vueuse/core" ;
97
108type TaskStatus = 'SCHEDULED' | 'IN_PROGRESS' | 'DONE' | 'FAILED' ;
119type setStateFieldParams = ( state : Record < string , any > ) => void ;
1210type getStateFieldParams = ( ) => any ;
11+ type taskHandlerType = ( { setTaskStateField, getTaskStateField } : { setTaskStateField : setStateFieldParams ; getTaskStateField : getStateFieldParams } ) => Promise < void > ;
12+ type taskType = {
13+ skip ?: boolean ;
14+ state : Record < string , any > ;
15+ }
1316
1417export default class extends AdminForthPlugin {
1518 options : PluginOptions ;
19+ private taskHandlers : Record < string , taskHandlerType > = { } ;
20+ private jobCustomComponents : Record < string , AdminForthComponentDeclarationFull > = { } ;
21+ private jobParallelLimits : Record < string , number > = { } ;
1622
1723 constructor ( options : PluginOptions ) {
1824 super ( options , import . meta. url ) ;
@@ -90,25 +96,42 @@ export default class extends AdminForthPlugin {
9096 return Promise . resolve ( null ) ;
9197 }
9298
93-
99+ public registerTaskHandler (
100+ jobHandlerName : string ,
101+ handler : taskHandlerType ,
102+ customComponent ?: AdminForthComponentDeclarationFull ,
103+ parrallelLimit : number = 3 ,
104+ ) {
105+ //register the handler in a map with jobHandlerName as key and handler as value
106+ this . taskHandlers [ jobHandlerName ] = handler ;
107+ this . jobParallelLimits [ jobHandlerName ] = parrallelLimit ;
108+ if ( customComponent ) {
109+ this . jobCustomComponents [ jobHandlerName ] = customComponent ;
110+ }
111+ }
94112
95113 public async startNewJob (
96114 jobName : string ,
97115 adminUser : AdminUser ,
98- tasks : { state : Record < string , any > } [ ] ,
99- parrallelLimit : number = 3 ,
116+ tasks : taskType [ ] ,
100117 initialFields : Record < string , any > = { } ,
101- handleTask : ( { setTaskStateField, getTaskStateField } : { setTaskStateField : setStateFieldParams ; getTaskStateField : getStateFieldParams } ) => Promise < void > ,
102- pathToComponentToRenderState ?: string
118+ jobHandlerName : string ,
103119 ) {
104120
121+ const handleTask : taskHandlerType = this . taskHandlers [ jobHandlerName ] ;
122+ if ( ! handleTask ) {
123+ throw new Error ( `No handler registered for jobHandler ${ jobHandlerName } . Please register a handler using the registerTaskHandler method before starting a job with this jobHandler.` ) ;
124+ }
125+ const customComponent = this . jobCustomComponents [ jobHandlerName ] ;
126+ const parrallelLimit = this . jobParallelLimits [ jobHandlerName ] || 3 ;
105127 //create a record for the job in the database with status in progress
106128 const objectToSave = {
107129 [ this . options . nameField ] : jobName ,
108130 [ this . options . startedByField ] : adminUser . pk ,
109131 [ this . options . stateField ] : JSON . stringify ( initialFields ) ,
110132 [ this . options . progressField ] : 0 ,
111133 [ this . options . statusField ] : 'IN_PROGRESS' ,
134+ [ this . options . jobHandlerField ] : jobHandlerName ,
112135 }
113136
114137 const creationResult = await this . adminforth . resource ( this . getResourceId ( ) ) . create ( objectToSave ) ;
@@ -126,6 +149,7 @@ export default class extends AdminForthPlugin {
126149 name : jobName ,
127150 progress : 0 ,
128151 createdAt : createdRecord [ this . options . createdAtField ] ,
152+ customComponent,
129153 } ) ;
130154
131155 //create a level db instance for the job with name as jobId
@@ -138,18 +162,30 @@ export default class extends AdminForthPlugin {
138162
139163 await Promise . all ( createTaskRecordsPromises ) ;
140164
165+ this . runProcessingTasks ( tasks , jobLevelDb , jobId , handleTask , parrallelLimit ) ;
166+ }
141167
168+ private async runProcessingTasks (
169+ tasks : taskType [ ] ,
170+ jobLevelDb : Level ,
171+ jobId : string ,
172+ handleTask : taskHandlerType ,
173+ parrallelLimit : number ,
174+ ) {
142175 const totalTasks = tasks . length ;
143176 let completedTasks = 0 ;
144177 let failedTasks = 0 ;
145178 let lastJobStatus = 'IN_PROGRESS' ;
146179
147- const taskHandler = async ( taskIndex : number , taskState ) => {
180+ const taskHandler = async ( taskIndex : number , task ) => {
181+ if ( task . skip ) {
182+ completedTasks = await this . handleFinishTask ( completedTasks , totalTasks , jobId , true ) ;
183+ return ;
184+ }
148185 if ( lastJobStatus === 'CANCELLED' ) {
149186 afLogger . info ( `Job ${ jobId } was cancelled. Skipping task ${ taskIndex } .` ) ;
150187 return ;
151188 }
152-
153189 const currentJobRecord = await this . adminforth . resource ( this . getResourceId ( ) ) . get ( Filters . EQ ( this . getResourcePk ( ) , jobId ) ) ;
154190 const currentJobStatus = currentJobRecord [ this . options . statusField ] ;
155191
@@ -182,18 +218,13 @@ export default class extends AdminForthPlugin {
182218 return ;
183219 } finally {
184220 //Update progress
185- completedTasks ++ ;
186- const progress = Math . round ( ( completedTasks / totalTasks ) * 100 ) ;
187- await this . adminforth . resource ( this . getResourceId ( ) ) . update ( jobId , {
188- [ this . options . progressField ] : progress ,
189- } )
190- this . adminforth . websocket . publish ( '/background-jobs' , { jobId, progress } ) ;
221+ completedTasks = await this . handleFinishTask ( completedTasks , totalTasks , jobId ) ;
191222 }
192223 }
193224
194225 const limit = pLimit ( parrallelLimit ) ;
195226 const tasksToExecute = tasks . map ( ( task , taskIndex ) => {
196- return limit ( ( ) => taskHandler ( taskIndex , task . state ) ) ;
227+ return limit ( ( ) => taskHandler ( taskIndex , task ) ) ;
197228 } ) ;
198229
199230 await Promise . all ( tasksToExecute ) ;
@@ -210,11 +241,55 @@ export default class extends AdminForthPlugin {
210241 }
211242 }
212243
244+
245+ private async handleFinishTask ( completedTasks : number , totalTasks : number , jobId : string , wasTaskSkipped : boolean = false ) {
246+ completedTasks ++ ;
247+ if ( wasTaskSkipped ) {
248+ return completedTasks ;
249+ }
250+ const progress = Math . round ( ( completedTasks / totalTasks ) * 100 ) ;
251+ await this . adminforth . resource ( this . getResourceId ( ) ) . update ( jobId , {
252+ [ this . options . progressField ] : progress ,
253+ } )
254+ this . adminforth . websocket . publish ( '/background-jobs' , { jobId, progress } ) ;
255+ return completedTasks ;
256+ }
213257 private async runProcessingUnfinishedTasks (
214- tasks : { state : Record < string , any > } [ ] ,
215- parrallelLimit : number ,
216- handleTask : ( { setTaskStateField, getTaskStateField } : { setTaskStateField : setStateFieldParams ; getTaskStateField : getStateFieldParams } ) => Promise < void > ,
258+ job : Record < string , any >
217259 ) {
260+ const levelDbPath = `${ this . options . levelDbPath || './background-jobs-dbs/' } job_${ job [ this . getResourcePk ( ) ] } ` ;
261+ const jobLevelDb = new Level ( levelDbPath , { valueEncoding : 'json' } ) ;
262+ const jobHandlerName = job [ this . options . jobHandlerField ] ;
263+ const handleTask : taskHandlerType = this . taskHandlers [ jobHandlerName ] ;
264+ if ( ! handleTask ) {
265+ afLogger . error ( `No handler registered for jobHandler ${ jobHandlerName } . Cannot process unfinished tasks for job ${ job [ this . getResourcePk ( ) ] } .` ) ;
266+ return ;
267+ }
268+ const parrallelLimit = this . jobParallelLimits [ jobHandlerName ] || 3 ;
269+
270+ const unfinishedTasks : taskType [ ] = [ ] ;
271+ let taskIndex = 0 ;
272+ while ( true ) {
273+ const taskData = await jobLevelDb . get ( taskIndex . toString ( ) ) ;
274+ if ( ! taskData ) {
275+ break ;
276+ }
277+ let parsedTaskData : { state : Record < string , any > , status : TaskStatus } ;
278+ try {
279+ parsedTaskData = JSON . parse ( taskData ) ;
280+ } catch ( error ) {
281+ afLogger . error ( `Error parsing task data for task ${ taskIndex } of job ${ job [ this . getResourcePk ( ) ] } : ${ error } ` ) ;
282+ taskIndex ++ ;
283+ continue ;
284+ }
285+ if ( parsedTaskData . status === 'IN_PROGRESS' || parsedTaskData . status === 'SCHEDULED' ) {
286+ unfinishedTasks . push ( { state : parsedTaskData . state } ) ;
287+ } else {
288+ unfinishedTasks . push ( { state : parsedTaskData . state , skip : true } ) ;
289+ }
290+ taskIndex ++ ;
291+ }
292+ await this . runProcessingTasks ( unfinishedTasks , jobLevelDb , job [ this . getResourcePk ( ) ] , handleTask , parrallelLimit ) ;
218293
219294 }
220295
@@ -244,8 +319,11 @@ export default class extends AdminForthPlugin {
244319 private async processAllUnfinishedJobs ( ) {
245320 const resourceId = this . getResourceId ( ) ;
246321 const unprocessedJobs = await this . adminforth . resource ( resourceId ) . list ( Filters . EQ ( this . options . statusField , 'IN_PROGRESS' ) ) ;
247-
248- console . log ( 'Unprocessed jobs found on startup:' , unprocessedJobs ) ;
322+ for ( const job of unprocessedJobs ) {
323+ const jobName = job [ this . options . nameField ] ;
324+ afLogger . info ( `Processing unfinished job with name ${ jobName } on startup.` ) ;
325+ this . runProcessingUnfinishedTasks ( job ) ;
326+ }
249327 }
250328
251329
@@ -260,8 +338,8 @@ export default class extends AdminForthPlugin {
260338
261339
262340 //Add temp delay to make sure, that all resources active. Probably should be fixed
263- // await new Promise(resolve => setTimeout(resolve, 1000));
264- // this.processAllUnfinishedJobs();
341+ await new Promise ( resolve => setTimeout ( resolve , 1000 ) ) ;
342+ this . processAllUnfinishedJobs ( ) ;
265343 }
266344
267345 instanceUniqueRepresentation ( pluginOptions : any ) : string {
@@ -276,7 +354,7 @@ export default class extends AdminForthPlugin {
276354 const user = adminUser ;
277355 const startedByField = this . options . startedByField ;
278356 const resourcePk = this . getResourcePk ( ) ;
279- const listOfJobs = await this . adminforth . resource ( this . resourceConfig . resourceId ) . list ( Filters . EQ ( startedByField , user . pk ) ) ;
357+ const listOfJobs = await this . adminforth . resource ( this . resourceConfig . resourceId ) . list ( Filters . EQ ( startedByField , user . pk ) , 100 , 0 , Sorts . DESC ( this . options . createdAtField ) ) ;
280358
281359 const jobsToReturn = listOfJobs . map ( job => {
282360 return {
@@ -285,6 +363,7 @@ export default class extends AdminForthPlugin {
285363 createdAt : job [ this . options . createdAtField ] ,
286364 status : job [ this . options . statusField ] ,
287365 progress : job [ this . options . progressField ] ,
366+ customComponent : this . jobCustomComponents [ job [ this . options . jobHandlerField ] ] ,
288367 }
289368 } ) ;
290369 return { jobs : jobsToReturn } ;
@@ -296,6 +375,11 @@ export default class extends AdminForthPlugin {
296375 path : `/plugin/${ this . pluginInstanceId } /cancel-job` ,
297376 handler : async ( { body } ) => {
298377 const jobId = body . jobId ;
378+ const currentJob = await this . adminforth . resource ( this . getResourceId ( ) ) . get ( Filters . EQ ( this . getResourcePk ( ) , jobId ) ) ;
379+ const oldStatus = currentJob [ this . options . statusField ] ;
380+ if ( oldStatus === 'DONE' || oldStatus === 'DONE_WITH_ERRORS' || oldStatus === 'CANCELLED' ) {
381+ return { ok : false , message : `Cannot cancel a job with status ${ oldStatus } .` } ;
382+ }
299383 try {
300384 await this . adminforth . resource ( this . getResourceId ( ) ) . update ( jobId , {
301385 [ this . options . statusField ] : 'CANCELLED' ,
@@ -306,7 +390,7 @@ export default class extends AdminForthPlugin {
306390 } ) ;
307391 return { ok : true } ;
308392 } catch ( error ) {
309- return { ok : false }
393+ return { ok : false , message : `Failed to cancel job with id ${ jobId } .` } ;
310394 }
311395 }
312396 } ) ;
0 commit comments