Skip to content

Commit 9188043

Browse files
committed
Native allocator follow-up: dynamic settings, query/datafusion pools, plugin nodeStats
Builds on #21703 to add the pieces a real production deployment needs: * Make pool min/max settings dynamic and grouped-validate cross-setting invariants on every cluster-state update (sum of pool mins <= root, per-pool min <= max). FLIGHT_MIN/INGEST_MIN defaults are 0L (the prior Long.MAX_VALUE defaults caused the new validator to reject any non-MAX root). setPoolMin now updates the live BufferAllocator.setLimit so the Dynamic property has observable effect even when the rebalancer is off. * Derive ROOT_LIMIT_SETTING default from the AC node-native-memory budget (limit minus buffer-percent) so the framework respects the same budget AC throttles on, with a Long.MAX_VALUE fallback when AC is unconfigured. * Add QUERY and DATAFUSION pools alongside FLIGHT and INGEST. Pools init at min when the rebalancer is enabled; otherwise at max so non-rebalanced nodes can still allocate. * Wire arrow-flight-rpc to the FLIGHT pool, parquet-data-format to the INGEST pool, and analytics-engine via the framework's allocator service. Hard-fail if the framework plugin is missing — silently skipping the wire-up is the silent-misconfiguration class of bug we want to prevent. * Cleanup ad-hoc allocator fallbacks in parquet-data-format / analytics-engine so all Arrow consumers go through the unified pool hierarchy. * Rebalancer now distributes headroom across all pools (not only those with current allocation > 0). Avoids the dead-pool corner case where a pool with min=0 starts at limit=0, can never make a first allocation, and never receives a bonus. Plugin _nodes/stats integration: * Add Plugin#nodeStats() hook + PluginNodeStats interface in server. * Wire NodeStats to carry Map<String, PluginNodeStats> with version-gated ser/deser at V_3_7_0 and top-level rendering under nodes.<id>.<name>. * Each entry is wire-framed as (name, length-prefixed bytes); the receiver wraps the inner payload with NamedWriteableAwareStreamInput and drops entries whose subtype is not registered locally. This makes mixed-version rolling upgrades safe — a coordinator that lacks the plugin a data node is running keeps decoding the rest of NodeStats instead of failing the whole response. * NativeAllocatorPluginStats adapter wraps NativeAllocatorPoolStats so the framework contributes to _nodes/stats. The dedicated _native_allocator/stats REST endpoint is gone — one observability surface, not two. * Plugin stats are emitted on every _nodes/stats request regardless of the ?metric= filter; matches RemoteStoreNodeStats precedent. A future PR can add a Metric.PLUGIN_STATS gate without breaking the wire protocol. DataFusion spill memory limit: * Promote datafusion.spill_memory_limit_bytes to Setting.Property.Dynamic. * Wire addSettingsUpdateConsumer that branches on NativeBridge.isSpillLimitDynamic(): mirrors live when df_set_spill_limit is exported, otherwise warns and waits for next node restart. API hygiene: * parquet.max_per_vsr_allocation_ratio is a divisor (limit/N), not a ratio. Renamed to parquet.max_per_vsr_allocation_divisor with a hard upper bound of 100 to reject fat-finger PUTs that would starve every VSR. Real regressions caught during self-audit: * Existing arrow-flight-rpc internalClusterTests and the sandbox coordinator ITs were not declaring the framework plugin in nodePlugins(). After the flight transport got wired to the FLIGHT pool, those ITs fail at node startup with "ArrowNativeAllocator not initialized". Each IT now installs the framework plugin and lists it as an extendedPlugin. Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
1 parent ac2b2fd commit 9188043

42 files changed

