diff --git a/docker/tests/test_lance_spark.py b/docker/tests/test_lance_spark.py index a21eff06..9c872fe2 100644 --- a/docker/tests/test_lance_spark.py +++ b/docker/tests/test_lance_spark.py @@ -492,7 +492,7 @@ def test_properties_persist_after_insert(self, spark): class TestDDLIndex: - """Test DDL index operations: CREATE INDEX (BTree, FTS).""" + """Test DDL index operations: CREATE INDEX (BTree, ZoneMap, FTS).""" def test_create_btree_index_on_int(self, spark): """Test CREATE INDEX with BTree on integer column.""" @@ -562,6 +562,42 @@ def test_create_btree_index_on_string(self, spark): """).collect() assert len(query_result) == 3 + def test_create_zonemap_index_on_int(self, spark): + """Test CREATE INDEX with ZoneMap on integer column.""" + spark.sql(""" + CREATE TABLE default.test_table ( + id INT, + name STRING, + value DOUBLE + ) + """) + + data = [(i, f"Name{i}", float(i * 10)) for i in range(100)] + df = spark.createDataFrame(data, ["id", "name", "value"]) + df.writeTo("default.test_table").append() + + result = spark.sql(""" + ALTER TABLE default.test_table + CREATE INDEX idx_id_zonemap USING zonemap (id) + WITH (rows_per_zone = 8) + """).collect() + + assert len(result) == 1 + assert result[0][1] == "idx_id_zonemap" + + indexes = spark.sql(""" + SHOW INDEXES IN default.test_table + """).collect() + zonemap_rows = [row for row in indexes if row["name"] == "idx_id_zonemap"] + assert len(zonemap_rows) == 1 + assert zonemap_rows[0]["index_type"] == "zonemap" + + query_result = spark.sql(""" + SELECT * FROM default.test_table WHERE id = 50 + """).collect() + assert len(query_result) == 1 + assert query_result[0].id == 50 + def test_create_fts_index(self, spark): """Test CREATE INDEX with full-text search (FTS).""" spark.sql(""" diff --git a/docs/src/operations/ddl/create-index.md b/docs/src/operations/ddl/create-index.md index 7e875d87..a26889f2 100755 --- a/docs/src/operations/ddl/create-index.md +++ b/docs/src/operations/ddl/create-index.md @@ -7,7 +7,7 @@ Creates a scalar index on a Lance table to accelerate queries. ## Overview -The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. This operation is performed in a distributed manner, building indexes for each data fragment in parallel. +The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. Depending on the index method, Lance Spark either uses a fragment-parallel build path or a driver-coordinated commit flow after parallel executor builds. ## Basic Usage @@ -24,6 +24,7 @@ The following index methods are supported: | Method | Description | |---------|-----------------------------------------------------------------------------| +| `zonemap` | Lightweight min/max index for fragment pruning on a scalar column. | | `btree` | B-tree index for efficient range queries and point lookups on scalar columns. | | `fts` | Full-text search (inverted) index for text search on string columns. | @@ -31,6 +32,14 @@ The following index methods are supported: The `CREATE INDEX` command supports options via the `WITH` clause to control index creation. These options are specific to the chosen index method. 
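As a quick orientation before the per-method option tables, here is a minimal sketch of issuing one of these statements from PySpark; the table and index names are illustrative and taken from the examples later on this page:

```python
# Minimal sketch: a CREATE INDEX statement with a WITH clause, issued via spark.sql().
# "lance.db.users" and "idx_id_zoned" are illustrative names from the examples below.
spark.sql(
    "ALTER TABLE lance.db.users CREATE INDEX idx_id_zoned USING btree (id) WITH (zone_size = 2048)"
)
```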
+### ZoneMap Options + +For the `zonemap` method, the following options are supported: + +| Option | Type | Description | +|-----------------|------|----------------------------------------------| +| `rows_per_zone` | Long | The approximate number of rows covered by each zone. | + ### BTree Options For the `btree` method, the following options are supported: @@ -77,6 +86,15 @@ Create a composite index on multiple columns. ALTER TABLE lance.db.logs CREATE INDEX idx_ts_level USING btree (timestamp, level); ``` +### Lightweight Fragment Pruning + +Create a zonemap index when you want min/max-based fragment pruning: + +=== "SQL" + ```sql + ALTER TABLE lance.db.users CREATE INDEX idx_id_zonemap USING zonemap (id); + ``` + ### Indexing with Options Create an index and specify the `zone_size` for the B-tree: @@ -86,6 +104,15 @@ Create an index and specify the `zone_size` for the B-tree: ALTER TABLE lance.db.users CREATE INDEX idx_id_zoned USING btree (id) WITH (zone_size = 2048); ``` +### ZoneMap with Options + +Create a zonemap index and specify the approximate number of rows per zone: + +=== "SQL" + ```sql + ALTER TABLE lance.db.users CREATE INDEX idx_id_zonemap USING zonemap (id) WITH (rows_per_zone = 2048); + ``` + ### Full-Text Search Index Create an FTS index on a text column: @@ -117,17 +144,19 @@ The `CREATE INDEX` command returns the following information about the operation Consider creating an index when: - You frequently filter a large table on a specific column. +- You want lightweight fragment pruning based on per-zone min/max statistics. - Your queries involve point lookups or small range scans. ## How It Works The `CREATE INDEX` command operates as follows: -1. **Distributed Index Building**: For each fragment in the Lance dataset, a separate task is launched to build an index on the specified column(s). -2. **Metadata Merging**: Once all per-fragment indexes are built, their metadata is collected and merged. +1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree`, `fts`, and `zonemap` build physical index segments in parallel across fragments; range-mode `btree` instead repartitions and sorts the data with Spark before building per-range indexes. +2. **Metadata Finalization**: Lance Spark merges or commits the resulting index metadata on the driver so the new logical index becomes visible atomically. 3. **Transactional Commit**: A new table version is committed with the new index information. The operation is atomic and ensures that concurrent reads are not affected. ## Notes and Limitations -- **Index Methods**: The `btree` and `fts` methods are supported for scalar index creation. -- **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one. This is because the underlying implementation uses `replace(true)`. +- **Index Methods**: The `zonemap`, `btree`, and `fts` methods are supported for scalar index creation. +- **Zonemap Column Count**: Zonemap indexes currently support a single column only. The generic `CREATE INDEX` grammar accepts a column list, but Lance rejects multi-column zonemap creation. +- **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one.
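To make the How It Works flow concrete, here is an end-to-end sketch in PySpark, adapted from the zonemap test added in this diff (`default.test_table` and `rows_per_zone = 8` mirror the test values):

```python
# End-to-end zonemap workflow, mirroring docker/tests/test_lance_spark.py.
spark.sql("CREATE TABLE default.test_table (id INT, name STRING, value DOUBLE)")

data = [(i, f"Name{i}", float(i * 10)) for i in range(100)]
spark.createDataFrame(data, ["id", "name", "value"]).writeTo("default.test_table").append()

# Segments are built in parallel on executors; the logical index is committed on the driver.
result = spark.sql("""
    ALTER TABLE default.test_table
    CREATE INDEX idx_id_zonemap USING zonemap (id)
    WITH (rows_per_zone = 8)
""").collect()
assert result[0][1] == "idx_id_zonemap"  # row is (fragments_indexed, index_name)

# SHOW INDEXES reports the human-readable type "zonemap".
indexes = spark.sql("SHOW INDEXES IN default.test_table").collect()
assert any(r["name"] == "idx_id_zonemap" and r["index_type"] == "zonemap" for r in indexes)

# Filters on the indexed column can now prune fragments using per-zone min/max stats.
rows = spark.sql("SELECT * FROM default.test_table WHERE id = 50").collect()
assert len(rows) == 1 and rows[0].id == 50
```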
diff --git a/docs/src/operations/ddl/show-indexes.md b/docs/src/operations/ddl/show-indexes.md index c53ee448..1c546cdc 100755 --- a/docs/src/operations/ddl/show-indexes.md +++ b/docs/src/operations/ddl/show-indexes.md @@ -49,7 +49,7 @@ The `SHOW INDEXES` command returns the following columns: |-------------------------|---------------|--------------------------------------------------------------------| | `name` | string | Logical name of the index. | | `fields` | array | List of column names included in the index. | -| `index_type` | string | Human-readable index type (for example `btree`). | +| `index_type` | string | Human-readable index type (for example `btree` or `zonemap`). | | `num_indexed_fragments` | long | Number of fragments fully or partially covered by the index. | | `num_indexed_rows` | long | Approximate number of rows covered by the index. | | `num_unindexed_fragments` | long | Number of fragments that are not yet indexed. | diff --git a/integration-tests/test_lance_spark.py b/integration-tests/test_lance_spark.py index f63666f2..894fc52d 100644 --- a/integration-tests/test_lance_spark.py +++ b/integration-tests/test_lance_spark.py @@ -620,7 +620,7 @@ def test_compression_metadata_reaches_lance_file(self, spark): class TestDDLIndex: - """Test DDL index operations: CREATE INDEX (BTree, FTS).""" + """Test DDL index operations: CREATE INDEX (BTree, ZoneMap, FTS).""" def test_create_btree_index_on_int(self, spark): """Test CREATE INDEX with BTree on integer column.""" @@ -690,6 +690,42 @@ def test_create_btree_index_on_string(self, spark): """).collect() assert len(query_result) == 3 + def test_create_zonemap_index_on_int(self, spark): + """Test CREATE INDEX with ZoneMap on integer column.""" + spark.sql(""" + CREATE TABLE default.test_table ( + id INT, + name STRING, + value DOUBLE + ) + """) + + data = [(i, f"Name{i}", float(i * 10)) for i in range(100)] + df = spark.createDataFrame(data, ["id", "name", "value"]) + df.writeTo("default.test_table").append() + + result = spark.sql(""" + ALTER TABLE default.test_table + CREATE INDEX idx_id_zonemap USING zonemap (id) + WITH (rows_per_zone = 8) + """).collect() + + assert len(result) == 1 + assert result[0][1] == "idx_id_zonemap" + + indexes = spark.sql(""" + SHOW INDEXES IN default.test_table + """).collect() + zonemap_rows = [row for row in indexes if row["name"] == "idx_id_zonemap"] + assert len(zonemap_rows) == 1 + assert zonemap_rows[0]["index_type"] == "zonemap" + + query_result = spark.sql(""" + SELECT * FROM default.test_table WHERE id = 50 + """).collect() + assert len(query_result) == 1 + assert query_result[0].id == 50 + def test_create_fts_index(self, spark): """Test CREATE INDEX with full-text search (FTS).""" spark.sql(""" diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java index 90199a9d..fe822f88 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java @@ -87,6 +87,7 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in if (inputPartition.getWhereCondition().isPresent()) { scanOptions.filter(inputPartition.getWhereCondition().get()); } + scanOptions.useScalarIndex(inputPartition.isUseScalarIndex()); scanOptions.batchSize(readOptions.getBatchSize()); if (readOptions.getNearest() != null) { 
scanOptions.nearest(readOptions.getNearest()); diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java index e80bb0fc..5235061a 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java @@ -77,6 +77,7 @@ private long computeCount() { if (inputPartition.getWhereCondition().isPresent()) { scanOptionsBuilder.filter(inputPartition.getWhereCondition().get()); } + scanOptionsBuilder.useScalarIndex(inputPartition.isUseScalarIndex()); scanOptionsBuilder.withRowId(true); scanOptionsBuilder.columns(Lists.newArrayList()); scanOptionsBuilder.fragmentIds(fragmentIds); diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceInputPartition.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceInputPartition.java index fc7fd14a..ecd8c29c 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceInputPartition.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceInputPartition.java @@ -38,6 +38,7 @@ public class LanceInputPartition implements HasPartitionKey { private final Optional> topNSortOrders; private final Optional pushedAggregation; private final String scanId; + private final boolean useScalarIndex; /** * Initial storage options fetched from namespace.describeTable() on the driver. These are passed @@ -69,6 +70,7 @@ public LanceInputPartition( Optional> topNSortOrders, Optional pushedAggregation, String scanId, + boolean useScalarIndex, Map initialStorageOptions, String namespaceImpl, Map namespaceProperties, @@ -83,6 +85,7 @@ public LanceInputPartition( this.topNSortOrders = topNSortOrders; this.pushedAggregation = pushedAggregation; this.scanId = scanId; + this.useScalarIndex = useScalarIndex; this.initialStorageOptions = initialStorageOptions; this.namespaceImpl = namespaceImpl; this.namespaceProperties = namespaceProperties; @@ -129,6 +132,10 @@ public String getScanId() { return scanId; } + public boolean isUseScalarIndex() { + return useScalarIndex; + } + public Map getInitialStorageOptions() { return initialStorageOptions; } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java index 9cfb41f9..3d813c98 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java @@ -107,6 +107,7 @@ public class LanceScan private final String namespaceImpl; private final java.util.Map namespaceProperties; + private final boolean useScalarIndex; public LanceScan( StructType schema, @@ -121,6 +122,7 @@ public LanceScan( java.util.Map> zonemapStats, Set survivingFragmentIds, ZonemapFragmentPruner.PartitionInfo partitionInfo, + boolean useScalarIndex, java.util.Map initialStorageOptions, String namespaceImpl, java.util.Map namespaceProperties) { @@ -137,6 +139,7 @@ public LanceScan( this.zonemapStats = zonemapStats != null ? 
zonemapStats : Collections.emptyMap(); this.cachedSurvivingFragmentIds = survivingFragmentIds; this.partitionInfo = partitionInfo; + this.useScalarIndex = useScalarIndex; this.initialStorageOptions = initialStorageOptions; this.namespaceImpl = namespaceImpl; this.namespaceProperties = namespaceProperties; @@ -191,6 +194,7 @@ public InputPartition[] planInputPartitions() { topNSortOrders, pushedAggregation, scanId, + useScalarIndex, initialStorageOptions, namespaceImpl, namespaceProperties, diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java index 8773b6a0..9f144760 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java @@ -16,7 +16,8 @@ import org.lance.Dataset; import org.lance.Fragment; import org.lance.ManifestSummary; -import org.lance.index.IndexDescription; +import org.lance.index.Index; +import org.lance.index.IndexType; import org.lance.index.scalar.ZoneStats; import org.lance.ipc.ColumnOrdering; import org.lance.schema.LanceField; @@ -47,6 +48,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -69,6 +71,7 @@ public class LanceScanBuilder private StructType schema; private Filter[] pushedFilters = new Filter[0]; + private boolean forcePostScanFiltering = false; private Optional limit = Optional.empty(); private Optional offset = Optional.empty(); private Optional> topNSortOrders = Optional.empty(); @@ -213,7 +216,11 @@ public Scan build() { // Close the lazily opened dataset - it's no longer needed after build closeLazyDataset(); - Optional whereCondition = FilterPushDown.compileFiltersToSqlWhereClause(pushedFilters); + Optional whereCondition = + forcePostScanFiltering + ? 
+ Optional.empty() + : FilterPushDown.compileFiltersToSqlWhereClause(pushedFilters); + // Native scalar-index execution is disabled whenever zonemap stats are driving pruning. + boolean useScalarIndex = zonemapStats.isEmpty(); return new LanceScan( schema, readOptions, @@ -227,6 +234,7 @@ public Scan build() { zonemapStats, survivingFragmentIds, partitionInfo, + useScalarIndex, initialStorageOptions, namespaceImpl, namespaceProperties); @@ -244,7 +252,14 @@ public Filter[] pushFilters(Filter[] filters) { } Filter[][] processFilters = FilterPushDown.processFilters(filters); pushedFilters = processFilters[0]; - return processFilters[1]; + forcePostScanFiltering = shouldForcePostScanFiltering(pushedFilters); + if (!forcePostScanFiltering) { + return processFilters[1]; + } + LOG.info( + "Using Spark post-scan filtering for segmented zonemap query on dataset {}", + readOptions.getDatasetUri()); + return concatFilters(processFilters[0], processFilters[1]); } @Override @@ -304,6 +319,9 @@ public boolean pushTopN(SortOrder[] orders, int limit) { @Override public boolean pushAggregation(Aggregation aggregation) { + if (forcePostScanFiltering && pushedFilters.length > 0) { + return false; + } AggregateFunc[] funcs = aggregation.aggregateExpressions(); if (aggregation.groupByExpressions().length > 0) { return false; } @@ -382,9 +400,9 @@ private Set<String> findZonemapIndexedColumns(Dataset dataset) { fieldIdToName.put(field.getId(), field.getName()); } - for (IndexDescription idx : dataset.describeIndices()) { - if ("ZONEMAP".equalsIgnoreCase(idx.getIndexType())) { - for (int fieldId : idx.getFieldIds()) { + for (Index idx : dataset.getIndexes()) { + if (idx.indexType() == IndexType.ZONEMAP) { + for (int fieldId : idx.fields()) { String name = fieldIdToName.get(fieldId); if (name != null) { columns.add(name); @@ -407,4 +425,52 @@ private static Set<String> extractReferencedColumns(Filter[] filters) { } return columns; } + + /** + * Segmented zonemap indexes remain safe to use for fragment pruning, but scan-time filter + * pushdown still needs a Spark-side fallback until Lance-core query execution fully handles that + * layout.
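+ * The triggering layout is one logical zonemap index recorded as several single-fragment + * segments on a column referenced by the pushed filters.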
+ */ + private boolean shouldForcePostScanFiltering(Filter[] acceptedFilters) { + if (acceptedFilters.length == 0) { + return false; + } + + Set<String> referencedColumns = extractReferencedColumns(acceptedFilters); + if (referencedColumns.isEmpty()) { + return false; + } + + Dataset dataset = getOrOpenDataset(); + Map<Integer, String> fieldIdToName = new HashMap<>(); + for (LanceField field : dataset.getLanceSchema().fields()) { + fieldIdToName.put(field.getId(), field.getName()); + } + + Map<String, Integer> segmentedZonemapCounts = new HashMap<>(); + for (Index idx : dataset.getIndexes()) { + if (idx.indexType() != IndexType.ZONEMAP || idx.fields().size() != 1) { + continue; + } + if (!idx.fragments().isPresent() || idx.fragments().get().size() != 1) { + continue; + } + + String columnName = fieldIdToName.get(idx.fields().get(0)); + if (columnName == null || !referencedColumns.contains(columnName)) { + continue; + } + + String key = idx.name() + ":" + columnName; + segmentedZonemapCounts.merge(key, 1, Integer::sum); + } + + return segmentedZonemapCounts.values().stream().anyMatch(count -> count > 1); + } + + private static Filter[] concatFilters(Filter[] first, Filter[] second) { + Filter[] combined = Arrays.copyOf(first, first.length + second.length); + System.arraycopy(second, 0, combined, first.length, second.length); + return combined; + } } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/ZonemapFragmentPruner.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/ZonemapFragmentPruner.java index 06b5a6de..7a9e310d 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/ZonemapFragmentPruner.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/ZonemapFragmentPruner.java @@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -159,7 +161,6 @@ private static Optional<Set<Integer>> analyzeFilter( return Optional.empty(); } - @SuppressWarnings("unchecked") private static Optional<Set<Integer>> analyzeComparison( String column, Object value, @@ -171,17 +172,9 @@ private static Optional<Set<Integer>> analyzeComparison( return Optional.empty(); } - Comparable target; - try { - target = (Comparable) value; - } catch (ClassCastException e) { - LOG.warn("Cannot cast filter value {} to Comparable for zonemap pruning", value); - return Optional.empty(); - } - Set<Integer> matchingFragments = new HashSet<>(); for (ZoneStats zone : stats) { - if (zoneMatchesComparison(zone, target, type)) { + if (zoneMatchesComparison(zone, value, type)) { matchingFragments.add(zone.getFragmentId()); } } @@ -189,12 +182,10 @@ return Optional.of(matchingFragments); } - @SuppressWarnings("unchecked") - private static boolean zoneMatchesComparison( - ZoneStats zone, Comparable target, ComparisonType type) { + private static boolean zoneMatchesComparison(ZoneStats zone, Object target, ComparisonType type) { - Comparable min = (Comparable) zone.getMin(); - Comparable max = (Comparable) zone.getMax(); + Object min = zone.getMin(); + Object max = zone.getMax(); // If min or max is null, the zone contains only nulls for the indexed range; // non-null comparisons cannot match.
@@ -206,27 +197,26 @@ private static boolean zoneMatchesComparison( switch (type) { case EQUALS: // target ∈ [min, max] - return target.compareTo(min) >= 0 && target.compareTo(max) <= 0; + return compareValues(target, min) >= 0 && compareValues(target, max) <= 0; case LESS_THAN: // ∃ row < target ⟺ zone.min < target - return min.compareTo(target) < 0; + return compareValues(min, target) < 0; case LESS_THAN_OR_EQUAL: - return min.compareTo(target) <= 0; + return compareValues(min, target) <= 0; case GREATER_THAN: - return max.compareTo(target) > 0; + return compareValues(max, target) > 0; case GREATER_THAN_OR_EQUAL: - return max.compareTo(target) >= 0; + return compareValues(max, target) >= 0; default: return true; // conservative } - } catch (ClassCastException e) { + } catch (ClassCastException | IllegalArgumentException e) { // Type mismatch between filter value and zone stats — be conservative LOG.warn("Type mismatch in zonemap comparison, skipping pruning for zone", e); return true; } } - @SuppressWarnings("unchecked") private static Optional<Set<Integer>> analyzeIn( String column, Object[] values, Map<String, List<ZoneStats>> statsByColumn) { @@ -244,14 +234,7 @@ private static Optional<Set<Integer>> analyzeIn( break; } } else { - try { - Comparable target = (Comparable) value; - if (zoneMatchesComparison(zone, target, ComparisonType.EQUALS)) { - matchingFragments.add(zone.getFragmentId()); - break; - } - } catch (ClassCastException e) { - // Non-comparable value, conservatively include + if (zoneMatchesComparison(zone, value, ComparisonType.EQUALS)) { matchingFragments.add(zone.getFragmentId()); break; } } @@ -262,6 +245,44 @@ return Optional.of(matchingFragments); } + @SuppressWarnings({"rawtypes", "unchecked"}) + private static int compareValues(Object left, Object right) { + if (left instanceof Number && right instanceof Number) { + return compareNumbers((Number) left, (Number) right); + } + if (isStringLike(left) && isStringLike(right)) { + return left.toString().compareTo(right.toString()); + } + return ((Comparable) left).compareTo(right); + } + + private static int compareNumbers(Number left, Number right) { + return toBigDecimal(left).compareTo(toBigDecimal(right)); + } + + private static BigDecimal toBigDecimal(Number value) { + if (value instanceof BigDecimal) { + return (BigDecimal) value; + } + if (value instanceof BigInteger) { + return new BigDecimal((BigInteger) value); + } + if (value instanceof Byte + || value instanceof Short + || value instanceof Integer + || value instanceof Long) { + return BigDecimal.valueOf(value.longValue()); + } + if (value instanceof Float || value instanceof Double) { + return BigDecimal.valueOf(value.doubleValue()); + } + return new BigDecimal(value.toString()); + } + + private static boolean isStringLike(Object value) { + return value instanceof CharSequence || value instanceof UTF8String; + } + private static Optional<Set<Integer>> analyzeIsNull( String column, Map<String, List<ZoneStats>> statsByColumn) { diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala index d52ef85b..c923ad14 100755 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala @@ -40,12 +40,12 @@ import java.util.{Collections, Optional, UUID} import scala.collection.JavaConverters._ /** - * Physical
execution of distributed CREATE INDEX (ALTER TABLE ... CREATE INDEX ...) for Lance datasets. + * Physical execution of CREATE INDEX (ALTER TABLE ... CREATE INDEX ...) for Lance datasets. * *
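+ * Each run returns a single row of (fragments_indexed, index_name).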
    *
  • For BTREE index, it uses a range-based approach that redistributes and sorts data across partitions, creates indexes for each range in parallel, and finally merges them into a global index structure. - *
  • For other index types, it processes each fragment independently in parallel, merges index metadata - * and commits an index-creation transaction. + *
  • For fragment-trainable scalar index types, it processes each fragment independently in parallel, merges index metadata, and commits an index-creation transaction. + *
  • For zonemap index, it builds one uncommitted segment per batch of fragments in parallel and commits the logical index on the driver. *
*/ case class AddIndexExec( @@ -84,8 +84,20 @@ case class AddIndexExec( val uuid = UUID.randomUUID() val indexType = IndexUtils.buildIndexType(method) + if (indexType == IndexType.ZONEMAP && columns.size != 1) { + throw new UnsupportedOperationException( + "Zonemap index currently supports a single column only") + } + // Create distributed index job and run it - createIndexJob(lanceDataset, readOptions, uuid.toString, fragmentIds).run() + val createdSegments = createIndexJob(lanceDataset, readOptions, indexType, uuid.toString, fragmentIds).run() + + if (indexType == IndexType.ZONEMAP) { + commitIndexSegments(readOptions, columns.head, createdSegments) + return Seq(new GenericInternalRow(Array[Any]( + fragmentIds.size.toLong, + UTF8String.fromString(indexName)))) + } val dataset = Utils.openDatasetBuilder(readOptions).build() try { @@ -139,9 +151,59 @@ case class AddIndexExec( UTF8String.fromString(indexName)))) } + private def commitIndexSegments( + readOptions: LanceSparkReadOptions, + column: String, + segments: Seq[Index]): Unit = { + val dataset = Utils.openDatasetBuilder(readOptions).build() + try { + val existingIndices = dataset.getIndexes.asScala + .filter(_.name() == indexName) + .toList + + val committedSegments = + dataset.commitExistingIndexSegments(indexName, column, segments.toList.asJava).asScala.toList + + if (existingIndices.nonEmpty) { + val committedUuids = committedSegments.map(_.uuid()).toSet + val refreshedDataset = Utils.openDatasetBuilder(readOptions).build() + try { + val removedIndices = refreshedDataset.getIndexes.asScala + .filter(index => index.name() == indexName && !committedUuids.contains(index.uuid())) + .toList + .asJava + + if (!removedIndices.isEmpty) { + val op = AddIndexOperation.builder() + .withNewIndices(Collections.emptyList()) + .withRemovedIndices(removedIndices) + .build() + val txn = new Transaction.Builder() + .readVersion(refreshedDataset.version()) + .operation(op) + .build() + try { + val newDataset = new CommitBuilder(refreshedDataset) + .writeParams(readOptions.getStorageOptions) + .execute(txn) + newDataset.close() + } finally { + txn.close() + } + } + } finally { + refreshedDataset.close() + } + } + } finally { + dataset.close() + } + } + private def createIndexJob( lanceDataset: LanceDataset, readOptions: LanceSparkReadOptions, + indexType: IndexType, uuid: String, fragmentIds: List[Integer]): IndexJob = { // Get namespace info from catalog if available (for credential vending on workers) @@ -159,7 +221,7 @@ case class AddIndexExec( case _ => (None, None, None, None) } - IndexUtils.buildIndexType(method) match { + indexType match { case IndexType.BTREE => val mode = args.find(_.name == "build_mode").map(_.value.asInstanceOf[String]) mode match { @@ -177,7 +239,7 @@ case class AddIndexExec( new FragmentBasedIndexJob( this, readOptions, - uuid, + Some(uuid), fragmentIds, nsImpl, nsProps, @@ -193,7 +255,7 @@ case class AddIndexExec( new FragmentBasedIndexJob( this, readOptions, - uuid, + if (indexType == IndexType.ZONEMAP) None else Some(uuid), fragmentIds, nsImpl, nsProps, @@ -207,7 +269,7 @@ case class AddIndexExec( * Interface for index job to implement different indexing strategies. 
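+ * run() returns any index segments built on executors; currently only the zonemap commit path + * consumes them, and range-mode BTree returns Seq.empty.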
*/ trait IndexJob extends Serializable { - def run(): Unit + def run(): Seq[Index] } /** @@ -227,28 +289,32 @@ trait IndexJob extends Serializable { class FragmentBasedIndexJob( addIndexExec: AddIndexExec, readOptions: LanceSparkReadOptions, - uuid: String, + indexUuid: Option[String], fragmentIds: List[Integer], nsImpl: Option[String], nsProps: Option[Map[String, String]], tableId: Option[List[String]], initialStorageOpts: Option[Map[String, String]]) extends IndexJob { - override def run(): Unit = { + override def run(): Seq[Index] = { val encodedReadOptions = encode(readOptions) val columns = addIndexExec.columns.toList val argsJson = IndexUtils.toJson(addIndexExec.args) + val fragmentBatches = if (IndexUtils.buildIndexType(addIndexExec.method) == IndexType.ZONEMAP) { + batchFragments(fragmentIds) + } else { + fragmentIds.map(fid => List(fid)) + } - // Build per-fragment tasks - val tasks = fragmentIds.map { fid => + val tasks = fragmentBatches.map { batch => FragmentIndexTask( encodedReadOptions, columns, addIndexExec.method, argsJson, addIndexExec.indexName, - uuid, - fid, + indexUuid, + batch, nsImpl, nsProps, tableId, @@ -259,6 +325,14 @@ class FragmentBasedIndexJob( .parallelize(tasks, tasks.size) .map(t => t.execute()) .collect() + .map(decode[Index]) + .toSeq + } + + private def batchFragments(fragmentIds: List[Integer]): Seq[List[Integer]] = { + val targetTasks = math.max(1, math.min(fragmentIds.size, addIndexExec.session.sparkContext.defaultParallelism)) + val batchSize = math.ceil(fragmentIds.size.toDouble / targetTasks.toDouble).toInt + fragmentIds.grouped(batchSize).map(_.toList).toSeq } } @@ -272,7 +346,7 @@ class FragmentBasedIndexJob( * @param argsJson JSON string containing index parameters * @param indexName Name of the index being created - * @param uuid Unique identifier for this index operation - * @param fragmentId ID of the fragment to create index on + * @param indexUuid Optional unique identifier for this index operation (None for zonemap segment builds) + * @param fragmentIds IDs of the fragments to create index on * @param namespaceImpl Implementation class for namespace operations * @param namespaceProperties Properties of the namespace * @param tableId Identifier for the table within the namespace @@ -284,8 +358,8 @@ case class FragmentIndexTask( method: String, argsJson: String, indexName: String, - uuid: String, - fragmentId: Int, + indexUuid: Option[String], + fragmentIds: List[Integer], namespaceImpl: Option[String], namespaceProperties: Option[Map[String, String]], tableId: Option[List[String]], @@ -300,23 +374,25 @@ case class FragmentIndexTask( val indexOptions = IndexOptions .builder(java.util.Arrays.asList(columns: _*), indexType, params) - .replace(true) - .withIndexName(indexName) - .withIndexUUID(uuid) - .withFragmentIds(Collections.singletonList(fragmentId)) - .build() + .withFragmentIds(fragmentIds.asJava) + if (indexType == IndexType.ZONEMAP) { + indexOptions.replace(false) + } else { + indexOptions + .replace(true) + .withIndexName(indexName) + indexUuid.foreach(indexOptions.withIndexUUID) + } val dataset = Utils.openDatasetBuilder(readOptions) .initialStorageOptions(initialStorageOptions.map(_.asJava).orNull) .build() try { - dataset.createIndex(indexOptions) + encode(dataset.createIndex(indexOptions.build())) } finally { dataset.close() } - - encode("OK") } } @@ -344,7 +420,7 @@ class RangeBasedBTreeIndexJob( private val VALUE_COLUMN_NAME = "value" - override def run(): Unit = { + override def run(): Seq[Index] = { if (addIndexExec.columns.size != 1) { throw new UnsupportedOperationException( "Range-based BTree index currently supports a single
column only") @@ -393,6 +469,8 @@ class RangeBasedBTreeIndexJob( rangeDf.queryExecution.toRdd.mapPartitionsWithIndex { case (rangeId, rowsIter) => indexBuilder.buildForRange(rangeId, rowsIter) }.collect() + + Seq.empty } } @@ -530,6 +608,7 @@ object IndexUtils { def buildIndexType(method: String): IndexType = { method match { case "btree" => IndexType.BTREE + case "zonemap" => IndexType.ZONEMAP case "fts" => IndexType.INVERTED case other => throw new UnsupportedOperationException(s"Unsupported index method: $other") } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/TestUtils.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/TestUtils.java index 5fd9fde4..3043c0fd 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/TestUtils.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/TestUtils.java @@ -23,6 +23,7 @@ import java.net.URL; import java.util.Arrays; +import java.util.Collections; import java.util.List; public class TestUtils { @@ -88,9 +89,10 @@ public static class TestTable1Config { Optional.empty() /* topNSortOrders */, Optional.empty() /* pushedAggregation */, "test" /* scanId */, - null /* initialStorageOptions */, + true /* useScalarIndex */, + Collections.emptyMap() /* initialStorageOptions */, null /* namespaceImpl */, - null /* namespaceProperties */, + Collections.emptyMap() /* namespaceProperties */, null /* partitionKeyRow */); } } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceColumnarPartitionReaderTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceColumnarPartitionReaderTest.java index 4a3cc39e..1c29d4f6 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceColumnarPartitionReaderTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceColumnarPartitionReaderTest.java @@ -43,9 +43,10 @@ public void test() throws Exception { Optional.empty() /* topNSortOrders */, Optional.empty() /* pushedAggregation */, "test" /* scanId */, - null /* initialStorageOptions */, + true /* useScalarIndex */, + Collections.emptyMap() /* initialStorageOptions */, null /* namespaceImpl */, - null /* namespaceProperties */, + Collections.emptyMap() /* namespaceProperties */, null /* partitionKeyRow */); try (LanceColumnarPartitionReader reader = new LanceColumnarPartitionReader(partition)) { List> expectedValues = TestUtils.TestTable1Config.expectedValues; @@ -86,9 +87,10 @@ public void testOffsetAndLimit() throws Exception { Optional.empty() /* topNSortOrders */, Optional.empty() /* pushedAggregation */, "testOffsetAndLimit" /* scanId */, - null /* initialStorageOptions */, + true /* useScalarIndex */, + Collections.emptyMap() /* initialStorageOptions */, null /* namespaceImpl */, - null /* namespaceProperties */, + Collections.emptyMap() /* namespaceProperties */, null /* partitionKeyRow */); try (LanceColumnarPartitionReader reader = new LanceColumnarPartitionReader(partition)) { List> expectedValues = TestUtils.TestTable1Config.expectedValues; @@ -131,9 +133,10 @@ public void testTopN() throws Exception { Optional.of(Collections.singletonList(builder.build())) /* topNSortOrders */, Optional.empty() /* pushedAggregation */, "testTopN" /* scanId */, - null /* initialStorageOptions */, + true /* useScalarIndex */, + Collections.emptyMap() /* initialStorageOptions */, null /* namespaceImpl */, - null /* namespaceProperties */, + Collections.emptyMap() /* namespaceProperties */, null /* partitionKeyRow */); try (LanceColumnarPartitionReader reader = new 
LanceColumnarPartitionReader(partition)) { List> expectedValues = TestUtils.TestTable1Config.expectedValues; diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceCountStarPartitionReaderTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceCountStarPartitionReaderTest.java index 23135af5..ce73fd0e 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceCountStarPartitionReaderTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceCountStarPartitionReaderTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -49,9 +50,10 @@ public void testCloseReleasesArrowMemory() throws Exception { Optional.empty(), Optional.of(countStarAgg), "testCountStarMemoryLeak", + true, + Collections.emptyMap(), null, - null, - null, + Collections.emptyMap(), null); LanceCountStarPartitionReader reader = new LanceCountStarPartitionReader(partition); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceDatasetReadTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceDatasetReadTest.java index 2a8c6670..a526a332 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceDatasetReadTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceDatasetReadTest.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -125,9 +126,10 @@ public void validateFragment(List> expectedValues, int fragment, St Optional.empty() /* topNSortOrders */, Optional.empty() /* pushedAggregation */, "validateFragment" /* scanId */, - null /* initialStorageOptions */, + true /* useScalarIndex */, + Collections.emptyMap() /* initialStorageOptions */, null /* namespaceImpl */, - null /* namespaceProperties */, + Collections.emptyMap() /* namespaceProperties */, null /* partitionKeyRow */))) { try (ArrowReader reader = scanner.getArrowReader()) { VectorSchemaRoot root = reader.getVectorSchemaRoot(); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java index 13fe3115..22e6359a 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java @@ -215,6 +215,7 @@ public void testOutputPartitioningWithPartitionInfo() { Collections.emptyMap(), null, partInfo, + true, Collections.emptyMap(), null, Collections.emptyMap()); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/ZonemapFragmentPrunerTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/ZonemapFragmentPrunerTest.java index 35678a03..fe4e0a79 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/ZonemapFragmentPrunerTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/ZonemapFragmentPrunerTest.java @@ -75,6 +75,16 @@ public void testEqualToMatchesOneFragment() { assertEquals(Set.of(1), result.get()); } + @Test + public void testEqualToMatchesOneFragmentWithIntegerFilterValue() { + Map> stats = threeFragmentStats("x"); + Filter[] filters = new Filter[] {new EqualTo("x", 150)}; + + Optional> result = ZonemapFragmentPruner.pruneFragments(filters, stats); + 
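// The Integer filter value is compared with the zone bounds numerically via compareValues, + // so it does not need to match the boxed type of the zone stats exactly. +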
assertTrue(result.isPresent()); + assertEquals(Set.of(1), result.get()); + } + @Test public void testEqualToMatchesNoFragment() { Map> stats = threeFragmentStats("x"); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java index 3133ee2e..31fc1181 100755 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java @@ -14,10 +14,15 @@ package org.lance.spark.update; import org.lance.index.Index; +import org.lance.spark.LanceSparkReadOptions; +import org.lance.spark.read.LanceScan; +import org.lance.spark.read.LanceScanBuilder; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.sources.EqualTo; +import org.apache.spark.sql.sources.Filter; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -28,6 +33,7 @@ import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.UUID; @@ -195,6 +201,139 @@ public void testCreateBTreeIndexWithZoneSize() { Assertions.assertEquals("text_15", r.getString(1)); } + @Test + public void testCreateZonemapIndexWithRowsPerZone() { + prepareDataset(); + + Dataset result = + spark.sql( + String.format( + "alter table %s create index test_index_zonemap using zonemap (id) with (rows_per_zone=4)", + fullTable)); + + Assertions.assertEquals( + "StructType(StructField(fragments_indexed,LongType,true),StructField(index_name,StringType,true))", + result.schema().toString()); + + Row row = result.collectAsList().get(0); + long fragmentsIndexed = row.getLong(0); + String indexName = row.getString(1); + + Assertions.assertTrue(fragmentsIndexed >= 2, "Expected at least 2 fragments to be indexed"); + Assertions.assertEquals("test_index_zonemap", indexName); + + checkIndex("test_index_zonemap"); + + org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build(); + try { + int fragmentCount = lanceDataset.getFragments().size(); + int expectedSegmentCount = + Math.min(fragmentCount, spark.sparkContext().defaultParallelism()); + List zonemapSegments = + lanceDataset.getIndexes().stream() + .filter(index -> "test_index_zonemap".equals(index.name())) + .collect(Collectors.toList()); + int coveredFragments = + zonemapSegments.stream() + .map(index -> index.fragments().orElse(Collections.emptyList()).size()) + .mapToInt(Integer::intValue) + .sum(); + + Assertions.assertEquals( + expectedSegmentCount, + zonemapSegments.size(), + "Expected distributed zonemap build to batch fragments into bounded segment count"); + Assertions.assertTrue( + zonemapSegments.stream() + .allMatch(index -> index.fragments().isPresent() && !index.fragments().get().isEmpty()), + "Expected each zonemap segment to cover at least one fragment"); + Assertions.assertTrue( + zonemapSegments.stream() + .anyMatch(index -> index.fragments().isPresent() && index.fragments().get().size() > 1), + "Expected zonemap batching to create at least one multi-fragment segment"); + Assertions.assertEquals( + fragmentCount, + coveredFragments, + "Expected zonemap segments to cover all fragments exactly once"); + Assertions.assertFalse( + lanceDataset.getZonemapStats("id").isEmpty(), + "Expected zonemap stats 
for indexed column"); + } finally { + lanceDataset.close(); + } + + assertZonemapFilterPrunesFragments(15); + + Dataset query = spark.sql(String.format("select * from %s where id=15", fullTable)); + Assertions.assertEquals(1L, query.count()); + Row r = query.collectAsList().get(0); + Assertions.assertEquals(15, r.getInt(0)); + Assertions.assertEquals("text_15", r.getString(1)); + } + + @Test + public void testCreateMultiColumnZonemapIndexThrowsHelpfulError() { + prepareDataset(); + + UnsupportedOperationException exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + spark.sql( + String.format( + "alter table %s create index test_index_zonemap_multi using zonemap (id, text) with (rows_per_zone=4)", + fullTable))); + + Assertions.assertTrue( + exception.getMessage().contains("single column"), + "Expected multi-column zonemap error to mention single-column support, got: " + + exception.getMessage()); + } + + @Test + public void testRepeatedCreateZonemapIndexReplacesExistingSegments() { + prepareDataset(); + + String sql = + String.format( + "alter table %s create index test_index_zonemap_repeat using zonemap (id) with (rows_per_zone=4)", + fullTable); + + spark.sql(sql); + spark.sql(sql); + + org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build(); + try { + int fragmentCount = lanceDataset.getFragments().size(); + int expectedSegmentCount = + Math.min(fragmentCount, spark.sparkContext().defaultParallelism()); + List zonemapSegments = + lanceDataset.getIndexes().stream() + .filter(index -> "test_index_zonemap_repeat".equals(index.name())) + .collect(Collectors.toList()); + int coveredFragments = + zonemapSegments.stream() + .map(index -> index.fragments().orElse(Collections.emptyList()).size()) + .mapToInt(Integer::intValue) + .sum(); + + Assertions.assertEquals( + expectedSegmentCount, + zonemapSegments.size(), + "Expected recreated zonemap index to replace existing batched segments instead of duplicating them"); + Assertions.assertTrue( + zonemapSegments.stream() + .allMatch(index -> index.fragments().isPresent() && !index.fragments().get().isEmpty()), + "Expected recreated zonemap segments to keep non-empty fragment coverage"); + Assertions.assertEquals( + fragmentCount, + coveredFragments, + "Expected recreated zonemap segments to cover all fragments exactly once"); + } finally { + lanceDataset.close(); + } + } + @Test public void testCreateBTreeIndexWithRangeMode() { prepareDataset(); @@ -449,6 +588,44 @@ public void testDropIndexThenRecreate() { Assertions.assertEquals(1L, query.count()); } + private void assertZonemapFilterPrunesFragments(int targetId) { + assertZonemapFilterPrunesFragments( + new EqualTo("id", targetId), String.format("id=%d", targetId)); + } + + private void assertZonemapFilterPrunesFragments(Filter filter, String description) { + LanceSparkReadOptions readOptions = LanceSparkReadOptions.from(tableDir); + + LanceScanBuilder unfilteredBuilder = + new LanceScanBuilder( + spark.table(fullTable).schema(), + readOptions, + Collections.emptyMap(), + null, + Collections.emptyMap(), + Collections.emptyMap()); + int unfilteredPartitions = ((LanceScan) unfilteredBuilder.build()).planInputPartitions().length; + Assertions.assertTrue( + unfilteredPartitions >= 2, "Expected multiple partitions for zonemap pruning test"); + + LanceScanBuilder filteredBuilder = + new LanceScanBuilder( + spark.table(fullTable).schema(), + readOptions, + Collections.emptyMap(), + null, + Collections.emptyMap(), + Collections.emptyMap()); + 
filteredBuilder.pushFilters(new Filter[] {filter}); + int filteredPartitions = ((LanceScan) filteredBuilder.build()).planInputPartitions().length; + + Assertions.assertTrue( + filteredPartitions < unfilteredPartitions, + String.format( + "Expected zonemap pruning to reduce planned partitions for %s (before=%d, after=%d)", + description, unfilteredPartitions, filteredPartitions)); + } + private void checkIndex(String indexName) { // Check index is created successfully org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build(); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseShowIndexesTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseShowIndexesTest.java index efd8b79f..fc4014d8 100755 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseShowIndexesTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseShowIndexesTest.java @@ -131,4 +131,30 @@ public void testShowIndexes() { long numIndexedRows = row.getLong(4); Assertions.assertTrue(numIndexedRows >= 1L, "num_indexed_rows should be at least 1"); } + + @Test + public void testShowZonemapIndexes() { + prepareDataset(); + + spark.sql( + String.format( + "alter table %s create index test_zonemap using zonemap (id) with (rows_per_zone=4)", + fullTable)); + + Dataset result = spark.sql(String.format("show indexes from %s", fullTable)); + List rows = result.collectAsList(); + + Row row = + rows.stream() + .filter(r -> "test_zonemap".equals(r.getString(0))) + .findFirst() + .orElseThrow(() -> new AssertionError("Expected SHOW INDEXES to include test_zonemap")); + + @SuppressWarnings("unchecked") + List fieldNames = row.getList(1); + Assertions.assertTrue(fieldNames.contains("id"), "fields should contain column name 'id'"); + Assertions.assertEquals("zonemap", row.getString(2)); + Assertions.assertTrue(row.getLong(3) >= 1L, "num_indexed_fragments should be at least 1"); + Assertions.assertTrue(row.getLong(4) >= 1L, "num_indexed_rows should be at least 1"); + } }
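A closing note on the zonemap build path: the segment counts asserted in these tests come from the fragment-batching rule in FragmentBasedIndexJob.batchFragments. A small Python re-derivation of that Scala logic (a sketch, not code from this diff) shows how the segment count stays bounded by Spark's default parallelism:

```python
import math

def batch_fragments(fragment_ids, default_parallelism):
    """Python re-derivation of FragmentBasedIndexJob.batchFragments (Scala)."""
    target_tasks = max(1, min(len(fragment_ids), default_parallelism))
    batch_size = math.ceil(len(fragment_ids) / target_tasks)
    # Each batch becomes one uncommitted zonemap segment, built by one Spark task.
    return [fragment_ids[i:i + batch_size] for i in range(0, len(fragment_ids), batch_size)]

print(batch_fragments(list(range(8)), default_parallelism=4))
# [[0, 1], [2, 3], [4, 5], [6, 7]] -> four segments for eight fragments
```

Note that the ceiling division can yield fewer batches than target_tasks (five fragments at parallelism 4 gives batch_size 2 and three batches), so tests asserting exactly min(fragment_count, defaultParallelism) segments rely on fragment counts that divide evenly.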