Skip to content

Commit 8430e2a

Browse files
committed
Simplify reserve() to use update1
- reserve() now uses update1 instead of raw SQL - Remove status='pending' check since populate verifies this - Change return type from bool to None - Update autopopulate.py to not check reserve return value - Update tests to reflect new behavior
1 parent 608020a commit 8430e2a

3 files changed

Lines changed: 30 additions & 42 deletions

File tree

src/datajoint/autopopulate.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -390,9 +390,9 @@ def _populate1(self, key, jobs, suppress_errors, return_exception_objects, make_
390390
job_key = self._job_key(key)
391391
start_time = time.time()
392392

393-
# Try to reserve the job (per-key, before make)
394-
if jobs is not None and not jobs.reserve(job_key):
395-
return False
393+
# Reserve the job (per-key, before make)
394+
if jobs is not None:
395+
jobs.reserve(job_key)
396396

397397
# if make is a generator, transaction can be delayed until the final stage
398398
is_generator = inspect.isgeneratorfunction(make)

src/datajoint/jobs.py

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -328,41 +328,31 @@ def _insert_job_with_delay(self, key: dict, priority: int, delay: float) -> None
328328
"""
329329
self.connection.query(sql)
330330

331-
def reserve(self, key: dict) -> bool:
331+
def reserve(self, key: dict) -> None:
332332
"""
333-
Attempt to reserve a job for processing.
333+
Reserve a job for processing.
334334
335-
Updates status to 'reserved' if currently 'pending' and scheduled_time <= now.
335+
Updates the job record to 'reserved' status. The caller (populate) is
336+
responsible for verifying the job is pending before calling this method.
336337
337338
Args:
338339
key: Primary key dict for the job
339-
340-
Returns:
341-
True if reservation successful, False if job not found or not pending.
342340
"""
343341
self._ensure_declared()
344342

345-
# Build WHERE clause for the key
346343
pk_attrs = [name for name, _ in self._get_fk_derived_primary_key()]
347-
key_conditions = " AND ".join(
348-
f"`{attr}`='{key[attr]}'" if isinstance(key[attr], str) else f"`{attr}`={key[attr]}" for attr in pk_attrs
349-
)
344+
job_key = {attr: key[attr] for attr in pk_attrs if attr in key}
350345

351-
# Attempt atomic update: pending -> reserved
352-
sql = f"""
353-
UPDATE {self.full_table_name}
354-
SET status='reserved',
355-
reserved_time=NOW(6),
356-
user='{self._user}',
357-
host='{platform.node()}',
358-
pid={os.getpid()},
359-
connection_id={self.connection.connection_id}
360-
WHERE {key_conditions}
361-
AND status='pending'
362-
AND scheduled_time <= NOW(6)
363-
"""
364-
result = self.connection.query(sql)
365-
return result.rowcount > 0
346+
update_row = {
347+
**job_key,
348+
"status": "reserved",
349+
"reserved_time": datetime.now(),
350+
"user": self._user,
351+
"host": platform.node(),
352+
"pid": os.getpid(),
353+
"connection_id": self.connection.connection_id,
354+
}
355+
self.update1(update_row)
366356

367357
def complete(self, key: dict, duration: float = None, keep: bool = None) -> None:
368358
"""

tests/test_jobs.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -97,32 +97,30 @@ def test_reserve_pending_job(self, schema_any):
9797

9898
# Get first pending job
9999
key = jobs.pending.fetch("KEY", limit=1)[0]
100-
assert jobs.reserve(key)
100+
jobs.reserve(key)
101101

102102
# Verify status changed
103103
status = (jobs & key).fetch1("status")
104104
assert status == "reserved"
105105

106-
def test_reserve_already_reserved(self, schema_any):
107-
"""Test that reserve() returns False for already reserved job."""
106+
def test_reserve_sets_metadata(self, schema_any):
107+
"""Test that reserve() sets user, host, pid, connection_id."""
108108
table = schema.SigIntTable()
109109
jobs = table.jobs
110110
jobs.delete()
111111
jobs.refresh()
112112

113113
key = jobs.pending.fetch("KEY", limit=1)[0]
114-
assert jobs.reserve(key)
115-
assert not jobs.reserve(key) # Second reserve should fail
116-
117-
def test_reserve_scheduled_future(self, schema_any):
118-
"""Test that reserve() fails for jobs scheduled in the future."""
119-
table = schema.SigIntTable()
120-
jobs = table.jobs
121-
jobs.delete()
122-
jobs.refresh(delay=3600) # 1 hour delay
114+
jobs.reserve(key)
123115

124-
key = jobs.fetch("KEY", limit=1)[0]
125-
assert not jobs.reserve(key) # Should fail - not yet scheduled
116+
# Verify metadata was set
117+
row = (jobs & key).fetch1()
118+
assert row["status"] == "reserved"
119+
assert row["reserved_time"] is not None
120+
assert row["user"] != ""
121+
assert row["host"] != ""
122+
assert row["pid"] > 0
123+
assert row["connection_id"] > 0
126124

127125

128126
class TestJobsComplete:

0 commit comments

Comments
 (0)