Skip to content

Commit 31be325

Browse files
authored
Merge branch 'devin/1774478445-memory-failfast' into devin/1774888310-memory-failfast-feature-2
2 parents 4970505 + 3c7c755 commit 31be325

2 files changed

Lines changed: 264 additions & 73 deletions

File tree

airbyte_cdk/utils/memory_monitor.py

Lines changed: 99 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# cgroup v2 paths
1717
_CGROUP_V2_CURRENT = Path("/sys/fs/cgroup/memory.current")
1818
_CGROUP_V2_MAX = Path("/sys/fs/cgroup/memory.max")
19+
_CGROUP_V2_STAT = Path("/sys/fs/cgroup/memory.stat")
1920

2021
# cgroup v1 paths — TODO: remove if all deployments are confirmed cgroup v2
2122
_CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
@@ -29,14 +30,34 @@
2930

3031
# Raise AirbyteTracedException when BOTH conditions are met:
3132
# 1. cgroup usage >= critical threshold
32-
# 2. process anonymous RSS (RssAnon) >= anon threshold of the container limit
33-
# This dual-condition avoids false positives from reclaimable kernel page cache
34-
# and file-backed / shared resident pages that inflate VmRSS.
33+
# 2. anonymous memory >= anon-share threshold of *current cgroup usage*
34+
# Comparing anon to usage (not limit) answers the more relevant question:
35+
# "is most of the near-OOM memory actually process-owned anonymous memory?"
3536
_CRITICAL_THRESHOLD = 0.98
36-
_ANON_RSS_THRESHOLD = 0.90
37+
_ANON_SHARE_OF_USAGE_THRESHOLD = 0.85
3738

38-
# Check interval (every N messages)
39+
# Check interval (every N messages) — tightens after crossing high-pressure threshold
3940
_DEFAULT_CHECK_INTERVAL = 5000
41+
_HIGH_PRESSURE_CHECK_INTERVAL = 100
42+
_HIGH_PRESSURE_THRESHOLD = 0.95
43+
44+
45+
def _read_cgroup_v2_anon_bytes() -> Optional[int]:
    """Read cgroup-level anonymous memory from ``/sys/fs/cgroup/memory.stat``.

    The ``anon`` field in ``memory.stat`` accounts for all anonymous pages
    charged to the cgroup, which is a more accurate view of process-private
    memory pressure than per-process ``RssAnon`` in multi-process containers.

    Returns:
        The value of the ``anon`` entry in bytes, or ``None`` if the file
        cannot be read, contains no ``anon`` entry, or the entry is malformed
        (missing or non-integer value).
    """
    try:
        stat_text = _CGROUP_V2_STAT.read_text()
    except (OSError, ValueError):
        # ValueError also covers UnicodeDecodeError from undecodable content.
        return None

    for line in stat_text.splitlines():
        # The trailing space keeps sibling keys such as ``anon_thp`` from matching.
        if not line.startswith("anon "):
            continue
        fields = line.split()
        if len(fields) < 2:
            # Key present but value missing (e.g. a truncated read); treat as
            # malformed instead of letting an IndexError escape to the caller.
            return None
        try:
            return int(fields[1])
        except ValueError:
            return None
    return None
4061

4162

