From 1158399535c38f8a73d825d32ed8d3a9dc478907 Mon Sep 17 00:00:00 2001 From: beinan Date: Tue, 21 Apr 2026 21:38:10 +0000 Subject: [PATCH 1/7] feat: support zonemap indexes in CREATE INDEX DDL --- docker/tests/test_lance_spark.py | 38 ++++++++- docs/src/operations/ddl/create-index.md | 36 ++++++++- docs/src/operations/ddl/show-indexes.md | 2 +- integration-tests/test_lance_spark.py | 38 ++++++++- .../lance/spark/read/LanceScanBuilder.java | 9 ++- .../spark/read/ZonemapFragmentPruner.java | 79 +++++++++++------- .../datasources/v2/AddIndexExec.scala | 36 ++++++++- .../spark/read/ZonemapFragmentPrunerTest.java | 10 +++ .../lance/spark/update/BaseAddIndexTest.java | 80 +++++++++++++++++++ .../spark/update/BaseShowIndexesTest.java | 26 ++++++ 10 files changed, 310 insertions(+), 44 deletions(-) diff --git a/docker/tests/test_lance_spark.py b/docker/tests/test_lance_spark.py index a21eff06b..9c872fe23 100644 --- a/docker/tests/test_lance_spark.py +++ b/docker/tests/test_lance_spark.py @@ -492,7 +492,7 @@ def test_properties_persist_after_insert(self, spark): class TestDDLIndex: - """Test DDL index operations: CREATE INDEX (BTree, FTS).""" + """Test DDL index operations: CREATE INDEX (BTree, ZoneMap, FTS).""" def test_create_btree_index_on_int(self, spark): """Test CREATE INDEX with BTree on integer column.""" @@ -562,6 +562,42 @@ def test_create_btree_index_on_string(self, spark): """).collect() assert len(query_result) == 3 + def test_create_zonemap_index_on_int(self, spark): + """Test CREATE INDEX with ZoneMap on integer column.""" + spark.sql(""" + CREATE TABLE default.test_table ( + id INT, + name STRING, + value DOUBLE + ) + """) + + data = [(i, f"Name{i}", float(i * 10)) for i in range(100)] + df = spark.createDataFrame(data, ["id", "name", "value"]) + df.writeTo("default.test_table").append() + + result = spark.sql(""" + ALTER TABLE default.test_table + CREATE INDEX idx_id_zonemap USING zonemap (id) + WITH (rows_per_zone = 8) + """).collect() + + assert len(result) == 1 + assert result[0][1] == "idx_id_zonemap" + + indexes = spark.sql(""" + SHOW INDEXES IN default.test_table + """).collect() + zonemap_rows = [row for row in indexes if row["name"] == "idx_id_zonemap"] + assert len(zonemap_rows) == 1 + assert zonemap_rows[0]["index_type"] == "zonemap" + + query_result = spark.sql(""" + SELECT * FROM default.test_table WHERE id = 50 + """).collect() + assert len(query_result) == 1 + assert query_result[0].id == 50 + def test_create_fts_index(self, spark): """Test CREATE INDEX with full-text search (FTS).""" spark.sql(""" diff --git a/docs/src/operations/ddl/create-index.md b/docs/src/operations/ddl/create-index.md index 7e875d87f..fe04bcd30 100755 --- a/docs/src/operations/ddl/create-index.md +++ b/docs/src/operations/ddl/create-index.md @@ -7,7 +7,7 @@ Creates a scalar index on a Lance table to accelerate queries. ## Overview -The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. This operation is performed in a distributed manner, building indexes for each data fragment in parallel. +The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. Depending on the index method, Lance Spark either uses a fragment-parallel build path or delegates to Lance's built-in single-phase index creation path. ## Basic Usage @@ -24,6 +24,7 @@ The following index methods are supported: | Method | Description | |---------|-----------------------------------------------------------------------------| +| `zonemap` | Lightweight min/max index for fragment pruning on scalar columns. | | `btree` | B-tree index for efficient range queries and point lookups on scalar columns. | | `fts` | Full-text search (inverted) index for text search on string columns. | @@ -31,6 +32,14 @@ The following index methods are supported: The `CREATE INDEX` command supports options via the `WITH` clause to control index creation. These options are specific to the chosen index method. +### ZoneMap Options + +For the `zonemap` method, the following options are supported: + +| Option | Type | Description | +|-----------------|------|----------------------------------------------| +| `rows_per_zone` | Long | The approximate number of rows per zonemap zone. | + ### BTree Options For the `btree` method, the following options are supported: @@ -77,6 +86,15 @@ Create a composite index on multiple columns. ALTER TABLE lance.db.logs CREATE INDEX idx_ts_level USING btree (timestamp, level); ``` +### Lightweight Fragment Pruning + +Create a zonemap index when you want lightweight min/max-based fragment pruning: + +=== "SQL" + ```sql + ALTER TABLE lance.db.users CREATE INDEX idx_id_zonemap USING zonemap (id); + ``` + ### Indexing with Options Create an index and specify the `zone_size` for the B-tree: @@ -86,6 +104,15 @@ Create an index and specify the `zone_size` for the B-tree: ALTER TABLE lance.db.users CREATE INDEX idx_id_zoned USING btree (id) WITH (zone_size = 2048); ``` +### Zonemap with Options + +Create a zonemap index and specify the approximate number of rows per zone: + +=== "SQL" + ```sql + ALTER TABLE lance.db.users CREATE INDEX idx_id_zonemap USING zonemap (id) WITH (rows_per_zone = 2048); + ``` + ### Full-Text Search Index Create an FTS index on a text column: @@ -117,17 +144,18 @@ The `CREATE INDEX` command returns the following information about the operation Consider creating an index when: - You frequently filter a large table on a specific column. +- You want lightweight fragment pruning based on per-zone min/max statistics. - Your queries involve point lookups or small range scans. ## How It Works The `CREATE INDEX` command operates as follows: -1. **Distributed Index Building**: For each fragment in the Lance dataset, a separate task is launched to build an index on the specified column(s). -2. **Metadata Merging**: Once all per-fragment indexes are built, their metadata is collected and merged. +1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree` can use fragment-parallel execution, while `zonemap` is built through Lance's single-phase create-index API. +2. **Metadata Finalization**: Lance records the new index metadata as part of the index creation flow. 3. **Transactional Commit**: A new table version is committed with the new index information. The operation is atomic and ensures that concurrent reads are not affected. ## Notes and Limitations -- **Index Methods**: The `btree` and `fts` methods are supported for scalar index creation. +- **Index Methods**: The `zonemap`, `btree`, and `fts` methods are supported for scalar index creation. - **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one. This is because the underlying implementation uses `replace(true)`. diff --git a/docs/src/operations/ddl/show-indexes.md b/docs/src/operations/ddl/show-indexes.md index c53ee448a..1c546cdcd 100755 --- a/docs/src/operations/ddl/show-indexes.md +++ b/docs/src/operations/ddl/show-indexes.md @@ -49,7 +49,7 @@ The `SHOW INDEXES` command returns the following columns: |-------------------------|---------------|--------------------------------------------------------------------| | `name` | string | Logical name of the index. | | `fields` | array | List of column names included in the index. | -| `index_type` | string | Human-readable index type (for example `btree`). | +| `index_type` | string | Human-readable index type (for example `btree` or `zonemap`). | | `num_indexed_fragments` | long | Number of fragments fully or partially covered by the index. | | `num_indexed_rows` | long | Approximate number of rows covered by the index. | | `num_unindexed_fragments` | long | Number of fragments that are not yet indexed. | diff --git a/integration-tests/test_lance_spark.py b/integration-tests/test_lance_spark.py index f63666f23..894fc52d1 100644 --- a/integration-tests/test_lance_spark.py +++ b/integration-tests/test_lance_spark.py @@ -620,7 +620,7 @@ def test_compression_metadata_reaches_lance_file(self, spark): class TestDDLIndex: - """Test DDL index operations: CREATE INDEX (BTree, FTS).""" + """Test DDL index operations: CREATE INDEX (BTree, ZoneMap, FTS).""" def test_create_btree_index_on_int(self, spark): """Test CREATE INDEX with BTree on integer column.""" @@ -690,6 +690,42 @@ def test_create_btree_index_on_string(self, spark): """).collect() assert len(query_result) == 3 + def test_create_zonemap_index_on_int(self, spark): + """Test CREATE INDEX with ZoneMap on integer column.""" + spark.sql(""" + CREATE TABLE default.test_table ( + id INT, + name STRING, + value DOUBLE + ) + """) + + data = [(i, f"Name{i}", float(i * 10)) for i in range(100)] + df = spark.createDataFrame(data, ["id", "name", "value"]) + df.writeTo("default.test_table").append() + + result = spark.sql(""" + ALTER TABLE default.test_table + CREATE INDEX idx_id_zonemap USING zonemap (id) + WITH (rows_per_zone = 8) + """).collect() + + assert len(result) == 1 + assert result[0][1] == "idx_id_zonemap" + + indexes = spark.sql(""" + SHOW INDEXES IN default.test_table + """).collect() + zonemap_rows = [row for row in indexes if row["name"] == "idx_id_zonemap"] + assert len(zonemap_rows) == 1 + assert zonemap_rows[0]["index_type"] == "zonemap" + + query_result = spark.sql(""" + SELECT * FROM default.test_table WHERE id = 50 + """).collect() + assert len(query_result) == 1 + assert query_result[0].id == 50 + def test_create_fts_index(self, spark): """Test CREATE INDEX with full-text search (FTS).""" spark.sql(""" diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java index 1178bfa36..e7434e532 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java @@ -16,7 +16,8 @@ import org.lance.Dataset; import org.lance.Fragment; import org.lance.ManifestSummary; -import org.lance.index.IndexDescription; +import org.lance.index.Index; +import org.lance.index.IndexType; import org.lance.index.scalar.ZoneStats; import org.lance.ipc.ColumnOrdering; import org.lance.schema.LanceField; @@ -381,9 +382,9 @@ private Set findZonemapIndexedColumns(Dataset dataset) { fieldIdToName.put(field.getId(), field.getName()); } - for (IndexDescription idx : dataset.describeIndices()) { - if ("ZONEMAP".equalsIgnoreCase(idx.getIndexType())) { - for (int fieldId : idx.getFieldIds()) { + for (Index idx : dataset.getIndexes()) { + if (idx.indexType() == IndexType.ZONEMAP) { + for (int fieldId : idx.fields()) { String name = fieldIdToName.get(fieldId); if (name != null) { columns.add(name); diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/ZonemapFragmentPruner.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/ZonemapFragmentPruner.java index 06b5a6deb..8e2e259a9 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/ZonemapFragmentPruner.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/ZonemapFragmentPruner.java @@ -33,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigDecimal; +import java.math.BigInteger; import java.io.Serializable; import java.util.Collections; import java.util.HashMap; @@ -159,7 +161,6 @@ private static Optional> analyzeFilter( return Optional.empty(); } - @SuppressWarnings("unchecked") private static Optional> analyzeComparison( String column, Object value, @@ -171,17 +172,9 @@ private static Optional> analyzeComparison( return Optional.empty(); } - Comparable target; - try { - target = (Comparable) value; - } catch (ClassCastException e) { - LOG.warn("Cannot cast filter value {} to Comparable for zonemap pruning", value); - return Optional.empty(); - } - Set matchingFragments = new HashSet<>(); for (ZoneStats zone : stats) { - if (zoneMatchesComparison(zone, target, type)) { + if (zoneMatchesComparison(zone, value, type)) { matchingFragments.add(zone.getFragmentId()); } } @@ -189,12 +182,10 @@ private static Optional> analyzeComparison( return Optional.of(matchingFragments); } - @SuppressWarnings("unchecked") - private static boolean zoneMatchesComparison( - ZoneStats zone, Comparable target, ComparisonType type) { + private static boolean zoneMatchesComparison(ZoneStats zone, Object target, ComparisonType type) { - Comparable min = (Comparable) zone.getMin(); - Comparable max = (Comparable) zone.getMax(); + Object min = zone.getMin(); + Object max = zone.getMax(); // If min or max is null, the zone contains only nulls for the indexed range; // non-null comparisons cannot match. @@ -206,27 +197,26 @@ private static boolean zoneMatchesComparison( switch (type) { case EQUALS: // target ∈ [min, max] - return target.compareTo(min) >= 0 && target.compareTo(max) <= 0; + return compareValues(target, min) >= 0 && compareValues(target, max) <= 0; case LESS_THAN: // ∃ row < target ⟺ zone.min < target - return min.compareTo(target) < 0; + return compareValues(min, target) < 0; case LESS_THAN_OR_EQUAL: - return min.compareTo(target) <= 0; + return compareValues(min, target) <= 0; case GREATER_THAN: - return max.compareTo(target) > 0; + return compareValues(max, target) > 0; case GREATER_THAN_OR_EQUAL: - return max.compareTo(target) >= 0; + return compareValues(max, target) >= 0; default: return true; // conservative } - } catch (ClassCastException e) { + } catch (ClassCastException | IllegalArgumentException e) { // Type mismatch between filter value and zone stats — be conservative LOG.warn("Type mismatch in zonemap comparison, skipping pruning for zone", e); return true; } } - @SuppressWarnings("unchecked") private static Optional> analyzeIn( String column, Object[] values, Map> statsByColumn) { @@ -244,14 +234,7 @@ private static Optional> analyzeIn( break; } } else { - try { - Comparable target = (Comparable) value; - if (zoneMatchesComparison(zone, target, ComparisonType.EQUALS)) { - matchingFragments.add(zone.getFragmentId()); - break; - } - } catch (ClassCastException e) { - // Non-comparable value, conservatively include + if (zoneMatchesComparison(zone, value, ComparisonType.EQUALS)) { matchingFragments.add(zone.getFragmentId()); break; } @@ -262,6 +245,42 @@ private static Optional> analyzeIn( return Optional.of(matchingFragments); } + @SuppressWarnings({"rawtypes", "unchecked"}) + private static int compareValues(Object left, Object right) { + if (left instanceof Number && right instanceof Number) { + return compareNumbers((Number) left, (Number) right); + } + if (isStringLike(left) && isStringLike(right)) { + return left.toString().compareTo(right.toString()); + } + return ((Comparable) left).compareTo(right); + } + + private static int compareNumbers(Number left, Number right) { + return toBigDecimal(left).compareTo(toBigDecimal(right)); + } + + private static BigDecimal toBigDecimal(Number value) { + if (value instanceof BigDecimal) { + return (BigDecimal) value; + } + if (value instanceof BigInteger) { + return new BigDecimal((BigInteger) value); + } + if (value instanceof Byte || value instanceof Short || value instanceof Integer + || value instanceof Long) { + return BigDecimal.valueOf(value.longValue()); + } + if (value instanceof Float || value instanceof Double) { + return BigDecimal.valueOf(value.doubleValue()); + } + return new BigDecimal(value.toString()); + } + + private static boolean isStringLike(Object value) { + return value instanceof CharSequence || value instanceof UTF8String; + } + private static Optional> analyzeIsNull( String column, Map> statsByColumn) { diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala index 6f270d009..36af00cc8 100755 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala @@ -40,12 +40,12 @@ import java.util.{Collections, Optional, UUID} import scala.collection.JavaConverters._ /** - * Physical execution of distributed CREATE INDEX (ALTER TABLE ... CREATE INDEX ...) for Lance datasets. + * Physical execution of CREATE INDEX (ALTER TABLE ... CREATE INDEX ...) for Lance datasets. * *
    *
  • For BTREE index, it uses a range-based approach that redistributes and sorts data across partitions, creates indexes for each range in parallel, and finally merges them into a global index structure. - *
  • For other index types, it processes each fragment independently in parallel, merges index metadata - * and commits an index-creation transaction. + *
  • For fragment-trainable scalar index types, it processes each fragment independently in parallel, merges index metadata and commits an index-creation transaction. + *
  • For zonemap index, it delegates to the underlying Lance single-phase index creation path because zonemap does not support fragment training. *
