Skip to content

Commit 998f89c

Browse files
committed
fix: use spill writer's schema instead of the first batch's schema for spill files
1 parent 0bf9def commit 998f89c

File tree

2 files changed

+291
-1
lines changed

2 files changed

+291
-1
lines changed
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use arrow::array::{Array, Decimal128Array, Int64Array, RecordBatch};
21+
use arrow::datatypes::{DataType, Field, Schema};
22+
use datafusion::datasource::memory::MemorySourceConfig;
23+
use datafusion_execution::config::SessionConfig;
24+
use datafusion_execution::memory_pool::FairSpillPool;
25+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
26+
use datafusion_physical_plan::repartition::RepartitionExec;
27+
use datafusion_physical_plan::union::UnionExec;
28+
use datafusion_physical_plan::{ExecutionPlan, Partitioning};
29+
use futures::StreamExt;
30+
31+
/// Exercises spilling through UnionExec -> RepartitionExec where union children
32+
/// have mismatched nullability (one child's `val` is non-nullable, the other's
33+
/// is nullable with NULLs). A tiny FairSpillPool forces all batches to spill.
34+
///
35+
/// UnionExec returns child streams without schema coercion, so batches from
36+
/// different children carry different per-field nullability into the shared
37+
/// SpillPool. The IPC writer must use the SpillManager's canonical (nullable)
38+
/// schema — not the first batch's schema — so readback batches are valid.
39+
#[tokio::test]
40+
async fn test_union_repartition_spill_mixed_nullability() {
41+
// Schema where `val` is NOT nullable (simulates literal-projection UNION branch)
42+
let non_nullable_schema = Arc::new(Schema::new(vec![
43+
Field::new("key", DataType::Int64, false),
44+
Field::new("val", DataType::Decimal128(15, 7), false),
45+
]));
46+
47+
// Schema where `val` IS nullable (simulates table column-projection UNION branch with NULLs)
48+
let nullable_schema = Arc::new(Schema::new(vec![
49+
Field::new("key", DataType::Int64, false),
50+
Field::new("val", DataType::Decimal128(15, 7), true),
51+
]));
52+
53+
// Generate many batches (4000 rows total) to ensure reliable spilling and coverage.
54+
let num_batches: usize = 200;
55+
let rows_per_batch: usize = 10;
56+
57+
let make_batches =
58+
|schema: &Arc<Schema>, vals: Decimal128Array| -> Vec<RecordBatch> {
59+
(0..num_batches)
60+
.map(|i| {
61+
let start = (i * rows_per_batch) as i64;
62+
let keys: Vec<i64> = (start..start + rows_per_batch as i64).collect();
63+
RecordBatch::try_new(
64+
Arc::clone(schema),
65+
vec![Arc::new(Int64Array::from(keys)), Arc::new(vals.clone())],
66+
)
67+
.unwrap()
68+
})
69+
.collect()
70+
};
71+
72+
let non_nullable_batches = make_batches(
73+
&non_nullable_schema,
74+
Decimal128Array::from(vec![0i128; rows_per_batch])
75+
.with_precision_and_scale(15, 7)
76+
.unwrap(),
77+
);
78+
let nullable_batches = make_batches(
79+
&nullable_schema,
80+
Decimal128Array::from(
81+
(0..rows_per_batch)
82+
.map(|j| {
83+
if j % 3 == 1 {
84+
None
85+
} else {
86+
Some(j as i128 * 10_000_000)
87+
}
88+
})
89+
.collect::<Vec<_>>(),
90+
)
91+
.with_precision_and_scale(15, 7)
92+
.unwrap(),
93+
);
94+
95+
// Create MemoryExec plans (1 partition each)
96+
let non_nullable_exec = MemorySourceConfig::try_new_exec(
97+
&[non_nullable_batches],
98+
Arc::clone(&non_nullable_schema),
99+
None,
100+
)
101+
.unwrap();
102+
103+
let nullable_exec = MemorySourceConfig::try_new_exec(
104+
&[nullable_batches],
105+
Arc::clone(&nullable_schema),
106+
None,
107+
)
108+
.unwrap();
109+
110+
// UnionExec: declared schema coerces val to nullable
111+
let union_exec = UnionExec::try_new(vec![non_nullable_exec, nullable_exec]).unwrap();
112+
113+
assert!(
114+
union_exec.schema().field(1).is_nullable(),
115+
"UnionExec schema should declare val as nullable"
116+
);
117+
118+
// RepartitionExec: round-robin to 1 output partition.
119+
// This ensures all batches from both children flow to the same output,
120+
// sharing the same SpillPool and InProgressSpillFile.
121+
let repartition =
122+
RepartitionExec::try_new(union_exec, Partitioning::RoundRobinBatch(1)).unwrap();
123+
124+
// Use a tiny memory pool to force most batches to spill.
125+
let session_config = SessionConfig::new().with_batch_size(2);
126+
let runtime = RuntimeEnvBuilder::new()
127+
.with_memory_pool(Arc::new(FairSpillPool::new(200)))
128+
.build_arc()
129+
.unwrap();
130+
let task_ctx = Arc::new(
131+
datafusion_execution::TaskContext::default()
132+
.with_session_config(session_config)
133+
.with_runtime(runtime),
134+
);
135+
136+
// Execute partition 0 (the only output partition)
137+
let mut stream = repartition.execute(0, task_ctx).unwrap();
138+
139+
let mut total_rows = 0usize;
140+
let mut total_nulls = 0usize;
141+
let mut batch_idx = 0usize;
142+
while let Some(result) = stream.next().await {
143+
let batch = result.unwrap();
144+
145+
RecordBatch::try_new(batch.schema(), batch.columns().to_vec()).unwrap_or_else(
146+
|e| {
147+
panic!(
148+
"Output batch {batch_idx} is invalid (schema says non-nullable \
149+
but data has nulls): {e}"
150+
)
151+
},
152+
);
153+
154+
total_rows += batch.num_rows();
155+
total_nulls += batch.column(1).null_count();
156+
batch_idx += 1;
157+
}
158+
159+
assert_eq!(
160+
total_rows,
161+
num_batches * rows_per_batch * 2,
162+
"All rows from both UNION branches should be present"
163+
);
164+
assert!(
165+
total_nulls > 0,
166+
"Expected some null values in output (i.e. nullable batches were processed)"
167+
);
168+
}

