Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/proto-models/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,7 @@ message FileScanExecConf {
optional uint64 batch_size = 12;

optional ProjectionExprs projection_exprs = 13;
optional bool partitioned_by_file_group = 14;
}

message ParquetScanExecNode {
Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto-models/src/generated/pbjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6848,6 +6848,9 @@ impl serde::Serialize for FileScanExecConf {
if self.projection_exprs.is_some() {
len += 1;
}
if self.partitioned_by_file_group.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("datafusion.FileScanExecConf", len)?;
if !self.file_groups.is_empty() {
struct_ser.serialize_field("fileGroups", &self.file_groups)?;
Expand Down Expand Up @@ -6884,6 +6887,9 @@ impl serde::Serialize for FileScanExecConf {
if let Some(v) = self.projection_exprs.as_ref() {
struct_ser.serialize_field("projectionExprs", v)?;
}
if let Some(v) = self.partitioned_by_file_group.as_ref() {
struct_ser.serialize_field("partitionedByFileGroup", v)?;
}
struct_ser.end()
}
}
Expand Down Expand Up @@ -6911,6 +6917,8 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf {
"batchSize",
"projection_exprs",
"projectionExprs",
"partitioned_by_file_group",
"partitionedByFileGroup",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -6926,6 +6934,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf {
Constraints,
BatchSize,
ProjectionExprs,
PartitionedByFileGroup,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand Down Expand Up @@ -6958,6 +6967,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf {
"constraints" => Ok(GeneratedField::Constraints),
"batchSize" | "batch_size" => Ok(GeneratedField::BatchSize),
"projectionExprs" | "projection_exprs" => Ok(GeneratedField::ProjectionExprs),
"partitionedByFileGroup" | "partitioned_by_file_group" => Ok(GeneratedField::PartitionedByFileGroup),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand Down Expand Up @@ -6988,6 +6998,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf {
let mut constraints__ = None;
let mut batch_size__ = None;
let mut projection_exprs__ = None;
let mut partitioned_by_file_group__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::FileGroups => {
Expand Down Expand Up @@ -7061,6 +7072,12 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf {
}
projection_exprs__ = map_.next_value()?;
}
GeneratedField::PartitionedByFileGroup => {
if partitioned_by_file_group__.is_some() {
return Err(serde::de::Error::duplicate_field("partitionedByFileGroup"));
}
partitioned_by_file_group__ = map_.next_value()?;
}
}
}
Ok(FileScanExecConf {
Expand All @@ -7075,6 +7092,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf {
constraints: constraints__,
batch_size: batch_size__,
projection_exprs: projection_exprs__,
partitioned_by_file_group: partitioned_by_file_group__,
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/proto-models/src/generated/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1677,6 +1677,8 @@ pub struct FileScanExecConf {
pub batch_size: ::core::option::Option<u64>,
#[prost(message, optional, tag = "13")]
pub projection_exprs: ::core::option::Option<ProjectionExprs>,
#[prost(bool, optional, tag = "14")]
pub partitioned_by_file_group: ::core::option::Option<bool>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ParquetScanExecNode {
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ pub fn parse_protobuf_file_scan_config(
.with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
.with_output_ordering(output_ordering)
.with_batch_size(proto.batch_size.map(|s| s as usize))
.with_partitioned_by_file_group(proto.partitioned_by_file_group.unwrap_or(false))
.build();
Ok(config)
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,7 @@ pub fn serialize_file_scan_config(
constraints: Some(conf.constraints.clone().into()),
batch_size: conf.batch_size.map(|s| s as u64),
projection_exprs,
partitioned_by_file_group: Some(conf.partitioned_by_file_group),
})
}

Expand Down
49 changes: 49 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4067,5 +4067,54 @@ fn test_custom_node_with_dynamic_filter_dedup_roundtrip() -> Result<()> {
// rewrite can reconstruct the remapped form on the other side.
assert_dynamic_filters_equal(deser_custom_df, deser_filter_df);
assert_dynamic_filter_update_is_visible(deser_custom_df, deser_filter_df)?;

Ok(())
}

#[test]
fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> {
use datafusion::datasource::physical_plan::FileScanConfig;

let file_schema =
Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema)));
let scan_config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
.with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
"/path/to/file.parquet".to_string(),
1024,
)])])
.with_partitioned_by_file_group(true)
.build();

assert!(scan_config.partitioned_by_file_group);

let exec_plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(scan_config);

let ctx = SessionContext::new();
let codec = DefaultPhysicalExtensionCodec {};
let proto_converter = DefaultPhysicalProtoConverter {};
let bytes = physical_plan_to_bytes_with_proto_converter(
Arc::clone(&exec_plan),
&codec,
&proto_converter,
)?;
let result_plan = physical_plan_from_bytes_with_proto_converter(
bytes.as_ref(),
ctx.task_ctx().as_ref(),
&codec,
&proto_converter,
)?;

let data_source_exec = result_plan
.downcast_ref::<DataSourceExec>()
.expect("Expected DataSourceExec");
let file_scan_config = data_source_exec
.data_source()
.downcast_ref::<FileScanConfig>()
.expect("Expected FileScanConfig");

assert!(file_scan_config.partitioned_by_file_group);

Ok(())
}
Loading