Skip to content

Commit 0f5ab15

Browse files
committed
add memory_limit to expose the info in MemoryPool.
1 parent 4510f29 commit 0f5ab15

3 files changed

Lines changed: 35 additions & 16 deletions

File tree

datafusion/execution/src/memory_pool/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,21 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
141141

142142
/// Return the total amount of memory reserved
143143
fn reserved(&self) -> usize;
144+
145+
/// Return the memory limit of the pool
146+
///
147+
/// It is useful to know the limit of the pool sometime (e.g. we can know
148+
/// if spilling will be triggered, it only we be triggered when the memory
149+
/// limit isn't infinite)
150+
fn memory_limit(&self) -> MemoryLimit {
151+
MemoryLimit::Unknown
152+
}
153+
}
154+
155+
pub enum MemoryLimit {
156+
Infinite,
157+
Finite(usize),
158+
Unknown,
144159
}
145160

146161
/// A memory consumer is a named allocation traced by a particular

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
18+
use crate::memory_pool::{MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation};
1919
use datafusion_common::HashMap;
2020
use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
2121
use log::debug;
@@ -48,6 +48,10 @@ impl MemoryPool for UnboundedMemoryPool {
4848
fn reserved(&self) -> usize {
4949
self.used.load(Ordering::Relaxed)
5050
}
51+
52+
fn memory_limit(&self) -> MemoryLimit {
53+
MemoryLimit::Infinite
54+
}
5155
}
5256

5357
/// A [`MemoryPool`] that implements a greedy first-come first-serve limit.
@@ -100,6 +104,10 @@ impl MemoryPool for GreedyMemoryPool {
100104
fn reserved(&self) -> usize {
101105
self.used.load(Ordering::Relaxed)
102106
}
107+
108+
fn memory_limit(&self) -> MemoryLimit {
109+
MemoryLimit::Finite(self.pool_size)
110+
}
103111
}
104112

105113
/// A [`MemoryPool`] that prevents spillable reservations from using more than
@@ -233,6 +241,10 @@ impl MemoryPool for FairSpillPool {
233241
let state = self.state.lock();
234242
state.spillable + state.unspillable
235243
}
244+
245+
fn memory_limit(&self) -> MemoryLimit {
246+
MemoryLimit::Finite(self.pool_size)
247+
}
236248
}
237249

238250
/// Constructs a resources error based upon the individual [`MemoryReservation`].
@@ -408,6 +420,10 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
408420
fn reserved(&self) -> usize {
409421
self.inner.reserved()
410422
}
423+
424+
fn memory_limit(&self) -> MemoryLimit {
425+
self.inner.memory_limit()
426+
}
411427
}
412428

413429
fn provide_top_memory_consumers_to_error_msg(

datafusion/execution/src/runtime_env.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -253,24 +253,12 @@ impl RuntimeEnvBuilder {
253253
cache_manager,
254254
object_store_registry,
255255
} = self;
256-
257-
// If `memory_pool` is not set, it represents spilling is disabled.
258-
// Because `UnboundedMemoryPool` will be used in this case, and
259-
// it will never limit memory usage.
260-
// And we disable the `disk_manager`(use `DiskManagerConfig::Disabled`)
261-
// to tell we have disabled spilling.
262-
let (memory_pool, disk_manager_config) = if let Some(pool) = memory_pool {
263-
(pool, disk_manager)
264-
} else {
265-
(
266-
Arc::new(UnboundedMemoryPool::default()) as _,
267-
DiskManagerConfig::Disabled,
268-
)
269-
};
256+
let memory_pool =
257+
memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
270258

271259
Ok(RuntimeEnv {
272260
memory_pool,
273-
disk_manager: DiskManager::try_new(disk_manager_config)?,
261+
disk_manager: DiskManager::try_new(disk_manager)?,
274262
cache_manager: CacheManager::try_new(&cache_manager)?,
275263
object_store_registry,
276264
})

0 commit comments

Comments
 (0)