-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathquery_builder.py
More file actions
584 lines (520 loc) · 22.2 KB
/
query_builder.py
File metadata and controls
584 lines (520 loc) · 22.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
"""RediSearch query builder - generates query syntax from analyzed queries."""
from __future__ import annotations
import re
import warnings
# Stopword list that Redis applies by default; these tokens are not indexed
# unless the index is created with a custom STOPWORDS setting.
# See: https://redis.io/docs/latest/develop/ai/search-and-query/advanced-concepts/stopwords/
# Entries are kept in alphabetical order; all are lowercase.
REDIS_DEFAULT_STOPWORDS = frozenset(
    (
        "a", "an", "and", "are", "as", "at",
        "be", "but", "by",
        "for",
        "if", "in", "into", "is", "it",
        "no", "not",
        "of", "on", "or",
        "such",
        "that", "the", "their", "then", "there", "these", "they", "this", "to",
        "was", "will", "with",
    )
)
class QueryBuilder:
    """Builds RediSearch query syntax from conditions.

    Every ``build_*`` method returns a query fragment as a plain string;
    ``combine_conditions`` / ``build_query_string`` join fragments into a
    complete query. The builder keeps no per-instance state.
    """

    # Characters that need escaping in TAG values.
    # `|` separates values inside an IN list at the syntax level, so a literal
    # pipe inside a value must be escaped; otherwise `status = 'a|b'` parses
    # as `status IN ('a', 'b')`.
    TAG_SPECIAL_CHARS = r".,<>{}[]\"':;!@#$%^&*()-+=~|"

    # Characters that have special meaning in RediSearch free-text queries
    # (outside double-quoted phrases). Must be escaped with backslash.
    # Includes double-quote to prevent starting/ending quoted phrases.
    TEXT_QUERY_SPECIAL_CHARS = set('\\|-()"@~!{}[]^$><=;:*+')

    # Uppercase-only OR surrounded by whitespace or a string boundary.
    # Leading/trailing OR also matches so that the resulting empty operand is
    # reported as an error instead of "OR" being treated as a search term.
    _OR_OPERATOR_RE = re.compile(r"(?:^|\s+)OR(?:\s+|$)")

    # ------------------------------------------------------------------
    # Escaping helpers
    # ------------------------------------------------------------------

    @classmethod
    def _escape_fulltext_term(cls, term: str) -> str:
        """Escape characters that have special meaning in RediSearch free-text queries.

        Applied to individual terms used outside of double-quoted phrases (e.g.,
        in parenthesized FULLTEXT expressions, LIKE, FUZZY) so that user input
        containing RediSearch operator characters does not alter query semantics
        or produce syntax errors.
        """
        special = cls.TEXT_QUERY_SPECIAL_CHARS
        return "".join(f"\\{char}" if char in special else char for char in term)

    @staticmethod
    def _escape_text_value(value: str) -> str:
        """Escape characters that are special inside RediSearch double-quoted phrases.

        Backslashes and double quotes must be escaped so they don't break
        the query syntax or alter its meaning.
        """
        # Escape backslashes first (so we don't double-escape the quote escapes),
        # then escape double quotes.
        return value.replace("\\", "\\\\").replace('"', '\\"')

    @classmethod
    def _escape_text_equality_term(cls, term: str) -> str:
        """Escape single-term equality while preserving legacy wildcard semantics.

        For backward compatibility, TEXT equality on a single token continues to
        behave like a RediSearch token query instead of an exact quoted phrase.
        `*` is kept verbatim anywhere in the term and `~` is kept verbatim only
        as the first character; `%` is not in TEXT_QUERY_SPECIAL_CHARS, so
        fuzzy markers like `%term%` also pass through. Every other special
        character is backslash-escaped.
        """
        special = cls.TEXT_QUERY_SPECIAL_CHARS
        pieces = []
        for index, char in enumerate(term):
            keep_verbatim = char == "*" or (char == "~" and index == 0)
            if not keep_verbatim and char in special:
                pieces.append(f"\\{char}")
            else:
                pieces.append(char)
        return "".join(pieces)

    @classmethod
    def _escape_optional_term(cls, word: str) -> str:
        """Escape a FULLTEXT term, keeping a leading `~` (optional-term modifier)."""
        if word.startswith("~"):
            return "~" + cls._escape_fulltext_term(word[1:])
        return cls._escape_fulltext_term(word)

    @staticmethod
    def _partition_stopwords(words: list[str]) -> tuple[list[str], list[str]]:
        """Split words into (kept, removed) by case-insensitive stopword lookup."""
        kept = [w for w in words if w.lower() not in REDIS_DEFAULT_STOPWORDS]
        removed = [w for w in words if w.lower() in REDIS_DEFAULT_STOPWORDS]
        return kept, removed

    # ------------------------------------------------------------------
    # TEXT conditions
    # ------------------------------------------------------------------

    def _like_search_value(self, value: str) -> str:
        """Translate a SQL LIKE pattern (`%` wildcards) into a RediSearch term."""
        # LIKE '' produces no token and no wildcard, so RediSearch emits a
        # bare `@field:` which is a syntax error at runtime. Reject early.
        if value == "":
            raise ValueError(
                "LIKE pattern must not be empty. Use IS NULL / IS NOT NULL "
                "to test for field absence, or '%' to match any value."
            )
        # Split on %, escape each literal segment, rejoin with * (% -> *).
        search_value = "*".join(
            self._escape_fulltext_term(segment) for segment in value.split("%")
        )
        # If the non-wildcard portion contains whitespace, wrap in parens
        # so all tokens stay scoped to the field (e.g. '%gaming laptop%'
        # -> *gaming laptop* needs grouping to avoid token leaking).
        # Any whitespace counts, not only the space character.
        if any(char.isspace() for char in value.strip("%")):
            search_value = f"({search_value})"
        return search_value

    def _fuzzy_search_value(self, value: str, fuzzy_level: int | None) -> str:
        """Wrap an escaped term in one `%` pair per Levenshtein distance level."""
        escaped = self._escape_fulltext_term(value)
        level = 1 if fuzzy_level is None else fuzzy_level
        if level not in (1, 2, 3):
            raise ValueError(
                f"Fuzzy level must be 1, 2, or 3 (got {level}). "
                "RediSearch supports a maximum Levenshtein distance of 3."
            )
        marker = "%" * level
        return f"{marker}{escaped}{marker}"

    def _equality_search_value(self, value: str) -> str:
        """Build the term for `=` / `!=`: a single token or a quoted exact phrase."""
        words = value.split()
        if len(words) == 1:
            return self._escape_text_equality_term(words[0])

        # Multi-word equality remains an exact phrase match.
        # Strip default stopwords because RediSearch does not index them;
        # keeping them in the quoted phrase causes a query-time error
        # (e.g. "diagnosing and treating" fails on "and").
        # Since the indexer assigns consecutive positions after dropping
        # stopwords, the stripped phrase matches correctly.
        kept, removed = self._partition_stopwords(words)
        if removed:
            if kept:
                sw_msg = f"Stopwords {removed} were removed from"
            else:
                sw_msg = (
                    f"All tokens in '{value}' are stopwords and may not "
                    "be indexed in"
                )
            # stacklevel=3: attribute the warning to whoever called
            # build_text_condition (this helper adds one frame).
            warnings.warn(
                f"{sw_msg} exact phrase '{value}'. "
                "By default, Redis does not index stopwords. "
                "To include stopwords in your index, create it "
                "with STOPWORDS 0.",
                UserWarning,
                stacklevel=3,
            )
        # Fall back to the original words when everything was a stopword.
        phrase_words = kept if kept else words
        escaped = self._escape_text_value(" ".join(phrase_words))
        return f'"{escaped}"'

    def _or_search_value(self, value: str) -> str:
        """Build a `(a|b|...)` union from a FULLTEXT value containing uppercase OR.

        Only uppercase OR is treated as a boolean operator; lowercase "or" is a
        regular search term. Multi-word operands are wrapped in parentheses so
        each side is an atomic subexpression.

        Raises:
            ValueError: If any side of an OR has no search term.
        """
        or_parts: list[str] = []
        all_removed: list[str] = []
        for part in self._OR_OPERATOR_RE.split(value):
            words = part.strip().split()
            if not words:
                raise ValueError(
                    "Empty operand in OR expression — each side of OR "
                    "must contain at least one search term."
                )
            kept, removed = self._partition_stopwords(words)
            all_removed.extend(removed)
            # Use the filtered list if any non-stopword tokens remain;
            # otherwise fall back to the original words so the operand is
            # never empty.
            effective = kept if kept else words
            tokens = [self._escape_optional_term(w) for w in effective]
            if len(tokens) > 1:
                or_parts.append(f"({' '.join(tokens)})")
            else:
                or_parts.append(tokens[0])

        if all_removed:
            # stacklevel=3: see _equality_search_value.
            warnings.warn(
                f"Stopwords {all_removed} were removed from OR "
                f"expression '{value}'. By default, Redis does not "
                "index stopwords. To include stopwords in your "
                "index, create it with STOPWORDS 0.",
                UserWarning,
                stacklevel=3,
            )
        return f"({'|'.join(or_parts)})"

    def _multiword_search_value(self, value: str) -> str:
        """Build a parenthesized, stopword-filtered token list for FULLTEXT.

        Each term is escaped to prevent accidental operator injection, but a
        leading ~ (optional-term modifier) is preserved.
        """
        words = value.split()
        kept, removed = self._partition_stopwords(words)
        if removed:
            if kept:
                sw_action = f"Stopwords {removed} were removed from"
            else:
                sw_action = (
                    f"All tokens in '{value}' are stopwords and may not be indexed in"
                )
            # stacklevel=3: see _equality_search_value.
            warnings.warn(
                f"{sw_action} text search '{value}'. "
                "By default, Redis does not index stopwords. "
                "To include stopwords in your index, create it "
                "with STOPWORDS 0.",
                UserWarning,
                stacklevel=3,
            )
        terms = " ".join(self._escape_optional_term(w) for w in (kept or words))
        return f"({terms})"

    def build_text_condition(
        self,
        field: str | list[str],
        operator: str,
        value: str,
        negated: bool = False,
        *,
        fuzzy_level: int | None = None,
        slop: int | None = None,
        inorder: bool = False,
    ) -> str:
        """Build query syntax for TEXT field conditions.

        Args:
            field: Field name or list of field names for multi-field search.
            operator: One of =, !=, FULLTEXT, LIKE, FUZZY. Any operator other
                than =, !=, LIKE and FUZZY is handled as FULLTEXT.
                - = / !=: single-term token match, or multi-word exact phrase.
                - FULLTEXT: tokenized keyword search with stopword filtering;
                  an uppercase OR between terms produces a union.
                - LIKE: prefix/suffix/infix pattern (SQL % -> RediSearch *).
                - FUZZY: Levenshtein fuzzy match.
            value: The search term or pattern.
            negated: If True, prefix with - for negation.
            fuzzy_level: Levenshtein distance for FUZZY (1, 2, or 3). Default 1.
            slop: Maximum distance between terms for proximity search.
            inorder: If True with slop, require terms in order.

        Returns:
            RediSearch query syntax like @field:term or @field:"exact phrase".

        Raises:
            ValueError: On an empty LIKE pattern, a fuzzy level outside 1-3,
                an empty OR operand, or a slop that is not a non-negative int.

        Warns:
            UserWarning: When default stopwords are present in a multi-word
                value (they are dropped unless every token is a stopword).
        """
        # Derive negation from both the flag and the operator itself,
        # consistent with how build_tag_condition handles != via operator.
        prefix = "-" if negated or operator == "!=" else ""

        # search_value is shared by the single- and multi-field paths.
        if operator == "LIKE":
            search_value = self._like_search_value(value)
        elif operator == "FUZZY":
            search_value = self._fuzzy_search_value(value, fuzzy_level)
        elif operator in ("=", "!="):
            search_value = self._equality_search_value(value)
        elif self._OR_OPERATOR_RE.search(value):
            search_value = self._or_search_value(value)
        elif any(char.isspace() for char in value):
            # Any whitespace (not just " ") marks a multi-word search, matching
            # how the = and OR branches tokenize; otherwise a tab-separated
            # value would be emitted as one term and leak out of the field.
            search_value = self._multiword_search_value(value)
        else:
            # Single-word FULLTEXT: escape, preserving a ~ optional-term prefix.
            search_value = self._escape_optional_term(value)

        if isinstance(field, list):
            # Multi-field syntax: (@f1|f2:value)
            base = f"{prefix}(@{'|'.join(field)}:{search_value})"
        else:
            base = f"{prefix}@{field}:{search_value}"

        # Append query attributes (slop, inorder) if specified.
        if slop is not None:
            # bool is a subclass of int, so it is rejected explicitly.
            if not isinstance(slop, int) or isinstance(slop, bool) or slop < 0:
                raise ValueError(f"slop must be a non-negative integer (got {slop!r})")
            attrs = f"$slop: {slop};"
            if inorder:
                attrs += " $inorder: true;"
            base = f"{base} => {{ {attrs} }}"
        return base

    # ------------------------------------------------------------------
    # TAG conditions
    # ------------------------------------------------------------------

    def _escape_tag_value(self, value: str) -> str:
        """Backslash-escape every character of value found in TAG_SPECIAL_CHARS."""
        special = self.TAG_SPECIAL_CHARS
        return "".join(f"\\{char}" if char in special else char for char in value)

    def build_tag_condition(
        self,
        field: str,
        operator: str,
        value: str | list[str],
        negated: bool = False,
    ) -> str:
        """Build query syntax for TAG field conditions.

        Args:
            field: Field name.
            operator: One of =, !=, IN. Only `!=` changes the output (it adds
                the negation prefix); the value's type decides list handling.
            value: Tag value or list of values for IN.
            negated: If True, prefix with `-` for negation. Covers NOT IN
                and NOT field = .... The `!=` operator is also honored.

        Returns:
            RediSearch query syntax like @field:{value} or @field:{v1|v2}.
        """
        prefix = "-" if negated or operator == "!=" else ""
        if isinstance(value, list):
            # IN clause: escape each value, then join with the (unescaped) |.
            tag_str = "|".join(self._escape_tag_value(v) for v in value)
        else:
            tag_str = self._escape_tag_value(value)
        return f"{prefix}@{field}:{{{tag_str}}}"

    # ------------------------------------------------------------------
    # NUMERIC conditions
    # ------------------------------------------------------------------

    def build_numeric_condition(
        self,
        field: str,
        operator: str,
        value: int | float | tuple[int | float, int | float],
        negated: bool = False,
    ) -> str:
        """Build query syntax for NUMERIC field conditions.

        Args:
            field: Field name.
            operator: One of =, !=, <, <=, >, >=, BETWEEN.
            value: Numeric value, or a (min, max) pair for BETWEEN. A
                two-element list is accepted as well as a tuple.
            negated: If True, prefix the resulting range with - so that
                NOT field > x, NOT BETWEEN, etc. are honored.

        Returns:
            RediSearch query syntax like @field:[min max].

        Raises:
            ValueError: If BETWEEN is not given a two-element tuple/list, or
                the operator is unknown.
        """
        prefix = "-" if negated or operator == "!=" else ""

        if operator == "BETWEEN":
            if isinstance(value, (tuple, list)) and len(value) == 2:
                min_val, max_val = value
                return f"{prefix}@{field}:[{min_val} {max_val}]"
            raise ValueError("BETWEEN operator requires a tuple (min, max)")

        if operator in ("=", "!="):
            # != is the same point range; the prefix carries the negation.
            numeric_range = f"[{value} {value}]"
        elif operator == ">":
            # A leading "(" makes the bound exclusive.
            numeric_range = f"[({value} +inf]"
        elif operator == ">=":
            numeric_range = f"[{value} +inf]"
        elif operator == "<":
            numeric_range = f"[-inf ({value}]"
        elif operator == "<=":
            numeric_range = f"[-inf {value}]"
        else:
            raise ValueError(f"Unknown numeric operator: {operator}")
        return f"{prefix}@{field}:{numeric_range}"

    # ------------------------------------------------------------------
    # VECTOR / GEO
    # ------------------------------------------------------------------

    def build_vector_condition(
        self,
        field: str,
        k: int,
        alias: str,
        prefilter: str | None = None,
    ) -> str:
        """Build query syntax for VECTOR KNN search.

        Args:
            field: Vector field name.
            k: Number of nearest neighbors.
            alias: Alias for the distance score.
            prefilter: Optional pre-filter query string; when truthy it is
                wrapped in parentheses and placed before the KNN clause.

        Returns:
            RediSearch query syntax like =>[KNN k @field $BLOB AS alias].
        """
        knn_part = f"=>[KNN {k} @{field} $BLOB AS {alias}]"
        if prefilter:
            return f"({prefilter}){knn_part}"
        return knn_part

    def build_geo_filter(
        self,
        field: str,
        lon: float,
        lat: float,
        radius: float,
        unit: str = "km",
    ) -> str:
        """Build GEOFILTER clause for GEO fields.

        Args:
            field: GEO field name.
            lon: Longitude.
            lat: Latitude.
            radius: Search radius.
            unit: Distance unit (km, m, mi, ft). Not validated here.

        Returns:
            GEOFILTER clause like "GEOFILTER field lon lat radius unit".
        """
        return f"GEOFILTER {field} {lon} {lat} {radius} {unit}"

    def build_geo_distance_apply(
        self,
        field: str,
        lon: float,
        lat: float,
        alias: str,
        unit: str = "m",
    ) -> tuple[str, str]:
        """Build geodistance expression and alias for APPLY.

        Args:
            field: GEO field name.
            lon: Longitude.
            lat: Latitude.
            alias: Alias for the distance result.
            unit: Distance unit for conversion. Units other than km, mi and ft
                leave the expression unconverted.

        Returns:
            Tuple of (expression, alias) for use in APPLY clause.
        """
        base_expr = f"geodistance(@{field}, {lon}, {lat})"
        # The conversions below treat the geodistance result as meters.
        if unit == "km":
            expr = f"({base_expr}/1000)"
        elif unit == "mi":
            # 1 mile = 1609.344 meters
            expr = f"({base_expr}/1609.344)"
        elif unit == "ft":
            # 1 foot = 0.3048 meters, so meters * (1/0.3048) ~= meters * 3.28084
            expr = f"({base_expr}*3.28084)"
        else:
            expr = base_expr
        return (expr, alias)

    # ------------------------------------------------------------------
    # Combination
    # ------------------------------------------------------------------

    @staticmethod
    def _is_parenthesized(expr: str) -> bool:
        """Return True if expr is enclosed by one matching outer paren pair.

        `(@a:x)` qualifies; `(@a:x) (@b:y)` does not, because its leading paren
        closes before the end. Backslash-escaped characters and the contents of
        double-quoted phrases are ignored when matching parentheses.
        """
        if not (expr.startswith("(") and expr.endswith(")")):
            return False
        depth = 0
        in_quotes = False
        skip_next = False
        last_index = len(expr) - 1
        for index, char in enumerate(expr):
            if skip_next:
                skip_next = False
            elif char == "\\":
                skip_next = True
            elif char == '"':
                in_quotes = not in_quotes
            elif in_quotes:
                continue
            elif char == "(":
                depth += 1
            elif char == ")":
                depth -= 1
                if depth == 0:
                    # The opening paren at index 0 closes here; it encloses
                    # the whole expression only if this is the last character.
                    return index == last_index
        return False

    def combine_conditions(
        self,
        conditions: list[str],
        operator: str = "AND",
    ) -> str:
        """Combine multiple condition strings with boolean operator.

        Args:
            conditions: List of query condition strings.
            operator: Boolean operator. "OR" produces a union; any other value
                is treated as AND.

        Returns:
            Combined query string; "*" when conditions is empty, and the sole
            condition unchanged when there is exactly one.
        """
        if not conditions:
            return "*"
        if len(conditions) == 1:
            return conditions[0]
        if operator == "OR":
            # OR uses the pipe separator, so every operand must be atomic.
            # Checking only for a leading "(" is not enough: "(a) (b)" starts
            # with "(" yet is two terms, so test for a true enclosing pair.
            operands = [
                c if self._is_parenthesized(c) else f"({c})" for c in conditions
            ]
            return "(" + "|".join(operands) + ")"
        # AND uses space separator.
        return " ".join(conditions)

    def build_query_string(
        self,
        text_conditions: list[tuple] | None = None,
        numeric_conditions: list[tuple] | None = None,
        tag_conditions: list[tuple] | None = None,
        field_types: dict[str, str] | None = None,
    ) -> str:
        """Build complete query string from conditions.

        Fragments are emitted in the order text, numeric, tag and AND-combined.

        Args:
            text_conditions: List of (field, operator, value) tuples.
            numeric_conditions: List of (field, operator, value) tuples.
            tag_conditions: List of (field, operator, value) tuples.
            field_types: Dict mapping field names to types. Accepted for
                interface compatibility; not read by this method.

        Returns:
            Complete RediSearch query string ("*" when nothing was supplied).
        """
        parts: list[str] = []
        for field, operator, value in text_conditions or ():
            parts.append(self.build_text_condition(field, operator, value))
        for field, operator, value in numeric_conditions or ():
            parts.append(self.build_numeric_condition(field, operator, value))
        for field, operator, value in tag_conditions or ():
            parts.append(self.build_tag_condition(field, operator, value))
        return self.combine_conditions(parts, "AND")

    def build_missing_condition(self, field: str, *, is_missing: bool) -> str:
        """Build ismissing() query fragment for IS NULL / IS NOT NULL.

        Args:
            field: Field name (without @ prefix).
            is_missing: True for IS NULL (ismissing), False for IS NOT NULL (-ismissing).

        Returns:
            Query fragment like "ismissing(@field)" or "-ismissing(@field)".
        """
        fragment = f"ismissing(@{field})"
        return fragment if is_missing else f"-{fragment}"