Skip to content

Commit 070e029

Browse files
Merge remote-tracking branch 'origin/master' into design/restricted-diagram
2 parents b87e6e8 + cda7e1a commit 070e029

File tree

8 files changed: +294 additions, −14 deletions

src/datajoint/autopopulate.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,11 @@ def handler(signum, frame):
493493

494494
# Fetch pending jobs ordered by priority (use CURRENT_TIMESTAMP(3) for datetime(3) precision)
495495
pending_query = self.jobs.pending & "scheduled_time <= CURRENT_TIMESTAMP(3)"
496+
if restrictions:
497+
# Restrict to jobs whose keys match the caller's restrictions.
498+
# semantic_check=False is required because the jobs table PK has
499+
# different lineage than key_source (see jobs.py refresh()).
500+
pending_query = pending_query.restrict(self._jobs_to_do(restrictions), semantic_check=False)
496501
if priority is not None:
497502
pending_query = pending_query & f"priority <= {priority}"
498503

src/datajoint/builtin_codecs/filepath.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,17 +145,22 @@ def encode(self, value: Any, *, key: dict | None = None, store_name: str | None
145145
if not backend.exists(path):
146146
raise FileNotFoundError(f"File not found in store '{store_name or 'default'}': {path}")
147147

148-
# Get file info
149-
try:
150-
size = backend.size(path)
151-
except Exception:
152-
size = None
148+
# Detect whether the path is a directory or a file
149+
is_dir = backend.isdir(path)
150+
151+
# Get file size (not applicable for directories)
152+
size = None
153+
if not is_dir:
154+
try:
155+
size = backend.size(path)
156+
except (FileNotFoundError, OSError):
157+
pass
153158

154159
return {
155160
"path": path,
156161
"store": store_name,
157162
"size": size,
158-
"is_dir": False,
163+
"is_dir": is_dir,
159164
"timestamp": datetime.now(timezone.utc).isoformat(),
160165
}
161166

src/datajoint/objectref.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,8 @@ def _verify_folder(self) -> bool:
379379
manifest_path = f"{self.path}.manifest.json"
380380

381381
if not self._backend.exists(manifest_path):
382-
raise IntegrityError(f"Manifest file missing: {manifest_path}")
382+
# Directory was stored without a manifest — treat as unverified but valid
383+
return True
383384

384385
# Read manifest
385386
manifest_data = self._backend.get_buffer(manifest_path)

src/datajoint/storage.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ def get_buffer(self, remote_path: str | PurePosixPath) -> bytes:
555555

556556
def exists(self, remote_path: str | PurePosixPath) -> bool:
557557
"""
558-
Check if a file exists in storage.
558+
Check if a path (file or directory) exists in storage.
559559
560560
Parameters
561561
----------
@@ -565,15 +565,28 @@ def exists(self, remote_path: str | PurePosixPath) -> bool:
565565
Returns
566566
-------
567567
bool
568-
True if file exists.
568+
True if the path exists.
569569
"""
570570
full_path = self._full_path(remote_path)
571571
logger.debug(f"exists: {self.protocol}:{full_path}")
572+
return self.fs.exists(full_path)
572573

573-
if self.protocol == "file":
574-
return Path(full_path).is_file()
575-
else:
576-
return self.fs.exists(full_path)
574+
def isdir(self, remote_path: str | PurePosixPath) -> bool:
575+
"""
576+
Check if a path refers to a directory in storage.
577+
578+
Parameters
579+
----------
580+
remote_path : str or PurePosixPath
581+
Path in storage.
582+
583+
Returns
584+
-------
585+
bool
586+
True if the path is a directory.
587+
"""
588+
full_path = self._full_path(remote_path)
589+
return self.fs.isdir(full_path)
577590

578591
def remove(self, remote_path: str | PurePosixPath) -> None:
579592
"""

src/datajoint/table.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,13 @@ def insert(
746746
replace : bool, optional
747747
If True, replaces the existing tuple.
748748
skip_duplicates : bool, optional
749-
If True, silently skip duplicate inserts.
749+
If True, silently skip rows with duplicate primary key values.
750+
On **PostgreSQL**, secondary unique constraint violations still
751+
raise an error even when ``skip_duplicates=True``, because the
752+
generated ``ON CONFLICT (pk) DO NOTHING`` clause targets only
753+
the primary key. On **MySQL**, ``ON DUPLICATE KEY UPDATE``
754+
catches all unique-key conflicts, so secondary unique violations
755+
are also silently skipped.
750756
ignore_extra_fields : bool, optional
751757
If False (default), fields that are not in the heading raise error.
752758
allow_direct_insert : bool, optional

tests/integration/test_autopopulate.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,3 +352,46 @@ def make_insert(self, key, result, scale):
352352
row = (TripartiteComputed & "source_id = 2").fetch1()
353353
assert row["scale"] == 5
354354
assert row["result"] == 1000 # 200 * 5
355+
356+
357+
def test_populate_reserve_jobs_respects_restrictions(clean_autopopulate, subject, experiment):
358+
"""Regression test for #1413: populate() with reserve_jobs=True must honour restrictions.
359+
360+
Previously _populate_distributed() refreshed the job queue with the
361+
restriction but then fetched *all* pending jobs, ignoring the restriction
362+
and processing every pending key.
363+
"""
364+
assert subject, "subject table is empty"
365+
assert not experiment, "experiment table already has rows"
366+
367+
# Clear any stale jobs from previous tests (success/error entries would
368+
# prevent refresh() from re-adding them as pending).
369+
experiment.jobs.delete_quick()
370+
371+
# Refresh the full job queue (no restriction) so that all subjects have
372+
# pending jobs — this simulates the real-world scenario where workers share
373+
# a single job queue but each worker restricts to its own subset.
374+
experiment.jobs.refresh(delay=-1)
375+
total_pending = len(experiment.jobs.pending)
376+
assert total_pending > 0, "job refresh produced no pending entries"
377+
378+
# Pick one subject to use as the restriction.
379+
first_subject_id = subject.keys(order_by="subject_id ASC", limit=1)[0]["subject_id"]
380+
restriction = {"subject_id": first_subject_id}
381+
382+
# Populate only for the restricted subject. refresh=False so we use the
383+
# existing queue populated above. The bug was that this call would process
384+
# ALL pending jobs instead of only those matching the restriction.
385+
experiment.populate(restriction, reserve_jobs=True, refresh=False)
386+
387+
# Only rows for the restricted subject should exist.
388+
assert len(experiment) > 0, "no rows were populated"
389+
assert len(experiment - restriction) == 0, (
390+
"populate(reserve_jobs=True) processed keys outside the restriction "
391+
f"({len(experiment - restriction)} extra rows found)"
392+
)
393+
394+
# Rows for all other subjects must still be absent.
395+
other_subjects = subject - restriction
396+
if other_subjects:
397+
assert len(experiment & other_subjects.proj()) == 0, "rows for unrestricted subjects were incorrectly populated"
(new file — filename not captured in this extraction) Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
"""
2+
Tests for skip_duplicates behavior with secondary unique constraints.
3+
4+
Verifies that skip_duplicates=True on PostgreSQL skips primary key
5+
duplicates while still raising on secondary unique constraint violations.
6+
Resolves #1049.
7+
"""
8+
9+
import time
10+
11+
import pytest
12+
13+
import datajoint as dj
14+
from datajoint.errors import DuplicateError
15+
16+
17+
@pytest.fixture(scope="function")
18+
def schema_by_backend(connection_by_backend, db_creds_by_backend):
19+
"""Create a fresh schema per test, parameterized across backends."""
20+
backend = db_creds_by_backend["backend"]
21+
test_id = str(int(time.time() * 1000))[-8:]
22+
schema_name = f"djtest_skipdup_{backend}_{test_id}"[:64]
23+
24+
if connection_by_backend.is_connected:
25+
try:
26+
connection_by_backend.query(
27+
f"DROP DATABASE IF EXISTS {connection_by_backend.adapter.quote_identifier(schema_name)}"
28+
)
29+
except Exception:
30+
pass
31+
32+
schema = dj.Schema(schema_name, connection=connection_by_backend)
33+
yield schema
34+
35+
if connection_by_backend.is_connected:
36+
try:
37+
connection_by_backend.query(
38+
f"DROP DATABASE IF EXISTS {connection_by_backend.adapter.quote_identifier(schema_name)}"
39+
)
40+
except Exception:
41+
pass
42+
43+
44+
def test_skip_duplicates_pk_match(schema_by_backend):
45+
"""skip_duplicates=True silently skips rows whose PK already exists."""
46+
47+
@schema_by_backend
48+
class Item(dj.Manual):
49+
definition = """
50+
item_id : int
51+
---
52+
name : varchar(100)
53+
email : varchar(100)
54+
unique index (email)
55+
"""
56+
57+
Item.insert1(dict(item_id=1, name="Alice", email="alice@example.com"))
58+
59+
# Same PK, different values — should be silently skipped
60+
Item.insert1(
61+
dict(item_id=1, name="Bob", email="bob@example.com"),
62+
skip_duplicates=True,
63+
)
64+
65+
# Original row unchanged
66+
row = (Item & "item_id=1").fetch1()
67+
assert row["name"] == "Alice"
68+
assert row["email"] == "alice@example.com"
69+
70+
71+
def test_skip_duplicates_unique_violation_raises_on_postgres(schema_by_backend, db_creds_by_backend):
72+
"""On PostgreSQL, skip_duplicates=True still raises on secondary unique violations.
73+
74+
Regression test for #1049: a row with a *new* PK but a *conflicting*
75+
secondary unique index value must raise DuplicateError on PostgreSQL.
76+
"""
77+
if db_creds_by_backend["backend"] != "postgresql":
78+
pytest.skip("PostgreSQL-specific: ON CONFLICT (pk) DO NOTHING preserves unique constraints")
79+
80+
@schema_by_backend
81+
class Item(dj.Manual):
82+
definition = """
83+
item_id : int
84+
---
85+
name : varchar(100)
86+
email : varchar(100)
87+
unique index (email)
88+
"""
89+
90+
Item.insert1(dict(item_id=1, name="Alice", email="alice@example.com"))
91+
92+
# New PK (2) but email conflicts with existing row (1)
93+
with pytest.raises(DuplicateError):
94+
Item.insert1(
95+
dict(item_id=2, name="Bob", email="alice@example.com"),
96+
skip_duplicates=True,
97+
)
98+
99+
100+
def test_skip_duplicates_unique_on_mysql(schema_by_backend, db_creds_by_backend):
101+
"""On MySQL, skip_duplicates=True silently skips secondary unique conflicts.
102+
103+
Documents the known MySQL asymmetry: ON DUPLICATE KEY UPDATE catches
104+
all unique key conflicts, not just primary key.
105+
"""
106+
if db_creds_by_backend["backend"] != "mysql":
107+
pytest.skip("MySQL-specific: ON DUPLICATE KEY UPDATE catches all unique keys")
108+
109+
@schema_by_backend
110+
class Item(dj.Manual):
111+
definition = """
112+
item_id : int
113+
---
114+
name : varchar(100)
115+
email : varchar(100)
116+
unique index (email)
117+
"""
118+
119+
Item.insert1(dict(item_id=1, name="Alice", email="alice@example.com"))
120+
121+
# New PK (2) but email conflicts — MySQL silently skips
122+
Item.insert1(
123+
dict(item_id=2, name="Bob", email="alice@example.com"),
124+
skip_duplicates=True,
125+
)
126+
127+
# Only the original row exists
128+
assert len(Item()) == 1
129+
assert (Item & "item_id=1").fetch1()["name"] == "Alice"
130+
131+
132+
def test_skip_duplicates_no_unique_index(schema_by_backend):
133+
"""skip_duplicates=True works normally on tables without secondary unique indexes."""
134+
135+
@schema_by_backend
136+
class Simple(dj.Manual):
137+
definition = """
138+
item_id : int
139+
---
140+
name : varchar(100)
141+
"""
142+
143+
Simple.insert1(dict(item_id=1, name="Alice"))
144+
145+
# Same PK, different name — silently skipped
146+
Simple.insert1(dict(item_id=1, name="Bob"), skip_duplicates=True)
147+
assert (Simple & "item_id=1").fetch1()["name"] == "Alice"
148+
149+
# New PK — inserted
150+
Simple.insert1(dict(item_id=2, name="Bob"), skip_duplicates=True)
151+
assert len(Simple()) == 2
152+
153+
154+
def test_skip_duplicates_composite_unique(schema_by_backend, db_creds_by_backend):
155+
"""skip_duplicates=True with a composite secondary unique index."""
156+
if db_creds_by_backend["backend"] != "postgresql":
157+
pytest.skip("PostgreSQL-specific unique constraint enforcement")
158+
159+
@schema_by_backend
160+
class Record(dj.Manual):
161+
definition = """
162+
record_id : int
163+
---
164+
first_name : varchar(100)
165+
last_name : varchar(100)
166+
data : varchar(255)
167+
unique index (first_name, last_name)
168+
"""
169+
170+
Record.insert1(dict(record_id=1, first_name="Alice", last_name="Smith", data="v1"))
171+
172+
# New PK but composite unique (first_name, last_name) conflicts
173+
with pytest.raises(DuplicateError):
174+
Record.insert1(
175+
dict(record_id=2, first_name="Alice", last_name="Smith", data="v2"),
176+
skip_duplicates=True,
177+
)
178+
179+
180+
def test_skip_duplicates_batch_mixed(schema_by_backend, db_creds_by_backend):
181+
"""Batch insert with skip_duplicates=True: PK duplicates skipped, unique conflicts raise."""
182+
if db_creds_by_backend["backend"] != "postgresql":
183+
pytest.skip("PostgreSQL-specific unique constraint enforcement")
184+
185+
@schema_by_backend
186+
class Item(dj.Manual):
187+
definition = """
188+
item_id : int
189+
---
190+
email : varchar(100)
191+
unique index (email)
192+
"""
193+
194+
Item.insert1(dict(item_id=1, email="alice@example.com"))
195+
196+
# Batch: row 2 is new (OK), row 1 is PK dup (skip), row 3 conflicts on email
197+
with pytest.raises(DuplicateError):
198+
Item.insert(
199+
[
200+
dict(item_id=2, email="bob@example.com"),
201+
dict(item_id=1, email="duplicate-pk@example.com"), # PK dup — skipped
202+
dict(item_id=3, email="alice@example.com"), # unique conflict — error
203+
],
204+
skip_duplicates=True,
205+
)

tests/unit/test_codecs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,7 @@ def test_filepath_allows_user_paths(self):
548548
with patch("datajoint.hash_registry.get_store_backend") as mock_get_backend:
549549
mock_backend = MagicMock()
550550
mock_backend.exists.return_value = True
551+
mock_backend.isdir.return_value = False
551552
mock_backend.size.return_value = 1024
552553
mock_get_backend.return_value = mock_backend
553554

@@ -636,6 +637,7 @@ def test_filepath_enforces_filepath_prefix(self):
636637
with patch("datajoint.hash_registry.get_store_backend") as mock_get_backend:
637638
mock_backend = MagicMock()
638639
mock_backend.exists.return_value = True
640+
mock_backend.isdir.return_value = False
639641
mock_backend.size.return_value = 3072
640642
mock_get_backend.return_value = mock_backend
641643

Comments (0)