-
Notifications
You must be signed in to change notification settings - Fork 96
Expand file tree
/
Copy pathtest_jobs.py
More file actions
202 lines (153 loc) · 7.5 KB
/
test_jobs.py
File metadata and controls
202 lines (153 loc) · 7.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""Tests for per-table Job management (AutoPopulate 2.0)."""
import random
import string
import datajoint as dj
from datajoint.jobs import ERROR_MESSAGE_LENGTH, TRUNCATION_APPENDIX
from tests import schema
def test_reserve_job(clean_jobs, subject, experiment):
    """Walk jobs through the reserve/complete and reserve/error cycles.

    Verifies that a pending job can be reserved only once, that completing
    jobs either retains or removes them depending on
    ``dj.config.jobs.keep_completed``, and that jobs marked as errors cannot
    be reserved again until the error entries are deleted.
    """
    assert subject

    # A refresh is what creates the pending entries to work with.
    experiment.jobs.refresh()
    assert len(experiment.jobs.pending) > 0, "no pending jobs created"

    # First pass: every pending job must be reservable.
    pending_keys = experiment.jobs.pending.keys()
    assert all(experiment.jobs.reserve(k) for k in pending_keys), "failed to reserve a job"

    # Second pass: an already-reserved job must not be handed out again.
    assert not any(experiment.jobs.reserve(k) for k in pending_keys), "failed to respect reservation"

    # Mark every reserved job as complete.
    for k in pending_keys:
        experiment.jobs.complete(k)

    # Completed jobs are retained only when keep_completed is enabled;
    # otherwise the jobs table is expected to be empty again.
    if not dj.config.jobs.keep_completed:
        assert len(experiment.jobs) == 0, "failed to free jobs"
    else:
        assert len(experiment.jobs.completed) == len(pending_keys)

    # Regenerate pending jobs and push each of them into the error state.
    experiment.jobs.refresh()
    error_keys = experiment.jobs.pending.keys()
    for k in error_keys:
        experiment.jobs.reserve(k)
        experiment.jobs.error(k, "error message")

    # Jobs in the error state must not be reservable.
    assert not any(experiment.jobs.reserve(k) for k in error_keys), "failed to ignore error jobs"

    # Deleting the error entries should leave the jobs table empty.
    experiment.jobs.errors.delete()
    assert len(experiment.jobs) == 0, "failed to clear error jobs"
def test_job_status_filters(clean_jobs, subject, experiment):
    """Check the ``pending``, ``reserved`` and ``errors`` views of the jobs table."""
    experiment.jobs.refresh()

    # Right after a refresh, every job is expected to be pending.
    job_count = len(experiment.jobs)
    assert job_count > 0
    assert len(experiment.jobs.pending) == job_count
    assert len(experiment.jobs.reserved) == 0
    assert len(experiment.jobs.errors) == 0

    # Reserve two of the pending jobs and confirm the reserved view sees both.
    picked = experiment.jobs.pending.keys(limit=2)
    for k in picked:
        experiment.jobs.reserve(k)
    assert len(experiment.jobs.reserved) == 2

    # Flag the first reserved job as failed; the errors view should list it.
    experiment.jobs.error(picked[0], "test error")
    assert len(experiment.jobs.errors) == 1
def test_sigint(clean_jobs, schema_any):
    """Check that a KeyboardInterrupt raised during populate is stored as a job error."""
    table = schema.SigIntTable()

    try:
        table.populate(reserve_jobs=True)
    except KeyboardInterrupt:
        # The interrupt may propagate out of populate; only the recorded
        # job error matters for this test.
        pass

    assert len(table.jobs.errors) > 0, "SigInt job error not recorded"

    job_status, message = table.jobs.errors.fetch1("status", "error_message")
    assert job_status == "error"
    assert "KeyboardInterrupt" in message
def test_sigterm(clean_jobs, schema_any):
    """Check that a SystemExit raised during populate is stored as a job error."""
    table = schema.SigTermTable()

    try:
        table.populate(reserve_jobs=True)
    except SystemExit:
        # The exit may propagate out of populate; only the recorded job
        # error matters for this test.
        pass

    assert len(table.jobs.errors) > 0, "SigTerm job error not recorded"

    job_status, message = table.jobs.errors.fetch1("status", "error_message")
    assert job_status == "error"
    # Either spelling of the termination cause is accepted in the message.
    assert any(marker in message for marker in ("SIGTERM", "SystemExit"))
def test_suppress_dj_errors(clean_jobs, schema_any):
    """Check that populate can suppress errors with native Python blobs disabled.

    After populating ``schema.ErrorClass`` with ``suppress_errors=True``, the
    number of recorded error jobs must be non-zero and equal to the number of
    rows in ``schema.DjExceptionName``.
    """
    failing_table = schema.ErrorClass()

    with dj.config.override(enable_python_native_blobs=False):
        failing_table.populate(reserve_jobs=True, suppress_errors=True)

    expected_errors = len(schema.DjExceptionName())
    recorded_errors = len(failing_table.jobs.errors)
    assert expected_errors == recorded_errors > 0
def test_long_error_message(clean_jobs, subject, experiment):
    """Check that over-long error messages are truncated and short ones are not."""

    def _random_letters(count):
        # Random ASCII-letter string of the requested length.
        return "".join(random.choice(string.ascii_letters) for _ in range(count))

    # One message beyond the stored limit, one comfortably within it.
    oversized = _random_letters(ERROR_MESSAGE_LENGTH + 100)
    undersized = _random_letters(ERROR_MESSAGE_LENGTH // 2)

    # Over-long message: stored text is cut to the limit and ends with the
    # truncation marker.
    experiment.jobs.refresh()
    job_key = experiment.jobs.pending.keys(limit=1)[0]
    experiment.jobs.reserve(job_key)
    experiment.jobs.error(job_key, oversized)
    stored = experiment.jobs.errors.fetch1("error_message")
    assert len(stored) == ERROR_MESSAGE_LENGTH, "error message is longer than max allowed"
    assert stored.endswith(TRUNCATION_APPENDIX), "appropriate ending missing for truncated error message"

    # Start over with an empty jobs table before the second scenario.
    experiment.jobs.delete()

    # Short message: stored verbatim, without the truncation marker.
    experiment.jobs.refresh()
    job_key = experiment.jobs.pending.keys(limit=1)[0]
    experiment.jobs.reserve(job_key)
    experiment.jobs.error(job_key, undersized)
    stored = experiment.jobs.errors.fetch1("error_message")
    assert stored == undersized, "error messages do not agree"
    assert not stored.endswith(TRUNCATION_APPENDIX), "error message should not be truncated"
def test_long_error_stack(clean_jobs, subject, experiment):
    """Check that a very long error stack is stored and read back unchanged."""
    # Does not fit into small blob (should be 64k, but found to be higher)
    stack_length = 89942
    big_stack = "".join(random.choice(string.ascii_letters) for _ in range(stack_length))

    # Pick a single pending job to attach the stack to.
    experiment.jobs.refresh()
    job_key = experiment.jobs.pending.keys(limit=1)[0]

    experiment.jobs.reserve(job_key)
    experiment.jobs.error(job_key, "error message", big_stack)

    stored_stack = experiment.jobs.errors.fetch1("error_stack")
    assert stored_stack == big_stack, "error stacks do not agree"
def test_populate_reserve_jobs_with_keep_completed(clean_jobs, subject, experiment):
    """Check ``populate(reserve_jobs=True)`` while completed jobs are retained.

    Regression test for https://github.com/datajoint/datajoint-python/issues/1379
    """
    job_settings = {"keep_completed": True, "add_job_metadata": True}

    with dj.config.override(jobs=job_settings):
        # The regression was a DataJointError about semantic matching raised here.
        experiment.populate(reserve_jobs=True)

        # Populate must have produced rows without recording any job errors.
        assert len(experiment) > 0, "No data was populated"
        assert len(experiment.jobs.errors) == 0, "Unexpected errors during populate"

        # keep_completed=True means finished jobs stay in the jobs table.
        assert len(experiment.jobs.completed) > 0, "Completed jobs not retained"
def test_populate_reserve_jobs_keep_completed_repend(clean_jobs, subject, experiment):
    """Check that a completed job becomes pending again once its result is deleted.

    Regression test for https://github.com/datajoint/datajoint-python/issues/1379
    """
    job_settings = {"keep_completed": True, "add_job_metadata": True}

    with dj.config.override(jobs=job_settings):
        # Initial populate: rows are created and completed jobs are retained.
        experiment.populate(reserve_jobs=True)
        populated_rows = len(experiment)
        retained_jobs = len(experiment.jobs.completed)
        assert populated_rows > 0, "No data was populated"
        assert retained_jobs > 0, "No completed jobs"

        # Remove one populated row.
        removed_key = experiment.keys(limit=1)[0]
        (experiment & removed_key).delete()

        # A refresh is expected to put the job for the removed row back
        # into the pending state.
        experiment.jobs.refresh()
        assert len(experiment.jobs.pending) >= 1, "Deleted job not re-pended"