Skip to content

Commit 572f732

Browse files
author
Pradeep L
committed
changing source from meminfo to status
Signed-off-by: Pradeep L <spradeel@amazon.com>
1 parent 5139724 commit 572f732

6 files changed

Lines changed: 255 additions & 117 deletions

File tree

server/src/main/java/org/opensearch/monitor/os/OsProbe.java

Lines changed: 55 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -237,63 +237,79 @@ String readProcLoadavg() throws IOException {
237237
return readSingleLine(PathUtils.get("/proc/loadavg"));
238238
}
239239

240-
public short getSystemCpuPercent() {
241-
return Probes.getLoadAndScaleToPercent(getSystemCpuLoad, osMxBean);
242-
}
243-
244240
/**
245-
* Reads a file containing a single line.
241+
* Reads the {@code RssAnon} field (anonymous resident memory) of the current process from
242+
* {@code /proc/self/status} and returns it in bytes. Returns {@code -1L} when the host is
243+
* not Linux, when {@code /proc/self/status} cannot be read, or when the {@code RssAnon:}
244+
* line is missing or malformed. Failure paths are logged at debug level; callers that want
245+
* user-visible warnings must wrap this method themselves.
246246
*
247-
* @param path path to the file to read
248-
* @return the single line
249-
* @throws IOException if an I/O exception occurs reading the file
247+
* @return the {@code RssAnon} value in bytes, or {@code -1L} if unavailable
250248
*/
251-
private String readSingleLine(final Path path) throws IOException {
252-
final List<String> lines = Files.readAllLines(path);
253-
assert lines.size() == 1 : String.join("\n", lines);
254-
return lines.get(0);
255-
}
256-
257-
/**
258-
* Returns the available memory in bytes on Linux by reading {@code MemAvailable} from {@code /proc/meminfo}.
259-
* Available memory is a better estimate than free memory as it accounts for reclaimable page cache and slab memory.
260-
* Returns -1 if not on Linux or if the value cannot be read.
261-
*/
262-
public long getAvailableMemorySize() {
263-
if (!Constants.LINUX) {
264-
return -1;
249+
public long getProcessRssAnon() {
250+
if (Constants.LINUX == false) {
251+
return -1L;
265252
}
266253
try {
267-
return readAvailableMemoryFromProcMeminfo();
268-
} catch (Exception e) {
269-
logger.warn("error reading available memory from /proc/meminfo", e);
270-
return -1;
254+
return readRssAnonFromProcSelfStatus();
255+
} catch (IOException e) {
256+
logger.debug("failed to read /proc/self/status", e);
257+
return -1L;
271258
}
272259
}
273260

274261
/**
275-
* Reads {@code MemAvailable} from {@code /proc/meminfo}.
262+
* Reads the {@code RssAnon} field from {@code /proc/self/status}. Package-private so tests
263+
* can override it with canned file contents.
276264
*
277-
* @return the available memory in bytes, or -1 if not found
278-
* @throws IOException if an I/O exception occurs reading {@code /proc/meminfo}
265+
* @return the {@code RssAnon} value in bytes, or {@code -1L} when the line is missing or
266+
* the value is unparseable/negative
267+
* @throws IOException if the file cannot be opened or read
279268
*/
280-
@SuppressForbidden(reason = "access /proc/meminfo")
281-
long readAvailableMemoryFromProcMeminfo() throws IOException {
282-
try (BufferedReader reader = Files.newBufferedReader(PathUtils.get("/proc/meminfo"))) {
269+
@SuppressForbidden(reason = "access /proc/self/status")
270+
long readRssAnonFromProcSelfStatus() throws IOException {
271+
try (BufferedReader reader = Files.newBufferedReader(PathUtils.get("/proc/self/status"))) {
283272
String line;
284273
while ((line = reader.readLine()) != null) {
285-
if (line.startsWith("MemAvailable:")) {
286-
final String[] parts = line.split("\\s+");
274+
if (line.startsWith("RssAnon:")) {
275+
// Format: "RssAnon:\t 12345 kB"
276+
String[] parts = line.split("\\s+");
287277
if (parts.length >= 2) {
288-
// Value in /proc/meminfo is in kB
289-
return Long.parseLong(parts[1]) * 1024;
290-
} else {
291-
return -1;
278+
try {
279+
long kb = Long.parseLong(parts[1]);
280+
if (kb < 0L) {
281+
return -1L;
282+
}
283+
return kb * 1024L;
284+
} catch (NumberFormatException nfe) {
285+
logger.debug("malformed RssAnon value in /proc/self/status", nfe);
286+
return -1L;
287+
}
292288
}
289+
logger.debug("RssAnon line has unexpected shape: [{}]", line);
290+
return -1L;
293291
}
294292
}
293+
logger.debug("RssAnon line not found in /proc/self/status");
294+
return -1L;
295295
}
296-
return -1;
296+
}
297+
298+
public short getSystemCpuPercent() {
299+
return Probes.getLoadAndScaleToPercent(getSystemCpuLoad, osMxBean);
300+
}
301+
302+
/**
303+
* Reads a file containing a single line.
304+
*
305+
* @param path path to the file to read
306+
* @return the single line
307+
* @throws IOException if an I/O exception occurs reading the file
308+
*/
309+
private String readSingleLine(final Path path) throws IOException {
310+
final List<String> lines = Files.readAllLines(path);
311+
assert lines.size() == 1 : String.join("\n", lines);
312+
return lines.get(0);
297313
}
298314

299315
// this property is to support a hack to workaround an issue with Docker containers mounting the cgroups hierarchy inconsistently with

server/src/main/java/org/opensearch/node/resource/tracker/AverageNativeMemoryUsageTracker.java

Lines changed: 151 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,52 +10,174 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.settings.Settings;
1314
import org.opensearch.common.unit.TimeValue;
1415
import org.opensearch.monitor.os.OsProbe;
1516
import org.opensearch.threadpool.ThreadPool;
1617

18+
import java.lang.management.ManagementFactory;
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.function.LongSupplier;
21+
1722
/**
18-
* AverageNativeMemoryUsageTracker tracks the average native (physical) memory usage on the node
19-
* by polling the OS-level memory stats every (pollingInterval) and keeping track of the rolling
20-
* average over a defined time window (windowDuration).
23+
* AverageNativeMemoryUsageTracker reports this OpenSearch process's off-heap native memory
24+
* utilization as a percentage of the budget configured for the native analytics engines,
25+
* averaged over a rolling window.
26+
*
27+
* <p>On Linux, each polling cycle computes
28+
* {@code usage = max(0, RssAnon - HeapCommitted)} where {@code RssAnon} comes from
29+
* {@link OsProbe#getProcessRssAnon()} and {@code HeapCommitted} comes from the JVM memory MX
30+
* bean. The denominator is a 20% padded sum of the two plugin budgets:
31+
* {@code cap = 1.2 * (datafusion.memory_pool_limit_bytes + resolved(parquet.max_native_allocation))}.
32+
* When either plugin setting is absent, zero, negative, or unparseable, its contribution is 0;
33+
* when both contribute 0 the cap is 0 and {@code getUsage()} returns 0 without dividing.
2134
*
22-
* On Linux, it uses available memory from /proc/meminfo (MemAvailable) which accounts for
23-
* reclaimable page cache and slab memory, giving a more accurate picture of actual memory
24-
* pressure. On other platforms, it falls back to free physical memory.
35+
* <p>Activation is already gated to Linux in {@link NodeResourceUsageTracker}; on non-Linux
36+
* platforms the polling loop is not started and {@link OsProbe#getProcessRssAnon()} returns
37+
* {@code -1} without touching the filesystem.
2538
*/
2639
public class AverageNativeMemoryUsageTracker extends AbstractAverageUsageTracker {
2740

2841
private static final Logger LOGGER = LogManager.getLogger(AverageNativeMemoryUsageTracker.class);
2942

30-
public AverageNativeMemoryUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) {
43+
static final String DATAFUSION_MEMORY_POOL_LIMIT_KEY = "datafusion.memory_pool_limit_bytes";
44+
static final String PARQUET_MAX_NATIVE_ALLOCATION_KEY = "parquet.max_native_allocation";
45+
static final double HEADROOM_FACTOR = 1.2;
46+
47+
private final LongSupplier rssAnonSupplier;
48+
private final LongSupplier heapCommittedSupplier;
49+
private final LongSupplier nativeMemoryCapSupplier;
50+
51+
/**
 * Creates the tracker with its production data sources.
 *
 * <p>The three inputs read by {@link #getUsage()} are wired as follows: anonymous resident
 * memory comes from {@link OsProbe#getProcessRssAnon()}, committed heap comes from the JVM
 * memory MX bean, and the native-memory cap comes from
 * {@link #computeNativeMemoryCap(Settings)}. Each supplier is lazy, so the value is recomputed
 * every time the supplier is asked for it.
 *
 * <p>NOTE(review): the cap is always resolved from the {@code settings} instance captured
 * here. Later dynamic updates to {@code datafusion.memory_pool_limit_bytes} are only observed
 * if that same instance reflects them -- confirm against the caller.
 *
 * @param threadPool      passed through to the superclass constructor
 * @param pollingInterval passed through to the superclass constructor
 * @param windowDuration  passed through to the superclass constructor
 * @param settings        settings from which the native-memory cap is resolved
 */
public AverageNativeMemoryUsageTracker(
    ThreadPool threadPool,
    TimeValue pollingInterval,
    TimeValue windowDuration,
    Settings settings
) {
    super(threadPool, pollingInterval, windowDuration);
    this.nativeMemoryCapSupplier = () -> computeNativeMemoryCap(settings);
    this.heapCommittedSupplier = () -> ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getCommitted();
    this.rssAnonSupplier = () -> OsProbe.getInstance().getProcessRssAnon();
}
3363

3464
/**
35-
* Get current native memory usage percentage using OS-level physical memory stats.
36-
* Prefers available memory (MemAvailable) over free memory (MemFree) as it provides
37-
* a more accurate measure of actual memory pressure on the node.
65+
* Package-private test constructor. Accepts three {@link LongSupplier}s in place of the
66+
* production defaults so tests can drive {@link #getUsage()} deterministically without
67+
* reading {@code /proc/self/status}, the JVM memory MX bean, or plugin settings.
3868
*/
69+
AverageNativeMemoryUsageTracker(
    ThreadPool threadPool,
    TimeValue pollingInterval,
    TimeValue windowDuration,
    LongSupplier rssAnonSupplier,
    LongSupplier heapCommittedSupplier,
    LongSupplier nativeMemoryCapSupplier
) {
    super(threadPool, pollingInterval, windowDuration);
    // The suppliers are stored exactly as given (no null checks, no wrapping);
    // getUsage() reads its inputs from them.
    this.nativeMemoryCapSupplier = nativeMemoryCapSupplier;
    this.heapCommittedSupplier = heapCommittedSupplier;
    this.rssAnonSupplier = rssAnonSupplier;
}
82+
3983
@Override
4084
public long getUsage() {
41-
OsProbe osProbe = OsProbe.getInstance();
42-
long totalMemory = osProbe.getTotalPhysicalMemorySize();
43-
if (totalMemory <= 0) {
44-
LOGGER.warn("Unable to retrieve total physical memory size");
45-
return 0;
46-
}
47-
long availableMemory = osProbe.getAvailableMemorySize();
48-
long unusedMemory;
49-
if (availableMemory >= 0) {
50-
// Use available memory (includes reclaimable cache) for a more accurate picture
51-
unusedMemory = availableMemory;
52-
} else {
53-
LOGGER.warn("unable to retrieve available memory");
54-
return 0;
55-
}
56-
long usedMemory = totalMemory - unusedMemory;
57-
long usage = usedMemory * 100 / totalMemory;
58-
LOGGER.debug("Recording native memory usage: {}%", usage);
59-
return usage;
85+
86+
long rssAnon = rssAnonSupplier.getAsLong();
87+
if (rssAnon < 0L) {
88+
LOGGER.debug("Native memory poll skipped: RssAnon unavailable from /proc/self/status");
89+
return 0L;
90+
}
91+
92+
long heapCommitted = heapCommittedSupplier.getAsLong();
93+
long usage = Math.max(0L, rssAnon - heapCommitted);
94+
95+
long cap = nativeMemoryCapSupplier.getAsLong();
96+
if (cap <= 0L) {
97+
LOGGER.debug("Native memory poll: rssAnon={} heapCommitted={} usage={} cap=0 -> 0%", rssAnon, heapCommitted, usage);
98+
return 0L;
99+
}
100+
101+
long percent = (usage * 100L) / cap;
102+
if (percent > 100L) {
103+
percent = 100L;
104+
}
105+
if (percent < 0L) {
106+
percent = 0L;
107+
}
108+
109+
LOGGER.debug("Native memory poll: rssAnon={} heapCommitted={} usage={} cap={} pct={}", rssAnon, heapCommitted, usage, cap, percent);
110+
return percent;
111+
}
112+
113+
114+
/**
115+
* Computes {@code Non_Heap_Base = max(0, totalPhysicalMemory - Runtime.maxMemory())}, the
116+
* reference value against which {@code parquet.max_native_allocation} percentages resolve.
117+
*/
118+
long computeNonHeapBase() {
119+
return Math.max(0L, OsProbe.getInstance().getTotalPhysicalMemorySize() - Runtime.getRuntime().maxMemory());
120+
}
121+
122+
/**
123+
* Resolves the DataFusion native-pool contribution. Absent, zero, negative, or unparseable
124+
* values all collapse to {@code 0L}.
125+
*/
126+
long resolveDataFusionContribution(Settings settings) {
127+
try {
128+
long value = settings.getAsLong(DATAFUSION_MEMORY_POOL_LIMIT_KEY, 0L);
129+
return value > 0L ? value : 0L;
130+
} catch (IllegalArgumentException unparseable) {
131+
return 0L;
132+
}
133+
}
134+
135+
/**
136+
* Resolves the Parquet native-allocation contribution from the percentage-string setting.
137+
* Absent, empty, missing trailing {@code %}, unparseable, NaN, or non-positive values all
138+
* collapse to {@code 0L}. Percentages above 100 are defensively clamped to 100.
139+
*/
140+
long resolveParquetContribution(Settings settings, long nonHeapBase) {
141+
if (nonHeapBase <= 0L) {
142+
return 0L;
143+
}
144+
String raw = settings.get(PARQUET_MAX_NATIVE_ALLOCATION_KEY);
145+
if (raw == null) {
146+
return 0L;
147+
}
148+
String trimmed = raw.trim();
149+
if (trimmed.isEmpty() || trimmed.endsWith("%") == false) {
150+
return 0L;
151+
}
152+
String pctStr = trimmed.substring(0, trimmed.length() - 1).trim();
153+
double pct;
154+
try {
155+
pct = Double.parseDouble(pctStr);
156+
} catch (NumberFormatException nfe) {
157+
return 0L;
158+
}
159+
if (Double.isNaN(pct) || pct <= 0.0) {
160+
return 0L;
161+
}
162+
if (pct > 100.0) {
163+
pct = 100.0;
164+
}
165+
return (long) Math.floor(nonHeapBase * pct / 100.0);
166+
}
167+
168+
/**
169+
* Resolves both plugin settings to byte values and returns the 20% headroom-padded sum.
170+
* Returns {@code 0L} when neither plugin setting contributes a positive value, so
171+
* {@link #getUsage()} can short-circuit without dividing.
172+
*/
173+
long computeNativeMemoryCap(Settings settings) {
174+
long df = resolveDataFusionContribution(settings);
175+
long base = computeNonHeapBase();
176+
long pq = resolveParquetContribution(settings, base);
177+
long sum = df + pq;
178+
if (sum <= 0L) {
179+
return 0L;
180+
}
181+
return (long) Math.floor(HEADROOM_FACTOR * (double) sum);
60182
}
61183
}

server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
public class NodeResourceUsageTracker extends AbstractLifecycleComponent {
2424
private ThreadPool threadPool;
2525
private final ClusterSettings clusterSettings;
26+
private final Settings settings;
2627
private AverageCpuUsageTracker cpuUsageTracker;
2728
private AverageMemoryUsageTracker memoryUsageTracker;
2829
private AverageIoUsageTracker ioUsageTracker;
@@ -35,6 +36,7 @@ public class NodeResourceUsageTracker extends AbstractLifecycleComponent {
3536
public NodeResourceUsageTracker(FsService fsService, ThreadPool threadPool, Settings settings, ClusterSettings clusterSettings) {
3637
this.fsService = fsService;
3738
this.threadPool = threadPool;
39+
this.settings = settings;
3840
this.clusterSettings = clusterSettings;
3941
this.resourceTrackerSettings = new ResourceTrackerSettings(settings);
4042
initialize();
@@ -125,7 +127,8 @@ void initialize() {
125127
nativeMemoryUsageTracker = new AverageNativeMemoryUsageTracker(
126128
threadPool,
127129
resourceTrackerSettings.getNativeMemoryPollingInterval(),
128-
resourceTrackerSettings.getNativeMemoryWindowDuration()
130+
resourceTrackerSettings.getNativeMemoryWindowDuration(),
131+
settings
129132
);
130133
if (Constants.LINUX) {
131134
clusterSettings.addSettingsUpdateConsumer(

server/src/test/java/org/opensearch/monitor/os/OsProbeTests.java

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -310,39 +310,6 @@ List<String> readSysFsCgroupCpuAcctCpuStat(String controlGroup) throws IOExcepti
310310
verify(logger, never()).warn(anyString());
311311
}
312312

313-
public void testGetAvailableMemorySizeLinux() {
314-
assumeTrue("test runs on Linux only", Constants.LINUX);
315-
316-
final OsProbe probe = new OsProbe() {
317-
@Override
318-
long readAvailableMemoryFromProcMeminfo() {
319-
return 8388608L; // 8 MB in bytes
320-
}
321-
};
322-
323-
assertThat(probe.getAvailableMemorySize(), equalTo(8388608L));
324-
}
325-
326-
public void testGetAvailableMemorySizeNonLinux() {
327-
assumeFalse("test does not run on Linux", Constants.LINUX);
328-
329-
final OsProbe probe = new OsProbe();
330-
assertThat(probe.getAvailableMemorySize(), equalTo(-1L));
331-
}
332-
333-
public void testGetAvailableMemorySizeWhenReadThrowsException() {
334-
assumeTrue("test runs on Linux only", Constants.LINUX);
335-
336-
final OsProbe probe = new OsProbe() {
337-
@Override
338-
long readAvailableMemoryFromProcMeminfo() throws IOException {
339-
throw new IOException("simulated /proc/meminfo read failure");
340-
}
341-
};
342-
343-
assertThat(probe.getAvailableMemorySize(), equalTo(-1L));
344-
}
345-
346313
private static List<String> getProcSelfGroupLines(String hierarchy) {
347314
return Arrays.asList(
348315
"10:freezer:/",

0 commit comments

Comments
 (0)