Skip to content

Commit 71b0696

Browse files
author
Thinh Nguyen
committed
bugfix, add tests
1 parent 8b9ac0f commit 71b0696

4 files changed

Lines changed: 104 additions & 20 deletions

File tree

datajoint/autopopulate.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ def _call_populate1(key):
4141
:return: key, error if error, otherwise None
4242
"""
4343
process = mp.current_process()
44-
return process.table._populate1(key, process.jobs, **process.populate_kwargs)
44+
return process.table._populate1(
45+
key, process.reserve_jobs, **process.populate_kwargs
46+
)
4547

4648

4749
class AutoPopulate:
@@ -208,13 +210,18 @@ def handler(signum, frame):
208210
old_handler = signal.signal(signal.SIGTERM, handler)
209211

210212
if schedule_jobs:
211-
self.schedule_jobs()
213+
self.schedule_jobs(*restrictions)
212214

213215
keys = (
214216
self._Jobs
215217
& {"table_name": self.target.table_name}
216218
& 'status = "scheduled"'
217-
).fetch("KEY", limit=limit)
219+
).fetch("key", limit=limit)
220+
221+
if restrictions:
222+
# hitting the `key_source` again to apply the restrictions
223+
# this is expensive/suboptimal
224+
keys = (self._jobs_to_do(restrictions) & keys).fetch("KEY", limit=limit)
218225
else:
219226
keys = (self._jobs_to_do(restrictions) - self.target).fetch(
220227
"KEY", limit=limit
@@ -338,6 +345,9 @@ def _populate1(
338345
self._job_key(key),
339346
error_message=error_message,
340347
error_stack=traceback.format_exc(),
348+
run_duration=(
349+
datetime.datetime.utcnow() - make_start
350+
).total_seconds(),
341351
)
342352
if not suppress_errors or isinstance(error, SystemExit):
343353
raise
@@ -391,16 +401,18 @@ def _Jobs(self):
391401
def jobs(self):
392402
return self._Jobs & {"table_name": self.target.table_name}
393403

394-
def schedule_jobs(self, purge_invalid_jobs=True):
404+
def schedule_jobs(self, *restrictions, purge_invalid_jobs=True):
395405
"""
396406
Schedule new jobs for this autopopulate table
407+
:param restrictions: a list of restrictions each restrict
408+
(table.key_source - target.proj())
397409
:param purge_invalid_jobs: if True, remove invalid entry from the jobs table (potentially expensive operation)
398410
:return:
399411
"""
400412
try:
401413
with self.connection.transaction:
402414
schedule_count = 0
403-
for key in (self._jobs_to_do({}) - self.target).fetch("KEY"):
415+
for key in (self._jobs_to_do(restrictions) - self.target).fetch("KEY"):
404416
schedule_count += self._Jobs.schedule(self.target.table_name, key)
405417
except Exception as e:
406418
logger.exception(str(e))
@@ -425,11 +437,11 @@ def purge_invalid_jobs(self):
425437
invalid_count = len(jobs_query) - len(self._jobs_to_do({}))
426438
invalid_removed = 0
427439
if invalid_count > 0:
428-
for key, job_key in jobs_query.fetch("KEY", "key"):
440+
for key, job_key in zip(*jobs_query.fetch("KEY", "key")):
429441
if not (self._jobs_to_do({}) & job_key):
430442
(jobs_query & key).delete()
431443
invalid_removed += 1
432444

433-
logger.info(
434-
f"{invalid_removed}/{invalid_count} invalid jobs removed for `{to_camel_case(self.target.table_name)}`"
435-
)
445+
logger.info(
446+
f"{invalid_removed}/{invalid_count} invalid jobs removed for `{to_camel_case(self.target.table_name)}`"
447+
)

datajoint/jobs.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,9 @@ def schedule(self, table_name, key, seconds_delay=0, force=False):
7979
job_key = dict(table_name=table_name, key_hash=key_hash(key))
8080
if self & job_key:
8181
current_status = (self & job_key).fetch1("status")
82-
if current_status in ("scheduled", "reserved", "success"):
83-
return True
84-
if current_status in ("error", "ignore") and not force:
82+
if current_status in ("scheduled", "reserved", "success") or (
83+
current_status in ("error", "ignore") and not force
84+
):
8585
return False
8686

8787
job = dict(
@@ -109,9 +109,14 @@ def reserve(self, table_name, key):
109109
:param key: the dict of the job's primary key
110110
:return: True if reserved job successfully. False = the jobs is already taken
111111
"""
112+
job_key = dict(table_name=table_name, key_hash=key_hash(key))
113+
if self & job_key:
114+
current_status = (self & job_key).fetch1("status")
115+
if current_status != "scheduled":
116+
return False
117+
112118
job = dict(
113-
table_name=table_name,
114-
key_hash=key_hash(key),
119+
job_key,
115120
status="reserved",
116121
host=platform.node(),
117122
pid=os.getpid(),
@@ -120,11 +125,10 @@ def reserve(self, table_name, key):
120125
user=self._user,
121126
timestamp=datetime.datetime.utcnow(),
122127
)
123-
try:
124-
with config(enable_python_native_blobs=True):
125-
self.insert1(job, ignore_extra_fields=True)
126-
except DuplicateError:
127-
return False
128+
129+
with config(enable_python_native_blobs=True):
130+
self.insert1(job, replace=True, ignore_extra_fields=True)
131+
128132
return True
129133

