Skip to content

Commit 14d44d7

Browse files
Muad Abd El Hay and claude
authored and committed
Fix populate antijoin to use .proj() for correct pending key computation
The antijoin that computes pending keys (`key_source - self` in `_populate_direct`, `key_source - self._target` in `jobs.refresh`, and `todo - self` in `progress`) did not project the target table to its primary key before the subtraction. When the target table has secondary (non-PK) attributes, the antijoin fails to match on primary key alone and returns all keys instead of just the unpopulated ones.

This caused:

- `populate(reserve_jobs=False)`: all key_source entries were iterated instead of just pending ones (mitigated by the `if key in self:` check inside `_populate1`, but wasted time on large tables).
- `populate(reserve_jobs=True)`: `jobs.refresh()` inserted all keys into the jobs table as 'pending', not just truly pending ones. Workers then wasted their `max_calls` budget processing already-completed entries before reaching any real work.
- `progress()`: reported incorrect remaining counts in some cases.

Fix: add `.proj()` to the target side of all three antijoins so the subtraction matches on primary key only, consistent with how DataJoint antijoins are meant to work.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0bd6e02 commit 14d44d7

File tree

3 files changed

+76
-3
lines changed

3 files changed

+76
-3
lines changed

src/datajoint/autopopulate.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ def _populate_direct(
403403
"""
404404
from tqdm import tqdm
405405

406-
keys = (self._jobs_to_do(restrictions) - self).keys()
406+
keys = (self._jobs_to_do(restrictions) - self.proj()).keys()
407407

408408
logger.debug("Found %d keys to populate" % len(keys))
409409

@@ -701,7 +701,7 @@ def progress(self, *restrictions: Any, display: bool = False) -> tuple[int, int]
701701
if not common_attrs:
702702
# No common attributes - fall back to two-query method
703703
total = len(todo)
704-
remaining = len(todo - self)
704+
remaining = len(todo - self.proj())
705705
else:
706706
# Build a single query that computes both total and remaining
707707
# Using LEFT JOIN with COUNT(DISTINCT) to handle 1:many relationships

src/datajoint/jobs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ def refresh(
370370

371371
# Keys that need jobs: in key_source, not in target, not in jobs
372372
# Disable semantic_check for Job table (self) because its attributes may not have matching lineage
373-
new_keys = (key_source - self._target).restrict(Not(self), semantic_check=False).proj()
373+
new_keys = (key_source - self._target.proj()).restrict(Not(self), semantic_check=False).proj()
374374
new_key_list = new_keys.keys()
375375

376376
if new_key_list:

tests/integration/test_autopopulate.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,79 @@ def test_allow_insert(clean_autopopulate, subject, experiment):
112112
experiment.insert1(key)
113113

114114

115+
def test_populate_antijoin_with_secondary_attrs(clean_autopopulate, subject, experiment):
116+
"""Test that populate correctly computes pending keys via antijoin.
117+
118+
Regression test for a bug where `key_source - self` returned all keys
119+
instead of just unpopulated ones when the target table has secondary
120+
attributes. The antijoin must match on primary key only, ignoring
121+
secondary attributes. Without `.proj()`, the antijoin could fail to
122+
exclude already-populated keys.
123+
124+
This affected both direct mode (reserve_jobs=False) and distributed mode
125+
(reserve_jobs=True), causing workers to waste time re-checking already
126+
completed entries.
127+
"""
128+
assert subject, "root tables are empty"
129+
assert not experiment, "table already filled?"
130+
131+
total_keys = len(experiment.key_source)
132+
assert total_keys > 0
133+
134+
# Partially populate (only 2 entries)
135+
experiment.populate(max_calls=2)
136+
assert len(experiment) == 2
137+
138+
# The critical test: key_source - target must return only unpopulated keys.
139+
# Before the fix, this returned all keys (== total_keys) because the
140+
# antijoin failed to match on PK when secondary attributes were present.
141+
pending = experiment.key_source - experiment
142+
assert len(pending) == total_keys - 2, (
143+
f"Antijoin returned {len(pending)} pending keys, expected {total_keys - 2}. "
144+
f"key_source - target may not be matching on primary key only."
145+
)
146+
147+
# Also verify progress() reports correct counts
148+
remaining, total = experiment.progress()
149+
assert total == total_keys
150+
assert remaining == total_keys - 2
151+
152+
# Populate the rest and verify antijoin returns 0
153+
experiment.populate()
154+
pending_after = experiment.key_source - experiment
155+
assert len(pending_after) == 0, (
156+
f"Antijoin returned {len(pending_after)} pending keys after full populate, expected 0."
157+
)
158+
159+
160+
def test_populate_distributed_antijoin(clean_autopopulate, subject, experiment):
161+
"""Test that reserve_jobs=True correctly identifies pending keys.
162+
163+
When using distributed mode, jobs.refresh() must only insert truly pending
164+
keys into the jobs table, not already-completed ones. This verifies the
165+
antijoin in jobs.refresh() works correctly with secondary attributes.
166+
"""
167+
assert subject, "root tables are empty"
168+
assert not experiment, "table already filled?"
169+
170+
total_keys = len(experiment.key_source)
171+
172+
# Partially populate
173+
experiment.populate(max_calls=2)
174+
assert len(experiment) == 2
175+
176+
# Refresh jobs — should only create entries for unpopulated keys
177+
experiment.jobs.refresh(delay=-1)
178+
pending_jobs = len(experiment.jobs.pending)
179+
assert pending_jobs == total_keys - 2, (
180+
f"jobs.refresh() created {pending_jobs} pending jobs, expected {total_keys - 2}. "
181+
f"The antijoin in refresh() may not be excluding already-completed keys."
182+
)
183+
184+
# Clean up
185+
experiment.jobs.delete_quick()
186+
187+
115188
def test_load_dependencies(prefix, connection_test):
116189
schema = dj.Schema(f"{prefix}_load_dependencies_populate", connection=connection_test)
117190

0 commit comments

Comments
 (0)