Skip to content

Commit ec70c59

Browse files
schenksjclaude
andcommitted
fix: address all remaining code review items
Fixes all 14 previously-deferred review findings: #4 Case-sensitivity in DV column detection: isDeltaDvFilterPattern and findAndStripDeltaScanBelow now use equalsIgnoreCase for the __delta_internal_is_row_deleted column name match. #8 S3 key documentation: added comment in JNI documenting the Hadoop-style key names that storageOptions carries and how extract_storage_config maps them. #10 Proto comment inaccuracy: updated reserved field number comments to describe purpose rather than referencing (now-stale) phase numbers. Added field numbering strategy note on DeltaScanCommon. #11 Module quarantine docs: updated delta/mod.rs doc comment to note that create_object_store returns Arc<dyn ObjectStore> from object_store_kernel 0.12, and that it never escapes the module. Updated public API listing to match current exports. #12 Optimizer rule double-init: added synchronized double-checked locking on the CometDeltaDvConfigRule to prevent concurrent threads from racing on the config set. #14 Incomplete partition type support: castPartitionString now throws IllegalArgumentException for unsupported types (STRUCT, ARRAY, MAP, etc.) instead of silently converting to UTF8String. #6 DV materialization clarity: added comment explaining why .unwrap_or_default() is safe (get_row_indexes returns Ok(None) only if has_vector() lied, which kernel guarantees doesn't happen; Err propagates via ?). #17 Consistent JNI null handling: extracted read_string_array helper for reading Java String[] into Vec<String>, consolidating the null-check + iteration pattern. #18-19 Proto field ordering: added numbering strategy comment to DeltaScanCommon and DeltaScanTask messages. #20 Memory note: added comment about potential driver OOM on extremely large tables (millions of files) with suggestion for future streaming/chunked processing. Tests: succeeded 35, failed 0, canceled 0, ignored 0, pending 0 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a35bbbe commit ec70c59

7 files changed

Lines changed: 82 additions & 41 deletions

File tree

native/core/src/delta/jni.rs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_planDeltaScan(
8080
};
8181

8282
// Phase 2: read column names for BoundReference resolution.
83-
let col_names: Vec<String> = if column_names.is_null() {
84-
Vec::new()
85-
} else {
86-
let len = column_names.len(env)?;
87-
let mut names = Vec::with_capacity(len);
88-
for i in 0..len {
89-
let obj = column_names.get_element(env, i)?;
90-
let jstr = unsafe { JString::from_raw(env, obj.into_raw()) };
91-
names.push(jstr.try_to_string(env)?);
92-
}
93-
names
94-
};
83+
// storageOptions map carries Hadoop-style keys (fs.s3a.access.key,
84+
// fs.s3a.secret.key, fs.s3a.endpoint, fs.s3a.path.style.access,
85+
// fs.s3a.endpoint.region, fs.s3a.session.token) extracted by
86+
// NativeConfig.extractObjectStoreOptions on the Scala side.
87+
// extract_storage_config below maps these to kernel's DeltaStorageConfig.
88+
let col_names = read_string_array(env, &column_names)?;
9589

9690
// Phase 2: deserialize the Catalyst predicate (if provided) for
9791
// kernel's stats-based file pruning. Empty bytes = no predicate.
@@ -232,6 +226,24 @@ fn extract_storage_config(
232226
})
233227
}
234228

229+
/// Read a Java `String[]` into a `Vec<String>`. Returns empty vec for null arrays.
230+
fn read_string_array(
231+
env: &mut Env,
232+
arr: &jni::objects::JObjectArray,
233+
) -> CometResult<Vec<String>> {
234+
if arr.is_null() {
235+
return Ok(Vec::new());
236+
}
237+
let len = arr.len(env)?;
238+
let mut result = Vec::with_capacity(len);
239+
for i in 0..len {
240+
let obj = arr.get_element(env, i)?;
241+
let jstr = unsafe { JString::from_raw(env, obj.into_raw()) };
242+
result.push(jstr.try_to_string(env)?);
243+
}
244+
Ok(result)
245+
}
246+
235247
/// `map.get(key)` for a `java.util.Map<String, String>` surfaced as a
236248
/// `JMap`. Returns `None` if the key is absent or the value is `null`.
237249
fn map_get_string(

native/core/src/delta/mod.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,24 @@
2020
//! This module is intentionally quarantined: it imports `delta_kernel`,
2121
//! `object_store_kernel` (object_store 0.12), and kernel's own `arrow` (57),
2222
//! which live in a *separate* dep subtree from Comet's `arrow 58` /
23-
//! `object_store 0.13`. Nothing typed by those crates leaves this module
23+
//! `object_store 0.13`. Nothing typed by those crates leaves this module:
2424
//! only plain Rust types (`String`, `i64`, `HashMap<String,String>`, our own
2525
//! `DeltaFileEntry`, etc.) cross the boundary to the rest of Comet.
2626
//!
27-
//! The public API is deliberately small:
28-
//! - `DeltaStorageConfig`: credential struct (mirrors tantivy4java).
29-
//! - `list_delta_files`: log-replay driver that returns the active file
30-
//! list for a given snapshot version.
31-
//! - `DeltaFileEntry`: per-file metadata with partition values + DV flag +
32-
//! record-count stats.
27+
//! Note: `create_object_store()` returns `Arc<dyn ObjectStore>` from the
28+
//! `object_store_kernel` crate (0.12). This type only travels within this
29+
//! module as an opaque trait object fed to kernel's `DefaultEngine`; callers
30+
//! outside the delta module never see it or downcast it.
3331
//!
34-
//! Later phases add expression translation, deletion-vector decoding, and
35-
//! the `DeltaScanExec` physical operator; those live in sibling files.
32+
//! Public API:
33+
//! - `DeltaStorageConfig`: credential struct for S3/Azure/local access.
34+
//! - `plan_delta_scan` / `list_delta_files`: log-replay drivers that
35+
//! return the active file list, DV indexes, column mappings, and
36+
//! unsupported-feature flags for a given snapshot version.
37+
//! - `DeltaFileEntry` / `DeltaScanPlan`: per-file metadata with
38+
//! partition values, deleted row indexes, and record-count stats.
39+
//! - `catalyst_to_kernel_predicate_with_names`: Catalyst-proto Expr to
40+
//! kernel Predicate translator for stats-based file pruning.
3641
3742
pub mod engine;
3843
pub mod error;

native/core/src/delta/scan.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,16 @@ pub fn plan_delta_scan_with_predicate(
234234
// and the various storage-type variants transparently. This runs on the
235235
// driver (same process that's building the scan plan), so we only pay
236236
// the DV-fetch latency once per query.
237+
//
238+
// Note: for very large tables (millions of files), this collects all
239+
// entries into memory before returning. Consider streaming/chunked
240+
// processing if driver OOM becomes an issue at extreme scale.
237241
let mut entries: Vec<DeltaFileEntry> = Vec::with_capacity(raw.len());
238242
for r in raw {
243+
// get_row_indexes returns Ok(Some(Vec<u64>)) when a DV is present,
244+
// Ok(None) when has_vector() lied (shouldn't happen), or Err on I/O
245+
// failure. The `?` propagates I/O errors; `unwrap_or_default` handles
246+
// the None case defensively (empty = no deletions, safe fallback).
239247
let deleted_row_indexes = if r.dv_info.has_vector() {
240248
r.dv_info
241249
.get_row_indexes(&engine, &table_root_url)?

native/proto/src/proto/operator.proto

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,10 @@ message IcebergDeleteFile {
255255
// Common data shared by all partitions in split mode for a Delta native scan
256256
// (sent once at planning time). Mirrors NativeScanCommon/IcebergScanCommon so
257257
// the same downstream ParquetSource wiring can be reused.
258+
//
259+
// Field numbering note: fields are grouped by concern, not sequential.
260+
// New fields should use the next available number at the END of the
261+
// message to maintain wire compatibility with existing serialized data.
258262
message DeltaScanCommon {
259263
// Required schema (projection applied to data schema)
260264
repeated SparkStructField required_schema = 1;
@@ -308,9 +312,9 @@ message DeltaScanTask {
308312
// Empty means the file has no DV in use. Row indexes are 0-based into the file's
309313
// physical parquet row space (same semantics as `DvInfo::get_row_indexes`).
310314
repeated uint64 deleted_row_indexes = 5;
311-
// Reserved for future phases:
312-
// 6: residual_predicate_idx (Phase 5)
313-
// 7: physical_schema_idx (Phase 4 — column mapping)
315+
// Reserved for future use:
316+
// 6: per-task residual predicate index (predicates kernel couldn't fully evaluate)
317+
// 7: per-task physical schema index (for per-file schema evolution)
314318
}
315319

316320
// Partition column value extracted from a Delta add action. Delta represents

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,25 @@ class CometSparkSessionExtensions
6464
extensions.injectOptimizerRule { session =>
6565
new org.apache.spark.sql.catalyst.rules.Rule[
6666
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan] {
67+
private val configLock = new Object()
6768
@volatile private var configured = false
6869
override def apply(plan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
6970
: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = {
7071
if (!configured) {
71-
try {
72-
if (CometConf.COMET_DELTA_NATIVE_ENABLED.get(session.sessionState.conf)) {
73-
session.conf
74-
.set("spark.databricks.delta.deletionVectors.useMetadataRowIndex", "false")
72+
configLock.synchronized {
73+
if (!configured) {
74+
try {
75+
if (CometConf.COMET_DELTA_NATIVE_ENABLED.get(session.sessionState.conf)) {
76+
session.conf.set(
77+
"spark.databricks.delta.deletionVectors.useMetadataRowIndex",
78+
"false")
79+
}
80+
} catch {
81+
case _: Throwable => // delta-spark not on classpath; ignore
82+
}
83+
configured = true
7584
}
76-
} catch {
77-
case _: Throwable => // delta-spark not on classpath; ignore
7885
}
79-
configured = true
8086
}
8187
plan
8288
}

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,16 @@ case class CometScanRule(session: SparkSession)
183183
}
184184

185185
/** Matches `__delta_internal_is_row_deleted = 0` (the filter Delta injects). */
186-
private def isDeltaDvFilterPattern(cond: Expression): Boolean = cond match {
187-
case EqualTo(attr: AttributeReference, lit: Literal)
188-
if attr.name == DeltaReflection.IsRowDeletedColumnName =>
189-
lit.value != null && lit.value.toString == "0"
190-
case EqualTo(lit: Literal, attr: AttributeReference)
191-
if attr.name == DeltaReflection.IsRowDeletedColumnName =>
192-
lit.value != null && lit.value.toString == "0"
193-
case _ => false
186+
private def isDeltaDvFilterPattern(cond: Expression): Boolean = {
187+
def isRowDeletedRef(name: String): Boolean =
188+
name.equalsIgnoreCase(DeltaReflection.IsRowDeletedColumnName)
189+
cond match {
190+
case EqualTo(attr: AttributeReference, lit: Literal) if isRowDeletedRef(attr.name) =>
191+
lit.value != null && lit.value.toString == "0"
192+
case EqualTo(lit: Literal, attr: AttributeReference) if isRowDeletedRef(attr.name) =>
193+
lit.value != null && lit.value.toString == "0"
194+
case _ => false
195+
}
194196
}
195197

196198
/**
@@ -204,7 +206,7 @@ case class CometScanRule(session: SparkSession)
204206
userOutput: Seq[Attribute]): Option[SparkPlan] = plan match {
205207
case scan: FileSourceScanExec
206208
if DeltaReflection.isDeltaFileFormat(scan.relation.fileFormat) &&
207-
scan.output.exists(_.name == DeltaReflection.IsRowDeletedColumnName) =>
209+
scan.output.exists(_.name.equalsIgnoreCase(DeltaReflection.IsRowDeletedColumnName)) =>
208210
Some(rebuildDeltaScanWithoutDvColumn(scan, userOutput))
209211
case other if other.children.size == 1 =>
210212
// Single-child wrappers (Project, ColumnarToRow, etc.) Delta may insert between

spark/src/main/scala/org/apache/comet/serde/operator/CometDeltaNativeScan.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,11 @@ object CometDeltaNativeScan extends CometOperatorSerde[CometScanExec] with Loggi
385385
val dec = org.apache.spark.sql.types.Decimal(new java.math.BigDecimal(s))
386386
dec.changePrecision(d.precision, d.scale)
387387
dec
388-
case _ => UTF8String.fromString(s)
388+
case other =>
389+
throw new IllegalArgumentException(
390+
s"Unsupported Delta partition column type $other for value '$s'. " +
391+
"Supported types: string, int, long, short, byte, float, double, " +
392+
"boolean, date, timestamp, decimal.")
389393
}
390394
}
391395

0 commit comments

Comments
 (0)