Summary
Several hot paths eagerly materialize iterator outputs using `list(...)` (or equivalent full-consumption patterns), increasing peak memory and reducing streaming behavior. This issue proposes targeted refactors to preserve iterator semantics where possible and reduce intermediate allocations.
All links below are immutable permalinks pinned to commit `7425bc4657cd0e1f8a3003cc2f6493300e0b1d60`.
Problem areas and proposed improvements
1) `ArrowScan` task-level batch materialization
- Current code: `return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))`
- Why it matters:
- Materializes all batches for each task before yielding to the caller.
- Can cause high peak memory for large files/tasks.
- Proposed improvement:
- Replace the per-task `list(...)` aggregation with bounded producer/consumer streaming from worker threads (queue-based handoff), yielding batches as they are produced.
- Preserve existing `limit` semantics and cancellation behavior.
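The queue-based handoff could be sketched roughly as below; `stream_batches`, `produce_batches`, and the single-worker setup are illustrative stand-ins rather than the actual `ArrowScan` internals, and real cancellation/`limit` handling would need additional care:

```python
# Sketch: bounded producer/consumer streaming instead of per-task list(...).
# All names here are hypothetical, not the pyiceberg API.
import threading
from queue import Queue

_SENTINEL = object()

def stream_batches(tasks, produce_batches, max_buffered=4):
    """Yield batches as the worker produces them, keeping at most
    `max_buffered` batches in memory instead of one full list per task."""
    q = Queue(maxsize=max_buffered)  # bounded: producer blocks when full

    def worker():
        try:
            for task in tasks:
                for batch in produce_batches(task):
                    q.put(batch)
        finally:
            q.put(_SENTINEL)  # always signal completion, even on error

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    while (item := q.get()) is not _SENTINEL:
        yield item
    t.join()
```

With a single worker, ordering is preserved; peak memory is governed by `max_buffered` rather than by per-task output size.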
2) Manifest list read materialization
- Current code: `manifest_files = list(read_manifest_list(file))`
- Why it matters:
- Allocates an unnecessary intermediate list before cache processing.
- Proposed improvement:
- Iterate `read_manifest_list(file)` directly in the cache loop under the lock.
- Remove the `manifest_files` temporary list.
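A minimal sketch of iterating the reader directly under the lock; the class and attribute names are hypothetical stand-ins for the real manifest cache:

```python
# Hypothetical stand-in for the manifest cache; read_manifest_list is
# injected so the sketch stays self-contained.
import threading

class ManifestCache:
    def __init__(self):
        self._lock = threading.Lock()
        self._cache = {}

    def manifests(self, file, read_manifest_list):
        with self._lock:
            if file not in self._cache:
                entry = self._cache.setdefault(file, [])
                # Before: manifest_files = list(read_manifest_list(file))
                # After: consume the reader directly in the cache loop,
                # processing each manifest as it arrives.
                for manifest in read_manifest_list(file):
                    entry.append(manifest)
            return self._cache[file]
```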
3) Append path: `_dataframe_to_data_files` eagerly materialized
- Current code:
- Why it matters:
- Builds the full list of `DataFile` objects before appending.
- Proposed improvement:
- Stream directly: `for data_file in _dataframe_to_data_files(...): append_files.append_data_file(data_file)`
- No behavior change expected.
4) Dynamic partition overwrite: `_dataframe_to_data_files` eager list
- Current code:
- Why it matters:
- Full materialization can be large.
- Constraints:
- The current flow needs all partitions before the delete, then appends the same generated files after the delete.
- Proposed improvement (incremental):
- Keep current correctness but explore a two-phase, low-memory approach:
- Persist the minimal metadata/state needed for the partition predicate and the post-delete append, instead of a full in-memory list.
- Or spill `DataFile` descriptors to a temporary structure for replay after the delete.
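The spill-and-replay option could look roughly like this; the JSON-lines format and plain-dict descriptors are illustrative stand-ins for real `DataFile` serialization:

```python
# Sketch: spill lightweight file descriptors to disk in phase one, then
# stream them back for the post-delete append. Descriptor shape is made up.
import json
import tempfile

def spill_data_files(data_files):
    """Phase 1: consume the descriptor iterator once, spilling to disk."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tmp:
        for descriptor in data_files:
            tmp.write(json.dumps(descriptor) + "\n")
        return tmp.name

def replay_data_files(path):
    """Phase 2: stream the descriptors back, one at a time."""
    with open(path) as f:
        for line in f:
            yield json.loads(line)
```

Only one descriptor is in memory at a time during replay, at the cost of a temporary file and (de)serialization overhead.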
5) Delete rewrite path: `_dataframe_to_data_files` eager list
- Current code:
- Why it matters:
- Builds replacement file lists eagerly, then accumulates them in `replaced_files`.
- Proposed improvement:
- Stream rewritten files into the overwrite stage incrementally where possible.
- Reduce or eliminate the large `replaced_files` intermediate accumulation.
Equivalent eager-consumption improvements identified
6) Ancestor membership check materialization
- Current code:
- Proposed improvement:
- Use `any(...)` against `ancestors_of(...)` instead of building a full ancestor set for a single membership test.
- Benefit: lower transient memory, early exit.
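A sketch of the early-exit membership check, with `ancestors_of` modeled as a simple generator over a parent map rather than the real snapshot-metadata traversal:

```python
# Hypothetical model: parents maps snapshot_id -> parent_snapshot_id.
def ancestors_of(snapshot_id, parents):
    current = snapshot_id
    while current is not None:
        yield current
        current = parents.get(current)

def is_ancestor(candidate_id, head_id, parents):
    # any(...) stops at the first match instead of building a full set.
    return any(a == candidate_id for a in ancestors_of(head_id, parents))
```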
7) `latest_ancestor_before_timestamp` full scan when early exit is possible
- Current code:
- Proposed improvement:
- Since `ancestors_of(...)` traverses from newest to oldest, return the first ancestor matching `ancestor.timestamp_ms < timestamp_ms`.
- Benefit: lower CPU for long histories.
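A sketch of the early return, assuming newest-to-oldest traversal; `Snapshot` here is a minimal stand-in for the real class:

```python
# Minimal stand-in for the real Snapshot class.
from dataclasses import dataclass
from typing import Iterable, Optional

@dataclass
class Snapshot:
    snapshot_id: int
    timestamp_ms: int

def latest_ancestor_before_timestamp(
    ancestors: Iterable[Snapshot], timestamp_ms: int
) -> Optional[Snapshot]:
    # Traversal is newest-to-oldest, so the first match is the latest one;
    # return immediately instead of scanning the full history.
    for ancestor in ancestors:
        if ancestor.timestamp_ms < timestamp_ms:
            return ancestor
    return None
```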
8) `_validation_history` temporary list allocation in `extend([...])`
- Current code:
- Proposed improvement:
- Replace the list comprehension inside `extend(...)` with a generator expression.
- Benefit: avoids the temporary list allocation.
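The change is mechanical; a before/after sketch with made-up data:

```python
# Made-up snapshot records standing in for the real validation history input.
snapshots = [{"id": i, "ts": i * 100} for i in range(5)]

history = []
# Before: history.extend([s["id"] for s in snapshots if s["ts"] > 150])
# After: a generator expression feeds extend() item by item,
# with no temporary list allocated.
history.extend(s["id"] for s in snapshots if s["ts"] > 150)
```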
9) `chain.from_iterable(scan.scan_plan_helper())` readability and streaming clarity
- Current code:
- Proposed improvement:
- Optional refactor to explicit nested loops for clearer streaming behavior (the memory profile is similar).
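The two forms are equivalent; a small sketch with `scan_plan_helper` mocked as a generator of per-manifest task iterables:

```python
from itertools import chain

def scan_plan_helper():
    # Stand-in for the real helper: yields one iterable of tasks per manifest.
    yield iter(["t1", "t2"])
    yield iter(["t3"])

# Flattening with chain.from_iterable ...
flat_chain = list(chain.from_iterable(scan_plan_helper()))

# ... is equivalent to explicit nested loops, which make the streaming
# (one inner iterable consumed at a time) easier to see:
flat_loops = []
for tasks in scan_plan_helper():
    for task in tasks:
        flat_loops.append(task)
```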
10) `InspectTable.history` minor optimization
- Current code:
- Proposed improvement:
- Keep the ancestor ID set (good for O(1) lookups), but precompute the `snapshot_id -> snapshot` map once to avoid repeated lookup overhead.
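A sketch of precomputing the map once; the row shape and field names are illustrative, not the actual `InspectTable.history` schema:

```python
# Hypothetical history listing: one row per snapshot record (plain dicts).
def history_rows(snapshots, ancestor_ids):
    ancestor_set = set(ancestor_ids)  # keep the O(1) membership check
    by_id = {s["snapshot_id"]: s for s in snapshots}  # built once, reused below
    return [
        {
            "snapshot_id": s["snapshot_id"],
            "is_current_ancestor": s["snapshot_id"] in ancestor_set,
            # Parent resolved via the precomputed map, not a repeated scan:
            "parent_timestamp_ms": (
                by_id[s["parent_id"]]["timestamp_ms"]
                if s.get("parent_id") in by_id
                else None
            ),
        }
        for s in snapshots
    ]
```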
Expected impact
- Lower peak memory in scan/write-heavy workloads.
- Better streaming behavior and earlier first-result availability in key paths.
- Reduced temporary allocations in history/validation helpers.
Suggested implementation plan
- Low-risk changes first:
- Append path streaming in `Transaction.append`.
- `manifest.py`: direct iteration without the temporary list.
- `validate.py`: generator expression.
- `snapshots.py`: early return for `latest_ancestor_before_timestamp`.
- Medium-risk:
- Delete rewrite path incremental appends.
- Higher-risk/high-impact:
- Queue-based threaded streaming for `ArrowScan.to_record_batches`, replacing the per-task `list(...)` materialization.
Acceptance criteria
- No regression in existing tests.
- Memory benchmarks show reduced peak RSS for large scans/writes.
- `to_record_batches` preserves row ordering and `limit` behavior.
- Snapshot rollback/validation semantics unchanged.