Skip to content

Commit ed347f2

Browse files
authored
feat(writer): honor write.parquet.* via ParquetWriterBuilder::from_table_properties (#2561)
## Which issue does this PR close?

N/A — no tracking issue.

## What changes are included in this PR?

Only the DataFusion `INSERT INTO` path honored `write.parquet.*` table properties, and only the content-defined-chunking (CDC) keys, via a hand-rolled translation inlined in `IcebergWriteExec`. Anything writing through the writer stack directly (`DataFileWriter` → `ParquetWriterBuilder`) silently fell back to parquet-rs defaults.

This PR moves that translation behind a reusable constructor:

- Add `ParquetWriterBuilder::from_table_properties(&TableProperties, schema)`, the single place that maps `write.parquet.*` into `WriterProperties`. It currently translates the CDC keys (`write.parquet.content-defined-chunking.*`); other keys still fall back to parquet-rs defaults and can be added here later.
- Add a chainable `with_match_mode` setter so the field match mode can be overridden — DataFusion needs name-based matching since its Arrow batches carry no field-id metadata.
- Refactor the DataFusion `insert_into` writer to build via `from_table_properties`, reusing the `TableProperties` it already parses and dropping the inline CDC translation.

Additive only: `new` and `new_with_match_mode` are unchanged; no breaking changes.

## Are these changes tested?

- Unit tests in `parquet_writer.rs`: CDC is off by default, and CDC properties propagate through `build()` to the writer's `WriterProperties` — asserted on the getter rather than by re-reading a written file, so future `write.parquet.*` options only need an assertion on their corresponding getter.
- Existing `test_insert_into*` DataFusion integration tests cover the refactored path, which is behaviorally unchanged.
1 parent c915c31 commit ed347f2

3 files changed

Lines changed: 123 additions & 17 deletions

File tree

crates/iceberg/public-api.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3205,8 +3205,10 @@ pub async fn iceberg::writer::file_writer::ParquetWriter::close(self) -> iceberg
32053205
pub async fn iceberg::writer::file_writer::ParquetWriter::write(&mut self, batch: &arrow_array::record_batch::RecordBatch) -> iceberg::Result<()>
32063206
pub struct iceberg::writer::file_writer::ParquetWriterBuilder
32073207
impl iceberg::writer::file_writer::ParquetWriterBuilder
3208+
pub fn iceberg::writer::file_writer::ParquetWriterBuilder::from_table_properties(table_props: &iceberg::spec::TableProperties, schema: iceberg::spec::SchemaRef) -> Self
32083209
pub fn iceberg::writer::file_writer::ParquetWriterBuilder::new(props: parquet::file::properties::WriterProperties, schema: iceberg::spec::SchemaRef) -> Self
32093210
pub fn iceberg::writer::file_writer::ParquetWriterBuilder::new_with_match_mode(props: parquet::file::properties::WriterProperties, schema: iceberg::spec::SchemaRef, match_mode: iceberg::arrow::FieldMatchMode) -> Self
3211+
pub fn iceberg::writer::file_writer::ParquetWriterBuilder::with_match_mode(self, match_mode: iceberg::arrow::FieldMatchMode) -> Self
32103212
impl core::clone::Clone for iceberg::writer::file_writer::ParquetWriterBuilder
32113213
pub fn iceberg::writer::file_writer::ParquetWriterBuilder::clone(&self) -> iceberg::writer::file_writer::ParquetWriterBuilder
32123214
impl core::fmt::Debug for iceberg::writer::file_writer::ParquetWriterBuilder

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use parquet::arrow::AsyncArrowWriter;
2828
use parquet::arrow::async_reader::AsyncFileReader;
2929
use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
3030
use parquet::file::metadata::ParquetMetaData;
31-
use parquet::file::properties::WriterProperties;
31+
use parquet::file::properties::{CdcOptions, WriterProperties};
3232
use parquet::file::statistics::Statistics;
3333

3434
use super::{FileWriter, FileWriterBuilder};
@@ -40,7 +40,7 @@ use crate::io::{FileIO, FileWrite, OutputFile};
4040
use crate::spec::{
4141
DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType,
4242
NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
43-
StructType, TableMetadata, Type, visit_schema,
43+
StructType, TableMetadata, TableProperties, Type, visit_schema,
4444
};
4545
use crate::transform::create_transform_function;
4646
use crate::writer::{CurrentFileStatus, DataFile};
@@ -57,6 +57,10 @@ pub struct ParquetWriterBuilder {
5757
impl ParquetWriterBuilder {
5858
/// Create a new `ParquetWriterBuilder` from explicit parquet `WriterProperties`.
///
/// The builder uses [`FieldMatchMode::Id`]; see [`Self::new_with_match_mode`]
/// to pick a different match mode at construction time.
///
5959
/// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
60+
///
61+
/// When writing into an existing Iceberg table, prefer
62+
/// [`Self::from_table_properties`], which derives `WriterProperties` from
63+
/// the table's `write.parquet.*` properties.
6064
pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
6165
// Thin wrapper: delegate with the id-based match mode.
Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
6266
}
@@ -73,6 +77,37 @@ impl ParquetWriterBuilder {
7377
match_mode,
7478
}
7579
}
80+
81+
/// Build a `ParquetWriterBuilder` from Iceberg table properties and a
82+
/// schema, translating `write.parquet.*` settings into `WriterProperties`
83+
/// instead of using parquet-rs defaults.
84+
///
85+
/// Currently translates the content-defined-chunking keys
86+
/// (`write.parquet.content-defined-chunking.*`); other keys fall back to
87+
/// parquet-rs defaults.
///
/// CDC options are only set when `table_props.cdc_enabled` is `true`;
/// otherwise `None` is passed and the chunk-size / norm-level properties
/// are not read.
///
/// The returned builder uses [`FieldMatchMode::Id`]; chain
/// [`Self::with_match_mode`] to override it.
88+
pub fn from_table_properties(table_props: &TableProperties, schema: SchemaRef) -> Self {
89+
// `then_some` yields `Some(CdcOptions { .. })` only when CDC is enabled.
let cdc = table_props.cdc_enabled.then_some(CdcOptions {
90+
min_chunk_size: table_props.cdc_min_chunk_size,
91+
max_chunk_size: table_props.cdc_max_chunk_size,
92+
norm_level: table_props.cdc_norm_level,
93+
});
94+
// TODO: translate the remaining write.parquet.* keys (e.g. compression-codec,
95+
// row-group-size-bytes, page-size-bytes).
96+
// This constructor is intended to be the single place that maps them.
97+
let props = WriterProperties::builder()
98+
.set_content_defined_chunking(cdc)
99+
.build();
100+
// Same default match mode as `new`; callers override via `with_match_mode`.
Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
101+
}
102+
103+
/// Set the field match mode used to map Arrow fields to Iceberg fields.
104+
///
105+
/// Builders created via [`Self::new`] or [`Self::from_table_properties`]
/// start with [`FieldMatchMode::Id`]. Use [`FieldMatchMode::Name`] when the
106+
/// incoming Arrow schema does not carry Iceberg field-id metadata.
///
/// Takes the builder by value and returns it, so the call can be chained
/// after a constructor.
107+
pub fn with_match_mode(mut self, match_mode: FieldMatchMode) -> Self {
108+
self.match_mode = match_mode;
109+
self
110+
}
76111
}
77112

