Apache Iceberg version
0.11.0
Please describe the bug 🐞
CC @goutamvenkat-anyscale @koenvo @Fokko
Hello! We are implementing distributed writes from Ray Data to Iceberg. As part of upserts, we:
- Write data files in parallel across Ray workers (each worker writes its share of Parquet files directly to storage and returns `DataFile` metadata plus the upsert key columns back to the driver)
- On the driver, concatenate all upsert keys collected from workers, call `create_match_filter` to build a delete predicate, then call `txn.delete()` followed by an append to commit
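The driver-side flow above can be sketched as follows. The worker-result shape and `collect_upsert_keys` helper are hypothetical (for illustration only); the PyIceberg calls are left as comments with signatures as understood from 0.11.0, so verify them against your installed version:

```python
import itertools

def collect_upsert_keys(worker_results):
    """Concatenate per-worker key batches into one flat list.

    worker_results: iterable of (data_file_metadata, key_batch) pairs,
    a hypothetical shape for what Ray workers return to the driver.
    """
    return list(itertools.chain.from_iterable(keys for _, keys in worker_results))

# On the driver (PyIceberg calls sketched as comments, not executed here):
#   match_filter = create_match_filter(key_table, join_cols)  # 1M keys -> ~10 s
#   with table.transaction() as txn:
#       txn.delete(match_filter)   # the slow step: ~1054 s observed
#       txn.append(new_rows)       # fast: ~1 s observed
```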
Upserting 1M rows (383 MiB) into an Iceberg table takes ~17.5 minutes, almost entirely in the delete step:

```
create_match_filter (1M keys → In filter):   10.26 s
txn.delete():                              1054.35 s
append + commit:                              1.14 s
─────────────────────────────────────────────────────
Total upsert commit:                       1065.75 s
```

PyIceberg version: 0.11.0
This matches what's reported in #2159 and #2138.
The bottlenecks are:
- `create_match_filter` — constructs a Python `BooleanExpression` node per row, which is expensive at 1M+ keys
- `txn.delete()` — evaluates the resulting giant `In` expression against the table's data files with no partition pruning, effectively doing a full table scan
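One mitigation we could measure against these bottlenecks is chunking the key set so each delete predicate stays small, rather than building one giant `In`. A minimal sketch of the batching logic (the `chunk_keys` helper and batch size are ours; the PyIceberg delete call is left as a comment, since issuing several deletes per transaction may or may not be cheaper in practice):

```python
def chunk_keys(keys, batch_size):
    """Split the upsert keys into fixed-size batches so that each
    delete predicate covers at most batch_size values."""
    for start in range(0, len(keys), batch_size):
        yield keys[start:start + batch_size]

# Hypothetical use with PyIceberg (not executed here):
#   from pyiceberg.expressions import In
#   for batch in chunk_keys(all_keys, 50_000):
#       txn.delete(In("id", set(batch)))
```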
We have a few questions:
- Merge-on-read upserts — is this on the roadmap, and if so, roughly when? MoR would let us avoid the expensive delete + rewrite cycle entirely for large upserts.
- Optimizing `create_match_filter` or `txn.delete()` — is there a recommended way to speed these up today? For example, batching the `In` filter, or passing a partition-level hint to constrain the file scan?
- Partition-aware deletes — if the upsert key columns overlap with partition columns, is there a supported way to restrict `txn.delete()` to only the relevant partitions, rather than scanning the full table?
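For the partition-aware case, the shape we have in mind is: collect the distinct partition values actually touched by the upsert, then AND that restriction into the delete predicate. The helper below is a pure-Python sketch with hypothetical names; whether PyIceberg prunes files from the combined predicate is exactly what we are asking:

```python
def touched_partitions(rows, partition_col):
    """Collect the distinct partition values present in the upsert rows,
    so a delete predicate can be restricted to those partitions.
    `rows` is a list of dicts; hypothetical shape for illustration."""
    return {row[partition_col] for row in rows}

# Hypothetical use with PyIceberg expressions (not executed here):
#   from pyiceberg.expressions import And, In
#   parts = touched_partitions(upsert_rows, "event_date")
#   txn.delete(And(In("event_date", parts), match_filter))
```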