Skip to content

Native allocator follow-up: dynamic settings, query/datafusion pools, plugin nodeStats#21732

Open
gaurav-amz wants to merge 7 commits into
opensearch-project:mainfrom
gaurav-amz:unified-allocator-followup
Open

Native allocator follow-up: dynamic settings, query/datafusion pools, plugin nodeStats#21732
gaurav-amz wants to merge 7 commits into
opensearch-project:mainfrom
gaurav-amz:unified-allocator-followup

Conversation

@gaurav-amz
Copy link
Copy Markdown
Contributor

@gaurav-amz gaurav-amz commented May 19, 2026

Description

Builds on #21703 to add the query, datafusion with allocator, dynamic-tuning, stats, and pool-wiring pieces.

Architecture: three native-memory trackers, three knobs

Each tracker owns the bytes it can actually see, with a process-level cap above all of them.

            ┌──────────────────────────────────────────────────────────┐
            │  Process-level cap (admission control)                   │
            │                                                          │
            │  node.native_memory.limit                                │
            │    Defaults to 79% × (RAM − JVM heap), cgroup-aware.     │
            │                                                          │
            │  ┌────────────────────────┐  ┌────────────────────────┐  │
            │  │ Arrow tracker          │  │ DataFusion tracker     │  │
            │  │ (ArrowNativeAllocator. │  │ (Rust MemoryPool)      │  │
            │  │  root)                 │  │                        │  │
            │  │                        │  │ Tracks:                │  │
            │  │ Tracks:                │  │   HashAggregate        │  │
            │  │   POOL_FLIGHT (5% NM)  │  │   Sort buffers         │  │
            │  │   POOL_INGEST (8% NM)  │  │   TopK heap            │  │
            │  │   POOL_QUERY  (5% NM)  │  │   Spill staging        │  │
            │  │                        │  │                        │  │
            │  │ Bounded by:            │  │ Bounded by:            │  │
            │  │  native.allocator      │  │  datafusion.memory_    │  │
            │  │  .root.limit           │  │  pool_limit_bytes      │  │
            │  │  (20% × NM by default) │  │  (75% × NM by default) │  │
            │  └────────────────────────┘  └────────────────────────┘  │
            │                                                          │
            └──────────────────────────────────────────────────────────┘

Three layers, three responsibilities. Each is necessary; none can be replaced by another.

  • Arrow tracker (ArrowNativeAllocator) accounts for every Arrow BufferAllocator allocation in the JVM. Partitioned into FLIGHT / INGEST / QUERY pools so one plugin can't starve another. All Arrow buffers descend from the same RootAllocator, preserving Arrow's same-root invariant for cross-plugin zero-copy handoff.
  • DataFusion tracker (Rust MemoryPool) accounts for DataFusion's own working memory and triggers spill / fail-fast when a query exceeds budget. Lives entirely on the Rust side; updates flow in via FFM through NativeBridge.setMemoryPoolLimit.
  • Admission control caps the OS process. node.native_memory.limit is the operator-declared off-heap budget AC throttles against; framework-derived defaults (root, pools, DataFusion pool) all scale from this single number.

POOL_DATAFUSION is intentionally not present. DataFusion's working memory is allocated by Rust operators directly and reported only to DataFusion's own MemoryPool — it never flows through Arrow's BufferAllocator API. Adding a Java-side pool that pretended to track it would have required either a per-allocation FFM round-trip (performance disaster) or a config-only mirror (a setting that returns HTTP 200 and silently does nothing). The Rust-side datafusion.memory_pool_limit_bytes is the honest knob for that layer.

Worked example: 64 GB / 16 GB-heap node

With operator-declared -Xmx16g on a 64 GB host (bare metal or cgroup-limited container), defaults compose to:

64 GB RAM
  │ ├── JVM Heap (16 GB / 25%)
  │ │   ├── Indexing pressure, Writer buffer
  │ │   ├── Transport Netty
  │ │   └── etc.
  │ │
  │ └── Off-Heap (48 GB / 75%)
  │     │
  │     ├── node.native_memory.limit (37.92 GB / 79% of off-heap)
  │     │   │
  │     │   ├── Java Arrow Root (7.58 GB / 20% of NM)
  │     │   │   ├── POOL_FLIGHT  (1.90 GB / 5% of NM)
  │     │   │   │   └── server, client — flight transport batches
  │     │   │   ├── POOL_INGEST  (3.03 GB / 8% of NM)
  │     │   │   │   └── ArrowBufferPool / parquet VSR allocators
  │     │   │   └── POOL_QUERY   (1.90 GB / 5% of NM)
  │     │   │       └── analytics-search-service → per-query children
  │     │   │
  │     │   └── DataFusion Rust pool (28.44 GB / 75% of NM)
  │     │       └── datafusion.memory_pool_limit_bytes
  │     │           ├── hash tables, sort buffers, aggs, intermediate batches
  │     │           └── triggers spill on exhaustion
  │     │
  │     └── Unmanaged (~10.08 GB / 21% of off-heap)
  │         ├── Lucene mmap
  │         ├── OS Page Cache
  │         └── Sidecars / non-Arrow native consumers
  │
  │ Independent (disk):
  │   datafusion.spill_memory_limit_bytes (32 GB / 50% of physical RAM)

As a flat table for quick reference:

RAM                                              64 GB
JVM heap (operator-configured -Xmx)              16 GB
Off-heap (RAM − heap)                            48 GB
node.native_memory.limit  (79% of off-heap)      37.92 GB    ← AC throttle threshold
  native.allocator.root.limit  (20% of NM)        7.58 GB    ← Arrow framework cap
    pool.flight.max  (5% of NM)                   1.90 GB    ← Flight transport pool
    pool.ingest.max  (8% of NM)                   3.03 GB    ← Parquet VSR pool
    pool.query.max   (5% of NM)                   1.90 GB    ← analytics-engine pool
  datafusion.memory_pool_limit  (75% of NM)      28.44 GB    ← Rust runtime pool (sibling)
Unmanaged (Lucene mmap, OS page cache, etc.)     10.08 GB

Independent budget (disk staging, not memory):

datafusion.spill_memory_limit_bytes (50% of physical RAM)    32 GB

How a query flows through these layers

Take a concrete example: a user issues a PPL query that goes through analytics-engine and dispatches to DataFusion.

  1. Coordinator receives the request.
     → Admission control checks node.native_memory.limit budget.
     → If OK, proceeds; otherwise rejects with 429.

  2. AnalyticsSearchService creates a per-fragment Arrow allocator:
        allocator = getPoolAllocator(POOL_QUERY).newChildAllocator("frag-N", 0, Long.MAX_VALUE)
     → Future BufferAllocator.buffer(...) calls on this allocator
       increment POOL_QUERY's counter and the root counter via Arrow's
       parent-cap chain at allocateBytes.
     → Bounded by native.allocator.pool.query.max.

  3. AnalyticsSearchService dispatches to DataFusion via NativeBridge.
     → NativeBridge.executeQueryAsync(...) marshals plan bytes via FFM.

  4. DataFusion runs the query in Rust:
     → HashAggregate builds a hash table.
     → reservation.try_grow(50MB) → DataFusion MemoryPool counter += 50MB.
     → Bounded by datafusion.memory_pool_limit_bytes.
     → If exceeded, HashAggregate spills to disk
       (which itself is bounded by datafusion.spill_memory_limit_bytes).

  5. DataFusion produces a result batch in Rust.
     → Java imports it via Arrow C Data Interface.
     → The import allocates Java ArrowBufs under the per-fragment allocator
       from step 2. POOL_QUERY counter += result_size.

  6. Result returns to coordinator.
     → Per-fragment allocator closes; POOL_QUERY counter decrements.
     → DataFusion query completes; MemoryPool counter decrements.

