Skip to content

Native allocator: dynamic settings, query/datafusion pools, plugin nodeStats#21732

Open
gaurav-amz wants to merge 11 commits into
opensearch-project:mainfrom
gaurav-amz:unified-allocator-followup
Open

Native allocator: dynamic settings, query/datafusion pools, plugin nodeStats#21732
gaurav-amz wants to merge 11 commits into
opensearch-project:mainfrom
gaurav-amz:unified-allocator-followup

Conversation

@gaurav-amz
Copy link
Copy Markdown
Contributor

@gaurav-amz gaurav-amz commented May 19, 2026

Description

Builds on #21703 to add the query, datafusion with allocator, dynamic-tuning, stats, and pool-wiring pieces.

Architecture: three native-memory trackers, three knobs

Each tracker owns the bytes it can actually see, with a process-level cap above all of them.

            ┌──────────────────────────────────────────────────────────┐
            │  Process-level cap (admission control)                   │
            │                                                          │
            │  node.native_memory.limit                                │
            │    Defaults to 79% × (RAM − JVM heap), cgroup-aware.     │
            │                                                          │
            │  ┌────────────────────────┐  ┌────────────────────────┐  │
            │  │ Arrow tracker          │  │ DataFusion tracker     │  │
            │  │ (ArrowNativeAllocator. │  │ (Rust MemoryPool)      │  │
            │  │  root)                 │  │                        │  │
            │  │                        │  │ Tracks:                │  │
            │  │ Tracks:                │  │   HashAggregate        │  │
            │  │   POOL_FLIGHT (5% NM)  │  │   Sort buffers         │  │
            │  │   POOL_INGEST (8% NM)  │  │   TopK heap            │  │
            │  │   POOL_QUERY  (5% NM)  │  │   Spill staging        │  │
            │  │                        │  │                        │  │
            │  │ Bounded by:            │  │ Bounded by:            │  │
            │  │  native.allocator      │  │  datafusion.memory_    │  │
            │  │  .root.limit           │  │  pool_limit_bytes      │  │
            │  │  (20% × NM by default) │  │  (75% × NM by default) │  │
            │  └────────────────────────┘  └────────────────────────┘  │
            │                                                          │
            └──────────────────────────────────────────────────────────┘

Three layers, three responsibilities. Each is necessary; none can be replaced by another.

  • Arrow tracker (ArrowNativeAllocator) accounts for every Arrow BufferAllocator allocation in the JVM. Partitioned into FLIGHT / INGEST / QUERY pools so one plugin can't starve another. All Arrow buffers descend from the same RootAllocator, preserving Arrow's same-root invariant for cross-plugin zero-copy handoff.
  • DataFusion tracker (Rust MemoryPool) accounts for DataFusion's own working memory and triggers spill / fail-fast when a query exceeds budget. Lives entirely on the Rust side; updates flow in via FFM through NativeBridge.setMemoryPoolLimit.
  • Admission control caps the OS process. node.native_memory.limit is the operator-declared off-heap budget AC throttles against; framework-derived defaults (root, pools, DataFusion pool) all scale from this single number.

POOL_DATAFUSION is intentionally not present. DataFusion's working memory is allocated by Rust operators directly and reported only to DataFusion's own MemoryPool — it never flows through Arrow's BufferAllocator API. Adding a Java-side pool that pretended to track it would have required either a per-allocation FFM round-trip (performance disaster) or a config-only mirror (a setting that returns HTTP 200 and silently does nothing). The Rust-side datafusion.memory_pool_limit_bytes is the honest knob for that layer.

Worked example: 64 GB / 16 GB-heap node

With operator-declared -Xmx16g on a 64 GB host (bare metal or cgroup-limited container), defaults compose to:

64 GB RAM
  │ ├── JVM Heap (16 GB / 25%)
  │ │   ├── Indexing pressure, Writer buffer
  │ │   ├── Transport Netty
  │ │   └── etc.
  │ │
  │ └── Off-Heap (48 GB / 75%)
  │     │
  │     ├── node.native_memory.limit (37.92 GB / 79% of off-heap)
  │     │   │
  │     │   ├── Java Arrow Root (7.58 GB / 20% of NM)
  │     │   │   ├── POOL_FLIGHT  (1.90 GB / 5% of NM)
  │     │   │   │   └── server, client — flight transport batches
  │     │   │   ├── POOL_INGEST  (3.03 GB / 8% of NM)
  │     │   │   │   └── ArrowBufferPool / parquet VSR allocators
  │     │   │   └── POOL_QUERY   (1.90 GB / 5% of NM)
  │     │   │       └── analytics-search-service → per-query children
  │     │   │
  │     │   └── DataFusion Rust pool (28.44 GB / 75% of NM)
  │     │       └── datafusion.memory_pool_limit_bytes
  │     │           ├── hash tables, sort buffers, aggs, intermediate batches
  │     │           └── triggers spill on exhaustion
  │     │
  │     └── Unmanaged (~10.08 GB / 21% of off-heap)
  │         ├── Lucene mmap
  │         ├── OS Page Cache
  │         └── Sidecars / non-Arrow native consumers
  │
  │ Independent (disk):
  │   datafusion.spill_memory_limit_bytes (32 GB / 50% of physical RAM)

As a flat table for quick reference:

RAM                                              64 GB
JVM heap (operator-configured -Xmx)              16 GB
Off-heap (RAM − heap)                            48 GB
node.native_memory.limit  (79% of off-heap)      37.92 GB    ← AC throttle threshold
  native.allocator.root.limit  (20% of NM)        7.58 GB    ← Arrow framework cap
    pool.flight.max  (5% of NM)                   1.90 GB    ← Flight transport pool
    pool.ingest.max  (8% of NM)                   3.03 GB    ← Parquet VSR pool
    pool.query.max   (5% of NM)                   1.90 GB    ← analytics-engine pool
  datafusion.memory_pool_limit  (75% of NM)      28.44 GB    ← Rust runtime pool (sibling)
Unmanaged (Lucene mmap, OS page cache, etc.)     10.08 GB

Independent budget (disk staging, not memory):

datafusion.spill_memory_limit_bytes (50% of physical RAM)    32 GB

How a query flows through these layers

Take a concrete example: a user issues a PPL query that goes through analytics-engine and dispatches to DataFusion.

  1. Coordinator receives the request.
     → Admission control checks node.native_memory.limit budget.
     → If OK, proceeds; otherwise rejects with 429.

  2. AnalyticsSearchService creates a per-fragment Arrow allocator:
        allocator = getPoolAllocator(POOL_QUERY).newChildAllocator("frag-N", 0, Long.MAX_VALUE)
     → Future BufferAllocator.buffer(...) calls on this allocator
       increment POOL_QUERY's counter and the root counter via Arrow's
       parent-cap chain at allocateBytes.
     → Bounded by native.allocator.pool.query.max.

  3. AnalyticsSearchService dispatches to DataFusion via NativeBridge.
     → NativeBridge.executeQueryAsync(...) marshals plan bytes via FFM.

  4. DataFusion runs the query in Rust:
     → HashAggregate builds a hash table.
     → reservation.try_grow(50MB) → DataFusion MemoryPool counter += 50MB.
     → Bounded by datafusion.memory_pool_limit_bytes.
     → If exceeded, HashAggregate spills to disk
       (which itself is bounded by datafusion.spill_memory_limit_bytes).

  5. DataFusion produces a result batch in Rust.
     → Java imports it via Arrow C Data Interface.
     → The import allocates Java ArrowBufs under the per-fragment allocator
       from step 2. POOL_QUERY counter += result_size.

  6. Result returns to coordinator.
     → Per-fragment allocator closes; POOL_QUERY counter decrements.
     → DataFusion query completes; MemoryPool counter decrements.

