diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java
index 1178bfa3..2edf7b22 100644
--- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java
+++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java
@@ -16,6 +16,7 @@
 import org.lance.Dataset;
 import org.lance.Fragment;
 import org.lance.ManifestSummary;
+import org.lance.index.IndexCriteria;
 import org.lance.index.IndexDescription;
 import org.lance.index.scalar.ZoneStats;
 import org.lance.ipc.ColumnOrdering;
@@ -381,7 +382,10 @@ private Set<String> findZonemapIndexedColumns(Dataset dataset) {
       fieldIdToName.put(field.getId(), field.getName());
     }
 
-    for (IndexDescription idx : dataset.describeIndices()) {
+    // Use the criteria-based overload so that indexes missing index_details
+    // (created by older versions) are silently skipped instead of causing errors.
+    IndexCriteria criteria = new IndexCriteria.Builder().build();
+    for (IndexDescription idx : dataset.describeIndices(criteria)) {
       if ("ZONEMAP".equalsIgnoreCase(idx.getIndexType())) {
         for (int fieldId : idx.getFieldIds()) {
           String name = fieldIdToName.get(fieldId);
diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala
index 6f270d00..10e1b844 100755
--- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala
+++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala
@@ -84,8 +84,7 @@ case class AddIndexExec(
     val uuid = UUID.randomUUID()
     val indexType = IndexUtils.buildIndexType(method)
 
-    // Create distributed index job and run it
-    createIndexJob(lanceDataset, readOptions, uuid.toString, fragmentIds).run()
+    val indexDetails = createIndexJob(lanceDataset, readOptions, uuid.toString, fragmentIds).run()
 
     val dataset = Utils.openDatasetBuilder(readOptions).build()
     try {
@@ -99,7 +98,7 @@
 
     val datasetVersion = dataset.version()
 
-    val index = Index
+    val builder = Index
       .builder()
       .uuid(uuid)
       .name(indexName)
@@ -107,7 +106,8 @@
       .datasetVersion(datasetVersion)
       .indexVersion(0)
       .fragments(fragmentIds.asJava)
-      .build()
+    indexDetails.foreach(builder.indexDetails)
+    val index = builder.build()
 
     // Find existing indices with the same name to mark as removed (for replace)
     val removedIndices = dataset.getIndexes.asScala
@@ -207,7 +207,9 @@
  * Interface for index job to implement different indexing strategies.
  */
 trait IndexJob extends Serializable {
-  def run(): Unit
+
+  /** @return index_details bytes from a worker, or None if unavailable. */
+  def run(): Option[Array[Byte]]
 }
 
 /**
@@ -234,7 +236,7 @@ class FragmentBasedIndexJob(
     tableId: Option[List[String]],
     initialStorageOpts: Option[Map[String, String]])
     extends IndexJob {
-  override def run(): Unit = {
+  override def run(): Option[Array[Byte]] = {
     val encodedReadOptions = encode(readOptions)
     val columns = addIndexExec.columns.toList
     val argsJson = IndexUtils.toJson(addIndexExec.args)
@@ -255,10 +257,12 @@
         initialStorageOpts)
     }.toSeq
 
-    addIndexExec.session.sparkContext
+    val results = addIndexExec.session.sparkContext
       .parallelize(tasks, tasks.size)
       .map(t => t.execute())
      .collect()
+
+    IndexUtils.collectFirstIndexDetails(results)
   }
 }
 
@@ -311,12 +315,11 @@ case class FragmentIndexTask(
       .build()
 
     try {
-      dataset.createIndex(indexOptions)
+      val createdIndex = dataset.createIndex(indexOptions)
+      encode(IndexUtils.extractIndexDetails(createdIndex))
     } finally {
       dataset.close()
     }
-
-    encode("OK")
   }
 }
 
@@ -344,7 +347,7 @@ class RangeBasedBTreeIndexJob(
 
   private val VALUE_COLUMN_NAME = "value"
 
-  override def run(): Unit = {
+  override def run(): Option[Array[Byte]] = {
     if (addIndexExec.columns.size != 1) {
       throw new UnsupportedOperationException(
         "Range-based BTree index currently supports a single column only")
@@ -390,9 +393,11 @@
       initialStorageOpts,
       rangeDf.schema)
 
-    rangeDf.queryExecution.toRdd.mapPartitionsWithIndex { case (rangeId, rowsIter) =>
+    val results = rangeDf.queryExecution.toRdd.mapPartitionsWithIndex { case (rangeId, rowsIter) =>
       indexBuilder.buildForRange(rangeId, rowsIter)
     }.collect()
+
+    IndexUtils.collectFirstIndexDetails(results)
   }
 }
 
@@ -424,7 +429,7 @@ case class RangeBTreeIndexBuilder(
     initialStorageOptions: Option[Map[String, String]],
     schema: StructType)
     extends Serializable {
-  def buildForRange(rangeId: Int, rowsIter: Iterator[InternalRow]): Iterator[Unit] = {
+  def buildForRange(rangeId: Int, rowsIter: Iterator[InternalRow]): Iterator[String] = {
     // Initialize writer to write data to arrow stream
     val allocator = LanceRuntime.allocator()
     val data =
@@ -452,7 +457,7 @@
     // No rows are written
     if (data.getRowCount == 0) {
       data.close()
-      return Iterator.empty
+      return Iterator(encode(None: Option[Array[Byte]]))
     }
 
     var stream: ArrowArrayStream = null
@@ -487,15 +492,14 @@
         .withPreprocessedData(stream)
         .build()
 
-      dataset.createIndex(indexOptions)
+      val createdIndex = dataset.createIndex(indexOptions)
+      Iterator(encode(IndexUtils.extractIndexDetails(createdIndex)))
     } finally {
       CloseableUtil.closeQuietly(stream)
       CloseableUtil.closeQuietly(reader)
       CloseableUtil.closeQuietly(data)
       CloseableUtil.closeQuietly(dataset)
     }
-
-    Iterator.empty
   }
 }
 
@@ -519,6 +523,19 @@ object IndexUtils {
     }
   }
 
+  /** Extracts index_details bytes from a newly created Index. */
+  def extractIndexDetails(index: Index): Option[Array[Byte]] = {
+    val opt = index.indexDetails()
+    if (opt.isPresent) Some(opt.get()) else None
+  }
+
+  /** Returns the first non-empty index_details from serialized worker results. */
+  def collectFirstIndexDetails(encodedResults: Array[String]): Option[Array[Byte]] = {
+    encodedResults.iterator
+      .map(encoded => decode[Option[Array[Byte]]](encoded))
+      .collectFirst { case Some(details) => details }
+  }
+
   def toJson(args: Seq[LanceNamedArgument]): String = {
     if (args.isEmpty) {
       "{}"
diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java
index 3133ee2e..9179187e 100755
--- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java
+++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java
@@ -14,6 +14,8 @@
 package org.lance.spark.update;
 
 import org.lance.index.Index;
+import org.lance.index.IndexCriteria;
+import org.lance.index.IndexDescription;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -449,6 +451,86 @@ public void testDropIndexThenRecreate() {
     Assertions.assertEquals(1L, query.count());
   }
 
+  @Test
+  public void testBTreeIndexHasIndexDetails() {
+    prepareDataset();
+    spark.sql(
+        String.format("alter table %s create index idx_details_btree using btree (id)", fullTable));
+    verifyIndexDetails("idx_details_btree", "BTREE");
+  }
+
+  @Test
+  public void testRangeBTreeIndexHasIndexDetails() {
+    prepareDataset();
+    spark.sql(
+        String.format(
+            "alter table %s create index idx_details_range using btree (id) with (build_mode='range')",
+            fullTable));
+    verifyIndexDetails("idx_details_range", "BTREE");
+  }
+
+  @Test
+  public void testFtsIndexHasIndexDetails() {
+    prepareDataset();
+    spark.sql(
+        String.format(
+            "alter table %s create index idx_details_fts using fts (text) with ("
+                + "base_tokenizer='simple', "
+                + "language='English', "
+                + "max_token_length=40, "
+                + "lower_case=true, "
+                + "stem=false, "
+                + "remove_stop_words=false, "
+                + "ascii_folding=false, "
+                + "with_position=true"
+                + ")",
+            fullTable));
+    verifyIndexDetails("idx_details_fts", "INVERTED");
+  }
+
+  /** Checks index_details is populated and both describeIndices overloads work. */
+  private void verifyIndexDetails(String indexName, String expectedIndexType) {
+    org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build();
+    try {
+      List<Index> indexList = lanceDataset.getIndexes();
+      Index index =
+          indexList.stream()
+              .filter(i -> indexName.equals(i.name()))
+              .findFirst()
+              .orElseThrow(
+                  () -> new AssertionError("Index '" + indexName + "' not found in dataset"));
+      Assertions.assertTrue(
+          index.indexDetails().isPresent(),
+          "index_details should be populated for index '" + indexName + "'");
+
+      // criteria-based overload
+      IndexCriteria criteria = new IndexCriteria.Builder().build();
+      List<IndexDescription> descriptions = lanceDataset.describeIndices(criteria);
+      Assertions.assertFalse(
+          descriptions.isEmpty(), "describeIndices(criteria) should return at least one index");
+      IndexDescription desc =
+          descriptions.stream()
+              .filter(d -> indexName.equals(d.getName()))
+              .findFirst()
+              .orElseThrow(
+                  () -> new AssertionError("Index description for '" + indexName + "' not found"));
+      Assertions.assertEquals(
+          expectedIndexType.toUpperCase(),
+          desc.getIndexType().toUpperCase(),
+          "Index type mismatch for '" + indexName + "'");
+
+      // no-arg overload
+      List<IndexDescription> noArgDescriptions = lanceDataset.describeIndices();
+      Assertions.assertFalse(
+          noArgDescriptions.isEmpty(), "describeIndices() no-arg should succeed");
+      Assertions.assertTrue(
+          noArgDescriptions.stream().anyMatch(d -> indexName.equals(d.getName())),
+          "describeIndices() no-arg should contain index '" + indexName + "'");
+    } finally {
+      lanceDataset.close();
+    }
+  }
+
   private void checkIndex(String indexName) {
     // Check index is created successfully
     org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build();