Skip to content

Commit d428760

Browse files
jordepicJordan Epstein
andauthored
fix: count shared buffers once in hash join build-side memory accounting (#22862)
## Which issue does this PR close? - Closes #22861. ## Rationale for this change When using DataFusion comet I noticed that my hash join operator was failing with the following error: `Failed to acquire 142606336 bytes where 17142251456 bytes already reserved and the fair limit is 17179869184 bytes, 4 registered`. Looking into this more, DataFusion asks to reserve memory for each batch (by default 8192 rows) of the build side of a hash join - and tries to reserve (without actually allocating it) num_batches * batch_size. This is problematic when these are batches are zero-copy slices of a larger batch (e.g. GroupedHashAggregateStream), since the slice size is evaluated to be the size of the larger buffer. This is because the reference to the slice actually keeps the entire buffer from being freed. DataFusion doesn't overallocate memory (the underlying data is the same), but it does over-request it (in the centralized accounting system), which can lead to these "ResourcesExhausted" exceptions. ## What changes are included in this PR? In this change, we keep track of all of the buffers that we've already counted via a set of pointers. This way, we don't redundantly request memory for the whole arrow buffer for each sub-slice of it. We choose this approach as opposed to just requesting a smaller amount of memory per batch, because as mentioned before, the pointer to each batch technically keeps the entire arrow-buffer from being freed. ## Are these changes tested? The new hash join test fails on main with ResourcesExhausted and passes with this change. ## Are there any user-facing changes? No breaking changes. Adds a new public helper count_record_batch_memory_size to datafusion-common. Co-authored-by: Jordan Epstein <jordan.epstein@imc.com>
1 parent cb2542c commit d428760

2 files changed

Lines changed: 140 additions & 16 deletions

File tree

datafusion/common/src/utils/memory.rs

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use crate::error::_exec_datafusion_err;
2121
use crate::{HashSet, Result};
2222
use arrow::array::ArrayData;
2323
use arrow::record_batch::RecordBatch;
24-
use std::{mem::size_of, ptr::NonNull};
24+
use std::mem::size_of;
25+
use std::num::NonZero;
2526

2627
/// Estimates the memory size required for a hash table prior to allocation.
2728
///
@@ -131,34 +132,74 @@ pub fn estimate_memory_size<T>(num_elements: usize, fixed_size: usize) -> Result
131132
/// `Buffer`. This method provides temporary fix until the issue is resolved:
132133
/// <https://github.com/apache/arrow-rs/issues/6439>
133134
pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
134-
// Store pointers to `Buffer`'s start memory address (instead of actual
135-
// used data region's pointer represented by current `Array`)
136-
let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
137-
let mut total_size = 0;
138-
139-
for array in batch.columns() {
140-
let array_data = array.to_data();
141-
count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size);
135+
RecordBatchMemoryCounter::new().count_batch(batch)
136+
}
137+
138+
/// Tracks the memory used by a sequence of [`RecordBatch`]es that may share
139+
/// underlying buffers, counting each buffer exactly once.
140+
///
141+
/// Use this instead of [`get_record_batch_memory_size`] to account for the
142+
/// total memory of a sequence of batches, e.g. when buffering the batches of
143+
/// an input stream. Such batches can share buffers (for example, operators
144+
/// like aggregates emit one large batch as multiple zero-copy slices), and
145+
/// calling [`get_record_batch_memory_size`] per batch counts the shared
146+
/// buffers once per batch, while this counter counts them exactly once. A
147+
/// batch's buffers are kept alive by the batch even when only a sub-range is
148+
/// referenced, so counting unique buffers in full reflects the memory the
149+
/// batches actually retain.
150+
#[derive(Debug, Default)]
151+
pub struct RecordBatchMemoryCounter {
152+
/// Start addresses of `Buffer`s that have already been counted (instead of
153+
/// actual used data region's pointer represented by current `Array`)
154+
counted_buffers: HashSet<NonZero<usize>>,
155+
/// Total memory of all unique buffers counted so far
156+
memory_usage: usize,
157+
}
158+
159+
impl RecordBatchMemoryCounter {
160+
pub fn new() -> Self {
161+
Self::default()
142162
}
143163

144-
total_size
164+
/// Count `batch`, returning the memory used by its buffers that have not
165+
/// been counted before.
166+
pub fn count_batch(&mut self, batch: &RecordBatch) -> usize {
167+
let mut total_size = 0;
168+
169+
for array in batch.columns() {
170+
let array_data = array.to_data();
171+
count_array_data_memory_size(
172+
&array_data,
173+
&mut self.counted_buffers,
174+
&mut total_size,
175+
);
176+
}
177+
178+
self.memory_usage += total_size;
179+
total_size
180+
}
181+
182+
/// Total memory of the unique buffers of all batches counted so far.
183+
pub fn memory_usage(&self) -> usize {
184+
self.memory_usage
185+
}
145186
}
146187

