Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.lance.Dataset;
import org.lance.Fragment;
import org.lance.ManifestSummary;
import org.lance.index.IndexCriteria;
import org.lance.index.IndexDescription;
import org.lance.index.scalar.ZoneStats;
import org.lance.ipc.ColumnOrdering;
Expand Down Expand Up @@ -381,7 +382,10 @@ private Set<String> findZonemapIndexedColumns(Dataset dataset) {
fieldIdToName.put(field.getId(), field.getName());
}

for (IndexDescription idx : dataset.describeIndices()) {
// Use the criteria-based overload so that indexes missing index_details
// (created by older versions) are silently skipped instead of causing errors.
IndexCriteria criteria = new IndexCriteria.Builder().build();
for (IndexDescription idx : dataset.describeIndices(criteria)) {
if ("ZONEMAP".equalsIgnoreCase(idx.getIndexType())) {
for (int fieldId : idx.getFieldIds()) {
String name = fieldIdToName.get(fieldId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ case class AddIndexExec(
val uuid = UUID.randomUUID()
val indexType = IndexUtils.buildIndexType(method)

// Create distributed index job and run it
createIndexJob(lanceDataset, readOptions, uuid.toString, fragmentIds).run()
val indexDetails = createIndexJob(lanceDataset, readOptions, uuid.toString, fragmentIds).run()

val dataset = Utils.openDatasetBuilder(readOptions).build()
try {
Expand All @@ -99,15 +98,16 @@ case class AddIndexExec(

val datasetVersion = dataset.version()

val index = Index
val builder = Index
.builder()
.uuid(uuid)
.name(indexName)
.fields(fieldIds.map(java.lang.Integer.valueOf).asJava)
.datasetVersion(datasetVersion)
.indexVersion(0)
.fragments(fragmentIds.asJava)
.build()
indexDetails.foreach(builder.indexDetails)
val index = builder.build()

// Find existing indices with the same name to mark as removed (for replace)
val removedIndices = dataset.getIndexes.asScala
Expand Down Expand Up @@ -207,7 +207,9 @@ case class AddIndexExec(
* Interface for index job to implement different indexing strategies.
*/
trait IndexJob extends Serializable {

  /**
   * Runs the distributed index build to completion on Spark workers.
   *
   * @return index_details bytes reported by a worker for the newly created index,
   *         or None if unavailable (e.g. a worker processed no rows).
   */
  def run(): Option[Array[Byte]]
}

/**
Expand All @@ -234,7 +236,7 @@ class FragmentBasedIndexJob(
tableId: Option[List[String]],
initialStorageOpts: Option[Map[String, String]]) extends IndexJob {

override def run(): Unit = {
override def run(): Option[Array[Byte]] = {
val encodedReadOptions = encode(readOptions)
val columns = addIndexExec.columns.toList
val argsJson = IndexUtils.toJson(addIndexExec.args)
Expand All @@ -255,10 +257,12 @@ class FragmentBasedIndexJob(
initialStorageOpts)
}.toSeq

addIndexExec.session.sparkContext
val results = addIndexExec.session.sparkContext
.parallelize(tasks, tasks.size)
.map(t => t.execute())
.collect()

IndexUtils.collectFirstIndexDetails(results)
}
}

Expand Down Expand Up @@ -311,12 +315,11 @@ case class FragmentIndexTask(
.build()

try {
dataset.createIndex(indexOptions)
val createdIndex = dataset.createIndex(indexOptions)
encode(IndexUtils.extractIndexDetails(createdIndex))
} finally {
dataset.close()
}

encode("OK")
}
}

Expand Down Expand Up @@ -344,7 +347,7 @@ class RangeBasedBTreeIndexJob(

private val VALUE_COLUMN_NAME = "value"

override def run(): Unit = {
override def run(): Option[Array[Byte]] = {
if (addIndexExec.columns.size != 1) {
throw new UnsupportedOperationException(
"Range-based BTree index currently supports a single column only")
Expand Down Expand Up @@ -390,9 +393,11 @@ class RangeBasedBTreeIndexJob(
initialStorageOpts,
rangeDf.schema)

rangeDf.queryExecution.toRdd.mapPartitionsWithIndex { case (rangeId, rowsIter) =>
val results = rangeDf.queryExecution.toRdd.mapPartitionsWithIndex { case (rangeId, rowsIter) =>
indexBuilder.buildForRange(rangeId, rowsIter)
}.collect()

IndexUtils.collectFirstIndexDetails(results)
}

}
Expand Down Expand Up @@ -424,7 +429,7 @@ case class RangeBTreeIndexBuilder(
initialStorageOptions: Option[Map[String, String]],
schema: StructType) extends Serializable {

def buildForRange(rangeId: Int, rowsIter: Iterator[InternalRow]): Iterator[Unit] = {
def buildForRange(rangeId: Int, rowsIter: Iterator[InternalRow]): Iterator[String] = {
// Initialize writer to write data to arrow stream
val allocator = LanceRuntime.allocator()
val data =
Expand Down Expand Up @@ -452,7 +457,7 @@ case class RangeBTreeIndexBuilder(
// No rows are written
if (data.getRowCount == 0) {
data.close()
return Iterator.empty
return Iterator(encode(None: Option[Array[Byte]]))
}

var stream: ArrowArrayStream = null
Expand Down Expand Up @@ -487,15 +492,14 @@ case class RangeBTreeIndexBuilder(
.withPreprocessedData(stream)
.build()

dataset.createIndex(indexOptions)
val createdIndex = dataset.createIndex(indexOptions)
Iterator(encode(IndexUtils.extractIndexDetails(createdIndex)))
} finally {
CloseableUtil.closeQuietly(stream)
CloseableUtil.closeQuietly(reader)
CloseableUtil.closeQuietly(data)
CloseableUtil.closeQuietly(dataset)
}

Iterator.empty
}
}

Expand All @@ -519,6 +523,19 @@ object IndexUtils {
}
}

/** Extracts index_details bytes from a newly created Index. */
def extractIndexDetails(index: Index): Option[Array[Byte]] = {
val opt = index.indexDetails()
if (opt.isPresent) Some(opt.get()) else None
}

/** Returns the first non-empty index_details from serialized worker results. */
def collectFirstIndexDetails(encodedResults: Array[String]): Option[Array[Byte]] = {
encodedResults.iterator
.map(encoded => decode[Option[Array[Byte]]](encoded))
.collectFirst { case Some(details) => details }
}

def toJson(args: Seq[LanceNamedArgument]): String = {
if (args.isEmpty) {
"{}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package org.lance.spark.update;

import org.lance.index.Index;
import org.lance.index.IndexCriteria;
import org.lance.index.IndexDescription;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Expand Down Expand Up @@ -449,6 +451,86 @@ public void testDropIndexThenRecreate() {
Assertions.assertEquals(1L, query.count());
}

@Test
public void testBTreeIndexHasIndexDetails() {
prepareDataset();
spark.sql(
String.format("alter table %s create index idx_details_btree using btree (id)", fullTable));
verifyIndexDetails("idx_details_btree", "BTREE");
}

@Test
public void testRangeBTreeIndexHasIndexDetails() {
prepareDataset();
spark.sql(
String.format(
"alter table %s create index idx_details_range using btree (id) with (build_mode='range')",
fullTable));
verifyIndexDetails("idx_details_range", "BTREE");
}

@Test
public void testFtsIndexHasIndexDetails() {
prepareDataset();
spark.sql(
String.format(
"alter table %s create index idx_details_fts using fts (text) with ("
+ "base_tokenizer='simple', "
+ "language='English', "
+ "max_token_length=40, "
+ "lower_case=true, "
+ "stem=false, "
+ "remove_stop_words=false, "
+ "ascii_folding=false, "
+ "with_position=true"
+ ")",
fullTable));
verifyIndexDetails("idx_details_fts", "INVERTED");
}

/** Checks index_details is populated and both describeIndices overloads work. */
private void verifyIndexDetails(String indexName, String expectedIndexType) {
org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build();
try {
List<Index> indexList = lanceDataset.getIndexes();
Index index =
indexList.stream()
.filter(i -> indexName.equals(i.name()))
.findFirst()
.orElseThrow(
() -> new AssertionError("Index '" + indexName + "' not found in dataset"));
Assertions.assertTrue(
index.indexDetails().isPresent(),
"index_details should be populated for index '" + indexName + "'");

// criteria-based overload
IndexCriteria criteria = new IndexCriteria.Builder().build();
List<IndexDescription> descriptions = lanceDataset.describeIndices(criteria);
Assertions.assertFalse(
descriptions.isEmpty(), "describeIndices(criteria) should return at least one index");
IndexDescription desc =
descriptions.stream()
.filter(d -> indexName.equals(d.getName()))
.findFirst()
.orElseThrow(
() -> new AssertionError("Index description for '" + indexName + "' not found"));
Assertions.assertEquals(
expectedIndexType.toUpperCase(),
desc.getIndexType().toUpperCase(),
"Index type mismatch for '" + indexName + "'");

// no-arg overload
List<IndexDescription> noArgDescriptions = lanceDataset.describeIndices();
Assertions.assertFalse(
noArgDescriptions.isEmpty(), "describeIndices() no-arg should succeed");
Assertions.assertTrue(
noArgDescriptions.stream().anyMatch(d -> indexName.equals(d.getName())),
"describeIndices() no-arg should contain index '" + indexName + "'");
} finally {
lanceDataset.close();
}
}

private void checkIndex(String indexName) {
// Check index is created successfully
org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build();
Expand Down
Loading