Skip to content

Commit c428701

Browse files
committed
Native allocator follow-up: dynamic settings, query/datafusion pools, plugin nodeStats — review-feedback edition
Builds on #21703 to add the query, datafusion with allocator, dynamic-tuning, stats, and pool-wiring pieces. This commit is the consolidated version after addressing review feedback on PR #21732. ## Architecture: three native-memory trackers, three knobs Each tracker owns the bytes it can actually see, with a process-level cap above all of them. On a 64 GB / 16 GB-heap node the defaults compose to: RAM 64 GB JVM heap (operator-configured -Xmx) 16 GB Off-heap (RAM - heap) 48 GB node.native_memory.limit (79% of off-heap) 37.92 GB ← AC throttle threshold native.allocator.root.limit (20% of NM) 7.58 GB ← Arrow framework cap pool.flight.max (5% of NM) 1.90 GB ← Flight transport pool pool.ingest.max (8% of NM) 3.03 GB ← Parquet VSR pool pool.query.max (5% of NM) 1.90 GB ← analytics-engine pool datafusion.memory_pool_limit (75% of NM) 28.44 GB ← Rust runtime pool (sibling to Arrow) Unmanaged 10.08 GB ← Lucene mmap, OS page cache, etc. Independent (disk budget): datafusion.spill_memory_limit_bytes (50% of physical RAM) 32 GB ## What this commit does ### 1. Dynamic settings + cluster-update consumers All native-allocator settings are Setting.Property.Dynamic. Cluster-settings update consumers in ArrowBasePlugin#registerSettingsUpdateConsumers wire PUTs to live ArrowNativeAllocator state. The grouped validator (validateUpdate) rejects cross-setting violations (sum of pool mins > root, per-pool min > max) at PUT time with HTTP 400. ### 2. Query and DataFusion pools POOL_QUERY added for analytics-engine per-query allocators (children of nativeAllocator.getPoolAllocator(POOL_QUERY)). DataFusion gets its own setting (datafusion.memory_pool_limit_bytes) since DataFusion's working memory lives entirely on the Rust side and is reported only through DataFusion's MemoryPool — a Java-side pool that pretended to track it would either need a per-allocation FFM round-trip (performance disaster) or be a config-only mirror (HTTP 200 with no observable effect). POOL_DATAFUSION is intentionally absent; the Rust-side datafusion.memory_pool_limit_bytes is the honest knob for that layer. ### 3. Plugin#nodeStats SPI New SPI method Plugin#nodeStats() returning List<PluginNodeStats>. NodeStats serializes plugin-contributed payloads via NamedWriteable, surfacing them under _nodes/stats. Annotated @experimentalapi to match PluginNodeStats. ### 4. Pool wiring through Guice (no static singleton) ArrowNativeAllocator is constructed in ArrowBasePlugin#createComponents and returned to Node.java which auto-binds it via Guice. Consumers (AnalyticsSearchService, FlightTransport, DefaultPlanExecutor) receive it through constructor @Inject — no static instance() / INSTANCE / ensureForTesting. ### 5. Drop NativeAllocatorListener SPI [@bowenlan-amzn #3269655712, #3269660170, @Bukhtawar #3267186318] The original design used NativeAllocatorListener to push pool-limit changes into consumer-side child allocators that captured the pool's limit at construction time. With children created at Long.MAX_VALUE, Arrow's Accountant.allocate (lines 191-203 of Accountant.java in arrow-java v18.3.0) walks the parent chain on every allocation and checks the parent's allocationLimit — so dynamic resizes of parquet.native.pool.{flight,query}.max take effect immediately on subsequent allocations through any descendant. The listener was emulating this Arrow-native behavior and is no longer needed. - AnalyticsSearchService: drop poolListener; child uses Long.MAX_VALUE - DefaultPlanExecutor: drop poolListener; coordinator uses Long.MAX_VALUE - FlightTransport: drop intermediate flightAllocator + poolListener; pool is the parent, server/client are Long.MAX_VALUE children - libs/arrow-spi/.../NativeAllocatorListener.java: deleted - NativeAllocator interface: addListener / removeListener removed - ArrowNativeAllocator: listeners field, fireListeners method, and listener calls in setPoolLimit/setPoolMin/rebalance removed The rebalancer continues to function as before: it changes pool limits via setLimit, which children read via Arrow's parent-cap check at allocateBytes. Rebalancer is off by default. ### 6. Fix Guice duplicate-binding regression on ArrowNativeAllocator Removed the redundant ArrowBasePlugin#createGuiceModules override. Node.java line 1748 already binds every component returned from createComponents: pluginComponents.stream() .forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); ArrowBasePlugin#createComponents already returns the allocator, so the explicit bind(ArrowNativeAllocator.class).toInstance(allocator) in createGuiceModules registered the same binding a second time and caused Guice CreationException at cluster startup. This was the root cause of the gradle-check Jenkins CI failures on commits 9188043, 498b502, d646dd4, e1f3c5b, 041dc0e — every integration test failed during MockNode construction. ### 7. Memory defaults aligned to the partitioning model - node.native_memory.limit defaults to 79% × (RAM - heap) via OsProbe (cgroup- aware: container memory limit on K8s/Docker, total physical RAM on bare metal). Falls back to ZERO if probe fails or heap >= RAM, preserving the pre-default opt-in semantics. - native.allocator.root.limit defaults to 20% × node.native_memory.limit. - parquet.native.pool.{flight,ingest,query}.max default to 5%/8%/5% of node.native_memory.limit (sum 18% < root's 20%, leaving 2 pp headroom inside the framework cap). - datafusion.memory_pool_limit_bytes defaults to 75% × node.native_memory.limit (called out by @bharath-techie #3271093086: prior default Runtime.maxMemory() / 4 was JVM-heap derived, wrong baseline for an off-heap runtime). - datafusion.spill_memory_limit_bytes defaults to 50% × physical RAM, independent of node.native_memory.limit (spill is a disk-staging budget, not a memory budget). Behavior change to call out in upgrade notes: admission control is now active by default. Operators wanting pre-existing opt-out behavior can set node.native_memory.limit: 0b — all framework caps then fall back to Long.MAX_VALUE unbounded mode. ### 8. PLUGIN_STATS metric is opt-in Plugin-contributed nodeStats are gated behind a new NodesStatsRequest.Metric (plugin_stats) so that operators not opting in don't pay the serialization cost on every _nodes/stats poll. Default for cluster-stats is opt-out (false). ### 9. @experimentalapi on Plugin#nodeStats() [@bowenlan-amzn #3269563833] Annotate the new SPI hook to match the @experimentalapi annotation already present on PluginNodeStats. ### 10. FQN -> import nit in ParquetIndexingEngine [@bowenlan-amzn #3269589758] Replace 3 fully-qualified org.opensearch.arrow.allocator.ArrowNativeAllocator references in constructor parameters with a single import. ### 11. Integration tests for cap-enforcement boundaries Added NativeAllocatorBoundaryIT under plugins/arrow-flight-rpc/src/internalClusterTest. Three tests boot a single-node cluster with tight memory settings (root=16 MiB, pool maxes=16 MiB) and exercise real Arrow allocations via the Guice-injected ArrowNativeAllocator: - testPoolMaxRejectsAllocationsBeyondCap: PUT pool.query.max=4 MiB, verify a 8 MiB request through the pool throws OutOfMemoryException. - testRootLimitRejectsAllocationsBeyondCap: hold 8 MiB in FLIGHT, verify a 16 MiB QUERY request fails at the root level (8+16 > 16 root cap) even though QUERY's own max would individually allow it. - testDynamicPoolResizeAffectsInFlightAllocations: create child at Long.MAX_VALUE (the AnalyticsSearchService / DefaultPlanExecutor pattern post-listener-removal), verify a 2 MiB allocation succeeds, PUT query.max=1 MiB via cluster settings, verify a 2 MiB request now throws OOM via Arrow's parent-cap check at allocateBytes. These cover the boundaries the framework is supposed to enforce. The third test specifically verifies the listener-replacement contract: dynamic pool resize takes effect on in-flight Long.MAX_VALUE children without any listener machinery, exactly as Arrow's parent-cap check provides natively. ## How a query flows through these layers Take a concrete example: a user issues a PPL query that goes through analytics-engine and dispatches to DataFusion. 1. Coordinator receives the request. → Admission control checks node.native_memory.limit budget. → If OK, proceeds; otherwise rejects with 429. 2. AnalyticsSearchService creates a per-fragment Arrow allocator: allocator = getPoolAllocator(POOL_QUERY).newChildAllocator("frag-N", 0, Long.MAX_VALUE) → Future BufferAllocator.buffer(...) calls on this allocator increment POOL_QUERY's counter and the root counter via Arrow's parent-cap chain. → Bounded by parquet.native.pool.query.max. 3. AnalyticsSearchService dispatches to DataFusion via NativeBridge. → NativeBridge.executeQueryAsync(...) marshals plan bytes via FFM. 4. DataFusion runs the query in Rust: → HashAggregate builds a hash table. → reservation.try_grow(50MB) → DataFusion MemoryPool counter += 50MB. → Bounded by datafusion.memory_pool_limit_bytes. → If exceeded, HashAggregate spills to disk (bounded by datafusion.spill_memory_limit_bytes). 5. DataFusion produces a result batch in Rust. → Java imports it via Arrow C Data Interface. → The import allocates Java ArrowBufs under the per-fragment allocator from step 2. POOL_QUERY counter += result_size. 6. Result returns to coordinator. → Per-fragment allocator closes; POOL_QUERY counter decrements. → DataFusion query completes; MemoryPool counter decrements. Each layer accounts for what it owns. No double-counting. Each operator-tunable knob bounds a real, observable thing. ## Verification ./gradlew -Dsandbox.enabled=true \\ :plugins:arrow-base:test :plugins:arrow-flight-rpc:test \\ :sandbox:plugins:{analytics-engine,analytics-backend-datafusion,parquet-data-format}:test \\ :server:test --tests "org.opensearch.node.resource.tracker.*" \\ :plugins:arrow-flight-rpc:internalClusterTest --tests "*NativeAllocatorBoundaryIT*" → BUILD SUCCESSFUL By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
1 parent 40688ad commit c428701