147188
/// Count the memory usage of `array_data` and its children recursively.
148189
fn count_array_data_memory_size(
149190
array_data: &ArrayData,
150-
counted_buffers: &mut HashSet<NonNull<u8>>,
191+
counted_buffers: &mut HashSet<NonZero<usize>>,
151192
total_size: &mut usize,
152193
) {
153194
// Count memory usage for `array_data`
154195
for buffer in array_data.buffers() {
155-
if counted_buffers.insert(buffer.data_ptr()) {
196+
if counted_buffers.insert(buffer.data_ptr().addr()) {
156197
*total_size += buffer.capacity();
157198
} // Otherwise the buffer's memory is already counted
158199
}
159200

160201
if let Some(null_buffer) = array_data.nulls()
161-
&& counted_buffers.insert(null_buffer.inner().inner().data_ptr())
202+
&& counted_buffers.insert(null_buffer.inner().inner().data_ptr().addr())
162203
{
163204
*total_size += null_buffer.inner().inner().capacity();
164205
}
@@ -295,6 +336,29 @@ mod record_batch_tests {
295336
assert_eq!(size_origin, size_sliced);
296337
}
297338

339+
#[test]
340+
fn test_record_batch_memory_counter_buffer_shared_across_batches() {
341+
let schema = Arc::new(Schema::new(vec![Field::new(
342+
"ints",
343+
DataType::Int32,
344+
false,
345+
)]));
346+
347+
let int_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
348+
let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)]).unwrap();
349+
let slices = [batch.slice(0, 2), batch.slice(2, 2), batch.slice(4, 2)];
350+
351+
// Counting each slice individually counts the shared buffer once per slice
352+
let summed: usize = slices.iter().map(get_record_batch_memory_size).sum();
353+
assert_eq!(summed, 3 * get_record_batch_memory_size(&batch));
354+
355+
// A counter shared across the batches counts it exactly once
356+
let mut counter = RecordBatchMemoryCounter::new();
357+
let deduped: usize = slices.iter().map(|slice| counter.count_batch(slice)).sum();
358+
assert_eq!(deduped, get_record_batch_memory_size(&batch));
359+
assert_eq!(counter.memory_usage(), get_record_batch_memory_size(&batch));
360+
}
361+
298362
#[test]
299363
fn test_get_record_batch_memory_size_nested_array() {
300364
let schema = Arc::new(Schema::new(vec![

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ use crate::projection::{
5252
try_pushdown_through_join,
5353
};
5454
use crate::repartition::REPARTITION_RANDOM_STATE;
55-
use crate::spill::get_record_batch_memory_size;
5655
use crate::{
5756
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
5857
PlanProperties, SendableRecordBatchStream, Statistics,
@@ -72,7 +71,7 @@ use arrow::record_batch::RecordBatch;
7271
use arrow::util::bit_util;
7372
use arrow_schema::{DataType, Schema};
7473
use datafusion_common::config::ConfigOptions;
75-
use datafusion_common::utils::memory::estimate_memory_size;
74+
use datafusion_common::utils::memory::{RecordBatchMemoryCounter, estimate_memory_size};
7675
use datafusion_common::{
7776
JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err,
7877
plan_err, project_schema,
@@ -1817,6 +1816,10 @@ struct BuildSideState {
18171816
metrics: BuildProbeJoinMetrics,
18181817
reservation: MemoryReservation,
18191818
bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
1819+
/// Counts the memory of `batches` for `reservation`. Batches can share
1820+
/// underlying buffers (e.g. when the input emits zero-copy slices of one
1821+
/// larger batch), so each buffer must be reserved only once.
1822+
memory_counter: RecordBatchMemoryCounter,
18201823
}
18211824

18221825
impl BuildSideState {
@@ -1833,6 +1836,7 @@ impl BuildSideState {
18331836
num_rows: 0,
18341837
metrics,
18351838
reservation,
1839+
memory_counter: RecordBatchMemoryCounter::new(),
18361840
bounds_accumulators: should_compute_dynamic_filters
18371841
.then(|| {
18381842
on_left
@@ -1923,7 +1927,7 @@ async fn collect_left_input(
19231927
}
19241928

19251929
// Decide if we spill or not
1926-
let batch_size = get_record_batch_memory_size(&batch);
1930+
let batch_size = state.memory_counter.count_batch(&batch);
19271931
// Reserve memory for incoming batch
19281932
state.reservation.try_grow(batch_size)?;
19291933
// Update metrics
@@ -1945,6 +1949,7 @@ async fn collect_left_input(
19451949
metrics,
19461950
mut reservation,
19471951
bounds_accumulators,
1952+
memory_counter: _,
19481953
} = state;
19491954

19501955
// Compute bounds
@@ -5369,6 +5374,61 @@ mod tests {
53695374
Ok(())
53705375
}
53715376

5377+
#[tokio::test]
5378+
async fn build_side_sliced_batches_memory_accounting() -> Result<()> {
5379+
// The build side emits zero-copy slices of one large batch, as e.g. an
5380+
// aggregate emitting its output in batch_size chunks does. The buffers
5381+
// shared by the slices must be reserved once in total, not once per
5382+
// slice: per-slice accounting reserves number_of_slices x parent size
5383+
// and aborts queries that fit in memory with room to spare.
5384+
let n = 4096;
5385+
let v: Vec<i32> = (0..n).collect();
5386+
let parent = build_table_i32(("a1", &v), ("b1", &v), ("c1", &v));
5387+
let slices: Vec<RecordBatch> =
5388+
(0..16).map(|i| parent.slice(i * 256, 256)).collect();
5389+
let left =
5390+
TestMemoryExec::try_new_exec(&[slices], parent.schema(), None).unwrap();
5391+
5392+
let right_batch = build_table_i32(
5393+
("a2", &vec![10, 11]),
5394+
("b2", &vec![0, 1]),
5395+
("c2", &vec![14, 15]),
5396+
);
5397+
let right = TestMemoryExec::try_new_exec(
5398+
&[vec![right_batch.clone()]],
5399+
right_batch.schema(),
5400+
None,
5401+
)
5402+
.unwrap();
5403+
let on = vec![(
5404+
Arc::new(Column::new_with_schema("b1", &parent.schema())?) as _,
5405+
Arc::new(Column::new_with_schema("b2", &right_batch.schema())?) as _,
5406+
)];
5407+
5408+
// Enough for the parent batch (~48KB) plus the join hash table, but far
5409+
// below the ~768KB that per-slice accounting would reserve
5410+
let runtime = RuntimeEnvBuilder::new()
5411+
.with_memory_limit(400_000, 1.0)
5412+
.build_arc()?;
5413+
let task_ctx = TaskContext::default().with_runtime(runtime);
5414+
let task_ctx = Arc::new(task_ctx);
5415+
5416+
let join = join(
5417+
left,
5418+
right,
5419+
on,
5420+
&JoinType::Inner,
5421+
NullEquality::NullEqualsNothing,
5422+
)?;
5423+
5424+
let stream = join.execute(0, task_ctx)?;
5425+
let batches = common::collect(stream).await?;
5426+
let num_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
5427+
assert_eq!(num_rows, 2);
5428+
5429+
Ok(())
5430+
}
5431+
53725432
#[tokio::test]
53735433
async fn partitioned_join_overallocation() -> Result<()> {
53745434
// Prepare partitioned inputs for HashJoinExec

0 commit comments

Comments
 (0)