Skip to content

Commit 75af7db

Browse files
feat: Expose used MemoryPool in ResourcesExhausted error messages
1 parent 53b0ffb commit 75af7db

File tree

10 files changed

+78
-50
lines changed

10 files changed

+78
-50
lines changed

datafusion-cli/tests/cli_integration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,11 @@ fn test_cli_top_memory_consumers<'a>(
228228
"Consumer(can spill: bool) consumed XB, peak XB",
229229
);
230230
settings.add_filter(
231-
r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",
231+
r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total '.*?' pool",
232232
"Error: Failed to allocate ",
233233
);
234234
settings.add_filter(
235-
r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",
235+
r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total '.*?' pool",
236236
"Resources exhausted: Failed to allocate",
237237
);
238238

datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ exit_code: 1
1616
[CLI_VERSION]
1717
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
1818
caused by
19-
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
19+
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) using 'greedy' pool as:
2020
Consumer(can spill: bool) consumed XB, peak XB,
2121
Consumer(can spill: bool) consumed XB, peak XB.
2222
Error: Failed to allocate

datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ exit_code: 1
1414
[CLI_VERSION]
1515
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
1616
caused by
17-
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
17+
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) using 'greedy' pool as:
1818
Consumer(can spill: bool) consumed XB, peak XB,
1919
Consumer(can spill: bool) consumed XB, peak XB,
2020
Consumer(can spill: bool) consumed XB, peak XB.

datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,14 @@ async fn automatic_usage_example() -> Result<()> {
113113
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit',
114114
or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
115115
caused by
116-
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
116+
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) using 'greedy' pool as:
117117
ExternalSorterMerge[3]#112(can spill: false) consumed 10.0 MB, peak 10.0 MB,
118118
ExternalSorterMerge[10]#147(can spill: false) consumed 10.0 MB, peak 10.0 MB,
119119
ExternalSorter[1]#93(can spill: true) consumed 69.0 KB, peak 69.0 KB,
120120
ExternalSorter[13]#155(can spill: true) consumed 67.6 KB, peak 67.6 KB,
121121
ExternalSorter[8]#140(can spill: true) consumed 67.2 KB, peak 67.2 KB.
122-
Error: Failed to allocate additional 10.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated for this reservation - 7.1 MB remain available for the total pool
122+
Error: Failed to allocate additional 10.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated
123+
for this reservation - 7.1 MB remain available for the total 'greedy' pool
123124
*/
124125
}
125126
}

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ async fn group_by_none() {
8787
.with_query("select median(request_bytes) from t")
8888
.with_expected_errors(vec![
8989
"Resources exhausted: Additional allocation failed",
90-
"with top memory consumers (across reservations) as:\n AggregateStream",
90+
"using GreedyMemoryPool with top memory consumers (across reservations) using 'greedy' pool as:\n AggregateStream",
9191
])
9292
.with_memory_limit(2_000)
9393
.run()
@@ -99,7 +99,7 @@ async fn group_by_row_hash() {
9999
TestCase::new()
100100
.with_query("select count(*) from t GROUP BY response_bytes")
101101
.with_expected_errors(vec![
102-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
102+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n GroupedHashAggregateStream"
103103
])
104104
.with_memory_limit(2_000)
105105
.run()
@@ -112,7 +112,7 @@ async fn group_by_hash() {
112112
// group by dict column
113113
.with_query("select count(*) from t GROUP BY service, host, pod, container")
114114
.with_expected_errors(vec![
115-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
115+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n GroupedHashAggregateStream"
116116
])
117117
.with_memory_limit(1_000)
118118
.run()
@@ -126,7 +126,7 @@ async fn join_by_key_multiple_partitions() {
126126
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
127127
.with_expected_errors(vec![
128128
"Resources exhausted: Additional allocation failed",
129-
"with top memory consumers (across reservations) as:\n HashJoinInput",
129+
"with top memory consumers (across reservations) using 'greedy' pool as:\n HashJoinInput",
130130
])
131131
.with_memory_limit(1_000)
132132
.with_config(config)
@@ -141,7 +141,7 @@ async fn join_by_key_single_partition() {
141141
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
142142
.with_expected_errors(vec![
143143
"Resources exhausted: Additional allocation failed",
144-
"with top memory consumers (across reservations) as:\n HashJoinInput",
144+
"with top memory consumers (across reservations) using 'greedy' pool as:\n HashJoinInput",
145145
])
146146
.with_memory_limit(1_000)
147147
.with_config(config)
@@ -154,7 +154,7 @@ async fn join_by_expression() {
154154
TestCase::new()
155155
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service")
156156
.with_expected_errors(vec![
157-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]",
157+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n NestedLoopJoinLoad[0]",
158158
])
159159
.with_memory_limit(1_000)
160160
.run()
@@ -167,7 +167,7 @@ async fn cross_join() {
167167
.with_query("select t1.*, t2.* from t t1 CROSS JOIN t t2")
168168
.with_expected_errors(vec![
169169
"Resources exhausted: Additional allocation failed",
170-
"with top memory consumers (across reservations) as:\n CrossJoinExec",
170+
"with top memory consumers (across reservations) using 'greedy' pool as:\n CrossJoinExec",
171171
])
172172
.with_memory_limit(1_000)
173173
.run()
@@ -223,7 +223,7 @@ async fn symmetric_hash_join() {
223223
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
224224
)
225225
.with_expected_errors(vec![
226-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SymmetricHashJoinStream",
226+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n SymmetricHashJoinStream",
227227
])
228228
.with_memory_limit(1_000)
229229
.with_scenario(Scenario::AccessLogStreaming)
@@ -241,7 +241,7 @@ async fn sort_preserving_merge() {
241241
// so only a merge is needed
242242
.with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10")
243243
.with_expected_errors(vec![
244-
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SortPreservingMergeExec",
244+
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n SortPreservingMergeExec",
245245
])
246246
// provide insufficient memory to merge
247247
.with_memory_limit(partition_size / 2)
@@ -321,7 +321,7 @@ async fn sort_spill_reservation() {
321321
test.clone()
322322
.with_expected_errors(vec![
323323
"Resources exhausted: Additional allocation failed",
324-
"with top memory consumers (across reservations) as:",
324+
"with top memory consumers (across reservations) using 'greedy' pool as:",
325325
"B for ExternalSorterMerge",
326326
])
327327
.with_config(config)
@@ -352,7 +352,7 @@ async fn oom_recursive_cte() {
352352
)
353353
.with_expected_errors(vec![
354354
"Resources exhausted: Additional allocation failed",
355-
"with top memory consumers (across reservations) as:\n RecursiveQuery",
355+
"with top memory consumers (across reservations) using 'greedy' pool as:\n RecursiveQuery",
356356
])
357357
.with_memory_limit(2_000)
358358
.run()
@@ -404,7 +404,7 @@ async fn oom_with_tracked_consumer_pool() {
404404
.with_expected_errors(vec![
405405
"Failed to allocate additional",
406406
"for ParquetSink(ArrowColumnWriter)",
407-
"Additional allocation failed", "with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter)"
407+
"Additional allocation failed", "with top memory consumers (across reservations) using 'greedy' pool as:\n ParquetSink(ArrowColumnWriter)"
408408
])
409409
.with_memory_pool(Arc::new(
410410
TrackConsumersPool::new(

datafusion/execution/src/memory_pool/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,9 @@ pub use pool::*;
178178
/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
179179
/// providing better error messages on the largest memory users.
180180
pub trait MemoryPool: Send + Sync + std::fmt::Debug {
181+
/// Return pool name
182+
fn name(&self) -> &str;
183+
181184
/// Registers a new [`MemoryConsumer`]
182185
///
183186
/// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory

0 commit comments

Comments
 (0)