@@ -4,7 +4,10 @@ import type { PluginOptions } from './types.js';
44import { afLogger } from "adminforth" ;
55import pLimit from 'p-limit' ;
66import { Level } from 'level' ;
7+ import { resolve } from "node:dns" ;
8+ import { set } from "@vueuse/core" ;
79
10+ type JobStatus = 'SCHEDULED' | 'IN_PROGRESS' | 'DONE' | 'FAILED' ;
811type setStateFieldParams = ( state : Record < string , any > ) => void ;
912type getStateFieldParams = ( ) => any ;
1013
@@ -51,8 +54,8 @@ export default class extends AdminForthPlugin {
5154 }
5255
5356 private async createLevelDbTaskRecord ( levelDb : Level , taskId : string , initialState : Record < string , any > ) {
54- //create record in level db with task id as key and initial state as value and status IN_PROGRESS
55- await levelDb . put ( taskId , JSON . stringify ( { state : initialState , status : 'IN_PROGRESS ' } ) ) ;
57+ //create record in level db with task id as key and initial state as value and status SCHEDULED
58+ await levelDb . put ( taskId , JSON . stringify ( { state : initialState , status : 'SCHEDULED ' } ) ) ;
5659 }
5760
5861 private async setLevelDbTaskStateField ( levelDb : Level , taskId : string , state : Record < string , any > ) {
@@ -62,7 +65,7 @@ export default class extends AdminForthPlugin {
6265 await levelDb . put ( taskId , JSON . stringify ( { state, status } ) ) ;
6366 }
6467
65- private async setLevelDbTaskStatusField ( levelDb : Level , taskId : string , status : string ) {
68+ private async setLevelDbTaskStatusField ( levelDb : Level , taskId : string , status : JobStatus ) {
6669 const state = await this . getLevelDbTaskStateField ( levelDb , taskId ) ;
6770 await levelDb . del ( taskId ) ;
6871 await levelDb . put ( taskId , JSON . stringify ( { state, status } ) ) ;
@@ -78,7 +81,7 @@ export default class extends AdminForthPlugin {
7881 return Promise . resolve ( null ) ;
7982 }
8083
81- private async getLevelDbTaskStatusField ( levelDb : Level , taskId : string ) : Promise < Record < string , any > > {
84+ private async getLevelDbTaskStatusField ( levelDb : Level , taskId : string ) : Promise < JobStatus > {
8285 const state = await levelDb . get ( taskId ) ;
8386 if ( state ) {
8487 const parsedState = JSON . parse ( state ) ;
@@ -125,17 +128,21 @@ export default class extends AdminForthPlugin {
125128 createdAt : createdRecord [ this . options . createdAtField ] ,
126129 } ) ;
127130
128-
129131 //create a level db instance for the job with name as jobId
130132 const jobLevelDb = new Level ( `${ this . options . levelDbPath || './background-jobs-dbs/' } job_${ jobId } ` , { valueEncoding : 'json' } ) ;
131133
134+ const limit2 = pLimit ( parrallelLimit ) ;
135+ const createTaskRecordsPromises = tasks . map ( ( task , index ) => {
136+ return limit2 ( ( ) => this . createLevelDbTaskRecord ( jobLevelDb , index . toString ( ) , task . state ) ) ;
137+ } ) ;
138+
139+ await Promise . all ( createTaskRecordsPromises ) ;
140+
132141
133142 const totalTasks = tasks . length ;
134143 let completedTasks = 0 ;
135144
136145 const taskHandler = async ( taskIndex : number , taskState ) => {
137- // create a level db record for the task with status in progress
138- await this . createLevelDbTaskRecord ( jobLevelDb , taskIndex . toString ( ) , taskState ) ;
139146
140147 //define the setTaskStateField and getTaskStateField functions to pass to the task
141148 const setTaskStateField = async ( state : Record < string , any > ) => {
@@ -145,12 +152,14 @@ export default class extends AdminForthPlugin {
145152 return await this . getLevelDbTaskStateField ( jobLevelDb , taskIndex . toString ( ) ) ;
146153 }
147154
155+ await this . setLevelDbTaskStatusField ( jobLevelDb , taskIndex . toString ( ) , 'IN_PROGRESS' ) ;
156+
148157 //handling the task
149158 try {
150159 await handleTask ( { setTaskStateField, getTaskStateField } ) ;
151160
152161 //Set task status to completed in level db
153- await this . setLevelDbTaskStatusField ( jobLevelDb , taskIndex . toString ( ) , 'COMPLETED ' ) ;
162+ await this . setLevelDbTaskStatusField ( jobLevelDb , taskIndex . toString ( ) , 'DONE ' ) ;
154163 } catch ( error ) {
155164 afLogger . error ( `Error in handling task ${ taskIndex } of job ${ jobId } : ${ error } ` , ) ;
156165 await this . setLevelDbTaskStatusField ( jobLevelDb , taskIndex . toString ( ) , 'FAILED' ) ;
@@ -177,29 +186,60 @@ export default class extends AdminForthPlugin {
177186 [ this . options . statusField ] : 'DONE' ,
178187 } )
179188 this . adminforth . websocket . publish ( '/background-jobs' , { jobId, status : 'DONE' } ) ;
189+ }
190+
191+ private async runProcessingUnfinishedTasks (
192+ tasks : { state : Record < string , any > } [ ] ,
193+ parrallelLimit : number ,
194+ handleTask : ( { setTaskStateField, getTaskStateField } : { setTaskStateField : setStateFieldParams ; getTaskStateField : getStateFieldParams } ) => Promise < void > ,
195+ ) {
180196
181197 }
182198
183199 public async setJobField ( jobId : string , key : string , value : any ) {
184-
200+ const jobRecord = await this . adminforth . resource ( this . getResourceId ( ) ) . get ( Filters . EQ ( this . getResourcePk ( ) , jobId ) ) ;
201+ const state = jobRecord [ this . options . stateField ] ;
202+ const parsedState = JSON . parse ( state ) ;
203+ parsedState [ key ] = value ;
204+ await this . adminforth . resource ( this . getResourceId ( ) ) . update ( jobId , {
205+ [ this . options . stateField ] : JSON . stringify ( parsedState ) ,
206+ } ) ;
185207 }
186208
187209 public async getJobField ( jobId : string , key : string ) {
188-
210+ const jobRecord = await this . adminforth . resource ( this . getResourceId ( ) ) . get ( Filters . EQ ( this . getResourcePk ( ) , jobId ) ) ;
211+ const state = jobRecord [ this . options . stateField ] ;
212+ const parsedState = JSON . parse ( state ) ;
213+ return parsedState [ key ] ;
189214 }
190215
191216 public async getJobState ( jobId : string ) {
217+ const jobRecord = await this . adminforth . resource ( this . getResourceId ( ) ) . get ( Filters . EQ ( this . getResourcePk ( ) , jobId ) ) ;
218+ const state = jobRecord [ this . options . stateField ] ;
219+ return JSON . parse ( state ) ;
220+ }
192221
222+ private async processAllUnfinishedJobs ( ) {
223+ const resourceId = this . getResourceId ( ) ;
224+ const unprocessedJobs = await this . adminforth . resource ( resourceId ) . list ( Filters . EQ ( this . options . statusField , 'IN_PROGRESS' ) ) ;
225+
226+ console . log ( 'Unprocessed jobs found on startup:' , unprocessedJobs ) ;
193227 }
228+
194229
195- validateConfigAfterDiscover ( adminforth : IAdminForth , resourceConfig : AdminForthResource ) {
230+ async validateConfigAfterDiscover ( adminforth : IAdminForth , resourceConfig : AdminForthResource ) {
196231 // optional method where you can safely check field types after database discovery was performed
197232 this . checkIfFieldInResource ( resourceConfig , this . options . createdAtField , 'createdAtField' ) ;
198233 this . checkIfFieldInResource ( resourceConfig , this . options . startedByField , 'startedByField' ) ;
199234 this . checkIfFieldInResource ( resourceConfig , this . options . stateField , 'stateField' ) ;
200235 this . checkIfFieldInResource ( resourceConfig , this . options . progressField , 'progressField' ) ;
201236 this . checkIfFieldInResource ( resourceConfig , this . options . statusField , 'statusField' ) ;
202237 this . checkIfFieldInResource ( resourceConfig , this . options . nameField , 'nameField' ) ;
238+
239+
240+ //Add temp delay to make sure, that all resources active. Probably should be fixed
241+ // await new Promise(resolve => setTimeout(resolve, 1000));
242+ // this.processAllUnfinishedJobs();
203243 }
204244
205245 instanceUniqueRepresentation ( pluginOptions : any ) : string {
0 commit comments