21 files changed

Lines changed: 688 additions & 325 deletions

File tree

libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/NativeAllocator.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ public interface NativeAllocator extends Closeable {
3636
PoolHandle getOrCreatePool(String poolName, long limit);
3737

3838
/**
39-
* Updates the limit of an existing pool.
39+
* Updates the limit of an existing pool. Children of the pool allocator
40+
* inherit the change automatically via Arrow's parent-cap check at
41+
* allocation time — no notification SPI is needed.
4042
*
4143
* @param poolName logical pool name
4244
* @param newLimit new maximum bytes for the pool
@@ -55,24 +57,6 @@ public interface NativeAllocator extends Closeable {
5557
*/
5658
NativeAllocatorPoolStats stats();
5759

58-
/**
59-
* Registers a listener that is invoked after a pool's limit changes.
60-
*
61-
* <p>Listener invocation is synchronous on the caller thread. See
62-
* {@link NativeAllocatorListener} for threading constraints.
63-
*
64-
* @param listener the listener to register
65-
*/
66-
void addListener(NativeAllocatorListener listener);
67-
68-
/**
69-
* Unregisters a previously registered listener. No-op if the listener
70-
* was not registered.
71-
*
72-
* @param listener the listener to remove
73-
*/
74-
void removeListener(NativeAllocatorListener listener);
75-
7660
/**
7761
* Opaque handle to a memory pool. Plugins downcast to the concrete type
7862
* (e.g., Arrow's {@code BufferAllocator}) in the implementation layer.

libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/NativeAllocatorListener.java

Lines changed: 0 additions & 39 deletions
This file was deleted.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java

Lines changed: 101 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import org.opensearch.arrow.spi.NativeAllocatorPoolConfig;
1212
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
1313
import org.opensearch.cluster.service.ClusterService;
14-
import org.opensearch.common.inject.AbstractModule;
15-
import org.opensearch.common.inject.Module;
1614
import org.opensearch.common.settings.ClusterSettings;
1715
import org.opensearch.common.settings.Setting;
1816
import org.opensearch.common.settings.Settings;
@@ -55,11 +53,13 @@ public ArrowBasePlugin() {}
5553
/**
5654
* Maximum bytes for the root Arrow allocator.
5755
*
58-
* <p>When unset, the default is derived from the admission-control budget
59-
* {@link ResourceTrackerSettings#NODE_NATIVE_MEMORY_LIMIT_SETTING} reduced by
60-
* {@link ResourceTrackerSettings#NODE_NATIVE_MEMORY_BUFFER_PERCENT_SETTING} —
61-
* the same budget AC throttles on. If AC is unconfigured (limit = 0), the
62-
* default is {@link Long#MAX_VALUE}, preserving pre-AC behaviour.
56+
* <p>When unset, the default is 20% of
57+
* {@link ResourceTrackerSettings#NODE_NATIVE_MEMORY_LIMIT_SETTING}; see
58+
* {@link #deriveRootLimitDefault}. The Arrow framework gets a small fraction of the
59+
* native budget because the dominant consumer of native memory in analytics workloads
60+
* is the DataFusion Rust runtime (~75% of {@code node.native_memory.limit}), not Arrow.
61+
* If AC is unconfigured (limit = 0), the default is {@link Long#MAX_VALUE}, preserving
62+
* pre-AC behaviour.
6363
*/
6464
public static final Setting<Long> ROOT_LIMIT_SETTING = new Setting<>(
6565
NativeAllocatorPoolConfig.SETTING_ROOT_LIMIT,
@@ -78,17 +78,22 @@ public ArrowBasePlugin() {}
7878
);
7979

8080
/**
81-
* Computes the default for {@link #ROOT_LIMIT_SETTING} from the AC native-memory budget.
82-
* Returns the bytes-as-string representation expected by the Setting parser.
81+
* Computes the default for {@link #ROOT_LIMIT_SETTING} as 20% of
82+
* {@link ResourceTrackerSettings#NODE_NATIVE_MEMORY_LIMIT_SETTING}. The Arrow framework's
83+
* hard cap covers only Arrow allocations — DataFusion's Rust runtime is a sibling of
84+
* Arrow root and gets the larger share of the native budget (see
85+
* {@code DataFusionPlugin#deriveMemoryPoolLimitDefault}).
86+
*
87+
* <p>Returns the bytes-as-string representation expected by the {@link Setting} parser.
88+
* If the AC limit is unset (== 0), the default is {@link Long#MAX_VALUE} — unbounded —
89+
* preserving pre-AC behaviour.
8390
*/
8491
static String deriveRootLimitDefault(Settings settings) {
85-
ByteSizeValue acLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
86-
if (acLimit.getBytes() <= 0) {
92+
ByteSizeValue nativeLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
93+
if (nativeLimit.getBytes() <= 0) {
8794
return Long.toString(Long.MAX_VALUE);
8895
}
89-
int bufferPercent = ResourceTrackerSettings.NODE_NATIVE_MEMORY_BUFFER_PERCENT_SETTING.get(settings);
90-
long usable = acLimit.getBytes() - (acLimit.getBytes() * bufferPercent / 100L);
91-
return Long.toString(Math.max(0L, usable));
96+
return Long.toString(nativeLimit.getBytes() * 20 / 100);
9297
}
9398

9499
/** Minimum guaranteed bytes for the Flight pool. */
@@ -100,11 +105,24 @@ static String deriveRootLimitDefault(Settings settings) {
100105
Setting.Property.Dynamic
101106
);
102107

103-
/** Maximum bytes the Flight pool can burst to. */
104-
public static final Setting<Long> FLIGHT_MAX_SETTING = Setting.longSetting(
108+
/**
109+
* Maximum bytes the Flight pool can burst to. Default is 5% of
110+
* {@link ResourceTrackerSettings#NODE_NATIVE_MEMORY_LIMIT_SETTING}; see
111+
* {@link #derivePoolMaxDefault}. Falls back to {@link Long#MAX_VALUE} when AC is
112+
* unconfigured. Matches the partitioning model documented in PR #21732.
113+
*/
114+
public static final Setting<Long> FLIGHT_MAX_SETTING = new Setting<>(
105115
NativeAllocatorPoolConfig.SETTING_FLIGHT_MAX,
106-
Long.MAX_VALUE,
107-
0L,
116+
s -> derivePoolMaxDefault(s, 5),
117+
s -> {
118+
long v = Long.parseLong(s);
119+
if (v < 0) {
120+
throw new IllegalArgumentException(
121+
"Setting [" + NativeAllocatorPoolConfig.SETTING_FLIGHT_MAX + "] must be >= 0, got " + v
122+
);
123+
}
124+
return v;
125+
},
108126
Setting.Property.NodeScope,
109127
Setting.Property.Dynamic
110128
);
@@ -118,11 +136,25 @@ static String deriveRootLimitDefault(Settings settings) {
118136
Setting.Property.Dynamic
119137
);
120138

121-
/** Maximum bytes the ingest pool can burst to. */
122-
public static final Setting<Long> INGEST_MAX_SETTING = Setting.longSetting(
139+
/**
140+
* Maximum bytes the ingest pool can burst to. Default is 8% of
141+
* {@link ResourceTrackerSettings#NODE_NATIVE_MEMORY_LIMIT_SETTING}; see
142+
* {@link #derivePoolMaxDefault}. Falls back to {@link Long#MAX_VALUE} when AC is
143+
* unconfigured. Ingest gets a larger fraction than Flight/Query because parquet VSR
144+
* allocators dominate write-path memory usage — see partitioning model in PR #21732.
145+
*/
146+
public static final Setting<Long> INGEST_MAX_SETTING = new Setting<>(
123147
NativeAllocatorPoolConfig.SETTING_INGEST_MAX,
124-
Long.MAX_VALUE,
125-
0L,
148+
s -> derivePoolMaxDefault(s, 8),
149+
s -> {
150+
long v = Long.parseLong(s);
151+
if (v < 0) {
152+
throw new IllegalArgumentException(
153+
"Setting [" + NativeAllocatorPoolConfig.SETTING_INGEST_MAX + "] must be >= 0, got " + v
154+
);
155+
}
156+
return v;
157+
},
126158
Setting.Property.NodeScope,
127159
Setting.Property.Dynamic
128160
);
@@ -141,24 +173,65 @@ static String deriveRootLimitDefault(Settings settings) {
141173
);
142174

143175
/**
144-
* Maximum bytes the query pool can allocate. Enforced by Arrow's child-allocator
145-
* limit — analytics-engine's per-query allocators are children of this pool, so the
146-
* sum of in-flight per-query allocations is capped here.
176+
* Maximum bytes the query pool can allocate. Default is 5% of
177+
* {@link ResourceTrackerSettings#NODE_NATIVE_MEMORY_LIMIT_SETTING}; see
178+
* {@link #derivePoolMaxDefault}. Falls back to {@link Long#MAX_VALUE} when AC is
179+
* unconfigured. Enforced by Arrow's child-allocator limit — analytics-engine's
180+
* per-query allocators are children of this pool, so the sum of in-flight per-query
181+
* allocations is capped here.
147182
*
148183
* <p>Note: each individual analytics query is also bounded by
149184
* {@code analytics.exec.QueryContext} per-query limit (currently the constant
150185
* {@code DEFAULT_PER_QUERY_MEMORY_LIMIT = 256 MB}). Lowering {@code QUERY_MAX}
151186
* below {@code 256 MB × concurrent-queries} can starve queries even when each
152187
* individual query is within its per-query limit.
153188
*/
154-
public static final Setting<Long> QUERY_MAX_SETTING = Setting.longSetting(
189+
public static final Setting<Long> QUERY_MAX_SETTING = new Setting<>(
155190
NativeAllocatorPoolConfig.SETTING_QUERY_MAX,
156-
Long.MAX_VALUE,
157-
0L,
191+
s -> derivePoolMaxDefault(s, 5),
192+
s -> {
193+
long v = Long.parseLong(s);
194+
if (v < 0) {
195+
throw new IllegalArgumentException(
196+
"Setting [" + NativeAllocatorPoolConfig.SETTING_QUERY_MAX + "] must be >= 0, got " + v
197+
);
198+
}
199+
return v;
200+
},
158201
Setting.Property.NodeScope,
159202
Setting.Property.Dynamic
160203
);
161204

205+
/**
206+
* Computes the default for a pool max as a percentage of
207+
* {@link ResourceTrackerSettings#NODE_NATIVE_MEMORY_LIMIT_SETTING} (the operator's
208+
* declared off-heap budget), falling back to {@link Long#MAX_VALUE} when AC is
209+
* unconfigured. Returns the bytes-as-string representation expected by the
210+
* {@link Setting} parser.
211+
*
212+
* <p>Pools are anchored to {@code node.native_memory.limit} rather than to
213+
* {@link #ROOT_LIMIT_SETTING} so the diagrammed partitioning (PR #21732) holds:
214+
* sum of pool maxes (5+8+5 = 18% of native_memory.limit) fits within the framework
215+
* root cap (20% of native_memory.limit) by default. Operator overrides of
216+
* {@code root.limit} that drop it below {@code sum(pool.max)} are caught by the
217+
* grouped validator.
218+
*
219+
* <p>The fraction is taken straight from {@code node.native_memory.limit}, not from
220+
* {@code limit - buffer_percent}. {@code buffer_percent} is an admission-control
221+
* throttle margin, not a framework budget reduction.
222+
*
223+
* @param settings node settings
224+
* @param percent fraction of {@code node.native_memory.limit} the pool max defaults to
225+
*/
226+
static String derivePoolMaxDefault(Settings settings, int percent) {
227+
ByteSizeValue nativeLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
228+
if (nativeLimit.getBytes() <= 0) {
229+
return Long.toString(Long.MAX_VALUE);
230+
}
231+
long pool = Math.max(0L, nativeLimit.getBytes() * percent / 100);
232+
return Long.toString(pool);
233+
}
234+
162235
/** Interval in seconds between pool rebalance cycles. 0 disables rebalancing. */
163236
public static final Setting<Long> REBALANCE_INTERVAL_SETTING = Setting.longSetting(
164237
"native.allocator.rebalance.interval_seconds",
@@ -206,16 +279,6 @@ public Collection<Object> createComponents(
206279
return components;
207280
}
208281

209-
@Override
210-
public Collection<Module> createGuiceModules() {
211-
return List.of(new AbstractModule() {
212-
@Override
213-
protected void configure() {
214-
bind(ArrowNativeAllocator.class).toInstance(allocator);
215-
}
216-
});
217-
}
218-
219282
/**
220283
* Registers cluster-settings update consumers that propagate dynamic setting changes
221284
* into the live {@link ArrowNativeAllocator}. Package-private so unit tests can exercise

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@
1010

1111
import org.apache.arrow.memory.BufferAllocator;
1212
import org.apache.arrow.memory.RootAllocator;
13-
import org.apache.logging.log4j.LogManager;
14-
import org.apache.logging.log4j.Logger;
1513
import org.opensearch.arrow.spi.NativeAllocator;
16-
import org.opensearch.arrow.spi.NativeAllocatorListener;
1714
import org.opensearch.arrow.spi.NativeAllocatorPoolConfig;
1815
import org.opensearch.arrow.spi.NativeAllocatorPoolStats;
1916

@@ -24,7 +21,6 @@
2421
import java.util.Set;
2522
import java.util.concurrent.ConcurrentHashMap;
2623
import java.util.concurrent.ConcurrentMap;
27-
import java.util.concurrent.CopyOnWriteArrayList;
2824
import java.util.concurrent.ScheduledExecutorService;
2925
import java.util.concurrent.ScheduledFuture;
3026
import java.util.concurrent.TimeUnit;
@@ -49,13 +45,10 @@
4945
*/
5046
public class ArrowNativeAllocator implements NativeAllocator {
5147

52-
private static final Logger logger = LogManager.getLogger(ArrowNativeAllocator.class);
53-
5448
private final RootAllocator root;
5549
private final ConcurrentMap<String, ArrowPoolHandle> pools = new ConcurrentHashMap<>();
5650
private final ConcurrentMap<String, Long> poolMins = new ConcurrentHashMap<>();
5751
private final ConcurrentMap<String, Long> poolMaxes = new ConcurrentHashMap<>();
58-
private final CopyOnWriteArrayList<NativeAllocatorListener> listeners = new CopyOnWriteArrayList<>();
5952
private final ScheduledExecutorService rebalancer;
6053
private volatile ScheduledFuture<?> rebalanceTask;
6154
/**
@@ -140,37 +133,7 @@ public void setPoolLimit(String poolName, long newLimit) {
140133
throw new IllegalStateException("Pool '" + poolName + "' does not exist");
141134
}
142135
poolMaxes.put(poolName, newLimit);
143-
long previous = handle.allocator.getLimit();
144136
handle.allocator.setLimit(newLimit);
145-
if (newLimit != previous) {
146-
fireListeners(poolName, newLimit);
147-
}
148-
}
149-
150-
@Override
151-
public void addListener(NativeAllocatorListener listener) {
152-
listeners.addIfAbsent(listener);
153-
}
154-
155-
@Override
156-
public void removeListener(NativeAllocatorListener listener) {
157-
listeners.remove(listener);
158-
}
159-
160-
/**
161-
* Notifies all registered listeners of a pool limit change. Each listener
162-
* is invoked synchronously on the caller thread; exceptions thrown by one
163-
* listener are logged and isolated so they do not block the others or the
164-
* caller.
165-
*/
166-
private void fireListeners(String poolName, long newLimit) {
167-
for (NativeAllocatorListener listener : listeners) {
168-
try {
169-
listener.onPoolLimitChanged(poolName, newLimit);
170-
} catch (Exception e) {
171-
logger.warn("NativeAllocatorListener threw on pool [{}] limit update", poolName, e);
172-
}
173-
}
174137
}
175138

176139
/**
@@ -183,8 +146,10 @@ private void fireListeners(String poolName, long newLimit) {
183146
* <p>Live propagation rules:
184147
* <ul>
185148
* <li>If {@code newMin} exceeds the pool's current limit, the limit is raised to
186-
* {@code newMin} (capped at the configured pool max), and listeners fire so
187-
* downstream consumers (e.g. the DataFusion Rust runtime) see the new ceiling.
149+
* {@code newMin} (capped at the configured pool max). Children of the pool
150+
* allocator inherit the change automatically via Arrow's parent-cap check at
151+
* allocation time, so dynamic resizes reach in-flight workloads without an
152+
* explicit notification SPI.
188153
* <li>If {@code newMin} is below the current limit, the limit is left alone —
189154
* the rebalancer is the only path that shrinks live limits, so a min change
190155
* on its own never reduces capacity in flight.
@@ -204,7 +169,6 @@ public void setPoolMin(String poolName, long newMin) {
204169
long target = Math.min(newMin, max);
205170
if (target > current) {
206171
handle.allocator.setLimit(target);
207-
fireListeners(poolName, target);
208172
}
209173
}
210174

@@ -292,11 +256,7 @@ void rebalance() {
292256
// Never exceed root
293257
effectiveLimit = Math.min(effectiveLimit, rootLimit);
294258

295-
long previous = alloc.getLimit();
296259
alloc.setLimit(effectiveLimit);
297-
if (effectiveLimit != previous) {
298-
fireListeners(name, effectiveLimit);
299-
}
300260
}
301261
}
302262

0 commit comments

Comments
 (0)