*/ case class AddIndexExec( @@ -84,6 +84,13 @@ case class AddIndexExec( val uuid = UUID.randomUUID() val indexType = IndexUtils.buildIndexType(method) + if (indexType == IndexType.ZONEMAP) { + createDirectScalarIndex(readOptions, indexType) + return Seq(new GenericInternalRow(Array[Any]( + fragmentIds.size.toLong, + UTF8String.fromString(indexName)))) + } + // Create distributed index job and run it createIndexJob(lanceDataset, readOptions, uuid.toString, fragmentIds).run() @@ -139,6 +146,28 @@ case class AddIndexExec( UTF8String.fromString(indexName)))) } + private def createDirectScalarIndex( + readOptions: LanceSparkReadOptions, + indexType: IndexType): Unit = { + val argsJson = IndexUtils.toJson(args) + val params = IndexParams.builder() + .setScalarIndexParams(ScalarIndexParams.create(method, argsJson)) + .build() + + val indexOptions = IndexOptions + .builder(columns.asJava, indexType, params) + .replace(true) + .withIndexName(indexName) + .build() + + val dataset = Utils.openDatasetBuilder(readOptions).build() + try { + dataset.createIndex(indexOptions) + } finally { + dataset.close() + } + } + private def createIndexJob( lanceDataset: LanceDataset, readOptions: LanceSparkReadOptions, @@ -514,6 +543,7 @@ object IndexUtils { def buildIndexType(method: String): IndexType = { method match { case "btree" => IndexType.BTREE + case "zonemap" => IndexType.ZONEMAP case "fts" => IndexType.INVERTED case other => throw new UnsupportedOperationException(s"Unsupported index method: $other") } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/ZonemapFragmentPrunerTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/ZonemapFragmentPrunerTest.java index 35678a03d..fe4e0a797 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/ZonemapFragmentPrunerTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/ZonemapFragmentPrunerTest.java @@ -75,6 +75,16 @@ public void testEqualToMatchesOneFragment() { assertEquals(Set.of(1), result.get()); } + @Test + public void testEqualToMatchesOneFragmentWithIntegerFilterValue() { + Map> stats = threeFragmentStats("x"); + Filter[] filters = new Filter[] {new EqualTo("x", 150)}; + + Optional> result = ZonemapFragmentPruner.pruneFragments(filters, stats); + assertTrue(result.isPresent()); + assertEquals(Set.of(1), result.get()); + } + @Test public void testEqualToMatchesNoFragment() { Map> stats = threeFragmentStats("x"); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java index 3133ee2e2..1634d9beb 100755 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java @@ -14,10 +14,15 @@ package org.lance.spark.update; import org.lance.index.Index; +import org.lance.spark.LanceSparkReadOptions; +import org.lance.spark.read.LanceScan; +import org.lance.spark.read.LanceScanBuilder; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.sources.EqualTo; +import org.apache.spark.sql.sources.Filter; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -28,6 +33,7 @@ import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.UUID; @@ -195,6 +201,47 @@ public void testCreateBTreeIndexWithZoneSize() { Assertions.assertEquals("text_15", r.getString(1)); } + @Test + public void testCreateZonemapIndexWithRowsPerZone() { + prepareDataset(); + + Dataset result = + spark.sql( + String.format( + "alter table %s create index test_index_zonemap using zonemap (id) with (rows_per_zone=4)", + fullTable)); + + Assertions.assertEquals( + "StructType(StructField(fragments_indexed,LongType,true),StructField(index_name,StringType,true))", + result.schema().toString()); + + Row row = result.collectAsList().get(0); + long fragmentsIndexed = row.getLong(0); + String indexName = row.getString(1); + + Assertions.assertTrue(fragmentsIndexed >= 2, "Expected at least 2 fragments to be indexed"); + Assertions.assertEquals("test_index_zonemap", indexName); + + checkIndex("test_index_zonemap"); + + org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build(); + try { + Assertions.assertFalse( + lanceDataset.getZonemapStats("id").isEmpty(), + "Expected zonemap stats for indexed column"); + } finally { + lanceDataset.close(); + } + + assertZonemapFilterPrunesFragments(15); + + Dataset query = spark.sql(String.format("select * from %s where id=15", fullTable)); + Assertions.assertEquals(1L, query.count()); + Row r = query.collectAsList().get(0); + Assertions.assertEquals(15, r.getInt(0)); + Assertions.assertEquals("text_15", r.getString(1)); + } + @Test public void testCreateBTreeIndexWithRangeMode() { prepareDataset(); @@ -449,6 +496,39 @@ public void testDropIndexThenRecreate() { Assertions.assertEquals(1L, query.count()); } + private void assertZonemapFilterPrunesFragments(int targetId) { + LanceSparkReadOptions readOptions = LanceSparkReadOptions.from(tableDir); + + LanceScanBuilder unfilteredBuilder = + new LanceScanBuilder( + spark.table(fullTable).schema(), + readOptions, + Collections.emptyMap(), + null, + Collections.emptyMap(), + Collections.emptyMap()); + int unfilteredPartitions = ((LanceScan) unfilteredBuilder.build()).planInputPartitions().length; + Assertions.assertTrue( + unfilteredPartitions >= 2, "Expected multiple partitions for zonemap pruning test"); + + LanceScanBuilder filteredBuilder = + new LanceScanBuilder( + spark.table(fullTable).schema(), + readOptions, + Collections.emptyMap(), + null, + Collections.emptyMap(), + Collections.emptyMap()); + filteredBuilder.pushFilters(new Filter[] {new EqualTo("id", targetId)}); + int filteredPartitions = ((LanceScan) filteredBuilder.build()).planInputPartitions().length; + + Assertions.assertTrue( + filteredPartitions < unfilteredPartitions, + String.format( + "Expected zonemap pruning to reduce planned partitions for id=%d (before=%d, after=%d)", + targetId, unfilteredPartitions, filteredPartitions)); + } + private void checkIndex(String indexName) { // Check index is created successfully org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build(); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseShowIndexesTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseShowIndexesTest.java index efd8b79fd..fc4014d8a 100755 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseShowIndexesTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseShowIndexesTest.java @@ -131,4 +131,30 @@ public void testShowIndexes() { long numIndexedRows = row.getLong(4); Assertions.assertTrue(numIndexedRows >= 1L, "num_indexed_rows should be at least 1"); } + + @Test + public void testShowZonemapIndexes() { + prepareDataset(); + + spark.sql( + String.format( + "alter table %s create index test_zonemap using zonemap (id) with (rows_per_zone=4)", + fullTable)); + + Dataset result = spark.sql(String.format("show indexes from %s", fullTable)); + List rows = result.collectAsList(); + + Row row = + rows.stream() + .filter(r -> "test_zonemap".equals(r.getString(0))) + .findFirst() + .orElseThrow(() -> new AssertionError("Expected SHOW INDEXES to include test_zonemap")); + + @SuppressWarnings("unchecked") + List fieldNames = row.getList(1); + Assertions.assertTrue(fieldNames.contains("id"), "fields should contain column name 'id'"); + Assertions.assertEquals("zonemap", row.getString(2)); + Assertions.assertTrue(row.getLong(3) >= 1L, "num_indexed_fragments should be at least 1"); + Assertions.assertTrue(row.getLong(4) >= 1L, "num_indexed_rows should be at least 1"); + } } From ed8e5c31f1cfc806c839d30a2bee1c61f020a741 Mon Sep 17 00:00:00 2001 From: beinan Date: Tue, 21 Apr 2026 21:57:18 +0000 Subject: [PATCH 2/7] fix: clarify zonemap single-column limitation --- docs/src/operations/ddl/create-index.md | 3 +- .../datasources/v2/AddIndexExec.scala | 4 +++ .../lance/spark/update/BaseAddIndexTest.java | 29 +++++++++++++++++-- 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/docs/src/operations/ddl/create-index.md b/docs/src/operations/ddl/create-index.md index fe04bcd30..bf06e4ae0 100755 --- a/docs/src/operations/ddl/create-index.md +++ b/docs/src/operations/ddl/create-index.md @@ -24,7 +24,7 @@ The following index methods are supported: | Method | Description | |---------|-----------------------------------------------------------------------------| -| `zonemap` | Lightweight min/max index for fragment pruning on scalar columns. | +| `zonemap` | Lightweight min/max index for fragment pruning on a scalar column. | | `btree` | B-tree index for efficient range queries and point lookups on scalar columns. | | `fts` | Full-text search (inverted) index for text search on string columns. | @@ -158,4 +158,5 @@ The `CREATE INDEX` command operates as follows: ## Notes and Limitations - **Index Methods**: The `zonemap`, `btree`, and `fts` methods are supported for scalar index creation. +- **Zonemap Column Count**: Zonemap indexes currently support a single column only. The generic `CREATE INDEX` grammar accepts a column list, but Lance rejects multi-column zonemap creation. - **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one. This is because the underlying implementation uses `replace(true)`. diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala index 36af00cc8..4fb2fdc60 100755 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala @@ -85,6 +85,10 @@ case class AddIndexExec( val indexType = IndexUtils.buildIndexType(method) if (indexType == IndexType.ZONEMAP) { + if (columns.size != 1) { + throw new UnsupportedOperationException( + "Zonemap index currently supports a single column only") + } createDirectScalarIndex(readOptions, indexType) return Seq(new GenericInternalRow(Array[Any]( fragmentIds.size.toLong, diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java index 1634d9beb..c6ae633cd 100755 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java @@ -242,6 +242,25 @@ public void testCreateZonemapIndexWithRowsPerZone() { Assertions.assertEquals("text_15", r.getString(1)); } + @Test + public void testCreateMultiColumnZonemapIndexThrowsHelpfulError() { + prepareDataset(); + + UnsupportedOperationException exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + spark.sql( + String.format( + "alter table %s create index test_index_zonemap_multi using zonemap (id, text) with (rows_per_zone=4)", + fullTable))); + + Assertions.assertTrue( + exception.getMessage().contains("single column"), + "Expected multi-column zonemap error to mention single-column support, got: " + + exception.getMessage()); + } + @Test public void testCreateBTreeIndexWithRangeMode() { prepareDataset(); @@ -497,6 +516,10 @@ public void testDropIndexThenRecreate() { } private void assertZonemapFilterPrunesFragments(int targetId) { + assertZonemapFilterPrunesFragments(new EqualTo("id", targetId), String.format("id=%d", targetId)); + } + + private void assertZonemapFilterPrunesFragments(Filter filter, String description) { LanceSparkReadOptions readOptions = LanceSparkReadOptions.from(tableDir); LanceScanBuilder unfilteredBuilder = @@ -519,14 +542,14 @@ private void assertZonemapFilterPrunesFragments(int targetId) { null, Collections.emptyMap(), Collections.emptyMap()); - filteredBuilder.pushFilters(new Filter[] {new EqualTo("id", targetId)}); + filteredBuilder.pushFilters(new Filter[] {filter}); int filteredPartitions = ((LanceScan) filteredBuilder.build()).planInputPartitions().length; Assertions.assertTrue( filteredPartitions < unfilteredPartitions, String.format( - "Expected zonemap pruning to reduce planned partitions for id=%d (before=%d, after=%d)", - targetId, unfilteredPartitions, filteredPartitions)); + "Expected zonemap pruning to reduce planned partitions for %s (before=%d, after=%d)", + description, unfilteredPartitions, filteredPartitions)); } private void checkIndex(String indexName) { From 44efa706df5d1492e10009adbbb206218ff97290 Mon Sep 17 00:00:00 2001 From: beinan Date: Tue, 21 Apr 2026 22:30:50 +0000 Subject: [PATCH 3/7] style: apply spotless fixes for zonemap changes --- .../java/org/lance/spark/read/ZonemapFragmentPruner.java | 6 ++++-- .../test/java/org/lance/spark/update/BaseAddIndexTest.java | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/ZonemapFragmentPruner.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/ZonemapFragmentPruner.java index 8e2e259a9..7a9e310da 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/ZonemapFragmentPruner.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/ZonemapFragmentPruner.java @@ -33,9 +33,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.math.BigDecimal; import java.math.BigInteger; -import java.io.Serializable; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -267,7 +267,9 @@ private static BigDecimal toBigDecimal(Number value) { if (value instanceof BigInteger) { return new BigDecimal((BigInteger) value); } - if (value instanceof Byte || value instanceof Short || value instanceof Integer + if (value instanceof Byte + || value instanceof Short + || value instanceof Integer || value instanceof Long) { return BigDecimal.valueOf(value.longValue()); } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java index c6ae633cd..d0d51b622 100755 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java @@ -516,7 +516,8 @@ public void testDropIndexThenRecreate() { } private void assertZonemapFilterPrunesFragments(int targetId) { - assertZonemapFilterPrunesFragments(new EqualTo("id", targetId), String.format("id=%d", targetId)); + assertZonemapFilterPrunesFragments( + new EqualTo("id", targetId), String.format("id=%d", targetId)); } private void assertZonemapFilterPrunesFragments(Filter filter, String description) { From ad3f07c493bb9db82ec01b3fbf7def18d65d62c0 Mon Sep 17 00:00:00 2001 From: beinan Date: Thu, 23 Apr 2026 00:52:39 +0000 Subject: [PATCH 4/7] feat: add distributed zonemap index build --- docs/src/operations/ddl/create-index.md | 8 +- .../spark/internal/LanceFragmentScanner.java | 1 + .../read/LanceCountStarPartitionReader.java | 1 + .../lance/spark/read/LanceInputPartition.java | 7 + .../java/org/lance/spark/read/LanceScan.java | 4 + .../lance/spark/read/LanceScanBuilder.java | 69 ++++++++- .../datasources/v2/AddIndexExec.scala | 135 ++++++++++++------ .../test/java/org/lance/spark/TestUtils.java | 6 +- .../LanceColumnarPartitionReaderTest.java | 15 +- .../LanceCountStarPartitionReaderTest.java | 6 +- .../spark/read/LanceDatasetReadTest.java | 6 +- .../org/lance/spark/read/LanceScanTest.java | 1 + .../lance/spark/update/BaseAddIndexTest.java | 73 ++++++++++ 13 files changed, 269 insertions(+), 63 deletions(-) diff --git a/docs/src/operations/ddl/create-index.md b/docs/src/operations/ddl/create-index.md index bf06e4ae0..a26889f2b 100755 --- a/docs/src/operations/ddl/create-index.md +++ b/docs/src/operations/ddl/create-index.md @@ -7,7 +7,7 @@ Creates a scalar index on a Lance table to accelerate queries. ## Overview -The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. Depending on the index method, Lance Spark either uses a fragment-parallel build path or delegates to Lance's built-in single-phase index creation path. +The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. Depending on the index method, Lance Spark either uses a fragment-parallel build path or a driver-coordinated commit flow after parallel executor builds. ## Basic Usage @@ -151,12 +151,12 @@ Consider creating an index when: The `CREATE INDEX` command operates as follows: -1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree` can use fragment-parallel execution, while `zonemap` is built through Lance's single-phase create-index API. -2. **Metadata Finalization**: Lance records the new index metadata as part of the index creation flow. +1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree`, `fts`, and `zonemap` can build physical index segments in parallel across fragments. Range-mode `btree` uses Spark repartitioning and sorted preprocessed data. +2. **Metadata Finalization**: Lance Spark merges or commits the resulting index metadata on the driver so the new logical index becomes visible atomically. 3. **Transactional Commit**: A new table version is committed with the new index information. The operation is atomic and ensures that concurrent reads are not affected. ## Notes and Limitations - **Index Methods**: The `zonemap`, `btree`, and `fts` methods are supported for scalar index creation. - **Zonemap Column Count**: Zonemap indexes currently support a single column only. The generic `CREATE INDEX` grammar accepts a column list, but Lance rejects multi-column zonemap creation. -- **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one. This is because the underlying implementation uses `replace(true)`. +- **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one. diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java index 90199a9d7..fe822f881 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java @@ -87,6 +87,7 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in if (inputPartition.getWhereCondition().isPresent()) { scanOptions.filter(inputPartition.getWhereCondition().get()); } + scanOptions.useScalarIndex(inputPartition.isUseScalarIndex()); scanOptions.batchSize(readOptions.getBatchSize()); if (readOptions.getNearest() != null) { scanOptions.nearest(readOptions.getNearest()); diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java index e80bb0fca..5235061ac 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java @@ -77,6 +77,7 @@ private long computeCount() { if (inputPartition.getWhereCondition().isPresent()) { scanOptionsBuilder.filter(inputPartition.getWhereCondition().get()); } + scanOptionsBuilder.useScalarIndex(inputPartition.isUseScalarIndex()); scanOptionsBuilder.withRowId(true); scanOptionsBuilder.columns(Lists.newArrayList()); scanOptionsBuilder.fragmentIds(fragmentIds); diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceInputPartition.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceInputPartition.java index fc7fd14ab..ecd8c29cf 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceInputPartition.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceInputPartition.java @@ -38,6 +38,7 @@ public class LanceInputPartition implements HasPartitionKey { private final Optional> topNSortOrders; private final Optional pushedAggregation; private final String scanId; + private final boolean useScalarIndex; /** * Initial storage options fetched from namespace.describeTable() on the driver. These are passed @@ -69,6 +70,7 @@ public LanceInputPartition( Optional> topNSortOrders, Optional pushedAggregation, String scanId, + boolean useScalarIndex, Map initialStorageOptions, String namespaceImpl, Map namespaceProperties, @@ -83,6 +85,7 @@ public LanceInputPartition( this.topNSortOrders = topNSortOrders; this.pushedAggregation = pushedAggregation; this.scanId = scanId; + this.useScalarIndex = useScalarIndex; this.initialStorageOptions = initialStorageOptions; this.namespaceImpl = namespaceImpl; this.namespaceProperties = namespaceProperties; @@ -129,6 +132,10 @@ public String getScanId() { return scanId; } + public boolean isUseScalarIndex() { + return useScalarIndex; + } + public Map getInitialStorageOptions() { return initialStorageOptions; } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java index 9cfb41f9b..3d813c98f 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java @@ -107,6 +107,7 @@ public class LanceScan private final String namespaceImpl; private final java.util.Map namespaceProperties; + private final boolean useScalarIndex; public LanceScan( StructType schema, @@ -121,6 +122,7 @@ public LanceScan( java.util.Map> zonemapStats, Set survivingFragmentIds, ZonemapFragmentPruner.PartitionInfo partitionInfo, + boolean useScalarIndex, java.util.Map initialStorageOptions, String namespaceImpl, java.util.Map namespaceProperties) { @@ -137,6 +139,7 @@ public LanceScan( this.zonemapStats = zonemapStats != null ? zonemapStats : Collections.emptyMap(); this.cachedSurvivingFragmentIds = survivingFragmentIds; this.partitionInfo = partitionInfo; + this.useScalarIndex = useScalarIndex; this.initialStorageOptions = initialStorageOptions; this.namespaceImpl = namespaceImpl; this.namespaceProperties = namespaceProperties; @@ -191,6 +194,7 @@ public InputPartition[] planInputPartitions() { topNSortOrders, pushedAggregation, scanId, + useScalarIndex, initialStorageOptions, namespaceImpl, namespaceProperties, diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java index e7434e532..6d4c1d6a0 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java @@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -71,6 +72,7 @@ public class LanceScanBuilder private StructType schema; private Filter[] pushedFilters = new Filter[0]; + private boolean forcePostScanFiltering = false; private Optional limit = Optional.empty(); private Optional offset = Optional.empty(); private Optional> topNSortOrders = Optional.empty(); @@ -213,7 +215,11 @@ public Scan build() { // Close the lazily opened dataset - it's no longer needed after build closeLazyDataset(); - Optional whereCondition = FilterPushDown.compileFiltersToSqlWhereClause(pushedFilters); + Optional whereCondition = + forcePostScanFiltering + ? Optional.empty() + : FilterPushDown.compileFiltersToSqlWhereClause(pushedFilters); + boolean useScalarIndex = zonemapStats.isEmpty(); return new LanceScan( schema, readOptions, @@ -227,6 +233,7 @@ public Scan build() { zonemapStats, survivingFragmentIds, partitionInfo, + useScalarIndex, initialStorageOptions, namespaceImpl, namespaceProperties); @@ -244,7 +251,14 @@ public Filter[] pushFilters(Filter[] filters) { } Filter[][] processFilters = FilterPushDown.processFilters(filters); pushedFilters = processFilters[0]; - return processFilters[1]; + forcePostScanFiltering = shouldForcePostScanFiltering(pushedFilters); + if (!forcePostScanFiltering) { + return processFilters[1]; + } + LOG.info( + "Using Spark post-scan filtering for segmented zonemap query on dataset {}", + readOptions.getDatasetUri()); + return concatFilters(processFilters[0], processFilters[1]); } @Override @@ -304,6 +318,9 @@ public boolean pushTopN(SortOrder[] orders, int limit) { @Override public boolean pushAggregation(Aggregation aggregation) { + if (forcePostScanFiltering && pushedFilters.length > 0) { + return false; + } AggregateFunc[] funcs = aggregation.aggregateExpressions(); if (aggregation.groupByExpressions().length > 0) { return false; @@ -407,4 +424,52 @@ private static Set extractReferencedColumns(Filter[] filters) { } return columns; } + + /** + * Segmented zonemap indexes are currently used safely for fragment pruning, but scan-time filter + * pushdown still needs a Spark-side fallback until Lance-core query execution fully handles that + * layout. + */ + private boolean shouldForcePostScanFiltering(Filter[] acceptedFilters) { + if (acceptedFilters.length == 0) { + return false; + } + + Set referencedColumns = extractReferencedColumns(acceptedFilters); + if (referencedColumns.isEmpty()) { + return false; + } + + Dataset dataset = getOrOpenDataset(); + Map fieldIdToName = new HashMap<>(); + for (LanceField field : dataset.getLanceSchema().fields()) { + fieldIdToName.put(field.getId(), field.getName()); + } + + Map segmentedZonemapCounts = new HashMap<>(); + for (Index idx : dataset.getIndexes()) { + if (idx.indexType() != IndexType.ZONEMAP || idx.fields().size() != 1) { + continue; + } + if (!idx.fragments().isPresent() || idx.fragments().get().size() != 1) { + continue; + } + + String columnName = fieldIdToName.get(idx.fields().get(0)); + if (columnName == null || !referencedColumns.contains(columnName)) { + continue; + } + + String key = idx.name() + ":" + columnName; + segmentedZonemapCounts.merge(key, 1, Integer::sum); + } + + return segmentedZonemapCounts.values().stream().anyMatch(count -> count > 1); + } + + private static Filter[] concatFilters(Filter[] first, Filter[] second) { + Filter[] combined = Arrays.copyOf(first, first.length + second.length); + System.arraycopy(second, 0, combined, first.length, second.length); + return combined; + } } diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala index 4fb2fdc60..bfb85bde2 100755 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala @@ -45,7 +45,7 @@ import scala.collection.JavaConverters._ *
    *
  • For BTREE index, it uses a range-based approach that redistributes and sorts data across partitions, creates indexes for each range in parallel, and finally merges them into a global index structure. *
  • For fragment-trainable scalar index types, it processes each fragment independently in parallel, merges index metadata and commits an index-creation transaction. - *
  • For zonemap index, it delegates to the underlying Lance single-phase index creation path because zonemap does not support fragment training. + *
  • For zonemap index, it builds one uncommitted segment per fragment in parallel and commits the logical index on the driver. *
*/ case class AddIndexExec( @@ -84,20 +84,21 @@ case class AddIndexExec( val uuid = UUID.randomUUID() val indexType = IndexUtils.buildIndexType(method) + if (indexType == IndexType.ZONEMAP && columns.size != 1) { + throw new UnsupportedOperationException( + "Zonemap index currently supports a single column only") + } + + // Create distributed index job and run it + val createdSegments = createIndexJob(lanceDataset, readOptions, indexType, uuid.toString, fragmentIds).run() + if (indexType == IndexType.ZONEMAP) { - if (columns.size != 1) { - throw new UnsupportedOperationException( - "Zonemap index currently supports a single column only") - } - createDirectScalarIndex(readOptions, indexType) + commitIndexSegments(readOptions, columns.head, createdSegments) return Seq(new GenericInternalRow(Array[Any]( fragmentIds.size.toLong, UTF8String.fromString(indexName)))) } - // Create distributed index job and run it - createIndexJob(lanceDataset, readOptions, uuid.toString, fragmentIds).run() - val dataset = Utils.openDatasetBuilder(readOptions).build() try { // Merge index metadata after all fragments are indexed @@ -150,23 +151,50 @@ case class AddIndexExec( UTF8String.fromString(indexName)))) } - private def createDirectScalarIndex( + private def commitIndexSegments( readOptions: LanceSparkReadOptions, - indexType: IndexType): Unit = { - val argsJson = IndexUtils.toJson(args) - val params = IndexParams.builder() - .setScalarIndexParams(ScalarIndexParams.create(method, argsJson)) - .build() - - val indexOptions = IndexOptions - .builder(columns.asJava, indexType, params) - .replace(true) - .withIndexName(indexName) - .build() - + column: String, + segments: Seq[Index]): Unit = { val dataset = Utils.openDatasetBuilder(readOptions).build() try { - dataset.createIndex(indexOptions) + val existingIndices = dataset.getIndexes.asScala + .filter(_.name() == indexName) + .toList + + val committedSegments = + dataset.commitExistingIndexSegments(indexName, column, segments.toList.asJava).asScala.toList + + if (existingIndices.nonEmpty) { + val committedUuids = committedSegments.map(_.uuid()).toSet + val refreshedDataset = Utils.openDatasetBuilder(readOptions).build() + try { + val removedIndices = refreshedDataset.getIndexes.asScala + .filter(index => index.name() == indexName && !committedUuids.contains(index.uuid())) + .toList + .asJava + + if (!removedIndices.isEmpty) { + val op = AddIndexOperation.builder() + .withNewIndices(Collections.emptyList()) + .withRemovedIndices(removedIndices) + .build() + val txn = new Transaction.Builder() + .readVersion(refreshedDataset.version()) + .operation(op) + .build() + try { + val newDataset = new CommitBuilder(refreshedDataset) + .writeParams(readOptions.getStorageOptions) + .execute(txn) + newDataset.close() + } finally { + txn.close() + } + } + } finally { + refreshedDataset.close() + } + } } finally { dataset.close() } @@ -175,6 +203,7 @@ case class AddIndexExec( private def createIndexJob( lanceDataset: LanceDataset, readOptions: LanceSparkReadOptions, + indexType: IndexType, uuid: String, fragmentIds: List[Integer]): IndexJob = { // Get namespace info from catalog if available (for credential vending on workers) @@ -192,7 +221,7 @@ case class AddIndexExec( case _ => (None, None, None, None) } - IndexUtils.buildIndexType(method) match { + indexType match { case IndexType.BTREE => val mode = args.find(_.name == "build_mode").map(_.value.asInstanceOf[String]) mode match { @@ -210,7 +239,7 @@ case class AddIndexExec( new FragmentBasedIndexJob( this, readOptions, - uuid, + Some(uuid), fragmentIds, nsImpl, nsProps, @@ -226,7 +255,7 @@ case class AddIndexExec( new FragmentBasedIndexJob( this, readOptions, - uuid, + if (indexType == IndexType.ZONEMAP) None else Some(uuid), fragmentIds, nsImpl, nsProps, @@ -240,7 +269,7 @@ case class AddIndexExec( * Interface for index job to implement different indexing strategies. */ trait IndexJob extends Serializable { - def run(): Unit + def run(): Seq[Index] } /** @@ -260,28 +289,32 @@ trait IndexJob extends Serializable { class FragmentBasedIndexJob( addIndexExec: AddIndexExec, readOptions: LanceSparkReadOptions, - uuid: String, + indexUuid: Option[String], fragmentIds: List[Integer], nsImpl: Option[String], nsProps: Option[Map[String, String]], tableId: Option[List[String]], initialStorageOpts: Option[Map[String, String]]) extends IndexJob { - override def run(): Unit = { + override def run(): Seq[Index] = { val encodedReadOptions = encode(readOptions) val columns = addIndexExec.columns.toList val argsJson = IndexUtils.toJson(addIndexExec.args) + val fragmentBatches = if (IndexUtils.buildIndexType(addIndexExec.method) == IndexType.ZONEMAP) { + batchFragments(fragmentIds) + } else { + fragmentIds.map(fid => List(fid)) + } - // Build per-fragment tasks - val tasks = fragmentIds.map { fid => + val tasks = fragmentBatches.map { batch => FragmentIndexTask( encodedReadOptions, columns, addIndexExec.method, argsJson, addIndexExec.indexName, - uuid, - fid, + indexUuid, + batch, nsImpl, nsProps, tableId, @@ -292,6 +325,14 @@ class FragmentBasedIndexJob( .parallelize(tasks, tasks.size) .map(t => t.execute()) .collect() + .map(decode[Index]) + .toSeq + } + + private def batchFragments(fragmentIds: List[Integer]): Seq[List[Integer]] = { + val targetTasks = math.max(1, math.min(fragmentIds.size, addIndexExec.session.sparkContext.defaultParallelism)) + val batchSize = math.ceil(fragmentIds.size.toDouble / targetTasks.toDouble).toInt + fragmentIds.grouped(batchSize).map(_.toList).toSeq } } @@ -305,7 +346,7 @@ class FragmentBasedIndexJob( * @param argsJson JSON string containing index parameters * @param indexName Name of the index being created * @param uuid Unique identifier for this index operation - * @param fragmentId ID of the fragment to create index on + * @param fragmentIds IDs of the fragments to create index on * @param namespaceImpl Implementation class for namespace operations * @param namespaceProperties Properties of the namespace * @param tableId Identifier for the table within the namespace @@ -317,8 +358,8 @@ case class FragmentIndexTask( method: String, argsJson: String, indexName: String, - uuid: String, - fragmentId: Int, + indexUuid: Option[String], + fragmentIds: List[Integer], namespaceImpl: Option[String], namespaceProperties: Option[Map[String, String]], tableId: Option[List[String]], @@ -333,23 +374,25 @@ case class FragmentIndexTask( val indexOptions = IndexOptions .builder(java.util.Arrays.asList(columns: _*), indexType, params) - .replace(true) - .withIndexName(indexName) - .withIndexUUID(uuid) - .withFragmentIds(Collections.singletonList(fragmentId)) - .build() + .withFragmentIds(fragmentIds.asJava) + if (indexType == IndexType.ZONEMAP) { + indexOptions.replace(false) + } else { + indexOptions + .replace(true) + .withIndexName(indexName) + indexUuid.foreach(indexOptions.withIndexUUID) + } val dataset = Utils.openDatasetBuilder(readOptions) .initialStorageOptions(initialStorageOptions.map(_.asJava).orNull) .build() try { - dataset.createIndex(indexOptions) + encode(dataset.createIndex(indexOptions.build())) } finally { dataset.close() } - - encode("OK") } } @@ -377,7 +420,7 @@ class RangeBasedBTreeIndexJob( private val VALUE_COLUMN_NAME = "value" - override def run(): Unit = { + override def run(): Seq[Index] = { if (addIndexExec.columns.size != 1) { throw new UnsupportedOperationException( "Range-based BTree index currently supports a single column only") @@ -426,6 +469,8 @@ class RangeBasedBTreeIndexJob( rangeDf.queryExecution.toRdd.mapPartitionsWithIndex { case (rangeId, rowsIter) => indexBuilder.buildForRange(rangeId, rowsIter) }.collect() + + Seq.empty } } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/TestUtils.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/TestUtils.java index 5fd9fde4f..3043c0fd8 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/TestUtils.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/TestUtils.java @@ -23,6 +23,7 @@ import java.net.URL; import java.util.Arrays; +import java.util.Collections; import java.util.List; public class TestUtils { @@ -88,9 +89,10 @@ public static class TestTable1Config { Optional.empty() /* topNSortOrders */, Optional.empty() /* pushedAggregation */, "test" /* scanId */, - null /* initialStorageOptions */, + true /* useScalarIndex */, + Collections.emptyMap() /* initialStorageOptions */, null /* namespaceImpl */, - null /* namespaceProperties */, + Collections.emptyMap() /* namespaceProperties */, null /* partitionKeyRow */); } } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceColumnarPartitionReaderTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceColumnarPartitionReaderTest.java index 4a3cc39e1..1c29d4f67 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceColumnarPartitionReaderTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceColumnarPartitionReaderTest.java @@ -43,9 +43,10 @@ public void test() throws Exception { Optional.empty() /* topNSortOrders */, Optional.empty() /* pushedAggregation */, "test" /* scanId */, - null /* initialStorageOptions */, + true /* useScalarIndex */, + Collections.emptyMap() /* initialStorageOptions */, null /* namespaceImpl */, - null /* namespaceProperties */, + Collections.emptyMap() /* namespaceProperties */, null /* partitionKeyRow */); try (LanceColumnarPartitionReader reader = new LanceColumnarPartitionReader(partition)) { List> expectedValues = TestUtils.TestTable1Config.expectedValues; @@ -86,9 +87,10 @@ public void testOffsetAndLimit() throws Exception { Optional.empty() /* topNSortOrders */, Optional.empty() /* pushedAggregation */, "testOffsetAndLimit" /* scanId */, - null /* initialStorageOptions */, + true /* useScalarIndex */, + Collections.emptyMap() /* initialStorageOptions */, null /* namespaceImpl */, - null /* namespaceProperties */, + Collections.emptyMap() /* namespaceProperties */, null /* partitionKeyRow */); try (LanceColumnarPartitionReader reader = new LanceColumnarPartitionReader(partition)) { List> expectedValues = TestUtils.TestTable1Config.expectedValues; @@ -131,9 +133,10 @@ public void testTopN() throws Exception { Optional.of(Collections.singletonList(builder.build())) /* topNSortOrders */, Optional.empty() /* pushedAggregation */, "testTopN" /* scanId */, - null /* initialStorageOptions */, + true /* useScalarIndex */, + Collections.emptyMap() /* initialStorageOptions */, null /* namespaceImpl */, - null /* namespaceProperties */, + Collections.emptyMap() /* namespaceProperties */, null /* partitionKeyRow */); try (LanceColumnarPartitionReader reader = new LanceColumnarPartitionReader(partition)) { List> expectedValues = TestUtils.TestTable1Config.expectedValues; diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceCountStarPartitionReaderTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceCountStarPartitionReaderTest.java index 23135af59..ce73fd0e8 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceCountStarPartitionReaderTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceCountStarPartitionReaderTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -49,9 +50,10 @@ public void testCloseReleasesArrowMemory() throws Exception { Optional.empty(), Optional.of(countStarAgg), "testCountStarMemoryLeak", + true, + Collections.emptyMap(), null, - null, - null, + Collections.emptyMap(), null); LanceCountStarPartitionReader reader = new LanceCountStarPartitionReader(partition); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceDatasetReadTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceDatasetReadTest.java index 2a8c6670c..a526a332a 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceDatasetReadTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceDatasetReadTest.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -125,9 +126,10 @@ public void validateFragment(List> expectedValues, int fragment, St Optional.empty() /* topNSortOrders */, Optional.empty() /* pushedAggregation */, "validateFragment" /* scanId */, - null /* initialStorageOptions */, + true /* useScalarIndex */, + Collections.emptyMap() /* initialStorageOptions */, null /* namespaceImpl */, - null /* namespaceProperties */, + Collections.emptyMap() /* namespaceProperties */, null /* partitionKeyRow */))) { try (ArrowReader reader = scanner.getArrowReader()) { VectorSchemaRoot root = reader.getVectorSchemaRoot(); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java index 13fe3115b..22e6359a7 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java @@ -215,6 +215,7 @@ public void testOutputPartitioningWithPartitionInfo() { Collections.emptyMap(), null, partInfo, + true, Collections.emptyMap(), null, Collections.emptyMap()); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java index d0d51b622..31fc11812 100755 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java @@ -226,6 +226,35 @@ public void testCreateZonemapIndexWithRowsPerZone() { org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build(); try { + int fragmentCount = lanceDataset.getFragments().size(); + int expectedSegmentCount = + Math.min(fragmentCount, spark.sparkContext().defaultParallelism()); + List zonemapSegments = + lanceDataset.getIndexes().stream() + .filter(index -> "test_index_zonemap".equals(index.name())) + .collect(Collectors.toList()); + int coveredFragments = + zonemapSegments.stream() + .map(index -> index.fragments().orElse(Collections.emptyList()).size()) + .mapToInt(Integer::intValue) + .sum(); + + Assertions.assertEquals( + expectedSegmentCount, + zonemapSegments.size(), + "Expected distributed zonemap build to batch fragments into bounded segment count"); + Assertions.assertTrue( + zonemapSegments.stream() + .allMatch(index -> index.fragments().isPresent() && !index.fragments().get().isEmpty()), + "Expected each zonemap segment to cover at least one fragment"); + Assertions.assertTrue( + zonemapSegments.stream() + .anyMatch(index -> index.fragments().isPresent() && index.fragments().get().size() > 1), + "Expected zonemap batching to create at least one multi-fragment segment"); + Assertions.assertEquals( + fragmentCount, + coveredFragments, + "Expected zonemap segments to cover all fragments exactly once"); Assertions.assertFalse( lanceDataset.getZonemapStats("id").isEmpty(), "Expected zonemap stats for indexed column"); @@ -261,6 +290,50 @@ public void testCreateMultiColumnZonemapIndexThrowsHelpfulError() { + exception.getMessage()); } + @Test + public void testRepeatedCreateZonemapIndexReplacesExistingSegments() { + prepareDataset(); + + String sql = + String.format( + "alter table %s create index test_index_zonemap_repeat using zonemap (id) with (rows_per_zone=4)", + fullTable); + + spark.sql(sql); + spark.sql(sql); + + org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build(); + try { + int fragmentCount = lanceDataset.getFragments().size(); + int expectedSegmentCount = + Math.min(fragmentCount, spark.sparkContext().defaultParallelism()); + List zonemapSegments = + lanceDataset.getIndexes().stream() + .filter(index -> "test_index_zonemap_repeat".equals(index.name())) + .collect(Collectors.toList()); + int coveredFragments = + zonemapSegments.stream() + .map(index -> index.fragments().orElse(Collections.emptyList()).size()) + .mapToInt(Integer::intValue) + .sum(); + + Assertions.assertEquals( + expectedSegmentCount, + zonemapSegments.size(), + "Expected recreated zonemap index to replace existing batched segments instead of duplicating them"); + Assertions.assertTrue( + zonemapSegments.stream() + .allMatch(index -> index.fragments().isPresent() && !index.fragments().get().isEmpty()), + "Expected recreated zonemap segments to keep non-empty fragment coverage"); + Assertions.assertEquals( + fragmentCount, + coveredFragments, + "Expected recreated zonemap segments to cover all fragments exactly once"); + } finally { + lanceDataset.close(); + } + } + @Test public void testCreateBTreeIndexWithRangeMode() { prepareDataset(); From aac8b23a49d34c56e07d017ca9c9a00401ee1c1c Mon Sep 17 00:00:00 2001 From: beinan Date: Fri, 8 May 2026 07:07:17 +0000 Subject: [PATCH 5/7] feat: implement segmented zonemap index creation and add validation --- docs/src/operations/ddl/create-index.md | 4 +- .../datasources/v2/AddIndexExec.scala | 74 +++++++++++++++---- .../lance/spark/update/BaseAddIndexTest.java | 65 ++++++++++++++-- pom.xml | 2 +- 4 files changed, 121 insertions(+), 24 deletions(-) diff --git a/docs/src/operations/ddl/create-index.md b/docs/src/operations/ddl/create-index.md index a26889f2b..e8d86bfb1 100755 --- a/docs/src/operations/ddl/create-index.md +++ b/docs/src/operations/ddl/create-index.md @@ -47,7 +47,7 @@ For the `btree` method, the following options are supported: | Option | Type | Description | |-------------|--------|----------------------------------------------| | `zone_size` | Long | The number of rows per zone in the B-tree index. | -| `build_mode`| String | Index building mode: 'fragment' builds indexes in parallel by fragment; 'range' sorts data by indexed columns first, then partitions and builds indexes in parallel by partition. Default is 'fragment'.| +| `build_mode`| String | Index building mode: `'fragment'` builds BTREE segments in parallel across fragment batches and commits them as one logical index for single-column BTREE builds; `'range'` sorts data by indexed columns first, then partitions and builds indexes in parallel by partition. Default is `'fragment'`.| ### FTS Options @@ -151,7 +151,7 @@ Consider creating an index when: The `CREATE INDEX` command operates as follows: -1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree`, `fts`, and `zonemap` can build physical index segments in parallel across fragments. Range-mode `btree` uses Spark repartitioning and sorted preprocessed data. +1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree`, `fts`, and `zonemap` can build physical index segments in parallel across fragments. Single-column fragment-mode `btree` and `zonemap` publish those segments directly as one logical index. Range-mode `btree` uses Spark repartitioning and sorted preprocessed data. 2. **Metadata Finalization**: Lance Spark merges or commits the resulting index metadata on the driver so the new logical index becomes visible atomically. 3. **Transactional Commit**: A new table version is committed with the new index information. The operation is atomic and ensures that concurrent reads are not affected. diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala index bfb85bde2..d30aec8c7 100755 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala @@ -43,9 +43,9 @@ import scala.collection.JavaConverters._ * Physical execution of CREATE INDEX (ALTER TABLE ... CREATE INDEX ...) for Lance datasets. * *
    - *
  • For BTREE index, it uses a range-based approach that redistributes and sorts data across partitions, creates indexes for each range in parallel, and finally merges them into a global index structure. - *
  • For fragment-trainable scalar index types, it processes each fragment independently in parallel, merges index metadata and commits an index-creation transaction. - *
  • For zonemap index, it builds one uncommitted segment per fragment in parallel and commits the logical index on the driver. + *
  • For BTREE index, range mode redistributes and sorts data across partitions, creates indexes for each range in parallel, and finally merges them into a global index structure. + *
  • For single-column BTREE fragment mode and zonemap, it builds uncommitted index segments in parallel and commits the logical index on the driver. + *
  • For other fragment-trainable scalar index types, it processes each fragment independently in parallel, merges index metadata and commits an index-creation transaction. *
*/ case class AddIndexExec( @@ -89,10 +89,21 @@ case class AddIndexExec( "Zonemap index currently supports a single column only") } - // Create distributed index job and run it - val createdSegments = createIndexJob(lanceDataset, readOptions, indexType, uuid.toString, fragmentIds).run() + val btreeBuildMode = IndexUtils.btreeBuildMode(indexType, args) + val useLogicalSegmentCommit = + IndexUtils.useLogicalSegmentCommit(indexType, columns, btreeBuildMode) - if (indexType == IndexType.ZONEMAP) { + // Create distributed index job and run it + val createdSegments = createIndexJob( + lanceDataset, + readOptions, + indexType, + uuid.toString, + fragmentIds, + btreeBuildMode, + useLogicalSegmentCommit).run() + + if (useLogicalSegmentCommit) { commitIndexSegments(readOptions, columns.head, createdSegments) return Seq(new GenericInternalRow(Array[Any]( fragmentIds.size.toLong, @@ -162,7 +173,10 @@ case class AddIndexExec( .toList val committedSegments = - dataset.commitExistingIndexSegments(indexName, column, segments.toList.asJava).asScala.toList + dataset.commitExistingIndexSegments( + indexName, + column, + segments.toList.asJava).asScala.toList if (existingIndices.nonEmpty) { val committedUuids = committedSegments.map(_.uuid()).toSet @@ -205,7 +219,9 @@ case class AddIndexExec( readOptions: LanceSparkReadOptions, indexType: IndexType, uuid: String, - fragmentIds: List[Integer]): IndexJob = { + fragmentIds: List[Integer], + btreeBuildMode: Option[String], + useLogicalSegmentCommit: Boolean): IndexJob = { // Get namespace info from catalog if available (for credential vending on workers) val (nsImpl, nsProps, tableId, initialStorageOpts): ( Option[String], @@ -223,8 +239,7 @@ case class AddIndexExec( indexType match { case IndexType.BTREE => - val mode = args.find(_.name == "build_mode").map(_.value.asInstanceOf[String]) - mode match { + btreeBuildMode match { case Some("range") => return new RangeBasedBTreeIndexJob( this, @@ -239,8 +254,9 @@ case class AddIndexExec( new FragmentBasedIndexJob( this, readOptions, - Some(uuid), + if (useLogicalSegmentCommit) None else Some(uuid), fragmentIds, + useLogicalSegmentCommit, nsImpl, nsProps, tableId, @@ -257,6 +273,7 @@ case class AddIndexExec( readOptions, if (indexType == IndexType.ZONEMAP) None else Some(uuid), fragmentIds, + indexType == IndexType.ZONEMAP, nsImpl, nsProps, tableId, @@ -291,6 +308,7 @@ class FragmentBasedIndexJob( readOptions: LanceSparkReadOptions, indexUuid: Option[String], fragmentIds: List[Integer], + groupFragmentsIntoSegments: Boolean, nsImpl: Option[String], nsProps: Option[Map[String, String]], tableId: Option[List[String]], @@ -300,7 +318,7 @@ class FragmentBasedIndexJob( val encodedReadOptions = encode(readOptions) val columns = addIndexExec.columns.toList val argsJson = IndexUtils.toJson(addIndexExec.args) - val fragmentBatches = if (IndexUtils.buildIndexType(addIndexExec.method) == IndexType.ZONEMAP) { + val fragmentBatches = if (groupFragmentsIntoSegments) { batchFragments(fragmentIds) } else { fragmentIds.map(fid => List(fid)) @@ -330,7 +348,8 @@ class FragmentBasedIndexJob( } private def batchFragments(fragmentIds: List[Integer]): Seq[List[Integer]] = { - val targetTasks = math.max(1, math.min(fragmentIds.size, addIndexExec.session.sparkContext.defaultParallelism)) + val targetTasks = + math.max(1, math.min(fragmentIds.size, addIndexExec.session.sparkContext.defaultParallelism)) val batchSize = math.ceil(fragmentIds.size.toDouble / targetTasks.toDouble).toInt fragmentIds.grouped(batchSize).map(_.toList).toSeq } @@ -368,6 +387,7 @@ case class FragmentIndexTask( def execute(): String = { val readOptions = decode[LanceSparkReadOptions](encodedReadOptions) val indexType = IndexUtils.buildIndexType(method) + val publishAsLogicalSegment = indexUuid.isEmpty val params = IndexParams.builder() .setScalarIndexParams(ScalarIndexParams.create(method, argsJson)) .build() @@ -375,7 +395,7 @@ case class FragmentIndexTask( val indexOptions = IndexOptions .builder(java.util.Arrays.asList(columns: _*), indexType, params) .withFragmentIds(fragmentIds.asJava) - if (indexType == IndexType.ZONEMAP) { + if (publishAsLogicalSegment) { indexOptions.replace(false) } else { indexOptions @@ -598,6 +618,32 @@ object IndexUtils { } } + def btreeBuildMode(indexType: IndexType, args: Seq[LanceNamedArgument]): Option[String] = { + if (indexType != IndexType.BTREE) { + None + } else { + val buildMode = args.find(_.name == "build_mode").map(_.value.asInstanceOf[String]) + buildMode match { + case Some("fragment") | Some("range") | None => + buildMode + case Some(unknown) => + throw new IllegalArgumentException( + s"Unrecognized build_mode: '$unknown'. Supported values are 'fragment' and 'range'.") + } + } + } + + def useLogicalSegmentCommit( + indexType: IndexType, + columns: Seq[String], + btreeBuildMode: Option[String]): Boolean = { + indexType match { + case IndexType.ZONEMAP => true + case IndexType.BTREE => btreeBuildMode != Some("range") && columns.size == 1 + case _ => false + } + } + def toJson(args: Seq[LanceNamedArgument]): String = { if (args.isEmpty) { "{}" diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java index 31fc11812..1eb43b1c9 100755 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java @@ -131,6 +131,7 @@ public void testCreateIndexDistributed() { // Check index is created successfully checkIndex("test_index"); + assertSegmentedIndexCoverage("test_index"); } @Test @@ -152,6 +153,7 @@ public void testRepeatedCreateIndex() { // Check index is created successfully checkIndex("test_index_repeat"); + assertSegmentedIndexCoverage("test_index_repeat"); Dataset result2 = spark.sql( @@ -168,6 +170,7 @@ public void testRepeatedCreateIndex() { // Check index is created successfully checkIndex("test_index_repeat"); + assertSegmentedIndexCoverage("test_index_repeat"); } @Test @@ -227,8 +230,7 @@ public void testCreateZonemapIndexWithRowsPerZone() { org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build(); try { int fragmentCount = lanceDataset.getFragments().size(); - int expectedSegmentCount = - Math.min(fragmentCount, spark.sparkContext().defaultParallelism()); + int expectedSegmentCount = Math.min(fragmentCount, spark.sparkContext().defaultParallelism()); List zonemapSegments = lanceDataset.getIndexes().stream() .filter(index -> "test_index_zonemap".equals(index.name())) @@ -245,11 +247,13 @@ public void testCreateZonemapIndexWithRowsPerZone() { "Expected distributed zonemap build to batch fragments into bounded segment count"); Assertions.assertTrue( zonemapSegments.stream() - .allMatch(index -> index.fragments().isPresent() && !index.fragments().get().isEmpty()), + .allMatch( + index -> index.fragments().isPresent() && !index.fragments().get().isEmpty()), "Expected each zonemap segment to cover at least one fragment"); Assertions.assertTrue( zonemapSegments.stream() - .anyMatch(index -> index.fragments().isPresent() && index.fragments().get().size() > 1), + .anyMatch( + index -> index.fragments().isPresent() && index.fragments().get().size() > 1), "Expected zonemap batching to create at least one multi-fragment segment"); Assertions.assertEquals( fragmentCount, @@ -305,8 +309,7 @@ public void testRepeatedCreateZonemapIndexReplacesExistingSegments() { org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build(); try { int fragmentCount = lanceDataset.getFragments().size(); - int expectedSegmentCount = - Math.min(fragmentCount, spark.sparkContext().defaultParallelism()); + int expectedSegmentCount = Math.min(fragmentCount, spark.sparkContext().defaultParallelism()); List zonemapSegments = lanceDataset.getIndexes().stream() .filter(index -> "test_index_zonemap_repeat".equals(index.name())) @@ -323,7 +326,8 @@ public void testRepeatedCreateZonemapIndexReplacesExistingSegments() { "Expected recreated zonemap index to replace existing batched segments instead of duplicating them"); Assertions.assertTrue( zonemapSegments.stream() - .allMatch(index -> index.fragments().isPresent() && !index.fragments().get().isEmpty()), + .allMatch( + index -> index.fragments().isPresent() && !index.fragments().get().isEmpty()), "Expected recreated zonemap segments to keep non-empty fragment coverage"); Assertions.assertEquals( fragmentCount, @@ -383,6 +387,13 @@ public void testCreateBTreeIndexWithFragmentMode() { Assertions.assertEquals("test_index_btree_fragment", indexName); checkIndex("test_index_btree_fragment"); + assertSegmentedIndexCoverage("test_index_btree_fragment"); + + Dataset query = spark.sql(String.format("select * from %s where id=15", fullTable)); + Assertions.assertEquals(1L, query.count()); + Row r = query.collectAsList().get(0); + Assertions.assertEquals(15, r.getInt(0)); + Assertions.assertEquals("text_15", r.getString(1)); } @Test @@ -638,4 +649,44 @@ private void checkIndex(String indexName) { lanceDataset.close(); } } + + private void assertSegmentedIndexCoverage(String indexName) { + org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build(); + try { + int fragmentCount = lanceDataset.getFragments().size(); + int expectedSegmentCount = Math.min(fragmentCount, spark.sparkContext().defaultParallelism()); + List segments = + lanceDataset.getIndexes().stream() + .filter(index -> indexName.equals(index.name())) + .collect(Collectors.toList()); + int coveredFragments = + segments.stream() + .map(index -> index.fragments().orElse(Collections.emptyList()).size()) + .mapToInt(Integer::intValue) + .sum(); + + Assertions.assertEquals( + expectedSegmentCount, + segments.size(), + "Expected segmented index build to batch fragments into bounded segment count"); + Assertions.assertTrue( + segments.stream() + .allMatch( + index -> index.fragments().isPresent() && !index.fragments().get().isEmpty()), + "Expected each index segment to cover at least one fragment"); + if (fragmentCount > expectedSegmentCount) { + Assertions.assertTrue( + segments.stream() + .anyMatch( + index -> index.fragments().isPresent() && index.fragments().get().size() > 1), + "Expected batched segment build to create at least one multi-fragment segment"); + } + Assertions.assertEquals( + fragmentCount, + coveredFragments, + "Expected committed segments to cover all fragments exactly once"); + } finally { + lanceDataset.close(); + } + } } diff --git a/pom.xml b/pom.xml index 4d0e26cff..ffbf9cbba 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ 0.4.0-beta.3 - 6.0.0-beta.2 + 6.0.0-beta.3 0.1.3 14.0.2 From be30bfc4cda890e645de6bddb526af81a5240af1 Mon Sep 17 00:00:00 2001 From: beinan Date: Fri, 8 May 2026 07:46:32 +0000 Subject: [PATCH 6/7] refactor: scope logical segment commit to zonemap only Simplify useLogicalSegmentCommit to only return true for ZONEMAP. BTREE fragment-mode segment commit is not yet supported by the lance-jni layer (missing page_lookup.lance files), so remove the BTREE path and clean up related test assertions and docs. Co-Authored-By: Beinan Wang --- docs/src/operations/ddl/create-index.md | 4 ++-- .../datasources/v2/AddIndexExec.scala | 18 +++++------------- .../lance/spark/update/BaseAddIndexTest.java | 4 ---- 3 files changed, 7 insertions(+), 19 deletions(-) diff --git a/docs/src/operations/ddl/create-index.md b/docs/src/operations/ddl/create-index.md index e8d86bfb1..fdb092264 100755 --- a/docs/src/operations/ddl/create-index.md +++ b/docs/src/operations/ddl/create-index.md @@ -47,7 +47,7 @@ For the `btree` method, the following options are supported: | Option | Type | Description | |-------------|--------|----------------------------------------------| | `zone_size` | Long | The number of rows per zone in the B-tree index. | -| `build_mode`| String | Index building mode: `'fragment'` builds BTREE segments in parallel across fragment batches and commits them as one logical index for single-column BTREE builds; `'range'` sorts data by indexed columns first, then partitions and builds indexes in parallel by partition. Default is `'fragment'`.| +| `build_mode`| String | Index building mode: `'fragment'` builds indexes in parallel by fragment; `'range'` sorts data by indexed columns first, then partitions and builds indexes in parallel by partition. Default is `'fragment'`.| ### FTS Options @@ -151,7 +151,7 @@ Consider creating an index when: The `CREATE INDEX` command operates as follows: -1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree`, `fts`, and `zonemap` can build physical index segments in parallel across fragments. Single-column fragment-mode `btree` and `zonemap` publish those segments directly as one logical index. Range-mode `btree` uses Spark repartitioning and sorted preprocessed data. +1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree`, `fts`, and `zonemap` can build physical index segments in parallel across fragments. `zonemap` publishes those segments directly as one logical index. Range-mode `btree` uses Spark repartitioning and sorted preprocessed data. 2. **Metadata Finalization**: Lance Spark merges or commits the resulting index metadata on the driver so the new logical index becomes visible atomically. 3. **Transactional Commit**: A new table version is committed with the new index information. The operation is atomic and ensures that concurrent reads are not affected. diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala index d30aec8c7..8de2a5b31 100755 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala @@ -44,8 +44,8 @@ import scala.collection.JavaConverters._ * *
    *
  • For BTREE index, range mode redistributes and sorts data across partitions, creates indexes for each range in parallel, and finally merges them into a global index structure. - *
  • For single-column BTREE fragment mode and zonemap, it builds uncommitted index segments in parallel and commits the logical index on the driver. - *
  • For other fragment-trainable scalar index types, it processes each fragment independently in parallel, merges index metadata and commits an index-creation transaction. + *
  • For zonemap index, it builds uncommitted index segments in parallel and commits the logical index on the driver. + *
  • For fragment-trainable scalar index types (BTREE fragment mode, FTS, etc.), it processes each fragment independently in parallel, merges index metadata and commits an index-creation transaction. *
*/ case class AddIndexExec( @@ -90,8 +90,7 @@ case class AddIndexExec( } val btreeBuildMode = IndexUtils.btreeBuildMode(indexType, args) - val useLogicalSegmentCommit = - IndexUtils.useLogicalSegmentCommit(indexType, columns, btreeBuildMode) + val useLogicalSegmentCommit = IndexUtils.useLogicalSegmentCommit(indexType) // Create distributed index job and run it val createdSegments = createIndexJob( @@ -633,15 +632,8 @@ object IndexUtils { } } - def useLogicalSegmentCommit( - indexType: IndexType, - columns: Seq[String], - btreeBuildMode: Option[String]): Boolean = { - indexType match { - case IndexType.ZONEMAP => true - case IndexType.BTREE => btreeBuildMode != Some("range") && columns.size == 1 - case _ => false - } + def useLogicalSegmentCommit(indexType: IndexType): Boolean = { + indexType == IndexType.ZONEMAP } def toJson(args: Seq[LanceNamedArgument]): String = { diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java index 1eb43b1c9..74b8fc05c 100755 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java @@ -131,7 +131,6 @@ public void testCreateIndexDistributed() { // Check index is created successfully checkIndex("test_index"); - assertSegmentedIndexCoverage("test_index"); } @Test @@ -153,7 +152,6 @@ public void testRepeatedCreateIndex() { // Check index is created successfully checkIndex("test_index_repeat"); - assertSegmentedIndexCoverage("test_index_repeat"); Dataset result2 = spark.sql( @@ -170,7 +168,6 @@ public void testRepeatedCreateIndex() { // Check index is created successfully checkIndex("test_index_repeat"); - assertSegmentedIndexCoverage("test_index_repeat"); } @Test @@ -387,7 +384,6 @@ public void testCreateBTreeIndexWithFragmentMode() { Assertions.assertEquals("test_index_btree_fragment", indexName); checkIndex("test_index_btree_fragment"); - assertSegmentedIndexCoverage("test_index_btree_fragment"); Dataset query = spark.sql(String.format("select * from %s where id=15", fullTable)); Assertions.assertEquals(1L, query.count()); From b18d4099f917d166b372c66a42a3538bb3c07af7 Mon Sep 17 00:00:00 2001 From: beinan Date: Mon, 11 May 2026 17:48:29 +0000 Subject: [PATCH 7/7] feat: add num_segments option for configurable zonemap segment count Allow users to control the number of index segments created during distributed zonemap builds via the num_segments DDL option. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/src/operations/ddl/create-index.md | 1 + .../datasources/v2/AddIndexExec.scala | 17 +++++-- .../lance/spark/update/BaseAddIndexTest.java | 47 +++++++++++++++++++ 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/docs/src/operations/ddl/create-index.md b/docs/src/operations/ddl/create-index.md index fdb092264..715a48449 100755 --- a/docs/src/operations/ddl/create-index.md +++ b/docs/src/operations/ddl/create-index.md @@ -39,6 +39,7 @@ For the `zonemap` method, the following options are supported: | Option | Type | Description | |-----------------|------|----------------------------------------------| | `rows_per_zone` | Long | The approximate number of rows per zonemap zone. | +| `num_segments` | Integer | Number of index segments to create. Each segment covers a batch of fragments. Defaults to `min(fragment_count, spark.default.parallelism)`. | ### BTree Options diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala index 8de2a5b31..3131ab745 100755 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala @@ -92,6 +92,12 @@ case class AddIndexExec( val btreeBuildMode = IndexUtils.btreeBuildMode(indexType, args) val useLogicalSegmentCommit = IndexUtils.useLogicalSegmentCommit(indexType) + val numSegmentsOpt = args.find(_.name == "num_segments") + if (numSegmentsOpt.isDefined && !useLogicalSegmentCommit) { + throw new IllegalArgumentException( + "num_segments option is only supported for index types that use segmented builds (e.g., zonemap)") + } + // Create distributed index job and run it val createdSegments = createIndexJob( lanceDataset, @@ -317,8 +323,9 @@ class FragmentBasedIndexJob( val encodedReadOptions = encode(readOptions) val columns = addIndexExec.columns.toList val argsJson = IndexUtils.toJson(addIndexExec.args) + val numSegments = addIndexExec.args.find(_.name == "num_segments").map(_.value.asInstanceOf[Number].intValue()) val fragmentBatches = if (groupFragmentsIntoSegments) { - batchFragments(fragmentIds) + batchFragments(fragmentIds, numSegments) } else { fragmentIds.map(fid => List(fid)) } @@ -346,9 +353,11 @@ class FragmentBasedIndexJob( .toSeq } - private def batchFragments(fragmentIds: List[Integer]): Seq[List[Integer]] = { - val targetTasks = - math.max(1, math.min(fragmentIds.size, addIndexExec.session.sparkContext.defaultParallelism)) + private def batchFragments(fragmentIds: List[Integer], numSegments: Option[Int] = None): Seq[List[Integer]] = { + val targetTasks = numSegments match { + case Some(n) => math.max(1, math.min(fragmentIds.size, n)) + case None => math.max(1, math.min(fragmentIds.size, addIndexExec.session.sparkContext.defaultParallelism)) + } val batchSize = math.ceil(fragmentIds.size.toDouble / targetTasks.toDouble).toInt fragmentIds.grouped(batchSize).map(_.toList).toSeq } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java index 74b8fc05c..27e4b9660 100755 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java @@ -335,6 +335,53 @@ public void testRepeatedCreateZonemapIndexReplacesExistingSegments() { } } + @Test + public void testCreateZonemapIndexWithNumSegments() { + prepareDataset(); + + Dataset result = + spark.sql( + String.format( + "alter table %s create index test_index_zonemap_segments using zonemap (id) with (num_segments = 3)", + fullTable)); + + Row row = result.collectAsList().get(0); + long fragmentsIndexed = row.getLong(0); + String indexName = row.getString(1); + + Assertions.assertTrue(fragmentsIndexed >= 2, "Expected at least 2 fragments to be indexed"); + Assertions.assertEquals("test_index_zonemap_segments", indexName); + + // Verify the number of segments matches the requested num_segments + org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build(); + try { + int fragmentCount = lanceDataset.getFragments().size(); + int expectedSegmentCount = Math.min(fragmentCount, 3); + List segments = + lanceDataset.getIndexes().stream() + .filter(index -> "test_index_zonemap_segments".equals(index.name())) + .collect(Collectors.toList()); + + Assertions.assertEquals( + expectedSegmentCount, + segments.size(), + "Expected num_segments=3 to produce exactly 3 segments (or fewer if fragment count < 3)"); + + int coveredFragments = + segments.stream() + .map(index -> index.fragments().orElse(Collections.emptyList()).size()) + .mapToInt(Integer::intValue) + .sum(); + Assertions.assertEquals( + fragmentCount, + coveredFragments, + "Expected committed segments to cover all fragments exactly once"); + } finally { + lanceDataset.close(); + } + } + + @Test public void testCreateBTreeIndexWithRangeMode() { prepareDataset();