38 changes: 37 additions & 1 deletion docker/tests/test_lance_spark.py
Original file line number Diff line number Diff line change
@@ -492,7 +492,7 @@ def test_properties_persist_after_insert(self, spark):


class TestDDLIndex:
"""Test DDL index operations: CREATE INDEX (BTree, FTS)."""
"""Test DDL index operations: CREATE INDEX (BTree, ZoneMap, FTS)."""

def test_create_btree_index_on_int(self, spark):
"""Test CREATE INDEX with BTree on integer column."""
@@ -562,6 +562,42 @@ def test_create_btree_index_on_string(self, spark):
""").collect()
assert len(query_result) == 3

def test_create_zonemap_index_on_int(self, spark):
"""Test CREATE INDEX with ZoneMap on integer column."""
spark.sql("""
CREATE TABLE default.test_table (
id INT,
name STRING,
value DOUBLE
)
""")

data = [(i, f"Name{i}", float(i * 10)) for i in range(100)]
df = spark.createDataFrame(data, ["id", "name", "value"])
df.writeTo("default.test_table").append()

result = spark.sql("""
ALTER TABLE default.test_table
CREATE INDEX idx_id_zonemap USING zonemap (id)
WITH (rows_per_zone = 8)
""").collect()

assert len(result) == 1
assert result[0][1] == "idx_id_zonemap"

indexes = spark.sql("""
SHOW INDEXES IN default.test_table
""").collect()
zonemap_rows = [row for row in indexes if row["name"] == "idx_id_zonemap"]
assert len(zonemap_rows) == 1
assert zonemap_rows[0]["index_type"] == "zonemap"

query_result = spark.sql("""
SELECT * FROM default.test_table WHERE id = 50
""").collect()
assert len(query_result) == 1
assert query_result[0].id == 50

def test_create_fts_index(self, spark):
"""Test CREATE INDEX with full-text search (FTS)."""
spark.sql("""
39 changes: 34 additions & 5 deletions docs/src/operations/ddl/create-index.md
@@ -7,7 +7,7 @@ Creates a scalar index on a Lance table to accelerate queries.

## Overview

The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. This operation is performed in a distributed manner, building indexes for each data fragment in parallel.
The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. Depending on the index method, Lance Spark either uses a fragment-parallel build path or a driver-coordinated commit flow after parallel executor builds.

## Basic Usage

@@ -24,13 +24,22 @@ The following index methods are supported:

| Method | Description |
|---------|-----------------------------------------------------------------------------|
| `zonemap` | Lightweight min/max index for fragment pruning on a scalar column. |
| `btree` | B-tree index for efficient range queries and point lookups on scalar columns. |
| `fts` | Full-text search (inverted) index for text search on string columns. |

## Options

The `CREATE INDEX` command supports options via the `WITH` clause to control index creation. These options are specific to the chosen index method.

### ZoneMap Options

For the `zonemap` method, the following options are supported:

| Option | Type | Description |
|-----------------|------|----------------------------------------------|
| `rows_per_zone` | Long | The approximate number of rows per zonemap zone. |
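
The `rows_per_zone` option trades pruning precision for metadata size: smaller zones give tighter min/max bounds but more zone entries. The idea can be sketched in a few lines of plain Python (an illustration only, with invented helper names, not Lance's actual implementation):

```python
def build_zonemap(values, rows_per_zone):
    """Compute a (min, max) pair for each zone of rows_per_zone values."""
    return [
        (min(zone), max(zone))
        for zone in (values[i:i + rows_per_zone]
                     for i in range(0, len(values), rows_per_zone))
    ]

def matching_zones(zonemap, target):
    """Return indices of zones whose [min, max] range may contain target."""
    return [i for i, (lo, hi) in enumerate(zonemap) if lo <= target <= hi]

# 100 sorted ids in zones of 8 rows -> 13 zones; id = 50 can only be in zone 6.
zm = build_zonemap(list(range(100)), 8)
print(len(zm))                 # 13
print(matching_zones(zm, 50))  # [6]
```

On sorted or clustered data, most zones are eliminated by a point or range predicate before any row data is read; on randomly ordered data, zone ranges overlap and pruning is far less effective.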

### BTree Options

For the `btree` method, the following options are supported:
@@ -77,6 +86,15 @@ Create a composite index on multiple columns.
ALTER TABLE lance.db.logs CREATE INDEX idx_ts_level USING btree (timestamp, level);
```

### Lightweight Fragment Pruning

Create a zonemap index when you want lightweight min/max-based fragment pruning:

=== "SQL"
```sql
ALTER TABLE lance.db.users CREATE INDEX idx_id_zonemap USING zonemap (id);
```

### Indexing with Options

Create an index and specify the `zone_size` for the B-tree:
@@ -86,6 +104,15 @@ Create an index and specify the `zone_size` for the B-tree:
ALTER TABLE lance.db.users CREATE INDEX idx_id_zoned USING btree (id) WITH (zone_size = 2048);
```

### Zonemap with Options

Create a zonemap index and specify the approximate number of rows per zone:

=== "SQL"
```sql
ALTER TABLE lance.db.users CREATE INDEX idx_id_zonemap USING zonemap (id) WITH (rows_per_zone = 2048);
```

### Full-Text Search Index

Create an FTS index on a text column:
@@ -117,17 +144,19 @@ The `CREATE INDEX` command returns the following information about the operation
Consider creating an index when:

- You frequently filter a large table on a specific column.
- You want lightweight fragment pruning based on per-zone min/max statistics.
- Your queries involve point lookups or small range scans.

## How It Works

The `CREATE INDEX` command operates as follows:

1. **Distributed Index Building**: For each fragment in the Lance dataset, a separate task is launched to build an index on the specified column(s).
2. **Metadata Merging**: Once all per-fragment indexes are built, their metadata is collected and merged.
1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree`, `fts`, and `zonemap` can build physical index segments in parallel across fragments. Range-mode `btree` uses Spark repartitioning and sorted preprocessed data.
2. **Metadata Finalization**: Lance Spark merges or commits the resulting index metadata on the driver so the new logical index becomes visible atomically.
3. **Transactional Commit**: A new table version is committed with the new index information. The operation is atomic and ensures that concurrent reads are not affected.
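
The build-then-finalize flow above can be sketched as follows (a simplified pure-Python illustration with invented names; the real Lance Spark implementation runs the per-fragment builds as Spark tasks and commits through the Lance transaction API):

```python
from concurrent.futures import ThreadPoolExecutor

def build_fragment_index(fragment):
    """Stand-in for a per-fragment executor task: index one fragment's rows."""
    return {"fragment_id": fragment["id"], "rows_indexed": len(fragment["rows"])}

def commit_index(name, per_fragment_metadata):
    """Stand-in for driver-side finalization: merge per-fragment metadata
    into one logical index visible in a single new table version."""
    return {
        "index_name": name,
        "fragments": sorted(m["fragment_id"] for m in per_fragment_metadata),
        "total_rows": sum(m["rows_indexed"] for m in per_fragment_metadata),
    }

fragments = [{"id": i, "rows": list(range(i * 10, i * 10 + 10))} for i in range(4)]
with ThreadPoolExecutor() as pool:               # parallel per-fragment builds
    metadata = list(pool.map(build_fragment_index, fragments))
index = commit_index("idx_id_zonemap", metadata)  # single driver-side commit
print(index["total_rows"])  # 40
```

Because only the final metadata merge creates the new table version, readers either see the complete index or none of it.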

## Notes and Limitations

- **Index Methods**: The `btree` and `fts` methods are supported for scalar index creation.
- **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one. This is because the underlying implementation uses `replace(true)`.
- **Index Methods**: The `zonemap`, `btree`, and `fts` methods are supported for scalar index creation.
- **Zonemap Column Count**: Zonemap indexes currently support a single column only. The generic `CREATE INDEX` grammar accepts a column list, but Lance rejects multi-column zonemap creation.
- **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one.
2 changes: 1 addition & 1 deletion docs/src/operations/ddl/show-indexes.md
@@ -49,7 +49,7 @@ The `SHOW INDEXES` command returns the following columns:
|-------------------------|---------------|--------------------------------------------------------------------|
| `name` | string | Logical name of the index. |
| `fields` | array<string> | List of column names included in the index. |
| `index_type` | string | Human-readable index type (for example `btree`). |
| `index_type` | string | Human-readable index type (for example `btree` or `zonemap`). |
| `num_indexed_fragments` | long | Number of fragments fully or partially covered by the index. |
| `num_indexed_rows` | long | Approximate number of rows covered by the index. |
| `num_unindexed_fragments` | long | Number of fragments that are not yet indexed. |
38 changes: 37 additions & 1 deletion integration-tests/test_lance_spark.py
@@ -620,7 +620,7 @@ def test_compression_metadata_reaches_lance_file(self, spark):


class TestDDLIndex:
"""Test DDL index operations: CREATE INDEX (BTree, FTS)."""
"""Test DDL index operations: CREATE INDEX (BTree, ZoneMap, FTS)."""

def test_create_btree_index_on_int(self, spark):
"""Test CREATE INDEX with BTree on integer column."""
@@ -690,6 +690,42 @@ def test_create_btree_index_on_string(self, spark):
""").collect()
assert len(query_result) == 3

def test_create_zonemap_index_on_int(self, spark):
"""Test CREATE INDEX with ZoneMap on integer column."""
spark.sql("""
CREATE TABLE default.test_table (
id INT,
name STRING,
value DOUBLE
)
""")

data = [(i, f"Name{i}", float(i * 10)) for i in range(100)]
df = spark.createDataFrame(data, ["id", "name", "value"])
df.writeTo("default.test_table").append()

result = spark.sql("""
ALTER TABLE default.test_table
CREATE INDEX idx_id_zonemap USING zonemap (id)
WITH (rows_per_zone = 8)
""").collect()

assert len(result) == 1
assert result[0][1] == "idx_id_zonemap"

indexes = spark.sql("""
SHOW INDEXES IN default.test_table
""").collect()
zonemap_rows = [row for row in indexes if row["name"] == "idx_id_zonemap"]
assert len(zonemap_rows) == 1
assert zonemap_rows[0]["index_type"] == "zonemap"

query_result = spark.sql("""
SELECT * FROM default.test_table WHERE id = 50
""").collect()
assert len(query_result) == 1
assert query_result[0].id == 50

def test_create_fts_index(self, spark):
"""Test CREATE INDEX with full-text search (FTS)."""
spark.sql("""
@@ -87,6 +87,7 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in
if (inputPartition.getWhereCondition().isPresent()) {
scanOptions.filter(inputPartition.getWhereCondition().get());
}
scanOptions.useScalarIndex(inputPartition.isUseScalarIndex());
scanOptions.batchSize(readOptions.getBatchSize());
if (readOptions.getNearest() != null) {
scanOptions.nearest(readOptions.getNearest());
@@ -77,6 +77,7 @@ private long computeCount() {
if (inputPartition.getWhereCondition().isPresent()) {
scanOptionsBuilder.filter(inputPartition.getWhereCondition().get());
}
scanOptionsBuilder.useScalarIndex(inputPartition.isUseScalarIndex());
scanOptionsBuilder.withRowId(true);
scanOptionsBuilder.columns(Lists.newArrayList());
scanOptionsBuilder.fragmentIds(fragmentIds);
@@ -38,6 +38,7 @@ public class LanceInputPartition implements HasPartitionKey {
private final Optional<List<ColumnOrdering>> topNSortOrders;
private final Optional<Aggregation> pushedAggregation;
private final String scanId;
private final boolean useScalarIndex;

/**
* Initial storage options fetched from namespace.describeTable() on the driver. These are passed
@@ -69,6 +70,7 @@ public LanceInputPartition(
Optional<List<ColumnOrdering>> topNSortOrders,
Optional<Aggregation> pushedAggregation,
String scanId,
boolean useScalarIndex,
Map<String, String> initialStorageOptions,
String namespaceImpl,
Map<String, String> namespaceProperties,
@@ -83,6 +85,7 @@
this.topNSortOrders = topNSortOrders;
this.pushedAggregation = pushedAggregation;
this.scanId = scanId;
this.useScalarIndex = useScalarIndex;
this.initialStorageOptions = initialStorageOptions;
this.namespaceImpl = namespaceImpl;
this.namespaceProperties = namespaceProperties;
@@ -129,6 +132,10 @@ public String getScanId() {
return scanId;
}

public boolean isUseScalarIndex() {
return useScalarIndex;
}

public Map<String, String> getInitialStorageOptions() {
return initialStorageOptions;
}
@@ -107,6 +107,7 @@ public class LanceScan
private final String namespaceImpl;

private final java.util.Map<String, String> namespaceProperties;
private final boolean useScalarIndex;

public LanceScan(
StructType schema,
@@ -121,6 +122,7 @@ public LanceScan(
java.util.Map<String, List<ZoneStats>> zonemapStats,
Set<Integer> survivingFragmentIds,
ZonemapFragmentPruner.PartitionInfo partitionInfo,
boolean useScalarIndex,
java.util.Map<String, String> initialStorageOptions,
String namespaceImpl,
java.util.Map<String, String> namespaceProperties) {
@@ -137,6 +139,7 @@
this.zonemapStats = zonemapStats != null ? zonemapStats : Collections.emptyMap();
this.cachedSurvivingFragmentIds = survivingFragmentIds;
this.partitionInfo = partitionInfo;
this.useScalarIndex = useScalarIndex;
this.initialStorageOptions = initialStorageOptions;
this.namespaceImpl = namespaceImpl;
this.namespaceProperties = namespaceProperties;
@@ -191,6 +194,7 @@ public InputPartition[] planInputPartitions() {
topNSortOrders,
pushedAggregation,
scanId,
useScalarIndex,
initialStorageOptions,
namespaceImpl,
namespaceProperties,