4263
def _read_process_anon_rss_bytes() -> Optional[int]:
@@ -73,19 +94,27 @@ class MemoryMonitor:
7394
**Logging:** Logs a WARNING on every check interval (default 5000 messages)
7495
when cgroup memory usage is at or above 95% of the container limit.
7596
97+
**High-pressure polling:** Once cgroup usage first crosses 95%, the check
98+
interval permanently tightens from 5000 to 100 messages to narrow the race
99+
window near OOM.
100+
76101
**Fail-fast:** Raises ``AirbyteTracedException`` with
77102
``FailureType.system_error`` when *both*:
78103
79104
1. Cgroup usage >= 98% of the container limit (container is near OOM-kill)
80-
2. Process anonymous RSS (``RssAnon``) >= 90% of the container limit
81-
(pressure is from process-private anonymous memory, not elastic kernel
82-
page cache or file-backed resident pages)
83-
84-
This dual-condition avoids false positives from SQLite mmap'd pages, shared
85-
memory, or other kernel-reclaimable memory that inflates cgroup usage but
86-
does not represent real process memory pressure. If ``RssAnon`` is not
87-
available, the monitor logs a warning and skips fail-fast rather than
88-
falling back to cgroup-only raising.
105+
2. Anonymous memory >= 85% of *current cgroup usage* (most of the charged
106+
memory is process-private anonymous pages, not file-backed cache)
107+
108+
The anonymous memory signal is read from cgroup v2 ``memory.stat`` (``anon``
109+
field) when available, falling back to ``/proc/self/status`` ``RssAnon``.
110+
Comparing anonymous memory to current usage (not the container limit) answers
111+
the more relevant question: "is most of the near-OOM memory actually
112+
process-owned?" This avoids the brittleness of comparing to the full limit
113+
where anonymous memory can dominate usage yet still fall short of a
114+
limit-based percentage threshold.
115+
116+
If the anonymous memory signal is unavailable, the monitor logs a warning
117+
and skips fail-fast rather than falling back to cgroup-only raising.
89118
"""
90119

91120
def __init__(
@@ -98,6 +127,7 @@ def __init__(
98127
self._message_count = 0
99128
self._cgroup_version: Optional[int] = None
100129
self._probed = False
130+
self._high_pressure_mode = False
101131

102132
def _probe_cgroup(self) -> None:
103133
"""Detect which cgroup version (if any) is available.
@@ -152,20 +182,41 @@ def _read_memory(self) -> Optional[tuple[int, int]]:
152182
logger.debug("Failed to read cgroup memory files; skipping memory check.")
153183
return None
154184

185+
def _read_anon_bytes(self) -> Optional[tuple[int, str]]:
    """Return anonymous memory usage together with a label naming its source.

    Sources are consulted in order of preference:

    1. The ``anon`` field of cgroup v2 ``memory.stat`` (only attempted when
       the detected cgroup version is 2).
    2. ``RssAnon`` from ``/proc/self/status``.

    Returns:
        A ``(bytes, source_label)`` tuple from the first source that yields
        a value, or ``None`` when no source is available.
    """
    # Only consult memory.stat under cgroup v2; otherwise go straight to /proc.
    from_cgroup = _read_cgroup_v2_anon_bytes() if self._cgroup_version == 2 else None
    if from_cgroup is not None:
        return from_cgroup, "cgroup memory.stat anon"

    from_proc = _read_process_anon_rss_bytes()
    return None if from_proc is None else (from_proc, "process RssAnon")
202+
155203
def check_memory_usage(self) -> None:
156204
"""Check memory usage; log at 95% and raise at critical dual-condition.
157205
158206
Intended to be called on every message. The monitor internally tracks
159207
a message counter and only reads cgroup files every ``check_interval``
160-
messages (default 5000) to minimise I/O overhead.
208+
messages (default 5000). Once usage crosses 95%, the interval tightens
209+
to 100 messages for the remainder of the sync.
161210
162211
**Logging:** WARNING on every check above 95%.
163212
164-
**Fail-fast:** If cgroup usage >= 98% *and* process anonymous RSS
165-
(``RssAnon``) >= 90% of the container limit, raises
166-
``AirbyteTracedException`` with ``FailureType.system_error`` so the
167-
platform receives a clear error message instead of an opaque OOM-kill.
168-
If ``RssAnon`` is unavailable, logs a warning and skips fail-fast.
213+
**Fail-fast:** If cgroup usage >= 98% *and* anonymous memory >= 85% of
214+
current cgroup usage, raises ``AirbyteTracedException`` with
215+
``FailureType.system_error`` so the platform receives a clear error
216+
message instead of an opaque OOM-kill. Anonymous memory is read from
217+
cgroup v2 ``memory.stat`` when available, falling back to
218+
``/proc/self/status`` ``RssAnon``. If neither is available, logs a
219+
warning and skips fail-fast.
169220
170221
This method is a no-op if cgroup files are unavailable.
171222
"""
@@ -174,7 +225,10 @@ def check_memory_usage(self) -> None:
174225
return
175226

