Skip to content

Commit 158c3c2

Browse files
committed
fix: propagate index_details from distributed index creation and add defensive describeIndices
Root cause: AddIndexExec discarded protobuf index_details bytes from worker-created indexes, causing describeIndices() to fail with "Index details are required for index description" on the read path.

Write-side fix (AddIndexExec):
- FragmentIndexTask and RangeBTreeIndexBuilder now capture index_details bytes from the created Index and return them to the driver
- Driver sets index_details on the Index.Builder before committing
- Extract IndexUtils.extractIndexDetails() (Java Optional → Scala Option) and IndexUtils.collectFirstIndexDetails() to eliminate duplication and unify the serialization format across fragment-based and range-based paths

Read-side fix (LanceScanBuilder):
- findZonemapIndexedColumns() uses describeIndices(criteria) with an empty IndexCriteria so the Rust side filters out legacy indexes that lack index_details, instead of failing

Tests:
- Add testBTreeIndexHasIndexDetails, testRangeBTreeIndexHasIndexDetails, testFtsIndexHasIndexDetails covering all three index creation paths
- Shared verifyIndexDetails() helper checks index_details presence, plus criteria-based and no-arg describeIndices success
1 parent 7e1712e commit 158c3c2

3 files changed

Lines changed: 121 additions & 18 deletions

File tree

lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.lance.Dataset;
1717
import org.lance.Fragment;
1818
import org.lance.ManifestSummary;
19+
import org.lance.index.IndexCriteria;
1920
import org.lance.index.IndexDescription;
2021
import org.lance.index.scalar.ZoneStats;
2122
import org.lance.ipc.ColumnOrdering;
@@ -381,7 +382,10 @@ private Set<String> findZonemapIndexedColumns(Dataset dataset) {
381382
fieldIdToName.put(field.getId(), field.getName());
382383
}
383384

