Skip to content

Commit 317e0f0

Browse files
committed
fix: use spill writer's schema instead of the first batch schema for spill files
1 parent: 0bf9def · commit: 317e0f0

2 files changed

Lines changed: 135 additions & 1 deletion

File tree

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use arrow::array::{Array, Decimal128Array, Int64Array, RecordBatch};
21+
use arrow::datatypes::{DataType, Field, Schema};
22+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
23+
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
24+
use datafusion_physical_plan::spill::SpillManager;
25+
use futures::StreamExt;
26+
27+
/// Proves that InProgressSpillFile uses the SpillManager's declared schema for
28+
/// the IPC writer, so readback batches always have the correct schema even when
29+
/// input batches have mismatched nullability.
30+
///
31+
/// Scenario:
32+
/// - SpillManager declares schema with nullable `val`
33+
/// - First appended batch has non-nullable `val` (simulates literal-0 UNION branch)
34+
/// - Second appended batch has nullable `val` with NULLs (simulates table UNION branch)
35+
/// - On readback, both batches must have the nullable schema
36+
#[tokio::test]
37+
async fn test_spill_file_uses_spill_manager_schema() {
38+
let nullable_schema = Arc::new(Schema::new(vec![
39+
Field::new("key", DataType::Int64, false),
40+
Field::new("val", DataType::Decimal128(15, 7), true),
41+
]));
42+
let non_nullable_schema = Arc::new(Schema::new(vec![
43+
Field::new("key", DataType::Int64, false),
44+
Field::new("val", DataType::Decimal128(15, 7), false),
45+
]));
46+
47+
let runtime = Arc::new(RuntimeEnvBuilder::new().build().unwrap());
48+
let metrics_set = ExecutionPlanMetricsSet::new();
49+
let spill_metrics = SpillMetrics::new(&metrics_set, 0);
50+
let spill_manager = Arc::new(SpillManager::new(
51+
runtime,
52+
spill_metrics,
53+
Arc::clone(&nullable_schema),
54+
));
55+
56+
let mut in_progress = spill_manager.create_in_progress_file("test").unwrap();
57+
58+
// First batch: non-nullable val (simulates literal-0 UNION branch)
59+
let non_nullable_batch = RecordBatch::try_new(
60+
Arc::clone(&non_nullable_schema),
61+
vec![
62+
Arc::new(Int64Array::from(vec![1, 2, 3])),
63+
Arc::new(
64+
Decimal128Array::from(vec![0i128; 3])
65+
.with_precision_and_scale(15, 7)
66+
.unwrap(),
67+
),
68+
],
69+
)
70+
.unwrap();
71+
in_progress.append_batch(&non_nullable_batch).unwrap();
72+
73+
// Second batch: nullable val with NULLs (simulates table UNION branch)
74+
let nullable_batch = RecordBatch::try_new(
75+
Arc::clone(&nullable_schema),
76+
vec![
77+
Arc::new(Int64Array::from(vec![4, 5, 6])),
78+
Arc::new(
79+
Decimal128Array::from(vec![
80+
Some(10_000_000i128),
81+
None,
82+
Some(30_000_000i128),
83+
])
84+
.with_precision_and_scale(15, 7)
85+
.unwrap(),
86+
),
87+
],
88+
)
89+
.unwrap();
90+
in_progress.append_batch(&nullable_batch).unwrap();
91+
92+
let spill_file = in_progress.finish().unwrap().unwrap();
93+
94+
// Read back
95+
let mut stream = spill_manager
96+
.read_spill_as_stream(spill_file, None)
97+
.unwrap();
98+
99+
assert!(
100+
stream.schema().field(1).is_nullable(),
101+
"Stream schema should be nullable"
102+
);
103+
104+
let mut batches = vec![];
105+
while let Some(result) = stream.next().await {
106+
batches.push(result.unwrap());
107+
}
108+
assert_eq!(batches.len(), 2);
109+
110+
// Both readback batches must have the SpillManager's nullable schema
111+
for (i, batch) in batches.iter().enumerate() {
112+
assert!(
113+
batch.schema().field(1).is_nullable(),
114+
"Readback batch {i} should have nullable schema from SpillManager"
115+
);
116+
}
117+
118+
// The second batch must preserve its NULL data
119+
let val_col = batches[1]
120+
.column(1)
121+
.as_any()
122+
.downcast_ref::<Decimal128Array>()
123+
.unwrap();
124+
assert_eq!(val_col.null_count(), 1, "Second batch should have 1 null");
125+
126+
// Rebuilding the batch with its own schema must succeed (would fail if
127+
// schema said non-nullable but data contained nulls)
128+
RecordBatch::try_new(batches[1].schema(), batches[1].columns().to_vec())
129+
.expect("Readback batch should be valid: schema should match data nullability");
130+
}

datafusion/physical-plan/src/spill/in_progress_spill_file.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,11 @@ impl InProgressSpillFile {
6262
));
6363
}
6464
if self.writer.is_none() {
65-
let schema = batch.schema();
65+
// Use the SpillManager's declared schema rather than the batch's schema.
66+
// Individual batches may have different schemas (e.g., different nullability)
67+
// when they come from different branches of a UnionExec. The SpillManager's
68+
// schema represents the canonical schema that all batches should conform to.
69+
let schema = Arc::clone(self.spill_writer.schema());
6670
if let Some(in_progress_file) = &mut self.in_progress_file {
6771
self.writer = Some(IPCStreamWriter::new(
6872
in_progress_file.path(),

0 commit comments

Comments
 (0)