Skip to content

Commit 8e2254f

Browse files
committed
document schema staleness risk
1 parent 0692a84 commit 8e2254f

2 files changed

Lines changed: 7 additions & 1 deletion

File tree

crates/integrations/datafusion/src/physical_plan/partitioned_scan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl IcebergPartitionedScan {
4444
fn compute_properties(schema: ArrowSchemaRef, n_partitions: usize) -> PlanProperties {
4545
PlanProperties::new(
4646
EquivalenceProperties::new(schema),
47-
Partitioning::UnknownPartitioning(n_partitions.max(1)),
47+
Partitioning::UnknownPartitioning(n_partitions),
4848
EmissionType::Incremental,
4949
Boundedness::Bounded,
5050
)

crates/integrations/datafusion/src/table/partitioned.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ impl IcebergPartitionedTableProvider {
3131
name: impl Into<String>,
3232
) -> Result<Self> {
3333
let table_ident = TableIdent::new(namespace, name.into());
34+
// First load: used only to snapshot the Arrow schema for DataFusion planning.
35+
// A second load_table is issued at scan time to guarantee the freshest snapshot.
3436
let table = catalog.load_table(&table_ident).await?;
3537
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
3638
Ok(Self {
@@ -45,12 +47,16 @@ impl IcebergPartitionedTableProvider {
4547
projection: Option<Vec<usize>>,
4648
filters: Vec<Expr>,
4749
) -> DFResult<IcebergPartitionedScan> {
50+
// Second load: fetch the latest snapshot so scans always reflect current table state.
4851
let table = self
4952
.catalog
5053
.load_table(&self.table_ident)
5154
.await
5255
.map_err(to_datafusion_error)?;
5356

57+
// TODO: schema staleness risk: projection indices are resolved against self.schema,
58+
// which was captured at try_new time. If the table schema evolved between try_new and
59+
// this scan, the resolved column names may not match the table's current schema. This logic is inherited from IcebergTableProvider.
5460
let col_names = projection.as_ref().map(|indices| {
5561
indices
5662
.iter()

0 commit comments

Comments
 (0)