Skip to content

Commit 96cd9a0

Browse files
authored
[vector] Support vector search options (#8203)
Expose query-time vector search options through the Java and Python vector search APIs, and thread them into Flink, Spark, and Lumina search execution. This lets callers configure index-specific query parameters such as `ivf.nprobe` and `hnsw.ef_search` at search time.
1 parent 64b0534 commit 96cd9a0

25 files changed

Lines changed: 673 additions & 54 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/predicate/VectorSearch.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import javax.annotation.Nullable;
2525

2626
import java.io.Serializable;
27+
import java.util.Collections;
28+
import java.util.HashMap;
29+
import java.util.Map;
2730

2831
/** VectorSearch to perform vector similarity search. */
2932
public class VectorSearch implements Serializable {
@@ -33,10 +36,15 @@ public class VectorSearch implements Serializable {
3336
private final float[] vector;
3437
private final String fieldName;
3538
private final int limit;
39+
private final Map<String, String> options;
3640

3741
@Nullable private RoaringNavigableMap64 includeRowIds;
3842

3943
public VectorSearch(float[] vector, int limit, String fieldName) {
44+
this(vector, limit, fieldName, Collections.emptyMap());
45+
}
46+
47+
public VectorSearch(float[] vector, int limit, String fieldName, Map<String, String> options) {
4048
if (vector == null) {
4149
throw new IllegalArgumentException("Search cannot be null");
4250
}
@@ -49,6 +57,10 @@ public VectorSearch(float[] vector, int limit, String fieldName) {
4957
this.vector = vector;
5058
this.limit = limit;
5159
this.fieldName = fieldName;
60+
this.options =
61+
options == null
62+
? Collections.emptyMap()
63+
: Collections.unmodifiableMap(new HashMap<>(options));
5264
}
5365

5466
public float[] vector() {
@@ -63,6 +75,10 @@ public String fieldName() {
6375
return fieldName;
6476
}
6577

78+
public Map<String, String> options() {
79+
return options == null ? Collections.emptyMap() : options;
80+
}
81+
6682
public RoaringNavigableMap64 includeRowIds() {
6783
return includeRowIds;
6884
}
@@ -81,7 +97,7 @@ public VectorSearch offsetRange(long from, long to) {
8197
for (long rowId : and64) {
8298
roaringNavigableMap64Offset.add(rowId - from);
8399
}
84-
VectorSearch target = new VectorSearch(vector, limit, fieldName);
100+
VectorSearch target = new VectorSearch(vector, limit, fieldName, options());
85101
target.withIncludeRowIds(roaringNavigableMap64Offset);
86102
return target;
87103
}

paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexReader.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,29 @@ public class TestVectorGlobalIndexReader implements GlobalIndexReader {
5656
private final GlobalIndexFileReader fileReader;
5757
private final GlobalIndexIOMeta ioMeta;
5858
private final String metric;
59+
private final String requiredOptionKey;
60+
private final String requiredOptionValue;
5961

6062
private float[][] vectors;
6163
private int dimension;
6264
private int count;
6365

6466
public TestVectorGlobalIndexReader(
6567
GlobalIndexFileReader fileReader, GlobalIndexIOMeta ioMeta, String metric) {
68+
this(fileReader, ioMeta, metric, null, null);
69+
}
70+
71+
public TestVectorGlobalIndexReader(
72+
GlobalIndexFileReader fileReader,
73+
GlobalIndexIOMeta ioMeta,
74+
String metric,
75+
String requiredOptionKey,
76+
String requiredOptionValue) {
6677
this.fileReader = fileReader;
6778
this.ioMeta = ioMeta;
6879
this.metric = metric;
80+
this.requiredOptionKey = requiredOptionKey;
81+
this.requiredOptionValue = requiredOptionValue;
6982
}
7083

7184
@Override
@@ -78,6 +91,18 @@ public CompletableFuture<Optional<ScoredGlobalIndexResult>> visitVectorSearch(
7891
}
7992

8093
float[] queryVector = vectorSearch.vector();
94+
if (requiredOptionKey != null) {
95+
String actual = vectorSearch.options().get(requiredOptionKey);
96+
if (!requiredOptionValue.equals(actual)) {
97+
throw new IllegalArgumentException(
98+
"Required option "
99+
+ requiredOptionKey
100+
+ " expected "
101+
+ requiredOptionValue
102+
+ " but got "
103+
+ actual);
104+
}
105+
}
81106
if (queryVector.length != dimension) {
82107
throw new IllegalArgumentException(
83108
String.format(

paimon-common/src/test/java/org/apache/paimon/globalindex/testvector/TestVectorGlobalIndexer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,15 @@ public class TestVectorGlobalIndexer implements GlobalIndexer {
5555
/** Option key for distance metric. */
5656
public static final String OPT_METRIC = "test.vector.metric";
5757

58+
public static final String OPT_REQUIRED_OPTION_KEY = "test.vector.required-option.key";
59+
60+
public static final String OPT_REQUIRED_OPTION_VALUE = "test.vector.required-option.value";
61+
5862
private final DataType fieldType;
5963
private final int dimension;
6064
private final String metric;
65+
private final String requiredOptionKey;
66+
private final String requiredOptionValue;
6167

6268
public TestVectorGlobalIndexer(DataType fieldType, Options options) {
6369
checkArgument(
@@ -67,6 +73,8 @@ public TestVectorGlobalIndexer(DataType fieldType, Options options) {
6773
this.fieldType = fieldType;
6874
this.dimension = options.getInteger(OPT_DIMENSION, 0);
6975
this.metric = options.getString(OPT_METRIC, "l2");
76+
this.requiredOptionKey = options.getString(OPT_REQUIRED_OPTION_KEY, null);
77+
this.requiredOptionValue = options.getString(OPT_REQUIRED_OPTION_VALUE, null);
7078
}
7179

7280
@Override
@@ -80,7 +88,8 @@ public GlobalIndexReader createReader(
8088
List<GlobalIndexIOMeta> files,
8189
ExecutorService executor) {
8290
checkArgument(files.size() == 1, "Expected exactly one index file per shard");
83-
return new TestVectorGlobalIndexReader(fileReader, files.get(0), metric);
91+
return new TestVectorGlobalIndexReader(
92+
fileReader, files.get(0), metric, requiredOptionKey, requiredOptionValue);
8493
}
8594

8695
public int dimension() {

paimon-common/src/test/java/org/apache/paimon/predicate/VectorSearchTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323

2424
import org.junit.jupiter.api.Test;
2525

26+
import java.util.HashMap;
2627
import java.util.List;
28+
import java.util.Map;
2729

2830
import static org.assertj.core.api.Assertions.assertThat;
2931

@@ -49,4 +51,26 @@ public void testVectorSearchOffset() {
4951
List<Range> ranges = vectorSearch.includeRowIds().toRangeList();
5052
assertThat(ranges.get(0)).isEqualTo(new Range(40L, 90L));
5153
}
54+
55+
@Test
56+
public void testVectorSearchOffsetKeepsOptions() {
57+
Map<String, String> options = new HashMap<>();
58+
options.put("ivf.nprobe", "16");
59+
options.put("hnsw.ef_search", "64");
60+
61+
VectorSearch vectorSearch = new VectorSearch(new float[] {1.0f, 0.0f}, 1, "test", options);
62+
63+
RoaringNavigableMap64 includeRowIds = new RoaringNavigableMap64();
64+
includeRowIds.addRange(new Range(100L, 200L));
65+
vectorSearch.withIncludeRowIds(includeRowIds);
66+
67+
VectorSearch offset = vectorSearch.offsetRange(60, 150);
68+
69+
assertThat(offset.options()).isEqualTo(options);
70+
options.put("ivf.nprobe", "32");
71+
assertThat(offset.options())
72+
.containsEntry("ivf.nprobe", "16")
73+
.containsEntry("hnsw.ef_search", "64")
74+
.hasSize(2);
75+
}
5276
}

paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@
4444
import java.io.IOException;
4545
import java.io.Serializable;
4646
import java.util.ArrayList;
47+
import java.util.Collections;
4748
import java.util.Comparator;
49+
import java.util.HashMap;
4850
import java.util.List;
51+
import java.util.Map;
4952
import java.util.Optional;
5053
import java.util.Set;
5154
import java.util.TreeSet;
@@ -65,18 +68,33 @@ public class VectorReadImpl implements VectorRead, Serializable {
6568
protected final int limit;
6669
protected final DataField vectorColumn;
6770
protected final float[] vector;
71+
protected final Map<String, String> options;
6872

6973
public VectorReadImpl(
7074
FileStoreTable table,
7175
Predicate filter,
7276
int limit,
7377
DataField vectorColumn,
7478
float[] vector) {
79+
this(table, filter, limit, vectorColumn, vector, Collections.emptyMap());
80+
}
81+
82+
public VectorReadImpl(
83+
FileStoreTable table,
84+
Predicate filter,
85+
int limit,
86+
DataField vectorColumn,
87+
float[] vector,
88+
Map<String, String> options) {
7589
this.table = table;
7690
this.filter = filter;
7791
this.limit = limit;
7892
this.vectorColumn = vectorColumn;
7993
this.vector = vector;
94+
this.options =
95+
options == null
96+
? Collections.emptyMap()
97+
: Collections.unmodifiableMap(new HashMap<>(options));
8098
}
8199

82100
@Override
@@ -165,7 +183,7 @@ protected CompletableFuture<Optional<ScoredGlobalIndexResult>> eval(
165183
GlobalIndexReader reader =
166184
globalIndexer.createReader(indexFileReader, indexIOMetaList, executor);
167185
VectorSearch vectorSearch =
168-
new VectorSearch(vector, limit, vectorColumn.name())
186+
new VectorSearch(vector, limit, vectorColumn.name(), options)
169187
.withIncludeRowIds(includeRowIds);
170188
return new OffsetGlobalIndexReader(reader, rowRangeStart, rowRangeEnd)
171189
.visitVectorSearch(vectorSearch)

paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.paimon.predicate.Predicate;
2424

2525
import java.io.Serializable;
26+
import java.util.Map;
2627

2728
/** Builder to build vector search. */
2829
public interface VectorSearchBuilder extends Serializable {
@@ -42,6 +43,18 @@ public interface VectorSearchBuilder extends Serializable {
4243
/** The vector to search. */
4344
VectorSearchBuilder withVector(float[] vector);
4445

46+
/** Query-time option for the vector index, e.g. {@code ivf.nprobe}. */
47+
default VectorSearchBuilder withOption(String key, String value) {
48+
throw new UnsupportedOperationException(
49+
getClass().getName() + " does not support vector options.");
50+
}
51+
52+
/** Query-time options for the vector index, e.g. {@code ivf.nprobe}, {@code hnsw.ef_search}. */
53+
default VectorSearchBuilder withOptions(Map<String, String> options) {
54+
throw new UnsupportedOperationException(
55+
getClass().getName() + " does not support vector options.");
56+
}
57+
4558
/** Create vector scan to scan index files. */
4659
VectorScan newVectorScan();
4760

paimon-core/src/main/java/org/apache/paimon/table/source/VectorSearchBuilderImpl.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import org.apache.paimon.table.InnerTable;
2626
import org.apache.paimon.types.DataField;
2727

28+
import java.util.HashMap;
29+
import java.util.Map;
30+
2831
import static org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicate;
2932

3033
/** Implementation for {@link VectorSearchBuilder}. */
@@ -39,6 +42,7 @@ public class VectorSearchBuilderImpl implements VectorSearchBuilder {
3942
protected int limit;
4043
protected DataField vectorColumn;
4144
protected float[] vector;
45+
protected Map<String, String> options = new HashMap<>();
4246

4347
public VectorSearchBuilderImpl(InnerTable table) {
4448
this.table = (FileStoreTable) table;
@@ -80,13 +84,27 @@ public VectorSearchBuilder withVector(float[] vector) {
8084
return this;
8185
}
8286

87+
@Override
88+
public VectorSearchBuilder withOptions(Map<String, String> options) {
89+
if (options != null) {
90+
this.options.putAll(options);
91+
}
92+
return this;
93+
}
94+
95+
@Override
96+
public VectorSearchBuilder withOption(String key, String value) {
97+
this.options.put(key, value);
98+
return this;
99+
}
100+
83101
@Override
84102
public VectorScan newVectorScan() {
85103
return new VectorScanImpl(table, partitionFilter, filter, vectorColumn);
86104
}
87105

88106
@Override
89107
public VectorRead newVectorRead() {
90-
return new VectorReadImpl(table, filter, limit, vectorColumn, vector);
108+
return new VectorReadImpl(table, filter, limit, vectorColumn, vector, options);
91109
}
92110
}

paimon-core/src/test/java/org/apache/paimon/table/source/VectorSearchBuilderTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,39 @@ public void testVectorSearchTopKLimit() throws Exception {
227227
assertThat(ids.size()).isLessThanOrEqualTo(5);
228228
}
229229

230+
@Test
231+
public void testVectorSearchThreadsOptions() throws Exception {
232+
catalog.createTable(
233+
identifier("options_table"),
234+
Schema.newBuilder()
235+
.column("id", DataTypes.INT())
236+
.column(VECTOR_FIELD_NAME, new ArrayType(DataTypes.FLOAT()))
237+
.option(CoreOptions.BUCKET.key(), "-1")
238+
.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true")
239+
.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true")
240+
.option("test.vector.dimension", String.valueOf(DIMENSION))
241+
.option("test.vector.metric", "l2")
242+
.option("test.vector.required-option.key", "ivf.nprobe")
243+
.option("test.vector.required-option.value", "16")
244+
.build(),
245+
false);
246+
FileStoreTable table = getTable(identifier("options_table"));
247+
248+
float[][] vectors = {{1.0f, 0.0f}, {0.0f, 1.0f}};
249+
writeVectors(table, vectors);
250+
buildAndCommitIndex(table, vectors);
251+
252+
GlobalIndexResult result =
253+
table.newVectorSearchBuilder()
254+
.withVector(new float[] {1.0f, 0.0f})
255+
.withLimit(1)
256+
.withVectorColumn(VECTOR_FIELD_NAME)
257+
.withOption("ivf.nprobe", "16")
258+
.executeLocal();
259+
260+
assertThat(result.results().isEmpty()).isFalse();
261+
}
262+
230263
@Test
231264
public void testVectorSearchWithMultipleIndexFiles() throws Exception {
232265
createTableDefault();

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/VectorSearchProcedure.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public String[] call(
106106
.withVector(queryVector)
107107
.withVectorColumn(vectorColumn)
108108
.withLimit(topK)
109+
.withOptions(optionsMap)
109110
.executeLocal();
110111

111112
RowType tableRowType = table.rowType();

0 commit comments

Comments
 (0)