Commit 27b1c2b

feat(vector): add partition search parallelism (#6475)
## Feature

This PR adds query-time `query_parallelism` for vector search partition execution and wires it through Rust, Python, and Java. Callers can control how many IVF partitions a single query may search concurrently:

- `0`: auto policy. The current implementation maps auto to the single-worker sequential path.
- `1`: single-worker sequential partition search.
- `-1`: use the CPU pool size.
- `>= 2`: partition-parallel search, clamped to the CPU pool size.

The default is `0`, which currently preserves the optimized sequential execution path.

## Performance Improvement

The performance issue is per-query worker fan-out. With many concurrent queries, spawning a CPU task for every partition of every query increases contention on the CPU worker pool and lengthens queueing time. This is especially visible for fixed-`nprobes` IVF workloads where every query searches the same number of partitions.

This PR improves that path by making query partition scheduling configurable while keeping the optimized sequential model as the default auto behavior. Sequential fixed-`nprobes` search prepares partitions asynchronously, then searches prepared partitions on one CPU worker using a query-level global top-k heap. Parallel execution remains available for workloads that benefit from intra-query partition parallelism.

## Implementation

- Rust, Python, and Java expose `query_parallelism` on vector search APIs.
- `ANNIvfSubIndexExec` converts the configured value into an effective partition concurrency.
- Effective partition concurrency is clamped to the CPU pool size.
- IVF v2 splits partition search into async prepare and sync execute phases.
- Sequential fixed-`nprobes` search uses a query-level global top-k heap.
- Sequential late-search keeps the existing per-partition output shape so early-stop behavior is preserved.
- Parallel execution uses direct per-partition search tasks, matching the original execution model.
- IVF_RQ/Flat sub-index search reuses caller-owned scratch buffers for RQ distance-table quantization and top-k accumulation to reduce allocation overhead.

## Developer Impact

- Rust callers can call `Scanner::query_parallelism(...)`.
- Python callers can pass `query_parallelism` in vector search APIs.
- Java callers can use `Query.Builder#setQueryParallelism(...)`.
- `parallel_mode` / `ParallelMode` have been replaced by the concurrency-based API.

## Benchmark

GCP VM benchmark using the same index for all modes. All rows had matching result checksums.

Configuration: `gist / IVF_RQ / target_partition_size=8192 / k=100 / nprobes=20 / columns=[] / prewarm / max_queries=1000`.

Percentages are relative to `main`.

| Threads | Mode | Avg | P50 | P90 | P95 | P99 | QPS |
|---:|---|---:|---:|---:|---:|---:|---:|
| 8 | main | 4.98 ms | 4.92 ms | 6.28 ms | 6.86 ms | 7.56 ms | 1584.6 |
| 8 | sequential | 4.27 ms (-14.2%) | 4.21 ms (-14.5%) | 5.03 ms (-19.9%) | 5.27 ms (-23.1%) | 5.97 ms (-21.0%) | 1838.9 (+16.1%) |
| 8 | parallel | 5.02 ms (+0.7%) | 4.97 ms (+0.9%) | 6.27 ms (-0.2%) | 6.60 ms (-3.7%) | 7.36 ms (-2.7%) | 1575.4 (-0.6%) |
| 16 | main | 9.95 ms | 9.74 ms | 13.12 ms | 14.11 ms | 16.83 ms | 1583.1 |
| 16 | sequential | 8.09 ms (-18.6%) | 7.98 ms (-18.0%) | 9.84 ms (-25.0%) | 10.47 ms (-25.8%) | 11.67 ms (-30.6%) | 1936.8 (+22.3%) |
| 16 | parallel | 9.95 ms (+0.0%) | 9.68 ms (-0.6%) | 13.37 ms (+1.9%) | 14.42 ms (+2.2%) | 16.83 ms (+0.0%) | 1583.6 (+0.0%) |
| 32 | main | 18.68 ms | 18.16 ms | 26.66 ms | 28.77 ms | 33.12 ms | 1652.7 |
| 32 | sequential | 15.50 ms (-17.0%) | 15.15 ms (-16.6%) | 20.57 ms (-22.9%) | 22.16 ms (-23.0%) | 25.32 ms (-23.6%) | 2000.6 (+21.1%) |
| 32 | parallel | 19.13 ms (+2.4%) | 18.58 ms (+2.3%) | 26.49 ms (-0.6%) | 29.12 ms (+1.2%) | 33.92 ms (+2.4%) | 1625.9 (-1.6%) |
| 64 | main | 33.98 ms | 33.01 ms | 49.65 ms | 53.87 ms | 63.58 ms | 1718.4 |
| 64 | sequential | 29.95 ms (-11.8%) | 29.67 ms (-10.1%) | 42.44 ms (-14.5%) | 46.81 ms (-13.1%) | 54.42 ms (-14.4%) | 1949.4 (+13.4%) |
| 64 | parallel | 35.17 ms (+3.5%) | 34.37 ms (+4.1%) | 50.70 ms (+2.1%) | 55.04 ms (+2.2%) | 69.09 ms (+8.7%) | 1650.7 (-3.9%) |
| 128 | main | 38.73 ms | 36.07 ms | 68.28 ms | 76.78 ms | 96.81 ms | 1663.3 |
| 128 | sequential | 37.48 ms (-3.2%) | 35.90 ms (-0.5%) | 65.26 ms (-4.4%) | 71.82 ms (-6.5%) | 87.48 ms (-9.6%) | 1915.3 (+15.2%) |
| 128 | parallel | 41.29 ms (+6.6%) | 39.38 ms (+9.2%) | 71.68 ms (+5.0%) | 80.70 ms (+5.1%) | 96.20 ms (-0.6%) | 1577.2 (-5.2%) |

## Validation

- `cargo fmt --all --check`
- `cargo check -p lance --tests`
- `cd python && cargo check`
- `cd java && cargo check --manifest-path ./lance-jni/Cargo.toml`
- `uv run --with maturin maturin develop`
- `uv run pytest python/tests/test_vector_index.py::test_vector_index_with_query_parallelism python/tests/test_vector_index.py::test_vector_index_invalid_query_parallelism`
- Python ruff check / format check for touched Python files

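
The value mapping listed under Feature can be sketched in a few lines. This is an illustrative Python rendering only, not the Rust logic in `ANNIvfSubIndexExec`; the function name `effective_partition_concurrency` and the `cpu_pool_size` argument are assumptions made for this example.

```python
def effective_partition_concurrency(query_parallelism: int, cpu_pool_size: int) -> int:
    """Map a query_parallelism setting to a partition worker count (illustrative only)."""
    if cpu_pool_size < 1:
        raise ValueError("cpu_pool_size must be >= 1")
    if query_parallelism < -1:
        raise ValueError("query_parallelism must be >= -1")
    if query_parallelism == -1:
        # -1 asks for the full CPU pool.
        return cpu_pool_size
    if query_parallelism in (0, 1):
        # 0 (auto) currently maps to the same single-worker sequential path as 1.
        return 1
    # Values >= 2 request partition-parallel search, clamped to the pool size.
    return min(query_parallelism, cpu_pool_size)
```

Under these assumptions, a request for 64 workers on an 8-worker pool resolves to 8, while the default `0` resolves to a single sequential worker.
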
1 parent 8962655 commit 27b1c2b

22 files changed

Lines changed: 1989 additions & 168 deletions

java/lance-jni/src/blocking_scanner.rs

Lines changed: 5 additions & 0 deletions
@@ -320,6 +320,11 @@ pub(crate) fn build_scanner_with_options<'a>(

         let use_index = env.get_boolean_from_method(&java_obj, "isUseIndex")?;
         scanner.use_index(use_index);
+
+        let query_parallelism = env
+            .call_method(&java_obj, "getQueryParallelism", "()I", &[])?
+            .i()?;
+        scanner.query_parallelism(query_parallelism);
         Ok(())
     })?;

java/lance-jni/src/utils.rs

Lines changed: 4 additions & 0 deletions
@@ -207,6 +207,9 @@ pub fn get_query(env: &mut JNIEnv, query_obj: JObject) -> Result<Option<Query>>
         };

         let use_index = env.get_boolean_from_method(&java_obj, "isUseIndex")?;
+        let query_parallelism = env
+            .call_method(&java_obj, "getQueryParallelism", "()I", &[])?
+            .i()?;

         Ok(Query {
             column,
@@ -221,6 +224,7 @@ pub fn get_query(env: &mut JNIEnv, query_obj: JObject) -> Result<Option<Query>>
             metric_type: distance_type,
             use_index,
             dist_q_c: 0.0,
+            query_parallelism,
         })
     })?;

java/src/main/java/org/lance/ipc/Query.java

Lines changed: 26 additions & 0 deletions
@@ -31,6 +31,7 @@ public class Query {
   private final Optional<Integer> refineFactor;
   private final Optional<DistanceType> distanceType;
   private final boolean useIndex;
+  private final int queryParallelism;

   private Query(Builder builder) {
     this.column = Preconditions.checkNotNull(builder.column, "Columns must be set");
@@ -50,6 +51,7 @@ private Query(Builder builder) {
     this.refineFactor = builder.refineFactor;
     this.distanceType = builder.distanceType;
     this.useIndex = builder.useIndex;
+    this.queryParallelism = builder.queryParallelism;
   }

   public String getColumn() {
@@ -92,6 +94,10 @@ public boolean isUseIndex() {
     return useIndex;
   }

+  public int getQueryParallelism() {
+    return queryParallelism;
+  }
+
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
@@ -104,6 +110,7 @@ public String toString() {
         .add("refineFactor", refineFactor.orElse(null))
         .add("distanceType", distanceType.orElse(null))
         .add("useIndex", useIndex)
+        .add("queryParallelism", queryParallelism)
         .toString();
   }

@@ -117,6 +124,7 @@ public static class Builder {
     private Optional<Integer> refineFactor = Optional.empty();
     private Optional<DistanceType> distanceType = Optional.empty();
     private boolean useIndex = true;
+    private int queryParallelism = 0;

     /**
      * Sets the column to be searched.
@@ -245,6 +253,24 @@ public Builder setUseIndex(boolean useIndex) {
       return this;
     }

+    /**
+     * Sets vector partition search concurrency for each query.
+     *
+     * <p>The default is 0. Value 0 uses the automatic policy, which currently maps to the
+     * single-worker sequential path. Value -1 uses the CPU pool size. Value 1 uses the
+     * single-worker sequential path. Values greater than or equal to 2 use the partition-parallel
+     * path and are clamped to the CPU pool size.
+     *
+     * @param queryParallelism The partition search concurrency policy.
+     * @return The Builder instance for method chaining.
+     */
+    public Builder setQueryParallelism(int queryParallelism) {
+      Preconditions.checkArgument(
+          queryParallelism >= -1, "Query parallelism must be greater than or equal to -1");
+      this.queryParallelism = queryParallelism;
+      return this;
+    }
+
     /**
      * Builds the Query object.
      *

java/src/test/java/org/lance/JNITest.java

Lines changed: 1 addition & 0 deletions
@@ -59,6 +59,7 @@ public void testQuery() {
                 .setRefineFactor(40)
                 .setDistanceType(DistanceType.L2)
                 .setUseIndex(true)
+                .setQueryParallelism(-1)
                 .build()));
   }

protos/ann.proto

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ message VectorQueryProto {
   optional lance.index.pb.VectorMetricType metric_type = 10;
   bool use_index = 11;
   optional float dist_q_c = 12;
+  optional int32 query_parallelism = 13;
 }

 // Serializable form of ANNIvfSubIndexExec — the IVF sub-index search node.

python/python/lance/dataset.py

Lines changed: 30 additions & 0 deletions
@@ -6,6 +6,7 @@
 import copy
 import dataclasses
 import json
+import operator
 import os
 import random
 import time
@@ -5631,8 +5632,21 @@ def nearest(
         refine_factor: Optional[int] = None,
         use_index: bool = True,
         ef: Optional[int] = None,
+        query_parallelism: Optional[int] = None,
         distance_range: Optional[tuple[Optional[float], Optional[float]]] = None,
     ) -> ScannerBuilder:
+        """Configure nearest neighbor search.
+
+        Parameters
+        ----------
+        query_parallelism: int, optional
+            Maximum partition-search concurrency for a single vector query.
+            The default is 0. Value 0 uses the automatic policy, which
+            currently maps to the single-worker sequential path. Value -1 uses
+            the CPU pool size. Value 1 uses the single-worker sequential path.
+            Values >= 2 use the partition-parallel path and are clamped to the
+            CPU pool size.
+        """
         self._nearest = _build_vector_search_query(
             column,
             q,
@@ -5645,6 +5659,7 @@ def nearest(
             refine_factor=refine_factor,
             use_index=use_index,
             ef=ef,
+            query_parallelism=query_parallelism,
             distance_range=distance_range,
         )
         return self
@@ -6760,6 +6775,7 @@ def _build_vector_search_query(
     refine_factor: Optional[int] = None,
     use_index: bool = True,
     ef: Optional[int] = None,
+    query_parallelism: Optional[int] = None,
     distance_range: Optional[tuple[Optional[float], Optional[float]]] = None,
 ) -> dict:
     """Configure nearest neighbor search.
@@ -6787,6 +6803,12 @@ def _build_vector_search_query(
         Whether to use the index for the search.
     ef: int, optional
         The ef parameter for HNSW search.
+    query_parallelism: int, optional
+        Maximum partition-search concurrency for a single vector query.
+        The default is 0. Value 0 uses the automatic policy, which currently
+        maps to the single-worker sequential path. Value -1 uses the CPU pool
+        size. Value 1 uses the single-worker sequential path. Values >= 2 use
+        the partition-parallel path and are clamped to the CPU pool size.
     distance_range: tuple[Optional[float], Optional[float]], optional
         A tuple of (lower_bound, upper_bound) to filter results by distance.
         Both bounds are optional. The lower bound is inclusive and the upper
@@ -6854,6 +6876,11 @@ def _build_vector_search_query(
         # `ef` should be >= `k`, but `k` could be None so we can't check it here
         # the rust code will check it
         raise ValueError(f"ef must be > 0 but got {ef}")
+    if query_parallelism is not None:
+        query_parallelism = operator.index(query_parallelism)
+
+    if query_parallelism is not None and query_parallelism < -1:
+        raise ValueError("query_parallelism must be >= -1")

     if distance_range is not None:
         if len(distance_range) != 2:
@@ -6871,6 +6898,7 @@ def _build_vector_search_query(
         "refine_factor": refine_factor,
         "use_index": use_index,
         "ef": ef,
+        "query_parallelism": query_parallelism,
         "distance_range": distance_range,
     }

@@ -7043,6 +7071,7 @@ def __init__(
         refine_factor: Optional[int] = None,
         use_index: bool = True,
         ef: Optional[int] = None,
+        query_parallelism: Optional[int] = None,
     ):
         self._inner = _build_vector_search_query(
             column,
@@ -7055,6 +7084,7 @@ def __init__(
             refine_factor=refine_factor,
             use_index=use_index,
             ef=ef,
+            query_parallelism=query_parallelism,
         )

     def inner(self):
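
The validation added to `_build_vector_search_query` relies on `operator.index`, which accepts any object implementing `__index__` and rejects floats and strings with `TypeError`. The standalone sketch below isolates that behavior; the helper name `normalize_query_parallelism` is hypothetical and does not exist in the diff.

```python
import operator
from typing import Any, Optional


def normalize_query_parallelism(value: Any) -> Optional[int]:
    """Hypothetical helper mirroring the check added in the diff above."""
    if value is None:
        # Leave the value unset so the native layer can apply its own default.
        return None
    # operator.index raises TypeError for non-integer types such as float or str.
    value = operator.index(value)
    if value < -1:
        raise ValueError("query_parallelism must be >= -1")
    return value
```

With this shape, `normalize_query_parallelism(2.0)` fails with `TypeError` rather than being silently truncated, and `normalize_query_parallelism(-2)` fails with `ValueError`, which is the case exercised by `test_vector_index_invalid_query_parallelism`.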

python/python/tests/test_vector_index.py

Lines changed: 35 additions & 0 deletions
@@ -1864,6 +1864,41 @@ def test_vector_index_with_nprobes(indexed_dataset):
     ).analyze_plan()


+def test_vector_index_with_query_parallelism(indexed_dataset):
+    q = np.random.randn(128)
+
+    sequential = indexed_dataset.to_table(
+        nearest={
+            "column": "vector",
+            "q": q,
+            "k": 10,
+            "query_parallelism": 0,
+        }
+    )
+    parallel = indexed_dataset.to_table(
+        nearest={
+            "column": "vector",
+            "q": q,
+            "k": 10,
+            "query_parallelism": -1,
+        }
+    )
+
+    assert sequential == parallel
+
+
+def test_vector_index_invalid_query_parallelism(indexed_dataset):
+    with pytest.raises(ValueError, match="query_parallelism"):
+        indexed_dataset.scanner(
+            nearest={
+                "column": "vector",
+                "q": np.random.randn(128),
+                "k": 10,
+                "query_parallelism": -2,
+            }
+        )
+
+
 def test_knn_deleted_rows(tmp_path):
     data = create_table()
     ds = lance.write_dataset(data, tmp_path)
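
The equality asserted in `test_vector_index_with_query_parallelism` depends on the sequential and parallel paths returning the same top-k. The toy model below illustrates why that is expected: a single query-level heap fed partition by partition yields the same hits as merging per-partition top-k lists. It is a sketch under stated assumptions, not Lance's implementation; the names `global_topk` and `merged_topk` are made up for the example, and distances are assumed distinct so that tie-breaking does not matter.

```python
import heapq
from typing import Iterable, List, Tuple

Hit = Tuple[float, int]  # (distance, row_id)


def global_topk(partitions: Iterable[List[Hit]], k: int) -> List[Hit]:
    """Scan partitions in order, keeping the k closest hits in one bounded heap."""
    if k <= 0:
        return []
    heap: List[Tuple[float, int]] = []  # (-distance, row_id); root is the worst kept hit
    for hits in partitions:
        for dist, row_id in hits:
            if len(heap) < k:
                heapq.heappush(heap, (-dist, row_id))
            elif -dist > heap[0][0]:
                # Strictly closer than the current worst hit, so replace it.
                heapq.heapreplace(heap, (-dist, row_id))
    return sorted((-neg, row_id) for neg, row_id in heap)


def merged_topk(partitions: Iterable[List[Hit]], k: int) -> List[Hit]:
    """Take each partition's own top-k independently, then merge the candidates."""
    per_partition = [heapq.nsmallest(k, hits) for hits in partitions]
    return heapq.nsmallest(k, (hit for part in per_partition for hit in part))
```

For example, with `parts = [[(0.9, 1), (0.2, 2)], [(0.5, 3), (0.1, 4)], [(0.7, 5)]]`, both `global_topk(parts, 3)` and `merged_topk(parts, 3)` return `[(0.1, 4), (0.2, 2), (0.5, 3)]`.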

python/src/dataset.rs

Lines changed: 29 additions & 2 deletions
@@ -77,8 +77,8 @@ use lance_index::{
     progress::{IndexBuildProgress, NoopIndexBuildProgress},
     scalar::{FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams},
     vector::{
-        Query as VectorQuery, hnsw::builder::HnswBuildParams, ivf::IvfBuildParams,
-        pq::PQBuildParams, sq::builder::SQBuildParams,
+        DEFAULT_QUERY_PARALLELISM, Query as VectorQuery, hnsw::builder::HnswBuildParams,
+        ivf::IvfBuildParams, pq::PQBuildParams, sq::builder::SQBuildParams,
     },
 };
 use lance_index::{
@@ -1251,6 +1251,7 @@ impl Dataset {
                 refine_factor,
                 use_index,
                 ef,
+                query_parallelism,
             ) = vector_query_params_from_dict(nearest, default_k)?;

             let (_, element_type) = get_vector_type(self_.ds.schema(), &column)
@@ -1311,6 +1312,7 @@ impl Dataset {
             if let Some(ef) = ef {
                 s = s.ef(ef);
             }
+            s = s.query_parallelism(query_parallelism);
             s.use_index(use_index);
             if let Some((lower, upper)) = distance_range {
                 s.distance_range(lower, upper);
@@ -4309,8 +4311,28 @@ type VectorQueryParams = (
     Option<u32>,
     bool,
     Option<usize>,
+    i32,
 );

+fn extract_query_parallelism(value: &Bound<'_, PyAny>) -> PyResult<i32> {
+    let query_parallelism = value.extract()?;
+    if query_parallelism < -1 {
+        Err(PyValueError::new_err("query_parallelism must be >= -1"))
+    } else {
+        Ok(query_parallelism)
+    }
+}
+
+fn vector_query_query_parallelism_from_dict(dict: &Bound<'_, PyDict>) -> PyResult<i32> {
+    if let Some(query_parallelism) = dict.get_item("query_parallelism")?
+        && !query_parallelism.is_none()
+    {
+        extract_query_parallelism(&query_parallelism)
+    } else {
+        Ok(DEFAULT_QUERY_PARALLELISM)
+    }
+}
+
 fn vector_query_params_from_dict(
     dict: &Bound<'_, PyDict>,
     default_k: usize,
@@ -4416,6 +4438,8 @@ fn vector_query_params_from_dict(
         None
     };

+    let query_parallelism = vector_query_query_parallelism_from_dict(dict)?;
+
     Ok((
         column,
         key,
@@ -4426,6 +4450,7 @@ fn vector_query_params_from_dict(
         refine_factor,
         use_index,
         ef,
+        query_parallelism,
     ))
 }

@@ -4461,6 +4486,7 @@ impl PySearchFilter {
             refine_factor,
             use_index,
             ef,
+            query_parallelism,
         ) = vector_query_params_from_dict(query, default_k)?;

         let metric_type = Some(metric_type_opt.unwrap_or(MetricType::L2));
@@ -4477,6 +4503,7 @@ impl PySearchFilter {
             refine_factor,
             metric_type,
             use_index,
+            query_parallelism,
             dist_q_c: 0.0,
         };
