1212import org .apache .logging .log4j .Logger ;
1313import org .opensearch .common .settings .Settings ;
1414import org .opensearch .common .unit .TimeValue ;
15+ import org .opensearch .core .common .unit .ByteSizeValue ;
1516import org .opensearch .monitor .os .OsProbe ;
1617import org .opensearch .threadpool .ThreadPool ;
1718
1819import java .lang .management .ManagementFactory ;
19- import java .util .concurrent .atomic .AtomicBoolean ;
2020import java .util .function .LongSupplier ;
2121
2222/**
2323 * AverageNativeMemoryUsageTracker reports this OpenSearch process's off-heap native memory
24- * utilization as a percentage of the budget configured for the native analytics engines,
25- * averaged over a rolling window.
24+ * utilization as a percentage of a configured node-level budget, averaged over a rolling window.
2625 *
2726 * <p>On Linux, each polling cycle computes
2827 * {@code usage = max(0, RssAnon - HeapCommitted)} where {@code RssAnon} comes from
2928 * {@link OsProbe#getProcessRssAnon()} and {@code HeapCommitted} comes from the JVM memory MX
30- * bean. The denominator is a 20% padded sum of the two plugin budgets:
31- * {@code cap = 1.2 * (datafusion.memory_pool_limit_bytes + resolved(parquet.max_native_allocation))}.
32- * When either plugin setting is absent, zero, negative, or unparseable, its contribution is 0;
33- * when both contribute 0 the cap is 0 and {@code getUsage()} returns 0 without dividing.
29+ * bean. The denominator is the effective native memory budget:
30+ * {@code effective = limit - (limit * bufferPercent / 100)}, resolved per poll from
31+ * {@link ResourceTrackerSettings#NODE_NATIVE_MEMORY_LIMIT_SETTING} and
32+ * {@link ResourceTrackerSettings#NODE_NATIVE_MEMORY_BUFFER_PERCENT_SETTING}. When the limit is
33+ * absent or non-positive, or the buffer fully consumes the limit, {@code getUsage()} returns
34+ * {@code 0} without dividing.
3435 *
3536 * <p>Activation is already gated to Linux in {@link NodeResourceUsageTracker}; on non-Linux
3637 * platforms the polling loop is not started and {@link OsProbe#getProcessRssAnon()} returns
@@ -40,144 +41,104 @@ public class AverageNativeMemoryUsageTracker extends AbstractAverageUsageTracker
4041
4142 private static final Logger LOGGER = LogManager .getLogger (AverageNativeMemoryUsageTracker .class );
4243
43- static final String DATAFUSION_MEMORY_POOL_LIMIT_KEY = "datafusion.memory_pool_limit_bytes" ;
44- static final String PARQUET_MAX_NATIVE_ALLOCATION_KEY = "parquet.max_native_allocation" ;
45- static final double HEADROOM_FACTOR = 1.2 ;
46-
4744 private final LongSupplier rssAnonSupplier ;
4845 private final LongSupplier heapCommittedSupplier ;
49- private final LongSupplier nativeMemoryCapSupplier ;
46+ private final LongSupplier effectiveNativeMemorySupplier ;
5047
/**
 * Production constructor. Wires the RSS reader to {@link OsProbe#getProcessRssAnon()}, the
 * heap-committed supplier to the JVM memory MX bean, and the effective-native-memory supplier
 * to {@link #computeEffectiveNativeMemory(Settings)}, which is re-evaluated on every call to
 * {@link #getUsage()} against the {@code settings} instance supplied here.
 *
 * <p>NOTE(review): the supplier closes over this single {@code settings} reference and this
 * constructor registers no settings-update listener, so a later change to the limit or buffer
 * setting is only visible if that same instance reflects it. If the two settings are meant to
 * be dynamically updatable, confirm how updates are expected to reach this tracker.
 *
 * @param threadPool      thread pool forwarded to the base tracker
 * @param pollingInterval polling interval forwarded to the base tracker
 * @param windowDuration  rolling-window duration forwarded to the base tracker
 * @param settings        settings from which the native-memory limit and buffer percent are read
 */
public AverageNativeMemoryUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration, Settings settings) {
    super(threadPool, pollingInterval, windowDuration);
    this.rssAnonSupplier = () -> OsProbe.getInstance().getProcessRssAnon();
    this.heapCommittedSupplier = () -> ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getCommitted();
    // Resolved lazily on each poll rather than once here, so the budget is recomputed per cycle.
    this.effectiveNativeMemorySupplier = () -> computeEffectiveNativeMemory(settings);
}
6360
/**
 * Package-private constructor for tests. Every external input read by {@link #getUsage()} is
 * injected as a {@link LongSupplier}, so a test can drive the calculation deterministically
 * without touching {@code /proc/self/status}, the JVM memory MX bean, or node settings.
 *
 * @param threadPool                    thread pool forwarded to the base tracker
 * @param pollingInterval               polling interval forwarded to the base tracker
 * @param windowDuration                rolling-window duration forwarded to the base tracker
 * @param rssAnonSupplier               source of the process RssAnon value
 * @param heapCommittedSupplier         source of the committed JVM heap size
 * @param effectiveNativeMemorySupplier source of the effective native memory (limit minus
 *                                      buffer), supplied directly
 */
AverageNativeMemoryUsageTracker(
    ThreadPool threadPool,
    TimeValue pollingInterval,
    TimeValue windowDuration,
    LongSupplier rssAnonSupplier,
    LongSupplier heapCommittedSupplier,
    LongSupplier effectiveNativeMemorySupplier
) {
    super(threadPool, pollingInterval, windowDuration);
    // Denominator source first, then the two numerator sources; the assignments are independent.
    this.effectiveNativeMemorySupplier = effectiveNativeMemorySupplier;
    this.heapCommittedSupplier = heapCommittedSupplier;
    this.rssAnonSupplier = rssAnonSupplier;
}
8280
8381 @ Override
8482 public long getUsage () {
8583
8684 long rssAnon = rssAnonSupplier .getAsLong ();
8785 if (rssAnon < 0L ) {
88- LOGGER .debug ("Native memory poll skipped: RssAnon unavailable from /proc/self/status" );
86+ LOGGER .warn ("Native memory poll skipped: RssAnon unavailable from /proc/self/status" );
8987 return 0L ;
9088 }
9189
9290 long heapCommitted = heapCommittedSupplier .getAsLong ();
93- long usage = Math .max (0L , rssAnon - heapCommitted );
94-
95- long cap = nativeMemoryCapSupplier .getAsLong ();
96- if (cap <= 0L ) {
97- LOGGER .debug ("Native memory poll: rssAnon={} heapCommitted={} usage={} cap=0 -> 0%" , rssAnon , heapCommitted , usage );
91+ long nativeUsed = Math .max (0L , rssAnon - heapCommitted );
92+
93+ long effectiveNativeMemory = effectiveNativeMemorySupplier .getAsLong ();
94+ if (effectiveNativeMemory <= 0L ) {
95+ LOGGER .warn (
96+ "Native memory poll: rssAnon={} heapCommitted={} nativeUsed={} effectiveNativeMemory=0 -> 0%" ,
97+ rssAnon ,
98+ heapCommitted ,
99+ nativeUsed
100+ );
98101 return 0L ;
99102 }
100103
101- long percent = (usage * 100L ) / cap ;
104+ double utilization = (double ) nativeUsed / effectiveNativeMemory * 100.0 ;
105+ long percent = (long ) utilization ;
102106 if (percent > 100L ) {
103107 percent = 100L ;
104108 }
105109 if (percent < 0L ) {
106110 percent = 0L ;
107111 }
108112
109- LOGGER .debug ("Native memory poll: rssAnon={} heapCommitted={} usage={} cap={} pct={}" , rssAnon , heapCommitted , usage , cap , percent );
110- return percent ;
111- }
112-
113-
114- /**
115- * Computes {@code Non_Heap_Base = max(0, totalPhysicalMemory - Runtime.maxMemory())}, the
116- * reference value against which {@code parquet.max_native_allocation} percentages resolve.
117- */
118- long computeNonHeapBase () {
119- return Math .max (0L , OsProbe .getInstance ().getTotalPhysicalMemorySize () - Runtime .getRuntime ().maxMemory ());
120- }
121-
122- /**
123- * Resolves the DataFusion native-pool contribution. Absent, zero, negative, or unparseable
124- * values all collapse to {@code 0L}.
125- */
126- long resolveDataFusionContribution (Settings settings ) {
127- try {
128- long value = settings .getAsLong (DATAFUSION_MEMORY_POOL_LIMIT_KEY , 0L );
129- return value > 0L ? value : 0L ;
130- } catch (IllegalArgumentException unparseable ) {
131- return 0L ;
113+ if (LOGGER .isDebugEnabled ()) {
114+ LOGGER .debug (
115+ "Native memory poll: rssAnon={} heapCommitted={} nativeUsed={} effectiveNativeMemory={} pct={}" ,
116+ rssAnon ,
117+ heapCommitted ,
118+ nativeUsed ,
119+ effectiveNativeMemory ,
120+ percent
121+ );
132122 }
133- }
134123
135- /**
136- * Resolves the Parquet native-allocation contribution from the percentage-string setting.
137- * Absent, empty, missing trailing {@code %}, unparseable, NaN, or non-positive values all
138- * collapse to {@code 0L}. Percentages above 100 are defensively clamped to 100.
139- */
140- long resolveParquetContribution (Settings settings , long nonHeapBase ) {
141- if (nonHeapBase <= 0L ) {
142- return 0L ;
143- }
144- String raw = settings .get (PARQUET_MAX_NATIVE_ALLOCATION_KEY );
145- if (raw == null ) {
146- return 0L ;
147- }
148- String trimmed = raw .trim ();
149- if (trimmed .isEmpty () || trimmed .endsWith ("%" ) == false ) {
150- return 0L ;
151- }
152- String pctStr = trimmed .substring (0 , trimmed .length () - 1 ).trim ();
153- double pct ;
154- try {
155- pct = Double .parseDouble (pctStr );
156- } catch (NumberFormatException nfe ) {
157- return 0L ;
158- }
159- if (Double .isNaN (pct ) || pct <= 0.0 ) {
160- return 0L ;
161- }
162- if (pct > 100.0 ) {
163- pct = 100.0 ;
164- }
165- return (long ) Math .floor (nonHeapBase * pct / 100.0 );
124+ return percent ;
166125 }
167126
168127 /**
169- * Resolves both plugin settings to byte values and returns the 20% headroom-padded sum.
170- * Returns {@code 0L} when neither plugin setting contributes a positive value, so
171- * {@link #getUsage()} can short-circuit without dividing.
128+ * Resolves the configured native-memory limit and buffer percentage from the node settings and
129+ * returns the effective native-memory budget in bytes: {@code limit - (limit * buffer / 100)}.
130+ * Returns {@code 0L} when the limit is absent or non-positive, or when the buffer fully
131+ * consumes the limit, so {@link #getUsage()} can short-circuit without dividing.
172132 */
173- long computeNativeMemoryCap (Settings settings ) {
174- long df = resolveDataFusionContribution (settings );
175- long base = computeNonHeapBase ();
176- long pq = resolveParquetContribution (settings , base );
177- long sum = df + pq ;
178- if (sum <= 0L ) {
133+ long computeEffectiveNativeMemory (Settings settings ) {
134+ ByteSizeValue limitValue = ResourceTrackerSettings .NODE_NATIVE_MEMORY_LIMIT_SETTING .get (settings );
135+ long limit = limitValue .getBytes ();
136+ if (limit <= 0L ) {
179137 return 0L ;
180138 }
181- return (long ) Math .floor (HEADROOM_FACTOR * (double ) sum );
139+ int bufferPercent = ResourceTrackerSettings .NODE_NATIVE_MEMORY_BUFFER_PERCENT_SETTING .get (settings );
140+ long buffer = limit * bufferPercent / 100L ;
141+ long effective = limit - buffer ;
142+ return Math .max (0L , effective );
182143 }
183144}
0 commit comments