Each layer accounts for what it owns. No double-counting (the result bytes are counted in DataFusion's pool only while they exist as Rust buffers; after Java imports them, ownership transfers and the Java side counts them; when Java closes the import, the bytes are freed). Each operator-tunable knob bounds a real, observable thing.

Operator surface

After this PR, an operator inspecting a node running the query above sees:

{
  "native_allocator": {
    "root": {"allocated": "150MB", "limit": "7.58GB"},
    "pools": {
      "flight":  {"allocated": "20MB",  "limit": "1.90GB"},
      "ingest":  {"allocated": "30MB",  "limit": "3.03GB"},
      "query":   {"allocated": "100MB", "limit": "1.90GB"}
    }
  },
  "datafusion": {
    "memory_pool": {"usage": "2.4GB", "limit": "28.44GB"},
    "spill":       {"usage": "0",     "limit": "32GB"}
  }
}

Three numbers, three sources, three knobs. The operator looks at each in isolation when tuning that layer:

  • "DataFusion queries are OOMing" → bump datafusion.memory_pool_limit_bytes.
  • "Flight RPC is starving Parquet ingest" → bump parquet.native.pool.flight.max or lower parquet.native.pool.ingest.max.
  • "Whole node is using too much memory" → lower node.native_memory.limit.

All pool min/max settings are Setting.Property.Dynamic. Pool limit changes propagate to consumer-side child allocators automatically via Arrow's parent-cap check at allocateBytes — child allocators are created with Long.MAX_VALUE and inherit the live parent cap on every allocation, so dynamic resizes reach in-flight workloads without restart and without an explicit notification SPI. The grouped validator rejects cross-setting violations (sum of pool mins > root, per-pool min > max) at PUT time with HTTP 400 rather than at the next allocation.

Behavior change to call out

Admission control is now active by default. node.native_memory.limit defaults to 79% × (RAM − JVM heap) instead of 0 (unconfigured). On upgrade, AC will start tracking native memory utilization and the framework's pool caps will derive sensible values from the operator's declared off-heap budget without any explicit configuration.

Operators who want pre-existing opt-out behavior (AC unconfigured, all framework caps unbounded) can set:

node.native_memory.limit: 0b

This restores the prior "explicit-opt-in" semantics — useful for nodes where Lucene mmap or non-Arrow native consumers dominate and the operator does not want admission control throttling search/index traffic.

Changes after initial review

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@gaurav-amz gaurav-amz requested a review from a team as a code owner May 19, 2026 08:45
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 19, 2026

PR Reviewer Guide 🔍

(Review updated until commit ab0dfd3)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The grouped validator runs on every dynamic update but the initial node-settings validation at startup is only in buildAllocator, which is called from createComponents. If an operator ships a malformed opensearch.yml (e.g., sum of pool mins exceeds root), the node starts and the validator only fires on the first cluster-state update. The node should reject the config at startup before accepting traffic.

private static void validateUpdate(Settings settings) {
    long rootLimit = ROOT_LIMIT_SETTING.get(settings);
    long flightMin = FLIGHT_MIN_SETTING.get(settings);
    long flightMax = FLIGHT_MAX_SETTING.get(settings);
    long ingestMin = INGEST_MIN_SETTING.get(settings);
    long ingestMax = INGEST_MAX_SETTING.get(settings);
    long queryMin = QUERY_MIN_SETTING.get(settings);
    long queryMax = QUERY_MAX_SETTING.get(settings);
    validateMinMax(NativeAllocatorPoolConfig.POOL_FLIGHT, flightMin, flightMax);
    validateMinMax(NativeAllocatorPoolConfig.POOL_INGEST, ingestMin, ingestMax);
    validateMinMax(NativeAllocatorPoolConfig.POOL_QUERY, queryMin, queryMax);
    validateMinSum(rootLimit, flightMin, ingestMin, queryMin);
}
Possible Issue

When rebalancerEnabled is false, getOrCreatePool sets initial limit to max. If max is Long.MAX_VALUE and a pool is created before any explicit limit is set, the child allocator starts unbounded. A subsequent setPoolLimit call to a finite value will cap it, but allocations made between pool creation and the first setPoolLimit are uncapped. This can exhaust memory if a workload starts immediately after node boot and before cluster settings propagate.

    poolMins.putIfAbsent(poolName, min);
    poolMaxes.putIfAbsent(poolName, max);
    return pools.computeIfAbsent(poolName, name -> {
        // Pick an initial limit that's safe for both rebalancer-on and rebalancer-off
        // deployments. When rebalancing is enabled, start at min (the original PR's
        // "guarantee + burst" semantics): the next rebalance tick will distribute
        // headroom up to each pool's max. When rebalancing is disabled (the default),
        // pools with min=0 would otherwise reject every allocation until a tick that
        // never comes — start at max so consumers can allocate immediately.
        long initial = rebalancerEnabled ? min : max;
        BufferAllocator child = root.newChildAllocator(name, 0, initial);
        return new ArrowPoolHandle(child);
    });
}
Possible Issue

setPoolMin raises the live limit to newMin capped at max, but if max is Long.MAX_VALUE and newMin is also Long.MAX_VALUE, the pool limit becomes Long.MAX_VALUE. A subsequent allocation can exhaust memory. The code should clamp target to a sane ceiling or reject Long.MAX_VALUE mins explicitly.

public void setPoolMin(String poolName, long newMin) {
    ArrowPoolHandle handle = pools.get(poolName);
    if (handle == null) {
        throw new IllegalStateException("Pool '" + poolName + "' does not exist");
    }
    poolMins.put(poolName, newMin);
    long max = poolMaxes.getOrDefault(poolName, Long.MAX_VALUE);
    long current = handle.allocator.getLimit();
    long target = Math.min(newMin, max);
    if (target > current) {
        handle.allocator.setLimit(target);
    }
}
Possible Issue

readPluginStats silently drops entries when the registry is null or when deserialization fails. If a plugin's stats are critical for monitoring, operators lose visibility without any indication in the API response. The method should at least log a warning at INFO level when entries are dropped, or include a count of skipped entries in the response metadata.

private static Map<String, PluginNodeStats> readPluginStats(StreamInput in) throws IOException {
    int size = in.readVInt();
    if (size == 0) {
        return Collections.emptyMap();
    }
    NamedWriteableRegistry registry = in.namedWriteableRegistry();
    Map<String, PluginNodeStats> result = new HashMap<>(size);
    for (int i = 0; i < size; i++) {
        String name = in.readString();
        BytesReference payload = in.readBytesReference();
        if (registry == null) {
            // No registry attached to the parent stream — we can't deserialize any
            // entry. Drop the whole map. This branch is defensive; production paths
            // always set a registry when they expect named writeables.
            continue;
        }
        try (
            StreamInput rawIn = payload.streamInput();
            NamedWriteableAwareStreamInput payloadIn = new NamedWriteableAwareStreamInput(rawIn, registry)
        ) {
            payloadIn.setVersion(in.getVersion());
            PluginNodeStats stats = payloadIn.readNamedWriteable(PluginNodeStats.class);
            result.put(name, stats);
        } catch (IOException | IllegalArgumentException e) {
            // Receiver doesn't have the plugin's NamedWriteable registered (typical
            // during rolling upgrades or in a non-uniform plugin install). Drop the
            // entry; the rest of NodeStats remains decodable.
        }
    }
    return result;
}
Possible Issue

deriveMemoryPoolLimitDefault returns Long.MAX_VALUE when nativeLimit.getBytes() is zero or negative. If an operator explicitly sets node.native_memory.limit to a negative value (which the setting parser might allow as a ByteSizeValue), the DataFusion pool becomes unbounded instead of rejecting the config. The fallback should only trigger on exactly zero, not on any non-positive value.

static String deriveMemoryPoolLimitDefault(Settings settings) {
    ByteSizeValue nativeLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
    if (nativeLimit.getBytes() <= 0) {
        return Long.toString(Long.MAX_VALUE);
    }
    // 75% of node.native_memory.limit. DataFusion is the dominant native consumer for
    // analytics workloads; operators tune via the dynamic setting once they characterize
    // their workload.
    long pool = Math.max(0L, nativeLimit.getBytes() * 75 / 100);
    return Long.toString(pool);
}

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 19, 2026

PR Code Suggestions ✨