Lines changed: 1567 additions & 126 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/NativeAllocator.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,24 @@ public interface NativeAllocator extends Closeable {
5555
*/
5656
NativeAllocatorPoolStats stats();
5757

58+
/**
59+
* Registers a listener that is invoked after a pool's limit changes.
60+
*
61+
* <p>Listener invocation is synchronous on the caller thread. See
62+
* {@link NativeAllocatorListener} for threading constraints.
63+
*
64+
* @param listener the listener to register
65+
*/
66+
void addListener(NativeAllocatorListener listener);
67+
68+
/**
69+
* Unregisters a previously registered listener. No-op if the listener
70+
* was not registered.
71+
*
72+
* @param listener the listener to remove
73+
*/
74+
void removeListener(NativeAllocatorListener listener);
75+
5876
/**
5977
* Opaque handle to a memory pool. Plugins downcast to the concrete type
6078
* (e.g., Arrow's {@code BufferAllocator}) in the implementation layer.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.arrow.spi;
10+
11+
/**
12+
* Callback invoked when a pool's limit changes.
13+
*
14+
* <p>Consumers that mirror a Java-side pool limit to a separate accountant
15+
* (for example, a native runtime that holds its own memory pool) implement
16+
* this interface and register it via {@link NativeAllocator#addListener} so
17+
* resize events propagate without polling.
18+
*
19+
* <p>The interface is allocator-agnostic — no Arrow types appear in the
20+
* signature — so non-Arrow consumers can also subscribe.
21+
*
22+
* <p><b>Threading:</b> listeners are invoked synchronously on the thread that
23+
* caused the resize (a settings-update thread or the rebalancer thread).
24+
* Implementations must not block or take locks held by the caller; if either
25+
* is required, dispatch the work to another executor.
26+
*
27+
* @opensearch.api
28+
*/
29+
@FunctionalInterface
30+
public interface NativeAllocatorListener {
31+
32+
/**
33+
* Invoked after a pool's limit has been updated.
34+
*
35+
* @param poolName logical pool name
36+
* @param newLimit the new limit in bytes
37+
*/
38+
void onPoolLimitChanged(String poolName, long newLimit);
39+
}

libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/NativeAllocatorPoolConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public final class NativeAllocatorPoolConfig {
3030
public static final String POOL_FLIGHT = "flight";
3131
/** Pool name for ingest pipeline memory. */
3232
public static final String POOL_INGEST = "ingest";
33+
/** Pool name for query-execution memory (analytics-engine fragments and per-query allocators). */
34+
public static final String POOL_QUERY = "query";
35+
/** Pool name for DataFusion native runtime memory. Mirrored to the Rust-side memory pool via {@link NativeAllocatorListener}. */
36+
public static final String POOL_DATAFUSION = "datafusion";
3337

3438
/** Setting key for the root allocator limit. */
3539
public static final String SETTING_ROOT_LIMIT = "native.allocator.root.limit";
@@ -42,6 +46,14 @@ public final class NativeAllocatorPoolConfig {
4246
public static final String SETTING_INGEST_MIN = "native.allocator.pool.ingest.min";
4347
/** Setting key for the ingest pool maximum. */
4448
public static final String SETTING_INGEST_MAX = "native.allocator.pool.ingest.max";
49+
/** Setting key for the query pool minimum. */
50+
public static final String SETTING_QUERY_MIN = "native.allocator.pool.query.min";
51+
/** Setting key for the query pool maximum. */
52+
public static final String SETTING_QUERY_MAX = "native.allocator.pool.query.max";
53+
/** Setting key for the DataFusion pool minimum. */
54+
public static final String SETTING_DATAFUSION_MIN = "native.allocator.pool.datafusion.min";
55+
/** Setting key for the DataFusion pool maximum. */
56+
public static final String SETTING_DATAFUSION_MAX = "native.allocator.pool.datafusion.max";
4557

4658
private NativeAllocatorPoolConfig() {}
4759
}

libs/arrow-spi/src/test/java/org/opensearch/arrow/spi/NativeAllocatorPoolConfigTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,19 @@ public class NativeAllocatorPoolConfigTests extends OpenSearchTestCase {
1515
public void testPoolConstants() {
1616
assertEquals("flight", NativeAllocatorPoolConfig.POOL_FLIGHT);
1717
assertEquals("ingest", NativeAllocatorPoolConfig.POOL_INGEST);
18+
assertEquals("query", NativeAllocatorPoolConfig.POOL_QUERY);
19+
assertEquals("datafusion", NativeAllocatorPoolConfig.POOL_DATAFUSION);
1820
}
1921

2022
public void testSettingKeys() {
2123
assertEquals("native.allocator.pool.flight.min", NativeAllocatorPoolConfig.SETTING_FLIGHT_MIN);
2224
assertEquals("native.allocator.pool.flight.max", NativeAllocatorPoolConfig.SETTING_FLIGHT_MAX);
2325
assertEquals("native.allocator.pool.ingest.min", NativeAllocatorPoolConfig.SETTING_INGEST_MIN);
2426
assertEquals("native.allocator.pool.ingest.max", NativeAllocatorPoolConfig.SETTING_INGEST_MAX);
27+
assertEquals("native.allocator.pool.query.min", NativeAllocatorPoolConfig.SETTING_QUERY_MIN);
28+
assertEquals("native.allocator.pool.query.max", NativeAllocatorPoolConfig.SETTING_QUERY_MAX);
29+
assertEquals("native.allocator.pool.datafusion.min", NativeAllocatorPoolConfig.SETTING_DATAFUSION_MIN);
30+
assertEquals("native.allocator.pool.datafusion.max", NativeAllocatorPoolConfig.SETTING_DATAFUSION_MAX);
2531
}
2632

2733
public void testRootSettingKey() {

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java

Lines changed: 177 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,18 @@
1515
import org.opensearch.cluster.service.ClusterService;
1616
import org.opensearch.common.inject.AbstractModule;
1717
import org.opensearch.common.inject.Module;
18+
import org.opensearch.common.settings.ClusterSettings;
1819
import org.opensearch.common.settings.Setting;
1920
import org.opensearch.common.settings.Settings;
2021
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
22+
import org.opensearch.core.common.unit.ByteSizeValue;
2123
import org.opensearch.core.xcontent.NamedXContentRegistry;
2224
import org.opensearch.env.Environment;
2325
import org.opensearch.env.NodeEnvironment;
26+
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
2427
import org.opensearch.plugins.ExtensiblePlugin;
2528
import org.opensearch.plugins.Plugin;
29+
import org.opensearch.plugins.PluginNodeStats;
2630
import org.opensearch.repositories.RepositoriesService;
2731
import org.opensearch.script.ScriptService;
2832
import org.opensearch.threadpool.ThreadPool;
@@ -50,19 +54,49 @@ public class ArrowBasePlugin extends Plugin implements ExtensiblePlugin {
5054
/** Creates the plugin. */
5155
public ArrowBasePlugin() {}
5256

53-
/** Maximum bytes for the root Arrow allocator. */
54-
public static final Setting<Long> ROOT_LIMIT_SETTING = Setting.longSetting(
57+
/**
58+
* Maximum bytes for the root Arrow allocator.
59+
*
60+
* <p>When unset, the default is derived from the admission-control budget
61+
* {@link ResourceTrackerSettings#NODE_NATIVE_MEMORY_LIMIT_SETTING} reduced by
62+
* {@link ResourceTrackerSettings#NODE_NATIVE_MEMORY_BUFFER_PERCENT_SETTING} —
63+
* the same budget AC throttles on. If AC is unconfigured (limit = 0), the
64+
* default is {@link Long#MAX_VALUE}, preserving pre-AC behaviour.
65+
*/
66+
public static final Setting<Long> ROOT_LIMIT_SETTING = new Setting<>(
5567
NativeAllocatorPoolConfig.SETTING_ROOT_LIMIT,
56-
Long.MAX_VALUE,
57-
0L,
68+
ArrowBasePlugin::deriveRootLimitDefault,
69+
s -> {
70+
long v = Long.parseLong(s);
71+
if (v < 0) {
72+
throw new IllegalArgumentException(
73+
"Setting [" + NativeAllocatorPoolConfig.SETTING_ROOT_LIMIT + "] must be >= 0, got " + v
74+
);
75+
}
76+
return v;
77+
},
5878
Setting.Property.NodeScope,
5979
Setting.Property.Dynamic
6080
);
6181

82+
/**
83+
* Computes the default for {@link #ROOT_LIMIT_SETTING} from the AC native-memory budget.
84+
* Returns the bytes-as-string representation expected by the Setting parser.
85+
*/
86+
static String deriveRootLimitDefault(Settings settings) {
87+
ByteSizeValue acLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
88+
if (acLimit.getBytes() <= 0) {
89+
return Long.toString(Long.MAX_VALUE);
90+
}
91+
int bufferPercent = ResourceTrackerSettings.NODE_NATIVE_MEMORY_BUFFER_PERCENT_SETTING.get(settings);
92+
long usable = acLimit.getBytes() - (acLimit.getBytes() * bufferPercent / 100L);
93+
return Long.toString(Math.max(0L, usable));
94+
}
95+
6296
/** Minimum guaranteed bytes for the Flight pool. */
6397
public static final Setting<Long> FLIGHT_MIN_SETTING = Setting.longSetting(
6498
NativeAllocatorPoolConfig.SETTING_FLIGHT_MIN,
65-
Long.MAX_VALUE,
99+
0L,
66100
0L,
67101
Setting.Property.NodeScope,
68102
Setting.Property.Dynamic
@@ -80,7 +114,7 @@ public ArrowBasePlugin() {}
80114
/** Minimum guaranteed bytes for the ingest pool. */
81115
public static final Setting<Long> INGEST_MIN_SETTING = Setting.longSetting(
82116
NativeAllocatorPoolConfig.SETTING_INGEST_MIN,
83-
Long.MAX_VALUE,
117+
0L,
84118
0L,
85119
Setting.Property.NodeScope,
86120
Setting.Property.Dynamic
@@ -95,6 +129,66 @@ public ArrowBasePlugin() {}
95129
Setting.Property.Dynamic
96130
);
97131

132+
/**
133+
* Minimum guaranteed bytes for the query pool. Honored by the rebalancer (when
134+
* enabled) — sets a floor below which the rebalancer will not shrink the pool.
135+
* Has no effect when rebalancing is disabled.
136+
*/
137+
public static final Setting<Long> QUERY_MIN_SETTING = Setting.longSetting(
138+
NativeAllocatorPoolConfig.SETTING_QUERY_MIN,
139+
0L,
140+
0L,
141+
Setting.Property.NodeScope,
142+
Setting.Property.Dynamic
143+
);
144+
145+
/**
146+
* Maximum bytes the query pool can allocate. Enforced by Arrow's child-allocator
147+
* limit — analytics-engine's per-query allocators are children of this pool, so the
148+
* sum of in-flight per-query allocations is capped here.
149+
*
150+
* <p>Note: each individual analytics query is also bounded by
151+
* {@code analytics.exec.QueryContext} per-query limit (currently the constant
152+
* {@code DEFAULT_PER_QUERY_MEMORY_LIMIT = 256 MB}). Lowering {@code QUERY_MAX}
153+
* below {@code 256 MB × concurrent-queries} can starve queries even when each
154+
* individual query is within its per-query limit.
155+
*/
156+
public static final Setting<Long> QUERY_MAX_SETTING = Setting.longSetting(
157+
NativeAllocatorPoolConfig.SETTING_QUERY_MAX,
158+
Long.MAX_VALUE,
159+
0L,
160+
Setting.Property.NodeScope,
161+
Setting.Property.Dynamic
162+
);
163+
164+
/**
165+
* Minimum guaranteed bytes for the DataFusion pool. Honored by the rebalancer
166+
* (when enabled). Has no effect when rebalancing is disabled.
167+
*/
168+
public static final Setting<Long> DATAFUSION_MIN_SETTING = Setting.longSetting(
169+
NativeAllocatorPoolConfig.SETTING_DATAFUSION_MIN,
170+
0L,
171+
0L,
172+
Setting.Property.NodeScope,
173+
Setting.Property.Dynamic
174+
);
175+
176+
/**
177+
* Maximum bytes the DataFusion pool can allocate. Mirrored to the Rust-side
178+
* {@code MemoryPool} via {@code df_set_memory_pool_limit} on every dynamic update
179+
* (see the listener registered in {@code DataFusionPlugin}). DataFusion's
180+
* native runtime checks this cap on every allocation request, so updates take
181+
* effect immediately for new sessions; existing query reservations that already
182+
* exceed the new cap are not reclaimed but drain naturally as queries complete.
183+
*/
184+
public static final Setting<Long> DATAFUSION_MAX_SETTING = Setting.longSetting(
185+
NativeAllocatorPoolConfig.SETTING_DATAFUSION_MAX,
186+
Long.MAX_VALUE,
187+
0L,
188+
Setting.Property.NodeScope,
189+
Setting.Property.Dynamic
190+
);
191+
98192
/** Interval in seconds between pool rebalance cycles. 0 disables rebalancing. */
99193
public static final Setting<Long> REBALANCE_INTERVAL_SETTING = Setting.longSetting(
100194
"native.allocator.rebalance.interval_seconds",
@@ -127,20 +221,34 @@ public Collection<Object> createComponents(
127221
allocator = new ArrowNativeAllocator(rootLimit);
128222
allocator.setRebalanceInterval(REBALANCE_INTERVAL_SETTING.get(settings));
129223

130-
long flightMin = FLIGHT_MIN_SETTING.get(settings);
131-
long flightMax = FLIGHT_MAX_SETTING.get(settings);
132-
long ingestMin = INGEST_MIN_SETTING.get(settings);
133-
long ingestMax = INGEST_MAX_SETTING.get(settings);
224+
// Single source of truth for cross-setting invariants — same logic runs on
225+
// dynamic updates via the grouped consumer below.
226+
validateUpdate(settings);
134227

135-
validateMinMax(NativeAllocatorPoolConfig.POOL_FLIGHT, flightMin, flightMax);
136-
validateMinMax(NativeAllocatorPoolConfig.POOL_INGEST, ingestMin, ingestMax);
137-
validateMinSum(rootLimit, flightMin, ingestMin);
228+
allocator.getOrCreatePool(NativeAllocatorPoolConfig.POOL_FLIGHT, FLIGHT_MIN_SETTING.get(settings), FLIGHT_MAX_SETTING.get(settings));
229+
allocator.getOrCreatePool(NativeAllocatorPoolConfig.POOL_INGEST, INGEST_MIN_SETTING.get(settings), INGEST_MAX_SETTING.get(settings));
230+
allocator.getOrCreatePool(NativeAllocatorPoolConfig.POOL_QUERY, QUERY_MIN_SETTING.get(settings), QUERY_MAX_SETTING.get(settings));
231+
allocator.getOrCreatePool(
232+
NativeAllocatorPoolConfig.POOL_DATAFUSION,
233+
DATAFUSION_MIN_SETTING.get(settings),
234+
DATAFUSION_MAX_SETTING.get(settings)
235+
);
138236

139-
allocator.getOrCreatePool(NativeAllocatorPoolConfig.POOL_FLIGHT, flightMin, flightMax);
140-
allocator.getOrCreatePool(NativeAllocatorPoolConfig.POOL_INGEST, ingestMin, ingestMax);
237+
ClusterSettings cs = clusterService.getClusterSettings();
238+
cs.addSettingsUpdateConsumer(ROOT_LIMIT_SETTING, allocator::setRootLimit);
239+
cs.addSettingsUpdateConsumer(REBALANCE_INTERVAL_SETTING, allocator::setRebalanceInterval);
240+
cs.addSettingsUpdateConsumer(FLIGHT_MAX_SETTING, v -> allocator.setPoolLimit(NativeAllocatorPoolConfig.POOL_FLIGHT, v));
241+
cs.addSettingsUpdateConsumer(FLIGHT_MIN_SETTING, v -> allocator.setPoolMin(NativeAllocatorPoolConfig.POOL_FLIGHT, v));
242+
cs.addSettingsUpdateConsumer(INGEST_MAX_SETTING, v -> allocator.setPoolLimit(NativeAllocatorPoolConfig.POOL_INGEST, v));
243+
cs.addSettingsUpdateConsumer(INGEST_MIN_SETTING, v -> allocator.setPoolMin(NativeAllocatorPoolConfig.POOL_INGEST, v));
244+
cs.addSettingsUpdateConsumer(QUERY_MAX_SETTING, v -> allocator.setPoolLimit(NativeAllocatorPoolConfig.POOL_QUERY, v));
245+
cs.addSettingsUpdateConsumer(QUERY_MIN_SETTING, v -> allocator.setPoolMin(NativeAllocatorPoolConfig.POOL_QUERY, v));
246+
cs.addSettingsUpdateConsumer(DATAFUSION_MAX_SETTING, v -> allocator.setPoolLimit(NativeAllocatorPoolConfig.POOL_DATAFUSION, v));
247+
cs.addSettingsUpdateConsumer(DATAFUSION_MIN_SETTING, v -> allocator.setPoolMin(NativeAllocatorPoolConfig.POOL_DATAFUSION, v));
141248

142-
clusterService.getClusterSettings().addSettingsUpdateConsumer(ROOT_LIMIT_SETTING, allocator::setRootLimit);
143-
clusterService.getClusterSettings().addSettingsUpdateConsumer(REBALANCE_INTERVAL_SETTING, allocator::setRebalanceInterval);
249+
// Grouped validator runs across the related settings on every dynamic update so cross-setting
250+
// invariants (sum of pool mins ≤ root, per-pool min ≤ max) are enforced post-startup.
251+
cs.addSettingsUpdateConsumer(s -> {}, MIN_MAX_SETTINGS, ArrowBasePlugin::validateUpdate);
144252

145253
// ArrowBasePlugin component: node-level Arrow allocator service for cross-plugin zero-copy
146254
this.allocatorService = new DefaultArrowAllocatorService(allocator);
@@ -161,6 +269,35 @@ protected void configure() {
161269
});
162270
}
163271

272+
private static final List<Setting<Long>> MIN_MAX_SETTINGS = List.of(
273+
ROOT_LIMIT_SETTING,
274+
FLIGHT_MIN_SETTING,
275+
FLIGHT_MAX_SETTING,
276+
INGEST_MIN_SETTING,
277+
INGEST_MAX_SETTING,
278+
QUERY_MIN_SETTING,
279+
QUERY_MAX_SETTING,
280+
DATAFUSION_MIN_SETTING,
281+
DATAFUSION_MAX_SETTING
282+
);
283+
284+
private static void validateUpdate(Settings settings) {
285+
long rootLimit = ROOT_LIMIT_SETTING.get(settings);
286+
long flightMin = FLIGHT_MIN_SETTING.get(settings);
287+
long flightMax = FLIGHT_MAX_SETTING.get(settings);
288+
long ingestMin = INGEST_MIN_SETTING.get(settings);
289+
long ingestMax = INGEST_MAX_SETTING.get(settings);
290+
long queryMin = QUERY_MIN_SETTING.get(settings);
291+
long queryMax = QUERY_MAX_SETTING.get(settings);
292+
long datafusionMin = DATAFUSION_MIN_SETTING.get(settings);
293+
long datafusionMax = DATAFUSION_MAX_SETTING.get(settings);
294+
validateMinMax(NativeAllocatorPoolConfig.POOL_FLIGHT, flightMin, flightMax);
295+
validateMinMax(NativeAllocatorPoolConfig.POOL_INGEST, ingestMin, ingestMax);
296+
validateMinMax(NativeAllocatorPoolConfig.POOL_QUERY, queryMin, queryMax);
297+
validateMinMax(NativeAllocatorPoolConfig.POOL_DATAFUSION, datafusionMin, datafusionMax);
298+
validateMinSum(rootLimit, flightMin, ingestMin, queryMin, datafusionMin);
299+
}
300+
164301
@Override
165302
public List<Setting<?>> getSettings() {
166303
return List.of(
@@ -169,10 +306,29 @@ public List<Setting<?>> getSettings() {
169306
FLIGHT_MAX_SETTING,
170307
INGEST_MIN_SETTING,
171308
INGEST_MAX_SETTING,
309+
QUERY_MIN_SETTING,
310+
QUERY_MAX_SETTING,
311+
DATAFUSION_MIN_SETTING,
312+
DATAFUSION_MAX_SETTING,
172313
REBALANCE_INTERVAL_SETTING
173314
);
174315
}
175316

317+
@Override
318+
public List<PluginNodeStats> nodeStats() {
319+
if (allocator == null) {
320+
return List.of();
321+
}
322+
return List.of(new NativeAllocatorPluginStats(allocator.stats()));
323+
}
324+
325+
@Override
326+
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
327+
return List.of(
328+
new NamedWriteableRegistry.Entry(PluginNodeStats.class, NativeAllocatorPluginStats.NAME, NativeAllocatorPluginStats::new)
329+
);
330+
}
331+
176332
private static void validateMinMax(String poolName, long min, long max) {
177333
if (min > max) {
178334
throw new IllegalArgumentException("Pool '" + poolName + "' min (" + min + ") exceeds max (" + max + ")");
@@ -185,10 +341,10 @@ private static void validateMinSum(long rootLimit, long... mins) {
185341
}
186342
long sum = 0;
187343
for (long min : mins) {
188-
long prev = sum;
189-
sum += min;
190-
if (sum < prev) {
191-
throw new IllegalArgumentException("Sum of pool minimums overflows.");
344+
try {
345+
sum = Math.addExact(sum, min);
346+
} catch (ArithmeticException overflow) {
347+
throw new IllegalArgumentException("Sum of pool minimums overflows.", overflow);
192348
}
193349
}
194350
if (sum > rootLimit) {

0 commit comments

Comments
 (0)