Skip to content

Commit 45b5658

Browse files
committed
feat: improve logic to prevent excessive scheduling
1 parent 3f5247b commit 45b5658

1 file changed

Lines changed: 18 additions & 17 deletions

File tree

datajoint/autopopulate.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -453,8 +453,8 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i
453453
Schedule new jobs for this autopopulate table by finding keys that need computation.
454454
455455
This method implements an optimization strategy to avoid excessive scheduling:
456-
1. First checks if any jobs were scheduled recently (within min_scheduling_interval)
457-
2. If recent jobs exist, skips scheduling to prevent database load
456+
1. First checks if jobs were scheduled recently (within min_scheduling_interval)
457+
2. If recent scheduling event exists, skips scheduling to prevent database load
458458
3. Otherwise, finds keys that need computation and schedules them
459459
460460
The method also optionally purges invalid jobs (jobs that no longer exist in key_source)
@@ -469,22 +469,23 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i
469469
Returns:
470470
None
471471
"""
472+
__scheduled_event = {
473+
"table_name": self.target.table_name,
474+
"__type__": "jobs scheduling event"
475+
}
476+
472477
if min_scheduling_interval is None:
473478
min_scheduling_interval = config["min_scheduling_interval"]
474479

475-
# First check if we have any recent jobs
476480
if min_scheduling_interval > 0:
477-
recent_jobs = len(
478-
self.jobs
479-
& {"status": "scheduled"}
480-
& f"timestamp <= UTC_TIMESTAMP()" # Only consider jobs up to current UTC time
481+
recent_scheduling_event = (
482+
self._Jobs
483+
& {"table_name": f"__{self.target.table_name}__"}
484+
& {"key_hash": key_hash(__scheduled_event)}
481485
& f"timestamp >= DATE_SUB(UTC_TIMESTAMP(), INTERVAL {min_scheduling_interval} SECOND)"
482486
)
483-
if recent_jobs > 0:
484-
logger.debug(
485-
f"Skipping job scheduling for `{to_camel_case(self.target.table_name)}` - "
486-
f"found {recent_jobs} jobs created within last {min_scheduling_interval} seconds"
487-
)
487+
if recent_scheduling_event:
488+
logger.debug(f"Skipping jobs scheduling for `{to_camel_case(self.target.table_name)}` (most recent scheduling event was within {min_scheduling_interval} seconds)")
488489
return
489490

490491
try:
@@ -495,6 +496,8 @@ def schedule_jobs(self, *restrictions, purge_invalid_jobs=True, min_scheduling_i
495496
except Exception as e:
496497
logger.exception(str(e))
497498
else:
499+
self._Jobs.ignore(f"__{self.target.table_name}__", __scheduled_event,
500+
message=f"Jobs scheduling event: {__scheduled_event['table_name']}")
498501
logger.info(
499502
f"{schedule_count} new jobs scheduled for `{to_camel_case(self.target.table_name)}`"
500503
)
@@ -510,14 +513,12 @@ def purge_invalid_jobs(self):
510513
This is potentially a time-consuming process, but it should not be expected to run very often
511514
"""
512515

513-
jobs_query = self._Jobs & {"table_name": self.target.table_name}
514-
515-
invalid_count = len(jobs_query) - len(self._jobs_to_do({}))
516+
invalid_count = len(self.jobs) - len(self._jobs_to_do({}))
516517
invalid_removed = 0
517518
if invalid_count > 0:
518-
for key, job_key in zip(*jobs_query.fetch("KEY", "key")):
519+
for key, job_key in zip(*self.jobs.fetch("KEY", "key")):
519520
if not (self._jobs_to_do({}) & job_key):
520-
(jobs_query & key).delete()
521+
(self.jobs & key).delete()
521522
invalid_removed += 1
522523

523524
logger.info(

0 commit comments

Comments
 (0)