78113
impl FileWriterBuilder for ParquetWriterBuilder {
@@ -2279,4 +2314,81 @@ mod tests {
22792314
assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
22802315
assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
22812316
}
2317+
2318+
// -----------------------------------------------------------------
2319+
// ParquetWriterBuilder::from_table_properties
2320+
// -----------------------------------------------------------------
2321+
2322+
/// Two-column schema (`id: long`, `payload: string`, both required, schema
/// id 1) used by the `from_table_properties` tests below.
fn cdc_test_schema() -> SchemaRef {
2323+
Arc::new(
2324+
Schema::builder()
2325+
.with_schema_id(1)
2326+
.with_fields(vec![
2327+
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
2328+
NestedField::required(2, "payload", Type::Primitive(PrimitiveType::String))
2329+
.into(),
2330+
])
2331+
.build()
2332+
.unwrap(),
2333+
)
2334+
}
2335+
2336+
/// Parse raw string key/value entries into `TableProperties`.
///
/// Test helper: panics (via `unwrap`) if `TableProperties::try_from`
/// rejects the entries.
fn table_props(entries: HashMap<String, String>) -> TableProperties {
2337+
TableProperties::try_from(&entries).unwrap()
2338+
}
2339+
2340+
// With no table properties set, the builder's `WriterProperties` must not
// carry any content-defined-chunking options.
#[test]
2341+
fn test_from_table_properties_no_cdc_by_default() {
2342+
let tp = table_props(HashMap::new());
2343+
let builder = ParquetWriterBuilder::from_table_properties(&tp, cdc_test_schema());
2344+
assert!(builder.props.content_defined_chunking().is_none());
2345+
}
2346+
2347+
// Enables CDC with explicit min/max chunk sizes and norm level via table
// properties, builds a writer, and checks the values on the built writer.
#[tokio::test]
2348+
async fn test_from_table_properties_propagate_to_writer() {
2349+
// `build()` must carry the translated `WriterProperties` through to the
2350+
// `ParquetWriter` unchanged — otherwise the `write.parquet.*` settings
2351+
// derived in `from_table_properties` would never reach parquet-rs.
2352+
//
2353+
// Asserting on the writer's `WriterProperties` (rather than re-reading a
2354+
// written file) keeps this a direct propagation check: every future
2355+
// `write.parquet.*` option just adds an assertion on its corresponding
2356+
// `WriterProperties` getter here.
2357+
let tp = table_props(HashMap::from([
2358+
(
2359+
TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
2360+
"true".to_string(),
2361+
),
2362+
(
2363+
TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE.to_string(),
2364+
"4096".to_string(),
2365+
),
2366+
(
2367+
TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE.to_string(),
2368+
"8192".to_string(),
2369+
),
2370+
(
2371+
TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(),
2372+
"2".to_string(),
2373+
),
2374+
]));
2375+
2376+
// The output file lives in a temp dir; no batches are written in this test.
let tmp = TempDir::new().unwrap();
2377+
let output = FileIO::new_with_fs()
2378+
.new_output(format!("{}/cdc.parquet", tmp.path().to_str().unwrap()))
2379+
.unwrap();
2380+
let writer = ParquetWriterBuilder::from_table_properties(&tp, cdc_test_schema())
2381+
.build(output)
2382+
.await
2383+
.unwrap();
2384+
2385+
// Read the CDC options back from the built writer, not from the builder.
let cdc = writer
2386+
.writer_properties
2387+
.content_defined_chunking()
2388+
.copied()
2389+
.expect("CDC should be enabled on the built writer");
2390+
assert_eq!(cdc.min_chunk_size, 4096);
2391+
assert_eq!(cdc.max_chunk_size, 8192);
2392+
assert_eq!(cdc.norm_level, 2);
2393+
}
22822394
}

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use iceberg::writer::file_writer::location_generator::{
4545
};
4646
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
4747
use iceberg::{Error, ErrorKind};
48-
use parquet::file::properties::{CdcOptions, WriterPropertiesBuilder};
4948
use uuid::Uuid;
5049

