Skip to content

Commit 6392ab6

Browse files
committed
feat: add distributed zonemap index build
1 parent ab8d099 commit 6392ab6

13 files changed

Lines changed: 269 additions & 63 deletions

File tree

docs/src/operations/ddl/create-index.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Creates a scalar index on a Lance table to accelerate queries.
77

88
## Overview
99

10-
The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. Depending on the index method, Lance Spark either uses a fragment-parallel build path or delegates to Lance's built-in single-phase index creation path.
10+
The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. Depending on the index method, Lance Spark either uses a fragment-parallel build path or a driver-coordinated commit flow after parallel executor builds.
1111

1212
## Basic Usage
1313

@@ -151,12 +151,12 @@ Consider creating an index when:
151151

152152
The `CREATE INDEX` command operates as follows:
153153

154-
1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree` can use fragment-parallel execution, while `zonemap` is built through Lance's single-phase create-index API.
155-
2. **Metadata Finalization**: Lance records the new index metadata as part of the index creation flow.
154+
1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree`, `fts`, and `zonemap` can build physical index segments in parallel across fragments. Range-mode `btree` uses Spark repartitioning and sorted preprocessed data.
155+
2. **Metadata Finalization**: Lance Spark merges or commits the resulting index metadata on the driver so the new logical index becomes visible atomically.
156156
3. **Transactional Commit**: A new table version is committed with the new index information. The operation is atomic and ensures that concurrent reads are not affected.
157157

158158
## Notes and Limitations
159159

160160
- **Index Methods**: The `zonemap`, `btree`, and `fts` methods are supported for scalar index creation.
161161
- **Zonemap Column Count**: Zonemap indexes currently support a single column only. The generic `CREATE INDEX` grammar accepts a column list, but Lance rejects multi-column zonemap creation.
162-
- **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one. This is because the underlying implementation uses `replace(true)`.
162+
- **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one.

lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in
8787
if (inputPartition.getWhereCondition().isPresent()) {
8888
scanOptions.filter(inputPartition.getWhereCondition().get());
8989
}
90+
scanOptions.useScalarIndex(inputPartition.isUseScalarIndex());
9091
scanOptions.batchSize(readOptions.getBatchSize());
9192
if (readOptions.getNearest() != null) {
9293
scanOptions.nearest(readOptions.getNearest());

lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ private long computeCount() {
7777
if (inputPartition.getWhereCondition().isPresent()) {
7878
scanOptionsBuilder.filter(inputPartition.getWhereCondition().get());
7979
}
80+
scanOptionsBuilder.useScalarIndex(inputPartition.isUseScalarIndex());
8081
scanOptionsBuilder.withRowId(true);
8182
scanOptionsBuilder.columns(Lists.newArrayList());
8283
scanOptionsBuilder.fragmentIds(fragmentIds);

lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceInputPartition.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class LanceInputPartition implements HasPartitionKey {
3838
private final Optional<List<ColumnOrdering>> topNSortOrders;
3939
private final Optional<Aggregation> pushedAggregation;
4040
private final String scanId;
41+
private final boolean useScalarIndex;
4142

4243
/**
4344
* Initial storage options fetched from namespace.describeTable() on the driver. These are passed
@@ -69,6 +70,7 @@ public LanceInputPartition(
6970
Optional<List<ColumnOrdering>> topNSortOrders,
7071
Optional<Aggregation> pushedAggregation,
7172
String scanId,
73+
boolean useScalarIndex,
7274
Map<String, String> initialStorageOptions,
7375
String namespaceImpl,
7476
Map<String, String> namespaceProperties,
@@ -83,6 +85,7 @@ public LanceInputPartition(
8385
this.topNSortOrders = topNSortOrders;
8486
this.pushedAggregation = pushedAggregation;
8587
this.scanId = scanId;
88+
this.useScalarIndex = useScalarIndex;
8689
this.initialStorageOptions = initialStorageOptions;
8790
this.namespaceImpl = namespaceImpl;
8891
this.namespaceProperties = namespaceProperties;
@@ -129,6 +132,10 @@ public String getScanId() {
129132
return scanId;
130133
}
131134

135+
public boolean isUseScalarIndex() {
136+
return useScalarIndex;
137+
}
138+
132139
public Map<String, String> getInitialStorageOptions() {
133140
return initialStorageOptions;
134141
}

lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public class LanceScan
107107
private final String namespaceImpl;
108108

109109
private final java.util.Map<String, String> namespaceProperties;
110+
private final boolean useScalarIndex;
110111

111112
public LanceScan(
112113
StructType schema,
@@ -121,6 +122,7 @@ public LanceScan(
121122
java.util.Map<String, List<ZoneStats>> zonemapStats,
122123
Set<Integer> survivingFragmentIds,
123124
ZonemapFragmentPruner.PartitionInfo partitionInfo,
125+
boolean useScalarIndex,
124126
java.util.Map<String, String> initialStorageOptions,
125127
String namespaceImpl,
126128
java.util.Map<String, String> namespaceProperties) {
@@ -137,6 +139,7 @@ public LanceScan(
137139
this.zonemapStats = zonemapStats != null ? zonemapStats : Collections.emptyMap();
138140
this.cachedSurvivingFragmentIds = survivingFragmentIds;
139141
this.partitionInfo = partitionInfo;
142+
this.useScalarIndex = useScalarIndex;
140143
this.initialStorageOptions = initialStorageOptions;
141144
this.namespaceImpl = namespaceImpl;
142145
this.namespaceProperties = namespaceProperties;
@@ -191,6 +194,7 @@ public InputPartition[] planInputPartitions() {
191194
topNSortOrders,
192195
pushedAggregation,
193196
scanId,
197+
useScalarIndex,
194198
initialStorageOptions,
195199
namespaceImpl,
196200
namespaceProperties,

lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.slf4j.LoggerFactory;
4949

5050
import java.util.ArrayList;
51+
import java.util.Arrays;
5152
import java.util.Collections;
5253
import java.util.HashMap;
5354
import java.util.HashSet;
@@ -70,6 +71,7 @@ public class LanceScanBuilder
7071
private StructType schema;
7172

7273
private Filter[] pushedFilters = new Filter[0];
74+
private boolean forcePostScanFiltering = false;
7375
private Optional<Integer> limit = Optional.empty();
7476
private Optional<Integer> offset = Optional.empty();
7577
private Optional<List<ColumnOrdering>> topNSortOrders = Optional.empty();
@@ -214,7 +216,11 @@ public Scan build() {
214216
// Close the lazily opened dataset - it's no longer needed after build
215217
closeLazyDataset();
216218

217-
Optional<String> whereCondition = FilterPushDown.compileFiltersToSqlWhereClause(pushedFilters);
219+
Optional<String> whereCondition =
220+
forcePostScanFiltering
221+
? Optional.empty()
222+
: FilterPushDown.compileFiltersToSqlWhereClause(pushedFilters);
223+
boolean useScalarIndex = zonemapStats.isEmpty();
218224
return new LanceScan(
219225
schema,
220226
readOptions,
@@ -228,6 +234,7 @@ public Scan build() {
228234
zonemapStats,
229235
survivingFragmentIds,
230236
partitionInfo,
237+
useScalarIndex,
231238
initialStorageOptions,
232239
namespaceImpl,
233240
namespaceProperties);
@@ -245,7 +252,14 @@ public Filter[] pushFilters(Filter[] filters) {
245252
}
246253
Filter[][] processFilters = FilterPushDown.processFilters(filters);
247254
pushedFilters = processFilters[0];
248-
return processFilters[1];
255+
forcePostScanFiltering = shouldForcePostScanFiltering(pushedFilters);
256+
if (!forcePostScanFiltering) {
257+
return processFilters[1];
258+
}
259+
LOG.info(
260+
"Using Spark post-scan filtering for segmented zonemap query on dataset {}",
261+
readOptions.getDatasetUri());
262+
return concatFilters(processFilters[0], processFilters[1]);
249263
}
250264

251265
@Override
@@ -305,6 +319,9 @@ public boolean pushTopN(SortOrder[] orders, int limit) {
305319

306320
@Override
307321
public boolean pushAggregation(Aggregation aggregation) {
322+
if (forcePostScanFiltering && pushedFilters.length > 0) {
323+
return false;
324+
}
308325
AggregateFunc[] funcs = aggregation.aggregateExpressions();
309326
if (aggregation.groupByExpressions().length > 0) {
310327
return false;
@@ -408,4 +425,52 @@ private static Set<String> extractReferencedColumns(Filter[] filters) {
408425
}
409426
return columns;
410427
}
428+
429+
/**
430+
* Segmented zonemap indexes are currently used safely for fragment pruning, but scan-time filter
431+
* pushdown still needs a Spark-side fallback until Lance-core query execution fully handles that
432+
* layout.
433+
*/
434+
private boolean shouldForcePostScanFiltering(Filter[] acceptedFilters) {
435+
if (acceptedFilters.length == 0) {
436+
return false;
437+
}
438+
439+
Set<String> referencedColumns = extractReferencedColumns(acceptedFilters);
440+
if (referencedColumns.isEmpty()) {
441+
return false;
442+
}
443+
444+
Dataset dataset = getOrOpenDataset();
445+
Map<Integer, String> fieldIdToName = new HashMap<>();
446+
for (LanceField field : dataset.getLanceSchema().fields()) {
447+
fieldIdToName.put(field.getId(), field.getName());
448+
}
449+
450+
Map<String, Integer> segmentedZonemapCounts = new HashMap<>();
451+
for (Index idx : dataset.getIndexes()) {
452+
if (idx.indexType() != IndexType.ZONEMAP || idx.fields().size() != 1) {
453+
continue;
454+
}
455+
if (!idx.fragments().isPresent() || idx.fragments().get().size() != 1) {
456+
continue;
457+
}
458+
459+
String columnName = fieldIdToName.get(idx.fields().get(0));
460+
if (columnName == null || !referencedColumns.contains(columnName)) {
461+
continue;
462+
}
463+
464+
String key = idx.name() + ":" + columnName;
465+
segmentedZonemapCounts.merge(key, 1, Integer::sum);
466+
}
467+
468+
return segmentedZonemapCounts.values().stream().anyMatch(count -> count > 1);
469+
}
470+
471+
private static Filter[] concatFilters(Filter[] first, Filter[] second) {
472+
Filter[] combined = Arrays.copyOf(first, first.length + second.length);
473+
System.arraycopy(second, 0, combined, first.length, second.length);
474+
return combined;
475+
}
411476
}

0 commit comments

Comments
 (0)