1- import env from './env' ;
21import * as jobs from '@launchql/job-utils' ;
32import type { PgClientLike } from '@launchql/job-utils' ;
43import schedule from 'node-schedule' ;
54import poolManager from '@launchql/job-pg' ;
65import type { Pool , PoolClient } from 'pg' ;
7-
8- /* eslint-disable no-console */
6+ import { Logger } from '@pgpmjs/logger' ;
97
108export interface ScheduledJobRow {
119 id : number | string ;
@@ -17,6 +15,8 @@ interface SchedulerJobHandle {
1715 cancel ( ) : void ;
1816}
1917
18+ const log = new Logger ( 'jobs:scheduler' ) ;
19+
2020export default class Scheduler {
2121 idleDelay : number ;
2222 supportedTaskNames : string [ ] ;
@@ -77,10 +77,8 @@ export default class Scheduler {
7777 } : { err ?: Error ; fatalError : unknown ; jobId : ScheduledJobRow [ 'id' ] }
7878 ) {
7979 const when = err ? `after failure '${ err . message } '` : 'after success' ;
80- console . error (
81- `Failed to release job '${ jobId } ' ${ when } ; committing seppuku`
82- ) ;
83- console . error ( fatalError ) ;
80+ log . error ( `Failed to release job '${ jobId } ' ${ when } ; committing seppuku` ) ;
81+ log . error ( String ( fatalError ) ) ;
8482 await poolManager . close ( ) ;
8583 process . exit ( 1 ) ;
8684 }
@@ -92,9 +90,8 @@ export default class Scheduler {
9290 duration
9391 } : { err : Error ; job : ScheduledJobRow ; duration : string }
9492 ) {
95- console . error (
96- `scheduler: Failed to initialize scheduler for ${ job . id } (${ job . task_identifier } ) with error ${ err . message } (${ duration } ms)` ,
97- { err, stack : err . stack }
93+ log . error (
94+ `Failed to initialize scheduler for ${ job . id } (${ job . task_identifier } ) with error ${ err . message } (${ duration } ms)`
9895 ) ;
9996 const j = this . jobs [ job . id ] ;
10097 if ( j ) j . cancel ( ) ;
@@ -107,8 +104,8 @@ export default class Scheduler {
107104 client : PgClientLike ,
108105 { job, duration } : { job : ScheduledJobRow ; duration : string }
109106 ) {
110- console . log (
111- `scheduler: initialized ${ job . id } (${ job . task_identifier } ) with success (${ duration } ms)`
107+ log . info (
108+ `initialized ${ job . id } (${ job . task_identifier } ) with success (${ duration } ms)`
112109 ) ;
113110 }
114111 async scheduleJob ( client : PgClientLike , job : ScheduledJobRow ) {
@@ -120,18 +117,18 @@ export default class Scheduler {
120117
121118 if ( newjob ) {
122119 if ( newjob . id ) {
123- console . log ( `spinning up job[${ newjob . task_identifier } ]` ) ;
120+ log . info ( `spinning up job[${ newjob . task_identifier } ]` ) ;
124121 } else {
125122 // this means the scheduled_job has been deleted from db, so cancel it
126123 console . log (
127- `scheduler: attempted job[${ job . task_identifier } ] but it's probably non existent, unscheduling...`
124+ `attempted job[${ job . task_identifier } ] but it's probably non existent, unscheduling...`
128125 ) ;
129126 const scheduledJob = this . jobs [ job . id ] ;
130127 if ( scheduledJob ) scheduledJob . cancel ( ) ;
131128 }
132129 } else {
133- console . log (
134- `scheduler: job already scheduled but not yet run or complete: [${ job . task_identifier } ]`
130+ log . info (
131+ `job already scheduled but not yet run or complete: [${ job . task_identifier } ]`
135132 ) ;
136133 }
137134 } ) ;
@@ -149,7 +146,7 @@ export default class Scheduler {
149146 try {
150147 const job = await jobs . getScheduledJob < ScheduledJobRow > ( client , {
151148 workerId : this . workerId ,
152- supportedTaskNames : env . JOBS_SUPPORT_ANY
149+ supportedTaskNames : jobs . getJobSupportAny ( )
153150 ? null
154151 : this . supportedTaskNames
155152 } ) ;
@@ -189,33 +186,33 @@ export default class Scheduler {
189186 }
190187 }
191188 listen ( ) {
192- const listenForChanges = (
189+ const listenForChanges = (
193190 err : Error | null ,
194191 client : PoolClient ,
195192 release : ( ) => void
196193 ) => {
197194 if ( err ) {
198- console . error ( 'scheduler: Error connecting with notify listener' , err ) ;
195+ log . error ( 'Error connecting with notify listener' ) ;
199196 // Try again in 5 seconds
200197 // should this really be done in the node process?
201198 setTimeout ( this . listen , 5000 ) ;
202199 return ;
203200 }
204201 client . on ( 'notification' , ( ) => {
205- console . log ( 'scheduler: a NEW scheduled JOB!') ;
202+ log . info ( ' a NEW scheduled JOB!') ;
206203 if ( this . doNextTimer ) {
207204 // Must be idle, do something!
208205 this . doNext ( client ) ;
209206 }
210207 } ) ;
211208 client . query ( 'LISTEN "scheduled_jobs:insert"' ) ;
212209 client . on ( 'error' , ( e : unknown ) => {
213- console . error ( 'scheduler: Error with database notify listener' , e ) ;
210+ log . error ( 'Error with database notify listener' ) ;
214211 release ( ) ;
215212 this . listen ( ) ;
216213 } ) ;
217- console . log (
218- `scheduler: ${ this . workerId } connected and looking for scheduled jobs...`
214+ log . info (
215+ `${ this . workerId } connected and looking for scheduled jobs...`
219216 ) ;
220217 this . doNext ( client ) ;
221218 } ;
0 commit comments