5150
use crate::physical_plan::DATA_FILES_COL_NAME;
@@ -225,21 +224,14 @@ impl ExecutionPlan for IcebergWriteExec {
225224
)));
226225
}
227226

228-
// Create data file writer builder
229-
let cdc_options = table_props.cdc_enabled.then_some(CdcOptions {
230-
min_chunk_size: table_props.cdc_min_chunk_size,
231-
max_chunk_size: table_props.cdc_max_chunk_size,
232-
norm_level: table_props.cdc_norm_level,
233-
});
234-
let writer_properties = WriterPropertiesBuilder::default()
235-
.set_content_defined_chunking(cdc_options)
236-
.build();
237-
238-
let parquet_file_writer_builder = ParquetWriterBuilder::new_with_match_mode(
239-
writer_properties,
227+
// Build the writer from the already-parsed table properties so it honors
228+
// `write.parquet.*` settings (e.g. CDC). Arrow batches flowing through
229+
// DataFusion carry no field-id metadata, so match fields by name.
230+
let parquet_file_writer_builder = ParquetWriterBuilder::from_table_properties(
231+
&table_props,
240232
self.table.metadata().current_schema().clone(),
241-
FieldMatchMode::Name,
242-
);
233+
)
234+
.with_match_mode(FieldMatchMode::Name);
243235
let target_file_size = table_props.write_target_file_size_bytes;
244236

245237
let file_io = self.table.file_io().clone();

0 commit comments

Comments
 (0)