Skip to content

Commit 10dd91a

Browse files
schenksjclaude
andcommitted
feat: add native Delta Lake scan via delta-kernel-rs
Add native Delta Lake read support to Comet using delta-kernel-rs for log replay, matching the existing Iceberg native scan path. Core implementation: - delta-kernel-rs 0.19 for log replay (arrow-57 isolated from Comet's arrow-58) - JNI entry point: Native.planDeltaScan() calls kernel on the driver - DeltaScanCommon/DeltaScan/DeltaScanTask protobuf messages - CometScanRule: detect DeltaParquetFileFormat, stripDeltaDvWrappers - CometDeltaNativeScan: serde with partition pruning, predicate pushdown - CometDeltaNativeScanExec: split-mode serialization, DPP, metrics - DeltaPlanDataInjector: LRU-cached split-mode injection - Rust planner: DeltaScan match arm with ColumnMappingFilterRewriter - DeltaDvFilterExec: per-batch deletion vector row masking - DeltaReflection: class-name detection (no spark-delta compile dep) - CometDeltaDvConfigRule: auto-configure useMetadataRowIndex=false Supports: partitioned/unpartitioned tables, schema evolution, time travel, column mapping (none/id/name), deletion vectors, stats-based file pruning, data filter pushdown, DPP, complex types, cloud storage (S3/Azure/GCS), protocol feature gating with graceful fallback. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 88c1ffc commit 10dd91a

27 files changed

Lines changed: 3818 additions & 164 deletions

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ object CometConf extends ShimCometConf {
116116

117117
val SCAN_NATIVE_DATAFUSION = "native_datafusion"
118118
val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
119+
val SCAN_NATIVE_DELTA_COMPAT = "native_delta_compat"
119120
val SCAN_AUTO = "auto"
120121

121122
val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = conf("spark.comet.scan.impl")
@@ -153,6 +154,38 @@ object CometConf extends ShimCometConf {
153154
.checkValue(v => v > 0, "Data file concurrency limit must be positive")
154155
.createWithDefault(1)
155156

157+
val COMET_DELTA_NATIVE_ENABLED: ConfigEntry[Boolean] =
158+
conf("spark.comet.scan.deltaNative.enabled")
159+
.category(CATEGORY_SCAN)
160+
.doc(
161+
"Whether to enable native Delta Lake table scan using delta-kernel-rs. When " +
162+
"enabled, Delta tables are read through Comet's native Parquet reader, with " +
163+
"the transaction log replayed by delta-kernel-rs on the driver. Bypasses " +
164+
"Spark's DataSource V1/V2 paths for better performance.")
165+
.booleanConf
166+
.createWithDefault(false)
167+
168+
val COMET_DELTA_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] =
169+
conf("spark.comet.scan.deltaNative.dataFileConcurrencyLimit")
170+
.category(CATEGORY_SCAN)
171+
.doc(
172+
"The number of Delta data files to read concurrently within a single task. " +
173+
"Higher values improve throughput for tables with many small files by overlapping " +
174+
"I/O latency, but increase memory usage. Values between 2 and 8 are suggested.")
175+
.intConf
176+
.checkValue(v => v > 0, "Data file concurrency limit must be positive")
177+
.createWithDefault(1)
178+
179+
val COMET_DELTA_FALLBACK_ON_UNSUPPORTED_FEATURE: ConfigEntry[Boolean] =
180+
conf("spark.comet.scan.deltaNative.fallbackOnUnsupportedFeature")
181+
.category(CATEGORY_SCAN)
182+
.doc(
183+
"When true, Delta tables declaring a reader feature that Comet does not yet " +
184+
"support (e.g. rowTracking, variantType) are tagged for fallback to Spark's " +
185+
"regular path rather than failing the query. Turn off only for testing.")
186+
.booleanConf
187+
.createWithDefault(true)
188+
156189
val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] =
157190
conf("spark.comet.scan.csv.v2.enabled")
158191
.category(CATEGORY_TESTING)

0 commit comments

Comments
 (0)