[SPARK-56346][SQL] Use PartitionPredicate in DSV2 Metadata Only Delete #55179
szehon-ho wants to merge 3 commits into apache:master
Conversation
cb0ff92 to 33d100e
```scala
 * Evaluates a single V2 predicate by resolving column values through the
 * given function. Supports =, <=>, IS_NULL, IS_NOT_NULL, and ALWAYS_TRUE.
 */
def evalPredicate(
```
Just a refactor for reuse in the new test InMemoryTable.
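As a rough illustration of what such an evaluator handles, here is a self-contained Scala sketch; the `SimplePredicate` type and all names below are illustrative stand-ins, not Spark's actual V2 `Predicate` API:

```scala
// Illustrative stand-ins for Spark's V2 Predicate types; not the real API.
sealed trait SimplePredicate
case class Eq(col: String, lit: Any) extends SimplePredicate         // =  (null-sensitive)
case class NullSafeEq(col: String, lit: Any) extends SimplePredicate // <=>
case class IsNull(col: String) extends SimplePredicate
case class IsNotNull(col: String) extends SimplePredicate
case object AlwaysTrue extends SimplePredicate

// Evaluates a predicate by resolving column values through `resolve`
// (None stands for a NULL column value).
def evalPredicate(p: SimplePredicate, resolve: String => Option[Any]): Boolean = p match {
  case Eq(c, lit)         => resolve(c).contains(lit)  // NULL = x is never true
  case NullSafeEq(c, lit) => resolve(c) == Option(lit) // NULL <=> NULL is true
  case IsNull(c)          => resolve(c).isEmpty
  case IsNotNull(c)       => resolve(c).nonEmpty
  case AlwaysTrue         => true
}
```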
```scala
/**
 * Separates partition filters from data filters and converts pushable partition
```
Again, a refactor for reuse in OptimizeMetadataOnlyDeleteQuery.
```diff
  * Returns a map from flattened expression to original.
  */
- private def normalizeNestedPartitionFilters(
+ private[v2] def flattenNestedPartitionFilters(
```
Renamed, because 'normalize' is already used in OptimizeMetadataOnlyDelete.
cloud-fan left a comment:
Summary
Prior state and problem: OptimizeMetadataOnlyDeleteFromTable could only perform metadata-only deletes when all filter expressions translated to standard V2 predicates (e.g., =, <=>, IS_NULL). Filters like IN, STARTS_WITH, or UDFs on partition columns caused a fallback to expensive row-level operations even though the table might accept PartitionPredicates.
Design approach: Add a second-pass fallback in the delete optimization rule that mirrors the existing two-pass approach in PushDownUtils.pushPartitionPredicates for scan filter pushdown. When the first pass (V2 translation) fails or is rejected, the second pass:
- Separates filters into partition-column and data-column categories
- Converts partition filters to `PartitionPredicate`s via `PartitionPredicateImpl`
- Translates remaining data filters to standard V2 predicates
- Combines both and calls `table.canDeleteWhere`
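The steps above can be sketched in plain Scala; every type and helper name below is a simplified stand-in for illustration, not Spark's actual classes:

```scala
// Simplified stand-ins: a "filter" is just the set of columns it references
// plus a flag for whether it translates to a standard V2 predicate.
case class Filter(columns: Set[String], v2Translatable: Boolean)
sealed trait Pred
case class PartitionPred(f: Filter) extends Pred // stands in for PartitionPredicateImpl
case class V2Pred(f: Filter) extends Pred        // stands in for a standard V2 Predicate

// Second pass: partition filters become PartitionPreds, data filters must all
// translate to V2 (all-or-nothing), and the table decides via canDeleteWhere.
def secondPass(
    filters: Seq[Filter],
    partitionCols: Set[String],
    canDeleteWhere: Seq[Pred] => Boolean): Boolean = {
  val (partFilters, dataFilters) = filters.partition(_.columns.subsetOf(partitionCols))
  val partPreds = partFilters.map(PartitionPred)
  val dataPreds = dataFilters.map(f => if (f.v2Translatable) Some(V2Pred(f)) else None)
  if (dataPreds.contains(None)) false // any untranslatable data filter aborts the pass
  else canDeleteWhere(partPreds ++ dataPreds.flatten)
}
```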
Key design decisions:
- No `supportsIterativePushdown` gate for the delete path (the scan path has one). This is intentional: `canDeleteWhere` already serves as the acceptance gate, and the `supportsIterativePushdown` opt-in is specific to `ScanBuilder`.
- All-or-nothing semantics: if any remaining data filter can't translate to V2, the entire second pass fails and falls back to row-level. This differs from the scan path (which returns remaining filters for post-scan evaluation) because metadata-only deletes require complete filter acceptance.
Implementation sketch: OptimizeMetadataOnlyDeleteFromTable.apply → first tries tryTranslateToV2 (standard V2 path), on failure → tryDeleteWithPartitionPredicates (second pass via shared PushDownUtils.createPartitionPredicates and flattenNestedPartitionFilters), on failure → row-level plan. The shared methods were extracted from the existing pushPartitionPredicates and made package-private for reuse.
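The three-outcome control flow can be sketched as follows; the helper names are taken from the PR description, with their bodies elided into plain predicates:

```scala
// Sketch of the rule's decision flow; the two function parameters stand in
// for the real tryTranslateToV2 / tryDeleteWithPartitionPredicates logic.
def chooseDeletePlan(
    tryTranslateToV2: () => Boolean,
    trySecondPass: () => Boolean): String =
  if (tryTranslateToV2()) "metadata-only delete (standard V2 predicates)"
  else if (trySecondPass()) "metadata-only delete (PartitionPredicates + V2 data filters)"
  else "row-level delete fallback"
```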
General comments
- The `logDebug` message on the original first-pass success path was removed. With three possible outcomes now (first-pass V2, second-pass partition predicates, row-level fallback), adding a `logDebug` for each path would help with debugging filter pushdown behavior.
```diff
  }
  if (fields.length == transforms.length) {
-   Some(fields.toSeq)
+   Some(fields.toSeq).filter(_.nonEmpty)
```
This `.filter(_.nonEmpty)` guard is redundant: the outer check at line 139 guarantees `transforms.nonEmpty`, and `fields.length == transforms.length` at line 151 ensures `fields` is non-empty.
Suggested change:
```diff
- Some(fields.toSeq).filter(_.nonEmpty)
+ Some(fields.toSeq)
```
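The redundancy follows from `Option.filter` alone: on a `Some` whose sequence is known to be non-empty, the guard is a no-op. A quick standalone check:

```scala
val fields = Seq("year", "month") // non-empty, as the length check guarantees
assert(Some(fields).filter(_.nonEmpty) == Some(fields))    // guard changes nothing
assert(Some(Seq.empty[String]).filter(_.nonEmpty).isEmpty) // only matters when empty
```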
```scala
    candidateKeys
  }

  // Handle data predicates (simulate data source with data column statistics)
```
The comment says "data column statistics" but the code evaluates predicates row-by-row, not via statistics.

Suggested change:
```diff
- // Handle data predicates (simulate data source with data column statistics)
+ // Handle data predicates (simulate a data source applying row-level data filters)
```
Hm, yeah, I thought about this too. I think 'simulate a data source with data column statistics' is more accurate. The test table goes row-by-row, but it's a simulation.

This is for the use case where a data-column predicate can still be completely handled by the data source, for example via Iceberg min/max statistics: `DELETE FROM t WHERE x < 10`, where every Iceberg data file's max value for `x` is less than 10.

Of course InMemoryPartitionPredicateDeleteTable is not structured that way, so it's a 'simulation'. It'd be a bit overkill to implement min/max stats to make it a more accurate simulation; maybe in a follow-up.

By the way, this pushdown is not about PartitionPredicate but the existing case; I just wanted to add a test that mixes the two.
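The Iceberg-style case can be sketched with hypothetical per-file min/max stats (not Iceberg's real metadata classes): a predicate `x < v` is fully satisfied by a file exactly when that file's max for `x` is below `v`, so matching files can be dropped without reading any rows.

```scala
// Hypothetical per-file column statistics; not Iceberg's actual metadata API.
case class FileStats(minX: Int, maxX: Int)

// DELETE FROM t WHERE x < v is metadata-only deletable for the whole table
// when every file's max(x) < v, i.e. every row in every file matches.
def fullyDeletableByLessThan(files: Seq[FileStats], v: Int): Boolean =
  files.forall(_.maxX < v)
```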
LGTM if CI is green

@szehon-ho, can you please update the PR description to follow the template?
peter-toth left a comment:
Just a nit and CI seems to fail with a linter issue, but LGTM.
```scala
/** Translates all expressions to V2 filters, or returns [[None]] if any fail. */
private def tryTranslateToV2(predicates: Seq[Expression]): Option[Array[Predicate]] = {
  val filters = toDataSourceV2Filters(predicates)
```
Nit: this seems to be the only call site of toDataSourceV2Filters(), so you can probably combine them.
Done, thanks!
04456d0 to 379b162
When `OptimizeMetadataOnlyDeleteFromTable` fails to push standard V2 predicates for a metadata-only delete, it now falls back to a second pass that converts partition-column filters to `PartitionPredicate`s (SPARK-55596) and combines them with translated V2 data filters.
- Remove redundant `.filter(_.nonEmpty)` guard in PushDownUtils
- Fix misleading comment in InMemoryPartitionPredicateDeleteTable
- Add `logDebug` for each delete optimization outcome path
379b162 to 75e1f51
Thanks, merging to master!
What changes were proposed in this pull request?
When `OptimizeMetadataOnlyDeleteFromTable` fails to translate all delete predicates to standard V2 filters, it now falls back to a second pass that converts partition-column filters to `PartitionPredicate`s (reusing the SPARK-55596 infrastructure), translates any remaining data-column filters to standard V2 predicates, and combines them for `table.canDeleteWhere`. This mirrors the two-pass approach already used for scan filter pushdown in `PushDownUtils.pushPartitionPredicates`.

Why are the changes needed?
Currently, `OptimizeMetadataOnlyDeleteFromTable` only attempts to translate all delete predicates to standard V2 filters. If any predicate cannot be translated (e.g. complex expressions on partition columns), the optimization falls back to an expensive row-level delete even though the table could accept the predicates as `PartitionPredicate`s. This change enables the metadata-only delete path in more cases by leveraging the `PartitionPredicate` infrastructure introduced in SPARK-55596.

Does this PR introduce any user-facing change?
No.
How was this patch tested?
New test suite `DataSourceV2EnhancedDeleteFilterSuite` with 9 test cases covering: first-pass accept, second-pass accept/reject, mixed partition+data filters, UDF on non-contiguous partition columns, multiple PartitionPredicates, and row-level fallback. Existing suites verified for no regressions: `DataSourceV2EnhancedPartitionFilterSuite`, `GroupBasedDeleteFromTableSuite`.
Generated-by: Claude Code (Cursor agent mode)