@@ -9,6 +9,13 @@ const ORPHAN_TRACK_TTL_MS = 12 * 60 * 60 * 1000;
99const LOG_STATE_TTL_MS = 12 * 60 * 60 * 1000 ;
1010const DEFAULT_RETRY_DELAY_MS = 30 * 1000 ;
1111const DEFAULT_MAX_RETRY_DELAY_MS = 30 * 60 * 1000 ;
12+ const DAY_MS = 24 * 60 * 60 * 1000 ;
13+ const DEFAULT_STALE_PENDING_BLOCK_AGE_MS = 30 * DAY_MS ;
14+ const DEFAULT_STALE_PENDING_BLOCK_ALERT_CHECK_MS = DAY_MS ;
15+ const DEFAULT_STALE_PENDING_BLOCK_ALERT_COOLDOWN_MS = DAY_MS ;
16+ const DEFAULT_STALE_PENDING_BLOCK_SAMPLE_LIMIT = 20 ;
17+ const DEFAULT_STALE_PENDING_BLOCK_PROCESS_LIMIT = 5 ;
18+ const DEFAULT_STALE_PENDING_BLOCK_RETRY_DELAY_MS = 6 * 60 * 60 * 1000 ;
1219
1320function isOrphanHeader ( header ) {
1421 if ( ! header ) return false ;
@@ -56,6 +63,52 @@ function withDetail(fields, err, header) {
5663 return fields ;
5764}
5865
66+ function renderEmailTemplate ( item , values , fallback ) {
67+ if ( global . support && typeof global . support . renderEmailTemplate === "function" ) return global . support . renderEmailTemplate ( item , values , fallback ) ;
68+ const template = global . config && global . config . email && typeof global . config . email [ item ] === "string" ? global . config . email [ item ] : fallback ;
69+ return global . support && typeof global . support . formatTemplate === "function"
70+ ? global . support . formatTemplate ( template || "" , values || { } )
71+ : String ( template || "" ) . replace ( / % \( ( [ ^ ) ] + ) \) s / g, function replaceValue ( _match , key ) {
72+ return values && Object . prototype . hasOwnProperty . call ( values , key ) ? String ( values [ key ] ) : "" ;
73+ } ) ;
74+ }
75+
76+ function safeDecodePendingPayload ( job ) {
77+ if ( ! job || typeof job . payload !== "string" || ! global . protos ) return null ;
78+ try {
79+ if ( job . type === "block" ) return global . protos . Block . decode ( Buffer . from ( job . payload , "base64" ) ) ;
80+ if ( job . type === "altblock" ) return global . protos . AltBlock . decode ( Buffer . from ( job . payload , "base64" ) ) ;
81+ } catch ( _error ) { }
82+ return null ;
83+ }
84+
85+ function parseAltPortFromJobKey ( key ) {
86+ if ( typeof key !== "string" ) return null ;
87+ const parts = key . split ( ":" ) ;
88+ if ( parts . length < 2 ) return null ;
89+ const port = parseInt ( parts [ 1 ] , 10 ) ;
90+ return Number . isFinite ( port ) ? port : null ;
91+ }
92+
93+ function formatStalePendingBlock ( job , timeNow ) {
94+ const decoded = safeDecodePendingPayload ( job ) ;
95+ const fields = [ "type=" + job . type ] ;
96+ if ( job . type === "altblock" ) {
97+ const port = decoded && decoded . port ? decoded . port : parseAltPortFromJobKey ( job . key ) ;
98+ if ( port ) fields . push ( "chain=" + formatCoinPort ( port ) ) ;
99+ if ( decoded && decoded . height ) fields . push ( "height=" + decoded . height ) ;
100+ } else {
101+ fields . push ( "chain=" + formatCoinPort ( global . config && global . config . daemon ? global . config . daemon . port : 0 ) ) ;
102+ if ( typeof job . blockId !== "undefined" ) fields . push ( "blockId=" + job . blockId ) ;
103+ }
104+ if ( decoded && decoded . hash ) fields . push ( "hash=" + decoded . hash ) ;
105+ else if ( typeof job . key === "string" ) fields . push ( "job=" + job . key ) ;
106+ fields . push ( "ageDays=" + Math . floor ( Math . max ( 0 , timeNow - job . createdAt ) / DAY_MS ) ) ;
107+ fields . push ( "attempts=" + ( job . attempts || 0 ) ) ;
108+ if ( job . lastError ) fields . push ( "lastError=" + job . lastError ) ;
109+ return fields . join ( " " ) ;
110+ }
111+
59112function createLmdbStorage ( database ) {
60113 let pendingJobDb = null ;
61114
@@ -112,6 +165,7 @@ function createLmdbStorage(database) {
112165
113166 loadDueJobs ( timeNow , limit ) {
114167 const jobsByKey = new Map ( ) ;
168+ let nextDueAt = null ;
115169 const txn = database . env . beginTxn ( { readOnly : true } ) ;
116170 try {
117171 function collectJobs ( key , value ) {
@@ -120,6 +174,9 @@ function createLmdbStorage(database) {
120174 try {
121175 const job = JSON . parse ( value ) ;
122176 if ( job . nextAttemptAt <= timeNow ) jobsByKey . set ( key , job ) ;
177+ else if ( typeof job . nextAttemptAt === "number" && ( nextDueAt === null || job . nextAttemptAt < nextDueAt ) ) {
178+ nextDueAt = job . nextAttemptAt ;
179+ }
123180 } catch ( _error ) {
124181 jobsByKey . set ( key , { key, type : "invalid" , invalid : true } ) ;
125182 }
@@ -134,6 +191,7 @@ function createLmdbStorage(database) {
134191 jobs . sort ( function bySchedule ( left , right ) {
135192 return left . nextAttemptAt - right . nextAttemptAt || left . createdAt - right . createdAt ;
136193 } ) ;
194+ jobs . nextDueAt = nextDueAt ;
137195 return jobs ;
138196 } ,
139197
@@ -169,6 +227,12 @@ module.exports = function createPendingJobs(options) {
169227 const retryDelayMs = opts . retryDelayMs || DEFAULT_RETRY_DELAY_MS ;
170228 const maxRetryDelayMs = opts . maxRetryDelayMs || DEFAULT_MAX_RETRY_DELAY_MS ;
171229 const orphanGraceMs = opts . orphanGraceMs || 10 * 60 * 1000 ;
230+ const stalePendingBlockAgeMs = typeof opts . stalePendingBlockAgeMs === "number" ? opts . stalePendingBlockAgeMs : DEFAULT_STALE_PENDING_BLOCK_AGE_MS ;
231+ const stalePendingBlockAlertCheckMs = typeof opts . stalePendingBlockAlertCheckMs === "number" ? opts . stalePendingBlockAlertCheckMs : DEFAULT_STALE_PENDING_BLOCK_ALERT_CHECK_MS ;
232+ const stalePendingBlockAlertCooldownMs = typeof opts . stalePendingBlockAlertCooldownMs === "number" ? opts . stalePendingBlockAlertCooldownMs : DEFAULT_STALE_PENDING_BLOCK_ALERT_COOLDOWN_MS ;
233+ const stalePendingBlockSampleLimit = typeof opts . stalePendingBlockSampleLimit === "number" ? opts . stalePendingBlockSampleLimit : DEFAULT_STALE_PENDING_BLOCK_SAMPLE_LIMIT ;
234+ const stalePendingBlockProcessLimit = typeof opts . stalePendingBlockProcessLimit === "number" ? opts . stalePendingBlockProcessLimit : DEFAULT_STALE_PENDING_BLOCK_PROCESS_LIMIT ;
235+ const stalePendingBlockRetryDelayMs = typeof opts . stalePendingBlockRetryDelayMs === "number" ? opts . stalePendingBlockRetryDelayMs : DEFAULT_STALE_PENDING_BLOCK_RETRY_DELAY_MS ;
172236 const logger = opts . logger || console ;
173237 const storage = opts . storage || createLmdbStorage ( database ) ;
174238
@@ -182,7 +246,9 @@ module.exports = function createPendingJobs(options) {
182246 badAltBlocks : new Map ( ) ,
183247 potentiallyBadAltBlocks : new Map ( ) ,
184248 orphanSince : new Map ( ) ,
185- loggedStates : new Map ( )
249+ loggedStates : new Map ( ) ,
250+ lastStalePendingBlockAlertCheckAt : 0 ,
251+ nextDueJobCheckAt : 0
186252 } ;
187253
188254 function logState ( jobKey , nextState , message ) {
@@ -252,6 +318,7 @@ module.exports = function createPendingJobs(options) {
252318 const retryBackoffMs = retryDelayMs * retryMultiplier ;
253319 nextRetryDelayMs = Math . min ( retryBackoffMs , maxRetryDelayMs ) ;
254320 }
321+ if ( retryOptions . stale === true ) nextRetryDelayMs = Math . max ( nextRetryDelayMs , stalePendingBlockRetryDelayMs ) ;
255322 job . nextAttemptAt = Math . round ( timeNow + nextRetryDelayMs ) ;
256323 job . lastError = stateName ;
257324 storage . save ( job ) ;
@@ -264,6 +331,87 @@ module.exports = function createPendingJobs(options) {
264331 clearState ( job . key ) ;
265332 }
266333
334+ function isStalePendingBlock ( job , timeNow ) {
335+ if ( ! job || ( job . type !== "block" && job . type !== "altblock" ) ) return false ;
336+ if ( typeof job . createdAt !== "number" ) return false ;
337+ return timeNow - job . createdAt >= stalePendingBlockAgeMs ;
338+ }
339+
340+ function saveStaleThrottle ( job , timeNow ) {
341+ job . nextAttemptAt = Math . round ( timeNow + stalePendingBlockRetryDelayMs ) ;
342+ job . lastError = "stale_pending_throttled" ;
343+ storage . save ( job ) ;
344+ logState ( job . key , "stale_pending_throttled" , formatLogEvent ( "Pending job" , {
345+ job : job . key ,
346+ status : "stale-throttled" ,
347+ ageDays : Math . floor ( Math . max ( 0 , timeNow - job . createdAt ) / DAY_MS )
348+ } ) ) ;
349+ }
350+
351+ function selectJobsForProcessing ( jobs , timeNow ) {
352+ const selected = [ ] ;
353+ const staleLimit = Math . max ( 0 , stalePendingBlockProcessLimit ) ;
354+ let staleSelected = 0 ;
355+ for ( const job of jobs ) {
356+ if ( isStalePendingBlock ( job , timeNow ) ) {
357+ if ( staleSelected >= staleLimit ) {
358+ saveStaleThrottle ( job , timeNow ) ;
359+ continue ;
360+ }
361+ staleSelected += 1 ;
362+ }
363+ selected . push ( job ) ;
364+ }
365+ return selected ;
366+ }
367+
368+ function maybeSendStalePendingBlockAlert ( timeNow ) {
369+ if ( state . lastStalePendingBlockAlertCheckAt && timeNow - state . lastStalePendingBlockAlertCheckAt < stalePendingBlockAlertCheckMs ) return ;
370+ state . lastStalePendingBlockAlertCheckAt = timeNow ;
371+ if ( ! global . support || typeof global . support . sendAdminFyi !== "function" ) return ;
372+ if ( ! global . config || ! global . config . general || ! global . config . general . adminEmail ) return ;
373+
374+ let staleJobs ;
375+ try {
376+ staleJobs = storage . loadAllJobs ( ) . filter ( function findStalePendingBlock ( job ) {
377+ return isStalePendingBlock ( job , timeNow ) ;
378+ } ) ;
379+ } catch ( error ) {
380+ if ( logger && typeof logger . log === "function" ) {
381+ logger . log ( formatLogEvent ( "Pending block alert" , {
382+ status : "failed" ,
383+ detail : formatHeaderErrorDetail ( error )
384+ } ) ) ;
385+ }
386+ return ;
387+ }
388+ if ( staleJobs . length === 0 ) return ;
389+
390+ staleJobs . sort ( function byAge ( left , right ) {
391+ return left . createdAt - right . createdAt || String ( left . key ) . localeCompare ( String ( right . key ) ) ;
392+ } ) ;
393+
394+ const sampleJobs = staleJobs . slice ( 0 , Math . max ( 1 , stalePendingBlockSampleLimit ) ) ;
395+ const jobLines = sampleJobs . map ( function formatJob ( job ) {
396+ return "- " + formatStalePendingBlock ( job , timeNow ) ;
397+ } ) ;
398+ if ( staleJobs . length > sampleJobs . length ) {
399+ jobLines . push ( "- ... " + ( staleJobs . length - sampleJobs . length ) + " more pending block(s) omitted" ) ;
400+ }
401+
402+ const values = {
403+ count : staleJobs . length ,
404+ age_days : Math . floor ( stalePendingBlockAgeMs / DAY_MS ) ,
405+ jobs : jobLines . join ( "\n" )
406+ } ;
407+ global . support . sendAdminFyi (
408+ "remote_share:stale-pending-blocks" ,
409+ renderEmailTemplate ( "remoteShareStalePendingSubject" , values , "FYI: Pending blocks not verified for over a month" ) ,
410+ renderEmailTemplate ( "remoteShareStalePendingBody" , values , "remote_share has %(count)s pending block(s) older than %(age_days)s days.\n\n%(jobs)s\n\nPlease verify wallet/daemon sync and pending_blocks." ) ,
411+ { cooldownMs : stalePendingBlockAlertCooldownMs }
412+ ) ;
413+ }
414+
267415 function saveResolvedBlock ( blockId , blockDataDecoded ) {
268416 const shares = database . getCache ( poolTypeStr ( blockDataDecoded . poolType ) + "_stats2" ) ;
269417 blockDataDecoded . shares = shares ? shares . roundHashes : 0 ;
@@ -355,9 +503,11 @@ module.exports = function createPendingJobs(options) {
355503 }
356504
357505 if ( err || ! header || ! header . reward ) {
506+ const retryOptions = { backoff : true } ;
507+ if ( isStalePendingBlock ( job , timeNow ) ) retryOptions . stale = true ;
358508 saveRetry ( job , "waiting_block_header" , formatLogEvent ( "Pending block" , withDetail ( Object . assign ( { } , blockFields , {
359509 status : "waiting-header-reward"
360- } ) , err , header ) ) , { backoff : true } ) ;
510+ } ) , err , header ) ) , retryOptions ) ;
361511 return callback ( ) ;
362512 }
363513
@@ -419,7 +569,8 @@ module.exports = function createPendingJobs(options) {
419569
420570 const profile = global . coinFuncs . getPoolProfile ( blockDataDecoded . port ) ;
421571 if ( isWaitingForAltDepth ( profile , header ) ) {
422- saveRetry ( job , "waiting_for_depth" , formatLogEvent ( "Pending altblock" , Object . assign ( { } , altBlockFields , { status : "waiting-maturity" } ) ) ) ;
572+ const retryOptions = isStalePendingBlock ( job , timeNow ) ? { stale : true } : undefined ;
573+ saveRetry ( job , "waiting_for_depth" , formatLogEvent ( "Pending altblock" , Object . assign ( { } , altBlockFields , { status : "waiting-maturity" } ) ) , retryOptions ) ;
423574 return callback ( ) ;
424575 }
425576
@@ -431,9 +582,11 @@ module.exports = function createPendingJobs(options) {
431582 const badBlockMap = state . potentiallyBadAltBlocks . get ( blockDataDecoded . port ) || new Map ( ) ;
432583 badBlockMap . set ( blockDataDecoded . hash , timeNow ) ;
433584 state . potentiallyBadAltBlocks . set ( blockDataDecoded . port , badBlockMap ) ;
585+ const retryOptions = { backoff : true } ;
586+ if ( isStalePendingBlock ( job , timeNow ) ) retryOptions . stale = true ;
434587 saveRetry ( job , "waiting_altblock_header" , formatLogEvent ( "Pending altblock" , withDetail ( Object . assign ( { } , altBlockFields , {
435588 status : "waiting-header-reward"
436- } ) , err , header ) ) , { backoff : true } ) ;
589+ } ) , err , header ) ) , retryOptions ) ;
437590 return callback ( ) ;
438591 }
439592
@@ -462,6 +615,7 @@ module.exports = function createPendingJobs(options) {
462615
463616 return {
464617 enqueueBlock ( blockId , payload , block ) {
618+ state . nextDueJobCheckAt = 0 ;
465619 storage . save ( {
466620 key : "block:" + blockId + ":" + block . hash ,
467621 type : "block" ,
@@ -475,6 +629,7 @@ module.exports = function createPendingJobs(options) {
475629 } ,
476630
477631 enqueueAltBlock ( blockId , payload , block ) {
632+ state . nextDueJobCheckAt = 0 ;
478633 storage . save ( {
479634 key : "alt:" + block . port + ":" + block . height + ":" + block . hash ,
480635 type : "altblock" ,
@@ -489,19 +644,28 @@ module.exports = function createPendingJobs(options) {
489644
490645 processDueJobs ( ) {
491646 if ( state . processing || state . closed ) return ;
647+ const timeNow = Date . now ( ) ;
648+ maybeSendStalePendingBlockAlert ( timeNow ) ;
649+ if ( state . nextDueJobCheckAt && timeNow < state . nextDueJobCheckAt ) return ;
492650 state . processing = true ;
493651 try {
494652 pruneState ( ) ;
495653
496- const jobs = storage . loadDueJobs ( Date . now ( ) , 100 ) ;
654+ const jobs = storage . loadDueJobs ( timeNow , 100 ) ;
655+ if ( jobs . length === 0 && typeof jobs . nextDueAt === "number" ) {
656+ state . nextDueJobCheckAt = jobs . nextDueAt ;
657+ } else {
658+ state . nextDueJobCheckAt = 0 ;
659+ }
660+ const jobsToProcess = selectJobsForProcessing ( jobs , timeNow ) ;
497661 let index = 0 ;
498662 const next = ( ) => {
499- if ( index >= jobs . length ) {
663+ if ( index >= jobsToProcess . length ) {
500664 state . processing = false ;
501665 finishCloseIfIdle ( ) ;
502666 return ;
503667 }
504- processJob ( jobs [ index ++ ] , next ) ;
668+ processJob ( jobsToProcess [ index ++ ] , next ) ;
505669 } ;
506670 next ( ) ;
507671 } catch ( error ) {
0 commit comments