Commit 6d3436c

Native allocator follow-up: dynamic settings, query/datafusion pools, plugin nodeStats — review-feedback edition
Builds on #21703 to add the query and DataFusion allocator pieces, dynamic tuning, stats, and pool wiring. This commit is the consolidated version after addressing review feedback on PR #21732.

## Architecture: three native-memory trackers, three knobs

Each tracker owns the bytes it can actually see, with a process-level cap above all of them. On a 64 GB / 16 GB-heap node the defaults compose to:

    RAM                                                    64 GB
      JVM heap (operator-configured -Xmx)                  16 GB
      Off-heap (RAM - heap)                                48 GB
        node.native_memory.limit (79% of off-heap)         37.92 GB  ← AC throttle threshold
          native.allocator.root.limit (20% of NM)           7.58 GB  ← Arrow framework cap
            pool.flight.max (5% of NM)                      1.90 GB  ← Flight transport pool
            pool.ingest.max (8% of NM)                      3.03 GB  ← Parquet VSR pool
            pool.query.max (5% of NM)                       1.90 GB  ← analytics-engine pool
          datafusion.memory_pool_limit_bytes (75% of NM)   28.44 GB  ← Rust runtime pool (sibling to Arrow)
        Unmanaged                                          10.08 GB  ← Lucene mmap, OS page cache, etc.

    Independent (disk budget):
      datafusion.spill_memory_limit_bytes (50% of physical RAM)  32 GB

## What this commit does

### 1. Dynamic settings + cluster-update consumers

All native-allocator settings are Setting.Property.Dynamic. Cluster-settings update consumers in ArrowBasePlugin#registerSettingsUpdateConsumers wire PUTs to live ArrowNativeAllocator state. The grouped validator (validateUpdate) rejects cross-setting violations (sum of pool mins > root, per-pool min > max) at PUT time with HTTP 400.

### 2. Query and DataFusion pools

POOL_QUERY added for analytics-engine per-query allocators (children of nativeAllocator.getPoolAllocator(POOL_QUERY)).

DataFusion gets its own setting (datafusion.memory_pool_limit_bytes) because DataFusion's working memory lives entirely on the Rust side and is reported only through DataFusion's MemoryPool. A Java-side pool that pretended to track it would either need a per-allocation FFM round-trip (a performance disaster) or be a config-only mirror (HTTP 200 with no observable effect).

POOL_DATAFUSION is intentionally absent; the Rust-side datafusion.memory_pool_limit_bytes is the honest knob for that layer.

### 3. Plugin#nodeStats SPI

New SPI method Plugin#nodeStats() returning List<PluginNodeStats>. NodeStats serializes plugin-contributed payloads via NamedWriteable, surfacing them under _nodes/stats. Annotated @ExperimentalApi to match PluginNodeStats.

### 4. Pool wiring through Guice (no static singleton)

ArrowNativeAllocator is constructed in ArrowBasePlugin#createComponents and returned to Node.java, which auto-binds it via Guice. Consumers (AnalyticsSearchService, FlightTransport, DefaultPlanExecutor) receive it through constructor @Inject: no static instance() / INSTANCE / ensureForTesting. The createComponents body is extracted into a package-private buildAllocator helper so unit tests can exercise the full pool/consumer wiring without the heavyweight ClusterService fixture.

### 5. Drop NativeAllocatorListener SPI [@bowenlan-amzn #3269655712, #3269660170, @Bukhtawar #3267186318]

The original design used NativeAllocatorListener to push pool-limit changes into consumer-side child allocators that captured the pool's limit at construction time. With children created at Long.MAX_VALUE, Arrow's Accountant.allocate (lines 191-203 of Accountant.java in arrow-java v18.3.0) walks the parent chain on every allocation and checks the parent's allocationLimit, so dynamic resizes of parquet.native.pool.{flight,query}.max take effect immediately on subsequent allocations through any descendant. The listener was emulating this Arrow-native behavior and is no longer needed.

- AnalyticsSearchService: drop poolListener; child uses Long.MAX_VALUE
- DefaultPlanExecutor: drop poolListener; coordinator uses Long.MAX_VALUE
- FlightTransport: drop intermediate flightAllocator + poolListener; pool is the parent, server/client are Long.MAX_VALUE children
- libs/arrow-spi/.../NativeAllocatorListener.java: deleted
- NativeAllocator interface: addListener / removeListener removed
- ArrowNativeAllocator: listeners field, fireListeners method, and listener calls in setPoolLimit/setPoolMin/rebalance removed

The rebalancer continues to function as before: it changes pool limits via setLimit, which children read via Arrow's parent-cap check at allocateBytes. Rebalancer is off by default.

### 6. Fix Guice duplicate-binding regression on ArrowNativeAllocator

Removed the redundant ArrowBasePlugin#createGuiceModules override. Node.java line 1748 already binds every component returned from createComponents:

    pluginComponents.stream()
        .forEach(p -> b.bind((Class) p.getClass()).toInstance(p));

ArrowBasePlugin#createComponents already returns the allocator, so the explicit bind(ArrowNativeAllocator.class).toInstance(allocator) in createGuiceModules registered the same binding a second time and caused a Guice CreationException at cluster startup. This was the root cause of the gradle-check Jenkins CI failures on commits 9188043, 498b502, d646dd4, e1f3c5b, 041dc0e: every integration test failed during MockNode construction.

### 7. @ExperimentalApi on Plugin#nodeStats() [@bowenlan-amzn #3269563833]

Annotate the new SPI hook to match the @ExperimentalApi annotation already present on PluginNodeStats (the type the method returns).

### 8. FQN -> import nit in ParquetIndexingEngine [@bowenlan-amzn #3269589758]

Replace 3 fully-qualified org.opensearch.arrow.allocator.ArrowNativeAllocator references in constructor parameters with a single import.

### 9. Memory defaults aligned to the partitioning model

- node.native_memory.limit defaults to 79% × (RAM - heap) via OsProbe (cgroup-aware: container memory limit on K8s/Docker, total physical RAM on bare metal). Falls back to ZERO if the probe fails or heap >= RAM, preserving the pre-default opt-in semantics. The OsProbe-reading entry point delegates to a pure overload deriveNativeMemoryLimitDefault(long, long) so unit tests cover the fallback branches deterministically.
- native.allocator.root.limit defaults to 20% × node.native_memory.limit.
- parquet.native.pool.{flight,ingest,query}.max default to 5%/8%/5% of node.native_memory.limit (sum 18% < root's 20%, leaving 2 pp headroom inside the framework cap).
- datafusion.memory_pool_limit_bytes defaults to 75% × node.native_memory.limit (called out by @bharath-techie #3271093086: the prior default Runtime.maxMemory() / 4 was JVM-heap derived, the wrong baseline for an off-heap runtime).
- datafusion.spill_memory_limit_bytes defaults to 50% × physical RAM, independent of node.native_memory.limit (spill is a disk-staging budget, not a memory budget).

Behavior change to call out in upgrade notes: admission control is now active by default. Operators wanting the pre-existing opt-out behavior can set node.native_memory.limit: 0b; all framework caps then fall back to Long.MAX_VALUE unbounded mode.

### 10. Translate Arrow OutOfMemoryException to OpenSearchRejectedExecutionException at per-request boundaries [reviewer feedback]

Added an AllocationRejection helper in plugins/arrow-base that wraps Arrow OOM with OpenSearch's standard backpressure signal so the REST layer maps these failures to HTTP 429 (Too Many Requests) instead of a generic 500.

Wrappers applied at the per-request allocation boundaries:

- DefaultPlanExecutor.executeInternal: per-query child allocator creation
- VSRPool.tryRotate: VSR rotation during ingest

Lifetime/startup allocations (FlightTransport.doStart, AnalyticsSearchService constructor, VSRPool.initializeActiveVSR) are intentionally left unwrapped: a failure there is a framework misconfiguration, not a per-request backpressure signal.

### 11. Integration tests for cap-enforcement boundaries

Added NativeAllocatorBoundaryIT under plugins/arrow-flight-rpc/src/internalClusterTest. Three tests boot a single-node cluster with tight memory settings (root=16 MiB, pool maxes=16 MiB) and exercise real Arrow allocations via the Guice-injected ArrowNativeAllocator:

- testPoolMaxRejectsAllocationsBeyondCap: PUT pool.query.max=4 MiB, verify an 8 MiB request through the pool throws OutOfMemoryException.
- testRootLimitRejectsAllocationsBeyondCap: hold 8 MiB in FLIGHT, verify a 16 MiB QUERY request fails at the root level (8+16 > 16 root cap) even though QUERY's own max would individually allow it.
- testDynamicPoolResizeAffectsInFlightAllocations: create child at Long.MAX_VALUE (the AnalyticsSearchService / DefaultPlanExecutor pattern post-listener-removal), verify a 2 MiB allocation succeeds, PUT query.max=1 MiB via cluster settings, verify a 2 MiB request now throws OOM via Arrow's parent-cap check at allocateBytes.

These cover the boundaries the framework is supposed to enforce. The third test specifically verifies the listener-replacement contract.

### 12. Unit tests for codecov patch-coverage gates

Added unit tests covering paths flagged by codecov as uncovered diff lines:

- ArrowBasePluginTests:
  * extended testPoolMaxRejectsNegative to cover INGEST_MAX and QUERY_MAX rejection branches.
  * testBuildAllocatorWiresAllPoolsAndSettingsConsumers exercises the full createComponents code path via the extracted buildAllocator helper: all three pools registered, pool maxes match operator-set values, cluster-settings update consumers wired.
- NodeStatsTests:
  * testPluginStatsDropsAllEntriesWhenReceiverHasNoRegistry covers the registry-null branch in NodeStats.readPluginStats (defensive path when the parent stream has no NamedWriteableRegistry attached).
  * testPluginNodeStatsDefaultReturnsEmpty covers the Plugin#nodeStats() default empty-list return.
- NodeResourceUsageTrackerTests:
  * testDeriveNativeMemoryLimitDefaultFallbackPaths covers the ResourceTrackerSettings.deriveNativeMemoryLimitDefault fallback branches (probe-returns-0, heap >= ram) plus the happy path.
- AllocationRejectionTests:
  * 4 tests covering success pass-through, OOM translation, non-OOM propagation, and the Runnable overload of AllocationRejection.wrap.

## How a query flows through these layers

Take a concrete example: a user issues a PPL query that goes through analytics-engine and dispatches to DataFusion.

1. Coordinator receives the request.
   → Admission control checks node.native_memory.limit budget.
   → If OK, proceeds; otherwise rejects with 429.
2. AnalyticsSearchService creates a per-fragment Arrow allocator:
   allocator = getPoolAllocator(POOL_QUERY).newChildAllocator("frag-N", 0, Long.MAX_VALUE)
   → Future BufferAllocator.buffer(...) calls on this allocator increment POOL_QUERY's counter and the root counter via Arrow's parent-cap chain.
   → Bounded by parquet.native.pool.query.max.
3. AnalyticsSearchService dispatches to DataFusion via NativeBridge.
   → NativeBridge.executeQueryAsync(...) marshals plan bytes via FFM.
4. DataFusion runs the query in Rust:
   → HashAggregate builds a hash table.
   → reservation.try_grow(50MB) → DataFusion MemoryPool counter += 50MB.
   → Bounded by datafusion.memory_pool_limit_bytes.
   → If exceeded, HashAggregate spills to disk (which itself is bounded by datafusion.spill_memory_limit_bytes).
5. DataFusion produces a result batch in Rust.
   → Java imports it via Arrow C Data Interface.
   → The import allocates Java ArrowBufs under the per-fragment allocator from step 2. POOL_QUERY counter += result_size.
6. Result returns to coordinator.
   → Per-fragment allocator closes; POOL_QUERY counter decrements.
   → DataFusion query completes; MemoryPool counter decrements.

Each layer accounts for what it owns. No double-counting. Each operator-tunable knob bounds a real, observable thing.

## Verification

    ./gradlew -Dsandbox.enabled=true \
      :plugins:arrow-base:test :plugins:arrow-flight-rpc:test \
      :sandbox:plugins:{analytics-engine,analytics-backend-datafusion,parquet-data-format}:test \
      :server:test --tests "org.opensearch.node.resource.tracker.*" \
      :server:test --tests "org.opensearch.action.admin.cluster.node.stats.NodeStatsTests" \
      :plugins:arrow-flight-rpc:internalClusterTest --tests "*NativeAllocatorBoundaryIT*" \
      spotlessJavaCheck
    → BUILD SUCCESSFUL

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
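The default partitioning described in the commit message is plain percentage arithmetic, so it can be checked with a small sketch. The class and method names below (`NativeMemoryBudget`, `nativeMemoryLimit`, `percentOf`) are hypothetical and are not the OpenSearch implementation. Only the percentages and the fallback-to-zero rule come from the commit message; the integer rounding is an assumption.

```java
import java.util.Locale;

/**
 * Hypothetical helper that reproduces the default-budget arithmetic from the
 * commit message. Names are illustrative, not the OpenSearch implementation.
 */
final class NativeMemoryBudget {
    static final long GB = 1024L * 1024L * 1024L;

    /** Integer percentage; adequate for realistic RAM sizes (no overflow guard). */
    static long percentOf(long bytes, int percent) {
        return bytes * percent / 100;
    }

    /** 79% of (RAM - heap); 0 when the probe returned nothing or heap >= RAM. */
    static long nativeMemoryLimit(long ramBytes, long heapBytes) {
        if (ramBytes <= 0 || heapBytes >= ramBytes) {
            return 0L;
        }
        return percentOf(ramBytes - heapBytes, 79);
    }

    static String gb(long bytes) {
        return String.format(Locale.ROOT, "%.2f GB", bytes / (double) GB);
    }

    public static void main(String[] args) {
        long ram = 64 * GB;
        long heap = 16 * GB;
        long nm = nativeMemoryLimit(ram, heap);

        System.out.println("node.native_memory.limit            = " + gb(nm));
        System.out.println("native.allocator.root.limit         = " + gb(percentOf(nm, 20)));
        System.out.println("pool.flight.max                     = " + gb(percentOf(nm, 5)));
        System.out.println("pool.ingest.max                     = " + gb(percentOf(nm, 8)));
        System.out.println("pool.query.max                      = " + gb(percentOf(nm, 5)));
        System.out.println("datafusion.memory_pool_limit_bytes  = " + gb(percentOf(nm, 75)));
        System.out.println("unmanaged                           = " + gb(ram - heap - nm));
        System.out.println("datafusion.spill_memory_limit_bytes = " + gb(percentOf(ram, 50)));
    }
}
```

The three pool maxes add up to 18% of the native-memory limit, which stays under the 20% root cap, so every pool can reach its own max without hitting the root.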
1 parent 40688ad commit 6d3436c

29 files changed

Lines changed: 1052 additions & 385 deletions


libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/NativeAllocator.java

Lines changed: 3 additions & 19 deletions
@@ -36,7 +36,9 @@ public interface NativeAllocator extends Closeable {
     PoolHandle getOrCreatePool(String poolName, long limit);
 
     /**
-     * Updates the limit of an existing pool.
+     * Updates the limit of an existing pool. Children of the pool allocator
+     * inherit the change automatically via Arrow's parent-cap check at
+     * allocation time — no notification SPI is needed.
      *
      * @param poolName logical pool name
      * @param newLimit new maximum bytes for the pool
@@ -55,24 +57,6 @@ public interface NativeAllocator extends Closeable {
      */
     NativeAllocatorPoolStats stats();
 
-    /**
-     * Registers a listener that is invoked after a pool's limit changes.
-     *
-     * <p>Listener invocation is synchronous on the caller thread. See
-     * {@link NativeAllocatorListener} for threading constraints.
-     *
-     * @param listener the listener to register
-     */
-    void addListener(NativeAllocatorListener listener);
-
-    /**
-     * Unregisters a previously registered listener. No-op if the listener
-     * was not registered.
-     *
-     * @param listener the listener to remove
-     */
-    void removeListener(NativeAllocatorListener listener);
-
     /**
      * Opaque handle to a memory pool. Plugins downcast to the concrete type
      * (e.g., Arrow's {@code BufferAllocator}) in the implementation layer.
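The updated Javadoc relies on a parent-cap check at allocation time instead of a listener. The toy model below sketches why that is sufficient. It is not Arrow's `Accountant`: `ToyAccountant` and its methods are hypothetical stand-ins. The only behavior taken from the commit message is that each allocation walks the parent chain and checks every ancestor's limit.

```java
/**
 * Toy model of hierarchical memory accounting. This is NOT Arrow's Accountant;
 * the class and method names are hypothetical and single-threaded for brevity.
 */
final class ToyAccountant {
    private final ToyAccountant parent; // null for the root
    private final String name;
    private long limit;
    private long allocated;

    ToyAccountant(ToyAccountant parent, String name, long limit) {
        this.parent = parent;
        this.name = name;
        this.limit = limit;
    }

    /** Changes this node's cap; descendants observe it on their next allocate call. */
    void setLimit(long newLimit) {
        this.limit = newLimit;
    }

    long allocated() {
        return allocated;
    }

    /** Reserves bytes on this node and every ancestor, or throws and reserves nothing. */
    void allocate(long bytes) {
        // First pass: check every cap from this node up to the root.
        for (ToyAccountant a = this; a != null; a = a.parent) {
            if (bytes > a.limit - a.allocated) {
                throw new IllegalStateException(
                    "allocation of " + bytes + " bytes exceeds limit of [" + a.name + "]");
            }
        }
        // Second pass: all caps hold, so record the reservation along the chain.
        for (ToyAccountant a = this; a != null; a = a.parent) {
            a.allocated += bytes;
        }
    }

    /** Returns bytes to this node and every ancestor. */
    void release(long bytes) {
        for (ToyAccountant a = this; a != null; a = a.parent) {
            a.allocated -= bytes;
        }
    }
}
```

In this model a child created with `Long.MAX_VALUE` never rejects on its own account. Every rejection comes from an ancestor, so lowering a pool's limit with `setLimit` affects the very next allocation through any descendant, with no notification needed.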

libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/NativeAllocatorListener.java

Lines changed: 0 additions & 39 deletions
This file was deleted.
Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.arrow.allocator;
+
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
+
+import java.util.function.Supplier;
+
+/**
+ * Translates Arrow's {@link OutOfMemoryException} into OpenSearch's standard
+ * {@link OpenSearchRejectedExecutionException} at allocation boundaries.
+ *
+ * <p>Arrow throws {@code OutOfMemoryException} when an allocation would exceed
+ * a parent's {@code allocationLimit}. This is a per-request, recoverable
+ * condition — the caller should back off and retry — but Arrow's exception type
+ * is not recognized by OpenSearch's REST layer, so it surfaces to clients as a
+ * generic 500. {@link OpenSearchRejectedExecutionException} (a subclass of
+ * {@link java.util.concurrent.RejectedExecutionException}) maps to HTTP 429
+ * (Too Many Requests) and is the framework's standard rejection signal.
+ *
+ * <p>Apply this wrapper at <em>per-request</em> allocation sites — places where
+ * a failure represents "this request didn't fit" rather than "the framework
+ * failed to start". Examples:
+ * <ul>
+ *   <li>per-query child allocator creation in
+ *       {@code DefaultPlanExecutor.executeInternal}</li>
+ *   <li>per-VSR child allocator creation in
+ *       {@code ArrowBufferPool.createChildAllocator}</li>
+ *   <li>request-handler-side allocations in transport/RPC layers</li>
+ * </ul>
+ *
+ * <p><strong>Do not</strong> wrap startup-time / lifetime-of-component
+ * allocations (e.g. transport server/client allocators created in
+ * {@code doStart}, service-level allocators created in plugin constructors).
+ * Those failing during node init is a configuration error, not a per-request
+ * back-pressure signal — they should propagate as the original exception.
+ */
+public final class AllocationRejection {
+
+    private AllocationRejection() {}
+
+    /**
+     * Runs {@code body} and translates any Arrow {@link OutOfMemoryException}
+     * into {@link OpenSearchRejectedExecutionException}.
+     *
+     * @param context short label included in the rejection message (typically the
+     *                pool or operation name; e.g. "query-pool", "ingest-vsr")
+     * @param body    the allocation site to invoke
+     * @param <T>     return type of {@code body}
+     * @return whatever {@code body} returns on success
+     * @throws OpenSearchRejectedExecutionException if {@code body} throws
+     *         {@link OutOfMemoryException}; the original Arrow exception is
+     *         attached as the cause
+     */
+    public static <T> T wrap(String context, Supplier<T> body) {
+        try {
+            return body.get();
+        } catch (OutOfMemoryException e) {
+            throw rejection(context, e);
+        }
+    }
+
+    /**
+     * Runs {@code body} and translates any Arrow {@link OutOfMemoryException}
+     * into {@link OpenSearchRejectedExecutionException}. Use this overload for
+     * void allocation sites.
+     */
+    public static void wrap(String context, Runnable body) {
+        try {
+            body.run();
+        } catch (OutOfMemoryException e) {
+            throw rejection(context, e);
+        }
+    }
+
+    private static OpenSearchRejectedExecutionException rejection(String context, OutOfMemoryException cause) {
+        OpenSearchRejectedExecutionException rejection = new OpenSearchRejectedExecutionException(
+            "native memory allocation rejected at [" + context + "]: " + cause.getMessage()
+        );
+        rejection.initCause(cause);
+        return rejection;
+    }
+}
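The translate-at-the-boundary pattern in the file above can be tried without Arrow or OpenSearch on the classpath. The sketch below is an approximation under stated assumptions. `FakeOutOfMemoryException` is a hypothetical stand-in for Arrow's `OutOfMemoryException`, and the JDK's `RejectedExecutionException` stands in for `OpenSearchRejectedExecutionException`, which the Javadoc above describes as a subclass of it. `RejectionSketch` is likewise an illustrative name.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;

/** Hypothetical stand-in for Arrow's OutOfMemoryException (an unchecked exception). */
final class FakeOutOfMemoryException extends RuntimeException {
    FakeOutOfMemoryException(String message) {
        super(message);
    }
}

/** Illustrative copy of the wrap pattern, using JDK types only. */
final class RejectionSketch {
    private RejectionSketch() {}

    /** Runs body; only the allocator's OOM is translated, everything else propagates. */
    static <T> T wrap(String context, Supplier<T> body) {
        try {
            return body.get();
        } catch (FakeOutOfMemoryException e) {
            // Keep the original exception as the cause so the failing limit stays visible.
            throw new RejectedExecutionException(
                "native memory allocation rejected at [" + context + "]: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        // Success passes through untouched.
        long size = wrap("query-pool", () -> 1024L);
        System.out.println("allocated " + size + " bytes");

        // A simulated per-request allocation failure becomes a rejection.
        try {
            RejectionSketch.<Long>wrap("query-pool", () -> {
                throw new FakeOutOfMemoryException("limit exceeded");
            });
        } catch (RejectedExecutionException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Catching only the allocator's exception type is the design choice worth noting. Unrelated failures keep their original type, so only genuine "request did not fit" conditions surface as a rejection.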