datafusion/physical-plan/src/spill/in_progress_spill_file.rs

Lines changed: 123 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,11 @@ impl InProgressSpillFile {
6262
));
6363
}
6464
if self.writer.is_none() {
65-
let schema = batch.schema();
65+
// Use the SpillManager's declared schema rather than the batch's schema.
66+
// Individual batches may have different schemas (e.g., different nullability)
67+
// when they come from different branches of a UnionExec. The SpillManager's
68+
// schema represents the canonical schema that all batches should conform to.
69+
let schema = Arc::clone(self.spill_writer.schema());
6670
if let Some(in_progress_file) = &mut self.in_progress_file {
6771
self.writer = Some(IPCStreamWriter::new(
6872
in_progress_file.path(),
@@ -138,3 +142,121 @@ impl InProgressSpillFile {
138142
Ok(self.in_progress_file.take())
139143
}
140144
}
145+
146+
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Decimal128Array, Int64Array};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
    use datafusion_physical_expr_common::metrics::{
        ExecutionPlanMetricsSet, SpillMetrics,
    };
    use futures::StreamExt;

    /// Verifies that `InProgressSpillFile` builds its IPC writer from the
    /// SpillManager's declared schema instead of the first batch it receives,
    /// so every batch read back from disk carries a correct schema even when
    /// the appended batches disagree on nullability.
    ///
    /// Setup:
    /// - the SpillManager declares `val` as nullable
    /// - batch #1 declares `val` non-nullable (literal-projection UNION branch)
    /// - batch #2 declares `val` nullable and contains NULLs (table branch)
    /// - after readback, both batches must expose the nullable schema
    #[tokio::test]
    async fn test_spill_file_uses_spill_manager_schema() {
        // Canonical schema the SpillManager declares: `val` is nullable.
        let canonical_schema = Arc::new(Schema::new(vec![
            Field::new("key", DataType::Int64, false),
            Field::new("val", DataType::Decimal128(15, 7), true),
        ]));
        // Per-batch schema that disagrees: `val` is non-nullable.
        let strict_schema = Arc::new(Schema::new(vec![
            Field::new("key", DataType::Int64, false),
            Field::new("val", DataType::Decimal128(15, 7), false),
        ]));

        let runtime = Arc::new(RuntimeEnvBuilder::new().build().unwrap());
        let metrics_set = ExecutionPlanMetricsSet::new();
        let spill_metrics = SpillMetrics::new(&metrics_set, 0);
        let spill_manager = Arc::new(SpillManager::new(
            runtime,
            spill_metrics,
            Arc::clone(&canonical_schema),
        ));

        let mut in_progress = spill_manager.create_in_progress_file("test").unwrap();

        // Batch #1: `val` declared non-nullable, all zeros.
        let zeros = Decimal128Array::from(vec![0i128; 3])
            .with_precision_and_scale(15, 7)
            .unwrap();
        let strict_batch = RecordBatch::try_new(
            Arc::clone(&strict_schema),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3])), Arc::new(zeros)],
        )
        .unwrap();
        in_progress.append_batch(&strict_batch).unwrap();

        // Batch #2: `val` declared nullable and actually holding a NULL.
        let with_null = Decimal128Array::from(vec![
            Some(10_000_000i128),
            None,
            Some(30_000_000i128),
        ])
        .with_precision_and_scale(15, 7)
        .unwrap();
        let nullable_batch = RecordBatch::try_new(
            Arc::clone(&canonical_schema),
            vec![
                Arc::new(Int64Array::from(vec![4, 5, 6])),
                Arc::new(with_null),
            ],
        )
        .unwrap();
        in_progress.append_batch(&nullable_batch).unwrap();

        let spill_file = in_progress.finish().unwrap().unwrap();

        // Read the spill file back; the canonical schema must have survived.
        let mut stream = spill_manager
            .read_spill_as_stream(spill_file, None)
            .unwrap();
        assert!(
            stream.schema().field(1).is_nullable(),
            "Stream schema should be nullable"
        );

        let mut batches = vec![];
        while let Some(result) = stream.next().await {
            batches.push(result.unwrap());
        }
        assert_eq!(batches.len(), 2);

        // Every readback batch must carry the SpillManager's nullable schema.
        for (i, batch) in batches.iter().enumerate() {
            assert!(
                batch.schema().field(1).is_nullable(),
                "Readback batch {i} should have nullable schema from SpillManager"
            );
        }

        // The NULL in the second batch must survive the round trip.
        let val_col = batches[1]
            .column(1)
            .as_any()
            .downcast_ref::<Decimal128Array>()
            .unwrap();
        assert_eq!(val_col.null_count(), 1, "Second batch should have 1 null");

        // Reconstructing the batch against its own schema must succeed; it
        // would fail if the schema claimed non-nullable while the data held
        // NULLs.
        RecordBatch::try_new(batches[1].schema(), batches[1].columns().to_vec()).expect(
            "Readback batch should be valid: schema should match data nullability",
        );
    }
}

0 commit comments

Comments
 (0)