@@ -507,20 +507,36 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i
507507
508508 def purge_invalid_jobs (self ):
509509 """
510- Check and remove any invalid/outdated jobs in the JobTable for this autopopulate table
511- Job keys that are in the JobTable (regardless of status) but are no longer in the `key_source`
512- (e.g. jobs added but entries in upstream table(s) got deleted)
513- This is potentially a time-consuming process - but should not expect to have to run very often
510+ Check and remove any invalid/outdated jobs in the JobTable for this autopopulate table.
511+
512+ This method handles two types of invalid jobs:
513+ 1. Jobs that are no longer in the `key_source` (e.g. entries in upstream table(s) got deleted)
514+ 2. Jobs with "success" status that are no longer in the target table (e.g. entries in target table got deleted)
515+
516+ The method is potentially time-consuming as it needs to:
517+ - Compare all jobs against the current key_source
518+ - For success jobs, verify their existence in the target table
519+ - Delete any jobs that fail these checks
520+
521+ This cleanup should not need to run very often, but helps maintain database consistency.
514522 """
515-
516- invalid_count = len (self .jobs ) - len (self ._jobs_to_do ({}))
517523 invalid_removed = 0
518- if invalid_count > 0 :
519- for key , job_key in zip (* self .jobs .fetch ("KEY" , "key" )):
520- if not (self ._jobs_to_do ({}) & job_key ):
524+
525+ invalid_success = len (self .jobs & "status = 'success'" ) - len (self .target )
526+ if invalid_success > 0 :
527+ for key , job_key in zip (* (self .jobs & "status = 'success'" ).fetch ("KEY" , "key" )):
528+ if not (self .target & job_key ):
521529 (self .jobs & key ).delete ()
522530 invalid_removed += 1
523531
524- logger .info (
525- f"{ invalid_removed } /{ invalid_count } invalid jobs removed for `{ to_camel_case (self .target .table_name )} `"
526- )
532+ keys2do = self ._jobs_to_do ({}).fetch ("KEY" )
533+ invalid_incomplete = len (self .jobs & "status != 'success'" ) - len (keys2do )
534+ if invalid_incomplete > 0 :
535+ for key , job_key in zip (* (self .jobs & "status != 'success'" ).fetch ("KEY" , "key" )):
536+ if job_key not in keys2do :
537+ (self .jobs & key ).delete ()
538+ invalid_removed += 1
539+
540+ logger .info (
541+ f"{ invalid_removed } invalid jobs removed for `{ to_camel_case (self .target .table_name )} `"
542+ )
0 commit comments