forked from datajoint/datajoint-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_autopopulate.py
More file actions
149 lines (118 loc) · 4.45 KB
/
test_autopopulate.py
File metadata and controls
149 lines (118 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import platform
import pymysql
import pytest
import datajoint as dj
from datajoint import DataJointError
from . import schema
def test_populate(clean_autopopulate, trial, subject, experiment, ephys, channel):
# test simple populate
assert subject, "root tables are empty"
assert not experiment, "table already filled?"
experiment.populate()
assert len(experiment) == len(subject) * experiment.fake_experiments_per_subject
# test restricted populate
assert not trial, "table already filled?"
restriction = subject.proj(animal="subject_id").fetch("KEY")[0]
d = trial.connection.dependencies
d.load()
trial.populate(restriction)
assert trial, "table was not populated"
key_source = trial.key_source
assert len(key_source & trial) == len(key_source & restriction)
assert len(key_source - trial) == len(key_source - restriction)
# test subtable populate
assert not ephys
assert not channel
ephys.populate()
assert ephys
assert channel
def test_populate_with_success_count(clean_autopopulate, subject, experiment, trial):
# test simple populate
assert subject, "root tables are empty"
assert not experiment, "table already filled?"
ret = experiment.populate()
success_count = ret["success_count"]
assert len(experiment.key_source & experiment) == success_count
# test restricted populate
assert not trial, "table already filled?"
restriction = subject.proj(animal="subject_id").fetch("KEY")[0]
d = trial.connection.dependencies
d.load()
ret = trial.populate(restriction, suppress_errors=True)
success_count = ret["success_count"]
assert len(trial.key_source & trial) == success_count
def test_populate_key_list(clean_autopopulate, subject, experiment, trial):
# test simple populate
assert subject, "root tables are empty"
assert not experiment, "table already filled?"
keys = experiment.key_source.fetch("KEY", order_by="KEY")
n = 3
assert len(keys) > n
keys = keys[:n]
ret = experiment.populate(keys=keys)
assert n == ret["success_count"]
def test_populate_exclude_error_and_ignore_jobs(
clean_autopopulate, schema_any, subject, experiment
):
# test simple populate
assert subject, "root tables are empty"
assert not experiment, "table already filled?"
keys = experiment.key_source.fetch("KEY", limit=2)
for idx, key in enumerate(keys):
if idx == 0:
schema_any.jobs.ignore(experiment.table_name, key)
else:
schema_any.jobs.error(experiment.table_name, key, "")
experiment.populate(reserve_jobs=True)
assert len(experiment.key_source & experiment) == len(experiment.key_source) - 2
def test_allow_direct_insert(clean_autopopulate, subject, experiment):
assert subject, "root tables are empty"
key = subject.fetch("KEY", limit=1)[0]
key["experiment_id"] = 1000
key["experiment_date"] = "2018-10-30"
experiment.insert1(key, allow_direct_insert=True)
@pytest.mark.skipif(
platform.system() == "Darwin",
reason="multiprocessing with spawn method (macOS default) cannot pickle thread locks",
)
@pytest.mark.parametrize("processes", [None, 2])
def test_multi_processing(clean_autopopulate, subject, experiment, processes):
assert subject, "root tables are empty"
assert not experiment, "table already filled?"
experiment.populate(processes=processes)
assert len(experiment) == len(subject) * experiment.fake_experiments_per_subject
def test_allow_insert(clean_autopopulate, subject, experiment):
assert subject, "root tables are empty"
key = subject.fetch("KEY")[0]
key["experiment_id"] = 1001
key["experiment_date"] = "2018-10-30"
with pytest.raises(DataJointError):
experiment.insert1(key)
def test_load_dependencies(prefix):
schema = dj.Schema(f"{prefix}_load_dependencies_populate")
@schema
class ImageSource(dj.Lookup):
definition = """
image_source_id: int
"""
contents = [(0,)]
@schema
class Image(dj.Imported):
definition = """
-> ImageSource
---
image_data: longblob
"""
def make(self, key):
self.insert1(dict(key, image_data=dict()))
Image.populate()
@schema
class Crop(dj.Computed):
definition = """
-> Image
---
crop_image: longblob
"""
def make(self, key):
self.insert1(dict(key, crop_image=dict()))
Crop.populate()