Skip to content

Commit 4b44591

Browse files
authored
Merge branch 'main' into opt/pushdown-filter
2 parents 8c2db0d + 70caf37 commit 4b44591

28 files changed

Lines changed: 1643 additions & 231 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -908,6 +908,16 @@ config_namespace! {
908908
/// nanosecond resolution.
909909
pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
910910

911+
/// (reading) Optional timezone applied to INT96 columns when `coerce_int96`
912+
/// is set. When `Some`, INT96 columns coerce to
913+
/// `Timestamp(<coerce_int96>, Some(<tz>))` instead of the default
914+
/// `Timestamp(<coerce_int96>, None)`. Spark and other systems write INT96
915+
/// values as UTC-adjusted instants, so callers that need the resulting
916+
/// Arrow type to be timezone-aware (e.g. for Spark `TimestampType`
917+
/// semantics) should set this to `"UTC"`. No effect when `coerce_int96`
918+
/// is `None`.
919+
pub coerce_int96_tz: Option<String>, default = None
920+
911921
/// (reading) Use any available bloom filters when reading parquet files
912922
pub bloom_filter_on_read: bool, default = true
913923

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ impl ParquetOptions {
208208
schema_force_view_types: _,
209209
binary_as_string: _, // not used for writer props
210210
coerce_int96: _, // not used for writer props
211+
coerce_int96_tz: _, // not used for writer props
211212
skip_arrow_metadata: _,
212213
max_predicate_cache_size: _,
213214
} = self;
@@ -482,6 +483,7 @@ mod tests {
482483
binary_as_string: defaults.binary_as_string,
483484
skip_arrow_metadata: defaults.skip_arrow_metadata,
484485
coerce_int96: None,
486+
coerce_int96_tz: None,
485487
max_predicate_cache_size: defaults.max_predicate_cache_size,
486488
use_content_defined_chunking: defaults.use_content_defined_chunking.clone(),
487489
}
@@ -600,6 +602,7 @@ mod tests {
600602
binary_as_string: global_options_defaults.binary_as_string,
601603
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
602604
coerce_int96: None,
605+
coerce_int96_tz: None,
603606
use_content_defined_chunking: props.content_defined_chunking().map(|c| {
604607
CdcOptions {
605608
min_chunk_size: c.min_chunk_size,

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::sync::Arc;
2525
// Re-export so the historical `file_format::*` paths still resolve.
2626
#[expect(deprecated)]
2727
pub use crate::schema_coercion::{
28-
apply_file_schema_type_coercions, coerce_file_schema_to_string_type,
28+
Int96Coercer, apply_file_schema_type_coercions, coerce_file_schema_to_string_type,
2929
coerce_file_schema_to_view_type, coerce_int96_to_resolution,
3030
transform_binary_to_string, transform_schema_to_view,
3131
};
@@ -56,7 +56,9 @@ use datafusion_session::Session;
5656

5757
use crate::metadata::{DFParquetMetadata, lex_ordering_to_sorting_columns};
5858
use crate::reader::CachedParquetFileReaderFactory;
59-
use crate::source::{ParquetSource, parse_coerce_int96_string};
59+
use crate::source::{
60+
ParquetSource, parse_coerce_int96_string, parse_coerce_int96_tz_string,
61+
};
6062
use async_trait::async_trait;
6163
use bytes::Bytes;
6264
use datafusion_datasource::source::DataSourceExec;
@@ -333,6 +335,13 @@ impl FileFormat for ParquetFormat {
333335
Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
334336
None => None,
335337
};
338+
let coerce_int96_tz = self
339+
.options
340+
.global
341+
.coerce_int96_tz
342+
.as_ref()
343+
.map(|tz| parse_coerce_int96_tz_string(tz))
344+
.transpose()?;
336345

337346
let file_metadata_cache =
338347
state.runtime_env().cache_manager.get_file_metadata_cache();
@@ -350,6 +359,7 @@ impl FileFormat for ParquetFormat {
350359
.with_decryption_properties(file_decryption_properties)
351360
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
352361
.with_coerce_int96(coerce_int96)
362+
.with_coerce_int96_tz(coerce_int96_tz.clone())
353363
.fetch_schema_with_location()
354364
.await?;
355365
Ok::<_, DataFusionError>(result)

datafusion/datasource-parquet/src/metadata.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! [`DFParquetMetadata`] for fetching Parquet file metadata, statistics
1919
//! and schema information.
2020
21-
use crate::{apply_file_schema_type_coercions, coerce_int96_to_resolution};
21+
use crate::{Int96Coercer, apply_file_schema_type_coercions};
2222
use arrow::array::{Array, ArrayRef, BooleanArray};
2323
use arrow::compute::and;
2424
use arrow::compute::kernels::cmp::eq;
@@ -72,6 +72,8 @@ pub struct DFParquetMetadata<'a> {
7272
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
7373
/// timeunit to coerce INT96 timestamps to
7474
pub coerce_int96: Option<TimeUnit>,
75+
/// Optional timezone applied to INT96-coerced timestamps.
76+
pub coerce_int96_tz: Option<Arc<str>>,
7577
}
7678

7779
impl<'a> DFParquetMetadata<'a> {
@@ -83,6 +85,7 @@ impl<'a> DFParquetMetadata<'a> {
8385
decryption_properties: None,
8486
file_metadata_cache: None,
8587
coerce_int96: None,
88+
coerce_int96_tz: None,
8689
}
8790
}
8891

@@ -116,6 +119,12 @@ impl<'a> DFParquetMetadata<'a> {
116119
self
117120
}
118121

122+
/// Set the optional timezone applied to INT96-coerced timestamps.
123+
pub fn with_coerce_int96_tz(mut self, timezone: Option<Arc<str>>) -> Self {
124+
self.coerce_int96_tz = timezone;
125+
self
126+
}
127+
119128
/// Fetch parquet metadata from the remote object store
120129
pub async fn fetch_metadata(&self) -> Result<Arc<ParquetMetaData>> {
121130
// implementation to fetch parquet metadata
@@ -218,11 +227,9 @@ impl<'a> DFParquetMetadata<'a> {
218227
.coerce_int96
219228
.as_ref()
220229
.and_then(|time_unit| {
221-
coerce_int96_to_resolution(
222-
file_metadata.schema_descr(),
223-
&schema,
224-
time_unit,
225-
)
230+
Int96Coercer::new(file_metadata.schema_descr(), &schema, time_unit)
231+
.with_timezone(self.coerce_int96_tz.clone())
232+
.coerce()
226233
})
227234
.unwrap_or(schema);
228235
Ok(schema)

datafusion/datasource-parquet/src/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub use row_filter::can_expr_be_pushed_down_with_schemas;
5454
pub use row_group_filter::RowGroupAccessPlanFilter;
5555
#[expect(deprecated)]
5656
pub use schema_coercion::{
57-
apply_file_schema_type_coercions, coerce_file_schema_to_string_type,
57+
Int96Coercer, apply_file_schema_type_coercions, coerce_file_schema_to_string_type,
5858
coerce_file_schema_to_view_type, coerce_int96_to_resolution,
5959
transform_binary_to_string, transform_schema_to_view,
6060
};

datafusion/datasource-parquet/src/opener/mod.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState};
2929
use crate::row_filter::{RowFilterGenerator, build_projection_read_plan};
3030
use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
3131
use crate::{
32-
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
33-
apply_file_schema_type_coercions, coerce_int96_to_resolution,
32+
Int96Coercer, ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
33+
apply_file_schema_type_coercions,
3434
};
3535
use arrow::array::RecordBatch;
3636
use arrow::datatypes::DataType;
@@ -121,6 +121,10 @@ pub(super) struct ParquetMorselizer {
121121
pub enable_row_group_stats_pruning: bool,
122122
/// Coerce INT96 timestamps to specific TimeUnit
123123
pub coerce_int96: Option<TimeUnit>,
124+
/// Optional timezone applied to INT96-coerced timestamps. When `Some`, the
125+
/// coerced column type becomes `Timestamp(<coerce_int96>, Some(<tz>))`.
126+
/// No effect when `coerce_int96` is `None`.
127+
pub coerce_int96_tz: Option<Arc<str>>,
124128
/// Optional parquet FileDecryptionProperties
125129
#[cfg(feature = "parquet_encryption")]
126130
pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
@@ -283,6 +287,7 @@ struct PreparedParquetOpen {
283287
enable_row_group_stats_pruning: bool,
284288
limit: Option<usize>,
285289
coerce_int96: Option<TimeUnit>,
290+
coerce_int96_tz: Option<Arc<str>>,
286291
expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
287292
predicate_creation_errors: Count,
288293
max_predicate_cache_size: Option<usize>,
@@ -653,6 +658,7 @@ impl ParquetMorselizer {
653658
enable_row_group_stats_pruning: self.enable_row_group_stats_pruning,
654659
limit: self.limit,
655660
coerce_int96: self.coerce_int96,
661+
coerce_int96_tz: self.coerce_int96_tz.clone(),
656662
expr_adapter_factory: Arc::clone(&self.expr_adapter_factory),
657663
predicate_creation_errors,
658664
max_predicate_cache_size: self.max_predicate_cache_size,
@@ -780,11 +786,13 @@ impl MetadataLoadedParquetOpen {
780786
}
781787

782788
if let Some(ref coerce) = prepared.coerce_int96
783-
&& let Some(merged) = coerce_int96_to_resolution(
789+
&& let Some(merged) = Int96Coercer::new(
784790
reader_metadata.parquet_schema(),
785791
&physical_file_schema,
786792
coerce,
787793
)
794+
.with_timezone(prepared.coerce_int96_tz.clone())
795+
.coerce()
788796
{
789797
physical_file_schema = Arc::new(merged);
790798
options = options.with_schema(Arc::clone(&physical_file_schema));
@@ -1603,6 +1611,10 @@ mod test {
16031611
enable_bloom_filter: self.enable_bloom_filter,
16041612
enable_row_group_stats_pruning: self.enable_row_group_stats_pruning,
16051613
coerce_int96: self.coerce_int96,
1614+
// End-to-end coercion behavior (including timezone) is
1615+
// covered by parquet.slt. No opener-level test currently
1616+
// needs a non-default value here.
1617+
coerce_int96_tz: None,
16061618
#[cfg(feature = "parquet_encryption")]
16071619
file_decryption_properties: None,
16081620
expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),

0 commit comments

Comments
 (0)