
Commit 1e862f9

perf(read): consolidate driver-side Dataset opens and pin version for snapshot isolation (#567)

## Summary

Consolidate the two driver-side `Dataset.open()` calls during scan planning into one, and pin the resolved table version onto the read options shipped to executors. This reduces manifest IO and closes a cross-task snapshot-isolation gap on large tables.

## Motivation

For each Spark scan, `LanceScanBuilder.build()` opens a dataset to read manifest summary, schema, sharding spec, and zonemap stats. Then `LanceScan.planInputPartitions()` opens the dataset *again* via `LanceSplit.planScan(readOptions)` to enumerate fragments and per-fragment row counts. Two issues:

1. **Performance**: The driver pays the manifest IO / `Dataset.open()` cost twice per query. On very large tables this measurably increases planning latency.
2. **Snapshot isolation bug**: When the user does not specify a version, both opens resolve "latest" independently. If a concurrent writer commits a new version between the two opens (or between the driver-side open and an executor-side open), tasks can observe a newer version than the one used for fragment pruning / statistics. The query then sees an inconsistent view.

## Changes

### `LanceSplit`

- New overload `planScan(Dataset)` that accepts an already-opened dataset and does not close it.
- Existing `planScan(LanceSparkReadOptions)` becomes a thin wrapper, kept for tests and external callers.

### `LanceScanBuilder.build()`

- Calls `LanceSplit.planScan(dataset)` against the same handle used for manifest / zonemap loading, before `closeLazyDataset()`.
- Calls `readOptions.withVersion(resolvedVersion)` to produce a pinned, immutable copy of the read options.
- Passes the pre-computed splits, per-fragment row counts, and pinned options to `LanceScan`.

### `LanceScan`

- Constructor accepts `precomputedSplits` and `precomputedFragmentRowCounts`.
- `planInputPartitions()` no longer opens the dataset; it consumes the pre-computed result and skips the redundant `withVersion` wrap (the options are already pinned upstream).
- The per-fragment row-count map is marked `transient` so it does not bloat plan-tree serialization or affect `BatchScanExec` / `ReusedExchange` comparisons.

### Tests

- `LanceScanTest` updated for the new constructor parameters.
- `LanceSplitTest` unchanged; both overloads remain covered.

## Why this is safe

- `LanceSparkReadOptions.withVersion(...)` returns a new instance via the existing builder; the user-supplied options object is never mutated.
- When the user explicitly pinned a version upstream, `dataset.getVersion().getId()` returns that same version, so the pin is a no-op.
- All existing pruning paths (`pruneByRowAddrFilters`, `pruneByZonemapStats`, `pruneByLimit`) continue to operate on the same `List<LanceSplit>` shape; only the source of the list changed.

## Performance impact

- Driver `Dataset.open()` calls per scan: **2 → 1**.
- Manifest reads per scan: **2 → 1**.
- No change to executor-side IO.

## Correctness impact

- All tasks of a single query are now guaranteed to read the same dataset version, even under concurrent writes.
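The snapshot-isolation gap described under Motivation can be illustrated with a minimal sketch. It does not use the Lance API: `SnapshotPinSketch`, `commit()`, and `resolve(...)` are hypothetical stand-ins, and the concurrent writer is simulated by a plain method call between the planning step and the task step.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a versioned table; this is not the Lance API. */
final class SnapshotPinSketch {
  /** Latest committed version; a writer bumps it on every commit. */
  private final AtomicLong latest = new AtomicLong(1);

  /** Simulates a concurrent writer committing a new version. */
  void commit() {
    latest.incrementAndGet();
  }

  /** Returns the pinned version if one is given, otherwise whatever "latest" is right now. */
  long resolve(Long pinnedVersion) {
    return pinnedVersion != null ? pinnedVersion : latest.get();
  }

  /** Unpinned read: planning and task execution each resolve "latest" independently. */
  static long[] unpinnedRead() {
    SnapshotPinSketch table = new SnapshotPinSketch();
    long planned = table.resolve(null);   // driver plans against version 1
    table.commit();                       // writer commits version 2 in between
    long executed = table.resolve(null);  // task silently reads version 2
    return new long[] {planned, executed};
  }

  /** Pinned read: the version resolved at planning time is shipped to the task. */
  static long[] pinnedRead() {
    SnapshotPinSketch table = new SnapshotPinSketch();
    Long pinned = table.resolve(null);      // resolve once on the driver
    table.commit();                         // the same concurrent commit
    long executed = table.resolve(pinned);  // task honours the pin
    return new long[] {pinned, executed};
  }
}
```

In this sketch `unpinnedRead()` yields two different versions while `pinnedRead()` yields the same version twice, which is the property the commit aims for by pinning the resolved version onto the read options.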
1 parent 20345de commit 1e862f9

8 files changed

Lines changed: 253 additions & 126 deletions


lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkReadOptions.java

Lines changed: 6 additions & 6 deletions
@@ -107,7 +107,7 @@ public class LanceSparkReadOptions implements Serializable {
   private final String datasetName;
   private final boolean pushDownFilters;
   private final Integer blockSize;
-  private final Integer version;
+  private final Long version;
   private final Integer indexCacheSize;
   private final Integer metadataCacheSize;
   private final int batchSize;
@@ -238,7 +238,7 @@ public Integer getBlockSize() {
     return blockSize;
   }

-  public Integer getVersion() {
+  public Long getVersion() {
     return version;
   }

@@ -312,7 +312,7 @@ public void setNamespace(LanceNamespace namespace) {
    * @param newVersion the version to use
    * @return a new LanceSparkReadOptions with the specified version
    */
-  public LanceSparkReadOptions withVersion(int newVersion) {
+  public LanceSparkReadOptions withVersion(long newVersion) {
     return builder()
         .datasetUri(this.datasetUri)
         .pushDownFilters(this.pushDownFilters)
@@ -411,7 +411,7 @@ public static class Builder {
     private boolean pushDownFilters = DEFAULT_PUSH_DOWN_FILTERS;
     private Integer blockSize;
     private Query nearest;
-    private Integer version;
+    private Long version;
     private Integer indexCacheSize;
     private Integer metadataCacheSize;
     private int batchSize = DEFAULT_BATCH_SIZE;
@@ -453,7 +453,7 @@ public Builder nearest(String json) {
       return this;
     }

-    public Builder version(Integer version) {
+    public Builder version(Long version) {
       this.version = version;
       return this;
     }
@@ -546,7 +546,7 @@ private void parseTypedFlags(Map<String, String> opts) {
         this.blockSize = Integer.parseInt(opts.get(CONFIG_BLOCK_SIZE));
       }
       if (opts.containsKey(CONFIG_VERSION)) {
-        this.version = Integer.parseInt(opts.get(CONFIG_VERSION));
+        this.version = Long.parseLong(opts.get(CONFIG_VERSION));
       }
       if (opts.containsKey(CONFIG_INDEX_CACHE_SIZE)) {
         this.indexCacheSize = Integer.parseInt(opts.get(CONFIG_INDEX_CACHE_SIZE));
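The switch from `Integer` to `Long` for the version matters because a narrowing cast in Java keeps only the low 32 bits and raises no error. The sketch below is illustrative only: `VersionWidthSketch` and its methods are hypothetical names, and the version value is chosen to sit just above the 32-bit range.

```java
/** Hypothetical helper showing what happens to a wide version id in 32 bits. */
final class VersionWidthSketch {
  /** Mirrors a narrowing cast such as {@code (int) resolvedVersion}: high bits are dropped. */
  static int narrow(long version) {
    return (int) version;
  }

  /** Reports whether a raw option string fits the old Integer-based parsing. */
  static boolean parsesAsInt(String raw) {
    try {
      Integer.parseInt(raw);
      return true;
    } catch (NumberFormatException e) {
      // Integer.parseInt rejects values outside the 32-bit signed range.
      return false;
    }
  }

  /** Long-based parsing accepts the same string without loss. */
  static long parseWide(String raw) {
    return Long.parseLong(raw);
  }
}
```

With the input `4294967297` (2^32 + 1), `narrow` returns 1, so a cast would point a reader at a different version without any error, while `parsesAsInt` returns false and `parseWide` returns the value unchanged.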

lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java

Lines changed: 31 additions & 8 deletions
@@ -89,6 +89,22 @@ public class LanceScan
    */
   private final Set<Integer> cachedSurvivingFragmentIds;

+  /**
+   * Splits pre-computed on the driver during {@link LanceScanBuilder#build()}. Each entry is one
+   * fragment. Built from a single {@code Dataset} handle that was already opened for manifest /
+   * schema / zonemap loading, so no second {@code Dataset.open()} is needed at {@link
+   * #planInputPartitions()} time.
+   */
+  private final List<LanceSplit> precomputedSplits;
+
+  /**
+   * Per-fragment logical row counts (after deletions), captured together with {@link
+   * #precomputedSplits} on the driver. Consumed by {@link #pruneByLimit}. Not declared {@code
+   * transient} because Java deserialization would skip the constructor and leave the field {@code
+   * null}, which would NPE inside {@link #pruneByLimit}.
+   */
+  private final java.util.Map<Integer, Long> precomputedFragmentRowCounts;
+
   /** Number of partitions after pruning, set during {@link #planInputPartitions()}. */
   private transient int numPartitions = -1;

@@ -121,6 +137,8 @@ public LanceScan(
       LanceStatistics statistics,
       java.util.Map<String, List<ZoneStats>> zonemapStats,
       Set<Integer> survivingFragmentIds,
+      List<LanceSplit> precomputedSplits,
+      java.util.Map<Integer, Long> precomputedFragmentRowCounts,
       Expression activeShardingExpression,
       java.util.Map<Integer, Object> fragmentShardingKeys,
       java.util.Map<String, String> initialStorageOptions,
@@ -140,6 +158,11 @@ public LanceScan(
     this.statistics = statistics;
     this.zonemapStats = zonemapStats != null ? zonemapStats : Collections.emptyMap();
     this.cachedSurvivingFragmentIds = survivingFragmentIds;
+    this.precomputedSplits = precomputedSplits;
+    this.precomputedFragmentRowCounts =
+        precomputedFragmentRowCounts != null
+            ? precomputedFragmentRowCounts
+            : Collections.emptyMap();
     this.activeShardingExpression = activeShardingExpression;
     this.fragmentShardingKeys = fragmentShardingKeys;
     this.initialStorageOptions = initialStorageOptions;
@@ -154,8 +177,10 @@ public Batch toBatch() {

   @Override
   public InputPartition[] planInputPartitions() {
-    LanceSplit.ScanPlanResult planResult = LanceSplit.planScan(readOptions);
-    List<LanceSplit> prunedSplits = pruneByRowAddrFilters(planResult.getSplits());
+    // Splits and per-fragment row counts are pre-computed on the driver during
+    // LanceScanBuilder.build() from the same Dataset handle that loaded manifest /
+    // schema / zonemap stats. This avoids a second Dataset.open() at plan time.
+    List<LanceSplit> prunedSplits = pruneByRowAddrFilters(precomputedSplits);

     // Zonemap-based fragment pruning: uses per-column min/max/null_count
     // statistics to eliminate fragments that provably cannot match
@@ -166,15 +191,13 @@ public InputPartition[] planInputPartitions() {
     // use per-fragment row counts to plan only enough splits to satisfy the limit.
     // This avoids scheduling hundreds of unnecessary tasks. Correctness is guaranteed
     // because Spark still keeps a global CollectLimit on top (isPartiallyPushed = true).
-    prunedSplits = pruneByLimit(prunedSplits, planResult.getFragmentRowCounts());
+    prunedSplits = pruneByLimit(prunedSplits, precomputedFragmentRowCounts);

     // Capture as effectively final for use in lambda
     final List<LanceSplit> finalSplits = prunedSplits;

-    // Use resolved version for snapshot isolation - ensures all workers read the same version
-    LanceSparkReadOptions resolvedReadOptions =
-        readOptions.withVersion((int) planResult.getResolvedVersion());
-
+    // readOptions is already pinned to the resolved version by LanceScanBuilder for
+    // snapshot isolation across all workers.
     InputPartition[] result =
         IntStream.range(0, finalSplits.size())
             .mapToObj(
@@ -192,7 +215,7 @@ public InputPartition[] planInputPartitions() {
                       schema,
                       i,
                       split,
-                      resolvedReadOptions,
+                      readOptions,
                       whereConditions,
                       limit,
                       offset,
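The Javadoc on `precomputedFragmentRowCounts` notes that a `transient` field would come back as `null` after Java deserialization, since the constructor (and its `Collections.emptyMap()` fallback) is not re-run. A minimal sketch of that behaviour follows; `TransientFieldSketch`, `Plan`, `kept`, and `dropped` are hypothetical names and have no counterpart in the Lance code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical demo of how transient fields behave across Java serialization. */
final class TransientFieldSketch {

  /** Stand-in for a serializable scan object carrying two copies of a row-count map. */
  static final class Plan implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Non-transient: written to the stream and restored on the other side. */
    final Map<Integer, Long> kept;

    /** Transient: skipped by serialization, so it is null after deserialization. */
    final transient Map<Integer, Long> dropped;

    Plan(Map<Integer, Long> counts) {
      this.kept = new HashMap<>(counts);
      this.dropped = new HashMap<>(counts);
    }
  }

  /** Serializes and deserializes a plan in memory, wrapping checked exceptions. */
  static Plan roundTrip(Plan plan) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(plan);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (Plan) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }
}
```

After `roundTrip`, `kept` still holds its entries while `dropped` is `null`, so any code that dereferences the transient field on the receiving side needs a null guard or must avoid `transient` altogether.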

lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java

Lines changed: 116 additions & 100 deletions
@@ -147,113 +147,129 @@ private void closeLazyDataset() {

   @Override
   public Scan build() {
-    // Return LocalScan if we have a metadata-only aggregation result
-    if (localScan != null) {
-      closeLazyDataset();
-      return localScan;
-    }
+    // Wrap the entire planning body in try/finally to guarantee that the lazily-opened native
+    // dataset handle (lazyDataset) is always released, including when intermediate steps such as
+    // zonemap loading or LanceSplit.planScan(dataset) throw.
+    try {
+      // Return LocalScan if we have a metadata-only aggregation result
+      if (localScan != null) {
+        return localScan;
+      }

-    // Get statistics from manifest summary before closing dataset
-    ManifestSummary summary = getOrOpenDataset().getVersion().getManifestSummary();
-
-    // Collect all columns that need zonemap stats: filter columns + sharding columns.
-    Set<String> columnsToLoad = extractReferencedColumns(pushedPredicates);
-    Dataset dataset = getOrOpenDataset();
-    LanceSchema lanceSchema = dataset.getLanceSchema();
-    ShardingSpec activeShardingSpec =
-        SparkLanceShardingUtils.isEmpty(shardingSpec)
-            ? SparkLanceShardingUtils.firstShardingSpec(dataset)
-            : shardingSpec;
-    for (ShardingField field : SparkLanceShardingUtils.fields(activeShardingSpec)) {
-      columnsToLoad.add(SparkLanceShardingUtils.columnName(field, lanceSchema));
-    }
+      // Get statistics from manifest summary before closing dataset
+      ManifestSummary summary = getOrOpenDataset().getVersion().getManifestSummary();
+
+      // Collect all columns that need zonemap stats: filter columns + sharding columns.
+      Set<String> columnsToLoad = extractReferencedColumns(pushedPredicates);
+      Dataset dataset = getOrOpenDataset();
+      LanceSchema lanceSchema = dataset.getLanceSchema();
+      ShardingSpec activeShardingSpec =
+          SparkLanceShardingUtils.isEmpty(shardingSpec)
+              ? SparkLanceShardingUtils.firstShardingSpec(dataset)
+              : shardingSpec;
+      for (ShardingField field : SparkLanceShardingUtils.fields(activeShardingSpec)) {
+        columnsToLoad.add(SparkLanceShardingUtils.columnName(field, lanceSchema));
+      }

-    // Load zonemap stats for all requested columns in one pass.
-    Map<String, List<ZoneStats>> zonemapStats = loadZonemapStats(getOrOpenDataset(), columnsToLoad);
-
-    // Detect sharding-compatible fragments from zonemap stats. Each field checks its column's
-    // zones; if every fragment has a single sharding value, we get a fragment-to-key map.
-    Map<Integer, Object> fragmentShardingKeys = null;
-    Expression activeShardingExpression = null;
-    for (ShardingField field : SparkLanceShardingUtils.fields(activeShardingSpec)) {
-      String column = SparkLanceShardingUtils.columnName(field, lanceSchema);
-      List<ZoneStats> colStats = zonemapStats.get(column);
-      if (colStats == null || colStats.isEmpty()) {
-        LOG.warn(
-            "Sharding column '{}' (transform={}) has no zonemap stats; sharding detection disabled",
-            column,
-            field.transform().orElse(null));
-        continue;
+      // Load zonemap stats for all requested columns in one pass.
+      Map<String, List<ZoneStats>> zonemapStats =
+          loadZonemapStats(getOrOpenDataset(), columnsToLoad);
+
+      // Detect sharding-compatible fragments from zonemap stats. Each field checks its column's
+      // zones; if every fragment has a single sharding value, we get a fragment-to-key map.
+      Map<Integer, Object> fragmentShardingKeys = null;
+      Expression activeShardingExpression = null;
+      for (ShardingField field : SparkLanceShardingUtils.fields(activeShardingSpec)) {
+        String column = SparkLanceShardingUtils.columnName(field, lanceSchema);
+        List<ZoneStats> colStats = zonemapStats.get(column);
+        if (colStats == null || colStats.isEmpty()) {
+          LOG.warn(
+              "Sharding column '{}' (transform={}) has no zonemap stats;"
+                  + " sharding detection disabled",
+              column,
+              field.transform().orElse(null));
+          continue;
+        }
+        java.util.Optional<Map<Integer, Object>> keys =
+            SparkLanceShardingUtils.detectFragmentKeys(field, lanceSchema, colStats);
+        if (keys.isPresent()) {
+          fragmentShardingKeys = keys.get();
+          activeShardingExpression = SparkLanceShardingUtils.toSparkExpression(field, lanceSchema);
+          LOG.info(
+              "Detected Lance sharding field {}('{}') with {} fragments",
+              field.transform().orElse(null),
+              column,
+              fragmentShardingKeys.size());
+          break;
+        }
       }
-      java.util.Optional<Map<Integer, Object>> keys =
-          SparkLanceShardingUtils.detectFragmentKeys(field, lanceSchema, colStats);
-      if (keys.isPresent()) {
-        fragmentShardingKeys = keys.get();
-        activeShardingExpression = SparkLanceShardingUtils.toSparkExpression(field, lanceSchema);
-        LOG.info(
-            "Detected Lance sharding field {}('{}') with {} fragments",
-            field.transform().orElse(null),
-            column,
-            fragmentShardingKeys.size());
-        break;
+
+      // Pre-compute fragment pruning so we can (a) estimate post-pruning statistics for
+      // JoinSelection (BroadcastHashJoin vs SortMergeJoin) and (b) pass the cached result
+      // to LanceScan to avoid re-computing during planInputPartitions().
+      Set<Integer> survivingFragmentIds = null;
+      if (pushedPredicates.length > 0 && !zonemapStats.isEmpty()) {
+        survivingFragmentIds =
+            ZonemapFragmentPruner.pruneFragments(pushedPredicates, zonemapStats).orElse(null);
       }
-    }

-    // Pre-compute fragment pruning so we can (a) estimate post-pruning statistics for
-    // JoinSelection (BroadcastHashJoin vs SortMergeJoin) and (b) pass the cached result
-    // to LanceScan to avoid re-computing during planInputPartitions().
-    Set<Integer> survivingFragmentIds = null;
-    if (pushedPredicates.length > 0 && !zonemapStats.isEmpty()) {
-      survivingFragmentIds =
-          ZonemapFragmentPruner.pruneFragments(pushedPredicates, zonemapStats).orElse(null);
-    }
+      // Scale rows and full size by the zonemap fragment-pruning ratio first, then let
+      // LanceStatistics.estimateProjected apply the column-width ratio on top
+      // (when the projected schema is narrower than the full schema).
+      long projectedRows = summary.getTotalRows();
+      long projectedFullSize = summary.getTotalFilesSize();
+      if (survivingFragmentIds != null && summary.getTotalFragments() > 0) {
+        double ratio = (double) survivingFragmentIds.size() / summary.getTotalFragments();
+        projectedRows = (long) (projectedRows * ratio);
+        projectedFullSize = (long) (projectedFullSize * ratio);
+      }
+      LanceStatistics statistics =
+          LanceStatistics.estimateProjected(projectedRows, projectedFullSize, fullSchema, schema);
+      if (survivingFragmentIds != null) {
+        LOG.debug(
+            "Scan statistics after pruning: {} of {} fragments survive,"
+                + " estimatedSize={}, estimatedRows={} (full: size={}, rows={})",
+            survivingFragmentIds.size(),
+            summary.getTotalFragments(),
+            statistics.sizeInBytes(),
+            statistics.numRows(),
+            summary.getTotalFilesSize(),
+            summary.getTotalRows());
+      }

-    // Scale rows and full size by the zonemap fragment-pruning ratio first, then let
-    // LanceStatistics.estimateProjected apply the column-width ratio on top
-    // (when the projected schema is narrower than the full schema).
-    long projectedRows = summary.getTotalRows();
-    long projectedFullSize = summary.getTotalFilesSize();
-    if (survivingFragmentIds != null && summary.getTotalFragments() > 0) {
-      double ratio = (double) survivingFragmentIds.size() / summary.getTotalFragments();
-      projectedRows = (long) (projectedRows * ratio);
-      projectedFullSize = (long) (projectedFullSize * ratio);
-    }
-    LanceStatistics statistics =
-        LanceStatistics.estimateProjected(projectedRows, projectedFullSize, fullSchema, schema);
-    if (survivingFragmentIds != null) {
-      LOG.debug(
-          "Scan statistics after pruning: {} of {} fragments survive,"
-              + " estimatedSize={}, estimatedRows={} (full: size={}, rows={})",
-          survivingFragmentIds.size(),
-          summary.getTotalFragments(),
-          statistics.sizeInBytes(),
-          statistics.numRows(),
-          summary.getTotalFilesSize(),
-          summary.getTotalRows());
+      // Pre-compute splits and per-fragment row counts from the same Dataset handle that we
+      // already opened above. This consolidates two driver-side opens into one and lets us pin
+      // the resolved version onto the read options shipped to workers, providing snapshot
+      // isolation across all tasks of this query. The version is kept as a long end-to-end so
+      // long-lived high-write-frequency datasets do not silently truncate to a wrong version.
+      LanceSplit.ScanPlanResult scanPlan = LanceSplit.planScan(dataset);
+      LanceSparkReadOptions resolvedReadOptions =
+          readOptions.withVersion(scanPlan.getResolvedVersion());
+
+      Optional<String> whereCondition =
+          FilterPushDown.compileFiltersToSqlWhereClause(pushedPredicates);
+      return new LanceScan(
+          schema,
+          resolvedReadOptions,
+          whereCondition,
+          limit,
+          offset,
+          topNSortOrders,
+          pushedAggregation,
+          pushedPredicates,
+          statistics,
+          zonemapStats,
+          survivingFragmentIds,
+          scanPlan.getSplits(),
+          scanPlan.getFragmentRowCounts(),
+          activeShardingExpression,
+          fragmentShardingKeys,
+          initialStorageOptions,
+          namespaceImpl,
+          namespaceProperties);
+    } finally {
+      closeLazyDataset();
     }
-
-    // Close the lazily opened dataset - it's no longer needed after build
-    closeLazyDataset();
-
-    Optional<String> whereCondition =
-        FilterPushDown.compileFiltersToSqlWhereClause(pushedPredicates);
-    return new LanceScan(
-        schema,
-        readOptions,
-        whereCondition,
-        limit,
-        offset,
-        topNSortOrders,
-        pushedAggregation,
-        pushedPredicates,
-        statistics,
-        zonemapStats,
-        survivingFragmentIds,
-        activeShardingExpression,
-        fragmentShardingKeys,
-        initialStorageOptions,
-        namespaceImpl,
-        namespaceProperties);
   }

   @Override
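The `build()` rewrite moves `closeLazyDataset()` into a `finally` block so the handle is released on the early `localScan` return, on the normal return, and when planning throws. The pattern can be sketched in isolation as below; `CloseOnAllPathsSketch`, `Handle`, and `plan(...)` are hypothetical names, and the failure is simulated with a boolean flag rather than a real planning error.

```java
/** Hypothetical demo of releasing a handle on every exit path with try/finally. */
final class CloseOnAllPathsSketch {

  /** Stand-in for a native dataset handle; records whether it was released. */
  static final class Handle implements AutoCloseable {
    boolean closed;

    @Override
    public void close() {
      closed = true;
    }
  }

  /**
   * Runs a planning step that may return early, return normally, or throw.
   * The finally block closes the handle in all three cases.
   */
  static String plan(Handle handle, boolean shortCircuit, boolean fail) {
    try {
      if (shortCircuit) {
        return "local"; // early return, analogous to the metadata-only path
      }
      if (fail) {
        throw new IllegalStateException("planning failed");
      }
      return "scan";
    } finally {
      handle.close();
    }
  }
}
```

A single `finally` replaces the separate close calls that each exit path would otherwise need, which is why the early-return branch in the diff no longer calls `closeLazyDataset()` itself.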
