Commit c9df56e

beinan and Beinan Wang authored
feat: add distributed zonemap index build with configurable segments (#516)
## Summary

- Add zonemap as a new index type in `CREATE INDEX` DDL with distributed build support
- Batch fragments into configurable segments via `num_segments` option (defaults to `spark.default.parallelism`)
- Each segment is built in parallel on Spark executors and committed as a logical index on the driver
- Zonemap indexes currently support single column only

## What Changed

- `AddIndexExec.scala`: Zonemap-specific path with `ZonemapIndexJob`/`ZonemapIndexTask` and `commitIndexSegments`
- `create-index.md`: Document zonemap index type, options, and usage
- Tests: unit tests for segment creation/validation and integration test

## Notes

- Rebased cleanly onto current `main`
- Depends on lance-core `7.0.0-beta.10` or newer which includes zonemap segment support
- Supersedes PR #473 and closed PR #466

## Test plan

- [x] CI passes (lint, unit tests, integration tests across all Spark/Scala versions)
- [x] Zonemap index creation with default segment count
- [x] Zonemap index creation with explicit `num_segments`
- [x] Repeated zonemap index creation replaces existing segments
- [x] Query correctness after zonemap index creation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Beinan Wang <beinanwang@microsoft.com>
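The segment batching described in the summary can be illustrated with a small sketch. This is not the code in `AddIndexExec.scala`: the function name `plan_segments`, its parameters, and the contiguous, even split are assumptions made for illustration. Only two behaviors are taken from the documentation added in this commit: `num_segments` is an upper bound that is clamped to the fragment count, and the default is `min(fragment_count, spark.default.parallelism)`.

```python
from typing import List, Optional


def plan_segments(
    fragment_ids: List[int],
    num_segments: Optional[int] = None,
    default_parallelism: int = 8,  # stand-in for spark.default.parallelism
) -> List[List[int]]:
    """Split fragment IDs into at most `num_segments` contiguous batches."""
    if not fragment_ids:
        return []
    if num_segments is None:
        # Documented default: min(fragment_count, spark.default.parallelism).
        num_segments = default_parallelism
    if num_segments < 1:
        raise ValueError("num_segments must be positive")

    # num_segments is an upper bound; clamp it to the fragment count.
    count = min(num_segments, len(fragment_ids))

    # Assumed strategy: spread fragments as evenly as possible, in order.
    base, extra = divmod(len(fragment_ids), count)
    segments: List[List[int]] = []
    start = 0
    for i in range(count):
        size = base + (1 if i < extra else 0)
        segments.append(fragment_ids[start:start + size])
        start += size
    return segments


# Ten fragments batched into four segments; each segment would map to one
# executor task in the flow the commit message describes.
example = plan_segments(list(range(10)), num_segments=4)
```

Under these assumptions, requesting more segments than there are fragments simply yields one fragment per segment, which matches the clamping the documentation describes.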
1 parent 0a14502 commit c9df56e

4 files changed

Lines changed: 624 additions & 172 deletions

docs/src/operations/ddl/create-index.md

Lines changed: 35 additions & 5 deletions
@@ -7,7 +7,7 @@ Creates a scalar index on a Lance table to accelerate queries.
 
 ## Overview
 
-The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. This operation is performed in a distributed manner, building indexes for each data fragment in parallel.
+The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. Depending on the index method, Lance Spark either uses a fragment-parallel build path or a driver-coordinated commit flow after parallel executor builds.
 
 ## Basic Usage
 
@@ -24,13 +24,23 @@ The following index methods are supported:
 
 | Method | Description |
 |---------|-----------------------------------------------------------------------------|
+| `zonemap` | Lightweight min/max index for fragment pruning on a scalar column. |
 | `btree` | B-tree index for efficient range queries and point lookups on scalar columns. |
 | `fts` | Full-text search (inverted) index for text search on string columns. |
 
 ## Options
 
 The `CREATE INDEX` command supports options via the `WITH` clause to control index creation. These options are specific to the chosen index method.
 
+### ZoneMap Options
+
+For the `zonemap` method, the following options are supported:
+
+| Option | Type | Description |
+|-----------------|------|----------------------------------------------|
+| `rows_per_zone` | Long | The approximate number of rows per zonemap zone. |
+| `num_segments` | Integer | Target number of index segments (upper bound; clamped to fragment count when larger). Each segment covers a batch of fragments. Defaults to `min(fragment_count, spark.default.parallelism)`. |
+
 ### BTree Options
 
 For the `btree` method, the following options are supported:
@@ -92,6 +102,15 @@ Create a composite index on multiple columns.
     ALTER TABLE lance.db.logs CREATE INDEX idx_ts_level USING btree (timestamp, level);
     ```
 
+### Lightweight Fragment Pruning
+
+Create a zonemap index when you want lightweight min/max-based fragment pruning:
+
+=== "SQL"
+    ```sql
+    ALTER TABLE lance.db.users CREATE INDEX idx_id_zonemap USING zonemap (id);
+    ```
+
 ### Indexing with Options
 
 Create an index and specify the `zone_size` for the B-tree:
@@ -101,6 +120,15 @@ Create an index and specify the `zone_size` for the B-tree:
     ALTER TABLE lance.db.users CREATE INDEX idx_id_zoned USING btree (id) WITH (zone_size = 2048);
     ```
 
+### Zonemap with Options
+
+Create a zonemap index and specify the approximate number of rows per zone:
+
+=== "SQL"
+    ```sql
+    ALTER TABLE lance.db.users CREATE INDEX idx_id_zonemap USING zonemap (id) WITH (rows_per_zone = 2048);
+    ```
+
 ### Full-Text Search Index
 
 Create an FTS index on a text column:
@@ -133,17 +161,19 @@ The `CREATE INDEX` command returns the following information about the operation
 Consider creating an index when:
 
 - You frequently filter a large table on a specific column.
+- You want lightweight fragment pruning based on per-zone min/max statistics.
 - Your queries involve point lookups or small range scans.
 
 ## How It Works
 
 The `CREATE INDEX` command operates as follows:
 
-1. **Distributed Index Building**: For each fragment in the Lance dataset, a separate task is launched to build an index on the specified column(s).
-2. **Metadata Merging**: Once all per-fragment indexes are built, their metadata is collected and merged.
+1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree`, `fts`, and `zonemap` can build physical index segments in parallel across fragments. `zonemap` publishes those segments directly as one logical index. Range-mode `btree` uses Spark repartitioning and sorted preprocessed data.
+2. **Metadata Finalization**: Lance Spark merges or commits the resulting index metadata on the driver so the new logical index becomes visible atomically.
 3. **Transactional Commit**: A new table version is committed with the new index information. The operation is atomic and ensures that concurrent reads are not affected.
 
 ## Notes and Limitations
 
-- **Index Methods**: The `btree` and `fts` methods are supported for scalar index creation.
-- **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one. This is because the underlying implementation uses `replace(true)`.
+- **Index Methods**: The `zonemap`, `btree`, and `fts` methods are supported for scalar index creation.
+- **Zonemap Column Count**: Zonemap indexes currently support a single column only. The generic `CREATE INDEX` grammar accepts a column list, but Lance rejects multi-column zonemap creation.
+- **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one.
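
The documentation above describes a zonemap as lightweight per-zone min/max statistics used for pruning. A minimal sketch of that idea follows, assuming a single integer column held in memory. The names `Zone`, `build_zonemap`, and `zones_matching_eq` are hypothetical and do not reflect Lance's on-disk format or its actual pruning granularity; the numbers mirror the integration test in this commit (100 rows, `rows_per_zone = 8`, lookup of `id = 50`).

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass
class Zone:
    start_row: int   # offset of the first row covered by this zone
    num_rows: int    # number of rows in the zone
    min_value: int   # smallest column value seen in the zone
    max_value: int   # largest column value seen in the zone


def build_zonemap(values: Sequence[int], rows_per_zone: int) -> List[Zone]:
    """Record min/max for each run of `rows_per_zone` consecutive rows."""
    if rows_per_zone < 1:
        raise ValueError("rows_per_zone must be positive")
    zones: List[Zone] = []
    for start in range(0, len(values), rows_per_zone):
        chunk = values[start:start + rows_per_zone]
        zones.append(Zone(start, len(chunk), min(chunk), max(chunk)))
    return zones


def zones_matching_eq(zones: List[Zone], target: int) -> List[Zone]:
    """Keep only zones whose [min, max] range could contain `target`."""
    return [z for z in zones if z.min_value <= target <= z.max_value]


ids = list(range(100))
zones = build_zonemap(ids, rows_per_zone=8)
candidates = zones_matching_eq(zones, 50)  # zones that must still be scanned
```

A zonemap only rules zones out; rows inside a surviving zone still have to be checked against the predicate. That is why it suits sorted or clustered columns, where most zones have narrow, non-overlapping ranges.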

integration-tests/test_lance_spark.py

Lines changed: 81 additions & 46 deletions
@@ -848,6 +848,42 @@ def test_create_btree_index_on_string(self, spark):
         assert len(query_result) == 3
         _assert_lance_index_metadata(spark, "default.employees", "idx_dept", "BTREE")
 
+    def test_create_zonemap_index_on_int(self, spark):
+        """Test CREATE INDEX with ZoneMap on integer column."""
+        spark.sql("""
+            CREATE TABLE default.test_table (
+                id INT,
+                name STRING,
+                value DOUBLE
+            )
+        """)
+
+        data = [(i, f"Name{i}", float(i * 10)) for i in range(100)]
+        df = spark.createDataFrame(data, ["id", "name", "value"])
+        df.writeTo("default.test_table").append()
+
+        result = spark.sql("""
+            ALTER TABLE default.test_table
+            CREATE INDEX idx_id_zonemap USING zonemap (id)
+            WITH (rows_per_zone = 8)
+        """).collect()
+
+        assert len(result) == 1
+        assert result[0][1] == "idx_id_zonemap"
+
+        indexes = spark.sql("""
+            SHOW INDEXES IN default.test_table
+        """).collect()
+        zonemap_rows = [row for row in indexes if row["name"] == "idx_id_zonemap"]
+        assert len(zonemap_rows) >= 1
+        assert zonemap_rows[0]["index_type"] == "zonemap"
+
+        query_result = spark.sql("""
+            SELECT * FROM default.test_table WHERE id = 50
+        """).collect()
+        assert len(query_result) == 1
+        assert query_result[0].id == 50
+
     def test_create_fts_index(self, spark):
         """Test CREATE INDEX with full-text search (FTS)."""
         spark.sql("""
@@ -2582,26 +2618,26 @@ class TestStableRowIds:
     _row_created_at_version for all rows in that fragment.
     """
 
-    def test_tblproperties_enable_stable_row_ids(self, spark, test_table):
+    def test_tblproperties_enable_stable_row_ids(self, spark):
         """Test that TBLPROPERTIES enables CDF version columns."""
-        spark.sql(f"""
-            CREATE TABLE {test_table} (
+        spark.sql("""
+            CREATE TABLE default.test_table (
                 id INT,
                 name STRING,
                 value INT
             ) TBLPROPERTIES ('enable_stable_row_ids' = 'true')
         """)
 
-        spark.sql(f"""
-            INSERT INTO {test_table} VALUES
+        spark.sql("""
+            INSERT INTO default.test_table VALUES
             (1, 'Alice', 100),
             (2, 'Bob', 200),
             (3, 'Charlie', 300)
         """)
 
-        result = spark.sql(f"""
+        result = spark.sql("""
             SELECT id, _row_created_at_version, _row_last_updated_at_version
-            FROM {test_table}
+            FROM default.test_table
             ORDER BY id
         """).collect()
 
@@ -2610,30 +2646,30 @@ def test_tblproperties_enable_stable_row_ids(self, spark, test_table):
             assert row._row_created_at_version is not None
             assert row._row_last_updated_at_version is not None
 
-    def test_default_behavior_no_stable_row_ids(self, spark, test_table):
+    def test_default_behavior_no_stable_row_ids(self, spark):
         """Test version columns without enable_stable_row_ids.
 
         Without enable_stable_row_ids the Lance engine still populates
         _row_created_at_version and _row_last_updated_at_version, but
         returns a baseline value of 1 instead of the actual operation version.
         """
-        spark.sql(f"""
-            CREATE TABLE {test_table} (
+        spark.sql("""
+            CREATE TABLE default.test_table (
                 id INT,
                 name STRING,
                 value INT
             )
         """)
 
-        spark.sql(f"""
-            INSERT INTO {test_table} VALUES
+        spark.sql("""
+            INSERT INTO default.test_table VALUES
             (1, 'Alice', 100),
             (2, 'Bob', 200)
         """)
 
-        result = spark.sql(f"""
+        result = spark.sql("""
             SELECT id, _row_created_at_version, _row_last_updated_at_version
-            FROM {test_table}
+            FROM default.test_table
             ORDER BY id
         """).collect()
 
@@ -2644,23 +2680,23 @@ def test_default_behavior_no_stable_row_ids(self, spark, test_table):
             assert row._row_last_updated_at_version == 1
 
     @requires_update_or_merge
-    def test_cdc_incremental_ingestion_pattern(self, spark, test_table):
+    def test_cdc_incremental_ingestion_pattern(self, spark):
         """Test CDC incremental ingestion pipeline pattern.
 
         Simulates a CDC pipeline that tracks the last processed version and
         incrementally processes changes using version tracking columns.
         """
-        spark.sql(f"""
-            CREATE TABLE {test_table} (
+        spark.sql("""
+            CREATE TABLE default.test_table (
                 id INT,
                 name STRING,
                 value INT
             ) TBLPROPERTIES ('enable_stable_row_ids' = 'true')
         """)
 
         # v2: Initial data load
-        spark.sql(f"""
-            INSERT INTO {test_table} VALUES
+        spark.sql("""
+            INSERT INTO default.test_table VALUES
             (1, 'Alice', 100),
             (2, 'Bob', 200),
             (3, 'Charlie', 300)
@@ -2670,7 +2706,7 @@ def test_cdc_incremental_ingestion_pattern(self, spark, test_table):
         last_processed_version = 1
         batch1 = spark.sql(f"""
             SELECT id, name, value, _row_created_at_version, _row_last_updated_at_version
-            FROM {test_table}
+            FROM default.test_table
             WHERE (_row_created_at_version > {last_processed_version})
             OR (_row_last_updated_at_version > {last_processed_version})
             ORDER BY id
@@ -2681,13 +2717,13 @@ def test_cdc_incremental_ingestion_pattern(self, spark, test_table):
         last_processed_version = 2
 
         # v3: Update one row, v4: Insert new row
-        spark.sql(f"UPDATE {test_table} SET value = value + 50 WHERE id = 1")
-        spark.sql(f"INSERT INTO {test_table} VALUES (4, 'David', 400)")
+        spark.sql("UPDATE default.test_table SET value = value + 50 WHERE id = 1")
+        spark.sql("INSERT INTO default.test_table VALUES (4, 'David', 400)")
 
         # CDC Pipeline: Process batch 2 (changes since v2)
         batch2 = spark.sql(f"""
             SELECT id, name, value, _row_created_at_version, _row_last_updated_at_version
-            FROM {test_table}
+            FROM default.test_table
             WHERE (_row_created_at_version > {last_processed_version})
             OR (_row_last_updated_at_version > {last_processed_version})
             ORDER BY id
@@ -2707,12 +2743,12 @@ def test_cdc_incremental_ingestion_pattern(self, spark, test_table):
         last_processed_version = 4
 
         # v5: More updates
-        spark.sql(f"UPDATE {test_table} SET value = value + 100 WHERE id IN (2, 3)")
+        spark.sql("UPDATE default.test_table SET value = value + 100 WHERE id IN (2, 3)")
 
         # CDC Pipeline: Process batch 3 (changes since v4)
         batch3 = spark.sql(f"""
             SELECT id, name, value, _row_created_at_version, _row_last_updated_at_version
-            FROM {test_table}
+            FROM default.test_table
             WHERE (_row_created_at_version > {last_processed_version})
             OR (_row_last_updated_at_version > {last_processed_version})
             ORDER BY id
@@ -2729,12 +2765,12 @@ def test_cdc_incremental_ingestion_pattern(self, spark, test_table):
         last_processed_version = 5
 
         # v6: Update entire table
-        spark.sql(f"UPDATE {test_table} SET value = value * 2")
+        spark.sql("UPDATE default.test_table SET value = value * 2")
 
         # CDC Pipeline: Process batch 4 (changes since v5)
         batch4 = spark.sql(f"""
             SELECT id, name, value, _row_created_at_version, _row_last_updated_at_version
-            FROM {test_table}
+            FROM default.test_table
             WHERE (_row_created_at_version > {last_processed_version})
             OR (_row_last_updated_at_version > {last_processed_version})
             ORDER BY id
@@ -2780,30 +2816,29 @@ def _register_cdf_catalog(self, spark):
 
         return catalog_name
 
-    def test_catalog_level_stable_row_ids(self, spark, test_table):
+    def test_catalog_level_stable_row_ids(self, spark):
         """Test that catalog-level enable_stable_row_ids enables version columns without TBLPROPERTIES."""
         catalog_name = self._register_cdf_catalog(spark)
-        cdf_table = f"{catalog_name}.{test_table}"
 
         try:
             # CREATE TABLE without TBLPROPERTIES — relies on catalog-level default
             spark.sql(f"""
-                CREATE TABLE {cdf_table} (
+                CREATE TABLE {catalog_name}.default.test_table (
                     id INT,
                     name STRING,
                     value INT
                 )
             """)
 
             spark.sql(f"""
-                INSERT INTO {cdf_table} VALUES
+                INSERT INTO {catalog_name}.default.test_table VALUES
                 (1, 'Alice', 100),
                 (2, 'Bob', 200)
             """)
 
             result = spark.sql(f"""
                 SELECT id, _row_created_at_version, _row_last_updated_at_version
-                FROM {cdf_table}
+                FROM {catalog_name}.default.test_table
                 ORDER BY id
             """).collect()
 
@@ -2813,52 +2848,52 @@ def test_catalog_level_stable_row_ids(self, spark, test_table):
                 assert row._row_last_updated_at_version is not None
         finally:
             try:
-                spark.sql(f"DROP TABLE IF EXISTS {cdf_table} PURGE")
+                spark.sql(f"DROP TABLE IF EXISTS {catalog_name}.default.test_table PURGE")
             except Exception as e:
-                print(f"Failed to clean up {cdf_table}: {e}")
+                print(f"Failed to clean up {catalog_name}.default.test_table: {e}")
 
 
     @requires_update_or_merge
-    def test_update_preserves_row_ids(self, spark, test_table):
+    def test_update_preserves_row_ids(self, spark):
         """Test that UPDATE preserves _rowid values when stable row IDs are enabled.
 
         Verifies the native DeltaWriter.update() path with RowIdMeta attachment
         keeps row IDs stable across updates, including multi-fragment scenarios.
         """
-        spark.sql(f"""
-            CREATE TABLE {test_table} (
+        spark.sql("""
+            CREATE TABLE default.test_table (
                 id INT,
                 name STRING,
                 value INT
             ) TBLPROPERTIES ('enable_stable_row_ids' = 'true')
         """)
 
         # Insert across two fragments
-        spark.sql(f"""
-            INSERT INTO {test_table} VALUES
+        spark.sql("""
+            INSERT INTO default.test_table VALUES
             (1, 'Alice', 100),
             (2, 'Bob', 200),
             (3, 'Charlie', 300)
         """)
-        spark.sql(f"""
-            INSERT INTO {test_table} VALUES
+        spark.sql("""
+            INSERT INTO default.test_table VALUES
             (4, 'Dave', 400),
             (5, 'Eve', 500)
         """)
 
         # Capture row IDs before update
-        before = spark.sql(f"""
-            SELECT id, _rowid FROM {test_table} ORDER BY id
+        before = spark.sql("""
+            SELECT id, _rowid FROM default.test_table ORDER BY id
         """).collect()
         row_ids_before = {row.id: row._rowid for row in before}
         assert len(row_ids_before) == 5
 
         # Update rows spanning both fragments
-        spark.sql(f"UPDATE {test_table} SET value = value + 1 WHERE value >= 200")
+        spark.sql("UPDATE default.test_table SET value = value + 1 WHERE value >= 200")
 
         # Capture row IDs after update
-        after = spark.sql(f"""
-            SELECT id, _rowid, value FROM {test_table} ORDER BY id
+        after = spark.sql("""
+            SELECT id, _rowid, value FROM default.test_table ORDER BY id
         """).collect()
         row_ids_after = {row.id: row._rowid for row in after}
 
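The CDC test in this diff filters on `_row_created_at_version` and `_row_last_updated_at_version` against a `last_processed_version` watermark. The same predicate can be sketched without Spark, assuming rows are plain dictionaries. The helper `changed_since` and the sample version values below are illustrative assumptions: they follow the `v2`/`v3`/`v4` comments in the test, but the values Lance actually returns are not shown in this diff.

```python
from typing import Dict, List

Row = Dict[str, int]


def changed_since(rows: List[Row], last_processed_version: int) -> List[Row]:
    """Return rows created or updated after the watermark, ordered by id."""
    changed = [
        r for r in rows
        if r["_row_created_at_version"] > last_processed_version
        or r["_row_last_updated_at_version"] > last_processed_version
    ]
    return sorted(changed, key=lambda r: r["id"])


# Assumed table state after: v2 initial load, v3 update of id 1, v4 insert of id 4.
table = [
    {"id": 1, "_row_created_at_version": 2, "_row_last_updated_at_version": 3},
    {"id": 2, "_row_created_at_version": 2, "_row_last_updated_at_version": 2},
    {"id": 3, "_row_created_at_version": 2, "_row_last_updated_at_version": 2},
    {"id": 4, "_row_created_at_version": 4, "_row_last_updated_at_version": 4},
]

# A pipeline that last processed v2 picks up only the updated and the new row.
batch = changed_since(table, last_processed_version=2)
changed_ids = [r["id"] for r in batch]
```

After each batch the pipeline would advance its watermark to the latest table version it has seen, so an unchanged table yields an empty batch on the next run.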