-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcel_sql_builder.py
More file actions
395 lines (327 loc) · 13.2 KB
/
cel_sql_builder.py
File metadata and controls
395 lines (327 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
"""SQL Builder module for CEL Domain SQL generation.
This module provides composable SQL query building utilities using Odoo's SQL class.
All methods return SQL objects or None if conversion is not possible.
All subqueries use expression.expression() to include record rules automatically.
Per SPEC_SQL_SCALABILITY.md v2.0
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
from odoo.tools.sql import SQL
if TYPE_CHECKING:
from odoo.api import Environment
_logger = logging.getLogger("odoo.addons.spp_cel_domain.sql_builder")

# Operator vocabularies consulted by SQLBuilder.term().

# Scalar comparisons. "==" is accepted as an alias and normalized to "=".
SUPPORTED_COMPARISON_OPS = {
    "=",
    "==",
    "!=",
    "<",
    "<=",
    ">",
    ">=",
}

# Membership tests against a collection of values.
SUPPORTED_LIST_OPS = {
    "in",
    "not in",
}

# Domain operators that have no SQL translation here: term() returns None
# for them so that the caller can take another evaluation path.
UNSUPPORTED_OPS = {
    "like",
    "ilike",
    "=like",
    "=ilike",
    "child_of",
    "parent_of",
}
class SQLBuilder:
    """Composable SQL query builder using Odoo's SQL class.

    Builder methods return ``SQL`` objects, or ``None`` when the input cannot
    be translated, so that callers can fall back to a non-SQL evaluation path.
    Model subqueries are expected to come from :meth:`select_ids_from_domain`.
    """

    # Whitelist mapping accepted comparison operators to their SQL spelling.
    # Only values of this map are ever injected as raw SQL operator text.
    _COMPARISON_SQL = {
        "==": "=",
        "=": "=",
        "!=": "!=",
        ">": ">",
        ">=": ">=",
        "<": "<",
        "<=": "<=",
    }

    # Aggregate functions accepted by select_grouped_aggregate().
    _AGGREGATES = frozenset({"SUM", "AVG", "MIN", "MAX", "COUNT"})

    def __init__(self, env: Environment):
        """Keep the Odoo environment used to resolve models in subqueries."""
        self.env = env

    # =========================================================================
    # Internal helpers
    # =========================================================================
    @staticmethod
    def _chain(template: str, parts: list[SQL]) -> SQL:
        """Left-fold ``parts`` through a two-placeholder SQL ``template``.

        ``parts`` must be non-empty; a single element is returned unchanged.
        """
        combined = parts[0]
        for part in parts[1:]:
            combined = SQL(template, combined, part)
        return combined

    @staticmethod
    def _membership_term(field_ref: SQL, value: Any, negate: bool) -> SQL | None:
        """Build the IN (or NOT IN when ``negate``) test for ``field_ref``.

        Returns None when ``value`` is not a usable collection.
        """
        # tuple("abc") would silently split a string into single characters.
        if isinstance(value, (str, bytes)):
            return None
        try:
            candidates = tuple(value)
        except TypeError:
            # Not iterable at all (e.g. a bare int): cannot be converted.
            return None

        # NULL never compares equal inside IN (...), and a NULL element makes
        # NOT IN (...) match no row at all, so NULLs are handled explicitly.
        values = tuple(item for item in candidates if item is not None)
        has_null = len(values) != len(candidates)

        if not values:
            if has_null:
                if negate:
                    return SQL("%s IS NOT NULL", field_ref)
                return SQL("%s IS NULL", field_ref)
            # Empty collection: IN matches nothing, NOT IN matches everything.
            return SQL("1=1") if negate else SQL("1=0")

        if negate:
            # NOT IN already rejects rows whose column is NULL, which is also
            # what excluding a None element asks for.
            return SQL("%s NOT IN %s", field_ref, values)
        if has_null:
            return SQL("(%s IN %s OR %s IS NULL)", field_ref, values, field_ref)
        return SQL("%s IN %s", field_ref, values)

    # =========================================================================
    # Term Builder
    # =========================================================================
    def term(self, alias: str, field: str, op: str, value: Any) -> SQL | None:
        """Convert a single comparison to SQL.

        Supported operators: =, ==, !=, >, >=, <, <=, in, not in
        NULL handling: = None becomes IS NULL, != None becomes IS NOT NULL;
        None elements of an in / not in collection are matched with IS NULL
        rather than being passed to IN (...).
        Returns None when the term cannot be converted: unsupported operators
        (like, ilike, child_of, ...), dotted or non-string field names,
        ordering comparisons against None, and in / not in values that are
        strings or not iterable.

        NOTE(review): the emitted SQL uses plain SQL NULL semantics, so rows
        whose column is NULL do not match != or NOT IN. Confirm this agrees
        with the non-SQL evaluation path.

        Examples:
            term("m", "is_ended", "=", False) -> SQL("m.is_ended = %s", False)
            term("m", "status", "!=", "ended") -> SQL("m.status != %s", "ended")
            term("m", "count", ">=", 5) -> SQL("m.count >= %s", 5)
            term("m", "field", "=", None) -> SQL("m.field IS NULL")
            term("m", "ids", "in", [1,2,3]) -> SQL("m.ids IN %s", (1,2,3))
            term("m", "name", "like", "%x%") -> None (unsupported)
        """
        # Only a plain column of ``alias`` can be referenced as "alias.field".
        # A dotted path would need JOINs and a non-string cannot be quoted as
        # an identifier, so report "not convertible" instead of raising.
        if not isinstance(field, str) or "." in field or not isinstance(op, str):
            return None

        normalized_op = "=" if op == "==" else op
        if normalized_op in UNSUPPORTED_OPS:
            return None
        if normalized_op not in SUPPORTED_COMPARISON_OPS and normalized_op not in SUPPORTED_LIST_OPS:
            return None

        field_ref = SQL("%s.%s", SQL.identifier(alias), SQL.identifier(field))

        if value is None:
            if normalized_op == "=":
                return SQL("%s IS NULL", field_ref)
            if normalized_op == "!=":
                return SQL("%s IS NOT NULL", field_ref)
            # Ordering or membership against NULL has no useful meaning.
            return None

        if normalized_op in SUPPORTED_LIST_OPS:
            return self._membership_term(field_ref, value, negate=normalized_op == "not in")

        sql_op = self._COMPARISON_SQL.get(normalized_op)
        if sql_op is None:
            return None
        return SQL("%s %s %s", field_ref, SQL(sql_op), value)

    # =========================================================================
    # WHERE Clause Builder
    # =========================================================================
    def where_and(self, conditions: list[SQL]) -> SQL:
        """Combine conditions with AND.

        Empty list returns SQL("1=1").
        Single condition returns that condition.
        Multiple conditions are combined pairwise as "(a AND b)"; the
        parentheses keep the grouping intact when the result is embedded in a
        larger expression.
        """
        if not conditions:
            return SQL("1=1")
        return self._chain("(%s AND %s)", conditions)

    def where_or(self, conditions: list[SQL]) -> SQL:
        """Combine conditions with OR.

        Empty list returns SQL("1=0") (nothing matches).
        Single condition returns that condition.
        Multiple conditions are combined pairwise as "(a OR b)".
        """
        if not conditions:
            return SQL("1=0")
        return self._chain("(%s OR %s)", conditions)

    # =========================================================================
    # SELECT Builders
    # =========================================================================
    def select_ids_from_domain(self, model: str, domain: list[Any]) -> SQL | None:
        """Generate a parenthesized SELECT of ``id`` for ``model`` filtered by ``domain``.

        The domain is converted with odoo.osv.expression.expression() and the
        statement is produced by Query.select(), so that the FROM clause
        carries any JOINs the domain needs (e.g. for related field lookups
        like gender_id.uri).

        NOTE(review): this is meant to be the place where record rules get
        included. Record rules appear to be applied by the ORM search layer
        rather than by expression.expression() itself; confirm they are
        actually enforced on the generated query.

        Returns None if the domain cannot be converted; the failure is logged
        at debug level. This is the PRIMARY method for generating subqueries
        and other methods should use it for model references.
        """
        try:
            from odoo.osv import expression as osv_expression

            Model = self.env[model]
            # An empty or None domain is still converted, as an empty list.
            expr = osv_expression.expression(model=Model, domain=domain or [])
            select_sql = expr.query.select(SQL.identifier(Model._table, "id"))
            return SQL("(%s)", select_sql)
        except Exception as e:
            # Deliberate best-effort: any failure means "not convertible".
            _logger.debug("[SQLBuilder] Failed to convert domain to SQL: %s", e)
            return None

    def select_distinct_column(self, table: str, alias: str, column: str, where: SQL) -> SQL:
        """Build "(SELECT DISTINCT alias.column FROM table alias WHERE where)".

        Used for ExistsThrough to get parent IDs.
        """
        return SQL(
            "(SELECT DISTINCT %s.%s FROM %s %s WHERE %s)",
            SQL.identifier(alias),
            SQL.identifier(column),
            SQL.identifier(table),
            SQL.identifier(alias),
            where,
        )

    def select_grouped_count(
        self,
        table: str,
        alias: str,
        group_col: str,
        where: SQL,
        having_op: str,
        having_value: int,
    ) -> SQL:
        """SELECT with GROUP BY and HAVING COUNT(*).

        Used for CountThrough.

        Raises:
            ValueError: if ``having_op`` is not a supported comparison.

        Example:
            select_grouped_count(
                "spp_group_membership", "m", "group",
                where=SQL("m.is_ended = false"),
                having_op=">=", having_value=2
            )
            -> SELECT m.group FROM spp_group_membership m
               WHERE m.is_ended = false
               GROUP BY m.group
               HAVING COUNT(*) >= 2
        """
        sql_op = self._COMPARISON_SQL.get(having_op)
        if sql_op is None:
            raise ValueError(f"Unsupported operator for HAVING: {having_op}")

        group_ref = SQL("%s.%s", SQL.identifier(alias), SQL.identifier(group_col))
        return SQL(
            "(SELECT %s FROM %s %s WHERE %s GROUP BY %s HAVING COUNT(*) %s %s)",
            group_ref,
            SQL.identifier(table),
            SQL.identifier(alias),
            where,
            group_ref,
            SQL(sql_op),
            having_value,
        )

    def select_grouped_aggregate(
        self,
        through_table: str,
        through_alias: str,
        child_subquery: SQL,
        parent_col: str,
        link_col: str,
        agg_func: str,
        agg_field: str,
        child_table: str,
        having_op: str,
        having_value: float | int,
        where: SQL | None = None,
    ) -> SQL:
        """SELECT with aggregate function on joined data.

        Used for FieldAggregateThrough (sum, avg, min, max, count).
        IMPORTANT: child_subquery must come from select_ids_from_domain()
        so that the child rows are restricted the same way everywhere.

        Generated shape:
            WITH allowed_children AS (
                SELECT id, {agg_field} FROM {child_table}
                WHERE id IN {child_subquery}
            )
            SELECT m.{parent_col} FROM {through_table} m
            JOIN allowed_children c ON c.id = m.{link_col}
            WHERE {where}
            GROUP BY m.{parent_col}
            HAVING COALESCE({agg_func}(c.{agg_field}), 0) {having_op} {having_value}

        ``agg_func`` is case-insensitive. A missing or empty ``where`` is
        replaced by "1=1".

        Raises:
            ValueError: if ``agg_func`` or ``having_op`` is not supported.
        """
        agg_func = agg_func.upper()
        if agg_func not in self._AGGREGATES:
            raise ValueError(f"Unsupported aggregate function: {agg_func}")

        sql_op = self._COMPARISON_SQL.get(having_op)
        if sql_op is None:
            raise ValueError(f"Unsupported operator for HAVING: {having_op}")

        where_clause = where if where else SQL("1=1")

        # Selecting "id" twice would leave two columns named id in the CTE and
        # make the later c.id references ambiguous.
        if agg_field == "id":
            cte_columns = SQL("id")
        else:
            cte_columns = SQL("id, %s", SQL.identifier(agg_field))

        parent_ref = SQL("%s.%s", SQL.identifier(through_alias), SQL.identifier(parent_col))
        link_ref = SQL("%s.%s", SQL.identifier(through_alias), SQL.identifier(link_col))

        # COALESCE handles NULL values from aggregate functions (e.g., SUM(NULL) = NULL)
        return SQL(
            """(WITH allowed_children AS (
                SELECT %s FROM %s WHERE id IN %s
            )
            SELECT %s FROM %s %s
            JOIN allowed_children c ON c.id = %s
            WHERE %s
            GROUP BY %s
            HAVING COALESCE(%s(c.%s), 0) %s %s)""",
            cte_columns,
            SQL.identifier(child_table),
            child_subquery,
            parent_ref,
            SQL.identifier(through_table),
            SQL.identifier(through_alias),
            link_ref,
            where_clause,
            parent_ref,
            SQL(agg_func),
            SQL.identifier(agg_field),
            SQL(sql_op),
            having_value,
        )

    # =========================================================================
    # Set Operations
    # =========================================================================
    def intersect(self, queries: list[SQL]) -> SQL:
        """Combine queries with INTERSECT (for AND).

        A single query is returned unchanged.

        Raises:
            ValueError: if ``queries`` is empty.
        """
        if not queries:
            raise ValueError("Cannot INTERSECT empty list of queries")
        return self._chain("(%s INTERSECT %s)", queries)

    def union(self, queries: list[SQL]) -> SQL:
        """Combine queries with UNION (for OR).

        A single query is returned unchanged.

        Raises:
            ValueError: if ``queries`` is empty.
        """
        if not queries:
            raise ValueError("Cannot UNION empty list of queries")
        return self._chain("(%s UNION %s)", queries)

    # =========================================================================
    # Helpers for Default Domain Processing
    # =========================================================================
    def build_default_domain_sql(self, alias: str, default_domain: list[Any] | None) -> tuple[list[SQL], bool]:
        """Convert default_domain list to SQL terms.

        Only simple AND chains are supported: three-element leaves, optionally
        interleaved with explicit "&" operators, which are skipped.

        Returns:
            (list of SQL terms, success flag)
            If success is False, the list is empty and the caller should fall
            back to the Python path. An empty or None domain succeeds with no
            terms.
        """
        if not default_domain:
            return [], True

        terms: list[SQL] = []
        for item in default_domain:
            if isinstance(item, (tuple, list)) and len(item) == 3:
                field, op, value = item
                term_sql = self.term(alias, field, op, value)
                if term_sql is None:
                    # Leaf that term() cannot express in SQL.
                    return [], False
                terms.append(term_sql)
            elif item == "&":
                # The collected terms are AND-ed by the caller anyway.
                continue
            else:
                # "|", "!" or an unknown item shape.
                return [], False
        return terms, True