176227
self._message_count += 1
177-
if self._message_count % self._check_interval != 0:
228+
interval = (
229+
_HIGH_PRESSURE_CHECK_INTERVAL if self._high_pressure_mode else self._check_interval
230+
)
231+
if self._message_count % interval != 0:
178232
return
179233

180234
memory_info = self._read_memory()
@@ -187,6 +241,15 @@ def check_memory_usage(self) -> None:
187241
usage_gb = usage_bytes / (1024**3)
188242
limit_gb = limit_bytes / (1024**3)
189243

244+
if usage_ratio >= _HIGH_PRESSURE_THRESHOLD and not self._high_pressure_mode:
245+
self._high_pressure_mode = True
246+
logger.info(
247+
"Memory usage crossed %d%%; tightening check interval from %d to %d messages.",
248+
int(_HIGH_PRESSURE_THRESHOLD * 100),
249+
self._check_interval,
250+
_HIGH_PRESSURE_CHECK_INTERVAL,
251+
)
252+
190253
if usage_ratio >= _MEMORY_THRESHOLD:
191254
logger.warning(
192255
"Source memory usage at %d%% of container limit (%.2f / %.2f GB).",
@@ -197,33 +260,35 @@ def check_memory_usage(self) -> None:
197260

198261
# Fail-fast: dual-condition check
199262
if usage_ratio >= _CRITICAL_THRESHOLD:
200-
anon_rss_bytes = _read_process_anon_rss_bytes()
201-
if anon_rss_bytes is not None:
202-
anon_ratio = anon_rss_bytes / limit_bytes
203-
anon_percent = int(anon_ratio * 100)
204-
if anon_ratio >= _ANON_RSS_THRESHOLD:
263+
anon_info = self._read_anon_bytes()
264+
if anon_info is not None:
265+
anon_bytes, anon_source = anon_info
266+
anon_share = anon_bytes / usage_bytes
267+
anon_share_percent = int(anon_share * 100)
268+
if anon_share >= _ANON_SHARE_OF_USAGE_THRESHOLD:
205269
raise AirbyteTracedException(
206270
message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).",
207271
internal_message=(
208272
f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). "
209-
f"Process anonymous RSS (RssAnon): {anon_rss_bytes} bytes ({anon_percent}% of limit). "
273+
f"Anonymous memory ({anon_source}): {anon_bytes} bytes "
274+
f"({anon_share_percent}% of current cgroup usage). "
210275
f"Thresholds: cgroup >= {int(_CRITICAL_THRESHOLD * 100)}%, "
211-
f"anonymous RSS >= {int(_ANON_RSS_THRESHOLD * 100)}%."
276+
f"anon share of usage >= {int(_ANON_SHARE_OF_USAGE_THRESHOLD * 100)}%."
212277
),
213278
failure_type=FailureType.system_error,
214279
)
215280
else:
216281
logger.info(
217-
"Cgroup usage at %d%% but process anonymous RSS only %d%% of limit; "
218-
"pressure likely from file-backed or reclaimable pages — not raising.",
282+
"Cgroup usage at %d%% but anonymous memory only %d%% of current cgroup usage; "
283+
"pressure likely from file-backed or kernel memory — not raising.",
219284
usage_percent,
220-
anon_percent,
285+
anon_share_percent,
221286
)
222287
else:
223-
# RssAnon unavailable — log and skip rather than cgroup-only raising,
224-
# so the implementation stays truly dual-condition.
288+
# Anonymous memory signal unavailable — log and skip rather than
289+
# cgroup-only raising, so the implementation stays truly dual-condition.
225290
logger.warning(
226-
"Cgroup usage at %d%% but RssAnon unavailable from /proc/self/status; "
291+
"Cgroup usage at %d%% but anonymous memory signal unavailable; "
227292
"skipping fail-fast (cannot confirm anonymous memory pressure).",
228293
usage_percent,
229294
)

0 commit comments

Comments
 (0)