130134
def ignore(self, table_name, key, message=""):

datajoint/schemas.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from .diagram import Diagram, _get_tier
99
from .settings import config
1010
from .errors import DataJointError, AccessError
11-
from .jobs import JobTable, JobConfigTable
11+
from .jobs import JobTable
1212
from .external import ExternalMapping
1313
from .heading import Heading
1414
from .utils import user_choice, to_camel_case

tests/test_autopopulate.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def tearDown(self):
2424
self.trial.Condition.delete_quick()
2525
self.trial.delete_quick()
2626
self.experiment.delete_quick()
27+
schema.schema.jobs.delete_quick()
2728

2829
def test_populate(self):
2930
# test simple populate
@@ -70,6 +71,73 @@ def test_populate_with_success_count(self):
7071
)
7172
assert_equal(len(self.trial.key_source & self.trial), success_count)
7273

74+
def test_schedule_jobs(self):
75+
assert_true(self.subject, "root tables are empty")
76+
assert_false(self.experiment, "table already filled?")
77+
# test schedule jobs
78+
self.experiment.schedule_jobs()
79+
assert_true(
80+
len(
81+
schema.schema.jobs
82+
& {"table_name": self.experiment.table_name, "status": "scheduled"}
83+
)
84+
== len(self.experiment.key_source),
85+
"failed scheduling jobs",
86+
)
87+
# test executing jobs
88+
self.experiment.populate(reserve_jobs=True, schedule_jobs=False)
89+
assert_true(
90+
len(
91+
schema.schema.jobs
92+
& {"table_name": self.experiment.table_name, "status": "success"}
93+
)
94+
== len(self.experiment.key_source),
95+
"failed executing jobs",
96+
)
97+
# test schedule and execute jobs with restriction
98+
restriction = self.subject.proj(animal="subject_id").fetch("KEY")[0]
99+
self.trial.schedule_jobs(restriction)
100+
assert_true(
101+
len(
102+
schema.schema.jobs
103+
& {"table_name": self.trial.table_name, "status": "scheduled"}
104+
)
105+
== len(self.trial.key_source & restriction),
106+
"failed scheduling jobs",
107+
)
108+
self.trial.schedule_jobs()
109+
assert_true(
110+
len(
111+
schema.schema.jobs
112+
& {"table_name": self.trial.table_name, "status": "scheduled"}
113+
)
114+
== len(self.trial.key_source),
115+
"failed scheduling jobs",
116+
)
117+
self.trial.populate(restriction, reserve_jobs=True, schedule_jobs=False)
118+
assert_equal(
119+
len(self.trial.key_source & self.trial),
120+
len(self.trial.key_source & restriction),
121+
)
122+
assert_equal(
123+
len(self.trial.key_source - self.trial),
124+
len(self.trial.key_source - restriction),
125+
)
126+
self.trial.populate(reserve_jobs=True, schedule_jobs=False)
127+
assert_equal(
128+
len(self.trial.key_source & self.trial), len(self.trial.key_source)
129+
)
130+
# test purge invalid jobs
131+
restriction["subject_id"] = restriction.pop("animal")
132+
with dj.config(safemode=False):
133+
(self.experiment & restriction).delete()
134+
self.trial.purge_invalid_jobs()
135+
assert_true(
136+
len(schema.schema.jobs & {"table_name": self.trial.table_name})
137+
== len(self.trial.key_source),
138+
"failed purging invalid jobs",
139+
)
140+
73141
def test_populate_exclude_error_and_ignore_jobs(self):
74142
# test simple populate
75143
assert_true(self.subject, "root tables are empty")

0 commit comments

Comments
 (0)