Latest suggestions up to ab0dfd3

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent arithmetic overflow in percentage calculation

Check for potential overflow when computing nativeLimit.getBytes() * 20. If
nativeLimit.getBytes() is close to Long.MAX_VALUE, multiplying by 20 can overflow
before the division. Use Math.multiplyExact or reorder the operation as
(nativeLimit.getBytes() / 100) * 20 to avoid overflow.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java [88-94]

 static String deriveRootLimitDefault(Settings settings) {
     ByteSizeValue nativeLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
     if (nativeLimit.getBytes() <= 0) {
         return Long.toString(Long.MAX_VALUE);
     }
-    return Long.toString(nativeLimit.getBytes() * 20 / 100);
+    long bytes = nativeLimit.getBytes();
+    return Long.toString((bytes / 100) * 20);
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential overflow when nativeLimit.getBytes() is large. Reordering to (bytes / 100) * 20 avoids overflow and is a valid improvement. However, the impact is moderate because the overflow scenario is unlikely in practice (requires nativeLimit near Long.MAX_VALUE / 20).

Medium

Previous suggestions

Suggestions up to commit b8f7374
CategorySuggestion                                                                                                                                    Impact
Possible issue
Guard against overflow in percentage calculation

The multiplication nativeLimit.getBytes() * percent can overflow when nativeLimit is
near Long.MAX_VALUE and percent is non-zero. Use Math.multiplyExact or clamp the
result to prevent silent wraparound.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java [217-224]

 static String derivePoolMaxDefault(Settings settings, int percent) {
     ByteSizeValue nativeLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
     if (nativeLimit.getBytes() <= 0) {
         return Long.toString(Long.MAX_VALUE);
     }
-    long pool = Math.max(0L, nativeLimit.getBytes() * percent / 100);
+    long bytes = nativeLimit.getBytes();
+    long pool = (bytes > Long.MAX_VALUE / percent) ? Long.MAX_VALUE : Math.max(0L, bytes * percent / 100);
     return Long.toString(pool);
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion identifies a real overflow risk when nativeLimit.getBytes() is large and percent is non-zero. The proposed guard (bytes > Long.MAX_VALUE / percent) prevents silent wraparound. This is a valid correctness concern, though the likelihood is low in practice since percent is typically small (5, 8, 20) and nativeLimit is bounded by physical RAM.

Medium
General
Return early when no pools exist

Division by zero is guarded, but when poolCount is zero the method continues to
iterate over an empty pools map. Consider returning early when pools.isEmpty() to
avoid unnecessary work and clarify intent.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [246-247]

-long bonusPerPool = poolCount > 0 ? headroom / poolCount : 0;
+if (pools.isEmpty()) {
+    return;
+}
+long bonusPerPool = headroom / pools.size();
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that when pools.isEmpty() the loop body never executes, making the division and iteration unnecessary. However, the existing code already guards against division by zero with the ternary operator. The improvement is a minor optimization that clarifies intent but does not fix a bug.

Low
Suggestions up to commit d96e97c
CategorySuggestion                                                                                                                                    Impact
General
Validate min-max constraint before updating state

The method updates poolMins before validating that newMin is within acceptable
bounds. If newMin exceeds max, the recorded min will be inconsistent with the
enforced limit. Validate newMin <= max before updating poolMins to maintain
invariant consistency.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [160-172]

 public void setPoolMin(String poolName, long newMin) {
     ArrowPoolHandle handle = pools.get(poolName);
     if (handle == null) {
         throw new IllegalStateException("Pool '" + poolName + "' does not exist");
     }
+    long max = poolMaxes.getOrDefault(poolName, Long.MAX_VALUE);
+    if (newMin > max) {
+        throw new IllegalArgumentException("Pool '" + poolName + "' min (" + newMin + ") exceeds max (" + max + ")");
+    }
     poolMins.put(poolName, newMin);
-    long max = poolMaxes.getOrDefault(poolName, Long.MAX_VALUE);
     long current = handle.allocator.getLimit();
-    long target = Math.min(newMin, max);
-    if (target > current) {
-        handle.allocator.setLimit(target);
+    if (newMin > current) {
+        handle.allocator.setLimit(newMin);
     }
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that poolMins is updated before validating the min-max constraint. However, the current code uses Math.min(newMin, max) to cap the target, which prevents setting an invalid limit. The suggestion to throw an exception when newMin > max would enforce stricter validation and maintain clearer invariants, but the existing code is not incorrect—it silently caps rather than rejecting. The improvement is moderate.

Medium
Early-return when no pools exist

Division by zero is guarded, but when poolCount is zero the method continues to
iterate over an empty map. Early-return when pools.isEmpty() to avoid unnecessary
work and clarify intent.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [237-250]

 void rebalance() {
-    ...
+    if (pools.isEmpty()) {
+        return;
+    }
+    long rootLimit = root.getLimit();
+    long totalAllocated = 0;
+    for (Map.Entry<String, ArrowPoolHandle> entry : pools.entrySet()) {
+        totalAllocated += entry.getValue().allocator.getAllocatedMemory();
+    }
     long headroom = Math.max(0, rootLimit - totalAllocated);
-    int poolCount = pools.size();
-    long bonusPerPool = poolCount > 0 ? headroom / poolCount : 0;
+    long bonusPerPool = headroom / pools.size();
     ...
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion is correct that an early return when pools.isEmpty() would avoid unnecessary work. The existing code already guards division by zero with poolCount > 0 ? headroom / poolCount : 0, so the method is safe. The improvement is a minor optimization that clarifies intent but does not fix a bug.

Low
Remove redundant exception handler after guard

The method catches UnsupportedOperationException after checking
isSpillLimitDynamic(), but the check-then-act pattern is racy if another thread
changes the service state. Remove the redundant catch block since the guard already
prevents the call when the symbol is absent.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [350-375]

 void updateSpillMemoryLimit(long newLimitBytes) {
     DataFusionService service = dataFusionService;
     if (service == null) {
         logger.debug("DataFusion service not yet initialized; ignoring spill limit update to {}B", newLimitBytes);
         return;
     }
     if (!service.isSpillLimitDynamic()) {
         logger.warn(
             "Updated DataFusion spill memory limit to {}B at the cluster level; the loaded native library does not "
                 + "support runtime spill resize, so the new value will only take effect after a node restart",
             newLimitBytes
         );
         return;
     }
     try {
         service.setSpillMemoryLimit(newLimitBytes);
         logger.info("Updated DataFusion spill memory limit to {}B", newLimitBytes);
     } catch (IllegalStateException e) {
         logger.warn("Ignoring spill memory limit update to {}B; service is not running", newLimitBytes);
-    } catch (UnsupportedOperationException e) {
-        logger.warn("Ignoring spill memory limit update to {}B; native runtime does not support live updates", newLimitBytes);
     }
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion argues that the UnsupportedOperationException catch block is redundant after the isSpillLimitDynamic() guard. However, the existing code includes a comment acknowledging the race condition and defends against it. Removing the catch would make the code less defensive. The suggestion is valid as a simplification but does not address a real issue given the defensive comment.

Low
Suggestions up to commit 79ca7c2
CategorySuggestion                                                                                                                                    Impact
General
Log dropped plugin stats entries

Silently dropping unknown plugin stats entries can hide genuine deserialization
bugs. Log the dropped entry at debug level so operators can diagnose missing plugins
or version mismatches during rolling upgrades.

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java [305-316]

 try (
     StreamInput rawIn = payload.streamInput();
     NamedWriteableAwareStreamInput payloadIn = new NamedWriteableAwareStreamInput(rawIn, registry)
 ) {
     payloadIn.setVersion(in.getVersion());
     PluginNodeStats stats = payloadIn.readNamedWriteable(PluginNodeStats.class);
     result.put(name, stats);
 } catch (IOException | IllegalArgumentException e) {
-    // Receiver doesn't have the plugin's NamedWriteable registered (typical
-    // during rolling upgrades or in a non-uniform plugin install). Drop the
-    // entry; the rest of NodeStats remains decodable.
+    logger.debug("Skipping unknown plugin stats entry [{}]: {}", name, e.getMessage());
 }
Suggestion importance[1-10]: 7

__

Why: Adding debug logging for dropped plugin stats entries would significantly improve observability during rolling upgrades and plugin version mismatches. This is a practical enhancement that helps operators diagnose issues without changing the core behavior. The suggestion correctly identifies a gap in operational visibility.

Medium
Include overflow values in error message

The overflow check is correct, but the error message should include the actual
values that caused the overflow to aid debugging. Include the current sum and the
min being added.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java [377-381]

 try {
     sum = Math.addExact(sum, min);
 } catch (ArithmeticException overflow) {
-    throw new IllegalArgumentException("Sum of pool minimums overflows.", overflow);
+    throw new IllegalArgumentException(
+        "Sum of pool minimums overflows (current sum: " + sum + ", adding: " + min + ")",
+        overflow
+    );
 }
Suggestion importance[1-10]: 6

__

Why: Adding the actual values that caused the overflow to the error message would improve debuggability. This is a reasonable enhancement that helps operators diagnose configuration issues, though the existing error message is already functional.

Low
Return early when no pools exist

When poolCount is zero, the method should return early instead of computing a zero
bonus and iterating over an empty map. This avoids unnecessary work and makes the
no-pools case explicit.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [246-247]

-long bonusPerPool = poolCount > 0 ? headroom / poolCount : 0;
+if (poolCount == 0) {
+    return;
+}
+long bonusPerPool = headroom / poolCount;
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies a minor optimization opportunity. Returning early when poolCount is zero avoids unnecessary computation and iteration, though the performance impact is negligible since the loop would be empty anyway. The improvement is valid but has low practical impact.

Low
Remove redundant Math.max guard

The Math.max(0L, ...) call is defensive but unnecessary since nativeLimit.getBytes()
> 0 is already guaranteed by the preceding check. Remove it to simplify the logic.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [110-120]

 static String deriveMemoryPoolLimitDefault(Settings settings) {
     ByteSizeValue nativeLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
     if (nativeLimit.getBytes() <= 0) {
         return Long.toString(Long.MAX_VALUE);
     }
-    long pool = Math.max(0L, nativeLimit.getBytes() * 75 / 100);
+    long pool = nativeLimit.getBytes() * 75 / 100;
     return Long.toString(pool);
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that Math.max(0L, ...) is redundant given the preceding check ensures nativeLimit.getBytes() > 0. However, the defensive guard is harmless and may protect against future code changes. The improvement is minor and primarily stylistic.

Low
Suggestions up to commit d4df93c
CategorySuggestion                                                                                                                                    Impact
General
Validate sum of pool mins for overflow

When rootLimit is Long.MAX_VALUE, the validator returns early without checking for
overflow in the sum of mins. If the sum of mins overflows but rootLimit is
unbounded, the overflow goes undetected. Consider validating the sum even when
rootLimit is MAX_VALUE to catch configuration errors early.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java [374-386]

 static void validateMinSum(long rootLimit, long... mins) {
-    if (rootLimit == Long.MAX_VALUE) {
-        return;
-    }
     long sum = 0;
     for (long min : mins) {
         try {
             sum = Math.addExact(sum, min);
         } catch (ArithmeticException overflow) {
             throw new IllegalArgumentException("Sum of pool minimums overflows.", overflow);
         }
     }
-    if (sum > rootLimit) {
-        ...
+    if (rootLimit != Long.MAX_VALUE && sum > rootLimit) {
+        throw new IllegalArgumentException(
+            "Sum of pool minimums (" + sum + ") exceeds root limit (" + rootLimit + ")"
+        );
     }
 }
Suggestion importance[1-10]: 6

__

Why: Valid observation that overflow checking is skipped when rootLimit is Long.MAX_VALUE. The suggested change would catch configuration errors where operators set pool mins that overflow even when the root is unbounded. This improves robustness by detecting invalid configurations earlier, though it's a relatively minor edge case.

Low
Consolidate redundant exception handling branches

The method catches IllegalStateException and UnsupportedOperationException
separately but logs nearly identical messages. The isSpillLimitDynamic() guard above
should make the UnsupportedOperationException branch unreachable. Consider removing
the redundant catch block or consolidating the error handling to reduce code
duplication.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [350-375]

 void updateSpillMemoryLimit(long newLimitBytes) {
     DataFusionService service = dataFusionService;
     if (service == null) {
         logger.debug("DataFusion service not yet initialized; ignoring spill limit update to {}B", newLimitBytes);
         return;
     }
     if (!service.isSpillLimitDynamic()) {
         logger.warn(
             "Updated DataFusion spill memory limit to {}B at the cluster level; the loaded native library does not "
                 + "support runtime spill resize, so the new value will only take effect after a node restart",
             newLimitBytes
         );
         return;
     }
     try {
         service.setSpillMemoryLimit(newLimitBytes);
         logger.info("Updated DataFusion spill memory limit to {}B", newLimitBytes);
-    } catch (IllegalStateException e) {
-        logger.warn("Ignoring spill memory limit update to {}B; service is not running", newLimitBytes);
-    } catch (UnsupportedOperationException e) {
-        logger.warn("Ignoring spill memory limit update to {}B; native runtime does not support live updates", newLimitBytes);
+    } catch (IllegalStateException | UnsupportedOperationException e) {
+        logger.warn("Ignoring spill memory limit update to {}B; {}", newLimitBytes, e.getMessage());
     }
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that the UnsupportedOperationException catch is defensive (the isSpillLimitDynamic() guard should prevent it) and that consolidating the catch blocks would reduce duplication. However, the defensive catch may be intentional to guard against race conditions, and the improved code loses the specific error messages for each exception type.

Low
Document intentional truncation in headroom distribution

The division headroom / poolCount can lose precision when headroom is not evenly
divisible by poolCount. Consider using a more precise distribution strategy or
documenting the intentional truncation behavior to avoid subtle allocation
imbalances across pools.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [247]

+// Integer division intentionally truncates remainder; pools compete for headroom on next tick.
 long bonusPerPool = poolCount > 0 ? headroom / poolCount : 0;
Suggestion importance[1-10]: 3

__

Why: The suggestion correctly identifies that integer division truncates the remainder, but this is standard behavior and the code is clear. Adding a comment would be marginally helpful for maintainability, but the impact is minimal since the behavior is expected and documented in the method's Javadoc.

Low
Suggestions up to commit 3ab722f
CategorySuggestion                                                                                                                                    Impact
General
Prevent arithmetic overflow in rebalance calculation

The addition min + bonusPerPool can overflow when both values are large. Use
Math.addExact or a checked addition to detect overflow and handle it gracefully,
preventing silent wraparound to negative values.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [232-239]

-long effectiveLimit = min + bonusPerPool;
+long effectiveLimit;
+try {
+    effectiveLimit = Math.addExact(min, bonusPerPool);
+} catch (ArithmeticException overflow) {
+    effectiveLimit = Long.MAX_VALUE;
+}
 
 // Cap at pool's max
 effectiveLimit = Math.min(effectiveLimit, max);
 
 // Never drop below current allocation or min
 effectiveLimit = Math.max(effectiveLimit, currentAllocation);
 effectiveLimit = Math.max(effectiveLimit, min);
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential overflow in min + bonusPerPool. However, the suggested fix of clamping to Long.MAX_VALUE on overflow may not be the desired behavior in all cases. The issue is valid and should be addressed, but the specific handling strategy should be carefully considered.

Medium
Prevent overflow in percentage calculation

The multiplication nativeLimit.getBytes() * 75 can overflow when the limit is very
large (e.g., close to Long.MAX_VALUE / 75). Use checked arithmetic or reorder the
operation to divide first, then multiply to prevent overflow.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [106-116]

 static String deriveMemoryPoolLimitDefault(Settings settings) {
     ByteSizeValue nativeLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
     if (nativeLimit.getBytes() <= 0) {
         return Long.toString(Long.MAX_VALUE);
     }
-    long pool = Math.max(0L, nativeLimit.getBytes() * 75 / 100);
+    long bytes = nativeLimit.getBytes();
+    long pool = (bytes / 100) * 75 + (bytes % 100) * 75 / 100;
     return Long.toString(pool);
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential overflow when nativeLimit.getBytes() * 75 is computed for very large values. The proposed solution of dividing first is mathematically sound and prevents overflow, though it introduces slightly more complex arithmetic with the modulo operation to maintain precision.

Medium
Include overflow values in error message

The overflow check is correct, but the error message should include the actual
values that caused the overflow to aid debugging. Consider adding the current sum
and the min value being added to the exception message.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [377-381]

 try {
     sum = Math.addExact(sum, min);
 } catch (ArithmeticException overflow) {
-    throw new IllegalArgumentException("Sum of pool minimums overflows.", overflow);
+    throw new IllegalArgumentException(
+        "Sum of pool minimums overflows. Current sum: " + sum + ", attempting to add: " + min,
+        overflow
+    );
 }
Suggestion importance[1-10]: 5

__

Why: Adding the actual values that caused the overflow would improve debugging, but the current error message is already functional. This is a minor enhancement to error reporting rather than a critical fix.

Low

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 9188043: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@gaurav-amz gaurav-amz force-pushed the unified-allocator-followup branch from 9188043 to d419b2a Compare May 19, 2026 09:10
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 19, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit d646dd4.

PathLineSeverityDescription
plugins/arrow-flight-rpc/build.gradle25highNew dependency added: `compileOnly project(':libs:opensearch-arrow-spi')`. Per mandatory flagging rule, all dependency additions must be flagged regardless of apparent legitimacy. This is an internal monorepo project reference rather than an external registry artifact.
sandbox/plugins/analytics-backend-datafusion/build.gradle51highMultiple new dependencies added: `compileOnly project(':libs:opensearch-arrow-spi')`, `compileOnly project(':plugins:arrow-base')`, and `testImplementation project(':plugins:arrow-base')`. Per mandatory flagging rule, all dependency additions must be flagged. These are internal monorepo project references.
sandbox/plugins/analytics-engine/build.gradle66highNew dependency added (`compileOnly project(':libs:opensearch-arrow-spi')`) and dependency scope changed from `testRuntimeOnly` to `testImplementation` for `project(':plugins:arrow-base')`. Per mandatory flagging rule, all dependency changes must be flagged.
plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java86low`ensureForTesting()` is a `public static` method in production code that forcibly closes any existing static `INSTANCE` singleton and replaces it unconditionally. Documented as test-only but accessible to any caller at runtime, including plugin code; misuse could disrupt the node-wide allocator lifecycle outside of tests.

The table above displays the top 10 most important findings.

Total: 4 | Critical: 0 | High: 3 | Medium: 0 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@gaurav-amz gaurav-amz force-pushed the unified-allocator-followup branch from d419b2a to 10f3617 Compare May 19, 2026 10:00
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 498b502

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 498b502: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Comment on lines +30 to +39
public interface NativeAllocatorListener {

/**
* Invoked after a pool's limit has been updated.
*
* @param poolName logical pool name
* @param newLimit the new limit in bytes
*/
void onPoolLimitChanged(String poolName, long newLimit);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not use ClusterSetting's update consumer instead.

Copy link
Copy Markdown
Contributor Author

@gaurav-amz gaurav-amz May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the FlightTransport / AnalyticsSearchService / DefaultPlanExecutor migration, the SPI had zero production callers.

Copy link
Copy Markdown
Contributor

@Bukhtawar Bukhtawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need to reject the request, throw OpenSearchRejectedException rather than oom-ing. Also lets try to see how we can wire up circuit-breaker for this. Maybe circuit-breaker stats is something we leverage for tracking memory used

// and capped by the framework alongside ingest, query, and datafusion. Hard-fail if
// the framework plugin is missing — silently falling back to a separate root would
// break the same-root invariant for cross-plugin Arrow handoff.
ArrowNativeAllocator nativeAllocator = ArrowNativeAllocator.instance();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change the way of providing ArrowNativeAllocator.

See ArrowAllocatorService as the example.

Initialize:

// ArrowBasePlugin component: node-level Arrow allocator service for cross-plugin zero-copy
this.allocatorService = new DefaultArrowAllocatorService(allocator);

And return in createComponents, so will be part of GUICE

How plugin get it:
From plugin's createComponents

this.allocatorService = pluginComponentRegistry.getComponent(ArrowAllocatorService.class)
.orElseThrow(() -> new IllegalStateException("ArrowAllocatorService not available; arrow-base plugin must be installed"));

Or through GUICE

And we can remove the ArrowAllocatorService using this PR, as it's for providing the root allocator to other extension plugin. And ArrowNativeAllocator should be the one doing that now, through this getPoolAllocator.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for point this out, will do this change.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@bowenlan-amzn
Copy link
Copy Markdown
Member

This PR seems to be the good place to get some alignment on the memory assignment across different components in the system.
Let's take 64G RAM as example.

64 GB RAM
├── JVM Heap (16 GB / 25%)
│   ├── Indexing pressure, Writer buffer
│   ├── Transport Netty
│   └── etc. ...
|
└── Off-Heap (48 GB / 75%)
    │
    ├── node.native_memory.limit (40 G)
    │   │
    │   ├── Java Arrow Root (12G / 30%)
    │   │   ├── POOL_FLIGHT  (2G / 5%)
    │   │   │   └── flight/server, flight/client — batch ser/deser
    │   │   ├── POOL_INGEST  (4G / 10%)
    │   │   │   └── ArrowBufferPool children (per-VSR, pool_limit/divisor)
    │   │   └── POOL_QUERY   (2G / 5%)
    │   │       └── analytics-search-service → per-query children (256MB each)
    │   │
    │   └── Rust Memory Pool (28G / 70%)
    │       └── DataFusion Rust pool (datafusion.memory_pool_limit_bytes)
    │           ├── hash tables, sort buffers, aggs, intermediate batches
    │           └── spill staging (datafusion.spill_memory_limit_bytes)
    │
    └── Rest (no knobs, at least 8G)
        ├── Lucene mmap (~2 GB)
        └── Page Cache (whatever's left)

I think the Java Arrow memory are mostly used for intermediate data transfer, while Rust memory are used for query execution. That's one reason to provide more to Rust side.

@Bukhtawar @alchemist51 @mch2 @expani

@alchemist51 alchemist51 added the skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis. label May 19, 2026
@alchemist51 alchemist51 reopened this May 19, 2026
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit d646dd4

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for d646dd4: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@gaurav-amz gaurav-amz force-pushed the unified-allocator-followup branch from d646dd4 to 53c0e36 Compare May 19, 2026 20:10
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 53c0e36

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit e1f3c5b

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for e1f3c5b: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@gaurav-amz gaurav-amz force-pushed the unified-allocator-followup branch from e1f3c5b to 041dc0e Compare May 19, 2026 20:26
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 041dc0e

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 041dc0e: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Copy Markdown
Member

@bowenlan-amzn bowenlan-amzn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overral looks good! Thanks for refactoring the allocator related changes in arrow-base.
Left some comments that should be easy to accommodate. Approving now.

*
* <p>Default: empty.
*/
public List<PluginNodeStats> nodeStats() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope we can have a ExperimentalApi annotation here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

IndexSettings indexSettings,
ThreadPool threadPool
ThreadPool threadPool,
org.opensearch.arrow.allocator.ArrowNativeAllocator nativeAllocator
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use import. And all the other places

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +157 to +158
BufferAllocator flightPool = nativeAllocator.getPoolAllocator(NativeAllocatorPoolConfig.POOL_FLIGHT);
flightAllocator = flightPool.newChildAllocator("flight", 0, flightPool.getLimit());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems not necessary to create flightAllocator here, the flightPool should be the flightAllocator.

For the 2 child allocator, server and client, can we set the limit to be Long.MAX_VALUE. I think they will still be bound by their parent which is the POOL_FLIGHT here.

This way seems we also don't need the listener setup here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. The intermediate flightAllocator is gone, flightPool is the parent directly, and serverAllocator / clientAllocator are children at Long.MAX_VALUE.

Comment on lines +98 to +99
BufferAllocator queryPool = nativeAllocator.getPoolAllocator(NativeAllocatorPoolConfig.POOL_QUERY);
this.allocator = queryPool.newChildAllocator("analytics-search-service", 0, queryPool.getLimit());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same shape applied.

Comment on lines +118 to +119
BufferAllocator queryPool = nativeAllocator.getPoolAllocator(NativeAllocatorPoolConfig.POOL_QUERY);
this.coordinatorAllocator = queryPool.newChildAllocator("coordinator", 0, queryPool.getLimit());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 40688ad

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 40688ad: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit f4cbdd2

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for f4cbdd2: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

…Stats

Builds on opensearch-project#21703 to add the pieces a real production deployment needs:

* Make pool min/max settings dynamic and grouped-validate cross-setting
  invariants on every cluster-state update (sum of pool mins <= root,
  per-pool min <= max). FLIGHT_MIN/INGEST_MIN defaults are 0L (the prior
  Long.MAX_VALUE defaults caused the new validator to reject any non-MAX
  root). setPoolMin now updates the live BufferAllocator.setLimit so the
  Dynamic property has observable effect even when the rebalancer is off.
* Derive ROOT_LIMIT_SETTING default from the AC node-native-memory budget
  (limit minus buffer-percent) so the framework respects the same budget
  AC throttles on, with a Long.MAX_VALUE fallback when AC is unconfigured.
* Add QUERY pool alongside FLIGHT and INGEST. Pools init at min when the
  rebalancer is enabled; otherwise at max so non-rebalanced nodes can
  still allocate.
* Wire arrow-flight-rpc to the FLIGHT pool, parquet-data-format to the
  INGEST pool, and analytics-engine to the QUERY pool via the framework's
  allocator service. Hard-fail if the framework plugin is missing —
  silently skipping the wire-up is the silent-misconfiguration class of
  bug we want to prevent.
* Cleanup ad-hoc allocator fallbacks in parquet-data-format / analytics-engine
  so all Arrow consumers go through the unified pool hierarchy.
* Rebalancer now distributes headroom across all pools (not only those with
  current allocation > 0). Avoids the dead-pool corner case where a pool
  with min=0 starts at limit=0, can never make a first allocation, and
  never receives a bonus.

DataFusion runtime memory accounting stays separate. The Rust-side
DataFusion MemoryPool is governed by datafusion.memory_pool_limit_bytes
(unchanged from before), which is the right knob: DataFusion's internal
sort/hash/group-by working memory is allocated by Rust, not through the
Arrow Java BufferAllocator hierarchy, so a Java-side pool would resize a
ceiling no allocator routes through. The framework's QUERY pool covers
the cross-plugin Arrow allocations the analytics-engine plumbing makes,
which is what we want bounded centrally.

Plugin _nodes/stats integration:
* Add Plugin#nodeStats() hook + PluginNodeStats interface in server.
* Wire NodeStats to carry Map<String, PluginNodeStats> with version-gated
  ser/deser at V_3_7_0 and top-level rendering under nodes.<id>.<name>.
* Each entry is wire-framed as (name, length-prefixed bytes); the receiver
  wraps the inner payload with NamedWriteableAwareStreamInput and drops
  entries whose subtype is not registered locally. This makes mixed-version
  rolling upgrades safe — a coordinator that lacks the plugin a data node
  is running keeps decoding the rest of NodeStats instead of failing the
  whole response.
* NativeAllocatorPluginStats adapter wraps NativeAllocatorPoolStats so the
  framework contributes to _nodes/stats. The dedicated _native_allocator/stats
  REST endpoint is gone — one observability surface, not two.
* Plugin stats are emitted on every _nodes/stats request regardless of the
  ?metric= filter; matches RemoteStoreNodeStats precedent.

DataFusion spill memory limit:
* Promote datafusion.spill_memory_limit_bytes to Setting.Property.Dynamic.
* Wire addSettingsUpdateConsumer that branches on
  NativeBridge.isSpillLimitDynamic(): mirrors live when df_set_spill_limit
  is exported, otherwise warns and waits for next node restart.

API hygiene:
* parquet.max_per_vsr_allocation_ratio is a divisor (limit/N), not a ratio.
  Renamed to parquet.max_per_vsr_allocation_divisor with a hard upper
  bound of 100 to reject fat-finger PUTs that would starve every VSR.

Real regressions caught during self-audit:
* Existing arrow-flight-rpc internalClusterTests and the sandbox coordinator
  ITs were not declaring the framework plugin in nodePlugins(). After the
  flight transport got wired to the FLIGHT pool, those ITs fail at node
  startup with "ArrowNativeAllocator not initialized". Each IT now installs
  the framework plugin and lists it as an extendedPlugin.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
…gistry injection

Removes ArrowNativeAllocator.instance(), the static INSTANCE field, and ensureForTesting(). Binds ArrowNativeAllocator into Guice from ArrowBasePlugin#createGuiceModules; consumer plugins (FlightStreamPlugin, AnalyticsPlugin, ParquetDataFormatPlugin) fetch it from PluginComponentRegistry and pass it through to FlightTransport, AnalyticsSearchService, ArrowBufferPool, and ParquetIndexingEngine via constructor parameters.

Tests construct the allocator directly with the standard pools instead of relying on the test-only ensureForTesting() helper.

Addresses bowen's review comment to align with the ArrowAllocatorService provisioning pattern (returned from createComponents, bound via Guice).

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
…ocatorService

Bowen's review feedback: every Arrow consumer should source its allocator from one of the framework's named pools (FLIGHT/INGEST/QUERY) via ArrowNativeAllocator#getPoolAllocator. The old ArrowAllocatorService SPI let callers create root-siblings outside any pool, which left their allocations invisible in _nodes/stats.native_allocator.pools.* and created a footgun for cross-plugin handoffs.

Migrations:
* DefaultPlanExecutor.coordinatorAllocator -> child of POOL_QUERY (coordinator-side bytes now count against parquet.native.pool.query.max alongside the data-node-side per-fragment allocators)
* TransportNativeArrowStreamDataAction (stream-transport-example) -> POOL_FLIGHT
* NativeArrowTransportIT.TransportTestArrowAction -> POOL_FLIGHT

Deletions:
* ArrowAllocatorService interface and DefaultArrowAllocatorService impl
* Guice binding bind(ArrowAllocatorService.class) in ArrowBasePlugin
* Dead allocatorService constructor parameter on FlightTransport and AnalyticsSearchService
* Now-empty org.opensearch.arrow.memory package (package-info.java)

ArrowBatchResponse Javadoc updated to point readers at the named-pool API.

Behavior change: on a node serving as both coordinator and data node for an analytics query, both sides of the query now count against parquet.native.pool.query.max. Operators upgrading should review the existing budget against total per-node analytics workload.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
Self-review surfaced six items worth fixing before pushing:

* DefaultPlanExecutor now subscribes to the QUERY pool listener and updates coordinatorAllocator on dynamic resize, matching the pattern used by AnalyticsSearchService and FlightTransport. Without this, runtime grows of parquet.native.pool.query.max never reached the coordinator-side allocator.
* DefaultPlanExecutor.coordinatorAllocator is now sized at queryPool.getLimit() instead of Long.MAX_VALUE so the literal matches the actual cap.
* Drop the dead try/catch around removeListener in FlightTransport.stopInternal and AnalyticsSearchService.close. Those catches were holdovers from the singleton era and can't fire now that nativeAllocator is constructor-injected.
* Fix the joined-import line in AnalyticsSearchService that two earlier patches concatenated.
* Expand ArrowBatchResponse Javadoc with a pool-selection decision tree (FLIGHT for transport, INGEST for ingest path, QUERY for query execution) so readers know which pool to pick.
* Update misleading Javadoc in TransportNativeArrowStreamDataAction and DefaultPlanExecutor that referenced non-existent close paths. The pre-existing leak is preserved (out of scope for this PR) but documented honestly with a path forward.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
… plugin nodeStats — review-feedback edition

Builds on opensearch-project#21703 to add the query, datafusion with allocator, dynamic-tuning, stats,
and pool-wiring pieces. This commit is the consolidated version after addressing
review feedback on PR opensearch-project#21732.

## Architecture: three native-memory trackers, three knobs

Each tracker owns the bytes it can actually see, with a process-level cap above all
of them. On a 64 GB / 16 GB-heap node the defaults compose to:

  RAM                                            64 GB
  JVM heap (operator-configured -Xmx)            16 GB
  Off-heap (RAM - heap)                          48 GB
  node.native_memory.limit  (79% of off-heap)    37.92 GB     ← AC throttle threshold
    native.allocator.root.limit (20% of NM)       7.58 GB     ← Arrow framework cap
      pool.flight.max  (5% of NM)                 1.90 GB     ← Flight transport pool
      pool.ingest.max  (8% of NM)                 3.03 GB     ← Parquet VSR pool
      pool.query.max   (5% of NM)                 1.90 GB     ← analytics-engine pool
    datafusion.memory_pool_limit (75% of NM)     28.44 GB     ← Rust runtime pool (sibling to Arrow)
  Unmanaged                                      10.08 GB     ← Lucene mmap, OS page cache, etc.

  Independent (disk budget):
    datafusion.spill_memory_limit_bytes (50% of physical RAM)  32 GB

## What this commit does

### 1. Dynamic settings + cluster-update consumers

All native-allocator settings are Setting.Property.Dynamic. Cluster-settings update
consumers in ArrowBasePlugin#registerSettingsUpdateConsumers wire PUTs to live
ArrowNativeAllocator state. The grouped validator (validateUpdate) rejects
cross-setting violations (sum of pool mins > root, per-pool min > max) at PUT time
with HTTP 400.

### 2. Query and DataFusion pools

POOL_QUERY added for analytics-engine per-query allocators (children of
nativeAllocator.getPoolAllocator(POOL_QUERY)). DataFusion gets its own setting
(datafusion.memory_pool_limit_bytes) since DataFusion's working memory lives
entirely on the Rust side and is reported only through DataFusion's MemoryPool
— a Java-side pool that pretended to track it would either need a per-allocation
FFM round-trip (performance disaster) or be a config-only mirror (HTTP 200 with
no observable effect).

POOL_DATAFUSION is intentionally absent; the Rust-side
datafusion.memory_pool_limit_bytes is the honest knob for that layer.

### 3. Plugin#nodeStats SPI

New SPI method Plugin#nodeStats() returning List<PluginNodeStats>. NodeStats
serializes plugin-contributed payloads via NamedWriteable, surfacing them under
_nodes/stats. Annotated @experimentalapi to match PluginNodeStats.

### 4. Pool wiring through Guice (no static singleton)

ArrowNativeAllocator is constructed in ArrowBasePlugin#createComponents and
returned to Node.java which auto-binds it via Guice. Consumers
(AnalyticsSearchService, FlightTransport, DefaultPlanExecutor) receive it through
constructor @Inject — no static instance() / INSTANCE / ensureForTesting.

The createComponents body is extracted into a package-private buildAllocator
helper so unit tests can exercise the full pool/consumer wiring without the
heavyweight ClusterService fixture.

### 5. Drop NativeAllocatorListener SPI [@bowenlan-amzn #3269655712, #3269660170,
   @Bukhtawar #3267186318]

The original design used NativeAllocatorListener to push pool-limit changes into
consumer-side child allocators that captured the pool's limit at construction
time. With children created at Long.MAX_VALUE, Arrow's Accountant.allocate (lines
191-203 of Accountant.java in arrow-java v18.3.0) walks the parent chain on every
allocation and checks the parent's allocationLimit — so dynamic resizes of
parquet.native.pool.{flight,query}.max take effect immediately on subsequent
allocations through any descendant. The listener was emulating this Arrow-native
behavior and is no longer needed.

  - AnalyticsSearchService: drop poolListener; child uses Long.MAX_VALUE
  - DefaultPlanExecutor: drop poolListener; coordinator uses Long.MAX_VALUE
  - FlightTransport: drop intermediate flightAllocator + poolListener; pool is
    the parent, server/client are Long.MAX_VALUE children
  - libs/arrow-spi/.../NativeAllocatorListener.java: deleted
  - NativeAllocator interface: addListener / removeListener removed
  - ArrowNativeAllocator: listeners field, fireListeners method, and listener
    calls in setPoolLimit/setPoolMin/rebalance removed

The rebalancer continues to function as before: it changes pool limits via
setLimit, which children read via Arrow's parent-cap check at allocateBytes.
Rebalancer is off by default.

### 6. Fix Guice duplicate-binding regression on ArrowNativeAllocator

Removed the redundant ArrowBasePlugin#createGuiceModules override. Node.java
line 1748 already binds every component returned from createComponents:

    pluginComponents.stream()
        .forEach(p -> b.bind((Class) p.getClass()).toInstance(p));

ArrowBasePlugin#createComponents already returns the allocator, so the explicit
bind(ArrowNativeAllocator.class).toInstance(allocator) in createGuiceModules
registered the same binding a second time and caused Guice CreationException at
cluster startup. This was the root cause of the gradle-check Jenkins CI failures
on commits 9188043, 498b502, d646dd4, e1f3c5b, 041dc0e — every integration test
failed during MockNode construction.

### 7. @experimentalapi on Plugin#nodeStats() [@bowenlan-amzn #3269563833]

Annotate the new SPI hook to match the @experimentalapi annotation already
present on PluginNodeStats (the type the method returns).

### 8. FQN -> import nit in ParquetIndexingEngine [@bowenlan-amzn #3269589758]

Replace 3 fully-qualified
org.opensearch.arrow.allocator.ArrowNativeAllocator references in constructor
parameters with a single import.

### 9. Memory defaults aligned to the partitioning model

  - node.native_memory.limit defaults to 79% × (RAM - heap) via OsProbe (cgroup-
    aware: container memory limit on K8s/Docker, total physical RAM on bare
    metal). Falls back to ZERO if probe fails or heap >= RAM, preserving the
    pre-default opt-in semantics. The OsProbe-reading entry point delegates to
    a pure overload deriveNativeMemoryLimitDefault(long, long) so unit tests
    cover the fallback branches deterministically.

  - native.allocator.root.limit defaults to 20% × node.native_memory.limit.

  - parquet.native.pool.{flight,ingest,query}.max default to 5%/8%/5% of
    node.native_memory.limit (sum 18% < root's 20%, leaving 2 pp headroom inside
    the framework cap).

  - datafusion.memory_pool_limit_bytes defaults to 75% × node.native_memory.limit
    (called out by @bharath-techie #3271093086: prior default
    Runtime.maxMemory() / 4 was JVM-heap derived, wrong baseline for an off-heap
    runtime).

  - datafusion.spill_memory_limit_bytes defaults to 50% × physical RAM,
    independent of node.native_memory.limit (spill is a disk-staging budget,
    not a memory budget).

Behavior change to call out in upgrade notes: admission control is now active
by default. Operators wanting pre-existing opt-out behavior can set
node.native_memory.limit: 0b — all framework caps then fall back to
Long.MAX_VALUE unbounded mode.

### 10. Translate Arrow OutOfMemoryException to OpenSearchRejectedExecutionException
   at per-request boundaries [reviewer feedback]

Added an AllocationRejection helper in plugins/arrow-base that wraps Arrow OOM
with OpenSearch's standard backpressure signal so the REST layer maps these
failures to HTTP 429 (Too Many Requests) instead of a generic 500. Wraps applied
at the per-request allocation boundaries:

  - DefaultPlanExecutor.executeInternal: per-query child allocator creation
  - VSRPool.tryRotate: VSR rotation during ingest

Lifetime/startup allocations (FlightTransport.doStart, AnalyticsSearchService
constructor, VSRPool.initializeActiveVSR) are intentionally left unwrapped — a
failure there is a framework misconfiguration, not a per-request backpressure
signal.

### 11. Integration tests for cap-enforcement boundaries

Added NativeAllocatorBoundaryIT under
plugins/arrow-flight-rpc/src/internalClusterTest. Three tests boot a single-node
cluster with tight memory settings (root=16 MiB, pool maxes=16 MiB) and exercise
real Arrow allocations via the Guice-injected ArrowNativeAllocator:

  - testPoolMaxRejectsAllocationsBeyondCap: PUT pool.query.max=4 MiB, verify a
    8 MiB request through the pool throws OutOfMemoryException.
  - testRootLimitRejectsAllocationsBeyondCap: hold 8 MiB in FLIGHT, verify a
    16 MiB QUERY request fails at the root level (8+16 > 16 root cap) even
    though QUERY's own max would individually allow it.
  - testDynamicPoolResizeAffectsInFlightAllocations: create child at
    Long.MAX_VALUE (the AnalyticsSearchService / DefaultPlanExecutor pattern
    post-listener-removal), verify a 2 MiB allocation succeeds, PUT
    query.max=1 MiB via cluster settings, verify a 2 MiB request now throws
    OOM via Arrow's parent-cap check at allocateBytes.

These cover the boundaries the framework is supposed to enforce. The third test
specifically verifies the listener-replacement contract.

### 12. Unit tests for codecov patch-coverage gates

Added unit tests covering paths flagged by codecov as uncovered diff lines:

  - ArrowBasePluginTests:
      * extended testPoolMaxRejectsNegative to cover INGEST_MAX and QUERY_MAX
        rejection branches.
      * testBuildAllocatorWiresAllPoolsAndSettingsConsumers exercises the full
        createComponents code path via the extracted buildAllocator helper:
        all three pools registered, pool maxes match operator-set values,
        cluster-settings update consumers wired.
  - NodeStatsTests:
      * testPluginStatsDropsAllEntriesWhenReceiverHasNoRegistry covers the
        registry-null branch in NodeStats.readPluginStats (defensive path
        when the parent stream has no NamedWriteableRegistry attached).
      * testPluginNodeStatsDefaultReturnsEmpty covers the Plugin#nodeStats()
        default empty-list return.
  - NodeResourceUsageTrackerTests:
      * testDeriveNativeMemoryLimitDefaultFallbackPaths covers the
        ResourceTrackerSettings.deriveNativeMemoryLimitDefault fallback
        branches (probe-returns-0, heap >= ram) plus the happy path.
  - AllocationRejectionTests:
      * 4 tests covering success pass-through, OOM translation, non-OOM
        propagation, and the Runnable overload of AllocationRejection.wrap.

## How a query flows through these layers

Take a concrete example: a user issues a PPL query that goes through
analytics-engine and dispatches to DataFusion.

  1. Coordinator receives the request.
     → Admission control checks node.native_memory.limit budget.
     → If OK, proceeds; otherwise rejects with 429.

  2. AnalyticsSearchService creates a per-fragment Arrow allocator:
        allocator = getPoolAllocator(POOL_QUERY).newChildAllocator("frag-N", 0, Long.MAX_VALUE)
     → Future BufferAllocator.buffer(...) calls on this allocator increment
       POOL_QUERY's counter and the root counter via Arrow's parent-cap chain.
     → Bounded by parquet.native.pool.query.max.

  3. AnalyticsSearchService dispatches to DataFusion via NativeBridge.
     → NativeBridge.executeQueryAsync(...) marshals plan bytes via FFM.

  4. DataFusion runs the query in Rust:
     → HashAggregate builds a hash table.
     → reservation.try_grow(50MB) → DataFusion MemoryPool counter += 50MB.
     → Bounded by datafusion.memory_pool_limit_bytes.
     → If exceeded, HashAggregate spills to disk
       (which itself is bounded by datafusion.spill_memory_limit_bytes).

  5. DataFusion produces a result batch in Rust.
     → Java imports it via Arrow C Data Interface.
     → The import allocates Java ArrowBufs under the per-fragment allocator
       from step 2. POOL_QUERY counter += result_size.

  6. Result returns to coordinator.
     → Per-fragment allocator closes; POOL_QUERY counter decrements.
     → DataFusion query completes; MemoryPool counter decrements.

Each layer accounts for what it owns. No double-counting. Each operator-tunable
knob bounds a real, observable thing.

## Verification

  ./gradlew -Dsandbox.enabled=true \
    :plugins:arrow-base:test :plugins:arrow-flight-rpc:test \
    :sandbox:plugins:{analytics-engine,analytics-backend-datafusion,parquet-data-format}:test \
    :server:test --tests "org.opensearch.node.resource.tracker.*" \
    :server:test --tests "org.opensearch.action.admin.cluster.node.stats.NodeStatsTests" \
    :plugins:arrow-flight-rpc:internalClusterTest --tests "*NativeAllocatorBoundaryIT*" \
    spotlessJavaCheck
  → BUILD SUCCESSFUL

By submitting this pull request, I confirm that my contribution is made under
the terms of the Apache 2.0 license.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
…load

The Runnable overload of AllocationRejection.wrap was missing @param tags
for context and body, failing the missingJavadoc gradle-check on CI:

  AllocationRejection.java:75: error: AllocationRejection.wrap (method):
    missing javadoc @param for parameter 'context'

Mirror the @param/@throws structure already present on the Supplier
overload above.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit a63141a

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 3ab722f

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 3ab722f: SUCCESS

@gaurav-amz
Copy link
Copy Markdown
Contributor Author

We might need to reject the request, throw OpenSearchRejectedException rather than oom-ing. Also lets try to see how we can wire up circuit-breaker for this. Maybe circuit-breaker stats is something we leverage for tracking memory used

OpenSearchRejectedException handling has been addressed in this PR. Circuit breaker-related changes will be included in a follow-up PR. Stats-related refactoring will be needed once Ajay’s PR is merged.

Comment on lines +118 to +125
public static final Setting<Integer> MAX_PER_VSR_ALLOCATION_DIVISOR = Setting.intSetting(
"parquet.max_per_vsr_allocation_divisor",
10,
1,
100,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be function of cores?

Comment on lines +279 to +300
private static Map<String, PluginNodeStats> readPluginStats(StreamInput in) throws IOException {
int size = in.readVInt();
if (size == 0) {
return Collections.emptyMap();
}
NamedWriteableRegistry registry = in.namedWriteableRegistry();
Map<String, PluginNodeStats> result = new HashMap<>(size);
for (int i = 0; i < size; i++) {
String name = in.readString();
BytesReference payload = in.readBytesReference();
if (registry == null) {
// No registry attached to the parent stream — we can't deserialize any
// entry. Drop the whole map. This branch is defensive; production paths
// always set a registry when they expect named writeables.
continue;
}
try (
StreamInput rawIn = payload.streamInput();
NamedWriteableAwareStreamInput payloadIn = new NamedWriteableAwareStreamInput(rawIn, registry)
) {
payloadIn.setVersion(in.getVersion());
PluginNodeStats stats = payloadIn.readNamedWriteable(PluginNodeStats.class);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will _node/stats?metrics=<metric_name> work?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, The stats will be under this path _nodes/stats/native_allocator

…lowup

# Conflicts:
#	sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java
#	server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java
#	server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java
#	server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
#	server/src/main/java/org/opensearch/node/NodeService.java
#	test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
@gaurav-amz gaurav-amz force-pushed the unified-allocator-followup branch from 3ab722f to d4df93c Compare May 21, 2026 05:13
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit d4df93c

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for d4df93c: SUCCESS

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 79ca7c2

@gaurav-amz gaurav-amz changed the title Native allocator follow-up: dynamic settings, query/datafusion pools, plugin nodeStats Native allocator: dynamic settings, query/datafusion pools, plugin nodeStats May 21, 2026
@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 79ca7c2: SUCCESS

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit d96e97c

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit b8f7374

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for b8f7374: SUCCESS

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit ab0dfd3

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants