Skip to content

Commit 6f66c4c

Browse files
schenksj and claude committed
feat(phase-5b): Dynamic Partition Pruning support for Delta
Delta scans with Dynamic Partition Pruning (DPP) now go through Comet's native path instead of falling back to vanilla Spark.

Changes:

1. CometScanRule: moved Delta detection BEFORE the DPP fallback check so that DPP-bearing Delta scans reach nativeDeltaScan instead of bailing out. Non-Delta scans still fall back for DPP as before.
2. prunePartitions: filters out any partition filter that contains a PlanExpression (in practice, a DynamicPruningExpression wrapping InSubqueryExec) before building the InterpretedPredicate. These expressions aren't resolved at planning time; Spark applies them post-scan at runtime. Static partition filters are still evaluated for file-level pruning at planning time.
3. New test: "dynamic partition pruning through join" - a star-schema join (fact table partitioned by region + small dim table) that exercises DPP. Verifies that CometDeltaNativeScanExec appears in the plan and that the results match vanilla Spark (compared as sets of rows).

Results: Tests: succeeded 35, failed 0, canceled 0, ignored 0, pending 0

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 2d2c99f commit 6f66c4c

3 files changed

Lines changed: 77 additions & 10 deletions

File tree

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -246,20 +246,22 @@ case class CometScanRule(session: SparkSession)
246246

