-
Notifications
You must be signed in to change notification settings - Fork 96
Expand file tree
/
Copy pathtest_aggr_regressions.py
More file actions
249 lines (204 loc) · 7.69 KB
/
test_aggr_regressions.py
File metadata and controls
249 lines (204 loc) · 7.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
"""
Regression tests for issues 386, 449, 484, and 558 — all related to processing complex aggregations and projections.
"""
import pytest
import datajoint as dj
from tests.schema_aggr_regress import LOCALS_AGGR_REGRESS, A, B, Q, R, S, X
from tests.schema_uuid import Item, Topic
@pytest.fixture(scope="function")
def schema_aggr_reg(connection_test, prefix):
schema = dj.Schema(
prefix + "_aggr_regress",
context=LOCALS_AGGR_REGRESS,
connection=connection_test,
)
schema(R)
schema(Q)
schema(S)
yield schema
schema.drop()
@pytest.fixture(scope="function")
def schema_aggr_reg_with_abx(connection_test, prefix):
schema = dj.Schema(
prefix + "_aggr_regress_with_abx",
context=LOCALS_AGGR_REGRESS,
connection=connection_test,
)
schema(R)
schema(Q)
schema(S)
schema(A)
schema(B)
schema(X)
yield schema
schema.drop()
def test_issue386(schema_aggr_reg):
"""
--------------- ISSUE 386 -------------------
Issue 386 resulted from the loss of aggregated attributes when the aggregation was used as the restrictor
Q & (R.aggr(S, n='count(*)') & 'n=2')
Error: Unknown column 'n' in HAVING
"""
result = R.aggr(S, n="count(*)") & "n=10"
result = Q & result
result.to_dicts()
def test_issue449(schema_aggr_reg):
"""
---------------- ISSUE 449 ------------------
Issue 449 arises from incorrect group by attributes after joining with a dj.U()
Note: dj.U() * table pattern is no longer supported in 2.0, use dj.U() & table instead
"""
result = dj.U("n") & R.aggr(S, n="max(s)")
result.to_dicts()
def test_issue484(schema_aggr_reg):
"""
---------------- ISSUE 484 -----------------
Issue 484
"""
q = dj.U().aggr(S, n="max(s)")
q.to_arrays("n")
q.fetch1("n")
q = dj.U().aggr(S, n="avg(s)")
result = dj.U().aggr(q, m="max(n)")
result.to_dicts()
def test_union_join(schema_aggr_reg_with_abx):
"""
This test fails if it runs after TestIssue558.
https://github.com/datajoint/datajoint-python/issues/930
"""
A.insert(zip([100, 200, 300, 400, 500, 600]))
B.insert([(100, 11), (200, 22), (300, 33), (400, 44)])
q1 = B & "id < 300"
q2 = B & "id > 300"
expected_data = [
{"id": 0, "id2": 5},
{"id": 1, "id2": 6},
{"id": 2, "id2": 7},
{"id": 3, "id2": 8},
{"id": 4, "id2": 9},
{"id": 100, "id2": 11},
{"id": 200, "id2": 22},
{"id": 400, "id2": 44},
]
assert ((q1 + q2) * A).to_dicts() == expected_data
class TestIssue558:
"""
--------------- ISSUE 558 ------------------
Issue 558 resulted from the fact that DataJoint saves subqueries and often combines a restriction followed
by a projection into a single SELECT statement, which in several unusual cases produces unexpected results.
"""
def test_issue558_part1(self, schema_aggr_reg_with_abx):
q = (A - B).proj(id2="3")
assert len(A - B) == len(q)
def test_issue558_part2(self, schema_aggr_reg_with_abx):
d = dict(id=3, id2=5)
assert len(X & d) == len((X & d).proj(id2="3"))
def test_left_join_invalid_raises_error(schema_uuid):
"""Left join requires A → B. Topic ↛ Item, so this should raise an error."""
from datajoint.errors import DataJointError
# Clean up from previous tests
Item().delete_quick()
Topic().delete_quick()
Topic().add("jeff")
Item.populate()
with pytest.raises(DataJointError) as exc_info:
Topic.join(Item, left=True)
assert "left operand to determine" in str(exc_info.value).lower()
def test_left_join_valid(schema_uuid):
"""Left join where A → B: Item → Topic (topic_id is in Item)."""
# Clean up from previous tests
Item().delete_quick()
Topic().delete_quick()
Topic().add("jeff")
Item.populate()
Topic().add("jeff2") # Topic without Items
# Item.join(Topic, left=True) is valid because Item → Topic
q = Item.join(Topic, left=True)
qf = q.to_arrays()
assert len(q) == len(qf)
# All Items should have matching Topics since they were populated from Topics
assert len(q) == len(Item())
def test_extend_valid(schema_uuid):
"""extend() is an alias for join(left=True) when A → B."""
# Clean up from previous tests
Item().delete_quick()
Topic().delete_quick()
Topic().add("alice")
Item.populate()
# Item → Topic (topic_id is in Item), so extend is valid
q_extend = Item.extend(Topic)
q_left_join = Item.join(Topic, left=True)
# Should produce identical results
assert len(q_extend) == len(q_left_join)
assert set(q_extend.heading.names) == set(q_left_join.heading.names)
assert q_extend.primary_key == q_left_join.primary_key
def test_extend_invalid_raises_error(schema_uuid):
"""extend() requires A → B. Topic ↛ Item, so this should raise an error."""
from datajoint.errors import DataJointError
# Clean up from previous tests
Item().delete_quick()
Topic().delete_quick()
Topic().add("bob")
Item.populate()
# Topic ↛ Item (item_id not in Topic), so extend should fail
with pytest.raises(DataJointError) as exc_info:
Topic.extend(Item)
assert "left operand to determine" in str(exc_info.value).lower()
class TestBoolMethod:
"""
Tests for __bool__ method on Aggregation and Union (issue #1234).
bool(query) should return True if query has rows, False if empty.
"""
def test_aggregation_bool_with_results(self, schema_aggr_reg_with_abx):
"""Aggregation with results should be truthy."""
A.insert([(1,), (2,), (3,)])
B.insert([(1, 10), (1, 20), (2, 30)])
aggr = A.aggr(B, count="count(id2)")
assert bool(aggr) is True
assert len(aggr) > 0
def test_aggregation_bool_empty(self, schema_aggr_reg_with_abx):
"""Aggregation with no results should be falsy."""
A.insert([(1,), (2,), (3,)])
B.insert([(1, 10), (1, 20), (2, 30)])
# Restrict to non-existent entry
aggr = (A & "id=999").aggr(B, count="count(id2)")
assert bool(aggr) is False
assert len(aggr) == 0
def test_aggregation_bool_matches_len(self, schema_aggr_reg_with_abx):
"""bool(aggr) should equal len(aggr) > 0."""
A.insert([(10,), (20,)])
B.insert([(10, 100)])
# With results
aggr_has = A.aggr(B, count="count(id2)")
assert bool(aggr_has) == (len(aggr_has) > 0)
# Without results
aggr_empty = (A & "id=999").aggr(B, count="count(id2)")
assert bool(aggr_empty) == (len(aggr_empty) > 0)
def test_union_bool_with_results(self, schema_aggr_reg_with_abx):
"""Union with results should be truthy."""
A.insert([(100,), (200,)])
B.insert([(100, 1), (200, 2)])
q1 = B & "id=100"
q2 = B & "id=200"
union = q1 + q2
assert bool(union) is True
assert len(union) > 0
def test_union_bool_empty(self, schema_aggr_reg_with_abx):
"""Union with no results should be falsy."""
A.insert([(100,), (200,)])
B.insert([(100, 1), (200, 2)])
q1 = B & "id=999"
q2 = B & "id=998"
union = q1 + q2
assert bool(union) is False
assert len(union) == 0
def test_union_bool_matches_len(self, schema_aggr_reg_with_abx):
"""bool(union) should equal len(union) > 0."""
A.insert([(100,), (200,)])
B.insert([(100, 1)])
# With results
union_has = (B & "id=100") + (B & "id=100")
assert bool(union_has) == (len(union_has) > 0)
# Without results
union_empty = (B & "id=999") + (B & "id=998")
assert bool(union_empty) == (len(union_empty) > 0)