Skip to content

Commit fec1dc6

Browse files
committed
fix: use spill writer's schema instead of the first batch schema for spill files
1 parent 0bf9def commit fec1dc6

File tree

2 files changed

+132
-1
lines changed

2 files changed

+132
-1
lines changed
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
// Regression test for a schema bug in InProgressSpillFile::append_batch().
//
// append_batch() used batch.schema() to initialize the IPC writer, so the
// schema of the first spilled batch determined the schema of the IPC file.
// When UnionExec returns child streams directly (without coercing batch
// schemas to the union's declared schema), batches from different UNION
// branches can have different nullability.
//
// If a non-nullable batch is the first to be spilled, the IPC file declares
// the column as non-nullable. Subsequent batches containing NULLs that are
// spilled to the same file are then read back with a non-nullable schema even
// though they contain null values. This is an invalid state that causes a
// downstream RecordBatch::try_new to fail with
// "Column is declared as non-nullable but contains null values".
//
// The fix: use the SpillManager's declared schema (which represents the
// canonical schema from the operator) instead of the first batch's schema.
14+
15+
use std::sync::Arc;
16+
17+
use arrow::array::{Array, Decimal128Array, Int64Array, RecordBatch};
18+
use arrow::datatypes::{DataType, Field, Schema};
19+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
20+
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
21+
use datafusion_physical_plan::spill::SpillManager;
22+
use futures::StreamExt;
23+
24+
/// Proves that InProgressSpillFile uses the SpillManager's declared schema for
25+
/// the IPC writer, so readback batches always have the correct schema even when
26+
/// input batches have mismatched nullability.
27+
///
28+
/// Scenario:
29+
/// - SpillManager declares schema with nullable `val`
30+
/// - First appended batch has non-nullable `val` (simulates literal-0 UNION branch)
31+
/// - Second appended batch has nullable `val` with NULLs (simulates table UNION branch)
32+
/// - On readback, both batches must have the nullable schema
33+
#[tokio::test]
34+
async fn test_spill_file_uses_spill_manager_schema() {
35+
let nullable_schema = Arc::new(Schema::new(vec![
36+
Field::new("key", DataType::Int64, false),
37+
Field::new("val", DataType::Decimal128(15, 7), true),
38+
]));
39+
let non_nullable_schema = Arc::new(Schema::new(vec![
40+
Field::new("key", DataType::Int64, false),
41+
Field::new("val", DataType::Decimal128(15, 7), false),
42+
]));
43+
44+
let runtime = Arc::new(RuntimeEnvBuilder::new().build().unwrap());
45+
let metrics_set = ExecutionPlanMetricsSet::new();
46+
let spill_metrics = SpillMetrics::new(&metrics_set, 0);
47+
let spill_manager = Arc::new(SpillManager::new(
48+
runtime,
49+
spill_metrics,
50+
Arc::clone(&nullable_schema),
51+
));
52+
53+
let mut in_progress = spill_manager.create_in_progress_file("test").unwrap();
54+
55+
// First batch: non-nullable val (simulates literal-0 UNION branch)
56+
let non_nullable_batch = RecordBatch::try_new(
57+
Arc::clone(&non_nullable_schema),
58+
vec![
59+
Arc::new(Int64Array::from(vec![1, 2, 3])),
60+
Arc::new(
61+
Decimal128Array::from(vec![0i128; 3])
62+
.with_precision_and_scale(15, 7)
63+
.unwrap(),
64+
),
65+
],
66+
)
67+
.unwrap();
68+
in_progress.append_batch(&non_nullable_batch).unwrap();
69+
70+
// Second batch: nullable val with NULLs (simulates table UNION branch)
71+
let nullable_batch = RecordBatch::try_new(
72+
Arc::clone(&nullable_schema),
73+
vec![
74+
Arc::new(Int64Array::from(vec![4, 5, 6])),
75+
Arc::new(
76+
Decimal128Array::from(vec![
77+
Some(10_000_000i128),
78+
None,
79+
Some(30_000_000i128),
80+
])
81+
.with_precision_and_scale(15, 7)
82+
.unwrap(),
83+
),
84+
],
85+
)
86+
.unwrap();
87+
in_progress.append_batch(&nullable_batch).unwrap();
88+
89+
let spill_file = in_progress.finish().unwrap().unwrap();
90+
91+
// Read back
92+
let mut stream = spill_manager
93+
.read_spill_as_stream(spill_file, None)
94+
.unwrap();
95+
96+
assert!(
97+
stream.schema().field(1).is_nullable(),
98+
"Stream schema should be nullable"
99+
);
100+
101+
let mut batches = vec![];
102+
while let Some(result) = stream.next().await {
103+
batches.push(result.unwrap());
104+
}
105+
assert_eq!(batches.len(), 2);
106+
107+
// Both readback batches must have the SpillManager's nullable schema
108+
for (i, batch) in batches.iter().enumerate() {
109+
assert!(
110+
batch.schema().field(1).is_nullable(),
111+
"Readback batch {i} should have nullable schema from SpillManager"
112+
);
113+
}
114+
115+
// The second batch must preserve its NULL data
116+
let val_col = batches[1]
117+
.column(1)
118+
.as_any()
119+
.downcast_ref::<Decimal128Array>()
120+
.unwrap();
121+
assert_eq!(val_col.null_count(), 1, "Second batch should have 1 null");
122+
123+
// Rebuilding the batch with its own schema must succeed (would fail if
124+
// schema said non-nullable but data contained nulls)
125+
RecordBatch::try_new(batches[1].schema(), batches[1].columns().to_vec())
126+
.expect("Readback batch should be valid: schema should match data nullability");
127+
}

datafusion/physical-plan/src/spill/in_progress_spill_file.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,11 @@ impl InProgressSpillFile {
6262
));
6363
}
6464
if self.writer.is_none() {
65-
let schema = batch.schema();
65+
// Use the SpillManager's declared schema rather than the batch's schema.
66+
// Individual batches may have different schemas (e.g., different nullability)
67+
// when they come from different branches of a UnionExec. The SpillManager's
68+
// schema represents the canonical schema that all batches should conform to.
69+
let schema = Arc::clone(self.spill_writer.schema());
6670
if let Some(in_progress_file) = &mut self.in_progress_file {
6771
self.writer = Some(IPCStreamWriter::new(
6872
in_progress_file.path(),

0 commit comments

Comments
 (0)