Commit 688b633

schenksj and claude committed
feat: auto-detect and configure Delta DV read strategy
Users no longer need to manually set `spark.databricks.delta.deletionVectors.useMetadataRowIndex=false`. Comet now registers a Catalyst optimizer rule (`CometDeltaDvConfigRule`) via `CometSparkSessionExtensions.injectOptimizerRule` that automatically sets the config when `COMET_DELTA_NATIVE_ENABLED=true`.

The rule runs in the optimizer phase, which executes BEFORE Delta's planner strategies, so the config takes effect for the SAME query where it's first needed.

Delta's default `useMetadataRowIndex=true` strategy applies DVs opaquely inside `DeletionVectorBoundFileFormat` at read time. No plan-level Filter is inserted, so Comet's physical-plan rewrite has nothing to detect or handle. Setting the config to `false` forces Delta's older strategy (Project + Filter + `__delta_internal_is_row_deleted`), which our `stripDeltaDvWrappers` can recognize and route through `DeltaDvFilterExec`.

Removed the manual config from `CometDeltaNativeSuite`'s `sparkConf`.

Tests: succeeded 34, failed 0, canceled 0, ignored 0, pending 0

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 393507d commit 688b633

2 files changed

Lines changed: 37 additions & 9 deletions

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 34 additions & 0 deletions
@@ -49,6 +49,40 @@ class CometSparkSessionExtensions
     extensions.injectColumnar { session => CometExecColumnar(session) }
     extensions.injectQueryStagePrepRule { session => CometScanRule(session) }
     extensions.injectQueryStagePrepRule { session => CometExecRule(session) }
+
+    // Phase 3 auto-config: tell Delta to use its older DV read strategy
+    // (Project + Filter with __delta_internal_is_row_deleted) instead of
+    // the default metadata-row-index strategy. The older strategy inserts
+    // a plan-level Filter that CometScanRule.stripDeltaDvWrappers can
+    // detect and remove, routing DVs through our DeltaDvFilterExec.
+    //
+    // The metadata-row-index strategy applies DVs opaquely inside
+    // DeletionVectorBoundFileFormat at read time, which Comet's native
+    // reader can't intercept. By setting this config in an optimizer rule,
+    // it takes effect BEFORE Delta's PreprocessTableWithDVsStrategy runs
+    // (optimizer -> strategies ordering in Catalyst).
+    extensions.injectOptimizerRule { session =>
+      new org.apache.spark.sql.catalyst.rules.Rule[
+        org.apache.spark.sql.catalyst.plans.logical.LogicalPlan] {
+        @volatile private var configured = false
+        override def apply(plan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
+            : org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = {
+          if (!configured) {
+            try {
+              if (CometConf.COMET_DELTA_NATIVE_ENABLED.get(session.sessionState.conf)) {
+                session.conf
+                  .set("spark.databricks.delta.deletionVectors.useMetadataRowIndex", "false")
+              }
+            } catch {
+              case _: Throwable => // delta-spark not on classpath; ignore
+            }
+            configured = true
+          }
+          plan
+        }
+        override val ruleName: String = "CometDeltaDvConfigRule"
+      }
+    }
   }
 
 case class CometScanColumnar(session: SparkSession) extends ColumnarRule {
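
The ordering argument in the comment above (optimizer first, planner strategies second) and the run-once guard can be illustrated with a toy model. This is only a sketch under stated assumptions: `Conf`, `OneShotConfigRule`, `planScan`, and `runQuery` are hypothetical stand-ins written for this example and are not Spark, Delta, or Comet classes. Only the config key and the shape of the guard are taken from the diff.

```scala
// Toy model only: every class and method here is a hypothetical stand-in,
// not part of Spark, Delta, or Comet.
import scala.collection.mutable

object DvConfigOrderingSketch {
  // Config key as it appears in the diff.
  val DvKey = "spark.databricks.delta.deletionVectors.useMetadataRowIndex"

  /** Minimal mutable key/value config, standing in for a session conf. */
  final class Conf {
    private val values = mutable.Map.empty[String, String]
    def set(key: String, value: String): Unit = values(key) = value
    def get(key: String, default: String): String = values.getOrElse(key, default)
  }

  /** Optimizer-phase stand-in: returns the plan unchanged, writes the key at most once. */
  final class OneShotConfigRule(conf: Conf, nativeEnabled: Boolean) {
    @volatile private var configured = false
    var writes = 0 // number of config writes, kept only for illustration

    def apply(plan: String): String = {
      if (!configured) {
        if (nativeEnabled) {
          conf.set(DvKey, "false")
          writes += 1
        }
        // As in the diff, the guard flips even when the flag is off.
        configured = true
      }
      plan
    }
  }

  /** Planner-phase stand-in: picks the DV read shape from the config. */
  def planScan(conf: Conf, plan: String): String =
    if (conf.get(DvKey, "true") == "true") s"OpaqueDvScan($plan)"
    else s"Project(Filter(is_row_deleted = 0, Scan($plan)))"

  /** Runs the two phases in order: optimizer rule first, then planning. */
  def runQuery(conf: Conf, rule: OneShotConfigRule, plan: String): String =
    planScan(conf, rule.apply(plan))
}
```

In this model the first query already gets the Project + Filter shape, because the rule writes the key before `planScan` reads it. The model also makes one property of the guard visible: since `configured` is set on the first call regardless of the flag, enabling the flag later in the same session would not trigger the write.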

spark/src/test/scala/org/apache/comet/CometDeltaNativeSuite.scala

Lines changed: 3 additions & 9 deletions
@@ -62,15 +62,9 @@ class CometDeltaNativeSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     // Explicitly clear the prefixes so production-shaped filenames are used.
     conf.set("spark.databricks.delta.testOnly.dataFileNamePrefix", "")
     conf.set("spark.databricks.delta.testOnly.dvFileNamePrefix", "")
-    // Delta 3.x's default `useMetadataRowIndex=true` strategy rewrites DV-in-use
-    // scans to read the parquet file with `_metadata.row_index` + other metadata
-    // columns, and applies the DV *inside* `DeletionVectorBoundFileFormat` at read
-    // time - no Filter is inserted in the plan. That makes the DV completely
-    // opaque to any physical-plan rewrite: there's nothing to detect.
-    // Setting this to false falls Delta back to its older strategy that DOES
-    // insert `Project -> Filter(__delta_internal_is_row_deleted = 0) -> scan`,
-    // which our `stripDeltaDvWrappers` rewrite can recognize and unwind.
-    conf.set("spark.databricks.delta.deletionVectors.useMetadataRowIndex", "false")
+    // useMetadataRowIndex is now set automatically by CometDeltaDvConfigRule
+    // (injected via CometSparkSessionExtensions.injectOptimizerRule) when
+    // COMET_DELTA_NATIVE_ENABLED=true. No manual config needed.
     conf
   }
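
Since the suite no longer sets the key itself, a test may want to confirm what the session actually resolves for it. The helper below is a minimal sketch, not part of this commit: `DvConfigCheck` and its method names are hypothetical, and it relies only on `SparkSession.conf.getOption`, which exists on Spark's `RuntimeConfig`.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper for illustration; not part of the commit.
object DvConfigCheck {
  // Config key as it appears in the diff.
  val DvKey = "spark.databricks.delta.deletionVectors.useMetadataRowIndex"

  /** Value the session resolves for the DV key, if any.
    * If Delta has registered the key, this may be its default rather than an explicit setting. */
  def sessionDvSetting(spark: SparkSession): Option[String] =
    spark.conf.getOption(DvKey)

  /** True when the session resolves the key to "false" (the Project + Filter strategy). */
  def usesFilterStrategy(spark: SparkSession): Boolean =
    sessionDvSetting(spark).contains("false")
}
```

Because the rule in this commit fires during optimization, such a check would presumably only be meaningful after at least one query has been optimized in that session.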
