
[SPARK-56346][SQL] Use PartitionPredicate in DSV2 Metadata Only Delete #55179

Closed
szehon-ho wants to merge 3 commits into apache:master from szehon-ho:delete_partition_filter

Conversation


@szehon-ho szehon-ho commented Apr 2, 2026

What changes were proposed in this pull request?

When OptimizeMetadataOnlyDeleteFromTable fails to translate all delete predicates to standard V2 filters, it now falls back to a second pass that converts partition-column filters to PartitionPredicates (reusing the SPARK-55596 infrastructure), translates any remaining data-column filters to standard V2 predicates, and combines them for table.canDeleteWhere. This mirrors the two-pass approach already used for scan filter pushdown in PushDownUtils.pushPartitionPredicates.
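The two-pass flow can be sketched in simplified form. This is an illustration only: `Filter`, `V2Pred`, and `PartPred` are made-up stand-ins for Spark's `Expression`, `Predicate`, and `PartitionPredicate`, not the rule's actual code.

```scala
sealed trait Filter
case class PartitionFilter(col: String, translatable: Boolean) extends Filter
case class DataFilter(col: String, translatable: Boolean) extends Filter

case class V2Pred(col: String)
case class PartPred(col: String)

// First pass: succeed only if every delete filter translates to a standard V2 predicate.
def firstPass(filters: Seq[Filter]): Option[Seq[V2Pred]] =
  if (filters.forall {
    case PartitionFilter(_, t) => t
    case DataFilter(_, t)      => t
  }) Some(filters.map {
    case PartitionFilter(c, _) => V2Pred(c)
    case DataFilter(c, _)      => V2Pred(c)
  }) else None

// Second pass: partition filters become PartitionPredicates regardless of V2
// translatability; the remaining data filters must all translate, or the whole
// pass fails and the rule falls back to a row-level delete.
def secondPass(filters: Seq[Filter]): Option[(Seq[PartPred], Seq[V2Pred])] = {
  val (parts, datas) = filters.partition(_.isInstanceOf[PartitionFilter])
  val partPreds = parts.collect { case PartitionFilter(c, _) => PartPred(c) }
  if (datas.forall { case DataFilter(_, t) => t; case _ => false })
    Some((partPreds, datas.collect { case DataFilter(c, _) => V2Pred(c) }))
  else None
}
```

In this sketch, a delete whose partition filter is untranslatable fails the first pass but still succeeds in the second, which is exactly the case the PR enables.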

Why are the changes needed?

Currently, OptimizeMetadataOnlyDeleteFromTable only attempts to translate all delete predicates to standard V2 filters. If any predicate cannot be translated (e.g. complex expressions on partition columns), the optimization falls back to an expensive row-level delete even though the table could accept the predicates as PartitionPredicates. This change enables the metadata-only delete path in more cases by leveraging the PartitionPredicate infrastructure introduced in SPARK-55596.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New test suite DataSourceV2EnhancedDeleteFilterSuite with 9 test cases covering: first-pass accept, second-pass accept/reject, mixed partition+data filters, UDF on non-contiguous partition columns, multiple PartitionPredicates, and row-level fallback. Existing suites verified for no regressions: DataSourceV2EnhancedPartitionFilterSuite, GroupBasedDeleteFromTableSuite.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Cursor agent mode)

@szehon-ho force-pushed the delete_partition_filter branch from cb0ff92 to 33d100e on April 3, 2026 at 00:05
/**
 * Evaluates a single V2 predicate by resolving column values through the
 * given function. Supports =, <=>, IS_NULL, IS_NOT_NULL, and ALWAYS_TRUE.
 */
def evalPredicate(
Member Author:
Just a refactor for re-use in the new test InMemoryTable.
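For reference, the evaluator's contract can be sketched with simplified types. `SimplePred` is a hypothetical ADT standing in for Spark's V2 `Predicate`, and `resolve` returns `None` for a null or absent column; the real `evalPredicate` works on the actual connector API.

```scala
sealed trait SimplePred
case class EqualTo(col: String, value: Any) extends SimplePred       // =
case class EqualNullSafe(col: String, value: Any) extends SimplePred // <=>
case class IsNull(col: String) extends SimplePred
case class IsNotNull(col: String) extends SimplePred
case object AlwaysTrue extends SimplePred

// Evaluate one predicate against a column-resolution function.
def evalPredicate(pred: SimplePred, resolve: String => Option[Any]): Boolean = pred match {
  case EqualTo(c, v)       => resolve(c).contains(v)  // null never equals
  case EqualNullSafe(c, v) => resolve(c) == Option(v) // null <=> null is true
  case IsNull(c)           => resolve(c).isEmpty
  case IsNotNull(c)        => resolve(c).isDefined
  case AlwaysTrue          => true
}
```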

}

/**
* Separates partition filters from data filters and converts pushable partition
Member Author:
Again, a refactor for re-use in OptimizeMetadataOnlyDeleteQuery.

* Returns a map from flattened expression to original.
*/
private def normalizeNestedPartitionFilters(
private[v2] def flattenNestedPartitionFilters(
Member Author:
Renamed, because 'normalize' is already used in OptimizeMetadataOnlyDelete.

@cloud-fan cloud-fan left a comment

Summary

Prior state and problem: OptimizeMetadataOnlyDeleteFromTable could only perform metadata-only deletes when all filter expressions translated to standard V2 predicates (e.g., =, <=>, IS_NULL). Filters like IN, STARTS_WITH, or UDFs on partition columns caused a fallback to expensive row-level operations even though the table might accept PartitionPredicates.

Design approach: Add a second-pass fallback in the delete optimization rule that mirrors the existing two-pass approach in PushDownUtils.pushPartitionPredicates for scan filter pushdown. When the first pass (V2 translation) fails or is rejected, the second pass:

  1. Separates filters into partition-column and data-column categories
  2. Converts partition filters to PartitionPredicates via PartitionPredicateImpl
  3. Translates remaining data filters to standard V2 predicates
  4. Combines both and calls table.canDeleteWhere

Key design decisions:

  • No supportsIterativePushdown gate for the delete path (the scan path has one). This is intentional — canDeleteWhere already serves as the acceptance gate, and the supportsIterativePushdown opt-in is specific to ScanBuilder.
  • All-or-nothing semantics: if any remaining data filter can't translate to V2, the entire second pass fails and falls back to row-level. This differs from the scan path (which returns remaining filters for post-scan evaluation) because metadata-only deletes require complete filter acceptance.
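The all-or-nothing behavior amounts to sequencing the per-expression translations, where any single failure aborts the pass. A minimal sketch, with `translateOne` as a stand-in for the real expression-to-V2 translation:

```scala
// Translate every expression or fail the whole batch: one untranslatable
// expression yields None, which triggers the row-level fallback.
def translateAll[A, B](exprs: Seq[A])(translateOne: A => Option[B]): Option[Seq[B]] = {
  val translated = exprs.map(translateOne)
  if (translated.forall(_.isDefined)) Some(translated.flatten) else None
}
```

The scan path instead keeps the untranslated remainder for post-scan evaluation, which is why the two paths diverge here.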

Implementation sketch: OptimizeMetadataOnlyDeleteFromTable.apply → first tries tryTranslateToV2 (standard V2 path), on failure → tryDeleteWithPartitionPredicates (second pass via shared PushDownUtils.createPartitionPredicates and flattenNestedPartitionFilters), on failure → row-level plan. The shared methods were extracted from the existing pushPartitionPredicates and made package-private for reuse.

General comments

  • The logDebug message on the original first-pass success path was removed. With three possible outcomes now (first-pass V2, second-pass partition predicates, row-level fallback), adding logDebug for each path would help with debugging filter pushdown behavior.

}
if (fields.length == transforms.length) {
Some(fields.toSeq)
Some(fields.toSeq).filter(_.nonEmpty)
Contributor:

This .filter(_.nonEmpty) guard is redundant: the outer check at line 139 guarantees transforms.nonEmpty, and fields.length == transforms.length at line 151 ensures fields is non-empty.

Suggested change
Some(fields.toSeq).filter(_.nonEmpty)
Some(fields.toSeq)

candidateKeys
}

// Handle data predicates (simulate data source with data column statistics)
Contributor:

The comment says "data column statistics" but the code evaluates predicates row-by-row, not via statistics.

Suggested change
// Handle data predicates (simulate data source with data column statistics)
// Handle data predicates (simulate a data source applying row-level data filters)

Member Author:
Hm, yeah, I thought about this too. I think 'simulate a data source with data column statistics' is more accurate. The test table goes row-by-row, but it's a simulation.

This is for the use-case where a data-column predicate can still be completely handled by the data source, for example via Iceberg min-max statistics: delete from table t where x < 10, and all Iceberg data files have a max value less than 10.

Of course the InMemoryPartitionPredicateDeleteTable is not structured that way, so it's a 'simulation'. It'd be a bit overkill to implement min/max stats to make it a more accurate simulation; maybe in a follow-up.

By the way, this pushdown is not about PartitionPredicate but the existing case; I just wanted to add a test that mixes the two.
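For what it's worth, the min/max idea can be illustrated with a toy sketch; `FileStats` is made up here, and Iceberg's real per-file metadata is much richer.

```scala
// Toy per-file statistics, standing in for a data source's column min/max metadata.
case class FileStats(min: Int, max: Int)

// A predicate `x < bound` holds for every row of a file exactly when the
// file's max is below the bound, so the file can be dropped wholesale and
// the delete stays metadata-only.
def canDeleteWholeFiles(files: Seq[FileStats], bound: Int): Boolean =
  files.forall(_.max < bound)
```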

@cloud-fan
Contributor

LGTM if CI is green

@peter-toth
Contributor

@szehon-ho , can you please update the PR description to follow the template?

@peter-toth peter-toth left a comment

Just a nit and CI seems to fail with a linter issue, but LGTM.


/** Translates all expressions to V2 filters, or returns [[None]] if any fail. */
private def tryTranslateToV2(predicates: Seq[Expression]): Option[Array[Predicate]] = {
val filters = toDataSourceV2Filters(predicates)
Contributor:

Nit: this seems to be the only call site of toDataSourceV2Filters(), so you can probably combine them.

Member Author:

Good catch, thanks

@szehon-ho
Member Author

Done, thanks!

When `OptimizeMetadataOnlyDeleteFromTable` fails to push standard V2 predicates for a metadata-only delete, it now falls back to a second pass that converts partition-column filters to `PartitionPredicate`s (SPARK-55596) and combines them with translated V2 data filters.
- Remove redundant .filter(_.nonEmpty) guard in PushDownUtils
- Fix misleading comment in InMemoryPartitionPredicateDeleteTable
- Add logDebug for each delete optimization outcome path
@szehon-ho force-pushed the delete_partition_filter branch from 379b162 to 75e1f51 on April 8, 2026 at 00:58
@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 590b0d5 on Apr 8, 2026