Each layer accounts for what it owns. No double-counting (the result bytes are counted in DataFusion's pool only while they exist as Rust buffers; after Java imports them, ownership transfers and the Java side counts them; when Java closes the import, the bytes are freed). Each operator-tunable knob bounds a real, observable thing.

Operator surface

After this PR, an operator inspecting a node running the query above sees:

{
  "native_allocator": {
    "root": {"allocated": "150MB", "limit": "7.58GB"},
    "pools": {
      "flight":  {"allocated": "20MB",  "limit": "1.90GB"},
      "ingest":  {"allocated": "30MB",  "limit": "3.03GB"},
      "query":   {"allocated": "100MB", "limit": "1.90GB"}
    }
  },
  "datafusion": {
    "memory_pool": {"usage": "2.4GB", "limit": "28.44GB"},
    "spill":       {"usage": "0",     "limit": "32GB"}
  }
}

Three numbers, three sources, three knobs. The operator looks at each in isolation when tuning that layer:

  • "DataFusion queries are OOMing" → bump datafusion.memory_pool_limit_bytes.
  • "Flight RPC is starving Parquet ingest" → bump parquet.native.pool.flight.max or lower parquet.native.pool.ingest.max.
  • "Whole node is using too much memory" → lower node.native_memory.limit.

All pool min/max settings are Setting.Property.Dynamic. Pool limit changes propagate to consumer-side child allocators automatically via Arrow's parent-cap check at allocateBytes — child allocators are created with Long.MAX_VALUE and inherit the live parent cap on every allocation, so dynamic resizes reach in-flight workloads without restart and without an explicit notification SPI. The grouped validator rejects cross-setting violations (sum of pool mins > root, per-pool min > max) at PUT time with HTTP 400 rather than at the next allocation.

Behavior change to call out

Admission control is now active by default. node.native_memory.limit defaults to 79% × (RAM − JVM heap) instead of 0 (unconfigured). On upgrade, AC will start tracking native memory utilization and the framework's pool caps will derive sensible values from the operator's declared off-heap budget without any explicit configuration.

Operators who want pre-existing opt-out behavior (AC unconfigured, all framework caps unbounded) can set:

node.native_memory.limit: 0b

This restores the prior "explicit-opt-in" semantics — useful for nodes where Lucene mmap or non-Arrow native consumers dominate and the operator does not want admission control throttling search/index traffic.

Changes after initial review

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@gaurav-amz gaurav-amz requested a review from a team as a code owner May 19, 2026 08:45
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 19, 2026

PR Reviewer Guide 🔍

(Review updated until commit d4df93c)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

In validateMinSum, overflow is caught but the sum variable is not reset or handled correctly after the exception. If Math.addExact throws ArithmeticException, the sum remains at its pre-overflow value, and the subsequent check 'sum > rootLimit' uses stale data. This can incorrectly accept configurations where the true sum would exceed rootLimit.

for (long min : mins) {
    try {
        sum = Math.addExact(sum, min);
    } catch (ArithmeticException overflow) {
        throw new IllegalArgumentException("Sum of pool minimums overflows.", overflow);
    }
}
if (sum > rootLimit) {
Resource Leak

In readPluginStats, when an IOException or IllegalArgumentException is caught during deserialization of a PluginNodeStats entry, the payloadIn StreamInput is closed by the try-with-resources block, but the outer loop continues without ensuring all remaining entries are safely skipped. If the payload.streamInput() itself is malformed or the NamedWriteableAwareStreamInput constructor throws before the try block completes, resources may leak. Additionally, the parent 'in' stream position may be left inconsistent if payload reading fails partway through.

NamedWriteableRegistry registry = in.namedWriteableRegistry();
Map<String, PluginNodeStats> result = new HashMap<>(size);
for (int i = 0; i < size; i++) {
    String name = in.readString();
    BytesReference payload = in.readBytesReference();
    if (registry == null) {
        // No registry attached to the parent stream — we can't deserialize any
        // entry. Drop the whole map. This branch is defensive; production paths
        // always set a registry when they expect named writeables.
        continue;
    }
    try (
        StreamInput rawIn = payload.streamInput();
        NamedWriteableAwareStreamInput payloadIn = new NamedWriteableAwareStreamInput(rawIn, registry)
    ) {
        payloadIn.setVersion(in.getVersion());
        PluginNodeStats stats = payloadIn.readNamedWriteable(PluginNodeStats.class);
        result.put(name, stats);
    } catch (IOException | IllegalArgumentException e) {
        // Receiver doesn't have the plugin's NamedWriteable registered (typical
        // during rolling upgrades or in a non-uniform plugin install). Drop the
        // entry; the rest of NodeStats remains decodable.
    }
}
return result;
Possible Issue

In updateSpillMemoryLimit, the method checks isSpillLimitDynamic() and logs a warning if false, but still returns normally. The cluster-settings update is accepted unconditionally because 'the value is read at next node startup'. However, if an operator repeatedly updates the setting expecting immediate effect, they receive no actionable feedback beyond a log warning. On a busy cluster with log volume, this warning may be missed, leading to confusion about why the setting appears to update successfully but has no runtime effect.

void updateSpillMemoryLimit(long newLimitBytes) {
    DataFusionService service = dataFusionService;
    if (service == null) {
        logger.debug("DataFusion service not yet initialized; ignoring spill limit update to {}B", newLimitBytes);
        return;
    }
    if (!service.isSpillLimitDynamic()) {
        logger.warn(
            "Updated DataFusion spill memory limit to {}B at the cluster level; the loaded native library does not "
                + "support runtime spill resize, so the new value will only take effect after a node restart",
            newLimitBytes
        );
        return;
    }
    try {
        service.setSpillMemoryLimit(newLimitBytes);
        logger.info("Updated DataFusion spill memory limit to {}B", newLimitBytes);
    } catch (IllegalStateException e) {
        logger.warn("Ignoring spill memory limit update to {}B; service is not running", newLimitBytes);
    } catch (UnsupportedOperationException e) {
        // isSpillLimitDynamic() guard above should make this unreachable, but defend
        // against a race between probe and call.
        logger.warn("Ignoring spill memory limit update to {}B; native runtime does not support live updates", newLimitBytes);
    }
}
Possible Issue

In setPoolMin, the method computes 'target = Math.min(newMin, max)' and raises the live limit to target if target > current. However, if newMin itself exceeds max (a configuration error caught by validateMinMax elsewhere), this silently caps the live limit at max rather than the requested newMin. The poolMins map is updated to newMin, but the live allocator limit is set to max, creating a discrepancy between the recorded min and the actual enforced limit. Subsequent rebalance ticks will see poolMins[pool] = newMin but the live limit capped at max, potentially causing the rebalancer to thrash or fail to honor the intended guarantee.

public void setPoolMin(String poolName, long newMin) {
    ArrowPoolHandle handle = pools.get(poolName);
    if (handle == null) {
        throw new IllegalStateException("Pool '" + poolName + "' does not exist");
    }
    poolMins.put(poolName, newMin);
    long max = poolMaxes.getOrDefault(poolName, Long.MAX_VALUE);
    long current = handle.allocator.getLimit();
    long target = Math.min(newMin, max);
    if (target > current) {
        handle.allocator.setLimit(target);
    }
}

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 19, 2026

PR Code Suggestions ✨

Latest suggestions up to d4df93c

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Validate sum of pool mins for overflow

When rootLimit is Long.MAX_VALUE, the validator returns early without checking for
overflow in the sum of mins. If the sum of mins overflows but rootLimit is
unbounded, the overflow goes undetected. Consider validating the sum even when
rootLimit is MAX_VALUE to catch configuration errors early.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java [374-386]

 static void validateMinSum(long rootLimit, long... mins) {
-    if (rootLimit == Long.MAX_VALUE) {
-        return;
-    }
     long sum = 0;
     for (long min : mins) {
         try {
             sum = Math.addExact(sum, min);
         } catch (ArithmeticException overflow) {
             throw new IllegalArgumentException("Sum of pool minimums overflows.", overflow);
         }
     }
-    if (sum > rootLimit) {
-        ...
+    if (rootLimit != Long.MAX_VALUE && sum > rootLimit) {
+        throw new IllegalArgumentException(
+            "Sum of pool minimums (" + sum + ") exceeds root limit (" + rootLimit + ")"
+        );
     }
 }
Suggestion importance[1-10]: 6

__

Why: Valid observation that overflow checking is skipped when rootLimit is Long.MAX_VALUE. The suggested change would catch configuration errors where operators set pool mins that overflow even when the root is unbounded. This improves robustness by detecting invalid configurations earlier, though it's a relatively minor edge case.

Low
Consolidate redundant exception handling branches

The method catches IllegalStateException and UnsupportedOperationException
separately but logs nearly identical messages. The isSpillLimitDynamic() guard above
should make the UnsupportedOperationException branch unreachable. Consider removing
the redundant catch block or consolidating the error handling to reduce code
duplication.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [350-375]

 void updateSpillMemoryLimit(long newLimitBytes) {
     DataFusionService service = dataFusionService;
     if (service == null) {
         logger.debug("DataFusion service not yet initialized; ignoring spill limit update to {}B", newLimitBytes);
         return;
     }
     if (!service.isSpillLimitDynamic()) {
         logger.warn(
             "Updated DataFusion spill memory limit to {}B at the cluster level; the loaded native library does not "
                 + "support runtime spill resize, so the new value will only take effect after a node restart",
             newLimitBytes
         );
         return;
     }
     try {
         service.setSpillMemoryLimit(newLimitBytes);
         logger.info("Updated DataFusion spill memory limit to {}B", newLimitBytes);
-    } catch (IllegalStateException e) {
-        logger.warn("Ignoring spill memory limit update to {}B; service is not running", newLimitBytes);
-    } catch (UnsupportedOperationException e) {
-        logger.warn("Ignoring spill memory limit update to {}B; native runtime does not support live updates", newLimitBytes);
+    } catch (IllegalStateException | UnsupportedOperationException e) {
+        logger.warn("Ignoring spill memory limit update to {}B; {}", newLimitBytes, e.getMessage());
     }
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that the UnsupportedOperationException catch is defensive (the isSpillLimitDynamic() guard should prevent it) and that consolidating the catch blocks would reduce duplication. However, the defensive catch may be intentional to guard against race conditions, and the improved code loses the specific error messages for each exception type.

Low
Document intentional truncation in headroom distribution

The division headroom / poolCount can lose precision when headroom is not evenly
divisible by poolCount. Consider using a more precise distribution strategy or
documenting the intentional truncation behavior to avoid subtle allocation
imbalances across pools.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [247]

+// Integer division intentionally truncates remainder; pools compete for headroom on next tick.
 long bonusPerPool = poolCount > 0 ? headroom / poolCount : 0;
Suggestion importance[1-10]: 3

__

Why: The suggestion correctly identifies that integer division truncates the remainder, but this is standard behavior and the code is clear. Adding a comment would be marginally helpful for maintainability, but the impact is minimal since the behavior is expected and documented in the method's Javadoc.

Low

Previous suggestions

Suggestions up to commit 3ab722f
CategorySuggestion                                                                                                                                    Impact
General
Prevent arithmetic overflow in rebalance calculation

The addition min + bonusPerPool can overflow when both values are large. Use
Math.addExact or a checked addition to detect overflow and handle it gracefully,
preventing silent wraparound to negative values.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [232-239]

-long effectiveLimit = min + bonusPerPool;
+long effectiveLimit;
+try {
+    effectiveLimit = Math.addExact(min, bonusPerPool);
+} catch (ArithmeticException overflow) {
+    effectiveLimit = Long.MAX_VALUE;
+}
 
 // Cap at pool's max
 effectiveLimit = Math.min(effectiveLimit, max);
 
 // Never drop below current allocation or min
 effectiveLimit = Math.max(effectiveLimit, currentAllocation);
 effectiveLimit = Math.max(effectiveLimit, min);
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential overflow in min + bonusPerPool. However, the suggested fix of clamping to Long.MAX_VALUE on overflow may not be the desired behavior in all cases. The issue is valid and should be addressed, but the specific handling strategy should be carefully considered.

Medium
Prevent overflow in percentage calculation

The multiplication nativeLimit.getBytes() * 75 can overflow when the limit is very
large (e.g., close to Long.MAX_VALUE / 75). Use checked arithmetic or reorder the
operation to divide first, then multiply to prevent overflow.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [106-116]

 static String deriveMemoryPoolLimitDefault(Settings settings) {
     ByteSizeValue nativeLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
     if (nativeLimit.getBytes() <= 0) {
         return Long.toString(Long.MAX_VALUE);
     }
-    long pool = Math.max(0L, nativeLimit.getBytes() * 75 / 100);
+    long bytes = nativeLimit.getBytes();
+    long pool = (bytes / 100) * 75 + (bytes % 100) * 75 / 100;
     return Long.toString(pool);
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential overflow when nativeLimit.getBytes() * 75 is computed for very large values. The proposed solution of dividing first is mathematically sound and prevents overflow, though it introduces slightly more complex arithmetic with the modulo operation to maintain precision.

Medium
Include overflow values in error message

The overflow check is correct, but the error message should include the actual
values that caused the overflow to aid debugging. Consider adding the current sum
and the min value being added to the exception message.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [377-381]

 try {
     sum = Math.addExact(sum, min);
 } catch (ArithmeticException overflow) {
-    throw new IllegalArgumentException("Sum of pool minimums overflows.", overflow);
+    throw new IllegalArgumentException(
+        "Sum of pool minimums overflows. Current sum: " + sum + ", attempting to add: " + min,
+        overflow
+    );
 }
Suggestion importance[1-10]: 5

__

Why: Adding the actual values that caused the overflow would improve debugging, but the current error message is already functional. This is a minor enhancement to error reporting rather than a critical fix.

Low
Suggestions up to commit f4cbdd2
CategorySuggestion                                                                                                                                    Impact
General
Handle null registry before loop

When registry == null, the method continues the loop but still reads name and
payload from the stream. If the registry is null for the entire map, all entries are
skipped but the stream position advances correctly. However, the early continue
inside the loop means the result map remains empty even when size > 0. Consider
breaking out of the loop entirely when registry is null to avoid unnecessary stream
reads.

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java [279-309]

 private static Map<String, PluginNodeStats> readPluginStats(StreamInput in) throws IOException {
     int size = in.readVInt();
     if (size == 0) {
         return Collections.emptyMap();
     }
     NamedWriteableRegistry registry = in.namedWriteableRegistry();
+    if (registry == null) {
+        for (int i = 0; i < size; i++) {
+            in.readString();
+            in.readBytesReference();
+        }
+        return Collections.emptyMap();
+    }
     Map<String, PluginNodeStats> result = new HashMap<>(size);
     for (int i = 0; i < size; i++) {
         String name = in.readString();
         BytesReference payload = in.readBytesReference();
-        if (registry == null) {
-            continue;
-        }
         try (
             StreamInput rawIn = payload.streamInput();
             NamedWriteableAwareStreamInput payloadIn = new NamedWriteableAwareStreamInput(rawIn, registry)
         ) {
             payloadIn.setVersion(in.getVersion());
             PluginNodeStats stats = payloadIn.readNamedWriteable(PluginNodeStats.class);
             result.put(name, stats);
         } catch (IOException | IllegalArgumentException e) {
             ...
         }
     }
     return result;
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that when registry == null, the current code continues reading entries but discards them all. The improved version short-circuits by consuming the stream entries upfront and returning early, which is more efficient and clearer. This is a good optimization that avoids unnecessary object allocations and try-catch blocks.

Low
Validate min before updating poolMins

The method updates poolMins before validating that newMin is within bounds (e.g.,
newMin <= max). If newMin > max, the recorded min will be inconsistent with the
actual limit set. Consider validating newMin <= max before updating poolMins to
maintain invariant consistency.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [160-172]

 public void setPoolMin(String poolName, long newMin) {
     ArrowPoolHandle handle = pools.get(poolName);
     if (handle == null) {
         throw new IllegalStateException("Pool '" + poolName + "' does not exist");
     }
+    long max = poolMaxes.getOrDefault(poolName, Long.MAX_VALUE);
+    if (newMin > max) {
+        throw new IllegalArgumentException("Pool '" + poolName + "' min (" + newMin + ") exceeds max (" + max + ")");
+    }
     poolMins.put(poolName, newMin);
-    long max = poolMaxes.getOrDefault(poolName, Long.MAX_VALUE);
     long current = handle.allocator.getLimit();
     long target = Math.min(newMin, max);
     if (target > current) {
         handle.allocator.setLimit(target);
     }
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that poolMins is updated before validation, which could lead to inconsistent state if newMin > max. However, the current code uses Math.min(newMin, max) to cap the target, so the actual limit set is always consistent. Adding explicit validation would improve clarity and fail-fast behavior, making it a moderate improvement.

Low
Add negative sum check after addition

The overflow check using Math.addExact is correct, but the final comparison sum >
rootLimit can still overflow when sum wraps around to a negative value before the
exception is thrown. Consider adding an explicit check for negative sum after each
addition to catch overflow earlier and provide clearer diagnostics.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java [376-381]

 private static void validateMinSum(long rootLimit, long... mins) {
     if (rootLimit == Long.MAX_VALUE) {
         return;
     }
     long sum = 0;
     for (long min : mins) {
         try {
             sum = Math.addExact(sum, min);
         } catch (ArithmeticException overflow) {
             throw new IllegalArgumentException("Sum of pool minimums overflows.", overflow);
         }
+        if (sum < 0) {
+            throw new IllegalArgumentException("Sum of pool minimums overflowed to negative value.");
+        }
     }
     if (sum > rootLimit) {
         throw new IllegalArgumentException(
             "Sum of pool minimums (" + sum + ") exceeds root limit (" + rootLimit + ")"
         );
     }
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion is technically redundant because Math.addExact already throws ArithmeticException on overflow before the sum can wrap to negative. The additional check for sum < 0 would never be reached in overflow scenarios, making it unnecessary defensive code that adds no real value.

Low
Suggestions up to commit 7abcfce
CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent silent overflow in percentage calculation

Use Math.multiplyExact to detect overflow when computing the default root limit. If
nativeLimit.getBytes() is close to Long.MAX_VALUE, the multiplication
nativeLimit.getBytes() * 20 can overflow silently, producing a negative or incorrect
result. Wrap the multiplication in a try-catch to fall back to Long.MAX_VALUE on
overflow.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java [89-95]

 static String deriveRootLimitDefault(Settings settings) {
     ByteSizeValue nativeLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
     if (nativeLimit.getBytes() <= 0) {
         return Long.toString(Long.MAX_VALUE);
     }
-    return Long.toString(nativeLimit.getBytes() * 20 / 100);
+    try {
+        long result = Math.multiplyExact(nativeLimit.getBytes(), 20) / 100;
+        return Long.toString(result);
+    } catch (ArithmeticException overflow) {
+        return Long.toString(Long.MAX_VALUE);
+    }
 }
Suggestion importance[1-10]: 7

__

Why: The multiplication nativeLimit.getBytes() * 20 can overflow when nativeLimit is close to Long.MAX_VALUE, producing incorrect results. Using Math.multiplyExact with overflow handling ensures correctness and prevents silent failures.

Medium
Prevent silent overflow in pool calculation

Use Math.multiplyExact to detect overflow when computing pool max defaults. The
multiplication nativeLimit.getBytes() * percent can overflow silently if the native
limit is large, producing incorrect pool caps. Wrap in a try-catch to fall back to
Long.MAX_VALUE on overflow.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java [218-225]

 static String derivePoolMaxDefault(Settings settings, int percent) {
     ByteSizeValue nativeLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
     if (nativeLimit.getBytes() <= 0) {
         return Long.toString(Long.MAX_VALUE);
     }
-    long pool = Math.max(0L, nativeLimit.getBytes() * percent / 100);
-    return Long.toString(pool);
+    try {
+        long pool = Math.multiplyExact(nativeLimit.getBytes(), percent) / 100;
+        return Long.toString(Math.max(0L, pool));
+    } catch (ArithmeticException overflow) {
+        return Long.toString(Long.MAX_VALUE);
+    }
 }
Suggestion importance[1-10]: 7

__

Why: The multiplication nativeLimit.getBytes() * percent can overflow silently for large native limits, producing incorrect pool caps. Using Math.multiplyExact with overflow handling prevents this issue.

Medium
Prevent silent overflow in memory calculation

Use Math.multiplyExact to detect overflow when computing the DataFusion memory pool
limit. The multiplication nativeLimit.getBytes() * 75 can overflow silently for
large native limits, producing incorrect pool sizes. Wrap in a try-catch to fall
back to Long.MAX_VALUE on overflow.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [106-116]

 static String deriveMemoryPoolLimitDefault(Settings settings) {
     ByteSizeValue nativeLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
     if (nativeLimit.getBytes() <= 0) {
         return Long.toString(Long.MAX_VALUE);
     }
-    long pool = Math.max(0L, nativeLimit.getBytes() * 75 / 100);
-    return Long.toString(pool);
+    try {
+        long pool = Math.multiplyExact(nativeLimit.getBytes(), 75) / 100;
+        return Long.toString(Math.max(0L, pool));
+    } catch (ArithmeticException overflow) {
+        return Long.toString(Long.MAX_VALUE);
+    }
 }
Suggestion importance[1-10]: 7

__

Why: The multiplication nativeLimit.getBytes() * 75 can overflow for large native limits, producing incorrect pool sizes. Using Math.multiplyExact with overflow handling ensures correctness.

Medium
Prevent silent overflow in limit calculation

Use Math.multiplyExact to detect overflow when computing the native memory limit
default. The multiplication offHeap * 79 can overflow silently if offHeap is large,
producing incorrect memory limits. Wrap in a try-catch to fall back to "0b" on
overflow.

server/src/main/java/org/opensearch/node/resource/tracker/ResourceTrackerSettings.java [131-143]

 static String deriveNativeMemoryLimitDefault() {
     long ram = OsProbe.getInstance().getTotalPhysicalMemorySize();
     long heap = Runtime.getRuntime().maxMemory();
     if (ram <= 0 || heap <= 0 || heap >= ram) {
         return "0b";
     }
     long offHeap = ram - heap;
-    return Long.toString(offHeap * 79 / 100) + "b";
+    try {
+        long limit = Math.multiplyExact(offHeap, 79) / 100;
+        return Long.toString(limit) + "b";
+    } catch (ArithmeticException overflow) {
+        return "0b";
+    }
 }
Suggestion importance[1-10]: 7

__

Why: The multiplication offHeap * 79 can overflow silently if offHeap is large, producing incorrect memory limits. Using Math.multiplyExact with overflow handling prevents this issue and falls back to a safe default.

Medium
Suggestions up to commit c428701
CategorySuggestion                                                                                                                                    Impact
Possible issue
Validate divisor is positive before division

When divisor is retrieved from the supplier, there is no validation that it is
positive. If the supplier returns zero or negative (due to a race or
misconfiguration), the division will throw ArithmeticException or produce incorrect
results. Add a guard to ensure divisor > 0.

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/memory/ArrowBufferPool.java [76-80]

 public BufferAllocator createChildAllocator(String name) {
     long limit = poolAllocator.getLimit();
     int divisor = divisorSupplier.getAsInt();
+    if (divisor <= 0) {
+        throw new IllegalStateException(
+            "Invalid divisor from supplier: " + divisor + "; must be > 0"
+        );
+    }
     long maxChildAllocation = limit == Long.MAX_VALUE ? Long.MAX_VALUE : limit / divisor;
     return poolAllocator.newChildAllocator(name, 0, maxChildAllocation);
 }
Suggestion importance[1-10]: 8

__

Why: This is a valid defensive programming suggestion. While the MAX_PER_VSR_ALLOCATION_DIVISOR setting has validation (min=1), the divisorSupplier is a generic IntSupplier that could theoretically return invalid values due to race conditions or implementation bugs. Adding a runtime guard prevents potential ArithmeticException or incorrect behavior.

Medium
General
Include overflow values in error message

The overflow check uses Math.addExact but the error message does not include the
actual values that caused the overflow. Include the current sum and the min being
added to aid debugging when operators hit this validation error.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java [369-373]

 try {
     sum = Math.addExact(sum, min);
 } catch (ArithmeticException overflow) {
-    throw new IllegalArgumentException("Sum of pool minimums overflows.", overflow);
+    throw new IllegalArgumentException(
+        "Sum of pool minimums overflows: current sum=" + sum + ", adding min=" + min,
+        overflow
+    );
 }
Suggestion importance[1-10]: 7

__

Why: Adding the actual values that caused the overflow to the error message would improve debuggability. The suggestion correctly identifies that the current error message lacks context about which values triggered the overflow, making it harder for operators to diagnose configuration issues.

Medium
Log skipped plugin stats entries

The catch block silently drops unknown plugin stats entries without logging. During
rolling upgrades or plugin mismatches, operators have no visibility into which stats
were skipped. Add a debug-level log statement with the plugin name and exception
message.

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java [295-307]

 try (
     StreamInput rawIn = payload.streamInput();
     NamedWriteableAwareStreamInput payloadIn = new NamedWriteableAwareStreamInput(rawIn, registry)
 ) {
     payloadIn.setVersion(in.getVersion());
     PluginNodeStats stats = payloadIn.readNamedWriteable(PluginNodeStats.class);
     result.put(name, stats);
 } catch (IOException | IllegalArgumentException e) {
-    // Receiver doesn't have the plugin's NamedWriteable registered (typical
-    // during rolling upgrades or in a non-uniform plugin install). Drop the
-    // entry; the rest of NodeStats remains decodable.
+    logger.debug("Skipping unknown plugin stats entry '{}': {}", name, e.getMessage());
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion to add debug logging for skipped plugin stats entries is valuable for operational visibility during rolling upgrades or plugin mismatches. The current silent drop makes it difficult to diagnose why certain stats are missing. However, a logger instance would need to be added to the class first.

Medium
Distribute headroom only to eligible pools

The rebalance logic distributes headroom equally across all pools, but does not
account for pools that have already reached their max. This can lead to wasted
headroom when some pools are capped. Consider distributing only to pools that can
accept more capacity (current limit < max).

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [240]

-long bonusPerPool = poolCount > 0 ? headroom / poolCount : 0;
+List<String> eligiblePools = new ArrayList<>();
+for (Map.Entry<String, ArrowPoolHandle> entry : pools.entrySet()) {
+    long max = poolMaxes.getOrDefault(entry.getKey(), Long.MAX_VALUE);
+    if (entry.getValue().allocator.getLimit() < max) {
+        eligiblePools.add(entry.getKey());
+    }
+}
+long bonusPerPool = eligiblePools.size() > 0 ? headroom / eligiblePools.size() : 0;
Suggestion importance[1-10]: 6

__

Why: The suggestion to distribute headroom only to pools that haven't reached their max is a valid optimization. However, the current implementation is intentional per the PR's design (distributing to all pools avoids the dead-pool corner case where a pool with min=0 starts at limit=0). The suggestion would reintroduce that issue, so while technically correct as an alternative approach, it contradicts the PR's stated goals.

Low
Suggestions up to commit 40688ad
CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent arithmetic overflow in calculation

The arithmetic acLimit.getBytes() * bufferPercent / 100L can overflow when acLimit
is near Long.MAX_VALUE and bufferPercent is non-zero. Use Math.multiplyExact or
restructure the calculation to avoid overflow, ensuring the derived default remains
valid.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java [84-92]

 static String deriveRootLimitDefault(Settings settings) {
     ByteSizeValue acLimit = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings);
     if (acLimit.getBytes() <= 0) {
         return Long.toString(Long.MAX_VALUE);
     }
     int bufferPercent = ResourceTrackerSettings.NODE_NATIVE_MEMORY_BUFFER_PERCENT_SETTING.get(settings);
-    long usable = acLimit.getBytes() - (acLimit.getBytes() * bufferPercent / 100L);
+    long bytes = acLimit.getBytes();
+    long buffer = bytes / 100L * bufferPercent;
+    long usable = bytes - buffer;
     return Long.toString(Math.max(0L, usable));
 }
Suggestion importance[1-10]: 7

__

Why: The calculation acLimit.getBytes() * bufferPercent / 100L can overflow when acLimit is near Long.MAX_VALUE. The suggested restructuring (bytes / 100L * bufferPercent) reduces overflow risk by dividing first, though it may introduce minor precision loss. This is a valid concern for edge cases.

Medium
General
Handle null registry more efficiently

When registry == null, the loop continues reading name and payload but discards
them. If the outer loop runs size times with a null registry, the method reads and
discards all entries but returns an empty map. Consider breaking early or logging a
warning so operators know entries were dropped.

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java [279-309]

 private static Map<String, PluginNodeStats> readPluginStats(StreamInput in) throws IOException {
     int size = in.readVInt();
     if (size == 0) {
         return Collections.emptyMap();
     }
     NamedWriteableRegistry registry = in.namedWriteableRegistry();
+    if (registry == null) {
+        for (int i = 0; i < size; i++) {
+            in.readString();
+            in.readBytesReference();
+        }
+        return Collections.emptyMap();
+    }
     Map<String, PluginNodeStats> result = new HashMap<>(size);
     for (int i = 0; i < size; i++) {
         String name = in.readString();
         BytesReference payload = in.readBytesReference();
-        if (registry == null) {
-            continue;
-        }
         try (
             StreamInput rawIn = payload.streamInput();
             NamedWriteableAwareStreamInput payloadIn = new NamedWriteableAwareStreamInput(rawIn, registry)
         ) {
             payloadIn.setVersion(in.getVersion());
             PluginNodeStats stats = payloadIn.readNamedWriteable(PluginNodeStats.class);
             result.put(name, stats);
         } catch (IOException | IllegalArgumentException e) {
             // ...
         }
     }
     return result;
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion to break early when registry == null is a minor optimization that avoids unnecessary map allocation. However, the null-registry case is defensive (production paths always set a registry), so the impact is minimal. The improved code is cleaner but not critical.

Low
Document threading constraints for listeners

Listener invocation is synchronous on the caller thread. If a listener blocks or
takes locks held by the caller, it can deadlock the resize path. Document this
constraint in the method javadoc and consider adding a timeout or async dispatch for
safety.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [166-174]

+/**
+ * Notifies all registered listeners of a pool limit change. Each listener
+ * is invoked synchronously on the caller thread; exceptions thrown by one
+ * listener are logged and isolated so they do not block the others or the
+ * caller. Listeners must not block or take locks held by the caller.
+ */
 private void fireListeners(String poolName, long newLimit) {
     for (NativeAllocatorListener listener : listeners) {
         try {
             listener.onPoolLimitChanged(poolName, newLimit);
         } catch (Exception e) {
             logger.warn("NativeAllocatorListener threw on pool [{}] limit update", poolName, e);
         }
     }
 }
Suggestion importance[1-10]: 3

__

Why: The threading constraint is already documented in NativeAllocatorListener interface javadoc (lines 22-25 of NativeAllocatorListener.java). Adding redundant documentation to the private fireListeners method provides minimal value, though it doesn't hurt.

Low

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 9188043: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@gaurav-amz gaurav-amz force-pushed the unified-allocator-followup branch from 9188043 to d419b2a Compare May 19, 2026 09:10
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 19, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit d646dd4.

PathLineSeverityDescription
plugins/arrow-flight-rpc/build.gradle25highNew dependency added: `compileOnly project(':libs:opensearch-arrow-spi')`. Per mandatory flagging rule, all dependency additions must be flagged regardless of apparent legitimacy. This is an internal monorepo project reference rather than an external registry artifact.
sandbox/plugins/analytics-backend-datafusion/build.gradle51highMultiple new dependencies added: `compileOnly project(':libs:opensearch-arrow-spi')`, `compileOnly project(':plugins:arrow-base')`, and `testImplementation project(':plugins:arrow-base')`. Per mandatory flagging rule, all dependency additions must be flagged. These are internal monorepo project references.
sandbox/plugins/analytics-engine/build.gradle66highNew dependency added (`compileOnly project(':libs:opensearch-arrow-spi')`) and dependency scope changed from `testRuntimeOnly` to `testImplementation` for `project(':plugins:arrow-base')`. Per mandatory flagging rule, all dependency changes must be flagged.
plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java86low`ensureForTesting()` is a `public static` method in production code that forcibly closes any existing static `INSTANCE` singleton and replaces it unconditionally. Documented as test-only but accessible to any caller at runtime, including plugin code; misuse could disrupt the node-wide allocator lifecycle outside of tests.

The table above displays the top 10 most important findings.

Total: 4 | Critical: 0 | High: 3 | Medium: 0 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@gaurav-amz gaurav-amz force-pushed the unified-allocator-followup branch from d419b2a to 10f3617 Compare May 19, 2026 10:00
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 498b502

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 498b502: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Comment on lines +30 to +39
public interface NativeAllocatorListener {

/**
* Invoked after a pool's limit has been updated.
*
* @param poolName logical pool name
* @param newLimit the new limit in bytes
*/
void onPoolLimitChanged(String poolName, long newLimit);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not use ClusterSetting's update consumer instead.

Copy link
Copy Markdown
Contributor Author

@gaurav-amz gaurav-amz May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the FlightTransport / AnalyticsSearchService / DefaultPlanExecutor migration, the SPI had zero production callers.

Copy link
Copy Markdown
Contributor

@Bukhtawar Bukhtawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need to reject the request, throw OpenSearchRejectedException rather than oom-ing. Also lets try to see how we can wire up circuit-breaker for this. Maybe circuit-breaker stats is something we leverage for tracking memory used

// and capped by the framework alongside ingest, query, and datafusion. Hard-fail if
// the framework plugin is missing — silently falling back to a separate root would
// break the same-root invariant for cross-plugin Arrow handoff.
ArrowNativeAllocator nativeAllocator = ArrowNativeAllocator.instance();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change the way of providing ArrowNativeAllocator.

See ArrowAllocatorService as the example.

Initialize:

// ArrowBasePlugin component: node-level Arrow allocator service for cross-plugin zero-copy
this.allocatorService = new DefaultArrowAllocatorService(allocator);

And return in createComponents, so will be part of GUICE

How plugin get it:
From plugin's createComponents

this.allocatorService = pluginComponentRegistry.getComponent(ArrowAllocatorService.class)
.orElseThrow(() -> new IllegalStateException("ArrowAllocatorService not available; arrow-base plugin must be installed"));

Or through GUICE

And we can remove the ArrowAllocatorService using this PR, as it's for providing the root allocator to other extension plugin. And ArrowNativeAllocator should be the one doing that now, through this getPoolAllocator.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for point this out, will do this change.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@bowenlan-amzn
Copy link
Copy Markdown
Member

This PR seems to be the good place to get some alignment on the memory assignment across different components in the system.
Let's take 64G RAM as example.

64 GB RAM
├── JVM Heap (16 GB / 25%)
│   ├── Indexing pressure, Writer buffer
│   ├── Transport Netty
│   └── etc. ...
|
└── Off-Heap (48 GB / 75%)
    │
    ├── node.native_memory.limit (40 G)
    │   │
    │   ├── Java Arrow Root (12G / 30%)
    │   │   ├── POOL_FLIGHT  (2G / 5%)
    │   │   │   └── flight/server, flight/client — batch ser/deser
    │   │   ├── POOL_INGEST  (4G / 10%)
    │   │   │   └── ArrowBufferPool children (per-VSR, pool_limit/divisor)
    │   │   └── POOL_QUERY   (2G / 5%)
    │   │       └── analytics-search-service → per-query children (256MB each)
    │   │
    │   └── Rust Memory Pool (28G / 70%)
    │       └── DataFusion Rust pool (datafusion.memory_pool_limit_bytes)
    │           ├── hash tables, sort buffers, aggs, intermediate batches
    │           └── spill staging (datafusion.spill_memory_limit_bytes)
    │
    └── Rest (no knobs, at least 8G)
        ├── Lucene mmap (~2 GB)
        └── Page Cache (whatever's left)

I think the Java Arrow memory are mostly used for intermediate data transfer, while Rust memory are used for query execution. That's one reason to provide more to Rust side.

@Bukhtawar @alchemist51 @mch2 @expani

@alchemist51 alchemist51 added the skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis. label May 19, 2026
@alchemist51 alchemist51 reopened this May 19, 2026
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit d646dd4

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for d646dd4: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@gaurav-amz gaurav-amz force-pushed the unified-allocator-followup branch from d646dd4 to 53c0e36 Compare May 19, 2026 20:10
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 53c0e36

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit e1f3c5b

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for e1f3c5b: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@gaurav-amz gaurav-amz force-pushed the unified-allocator-followup branch from e1f3c5b to 041dc0e Compare May 19, 2026 20:26
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 041dc0e

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 041dc0e: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Copy Markdown
Member

@bowenlan-amzn bowenlan-amzn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overral looks good! Thanks for refactoring the allocator related changes in arrow-base.
Left some comments that should be easy to accommodate. Approving now.

*
* <p>Default: empty.
*/
public List<PluginNodeStats> nodeStats() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope we can have a ExperimentalApi annotation here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

IndexSettings indexSettings,
ThreadPool threadPool
ThreadPool threadPool,
org.opensearch.arrow.allocator.ArrowNativeAllocator nativeAllocator
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use import. And all the other places

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +157 to +158
BufferAllocator flightPool = nativeAllocator.getPoolAllocator(NativeAllocatorPoolConfig.POOL_FLIGHT);
flightAllocator = flightPool.newChildAllocator("flight", 0, flightPool.getLimit());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems not necessary to create flightAllocator here, the flightPool should be the flightAllocator.

For the 2 child allocator, server and client, can we set the limit to be Long.MAX_VALUE. I think they will still be bound by their parent which is the POOL_FLIGHT here.

This way seems we also don't need the listener setup here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. The intermediate flightAllocator is gone, flightPool is the parent directly, and serverAllocator / clientAllocator are children at Long.MAX_VALUE.

Comment on lines +98 to +99
BufferAllocator queryPool = nativeAllocator.getPoolAllocator(NativeAllocatorPoolConfig.POOL_QUERY);
this.allocator = queryPool.newChildAllocator("analytics-search-service", 0, queryPool.getLimit());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same shape applied.

Comment on lines +118 to +119
BufferAllocator queryPool = nativeAllocator.getPoolAllocator(NativeAllocatorPoolConfig.POOL_QUERY);
this.coordinatorAllocator = queryPool.newChildAllocator("coordinator", 0, queryPool.getLimit());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 40688ad

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 40688ad: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c428701

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for c428701: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

gaurav-amz added a commit to gaurav-amz/OpenSearch that referenced this pull request May 20, 2026
… plugin nodeStats — review-feedback edition

Builds on opensearch-project#21703 to add the query, datafusion with allocator, dynamic-tuning, stats,
and pool-wiring pieces. This commit is the consolidated version after addressing
review feedback on PR opensearch-project#21732.

## Architecture: three native-memory trackers, three knobs

Each tracker owns the bytes it can actually see, with a process-level cap above all
of them. On a 64 GB / 16 GB-heap node the defaults compose to:

  RAM                                            64 GB
  JVM heap (operator-configured -Xmx)            16 GB
  Off-heap (RAM - heap)                          48 GB
  node.native_memory.limit  (79% of off-heap)    37.92 GB     ← AC throttle threshold
    native.allocator.root.limit (20% of NM)       7.58 GB     ← Arrow framework cap
      pool.flight.max  (5% of NM)                 1.90 GB     ← Flight transport pool
      pool.ingest.max  (8% of NM)                 3.03 GB     ← Parquet VSR pool
      pool.query.max   (5% of NM)                 1.90 GB     ← analytics-engine pool
    datafusion.memory_pool_limit (75% of NM)     28.44 GB     ← Rust runtime pool (sibling to Arrow)
  Unmanaged                                      10.08 GB     ← Lucene mmap, OS page cache, etc.

  Independent (disk budget):
    datafusion.spill_memory_limit_bytes (50% of physical RAM)  32 GB

## What this commit does

### 1. Dynamic settings + cluster-update consumers

All native-allocator settings are Setting.Property.Dynamic. Cluster-settings update
consumers in ArrowBasePlugin#registerSettingsUpdateConsumers wire PUTs to live
ArrowNativeAllocator state. The grouped validator (validateUpdate) rejects
cross-setting violations (sum of pool mins > root, per-pool min > max) at PUT time
with HTTP 400.

### 2. Query and DataFusion pools

POOL_QUERY added for analytics-engine per-query allocators (children of
nativeAllocator.getPoolAllocator(POOL_QUERY)). DataFusion gets its own setting
(datafusion.memory_pool_limit_bytes) since DataFusion's working memory lives
entirely on the Rust side and is reported only through DataFusion's MemoryPool
— a Java-side pool that pretended to track it would either need a per-allocation
FFM round-trip (performance disaster) or be a config-only mirror (HTTP 200 with
no observable effect).

POOL_DATAFUSION is intentionally absent; the Rust-side
datafusion.memory_pool_limit_bytes is the honest knob for that layer.

### 3. Plugin#nodeStats SPI

New SPI method Plugin#nodeStats() returning List<PluginNodeStats>. NodeStats
serializes plugin-contributed payloads via NamedWriteable, surfacing them under
_nodes/stats. Annotated @experimentalapi to match PluginNodeStats.

### 4. Pool wiring through Guice (no static singleton)

ArrowNativeAllocator is constructed in ArrowBasePlugin#createComponents and
returned to Node.java which auto-binds it via Guice. Consumers
(AnalyticsSearchService, FlightTransport, DefaultPlanExecutor) receive it through
constructor @Inject — no static instance() / INSTANCE / ensureForTesting.

### 5. Drop NativeAllocatorListener SPI [@bowenlan-amzn #3269655712, #3269660170,
   @Bukhtawar #3267186318]

The original design used NativeAllocatorListener to push pool-limit changes into
consumer-side child allocators that captured the pool's limit at construction
time. With children created at Long.MAX_VALUE, Arrow's Accountant.allocate (lines
191-203 of Accountant.java in arrow-java v18.3.0) walks the parent chain on every
allocation and checks the parent's allocationLimit — so dynamic resizes of
parquet.native.pool.{flight,query}.max take effect immediately on subsequent
allocations through any descendant. The listener was emulating this Arrow-native
behavior and is no longer needed.

  - AnalyticsSearchService: drop poolListener; child uses Long.MAX_VALUE
  - DefaultPlanExecutor: drop poolListener; coordinator uses Long.MAX_VALUE
  - FlightTransport: drop intermediate flightAllocator + poolListener; pool is
    the parent, server/client are Long.MAX_VALUE children
  - libs/arrow-spi/.../NativeAllocatorListener.java: deleted
  - NativeAllocator interface: addListener / removeListener removed
  - ArrowNativeAllocator: listeners field, fireListeners method, and listener
    calls in setPoolLimit/setPoolMin/rebalance removed

The rebalancer continues to function as before: it changes pool limits via
setLimit, which children read via Arrow's parent-cap check at allocateBytes.
Rebalancer is off by default.

### 6. Fix Guice duplicate-binding regression on ArrowNativeAllocator

Removed the redundant ArrowBasePlugin#createGuiceModules override. Node.java
line 1748 already binds every component returned from createComponents:

    pluginComponents.stream()
        .forEach(p -> b.bind((Class) p.getClass()).toInstance(p));

ArrowBasePlugin#createComponents already returns the allocator, so the explicit
bind(ArrowNativeAllocator.class).toInstance(allocator) in createGuiceModules
registered the same binding a second time and caused Guice CreationException at
cluster startup. This was the root cause of the gradle-check Jenkins CI failures
on commits 9188043, 498b502, d646dd4, e1f3c5b, 041dc0e — every integration test
failed during MockNode construction.

### 7. Memory defaults aligned to the partitioning model

  - node.native_memory.limit defaults to 79% × (RAM - heap) via OsProbe (cgroup-
    aware: container memory limit on K8s/Docker, total physical RAM on bare
    metal). Falls back to ZERO if probe fails or heap >= RAM, preserving the
    pre-default opt-in semantics.

  - native.allocator.root.limit defaults to 20% × node.native_memory.limit.

  - parquet.native.pool.{flight,ingest,query}.max default to 5%/8%/5% of
    node.native_memory.limit (sum 18% < root's 20%, leaving 2 pp headroom inside
    the framework cap).

  - datafusion.memory_pool_limit_bytes defaults to 75% × node.native_memory.limit
    (called out by @bharath-techie #3271093086: prior default
    Runtime.maxMemory() / 4 was JVM-heap derived, wrong baseline for an off-heap
    runtime).

  - datafusion.spill_memory_limit_bytes defaults to 50% × physical RAM,
    independent of node.native_memory.limit (spill is a disk-staging budget,
    not a memory budget).

Behavior change to call out in upgrade notes: admission control is now active
by default. Operators wanting pre-existing opt-out behavior can set
node.native_memory.limit: 0b — all framework caps then fall back to
Long.MAX_VALUE unbounded mode.

### 8. PLUGIN_STATS metric is opt-in

Plugin-contributed nodeStats are gated behind a new NodesStatsRequest.Metric
(plugin_stats) so that operators not opting in don't pay the serialization cost
on every _nodes/stats poll. Default for cluster-stats is opt-out (false).

### 9. @experimentalapi on Plugin#nodeStats() [@bowenlan-amzn #3269563833]

Annotate the new SPI hook to match the @experimentalapi annotation already
present on PluginNodeStats.

### 10. FQN -> import nit in ParquetIndexingEngine [@bowenlan-amzn #3269589758]

Replace 3 fully-qualified org.opensearch.arrow.allocator.ArrowNativeAllocator
references in constructor parameters with a single import.

### 11. Integration tests for cap-enforcement boundaries

Added NativeAllocatorBoundaryIT under
plugins/arrow-flight-rpc/src/internalClusterTest. Three tests boot a single-node
cluster with tight memory settings (root=16 MiB, pool maxes=16 MiB) and exercise
real Arrow allocations via the Guice-injected ArrowNativeAllocator:

  - testPoolMaxRejectsAllocationsBeyondCap: PUT pool.query.max=4 MiB, verify a
    8 MiB request through the pool throws OutOfMemoryException.
  - testRootLimitRejectsAllocationsBeyondCap: hold 8 MiB in FLIGHT, verify a
    16 MiB QUERY request fails at the root level (8+16 > 16 root cap) even
    though QUERY's own max would individually allow it.
  - testDynamicPoolResizeAffectsInFlightAllocations: create child at
    Long.MAX_VALUE (the AnalyticsSearchService / DefaultPlanExecutor pattern
    post-listener-removal), verify a 2 MiB allocation succeeds, PUT
    query.max=1 MiB via cluster settings, verify a 2 MiB request now throws
    OOM via Arrow's parent-cap check at allocateBytes.

These cover the boundaries the framework is supposed to enforce. The third test
specifically verifies the listener-replacement contract: dynamic pool resize
takes effect on in-flight Long.MAX_VALUE children without any listener
machinery, exactly as Arrow's parent-cap check provides natively.

## How a query flows through these layers

Take a concrete example: a user issues a PPL query that goes through
analytics-engine and dispatches to DataFusion.

  1. Coordinator receives the request.
     → Admission control checks node.native_memory.limit budget.
     → If OK, proceeds; otherwise rejects with 429.

  2. AnalyticsSearchService creates a per-fragment Arrow allocator:
        allocator = getPoolAllocator(POOL_QUERY).newChildAllocator("frag-N", 0, Long.MAX_VALUE)
     → Future BufferAllocator.buffer(...) calls on this allocator increment
       POOL_QUERY's counter and the root counter via Arrow's parent-cap chain.
     → Bounded by parquet.native.pool.query.max.

  3. AnalyticsSearchService dispatches to DataFusion via NativeBridge.
     → NativeBridge.executeQueryAsync(...) marshals plan bytes via FFM.

  4. DataFusion runs the query in Rust:
     → HashAggregate builds a hash table.
     → reservation.try_grow(50MB) → DataFusion MemoryPool counter += 50MB.
     → Bounded by datafusion.memory_pool_limit_bytes.
     → If exceeded, HashAggregate spills to disk (bounded by
       datafusion.spill_memory_limit_bytes).

  5. DataFusion produces a result batch in Rust.
     → Java imports it via Arrow C Data Interface.
     → The import allocates Java ArrowBufs under the per-fragment allocator
       from step 2. POOL_QUERY counter += result_size.

  6. Result returns to coordinator.
     → Per-fragment allocator closes; POOL_QUERY counter decrements.
     → DataFusion query completes; MemoryPool counter decrements.

Each layer accounts for what it owns. No double-counting. Each operator-tunable
knob bounds a real, observable thing.

## Verification

  ./gradlew -Dsandbox.enabled=true \\
    :plugins:arrow-base:test :plugins:arrow-flight-rpc:test \\
    :sandbox:plugins:{analytics-engine,analytics-backend-datafusion,parquet-data-format}:test \\
    :server:test --tests "org.opensearch.node.resource.tracker.*" \\
    :plugins:arrow-flight-rpc:internalClusterTest --tests "*NativeAllocatorBoundaryIT*"
  → BUILD SUCCESSFUL

By submitting this pull request, I confirm that my contribution is made under
the terms of the Apache 2.0 license.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
@gaurav-amz gaurav-amz force-pushed the unified-allocator-followup branch from c428701 to e2e869d Compare May 20, 2026 10:10
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit e2e869d

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 7abcfce

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 7abcfce: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 20, 2026

Codecov Report

❌ Patch coverage is 78.75000% with 34 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.41%. Comparing base (ca22376) to head (d4df93c).

Files with missing lines Patch % Lines
...rg/opensearch/arrow/allocator/ArrowBasePlugin.java 86.84% 10 Missing ⚠️
...src/main/java/org/opensearch/node/NodeService.java 0.00% 7 Missing and 1 partial ⚠️
...rch/action/admin/cluster/node/stats/NodeStats.java 82.05% 5 Missing and 2 partials ⚠️
...ensearch/arrow/allocator/ArrowNativeAllocator.java 75.00% 1 Missing and 3 partials ⚠️
...rch/arrow/flight/transport/FlightStreamPlugin.java 0.00% 2 Missing ⚠️
...e/stream/TransportNativeArrowStreamDataAction.java 0.00% 2 Missing ⚠️
...node/resource/tracker/ResourceTrackerSettings.java 87.50% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21732      +/-   ##
============================================
- Coverage     73.44%   73.41%   -0.04%     
- Complexity    75084    75104      +20     
============================================
  Files          6016     6015       -1     
  Lines        341072   341170      +98     
  Branches      49091    49107      +16     
============================================
- Hits         250513   250461      -52     
- Misses        70641    70779     +138     
- Partials      19918    19930      +12     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

gaurav-amz added a commit to gaurav-amz/OpenSearch that referenced this pull request May 20, 2026
… plugin nodeStats — review-feedback edition

Builds on opensearch-project#21703 to add the query, datafusion with allocator, dynamic-tuning, stats,
and pool-wiring pieces. This commit is the consolidated version after addressing
review feedback on PR opensearch-project#21732.

## Architecture: three native-memory trackers, three knobs

Each tracker owns the bytes it can actually see, with a process-level cap above all
of them. On a 64 GB / 16 GB-heap node the defaults compose to:

  RAM                                            64 GB
  JVM heap (operator-configured -Xmx)            16 GB
  Off-heap (RAM - heap)                          48 GB
  node.native_memory.limit  (79% of off-heap)    37.92 GB     ← AC throttle threshold
    native.allocator.root.limit (20% of NM)       7.58 GB     ← Arrow framework cap
      pool.flight.max  (5% of NM)                 1.90 GB     ← Flight transport pool
      pool.ingest.max  (8% of NM)                 3.03 GB     ← Parquet VSR pool
      pool.query.max   (5% of NM)                 1.90 GB     ← analytics-engine pool
    datafusion.memory_pool_limit (75% of NM)     28.44 GB     ← Rust runtime pool (sibling to Arrow)
  Unmanaged                                      10.08 GB     ← Lucene mmap, OS page cache, etc.

  Independent (disk budget):
    datafusion.spill_memory_limit_bytes (50% of physical RAM)  32 GB

## What this commit does

### 1. Dynamic settings + cluster-update consumers

All native-allocator settings are Setting.Property.Dynamic. Cluster-settings update
consumers in ArrowBasePlugin#registerSettingsUpdateConsumers wire PUTs to live
ArrowNativeAllocator state. The grouped validator (validateUpdate) rejects
cross-setting violations (sum of pool mins > root, per-pool min > max) at PUT time
with HTTP 400.

### 2. Query and DataFusion pools

POOL_QUERY added for analytics-engine per-query allocators (children of
nativeAllocator.getPoolAllocator(POOL_QUERY)). DataFusion gets its own setting
(datafusion.memory_pool_limit_bytes) since DataFusion's working memory lives
entirely on the Rust side and is reported only through DataFusion's MemoryPool
— a Java-side pool that pretended to track it would either need a per-allocation
FFM round-trip (performance disaster) or be a config-only mirror (HTTP 200 with
no observable effect).

POOL_DATAFUSION is intentionally absent; the Rust-side
datafusion.memory_pool_limit_bytes is the honest knob for that layer.

### 3. Plugin#nodeStats SPI

New SPI method Plugin#nodeStats() returning List<PluginNodeStats>. NodeStats
serializes plugin-contributed payloads via NamedWriteable, surfacing them under
_nodes/stats. Annotated @experimentalapi to match PluginNodeStats.

### 4. Pool wiring through Guice (no static singleton)

ArrowNativeAllocator is constructed in ArrowBasePlugin#createComponents and
returned to Node.java which auto-binds it via Guice. Consumers
(AnalyticsSearchService, FlightTransport, DefaultPlanExecutor) receive it through
constructor @Inject — no static instance() / INSTANCE / ensureForTesting.

The createComponents body is extracted into a package-private buildAllocator
helper so unit tests can exercise the full pool/consumer wiring without the
heavyweight ClusterService fixture.

### 5. Drop NativeAllocatorListener SPI [@bowenlan-amzn #3269655712, #3269660170,
   @Bukhtawar #3267186318]

The original design used NativeAllocatorListener to push pool-limit changes into
consumer-side child allocators that captured the pool's limit at construction
time. With children created at Long.MAX_VALUE, Arrow's Accountant.allocate (lines
191-203 of Accountant.java in arrow-java v18.3.0) walks the parent chain on every
allocation and checks the parent's allocationLimit — so dynamic resizes of
parquet.native.pool.{flight,query}.max take effect immediately on subsequent
allocations through any descendant. The listener was emulating this Arrow-native
behavior and is no longer needed.

  - AnalyticsSearchService: drop poolListener; child uses Long.MAX_VALUE
  - DefaultPlanExecutor: drop poolListener; coordinator uses Long.MAX_VALUE
  - FlightTransport: drop intermediate flightAllocator + poolListener; pool is
    the parent, server/client are Long.MAX_VALUE children
  - libs/arrow-spi/.../NativeAllocatorListener.java: deleted
  - NativeAllocator interface: addListener / removeListener removed
  - ArrowNativeAllocator: listeners field, fireListeners method, and listener
    calls in setPoolLimit/setPoolMin/rebalance removed

The rebalancer continues to function as before: it changes pool limits via
setLimit, which children read via Arrow's parent-cap check at allocateBytes.
Rebalancer is off by default.

### 6. Fix Guice duplicate-binding regression on ArrowNativeAllocator

Removed the redundant ArrowBasePlugin#createGuiceModules override. Node.java
line 1748 already binds every component returned from createComponents:

    pluginComponents.stream()
        .forEach(p -> b.bind((Class) p.getClass()).toInstance(p));

ArrowBasePlugin#createComponents already returns the allocator, so the explicit
bind(ArrowNativeAllocator.class).toInstance(allocator) in createGuiceModules
registered the same binding a second time and caused Guice CreationException at
cluster startup. This was the root cause of the gradle-check Jenkins CI failures
on commits 9188043, 498b502, d646dd4, e1f3c5b, 041dc0e — every integration test
failed during MockNode construction.

### 7. @experimentalapi on Plugin#nodeStats() [@bowenlan-amzn #3269563833]

Annotate the new SPI hook to match the @experimentalapi annotation already
present on PluginNodeStats (the type the method returns).

### 8. FQN -> import nit in ParquetIndexingEngine [@bowenlan-amzn #3269589758]

Replace 3 fully-qualified
org.opensearch.arrow.allocator.ArrowNativeAllocator references in constructor
parameters with a single import.

### 9. Memory defaults aligned to the partitioning model

  - node.native_memory.limit defaults to 79% × (RAM - heap) via OsProbe (cgroup-
    aware: container memory limit on K8s/Docker, total physical RAM on bare
    metal). Falls back to ZERO if probe fails or heap >= RAM, preserving the
    pre-default opt-in semantics. The OsProbe-reading entry point delegates to
    a pure overload deriveNativeMemoryLimitDefault(long, long) so unit tests
    cover the fallback branches deterministically.

  - native.allocator.root.limit defaults to 20% × node.native_memory.limit.

  - parquet.native.pool.{flight,ingest,query}.max default to 5%/8%/5% of
    node.native_memory.limit (sum 18% < root's 20%, leaving 2 pp headroom inside
    the framework cap).

  - datafusion.memory_pool_limit_bytes defaults to 75% × node.native_memory.limit
    (called out by @bharath-techie #3271093086: prior default
    Runtime.maxMemory() / 4 was JVM-heap derived, wrong baseline for an off-heap
    runtime).

  - datafusion.spill_memory_limit_bytes defaults to 50% × physical RAM,
    independent of node.native_memory.limit (spill is a disk-staging budget,
    not a memory budget).

Behavior change to call out in upgrade notes: admission control is now active
by default. Operators wanting pre-existing opt-out behavior can set
node.native_memory.limit: 0b — all framework caps then fall back to
Long.MAX_VALUE unbounded mode.

### 10. Translate Arrow OutOfMemoryException to OpenSearchRejectedExecutionException
   at per-request boundaries [reviewer feedback]

Added an AllocationRejection helper in plugins/arrow-base that wraps Arrow OOM
with OpenSearch's standard backpressure signal so the REST layer maps these
failures to HTTP 429 (Too Many Requests) instead of a generic 500. Wraps applied
at the per-request allocation boundaries:

  - DefaultPlanExecutor.executeInternal: per-query child allocator creation
  - VSRPool.tryRotate: VSR rotation during ingest

Lifetime/startup allocations (FlightTransport.doStart, AnalyticsSearchService
constructor, VSRPool.initializeActiveVSR) are intentionally left unwrapped — a
failure there is a framework misconfiguration, not a per-request backpressure
signal.

### 11. Integration tests for cap-enforcement boundaries

Added NativeAllocatorBoundaryIT under
plugins/arrow-flight-rpc/src/internalClusterTest. Three tests boot a single-node
cluster with tight memory settings (root=16 MiB, pool maxes=16 MiB) and exercise
real Arrow allocations via the Guice-injected ArrowNativeAllocator:

  - testPoolMaxRejectsAllocationsBeyondCap: PUT pool.query.max=4 MiB, verify a
    8 MiB request through the pool throws OutOfMemoryException.
  - testRootLimitRejectsAllocationsBeyondCap: hold 8 MiB in FLIGHT, verify a
    16 MiB QUERY request fails at the root level (8+16 > 16 root cap) even
    though QUERY's own max would individually allow it.
  - testDynamicPoolResizeAffectsInFlightAllocations: create child at
    Long.MAX_VALUE (the AnalyticsSearchService / DefaultPlanExecutor pattern
    post-listener-removal), verify a 2 MiB allocation succeeds, PUT
    query.max=1 MiB via cluster settings, verify a 2 MiB request now throws
    OOM via Arrow's parent-cap check at allocateBytes.

These cover the boundaries the framework is supposed to enforce. The third test
specifically verifies the listener-replacement contract.

### 12. Unit tests for codecov patch-coverage gates

Added unit tests covering paths flagged by codecov as uncovered diff lines:

  - ArrowBasePluginTests:
      * extended testPoolMaxRejectsNegative to cover INGEST_MAX and QUERY_MAX
        rejection branches.
      * testBuildAllocatorWiresAllPoolsAndSettingsConsumers exercises the full
        createComponents code path via the extracted buildAllocator helper:
        all three pools registered, pool maxes match operator-set values,
        cluster-settings update consumers wired.
  - NodeStatsTests:
      * testPluginStatsDropsAllEntriesWhenReceiverHasNoRegistry covers the
        registry-null branch in NodeStats.readPluginStats (defensive path
        when the parent stream has no NamedWriteableRegistry attached).
      * testPluginNodeStatsDefaultReturnsEmpty covers the Plugin#nodeStats()
        default empty-list return.
  - NodeResourceUsageTrackerTests:
      * testDeriveNativeMemoryLimitDefaultFallbackPaths covers the
        ResourceTrackerSettings.deriveNativeMemoryLimitDefault fallback
        branches (probe-returns-0, heap >= ram) plus the happy path.
  - AllocationRejectionTests:
      * 4 tests covering success pass-through, OOM translation, non-OOM
        propagation, and the Runnable overload of AllocationRejection.wrap.

## How a query flows through these layers

Take a concrete example: a user issues a PPL query that goes through
analytics-engine and dispatches to DataFusion.

  1. Coordinator receives the request.
     → Admission control checks node.native_memory.limit budget.
     → If OK, proceeds; otherwise rejects with 429.

  2. AnalyticsSearchService creates a per-fragment Arrow allocator:
        allocator = getPoolAllocator(POOL_QUERY).newChildAllocator("frag-N", 0, Long.MAX_VALUE)
     → Future BufferAllocator.buffer(...) calls on this allocator increment
       POOL_QUERY's counter and the root counter via Arrow's parent-cap chain.
     → Bounded by parquet.native.pool.query.max.

  3. AnalyticsSearchService dispatches to DataFusion via NativeBridge.
     → NativeBridge.executeQueryAsync(...) marshals plan bytes via FFM.

  4. DataFusion runs the query in Rust:
     → HashAggregate builds a hash table.
     → reservation.try_grow(50MB) → DataFusion MemoryPool counter += 50MB.
     → Bounded by datafusion.memory_pool_limit_bytes.
     → If exceeded, HashAggregate spills to disk
       (which itself is bounded by datafusion.spill_memory_limit_bytes).

  5. DataFusion produces a result batch in Rust.
     → Java imports it via Arrow C Data Interface.
     → The import allocates Java ArrowBufs under the per-fragment allocator
       from step 2. POOL_QUERY counter += result_size.

  6. Result returns to coordinator.
     → Per-fragment allocator closes; POOL_QUERY counter decrements.
     → DataFusion query completes; MemoryPool counter decrements.

Each layer accounts for what it owns. No double-counting. Each operator-tunable
knob bounds a real, observable thing.

## Verification

  ./gradlew -Dsandbox.enabled=true \
    :plugins:arrow-base:test :plugins:arrow-flight-rpc:test \
    :sandbox:plugins:{analytics-engine,analytics-backend-datafusion,parquet-data-format}:test \
    :server:test --tests "org.opensearch.node.resource.tracker.*" \
    :server:test --tests "org.opensearch.action.admin.cluster.node.stats.NodeStatsTests" \
    :plugins:arrow-flight-rpc:internalClusterTest --tests "*NativeAllocatorBoundaryIT*" \
    spotlessJavaCheck
  → BUILD SUCCESSFUL

By submitting this pull request, I confirm that my contribution is made under
the terms of the Apache 2.0 license.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
@gaurav-amz gaurav-amz force-pushed the unified-allocator-followup branch from 7abcfce to 6d3436c Compare May 20, 2026 13:59
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 6d3436c

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit f4cbdd2

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for f4cbdd2: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

…Stats

Builds on opensearch-project#21703 to add the pieces a real production deployment needs:

* Make pool min/max settings dynamic and grouped-validate cross-setting
  invariants on every cluster-state update (sum of pool mins <= root,
  per-pool min <= max). FLIGHT_MIN/INGEST_MIN defaults are 0L (the prior
  Long.MAX_VALUE defaults caused the new validator to reject any non-MAX
  root). setPoolMin now updates the live BufferAllocator.setLimit so the
  Dynamic property has observable effect even when the rebalancer is off.
* Derive ROOT_LIMIT_SETTING default from the AC node-native-memory budget
  (limit minus buffer-percent) so the framework respects the same budget
  AC throttles on, with a Long.MAX_VALUE fallback when AC is unconfigured.
* Add QUERY pool alongside FLIGHT and INGEST. Pools init at min when the
  rebalancer is enabled; otherwise at max so non-rebalanced nodes can
  still allocate.
* Wire arrow-flight-rpc to the FLIGHT pool, parquet-data-format to the
  INGEST pool, and analytics-engine to the QUERY pool via the framework's
  allocator service. Hard-fail if the framework plugin is missing —
  silently skipping the wire-up is the silent-misconfiguration class of
  bug we want to prevent.
* Cleanup ad-hoc allocator fallbacks in parquet-data-format / analytics-engine
  so all Arrow consumers go through the unified pool hierarchy.
* Rebalancer now distributes headroom across all pools (not only those with
  current allocation > 0). Avoids the dead-pool corner case where a pool
  with min=0 starts at limit=0, can never make a first allocation, and
  never receives a bonus.

DataFusion runtime memory accounting stays separate. The Rust-side
DataFusion MemoryPool is governed by datafusion.memory_pool_limit_bytes
(unchanged from before), which is the right knob: DataFusion's internal
sort/hash/group-by working memory is allocated by Rust, not through the
Arrow Java BufferAllocator hierarchy, so a Java-side pool would resize a
ceiling no allocator routes through. The framework's QUERY pool covers
the cross-plugin Arrow allocations the analytics-engine plumbing makes,
which is what we want bounded centrally.

Plugin _nodes/stats integration:
* Add Plugin#nodeStats() hook + PluginNodeStats interface in server.
* Wire NodeStats to carry Map<String, PluginNodeStats> with version-gated
  ser/deser at V_3_7_0 and top-level rendering under nodes.<id>.<name>.
* Each entry is wire-framed as (name, length-prefixed bytes); the receiver
  wraps the inner payload with NamedWriteableAwareStreamInput and drops
  entries whose subtype is not registered locally. This makes mixed-version
  rolling upgrades safe — a coordinator that lacks the plugin a data node
  is running keeps decoding the rest of NodeStats instead of failing the
  whole response.
* NativeAllocatorPluginStats adapter wraps NativeAllocatorPoolStats so the
  framework contributes to _nodes/stats. The dedicated _native_allocator/stats
  REST endpoint is gone — one observability surface, not two.
* Plugin stats are emitted on every _nodes/stats request regardless of the
  ?metric= filter; matches RemoteStoreNodeStats precedent.

DataFusion spill memory limit:
* Promote datafusion.spill_memory_limit_bytes to Setting.Property.Dynamic.
* Wire addSettingsUpdateConsumer that branches on
  NativeBridge.isSpillLimitDynamic(): mirrors live when df_set_spill_limit
  is exported, otherwise warns and waits for next node restart.

API hygiene:
* parquet.max_per_vsr_allocation_ratio is a divisor (limit/N), not a ratio.
  Renamed to parquet.max_per_vsr_allocation_divisor with a hard upper
  bound of 100 to reject fat-finger PUTs that would starve every VSR.

Real regressions caught during self-audit:
* Existing arrow-flight-rpc internalClusterTests and the sandbox coordinator
  ITs were not declaring the framework plugin in nodePlugins(). After the
  flight transport got wired to the FLIGHT pool, those ITs fail at node
  startup with "ArrowNativeAllocator not initialized". Each IT now installs
  the framework plugin and lists it as an extendedPlugin.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
…gistry injection

Removes ArrowNativeAllocator.instance(), the static INSTANCE field, and ensureForTesting(). Binds ArrowNativeAllocator into Guice from ArrowBasePlugin#createGuiceModules; consumer plugins (FlightStreamPlugin, AnalyticsPlugin, ParquetDataFormatPlugin) fetch it from PluginComponentRegistry and pass it through to FlightTransport, AnalyticsSearchService, ArrowBufferPool, and ParquetIndexingEngine via constructor parameters.

Tests construct the allocator directly with the standard pools instead of relying on the test-only ensureForTesting() helper.

Addresses bowen's review comment to align with the ArrowAllocatorService provisioning pattern (returned from createComponents, bound via Guice).

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
…ocatorService

Bowen's review feedback: every Arrow consumer should source its allocator from one of the framework's named pools (FLIGHT/INGEST/QUERY) via ArrowNativeAllocator#getPoolAllocator. The old ArrowAllocatorService SPI let callers create root-siblings outside any pool, which left their allocations invisible in _nodes/stats.native_allocator.pools.* and created a footgun for cross-plugin handoffs.

Migrations:
* DefaultPlanExecutor.coordinatorAllocator -> child of POOL_QUERY (coordinator-side bytes now count against parquet.native.pool.query.max alongside the data-node-side per-fragment allocators)
* TransportNativeArrowStreamDataAction (stream-transport-example) -> POOL_FLIGHT
* NativeArrowTransportIT.TransportTestArrowAction -> POOL_FLIGHT

Deletions:
* ArrowAllocatorService interface and DefaultArrowAllocatorService impl
* Guice binding bind(ArrowAllocatorService.class) in ArrowBasePlugin
* Dead allocatorService constructor parameter on FlightTransport and AnalyticsSearchService
* Now-empty org.opensearch.arrow.memory package (package-info.java)

ArrowBatchResponse Javadoc updated to point readers at the named-pool API.

Behavior change: on a node serving as both coordinator and data node for an analytics query, both sides of the query now count against parquet.native.pool.query.max. Operators upgrading should review the existing budget against total per-node analytics workload.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
Self-review surfaced six items worth fixing before pushing:

* DefaultPlanExecutor now subscribes to the QUERY pool listener and updates coordinatorAllocator on dynamic resize, matching the pattern used by AnalyticsSearchService and FlightTransport. Without this, runtime grows of parquet.native.pool.query.max never reached the coordinator-side allocator.
* DefaultPlanExecutor.coordinatorAllocator is now sized at queryPool.getLimit() instead of Long.MAX_VALUE so the literal matches the actual cap.
* Drop the dead try/catch around removeListener in FlightTransport.stopInternal and AnalyticsSearchService.close. Those catches were holdovers from the singleton era and can't fire now that nativeAllocator is constructor-injected.
* Fix the joined-import line in AnalyticsSearchService that two earlier patches concatenated.
* Expand ArrowBatchResponse Javadoc with a pool-selection decision tree (FLIGHT for transport, INGEST for ingest path, QUERY for query execution) so readers know which pool to pick.
* Update misleading Javadoc in TransportNativeArrowStreamDataAction and DefaultPlanExecutor that referenced non-existent close paths. The pre-existing leak is preserved (out of scope for this PR) but documented honestly with a path forward.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
… plugin nodeStats — review-feedback edition

Builds on opensearch-project#21703 to add the query, datafusion with allocator, dynamic-tuning, stats,
and pool-wiring pieces. This commit is the consolidated version after addressing
review feedback on PR opensearch-project#21732.

## Architecture: three native-memory trackers, three knobs

Each tracker owns the bytes it can actually see, with a process-level cap above all
of them. On a 64 GB / 16 GB-heap node the defaults compose to:

  RAM                                            64 GB
  JVM heap (operator-configured -Xmx)            16 GB
  Off-heap (RAM - heap)                          48 GB
  node.native_memory.limit  (79% of off-heap)    37.92 GB     ← AC throttle threshold
    native.allocator.root.limit (20% of NM)       7.58 GB     ← Arrow framework cap
      pool.flight.max  (5% of NM)                 1.90 GB     ← Flight transport pool
      pool.ingest.max  (8% of NM)                 3.03 GB     ← Parquet VSR pool
      pool.query.max   (5% of NM)                 1.90 GB     ← analytics-engine pool
    datafusion.memory_pool_limit (75% of NM)     28.44 GB     ← Rust runtime pool (sibling to Arrow)
  Unmanaged                                      10.08 GB     ← Lucene mmap, OS page cache, etc.

  Independent (disk budget):
    datafusion.spill_memory_limit_bytes (50% of physical RAM)  32 GB

## What this commit does

### 1. Dynamic settings + cluster-update consumers

All native-allocator settings are Setting.Property.Dynamic. Cluster-settings update
consumers in ArrowBasePlugin#registerSettingsUpdateConsumers wire PUTs to live
ArrowNativeAllocator state. The grouped validator (validateUpdate) rejects
cross-setting violations (sum of pool mins > root, per-pool min > max) at PUT time
with HTTP 400.

### 2. Query and DataFusion pools

POOL_QUERY added for analytics-engine per-query allocators (children of
nativeAllocator.getPoolAllocator(POOL_QUERY)). DataFusion gets its own setting
(datafusion.memory_pool_limit_bytes) since DataFusion's working memory lives
entirely on the Rust side and is reported only through DataFusion's MemoryPool
— a Java-side pool that pretended to track it would either need a per-allocation
FFM round-trip (performance disaster) or be a config-only mirror (HTTP 200 with
no observable effect).

POOL_DATAFUSION is intentionally absent; the Rust-side
datafusion.memory_pool_limit_bytes is the honest knob for that layer.

### 3. Plugin#nodeStats SPI

New SPI method Plugin#nodeStats() returning List<PluginNodeStats>. NodeStats
serializes plugin-contributed payloads via NamedWriteable, surfacing them under
_nodes/stats. Annotated @experimentalapi to match PluginNodeStats.

### 4. Pool wiring through Guice (no static singleton)

ArrowNativeAllocator is constructed in ArrowBasePlugin#createComponents and
returned to Node.java which auto-binds it via Guice. Consumers
(AnalyticsSearchService, FlightTransport, DefaultPlanExecutor) receive it through
constructor @Inject — no static instance() / INSTANCE / ensureForTesting.

The createComponents body is extracted into a package-private buildAllocator
helper so unit tests can exercise the full pool/consumer wiring without the
heavyweight ClusterService fixture.

### 5. Drop NativeAllocatorListener SPI [@bowenlan-amzn #3269655712, #3269660170,
   @Bukhtawar #3267186318]

The original design used NativeAllocatorListener to push pool-limit changes into
consumer-side child allocators that captured the pool's limit at construction
time. With children created at Long.MAX_VALUE, Arrow's Accountant.allocate (lines
191-203 of Accountant.java in arrow-java v18.3.0) walks the parent chain on every
allocation and checks the parent's allocationLimit — so dynamic resizes of
parquet.native.pool.{flight,query}.max take effect immediately on subsequent
allocations through any descendant. The listener was emulating this Arrow-native
behavior and is no longer needed.

  - AnalyticsSearchService: drop poolListener; child uses Long.MAX_VALUE
  - DefaultPlanExecutor: drop poolListener; coordinator uses Long.MAX_VALUE
  - FlightTransport: drop intermediate flightAllocator + poolListener; pool is
    the parent, server/client are Long.MAX_VALUE children
  - libs/arrow-spi/.../NativeAllocatorListener.java: deleted
  - NativeAllocator interface: addListener / removeListener removed
  - ArrowNativeAllocator: listeners field, fireListeners method, and listener
    calls in setPoolLimit/setPoolMin/rebalance removed

The rebalancer continues to function as before: it changes pool limits via
setLimit, which children read via Arrow's parent-cap check at allocateBytes.
Rebalancer is off by default.

### 6. Fix Guice duplicate-binding regression on ArrowNativeAllocator

Removed the redundant ArrowBasePlugin#createGuiceModules override. Node.java
line 1748 already binds every component returned from createComponents:

    pluginComponents.stream()
        .forEach(p -> b.bind((Class) p.getClass()).toInstance(p));

ArrowBasePlugin#createComponents already returns the allocator, so the explicit
bind(ArrowNativeAllocator.class).toInstance(allocator) in createGuiceModules
registered the same binding a second time and caused Guice CreationException at
cluster startup. This was the root cause of the gradle-check Jenkins CI failures
on commits 9188043, 498b502, d646dd4, e1f3c5b, 041dc0e — every integration test
failed during MockNode construction.

### 7. @experimentalapi on Plugin#nodeStats() [@bowenlan-amzn #3269563833]

Annotate the new SPI hook to match the @experimentalapi annotation already
present on PluginNodeStats (the type the method returns).

### 8. FQN -> import nit in ParquetIndexingEngine [@bowenlan-amzn #3269589758]

Replace 3 fully-qualified
org.opensearch.arrow.allocator.ArrowNativeAllocator references in constructor
parameters with a single import.

### 9. Memory defaults aligned to the partitioning model

  - node.native_memory.limit defaults to 79% × (RAM - heap) via OsProbe (cgroup-
    aware: container memory limit on K8s/Docker, total physical RAM on bare
    metal). Falls back to ZERO if probe fails or heap >= RAM, preserving the
    pre-default opt-in semantics. The OsProbe-reading entry point delegates to
    a pure overload deriveNativeMemoryLimitDefault(long, long) so unit tests
    cover the fallback branches deterministically.

  - native.allocator.root.limit defaults to 20% × node.native_memory.limit.

  - parquet.native.pool.{flight,ingest,query}.max default to 5%/8%/5% of
    node.native_memory.limit (sum 18% < root's 20%, leaving 2 pp headroom inside
    the framework cap).

  - datafusion.memory_pool_limit_bytes defaults to 75% × node.native_memory.limit
    (called out by @bharath-techie #3271093086: prior default
    Runtime.maxMemory() / 4 was JVM-heap derived, wrong baseline for an off-heap
    runtime).

  - datafusion.spill_memory_limit_bytes defaults to 50% × physical RAM,
    independent of node.native_memory.limit (spill is a disk-staging budget,
    not a memory budget).

Behavior change to call out in upgrade notes: admission control is now active
by default. Operators wanting pre-existing opt-out behavior can set
node.native_memory.limit: 0b — all framework caps then fall back to
Long.MAX_VALUE unbounded mode.

### 10. Translate Arrow OutOfMemoryException to OpenSearchRejectedExecutionException
   at per-request boundaries [reviewer feedback]

Added an AllocationRejection helper in plugins/arrow-base that wraps Arrow OOM
with OpenSearch's standard backpressure signal so the REST layer maps these
failures to HTTP 429 (Too Many Requests) instead of a generic 500. Wraps applied
at the per-request allocation boundaries:

  - DefaultPlanExecutor.executeInternal: per-query child allocator creation
  - VSRPool.tryRotate: VSR rotation during ingest

Lifetime/startup allocations (FlightTransport.doStart, AnalyticsSearchService
constructor, VSRPool.initializeActiveVSR) are intentionally left unwrapped — a
failure there is a framework misconfiguration, not a per-request backpressure
signal.

### 11. Integration tests for cap-enforcement boundaries

Added NativeAllocatorBoundaryIT under
plugins/arrow-flight-rpc/src/internalClusterTest. Three tests boot a single-node
cluster with tight memory settings (root=16 MiB, pool maxes=16 MiB) and exercise
real Arrow allocations via the Guice-injected ArrowNativeAllocator:

  - testPoolMaxRejectsAllocationsBeyondCap: PUT pool.query.max=4 MiB, verify a
    8 MiB request through the pool throws OutOfMemoryException.
  - testRootLimitRejectsAllocationsBeyondCap: hold 8 MiB in FLIGHT, verify a
    16 MiB QUERY request fails at the root level (8+16 > 16 root cap) even
    though QUERY's own max would individually allow it.
  - testDynamicPoolResizeAffectsInFlightAllocations: create child at
    Long.MAX_VALUE (the AnalyticsSearchService / DefaultPlanExecutor pattern
    post-listener-removal), verify a 2 MiB allocation succeeds, PUT
    query.max=1 MiB via cluster settings, verify a 2 MiB request now throws
    OOM via Arrow's parent-cap check at allocateBytes.

These cover the boundaries the framework is supposed to enforce. The third test
specifically verifies the listener-replacement contract.

### 12. Unit tests for codecov patch-coverage gates

Added unit tests covering paths flagged by codecov as uncovered diff lines:

  - ArrowBasePluginTests:
      * extended testPoolMaxRejectsNegative to cover INGEST_MAX and QUERY_MAX
        rejection branches.
      * testBuildAllocatorWiresAllPoolsAndSettingsConsumers exercises the full
        createComponents code path via the extracted buildAllocator helper:
        all three pools registered, pool maxes match operator-set values,
        cluster-settings update consumers wired.
  - NodeStatsTests:
      * testPluginStatsDropsAllEntriesWhenReceiverHasNoRegistry covers the
        registry-null branch in NodeStats.readPluginStats (defensive path
        when the parent stream has no NamedWriteableRegistry attached).
      * testPluginNodeStatsDefaultReturnsEmpty covers the Plugin#nodeStats()
        default empty-list return.
  - NodeResourceUsageTrackerTests:
      * testDeriveNativeMemoryLimitDefaultFallbackPaths covers the
        ResourceTrackerSettings.deriveNativeMemoryLimitDefault fallback
        branches (probe-returns-0, heap >= ram) plus the happy path.
  - AllocationRejectionTests:
      * 4 tests covering success pass-through, OOM translation, non-OOM
        propagation, and the Runnable overload of AllocationRejection.wrap.

## How a query flows through these layers

Take a concrete example: a user issues a PPL query that goes through
analytics-engine and dispatches to DataFusion.

  1. Coordinator receives the request.
     → Admission control checks node.native_memory.limit budget.
     → If OK, proceeds; otherwise rejects with 429.

  2. AnalyticsSearchService creates a per-fragment Arrow allocator:
        allocator = getPoolAllocator(POOL_QUERY).newChildAllocator("frag-N", 0, Long.MAX_VALUE)
     → Future BufferAllocator.buffer(...) calls on this allocator increment
       POOL_QUERY's counter and the root counter via Arrow's parent-cap chain.
     → Bounded by parquet.native.pool.query.max.

  3. AnalyticsSearchService dispatches to DataFusion via NativeBridge.
     → NativeBridge.executeQueryAsync(...) marshals plan bytes via FFM.

  4. DataFusion runs the query in Rust:
     → HashAggregate builds a hash table.
     → reservation.try_grow(50MB) → DataFusion MemoryPool counter += 50MB.
     → Bounded by datafusion.memory_pool_limit_bytes.
     → If exceeded, HashAggregate spills to disk
       (which itself is bounded by datafusion.spill_memory_limit_bytes).

  5. DataFusion produces a result batch in Rust.
     → Java imports it via Arrow C Data Interface.
     → The import allocates Java ArrowBufs under the per-fragment allocator
       from step 2. POOL_QUERY counter += result_size.

  6. Result returns to coordinator.
     → Per-fragment allocator closes; POOL_QUERY counter decrements.
     → DataFusion query completes; MemoryPool counter decrements.

Each layer accounts for what it owns. No double-counting. Each operator-tunable
knob bounds a real, observable thing.

## Verification

  ./gradlew -Dsandbox.enabled=true \
    :plugins:arrow-base:test :plugins:arrow-flight-rpc:test \
    :sandbox:plugins:{analytics-engine,analytics-backend-datafusion,parquet-data-format}:test \
    :server:test --tests "org.opensearch.node.resource.tracker.*" \
    :server:test --tests "org.opensearch.action.admin.cluster.node.stats.NodeStatsTests" \
    :plugins:arrow-flight-rpc:internalClusterTest --tests "*NativeAllocatorBoundaryIT*" \
    spotlessJavaCheck
  → BUILD SUCCESSFUL

By submitting this pull request, I confirm that my contribution is made under
the terms of the Apache 2.0 license.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
…load

The Runnable overload of AllocationRejection.wrap was missing @param tags
for context and body, failing the missingJavadoc gradle-check on CI:

  AllocationRejection.java:75: error: AllocationRejection.wrap (method):
    missing javadoc @param for parameter 'context'

Mirror the @param/@throws structure already present on the Supplier
overload above.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit a63141a

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 3ab722f

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 3ab722f: SUCCESS

@gaurav-amz
Copy link
Copy Markdown
Contributor Author

We might need to reject the request, throw OpenSearchRejectedException rather than oom-ing. Also lets try to see how we can wire up circuit-breaker for this. Maybe circuit-breaker stats is something we leverage for tracking memory used

OpenSearchRejectedException handling has been addressed in this PR. Circuit breaker-related changes will be included in a follow-up PR. Stats-related refactoring will be needed once Ajay’s PR is merged.

Comment on lines +118 to +125
public static final Setting<Integer> MAX_PER_VSR_ALLOCATION_DIVISOR = Setting.intSetting(
"parquet.max_per_vsr_allocation_divisor",
10,
1,
100,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be function of cores?

Comment on lines +279 to +300
private static Map<String, PluginNodeStats> readPluginStats(StreamInput in) throws IOException {
int size = in.readVInt();
if (size == 0) {
return Collections.emptyMap();
}
NamedWriteableRegistry registry = in.namedWriteableRegistry();
Map<String, PluginNodeStats> result = new HashMap<>(size);
for (int i = 0; i < size; i++) {
String name = in.readString();
BytesReference payload = in.readBytesReference();
if (registry == null) {
// No registry attached to the parent stream — we can't deserialize any
// entry. Drop the whole map. This branch is defensive; production paths
// always set a registry when they expect named writeables.
continue;
}
try (
StreamInput rawIn = payload.streamInput();
NamedWriteableAwareStreamInput payloadIn = new NamedWriteableAwareStreamInput(rawIn, registry)
) {
payloadIn.setVersion(in.getVersion());
PluginNodeStats stats = payloadIn.readNamedWriteable(PluginNodeStats.class);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will _node/stats?metrics=<metric_name> work?

…lowup

# Conflicts:
#	sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java
#	server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java
#	server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java
#	server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
#	server/src/main/java/org/opensearch/node/NodeService.java
#	test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
@gaurav-amz gaurav-amz force-pushed the unified-allocator-followup branch from 3ab722f to d4df93c Compare May 21, 2026 05:13
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit d4df93c

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for d4df93c: SUCCESS

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants