Skip to content

Commit 4b086bb

Browse files
authored
fix: reject incompatible decimal precision/scale in native_datafusion scan (#4090)
1 parent c05bd16 commit 4b086bb

2 files changed

Lines changed: 59 additions & 0 deletions

File tree

native/core/src/parquet/schema_adapter.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,42 @@ impl SparkPhysicalExprAdapter {
419419
)));
420420
}
421421

422+
// Decimal-to-decimal scale-narrowing check.
423+
// Reject reads where the read schema has a smaller scale than the
424+
// file's, because Spark's Cast below would silently truncate
425+
// fractional digits, producing wrong values. This matches the
426+
// unconditionally-lossy case in issue #4089 (e.g. Decimal(10,2) read
427+
// as Decimal(5,0)).
428+
//
429+
// Other decimal mismatches are intentionally NOT rejected here,
430+
// even though Spark's vectorized reader would reject them via
431+
// `ParquetVectorUpdaterFactory#isDecimalTypeMatched` (which requires
432+
// exact precision and scale):
433+
//
434+
// - Precision-only changes with the same scale (e.g. Decimal(5,2)
435+
// read as Decimal(3,2)): Spark 4.0's parquet-mr fallback path
436+
// (PARQUET_VECTORIZED_READER_ENABLED=false) and the vectorized
437+
// type-widening path produce null on per-value overflow, which
438+
// DataFusion's cast already does in the adapting-schema path.
439+
//
440+
// - Scale widening (e.g. Decimal(10,2) read as Decimal(10,4)): the
441+
// cast is lossless (no truncation, no overflow), so allowing it
442+
// here is strictly more permissive than Spark's vectorized reader
443+
// without risking wrong values.
444+
if let (DataType::Decimal128(_src_p, src_s), DataType::Decimal128(_dst_p, dst_s)) =
445+
(physical_type, target_type)
446+
{
447+
if dst_s < src_s {
448+
return Err(DataFusionError::Plan(format!(
449+
"Parquet column cannot be converted. Column: [{}], \
450+
Expected: {}, Found: {}",
451+
cast.input_field().name(),
452+
target_type,
453+
physical_type,
454+
)));
455+
}
456+
}
457+
422458
// For complex nested types (Struct, List, Map), Timestamp timezone
423459
// mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr
424460
// with spark_parquet_convert which handles field-name-based selection,

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,6 +1022,29 @@ abstract class ParquetReadSuite extends CometTestBase {
10221022
}
10231023
}
10241024

1025+
test("native_datafusion rejects incompatible decimal precision/scale") {
1026+
// Regression guard for https://github.com/apache/datafusion-comet/issues/4089.
1027+
// Reading Decimal(10,2) under a Decimal(5,0) read schema is unconditionally
1028+
// lossy: the target scale is smaller than the source scale (precision shrinks too).
1029+
// Spark's vectorized reader throws SchemaColumnConvertNotSupportedException
1030+
// here on all versions. The native_datafusion scan must reject this in its
1031+
// schema adapter rather than letting Spark Cast silently rescale/truncate.
1032+
withSQLConf(
1033+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
1034+
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
1035+
withTempPath { dir =>
1036+
val path = dir.getCanonicalPath
1037+
spark
1038+
.sql("select cast('123.45' as decimal(10,2)) as d " +
1039+
"union all select cast('67.89' as decimal(10,2))")
1040+
.write
1041+
.parquet(path)
1042+
val df = spark.read.schema("d decimal(5,0)").parquet(path)
1043+
assertThrows[SparkException](df.collect())
1044+
}
1045+
}
1046+
}
1047+
10251048
test("type widening: byte → short/int/long, short → int/long, int → long") {
10261049
withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
10271050
withTempPath { dir =>

0 commit comments

Comments
 (0)