77import random
88import signal
99import traceback
10+ import os
11+ import platform
1012
1113import deepdiff
1214from tqdm import tqdm
@@ -270,7 +272,7 @@ def populate(
270272 )
271273
272274 if schedule_jobs :
273- self .schedule_jobs (* restrictions , purge_invalid_jobs = False )
275+ self .schedule_jobs (* restrictions )
274276
275277 # define and set up signal handler for SIGTERM:
276278 if reserve_jobs :
@@ -344,7 +346,7 @@ def handler(signum, frame):
344346 del self .connection ._conn .ctx # SSLContext is not pickleable
345347 with (
346348 mp .Pool (
347- processes , _initialize_populate , (self , jobs , populate_kwargs )
349+ processes , _initialize_populate , (self , True , populate_kwargs )
348350 ) as pool ,
349351 (
350352 tqdm (desc = "Processes: " , total = nkeys )
@@ -375,10 +377,10 @@ def handler(signum, frame):
375377 def _populate1 (
376378 self ,
377379 key ,
378- reserve_jobs ,
379- suppress_errors ,
380- return_exception_objects ,
381- make_kwargs = None ,
380+ reserve_jobs : bool ,
381+ suppress_errors : bool ,
382+ return_exception_objects : bool ,
383+ make_kwargs : dict = None ,
382384 ):
383385 """
384386 populates table for one source key, calling self.make inside a transaction.
@@ -475,6 +477,7 @@ def _populate1(
475477 datetime .datetime .utcnow () - make_start
476478 ).total_seconds (),
477479 )
480+
478481 logger .debug (f"Success making { key } -> { self .target .full_table_name } " )
479482 return True
480483 finally :
@@ -511,7 +514,7 @@ def _Jobs(self):
511514 def jobs (self ):
512515 return self ._Jobs & {"table_name" : self .target .table_name }
513516
514- def schedule_jobs (self , * restrictions , purge_invalid_jobs = True , min_scheduling_interval = None ):
517+ def schedule_jobs (self , * restrictions , purge_jobs = False , min_scheduling_interval = None ):
515518 """
516519 Schedule new jobs for this autopopulate table by finding keys that need computation.
517520
@@ -525,7 +528,7 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i
525528
526529 Args:
527530 restrictions: a list of restrictions, each of which restricts (table.key_source - target.proj())
528- purge_invalid_jobs : if True, remove invalid entry from the jobs table (potentially expensive operation)
531+ purge_jobs : if True, remove orphaned jobs from the jobs table (potentially expensive operation)
529532 min_scheduling_interval: minimum time in seconds that must have passed since last job scheduling.
530533 If None, uses the value from dj.config["min_scheduling_interval"] (default: None)
531534
@@ -565,14 +568,14 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i
565568 f"{ schedule_count } new jobs scheduled for `{ to_camel_case (self .target .table_name )} `"
566569 )
567570 finally :
568- if purge_invalid_jobs :
569- self .purge_invalid_jobs ()
571+ if purge_jobs :
572+ self .purge_jobs ()
570573
571- def purge_invalid_jobs (self ):
574+ def purge_jobs (self ):
572575 """
573- Check and remove any invalid /outdated jobs in the JobTable for this autopopulate table.
576+ Check and remove any orphaned /outdated jobs in the JobTable for this autopopulate table.
574577
575- This method handles two types of invalid jobs:
578+ This method handles two types of orphaned jobs:
576579 1. Jobs that are no longer in the `key_source` (e.g. entries in upstream table(s) were deleted)
577580 2. Jobs with "success" status that are no longer in the target table (e.g. entries in the target table were deleted)
578581
0 commit comments