384-
for (IndexDescription idx : dataset.describeIndices()) {
385+
// Use the criteria-based overload so that indexes missing index_details
386+
// (created by older versions) are silently skipped instead of causing errors.
387+
IndexCriteria criteria = new IndexCriteria.Builder().build();
388+
for (IndexDescription idx : dataset.describeIndices(criteria)) {
385389
if ("ZONEMAP".equalsIgnoreCase(idx.getIndexType())) {
386390
for (int fieldId : idx.getFieldIds()) {
387391
String name = fieldIdToName.get(fieldId);

lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ case class AddIndexExec(
8484
val uuid = UUID.randomUUID()
8585
val indexType = IndexUtils.buildIndexType(method)
8686

87-
// Create distributed index job and run it
88-
createIndexJob(lanceDataset, readOptions, uuid.toString, fragmentIds).run()
87+
val indexDetails = createIndexJob(lanceDataset, readOptions, uuid.toString, fragmentIds).run()
8988

9089
val dataset = Utils.openDatasetBuilder(readOptions).build()
9190
try {
@@ -99,15 +98,16 @@ case class AddIndexExec(
9998

10099
val datasetVersion = dataset.version()
101100

102-
val index = Index
101+
val builder = Index
103102
.builder()
104103
.uuid(uuid)
105104
.name(indexName)
106105
.fields(fieldIds.map(java.lang.Integer.valueOf).asJava)
107106
.datasetVersion(datasetVersion)
108107
.indexVersion(0)
109108
.fragments(fragmentIds.asJava)
110-
.build()
109+
indexDetails.foreach(builder.indexDetails)
110+
val index = builder.build()
111111

112112
// Find existing indices with the same name to mark as removed (for replace)
113113
val removedIndices = dataset.getIndexes.asScala
@@ -207,7 +207,9 @@ case class AddIndexExec(
207207
* Interface for index job to implement different indexing strategies.
208208
*/
209209
trait IndexJob extends Serializable {
210-
def run(): Unit
210+
211+
/** @return index_details bytes from a worker, or None if unavailable. */
212+
def run(): Option[Array[Byte]]
211213
}
212214

213215
/**
@@ -234,7 +236,7 @@ class FragmentBasedIndexJob(
234236
tableId: Option[List[String]],
235237
initialStorageOpts: Option[Map[String, String]]) extends IndexJob {
236238

237-
override def run(): Unit = {
239+
override def run(): Option[Array[Byte]] = {
238240
val encodedReadOptions = encode(readOptions)
239241
val columns = addIndexExec.columns.toList
240242
val argsJson = IndexUtils.toJson(addIndexExec.args)
@@ -255,10 +257,12 @@ class FragmentBasedIndexJob(
255257
initialStorageOpts)
256258
}.toSeq
257259

258-
addIndexExec.session.sparkContext
260+
val results = addIndexExec.session.sparkContext
259261
.parallelize(tasks, tasks.size)
260262
.map(t => t.execute())
261263
.collect()
264+
265+
IndexUtils.collectFirstIndexDetails(results)
262266
}
263267
}
264268

@@ -311,12 +315,11 @@ case class FragmentIndexTask(
311315
.build()
312316

313317
try {
314-
dataset.createIndex(indexOptions)
318+
val createdIndex = dataset.createIndex(indexOptions)
319+
encode(IndexUtils.extractIndexDetails(createdIndex))
315320
} finally {
316321
dataset.close()
317322
}
318-
319-
encode("OK")
320323
}
321324
}
322325

@@ -344,7 +347,7 @@ class RangeBasedBTreeIndexJob(
344347

345348
private val VALUE_COLUMN_NAME = "value"
346349

347-
override def run(): Unit = {
350+
override def run(): Option[Array[Byte]] = {
348351
if (addIndexExec.columns.size != 1) {
349352
throw new UnsupportedOperationException(
350353
"Range-based BTree index currently supports a single column only")
@@ -390,9 +393,11 @@ class RangeBasedBTreeIndexJob(
390393
initialStorageOpts,
391394
rangeDf.schema)
392395

393-
rangeDf.queryExecution.toRdd.mapPartitionsWithIndex { case (rangeId, rowsIter) =>
396+
val results = rangeDf.queryExecution.toRdd.mapPartitionsWithIndex { case (rangeId, rowsIter) =>
394397
indexBuilder.buildForRange(rangeId, rowsIter)
395398
}.collect()
399+
400+
IndexUtils.collectFirstIndexDetails(results)
396401
}
397402

398403
}
@@ -424,7 +429,7 @@ case class RangeBTreeIndexBuilder(
424429
initialStorageOptions: Option[Map[String, String]],
425430
schema: StructType) extends Serializable {
426431

427-
def buildForRange(rangeId: Int, rowsIter: Iterator[InternalRow]): Iterator[Unit] = {
432+
def buildForRange(rangeId: Int, rowsIter: Iterator[InternalRow]): Iterator[String] = {
428433
// Initialize writer to write data to arrow stream
429434
val allocator = LanceRuntime.allocator()
430435
val data =
@@ -452,7 +457,7 @@ case class RangeBTreeIndexBuilder(
452457
// No rows are written
453458
if (data.getRowCount == 0) {
454459
data.close()
455-
return Iterator.empty
460+
return Iterator(encode(None: Option[Array[Byte]]))
456461
}
457462

458463
var stream: ArrowArrayStream = null
@@ -487,15 +492,14 @@ case class RangeBTreeIndexBuilder(
487492
.withPreprocessedData(stream)
488493
.build()
489494

490-
dataset.createIndex(indexOptions)
495+
val createdIndex = dataset.createIndex(indexOptions)
496+
Iterator(encode(IndexUtils.extractIndexDetails(createdIndex)))
491497
} finally {
492498
CloseableUtil.closeQuietly(stream)
493499
CloseableUtil.closeQuietly(reader)
494500
CloseableUtil.closeQuietly(data)
495501
CloseableUtil.closeQuietly(dataset)
496502
}
497-
498-
Iterator.empty
499503
}
500504
}
501505

@@ -519,6 +523,19 @@ object IndexUtils {
519523
}
520524
}
521525

526+
/** Extracts index_details bytes from a newly created Index. */
527+
def extractIndexDetails(index: Index): Option[Array[Byte]] = {
528+
val opt = index.indexDetails()
529+
if (opt.isPresent) Some(opt.get()) else None
530+
}
531+
532+
/** Returns the first non-empty index_details from serialized worker results. */
533+
def collectFirstIndexDetails(encodedResults: Array[String]): Option[Array[Byte]] = {
534+
encodedResults.iterator
535+
.map(encoded => decode[Option[Array[Byte]]](encoded))
536+
.collectFirst { case Some(details) => details }
537+
}
538+
522539
def toJson(args: Seq[LanceNamedArgument]): String = {
523540
if (args.isEmpty) {
524541
"{}"

lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
package org.lance.spark.update;
1515

1616
import org.lance.index.Index;
17+
import org.lance.index.IndexCriteria;
18+
import org.lance.index.IndexDescription;
1719

1820
import org.apache.spark.sql.Dataset;
1921
import org.apache.spark.sql.Row;
@@ -449,6 +451,86 @@ public void testDropIndexThenRecreate() {
449451
Assertions.assertEquals(1L, query.count());
450452
}
451453

454+
@Test
455+
public void testBTreeIndexHasIndexDetails() {
456+
prepareDataset();
457+
spark.sql(
458+
String.format("alter table %s create index idx_details_btree using btree (id)", fullTable));
459+
verifyIndexDetails("idx_details_btree", "BTREE");
460+
}
461+
462+
@Test
463+
public void testRangeBTreeIndexHasIndexDetails() {
464+
prepareDataset();
465+
spark.sql(
466+
String.format(
467+
"alter table %s create index idx_details_range using btree (id) with (build_mode='range')",
468+
fullTable));
469+
verifyIndexDetails("idx_details_range", "BTREE");
470+
}
471+
472+
@Test
473+
public void testFtsIndexHasIndexDetails() {
474+
prepareDataset();
475+
spark.sql(
476+
String.format(
477+
"alter table %s create index idx_details_fts using fts (text) with ("
478+
+ "base_tokenizer='simple', "
479+
+ "language='English', "
480+
+ "max_token_length=40, "
481+
+ "lower_case=true, "
482+
+ "stem=false, "
483+
+ "remove_stop_words=false, "
484+
+ "ascii_folding=false, "
485+
+ "with_position=true"
486+
+ ")",
487+
fullTable));
488+
verifyIndexDetails("idx_details_fts", "INVERTED");
489+
}
490+
491+
/** Checks index_details is populated and both describeIndices overloads work. */
492+
private void verifyIndexDetails(String indexName, String expectedIndexType) {
493+
org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build();
494+
try {
495+
List<Index> indexList = lanceDataset.getIndexes();
496+
Index index =
497+
indexList.stream()
498+
.filter(i -> indexName.equals(i.name()))
499+
.findFirst()
500+
.orElseThrow(
501+
() -> new AssertionError("Index '" + indexName + "' not found in dataset"));
502+
Assertions.assertTrue(
503+
index.indexDetails().isPresent(),
504+
"index_details should be populated for index '" + indexName + "'");
505+
506+
// criteria-based overload
507+
IndexCriteria criteria = new IndexCriteria.Builder().build();
508+
List<IndexDescription> descriptions = lanceDataset.describeIndices(criteria);
509+
Assertions.assertFalse(
510+
descriptions.isEmpty(), "describeIndices(criteria) should return at least one index");
511+
IndexDescription desc =
512+
descriptions.stream()
513+
.filter(d -> indexName.equals(d.getName()))
514+
.findFirst()
515+
.orElseThrow(
516+
() -> new AssertionError("Index description for '" + indexName + "' not found"));
517+
Assertions.assertEquals(
518+
expectedIndexType.toUpperCase(),
519+
desc.getIndexType().toUpperCase(),
520+
"Index type mismatch for '" + indexName + "'");
521+
522+
// no-arg overload
523+
List<IndexDescription> noArgDescriptions = lanceDataset.describeIndices();
524+
Assertions.assertFalse(
525+
noArgDescriptions.isEmpty(), "describeIndices() no-arg should succeed");
526+
Assertions.assertTrue(
527+
noArgDescriptions.stream().anyMatch(d -> indexName.equals(d.getName())),
528+
"describeIndices() no-arg should contain index '" + indexName + "'");
529+
} finally {
530+
lanceDataset.close();
531+
}
532+
}
533+
452534
private void checkIndex(String indexName) {
453535
// Check index is created successfully
454536
org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build();

0 commit comments

Comments
 (0)