Skip to content

Commit c05bd16

Browse files
authored
fix: reject string/binary read as numeric in native_datafusion scan (#4091)
1 parent b5b86b1 commit c05bd16

2 files changed

Lines changed: 58 additions & 0 deletions

File tree

native/core/src/parquet/schema_adapter.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,40 @@ impl SparkPhysicalExprAdapter {
385385
let physical_type = cast.input_field().data_type();
386386
let target_type = cast.target_field().data_type();
387387

388+
// Reject reading a string/binary Parquet column as anything other
389+
// than string, binary, or a binary-encoded decimal. This mirrors
390+
// Spark's TypeUtil.checkParquetType for the BINARY case (lines
391+
// 208-221): a BINARY (or UTF8-annotated BINARY) physical column is
392+
// only readable as StringType, BinaryType, or a binary-encoded
393+
// decimal; every other target type (numeric, boolean, date,
394+
// timestamp, ...) raises SchemaColumnConvertNotSupportedException.
395+
//
396+
// Without this guard, Spark's Cast below (in is_adapting_schema
397+
// mode) falls through to DataFusion's cast, which silently parses
398+
// the bytes (returning nulls for non-numeric strings, parsing
399+
// date/timestamp/boolean strings, or in some paths reinterpreting
400+
// raw bytes). See issue #4088.
//
// NOTE(review): the view-encoded variants (DataType::Utf8View and
// DataType::BinaryView) are not matched as physical types by this
// guard. Presumably the scan never surfaces them here - TODO confirm.
// NOTE(review): every Decimal128/Decimal256 target is let through,
// including when the physical type is Utf8/LargeUtf8. Confirm that
// the cast path below handles (or rejects) string -> decimal as
// intended.
401+
if matches!(
402+
physical_type,
403+
DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary
404+
) && !matches!(
405+
target_type,
406+
DataType::Utf8
407+
| DataType::LargeUtf8
408+
| DataType::Binary
409+
| DataType::LargeBinary
410+
| DataType::Decimal128(_, _)
411+
| DataType::Decimal256(_, _)
412+
) {
413+
// Fail with a plan error that names the column, the requested
// (target) type, and the type found in the input field.
return Err(DataFusionError::Plan(format!(
414+
"Parquet column cannot be converted. Column: [{}], \
415+
Expected: {}, Found: {}",
416+
cast.input_field().name(),
417+
target_type,
418+
physical_type,
419+
)));
420+
}
421+
388422
// For complex nested types (Struct, List, Map), Timestamp timezone
389423
// mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr
390424
// with spark_parquet_convert which handles field-name-based selection,

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -998,6 +998,30 @@ abstract class ParquetReadSuite extends CometTestBase {
998998
}
999999
}
10001000

1001+
test("native_datafusion rejects string read as non-string/binary type") {
1002+
// Regression guard for https://github.com/apache/datafusion-comet/issues/4088.
1003+
// Spark's vectorized reader rejects reading a Parquet BINARY column as
1004+
// anything except StringType, BinaryType, or a binary-encoded decimal (see
1005+
// TypeUtil.checkParquetType, BINARY case). The native_datafusion scan
1006+
// must do the same in its schema adapter rather than letting DataFusion's
1007+
// cast silently parse the bytes or reinterpret them.
1008+
withSQLConf(
1009+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
1010+
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
1011+
withTempPath { dir =>
1012+
val path = dir.getCanonicalPath
1013+
// Only a string source column is written; a binary source column is
// not exercised by this test.
Seq("a", "b", "c").toDF("c").write.parquet(path)
1014+
// Cover representative non-string/binary target types: numeric,
1015+
// boolean, date, and timestamp. Each would silently produce wrong
1016+
// results without the schema-adapter guard.
1017+
Seq("int", "bigint", "double", "boolean", "date", "timestamp").foreach { sqlType =>
1018+
val df = spark.read.schema(s"c $sqlType").parquet(path)
1019+
// NOTE(review): only the exception type is asserted, not its message,
// so any SparkException raised by collect() satisfies the check.
// Presumably the native error is wrapped in a SparkException - confirm,
// and consider also asserting on the message text.
assertThrows[SparkException](df.collect())
1020+
}
1021+
}
1022+
}
1023+
}
1024+
10011025
test("type widening: byte → short/int/long, short → int/long, int → long") {
10021026
withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
10031027
withTempPath { dir =>

0 commit comments

Comments
 (0)