Skip to content

Commit 5d72444

Browse files
committed
feat: enforce schema evolution config in native_datafusion scan
Pass schema_evolution_enabled config through protobuf to the native Parquet reader. When disabled, the SparkPhysicalExprAdapter rejects type mismatches between file and table schemas at runtime, matching Spark's behavior of throwing errors on incompatible types.
1 parent f123145 commit 5d72444

7 files changed

Lines changed: 114 additions & 1 deletion

File tree

native/core/src/execution/planner.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,6 +1213,7 @@ impl PhysicalPlanner {
12131213
common.case_sensitive,
12141214
self.session_ctx(),
12151215
common.encryption_enabled,
1216+
common.schema_evolution_enabled,
12161217
)?;
12171218
Ok((
12181219
vec![],

native/core/src/parquet/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
775775
case_sensitive != JNI_FALSE,
776776
session_ctx,
777777
encryption_enabled,
778+
true, // schema_evolution_enabled (always true for iceberg_compat)
778779
)?;
779780

780781
let partition_index: usize = 0;

native/core/src/parquet/parquet_exec.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,14 @@ pub(crate) fn init_datasource_exec(
7272
case_sensitive: bool,
7373
session_ctx: &Arc<SessionContext>,
7474
encryption_enabled: bool,
75+
schema_evolution_enabled: bool,
7576
) -> Result<Arc<DataSourceExec>, ExecutionError> {
7677
let (table_parquet_options, spark_parquet_options) = get_options(
7778
session_timezone,
7879
case_sensitive,
7980
&object_store_url,
8081
encryption_enabled,
82+
schema_evolution_enabled,
8183
);
8284

8385
// Determine the schema and projection to use for ParquetSource.
@@ -181,6 +183,7 @@ fn get_options(
181183
case_sensitive: bool,
182184
object_store_url: &ObjectStoreUrl,
183185
encryption_enabled: bool,
186+
schema_evolution_enabled: bool,
184187
) -> (TableParquetOptions, SparkParquetOptions) {
185188
let mut table_parquet_options = TableParquetOptions::new();
186189
table_parquet_options.global.pushdown_filters = true;
@@ -190,6 +193,7 @@ fn get_options(
190193
SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
191194
spark_parquet_options.allow_cast_unsigned_ints = true;
192195
spark_parquet_options.case_sensitive = case_sensitive;
196+
spark_parquet_options.schema_evolution_enabled = schema_evolution_enabled;
193197

194198
if encryption_enabled {
195199
table_parquet_options.crypto.configure_factory(

native/core/src/parquet/parquet_support.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ pub struct SparkParquetOptions {
7676
pub use_legacy_date_timestamp_or_ntz: bool,
7777
// Whether schema field names are case sensitive
7878
pub case_sensitive: bool,
79+
/// Whether schema evolution (type promotion) is enabled. When false, the adapter
80+
/// should reject type mismatches between file schema and table schema.
81+
pub schema_evolution_enabled: bool,
7982
}
8083

8184
impl SparkParquetOptions {
@@ -88,6 +91,7 @@ impl SparkParquetOptions {
8891
use_decimal_128: false,
8992
use_legacy_date_timestamp_or_ntz: false,
9093
case_sensitive: false,
94+
schema_evolution_enabled: true,
9195
}
9296
}
9397

@@ -100,6 +104,7 @@ impl SparkParquetOptions {
100104
use_decimal_128: false,
101105
use_legacy_date_timestamp_or_ntz: false,
102106
case_sensitive: false,
107+
schema_evolution_enabled: true,
103108
}
104109
}
105110
}

native/core/src/parquet/schema_adapter.rs

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::parquet::cast_column::CometCastColumnExpr;
1919
use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
2020
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2121
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
22-
use datafusion::common::Result as DataFusionResult;
22+
use datafusion::common::{DataFusionError, Result as DataFusionResult};
2323
use datafusion::physical_expr::expressions::Column;
2424
use datafusion::physical_expr::PhysicalExpr;
2525
use datafusion::physical_plan::ColumnarValue;
@@ -95,12 +95,61 @@ fn remap_physical_schema_names(
9595
Arc::new(Schema::new(remapped_fields))
9696
}
9797

98+
/// Check if the logical (table) schema and physical (file) schema have type
99+
/// mismatches. Returns an error message describing the first mismatch found,
100+
/// or None if all types match.
101+
fn detect_schema_mismatch(
102+
logical_schema: &SchemaRef,
103+
physical_schema: &SchemaRef,
104+
case_sensitive: bool,
105+
) -> Option<String> {
106+
for logical_field in logical_schema.fields() {
107+
let physical_field = if case_sensitive {
108+
physical_schema
109+
.fields()
110+
.iter()
111+
.find(|f| f.name() == logical_field.name())
112+
} else {
113+
physical_schema
114+
.fields()
115+
.iter()
116+
.find(|f| f.name().to_lowercase() == logical_field.name().to_lowercase())
117+
};
118+
if let Some(physical_field) = physical_field {
119+
if logical_field.data_type() != physical_field.data_type() {
120+
return Some(format!(
121+
"Parquet column cannot be converted. \
122+
Column: [{}], Expected: {}, Found: {} \
123+
(schema evolution is disabled)",
124+
logical_field.name(),
125+
logical_field.data_type(),
126+
physical_field.data_type()
127+
));
128+
}
129+
}
130+
}
131+
None
132+
}
133+
98134
impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
99135
fn create(
100136
&self,
101137
logical_file_schema: SchemaRef,
102138
physical_file_schema: SchemaRef,
103139
) -> Arc<dyn PhysicalExprAdapter> {
140+
// When schema evolution is disabled, check for type mismatches between the
141+
// logical (table) schema and the physical (file) schema. If any column has
142+
// a different type, store the error to be raised during rewrite().
143+
let schema_mismatch_error = if !self.parquet_options.schema_evolution_enabled {
144+
detect_schema_mismatch(
145+
&logical_file_schema,
146+
&physical_file_schema,
147+
self.parquet_options.case_sensitive,
148+
)
149+
} else {
150+
None
151+
};
152+
104153
// When case-insensitive, remap physical schema field names to match logical
105154
// field names. The DefaultPhysicalExprAdapter uses exact name matching, so
106155
// without this remapping, columns like "a" won't match logical "A" and will
@@ -154,6 +203,7 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
154203
default_values: self.default_values.clone(),
155204
default_adapter,
156205
logical_to_physical_names,
206+
schema_mismatch_error,
157207
})
158208
}
159209
}
@@ -183,10 +233,18 @@ struct SparkPhysicalExprAdapter {
183233
/// physical names so that downstream reassign_expr_columns can find
184234
/// columns in the actual stream schema.
185235
logical_to_physical_names: Option<HashMap<String, String>>,
236+
/// When schema evolution is disabled and file/table types differ, this
237+
/// holds the error message to return from rewrite().
238+
schema_mismatch_error: Option<String>,
186239
}
187240

188241
impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
189242
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
243+
// When schema evolution is disabled and types differ, reject the read
244+
if let Some(err) = &self.schema_mismatch_error {
245+
return Err(DataFusionError::Plan(err.clone()));
246+
}
247+
190248
// First let the default adapter handle column remapping, missing columns,
191249
// and simple scalar type casts. Then replace DataFusion's CastColumnExpr
192250
// with Spark-compatible equivalents.
@@ -496,11 +554,51 @@ mod test {
496554
Ok(())
497555
}
498556

557+
#[tokio::test]
558+
async fn parquet_schema_mismatch_rejected_when_evolution_disabled() {
559+
let file_schema = Arc::new(Schema::new(vec![
560+
Field::new("id", DataType::Int32, false),
561+
Field::new("name", DataType::Utf8, false),
562+
]));
563+
564+
let ids = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn arrow::array::Array>;
565+
let names = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"]))
566+
as Arc<dyn arrow::array::Array>;
567+
let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids, names]).unwrap();
568+
569+
// Read as Int64 (widening) with schema evolution disabled
570+
let required_schema = Arc::new(Schema::new(vec![
571+
Field::new("id", DataType::Int64, false),
572+
Field::new("name", DataType::Utf8, false),
573+
]));
574+
575+
let result =
576+
roundtrip_with_schema_evolution(&batch, required_schema.clone(), false).await;
577+
assert!(result.is_err(), "Expected error when schema evolution is disabled");
578+
let err_msg = result.unwrap_err().to_string();
579+
assert!(
580+
err_msg.contains("schema evolution is disabled"),
581+
"Error should mention schema evolution: {err_msg}"
582+
);
583+
584+
// Same read with schema evolution enabled should succeed
585+
let result = roundtrip_with_schema_evolution(&batch, required_schema, true).await;
586+
assert!(result.is_ok(), "Expected success when schema evolution is enabled");
587+
}
588+
499589
/// Create a Parquet file containing a single batch and then read the batch back using
500590
/// the specified required_schema. This will cause the PhysicalExprAdapter code to be used.
501591
async fn roundtrip(
502592
batch: &RecordBatch,
503593
required_schema: SchemaRef,
594+
) -> Result<RecordBatch, DataFusionError> {
595+
roundtrip_with_schema_evolution(batch, required_schema, true).await
596+
}
597+
598+
async fn roundtrip_with_schema_evolution(
599+
batch: &RecordBatch,
600+
required_schema: SchemaRef,
601+
schema_evolution_enabled: bool,
504602
) -> Result<RecordBatch, DataFusionError> {
505603
let filename = get_temp_filename();
506604
let filename = filename.as_path().as_os_str().to_str().unwrap().to_string();
@@ -513,6 +611,7 @@ mod test {
513611

514612
let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
515613
spark_parquet_options.allow_cast_unsigned_ints = true;
614+
spark_parquet_options.schema_evolution_enabled = schema_evolution_enabled;
516615

517616
// Create expression adapter factory for Spark-compatible schema adaptation
518617
let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(

native/proto/src/proto/operator.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ message NativeScanCommon {
100100
bool encryption_enabled = 11;
101101
string source = 12;
102102
repeated spark.spark_expression.DataType fields = 13;
103+
bool schema_evolution_enabled = 14;
103104
}
104105

105106
message NativeScan {

spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
174174
commonBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
175175
commonBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
176176
commonBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
177+
commonBuilder.setSchemaEvolutionEnabled(
178+
CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.get(scan.conf))
177179

178180
// Collect S3/cloud storage configurations
179181
val hadoopConf = scan.relation.sparkSession.sessionState

0 commit comments

Comments
 (0)