feat: support distributed IVF vector index builds#479
feat: support distributed IVF vector index builds#479jiaoew1991 wants to merge 1 commit intolance-format:mainfrom
Conversation
…/ IVF_SQ)
Adds a VectorIndexJob path in AddIndexExec that uses lance-core's
multi-segment commit API: the driver pre-trains IVF centroids (and PQ
codebook for IVF_PQ) once via VectorTrainer, then each Spark task calls
createIndex(withFragmentIds([fid])) with those shared artifacts. The
driver collects the uncommitted per-fragment segments and publishes them
atomically under one logical index name via commitExistingIndexSegments.
Pre-trained centroids are required: lance-core's distributed build path
rejects per-fragment-trained centroids (rust/lance/src/index/vector.rs
"missing precomputed IVF centroids"). Sharing centroids across segments
also keeps them in the same query-time compatibility group, avoiding
per-segment routing overhead.
Grammar is unchanged — users write:
ALTER TABLE t CREATE INDEX v_idx USING ivf_pq (embedding)
WITH (num_partitions=256, num_sub_vectors=16, metric_type='l2');
Scalar BTREE / INVERTED paths are untouched; those still produce partial
per-fragment files that require mergeIndexMetadata and cannot use the
segment commit API in v6.0.0-beta.2.
Replace-on-recreate is preserved via dropIndex(name) before commit, since
commitExistingIndexSegments is additive by default.
| * v1 trains centroids independently per fragment (each segment is its own compatibility | ||
| * group at query time). Shared-centroid broadcast is a follow-up. |
There was a problem hiding this comment.
This doc contradicts the implementation, on a correctness-relevant property.
runAndCommit calls trainArtifactsOnDriver once (line 578) and ships centroids+codebook to every task via trainedSpec — and the inline comment at lines 574–577 correctly states the opposite of this paragraph:
"Train once here, broadcast to executors, and every per-fragment segment shares the same centroids/codebook so segments land in the same compatibility group at query time."
The PR description matches the inline comment too. This class doc looks like a stale draft from before driver-side training was added. Could you remove or rewrite it before merge? A future maintainer trusting "each segment is its own compatibility group" could file a bogus bug — or "fix" the implementation to match the doc and break the compatibility-group guarantee.
| private def trainArtifactsOnDriver(column: String): (Array[Float], Array[Float]) = { | ||
| val dataset = Utils.openDatasetBuilder(readOptions).build() | ||
| try { | ||
| val ivfBuilder = new IvfBuildParams.Builder().setNumPartitions(spec.numPartitions) | ||
| spec.sampleRate.foreach(ivfBuilder.setSampleRate) | ||
| spec.maxIters.foreach(ivfBuilder.setMaxIters) | ||
| val centroids = VectorTrainer.trainIvfCentroids(dataset, column, ivfBuilder.build()) | ||
|
|
||
| val codebook: Array[Float] = indexType match { | ||
| case IndexType.IVF_PQ => | ||
| val pqBuilder = new PQBuildParams.Builder() | ||
| spec.numSubVectors.foreach(pqBuilder.setNumSubVectors) | ||
| spec.pqNumBits.foreach(pqBuilder.setNumBits) | ||
| spec.pqMaxIters.foreach(pqBuilder.setMaxIters) | ||
| spec.sampleRate.foreach(pqBuilder.setSampleRate) | ||
| VectorTrainer.trainPqCodebook(dataset, column, pqBuilder.build()) | ||
| case _ => null |
There was a problem hiding this comment.
Question: should useResidual be applied here too?
The executor-side toVectorIndexParams (line 714) sets useResidual on the IVF build params used to build the per-fragment segment, but neither IvfBuildParams.Builder nor PQBuildParams.Builder constructed in this method see it. If useResidual=true is supposed to influence either centroid training or — more likely — PQ codebook training (residual-PQ wants the codebook trained on vector − centroid, not raw vectors), the codebook would be shaped for raw-vector chunks while per-fragment build encodes residual chunks against it. The index would still build, but recall would be silently degraded.
Does lance-core's VectorTrainer.trainPqCodebook derive the residual mode from somewhere else, or does it need to be told via IvfBuildParams.setUseResidual / PQBuildParams? If the latter, this is a bug. If lance-core auto-derives it, ignore — and possibly worth a one-line comment here noting that the omission is deliberate.
| val sameNameExists = dataset.getIndexes.asScala.exists(_.name() == indexName) | ||
| if (sameNameExists) { | ||
| dataset.dropIndex(indexName) | ||
| } | ||
| dataset.commitExistingIndexSegments(indexName, column, segments) |
There was a problem hiding this comment.
FYI / minor — dropIndex and commitExistingIndexSegments each perform their own Lance dataset commit (manifest version bump), so they're two separate atomic steps, not one. If the second commit fails (version conflict, transient I/O error, driver crash between calls), the table is left at the intermediate state — old index dropped, new index not yet installed — until the user reruns CREATE INDEX. The existing scalar CreateIndex path is single-commit, so this is a small behavioral regression specifically for the replace-on-recreate case.
The inline comment above this block correctly notes why the pre-drop is needed (the segment commit API is additive), but doesn't acknowledge the failure window. Two suggestions, in increasing order of effort:
- Add a one-liner comment here noting that the replace is non-atomic by design (so future readers / debuggers don't have to re-derive that).
- Longer term: consider a
replaceflag oncommitExistingIndexSegmentsupstream in lance-core so the drop+commit can fold into a single manifest commit. Probably a separate issue, not this PR.
Not a blocker for me — just want to make sure the trade-off is on the record.
| val ivfBuilder = new IvfBuildParams.Builder().setNumPartitions(spec.numPartitions) | ||
| spec.sampleRate.foreach(ivfBuilder.setSampleRate) | ||
| spec.maxIters.foreach(ivfBuilder.setMaxIters) | ||
| val centroids = VectorTrainer.trainIvfCentroids(dataset, column, ivfBuilder.build()) |
There was a problem hiding this comment.
Question: should metric_type be applied to driver-side training too?
spec.metricType is parsed correctly and threaded into toVectorIndexParams on the executor side (line 719: setDistanceType(dt)), so each per-fragment build encodes against the user-requested metric. But the IvfBuildParams.Builder here on line 623 — and the PQBuildParams.Builder on line 630 — never see it. Worse, IvfBuildParams.Builder and PQBuildParams.Builder in lance@6.0.0-beta.2 (the version pinned in pom.xml) don't actually expose a setDistanceType / setMetricType setter, so even if you wanted to pass it from here, you can't.
Digging into the JNI bridge: java/lance-jni/src/vector_trainer.rs in both inner_train_ivf_centroids and inner_train_pq_codebook hardcodes let metric_type = MetricType::L2; with the comment "For now we default to L2 metric; tests and Java bindings currently use L2." This hardcode is still present on main and on v7.0.0-beta.1. So today, VectorTrainer.trainIvfCentroids / trainPqCodebook always cluster with L2 regardless of what the user asked for.
Concretely, lance-core's build_ivf_model (rust/lance/src/index/vector/ivf.rs) L2-normalizes training data when metric is cosine, and train_ivf_model passes the metric into KMeansParams::new(..., metric_type). So for metric_type='cosine', 'dot', or 'hamming', this PR builds centroids on un-normalized L2 geometry, then per-fragment encoders quantize against those centroids using the user's actual metric — silently mismatched index, degraded recall. BaseAddIndexTest#testCreateIvfSqIndex (line 549) which uses metric_type='cosine' only asserts segment count, so the regression is invisible to CI.
Two things to consider before merge:
-
Until lance-core exposes a metric setter on
IvfBuildParams.Builder/PQBuildParams.Builder(and the JNI stops hardcoding L2), should this code path reject non-L2metric_typewith a clear error so users don't get silent recall loss? Or fall back to the existing single-segmentCreateIndexpath for non-L2? -
Same shape as my line-636 comment about
useResidual— the JNI'sbuild_ivf_params_from_javadoesn't readgetUseResidualeither, so even thoughIvfBuildParams.Builderhas asetUseResidualsetter, training currently ignores it. Both of these need an upstream lance-core fix to actually be honored.
If lance-core changes are in flight that I missed, ignore — but please link them here so future debuggers don't have to re-derive the L2 hardcode.
Summary
Adds
ivf_flat,ivf_pq,ivf_sqasCREATE INDEXmethods, using lance-core's multi-segment commit API (commitExistingIndexSegments) to publish per-fragment segments atomically under one logical index name.User-facing
Grammar is untouched;
IndexUtils.buildIndexTypelearns three new cases. Replace-on-recreate is preserved viadropIndex(name)before commit (the segment commit API is additive by default).Design
rust/lance/src/index/vector.rs:514) rejects per-fragment-trained centroids — it requires pre-computed IVF centroids (and a PQ codebook for IVF_PQ). The driver callsVectorTrainer.trainIvfCentroids/trainPqCodebookonce; every per-fragment task uses the same artifacts. This also keeps all segments in the same query-time compatibility group.createIndex(IndexOptions.builder(...).withFragmentIds([fid]).build()), returns an uncommittedIndex.org.lance.index.Indexis notSerializable, so a small Scala case classLanceIndexHandlecarries its fields as primitives and the driver rebuildsIndexvia the builder.commitExistingIndexSegments(name, column, segments).FragmentBasedIndexJob/RangeBasedBTreeIndexJobstill usemergeIndexMetadata+ manualCreateIndextransaction. In v6.0.0-beta.2 scalar per-fragmentcreateIndexproduces partial files that require the merge finalize step — the segment commit API is effectively vector-only for this release.Known follow-ups (not in this PR)
sample_rate,max_iters,hnsw_m,hnsw_ef_construction, etc.OPTIMIZE INDEXfor incremental builds over new fragmentsReferences
Test plan
AddIndexTestcases green onlance-spark-3.5_2.12(11 existing scalar + 5 new vector)AddIndexTestcases green onlance-spark-4.0_2.13