Commit c9a4302

Support Query Expressions
Closes gh-129
1 parent eddd3bf commit c9a4302

21 files changed

Lines changed: 1374 additions & 81 deletions

.github/workflows/pr-build-workflow.yml

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -14,10 +14,17 @@ jobs:
1414
with:
1515
java-version: 1.11
1616
- name: Install Reindexer
17+
# TODO: Temporarily build reindexer from sources until https://github.com/Restream/reindexer/issues/92 is released.
18+
# run: |
19+
# sudo curl https://repo.reindexer.io/RX-KEY.GPG -o /etc/apt/trusted.gpg.d/reindexer.asc
20+
# echo 'deb https://repo.reindexer.io/ubuntu-noble /' | sudo tee -a /etc/apt/sources.list
21+
# sudo apt-get update
22+
# sudo apt-get install -y reindexer-dev libopenblas-pthread-dev
1723
run: |
18-
sudo curl https://repo.reindexer.io/RX-KEY.GPG -o /etc/apt/trusted.gpg.d/reindexer.asc
19-
echo 'deb https://repo.reindexer.io/ubuntu-noble /' | sudo tee -a /etc/apt/sources.list
20-
sudo apt-get update
21-
sudo apt-get install -y reindexer-dev libopenblas-pthread-dev
24+
curl -L https://github.com/Restream/reindexer/raw/master/dependencies.sh | sudo bash -s
25+
git clone https://github.com/Restream/reindexer
26+
cmake reindexer -Breindexer/build
27+
cmake --build reindexer/build -- -j 4
28+
sudo cmake --install reindexer/build
2229
- name: Build with Maven
2330
run: mvn --batch-mode --update-snapshots verify

README.md

Lines changed: 66 additions & 0 deletions
@@ -305,6 +305,72 @@ InnerJoins can be used as a condition in Where clause:
305305
```
306306
Note that the Or operator usually short-circuits Where conditions: if the previous condition is true, the next one is not evaluated. With InnerJoin it works differently: in query1 (from the example above) both InnerJoin conditions are evaluated regardless of the result of WhereInt. Limit(0) as part of InnerJoin (query3 from the example above) does not join any data; it acts only as a filter that verifies the conditions.
307307

308+
### Query Expressions
309+
310+
#### Functions
311+
Reindexer provides built-in functions that can be used within `WHERE` clauses to enable advanced filtering capabilities beyond simple field comparisons.
312+
313+
##### flat_array_len(field_name)
314+
The `flat_array_len` function returns the length or cardinality of a specified field, making it particularly useful for filtering based on array sizes or field presence.
315+
The `flat_array_len` function can be used in both `SELECT` and `UPDATE` queries.
316+
317+
Behavior by Field Type:
318+
- Array Fields: returns the number of elements in the array
319+
- Scalar Fields (integers, strings, etc.): always returns 1
320+
- Object Fields: always returns 1
321+
- Nested Array Elements: returns the count of occurrences when the field is nested within arrays
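
The rules above can be modelled in plain Java. This is a minimal sketch of one reading of those bullets, not Reindexer's implementation: the class `FlatArrayLenSketch`, the helper `flatArrayLen`, and the `Map`/`List` document model are hypothetical names introduced only for illustration. It assumes that arrays are flattened at every level of a dot-separated path and that a missing field counts as 0.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlatArrayLenSketch {

    /** Hypothetical helper: counts the values reachable at a dot-separated field path. */
    public static int flatArrayLen(Map<String, Object> document, String fieldPath) {
        return count(document, fieldPath.split("\\."), 0);
    }

    private static int count(Object node, String[] path, int depth) {
        if (node == null) {
            return 0; // the field does not exist in this document
        }
        if (node instanceof List) {
            // Arrays are transparent: sum the counts of their elements.
            int total = 0;
            for (Object element : (List<?>) node) {
                total += count(element, path, depth);
            }
            return total;
        }
        if (depth == path.length) {
            return 1; // a scalar or an object at the end of the path
        }
        if (node instanceof Map) {
            return count(((Map<?, ?>) node).get(path[depth]), path, depth + 1);
        }
        return 0; // a scalar was reached before the path was exhausted
    }

    public static void main(String[] args) {
        Map<String, Object> first = new HashMap<>();
        first.put("tags", Arrays.asList("a", "b"));
        Map<String, Object> second = new HashMap<>();
        second.put("tags", Arrays.asList("c"));

        Map<String, Object> post = new HashMap<>();
        post.put("title", "Hello");
        post.put("comments", Arrays.asList("first", "second", "third"));
        post.put("media", Arrays.asList(first, second));

        System.out.println(flatArrayLen(post, "comments"));   // array field: 3
        System.out.println(flatArrayLen(post, "title"));      // scalar field: 1
        System.out.println(flatArrayLen(post, "media.tags")); // nested in an array: 3
        System.out.println(flatArrayLen(post, "missing"));    // absent field: 0
    }
}
```

The sketch only illustrates the counting rules; in a real query the function is evaluated by the Reindexer server.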
322+
323+
Examples:
324+
325+
```java
326+
// Find social media posts with between 10 and 50 comments
327+
// and at least 3 attached media files.
328+
List<Post> posts = db.query("posts", Post.class)
329+
.where(Expression.flatArrayLength("comments"), RANGE, Expression.values(10, 50))
330+
.where(Expression.flatArrayLength("media"), GE, Expression.values(3))
331+
.toList();
332+
333+
// Update field 'size' with flat_array_len function.
334+
db.query("posts", Post.class)
335+
.where("id", EQ, 1)
336+
.setExpression("size", Expression.string("flat_array_len(comments)"))
337+
.update();
338+
```
339+
340+
Notes:
341+
- The `flat_array_len` function operates efficiently on indexed fields
342+
- Returns 0 if the specified field does not exist in a document
343+
- Supports the following comparison operators: `=`, `>`, `>=`, `<`, `<=`, `Range`, `Set`
344+
- Can be used in both `SELECT` and `UPDATE` queries
345+
346+
##### now(unit)
347+
The `now()` function returns the current system timestamp, making it particularly useful for time-based filtering and data synchronization.
348+
This function can be used in both `SELECT` and `UPDATE` queries.
349+
350+
Arguments:
351+
- `sec` - returns timestamp in seconds (default if no argument is provided)
352+
- `msec` - returns timestamp in milliseconds
353+
- `usec` - returns timestamp in microseconds
354+
- `nsec` - returns timestamp in nanoseconds
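
To make the four units concrete, the sketch below converts one `java.time.Instant` into a Unix timestamp for each unit name. It runs on the client and is only an illustration of what the units mean; the class `NowUnitsSketch` and the helper `toUnixTimestamp` are hypothetical and are not part of the connector API.

```java
import java.time.Instant;

public class NowUnitsSketch {

    /** Hypothetical helper: Unix timestamp of the instant in the unit named like a now() argument. */
    public static long toUnixTimestamp(Instant instant, String unit) {
        long seconds = instant.getEpochSecond();
        int nanos = instant.getNano(); // nanosecond part of the current second, 0..999_999_999
        switch (unit) {
            case "sec":
                return seconds;
            case "msec":
                return Math.addExact(Math.multiplyExact(seconds, 1_000L), nanos / 1_000_000);
            case "usec":
                return Math.addExact(Math.multiplyExact(seconds, 1_000_000L), nanos / 1_000);
            case "nsec":
                return Math.addExact(Math.multiplyExact(seconds, 1_000_000_000L), nanos);
            default:
                throw new IllegalArgumentException("Unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        Instant now = Instant.now(); // captured once so that all units describe the same moment
        for (String unit : new String[] {"sec", "msec", "usec", "nsec"}) {
            System.out.println(unit + " -> " + toUnixTimestamp(now, unit));
        }
    }
}
```

When comparing a stored field against `now(unit)`, the field should hold timestamps in the same unit; mixing seconds with milliseconds makes the comparison meaningless.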
355+
356+
```java
357+
// Find events that occurred in the past.
358+
List<Event> events = db.query("events", Event.class)
359+
.where(Expression.field("timestamp"), LE, Expression.now(TimeUnit.SECONDS))
360+
.toList();
361+
362+
db.query("items", Item.class)
363+
.where("id", EQ, 42)
364+
.setExpression("updated_at", Expression.string("now(usec)"))
365+
.update();
366+
```
367+
368+
Notes:
369+
- The returned timestamp represents seconds (or subunits) since the Unix epoch (January 1, 1970)
370+
- Time resolution depends on the specified unit - use `nsec` for maximum precision
371+
- All instances of `now()` within a single query share the same value, which is computed at the start of the query execution
372+
- Useful for implementing TTL (Time-To-Live) functionality and audit logging
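
The TTL use case mentioned above boils down to comparing a stored timestamp with a cutoff derived from a single "now" value. The following plain-Java sketch shows that arithmetic on the client side, assuming timestamps in seconds; `TtlCutoffSketch` and `expired` are hypothetical names, and the real filtering would be done by the server when the query is executed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TtlCutoffSketch {

    /** Hypothetical helper: returns the creation timestamps that are at least ttlSeconds old. */
    public static List<Long> expired(List<Long> createdAtSeconds, long ttlSeconds, long nowSeconds) {
        // The cutoff is computed once, so every record is compared against the same moment.
        long cutoff = nowSeconds - ttlSeconds;
        List<Long> result = new ArrayList<>();
        for (long createdAt : createdAtSeconds) {
            if (createdAt <= cutoff) {
                result.add(createdAt);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis() / 1000L;
        List<Long> createdAt = Arrays.asList(now - 7200, now - 1800, now - 10);
        // With a one-hour TTL only the first record (two hours old) is expired.
        System.out.println(expired(createdAt, 3600, now));
    }
}
```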
373+
308374
### Transactions and batch update
309375

310376
Reindexer supports transactions. A transaction performs an atomic namespace update. There are synchronous and

src/main/java/ru/rt/restream/reindexer/Query.java

Lines changed: 99 additions & 59 deletions
@@ -20,12 +20,14 @@
2020
import ru.rt.restream.reindexer.annotations.Hnsw;
2121
import ru.rt.restream.reindexer.annotations.Ivf;
2222
import ru.rt.restream.reindexer.annotations.VecBf;
23-
import ru.rt.restream.reindexer.binding.Consts;
2423
import ru.rt.restream.reindexer.binding.QueryResult;
2524
import ru.rt.restream.reindexer.binding.RequestContext;
2625
import ru.rt.restream.reindexer.binding.TransactionContext;
2726
import ru.rt.restream.reindexer.binding.cproto.ByteBuffer;
2827
import ru.rt.restream.reindexer.binding.cproto.cjson.PayloadType;
28+
import ru.rt.restream.reindexer.expression.WhereExpression;
29+
import ru.rt.restream.reindexer.expression.Expression;
30+
import ru.rt.restream.reindexer.expression.SetExpression;
2931
import ru.rt.restream.reindexer.util.JsonSerializer;
3032
import ru.rt.restream.reindexer.util.Pair;
3133
import ru.rt.restream.reindexer.vector.params.KnnSearchParam;
@@ -36,10 +38,10 @@
3638
import java.util.Collections;
3739
import java.util.Deque;
3840
import java.util.List;
41+
import java.util.Objects;
3942
import java.util.Optional;
4043
import java.util.Spliterator;
4144
import java.util.Spliterators;
42-
import java.util.UUID;
4345
import java.util.stream.Stream;
4446
import java.util.stream.StreamSupport;
4547

@@ -56,8 +58,6 @@
5658
import static ru.rt.restream.reindexer.binding.Consts.MERGE;
5759
import static ru.rt.restream.reindexer.binding.Consts.MODE_ACCURATE_TOTAL;
5860
import static ru.rt.restream.reindexer.binding.Consts.OR_INNER_JOIN;
59-
import static ru.rt.restream.reindexer.binding.Consts.VALUE_BOOL;
60-
import static ru.rt.restream.reindexer.binding.Consts.VALUE_NULL;
6161
import static ru.rt.restream.reindexer.binding.Consts.VALUE_STRING;
6262

6363
/**
@@ -112,6 +112,7 @@ public class Query<T> {
112112
private static final int QUERY_FIELD_SUB_QUERY_CONDITION = 30;
113113
private static final int QUERY_LOCAL = 31;
114114
private static final int QUERY_KNN_CONDITION = 32;
115+
private static final int QUERY_EXPRESSION_CONDITION = 36;
115116

116117
/**
117118
* Condition types.
@@ -403,7 +404,7 @@ public Query<T> where(String indexName, Condition condition, Object... values) {
403404

404405
buffer.putVarUInt32(values.length);
405406
for (Object key : values) {
406-
putValue(key);
407+
buffer.putValue(key);
407408
}
408409

409410
return this;
@@ -444,7 +445,7 @@ public Query<T> where(Query<?> subquery, Condition condition, Object... values)
444445

445446
buffer.putVarUInt32(values.length);
446447
for (Object key : values) {
447-
putValue(key);
448+
buffer.putValue(key);
448449
}
449450

450451
return this;
@@ -472,6 +473,51 @@ public Query<T> where(String indexName, Condition condition, Query<?> subquery)
472473
return this;
473474
}
474475

476+
/**
477+
* Where predicate between {@code left} and {@code right} expressions using {@code condition}.
478+
* Supported combinations:
479+
* <ul>
480+
* <li>{@link Expression#field(String) field} condition {@link Expression#values(Object...) values}</li>
481+
* <li>{@link Expression#field(String) field} condition {@link Expression#now(TimeUnit) now(unit)}</li>
482+
* <li>{@link Expression#field(String) field} condition {@link Expression#flatArrayLength(String) flat_array_len(field)}</li>
483+
* <li>{@link Expression#field(String) field} condition {@link Expression#subQuery(Query) subquery}</li>
484+
* <li>{@link Expression#field(String) field} condition {@link Expression#field(String) field}</li>
485+
* <li>{@link Expression#flatArrayLength(String) flat_array_len(field)} condition {@link Expression#values(Object...) values}</li>
486+
* <li>{@link Expression#flatArrayLength(String) flat_array_len(field)} condition {@link Expression#subQuery(Query) subquery}</li>
487+
* <li>{@link Expression#subQuery(Query) subquery} condition {@link Expression#values(Object...) values}</li>
488+
* <li>{@link Expression#subQuery(Query) subquery} condition {@link Expression#now(TimeUnit) now(unit)}</li>
489+
* <li>{@link Expression#subQuery(Query) subquery} condition {@link Expression#flatArrayLength(String) flat_array_len(field)}</li>
490+
* </ul>
491+
* Example: {@code where(Expression.field("created_at"), Condition.LE, Expression.now())}.
492+
* <p>
493+
*
494+
* @param left the left operand {@link Expression} to use
495+
* @param condition the {@link Condition} to use
496+
* @param right the right operand {@link Expression} to use
497+
* @return the {@link Query} for further customizations
498+
* @see Expression for factory methods to build different types of expressions
499+
*/
500+
public Query<T> where(WhereExpression left, Condition condition, WhereExpression right) {
501+
Objects.requireNonNull(left, "left expression cannot be null");
502+
Objects.requireNonNull(right, "right expression cannot be null");
503+
504+
logBuilder.where(nextOperation, left, condition.code, right);
505+
506+
buffer.putVarUInt32(QUERY_EXPRESSION_CONDITION);
507+
508+
left.serializeWhere(buffer);
509+
510+
buffer.putVarUInt32(nextOperation);
511+
buffer.putVarUInt32(condition.code);
512+
513+
right.serializeWhere(buffer);
514+
515+
nextOperation = OP_AND;
516+
queryCount++;
517+
518+
return this;
519+
}
520+
475521
/**
476522
* The condition is possible only on vector-indexed fields,
477523
* marked with {@link Hnsw}, {@link Ivf}, {@link VecBf} annotations.
@@ -785,7 +831,7 @@ public Query<T> sort(String index, boolean desc, Object... values) {
785831

786832
buffer.putVarUInt32(values.length);
787833
for (Object value : values) {
788-
putValue(value);
834+
buffer.putValue(value);
789835
}
790836

791837
return this;
@@ -803,55 +849,6 @@ public Query<T> fetchCount(int fetchCount) {
803849
return this;
804850
}
805851

806-
private void putValue(Object value) {
807-
if (value == null) {
808-
buffer.putVarUInt32(VALUE_NULL);
809-
} else if (value instanceof Boolean) {
810-
buffer.putVarUInt32(VALUE_BOOL);
811-
if ((Boolean) value) {
812-
buffer.putVarUInt32(1);
813-
} else {
814-
buffer.putVarUInt32(0);
815-
}
816-
} else if (value instanceof Integer) {
817-
buffer.putVarUInt32(Consts.VALUE_INT)
818-
.putVarInt64((Integer) value);
819-
} else if (value instanceof String) {
820-
buffer.putVarUInt32(Consts.VALUE_STRING)
821-
.putVString((String) value);
822-
} else if (value instanceof Long) {
823-
buffer.putVarUInt32(Consts.VALUE_INT_64)
824-
.putVarInt64((Long) value);
825-
} else if (value instanceof Byte) {
826-
buffer.putVarUInt32(Consts.VALUE_INT)
827-
.putVarInt64((Byte) value);
828-
} else if (value instanceof Short) {
829-
buffer.putVarUInt32(Consts.VALUE_INT)
830-
.putVarInt64((Short) value);
831-
} else if (value instanceof Double) {
832-
buffer.putVarUInt32(Consts.VALUE_DOUBLE)
833-
.putDouble((Double) value);
834-
} else if (value instanceof Float) {
835-
Float floatValue = (Float) value;
836-
buffer.putVarUInt32(Consts.VALUE_DOUBLE)
837-
.putDouble(floatValue.doubleValue());
838-
} else if (value instanceof Character) {
839-
Character character = (Character) value;
840-
buffer.putVarUInt32(Consts.VALUE_STRING)
841-
.putVString(character.toString());
842-
} else if (value instanceof UUID) {
843-
buffer.putVarUInt32(Consts.VALUE_UUID)
844-
.putUuid((UUID) value);
845-
} else if (value instanceof Object[]) {
846-
buffer.putVarUInt32(Consts.VALUE_TUPLE);
847-
Object[] objects = (Object[]) value;
848-
buffer.putVarUInt32(objects.length);
849-
for (Object object : objects) {
850-
putValue(object);
851-
}
852-
}
853-
}
854-
855852
/**
856853
* Will execute query, and return stream of items.
857854
* The returned stream must be closed using the {@link Stream#close()} method or
@@ -1122,7 +1119,7 @@ public Query<T> set(String fieldName, Object value) {
11221119
buffer.putVarUInt32(values.size());
11231120
for (Object v : values) {
11241121
buffer.putVarUInt32(0);
1125-
putValue(v);
1122+
buffer.putValue(v);
11261123
}
11271124
} else if (value != null && value.getClass().isArray()) {
11281125
Object[] values = (Object[]) value;
@@ -1137,19 +1134,43 @@ public Query<T> set(String fieldName, Object value) {
11371134
buffer.putVarUInt32(values.length);
11381135
for (Object v : values) {
11391136
buffer.putVarUInt32(0);
1140-
putValue(v);
1137+
buffer.putValue(v);
11411138
}
11421139
} else {
11431140
buffer.putVarUInt32(cmd);
11441141
buffer.putVString(fieldName);
11451142
buffer.putVarUInt32(1);
11461143
buffer.putVarUInt32(0);
1147-
putValue(value);
1144+
buffer.putValue(value);
11481145
}
11491146

11501147
return this;
11511148
}
11521149

1150+
/**
1151+
* Updates indexed field by {@link SetExpression}.
1152+
* <p>
1153+
* Example: {@code setExpression("created_at", Expression.string("now() - 1 * 24 * 60 * 60"))}.
1154+
* <p>
1155+
*
1156+
* @param fieldName the field name to use
1157+
* @param expression the expression to use
1158+
* @return the {@link Query} for further customizations
1159+
* @see Expression for factory methods to build different types of expressions
1160+
*/
1161+
public Query<T> setExpression(String fieldName, SetExpression expression) {
1162+
Objects.requireNonNull(expression, "expression cannot be null");
1163+
1164+
logBuilder.set(fieldName, expression);
1165+
1166+
buffer.putVarUInt32(QUERY_UPDATE_FIELD);
1167+
buffer.putVString(fieldName);
1168+
1169+
expression.serializeSet(buffer);
1170+
1171+
return this;
1172+
}
1173+
11531174
private void setObject(String fieldName, Object value) {
11541175
boolean isArray = false;
11551176
int count = 1;
@@ -1286,4 +1307,23 @@ String getSql() {
12861307
return logBuilder.getSql();
12871308
}
12881309

1310+
/**
1311+
* Returns all used bytes from the {@link ByteBuffer}.
1312+
*
1313+
* @return all used bytes from the {@code ByteBuffer}
1314+
*/
1315+
public byte[] bytes() {
1316+
return buffer.bytes();
1317+
}
1318+
1319+
/**
1320+
* Returns the string representation of the query.
1321+
*
1322+
* @return the string representation of the query
1323+
*/
1324+
@Override
1325+
public String toString() {
1326+
return getSql();
1327+
}
1328+
12891329
}

src/main/java/ru/rt/restream/reindexer/QueryLogBuilder.java

Lines changed: 16 additions & 1 deletion
@@ -16,6 +16,7 @@
1616
package ru.rt.restream.reindexer;
1717

1818
import org.apache.commons.lang3.StringUtils;
19+
import ru.rt.restream.reindexer.expression.Expression;
1920
import ru.rt.restream.reindexer.vector.params.KnnSearchParam;
2021

2122
import java.util.ArrayDeque;
@@ -59,7 +60,7 @@ private static class AggregateEntry {
5960

6061
private static class QueryEntry {
6162
private Operation operation;
62-
private String field;
63+
private Object field;
6364
private Condition condition;
6465
private String secondField;
6566
private int joinIndex = -1;
@@ -299,6 +300,20 @@ void where(int operationCode, String field, int conditionCode, Object... values)
299300
}
300301
}
301302

303+
void where(int operationCode, Expression left, int conditionCode, Expression right) {
304+
QueryEntry queryEntry = new QueryEntry();
305+
queryEntry.operation = getOperation(operationCode);
306+
queryEntry.field = left;
307+
queryEntry.condition = getCondition(conditionCode);
308+
queryEntry.values.add(right);
309+
if (!whereStack.isEmpty()) {
310+
QueryEntry parent = whereStack.getLast();
311+
parent.children.add(queryEntry);
312+
} else {
313+
whereEntries.add(queryEntry);
314+
}
315+
}
316+
302317
void whereKnn(int operationCode, String indexName, float[] vector, KnnSearchParam params) {
303318
QueryEntry queryEntry = new QueryEntry();
304319
queryEntry.operation = getOperation(operationCode);