247247
private def transformV1Scan(plan: SparkPlan, scanExec: FileSourceScanExec): SparkPlan = {
248248

249-
if (COMET_DPP_FALLBACK_ENABLED.get() &&
250-
scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
251-
return withInfo(scanExec, "Dynamic Partition Pruning is not supported")
252-
}
253-
254249
scanExec.relation match {
255250
case r: HadoopFsRelation =>
256-
// Delta Lake (V1 path) — detect before the `isFileFormatSupported` check, which
257-
// only accepts the exact `ParquetFileFormat` class and otherwise rejects Delta's
258-
// `DeltaParquetFileFormat` subclass.
251+
// Delta Lake (V1 path): detect BEFORE the DPP fallback check below,
252+
// because Delta's native path handles DPP through partition pruning
253+
// at execution time (DPP expressions are filtered out of the
254+
// planning-time InterpretedPredicate and applied by Spark post-scan).
259255
if (DeltaReflection.isDeltaFileFormat(r.fileFormat)) {
260256
return nativeDeltaScan(session, scanExec, r, hadoopConfOrNull = null)
261257
.getOrElse(scanExec)
262258
}
259+
// DPP fallback for non-Delta scans (DataFusion/Iceberg-compat paths
260+
// don't support DPP natively).
261+
if (COMET_DPP_FALLBACK_ENABLED.get() &&
262+
scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
263+
return withInfo(scanExec, "Dynamic Partition Pruning is not supported")
264+
}
263265
if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
264266
return withInfo(scanExec, s"Unsupported file format ${r.fileFormat}")
265267
}

spark/src/main/scala/org/apache/comet/serde/operator/CometDeltaNativeScan.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,12 +320,20 @@ object CometDeltaNativeScan extends CometOperatorSerde[CometScanExec] with Loggi
320320
partitionSchema: StructType): Seq[OperatorOuterClass.DeltaScanTask] = {
321321
if (scan.partitionFilters.isEmpty || partitionSchema.isEmpty) return tasks
322322

323+
// Phase 5b: filter out DPP expressions (DynamicPruningExpression wrapping
324+
// InSubqueryExec) because they aren't resolved at planning time. Spark
325+
// applies them post-scan at runtime. Static partition filters are still
326+
// evaluated here for file-level pruning.
327+
val staticFilters = scan.partitionFilters.filterNot(
328+
_.exists(_.isInstanceOf[org.apache.spark.sql.catalyst.expressions.PlanExpression[_]]))
329+
if (staticFilters.isEmpty) return tasks
330+
323331
// Build an `InterpretedPredicate` that expects a row whose schema matches
324332
// `partitionSchema`. Rewrite attribute references to `BoundReference`s keyed by
325333
// partition-schema column name so it can evaluate against a row we assemble below.
326334
val partitionAttrsByName =
327-
scan.partitionFilters.flatMap(_.references).groupBy(_.name.toLowerCase(Locale.ROOT))
328-
val combined = scan.partitionFilters.reduce(And)
335+
staticFilters.flatMap(_.references).groupBy(_.name.toLowerCase(Locale.ROOT))
336+
val combined = staticFilters.reduce(And)
329337
val bound = combined.transform {
330338
case a: org.apache.spark.sql.catalyst.expressions.AttributeReference =>
331339
val idx = partitionSchema.fieldIndex(a.name)

spark/src/test/scala/org/apache/comet/CometDeltaNativeSuite.scala

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,6 +1137,63 @@ class CometDeltaNativeSuite extends CometTestBase with AdaptiveSparkPlanHelper {
11371137
}
11381138
}
11391139

1140+
test("dynamic partition pruning through join") {
1141+
assume(deltaSparkAvailable, "delta-spark not on the test classpath; skipping")
1142+
withDeltaTable("dpp") { tablePath =>
1143+
val ss = spark
1144+
import ss.implicits._
1145+
1146+
val factPath = new java.io.File(tablePath, "fact").getAbsolutePath
1147+
val dimPath = new java.io.File(tablePath, "dim").getAbsolutePath
1148+
1149+
// Fact table: partitioned by region, many rows.
1150+
(0 until 100)
1151+
.map(i => (i.toLong, s"item_$i", Seq("us", "eu", "ap")(i % 3)))
1152+
.toDF("id", "item", "region")
1153+
.write
1154+
.partitionBy("region")
1155+
.format("delta")
1156+
.save(factPath)
1157+
1158+
// Dimension table: small, filters the fact via join.
1159+
Seq(("us", "United States"))
1160+
.toDF("region", "region_name")
1161+
.write
1162+
.format("delta")
1163+
.save(dimPath)
1164+
1165+
// Star-schema join: DPP should push the dim.region='us' filter into the
1166+
// fact scan as a dynamic partition filter, pruning eu and ap partitions.
1167+
val query = s"""
1168+
SELECT f.id, f.item, d.region_name
1169+
FROM delta.`$factPath` f
1170+
JOIN delta.`$dimPath` d ON f.region = d.region
1171+
"""
1172+
val df = spark.sql(query)
1173+
val plan = df.queryExecution.executedPlan
1174+
val scans = collect(plan) { case s: CometDeltaNativeScanExec => s }
1175+
1176+
// The fact scan should be a native Delta scan (DPP doesn't prevent it).
1177+
assert(scans.nonEmpty, s"expected CometDeltaNativeScanExec in DPP plan, got:\n$plan")
1178+
1179+
// Correctness: result should only contain 'us' region rows.
1180+
val rows = df.collect()
1181+
assert(
1182+
rows.forall(_.getString(2) == "United States"),
1183+
s"expected all rows to have region_name='United States'")
1184+
// Exactly 34 rows: ids 0, 3, ..., 99 (i % 3 == 0) fall in the 'us' partition.
1185+
assert(rows.length > 30 && rows.length < 40, s"expected ~34 rows, got ${rows.length}")
1186+
1187+
// Compare with vanilla for row-for-row correctness.
1188+
withSQLConf(CometConf.COMET_DELTA_NATIVE_ENABLED.key -> "false") {
1189+
val vanilla = spark.sql(query).collect()
1190+
assert(
1191+
rows.map(_.toSeq).toSet == vanilla.map(_.toSeq).toSet,
1192+
s"DPP result differs from vanilla")
1193+
}
1194+
}
1195+
}
1196+
11401197
test("wider primitive type coverage") {
11411198
assume(deltaSparkAvailable, "delta-spark not on the test classpath; skipping")
11421199
withDeltaTable("primitives") { tablePath =>

0 commit comments

Comments
 (0)