Skip to content

Commit f5738e5

Browse files
fix(memory-monitor): lower fail-fast thresholds to catch OOM before cliff
Co-Authored-By: patrick.nilan@airbyte.io <patrick.nilan@airbyte.io>
1 parent fd553bd commit f5738e5

2 files changed

Lines changed: 39 additions & 33 deletions

File tree

airbyte_cdk/utils/memory_monitor.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,19 @@
3030
# 2. anonymous memory >= anon-share threshold of *current cgroup usage*
3131
# Comparing anon to usage (not limit) answers the more relevant question:
3232
# "is most of the near-OOM memory actually process-owned anonymous memory?"
33-
_CRITICAL_THRESHOLD = 0.98
33+
#
34+
# Thresholds are deliberately set below the OOM cliff to leave headroom for
35+
# the check-interval race window: between two checks, allocations can jump
36+
# a container past any gate directly into kernel OOM-kill. Firing the fail-
37+
# fast trace well before the cliff is what makes the failure visible to the
38+
# platform instead of appearing as a silent exit.
39+
_CRITICAL_THRESHOLD = 0.95
3440
_ANON_SHARE_OF_USAGE_THRESHOLD = 0.85
3541

3642
# Check interval (every N messages) — tightens after crossing high-pressure threshold
3743
_DEFAULT_CHECK_INTERVAL = 5000
3844
_HIGH_PRESSURE_CHECK_INTERVAL = 100
39-
_HIGH_PRESSURE_THRESHOLD = 0.95
45+
_HIGH_PRESSURE_THRESHOLD = 0.90
4046

4147

4248
def _read_cgroup_v2_anon_bytes() -> Optional[int]:
@@ -90,21 +96,21 @@ class MemoryMonitor:
9096
9197
**Logging (event-based, not periodic):**
9298
93-
- One INFO when high-pressure mode activates (usage first crosses 95%)
94-
- One INFO/WARNING when critical threshold (98%) is crossed but we do
99+
- One INFO when high-pressure mode activates (usage first crosses 90%)
100+
- One INFO/WARNING when critical threshold (95%) is crossed but we do
95101
*not* raise (either anon share is below the fail-fast gate or the
96102
anonymous memory signal is unavailable)
97103
- No repeated per-check warnings — logging is driven by state
98104
transitions, not periodic sampling
99105
100-
**High-pressure polling:** Once cgroup usage first crosses 95%, the check
106+
**High-pressure polling:** Once cgroup usage first crosses 90%, the check
101107
interval permanently tightens from 5000 to 100 messages to narrow the race
102108
window near OOM.
103109
104110
**Fail-fast:** Raises ``AirbyteTracedException`` with
105111
``FailureType.system_error`` when *both*:
106112
107-
1. Cgroup usage >= 98% of the container limit (container is near OOM-kill)
113+
1. Cgroup usage >= 95% of the container limit (container is near OOM-kill)
108114
2. Anonymous memory >= 85% of *current cgroup usage* (most of the charged
109115
memory is process-private anonymous pages, not file-backed cache)
110116
@@ -209,7 +215,7 @@ def check_memory_usage(self) -> None:
209215
210216
Intended to be called on every message. The monitor internally tracks
211217
a message counter and only reads cgroup files every ``check_interval``
212-
messages (default 5000). Once usage crosses 95%, the interval tightens
218+
messages (default 5000). Once usage crosses 90%, the interval tightens
213219
to 100 messages for the remainder of the sync.
214220
215221
Logging is event-based (one-shot on state transitions), not periodic.

unit_tests/utils/test_memory_monitor.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,31 +23,31 @@
2323
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
2424

2525
_MOCK_USAGE_BELOW = "500000000\n" # 50% of 1 GB
26-
_MOCK_USAGE_AT_90 = "910000000\n" # 91% of 1 GB (below 95% logging threshold)
27-
_MOCK_USAGE_AT_95 = "960000000\n" # 96% of 1 GB (above 95% logging threshold)
28-
_MOCK_USAGE_AT_98 = "980000000\n" # 98% of 1 GB (at critical threshold)
26+
_MOCK_USAGE_AT_85 = "850000000\n" # 85% of 1 GB (below 90% high-pressure threshold)
27+
_MOCK_USAGE_AT_92 = "920000000\n" # 92% of 1 GB (above 90% high-pressure, below 95% critical)
28+
_MOCK_USAGE_AT_96 = "960000000\n" # 96% of 1 GB (above 95% critical threshold)
2929
_MOCK_LIMIT = "1000000000\n" # 1 GB
3030

3131
# cgroup v2 memory.stat mock content.
3232
# The "anon" field is what we parse for the cgroup-level anonymous memory signal.
3333
_MOCK_MEMORY_STAT_ANON_HIGH = (
34-
"anon 860000000\n" # 860 MB — 87.7% of 980 MB usage (above 85% threshold)
34+
"anon 840000000\n" # 840 MB — 87.5% of 960 MB usage (above 85% threshold)
3535
"file 100000000\n"
3636
"kernel 20000000\n"
3737
)
3838
_MOCK_MEMORY_STAT_ANON_LOW = (
39-
"anon 300000000\n" # 300 MB — 30.6% of 980 MB usage (below 85% threshold)
40-
"file 650000000\n"
39+
"anon 300000000\n" # 300 MB — 31.25% of 960 MB usage (below 85% threshold)
40+
"file 630000000\n"
4141
"kernel 30000000\n"
4242
)
4343
_MOCK_MEMORY_STAT_NO_ANON = (
44-
"file 650000000\n" # malformed: missing anon line
44+
"file 630000000\n" # malformed: missing anon line
4545
"kernel 30000000\n"
4646
)
4747

4848
# /proc/self/status mock values (fallback when cgroup v2 memory.stat is unavailable).
49-
_MOCK_PROC_ANON_HIGH = "RssAnon:\t 840000 kB\n" # ~860 MB — 87.7% of 980 MB usage
50-
_MOCK_PROC_ANON_LOW = "RssAnon:\t 300000 kB\n" # ~307 MB — 31.3% of 980 MB usage
49+
_MOCK_PROC_ANON_HIGH = "RssAnon:\t 820313 kB\n" # ~840 MB — 87.5% of 960 MB usage
50+
_MOCK_PROC_ANON_LOW = "RssAnon:\t 300000 kB\n" # ~307 MB — 32.0% of 960 MB usage
5151

5252

5353
def _v2_exists(self: Path) -> bool:
@@ -134,12 +134,12 @@ def test_noop_when_limit_is_zero(caplog: pytest.LogCaptureFixture) -> None:
134134

135135

136136
def test_no_log_below_threshold(caplog: pytest.LogCaptureFixture) -> None:
137-
"""No log should be emitted when usage is below 95%."""
137+
"""No log should be emitted when usage is below the 90% high-pressure threshold."""
138138
monitor = MemoryMonitor(check_interval=1)
139139
with (
140140
caplog.at_level(logging.DEBUG, logger="airbyte"),
141141
patch.object(Path, "exists", _v2_exists),
142-
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)),
142+
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_85)),
143143
):
144144
monitor.check_memory_usage()
145145
# Only the debug probe message, no info/warning
@@ -152,11 +152,11 @@ def test_no_log_below_threshold(caplog: pytest.LogCaptureFixture) -> None:
152152

153153

154154
def test_cgroup_v1_activates_high_pressure_mode(caplog: pytest.LogCaptureFixture) -> None:
155-
"""Memory reading works with cgroup v1 paths and activates high-pressure mode at 95%."""
155+
"""Memory reading works with cgroup v1 paths and activates high-pressure mode at 90%."""
156156

157157
def mock_read_text(self: Path) -> str:
158158
if self == _CGROUP_V1_USAGE:
159-
return _MOCK_USAGE_AT_95
159+
return _MOCK_USAGE_AT_92
160160
if self == _CGROUP_V1_LIMIT:
161161
return _MOCK_LIMIT
162162
return ""
@@ -185,7 +185,7 @@ def test_check_interval_skips_intermediate_calls(caplog: pytest.LogCaptureFixtur
185185
with (
186186
caplog.at_level(logging.INFO, logger="airbyte"),
187187
patch.object(Path, "exists", _v2_exists),
188-
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_95)),
188+
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_92)),
189189
):
190190
# First 4999 calls should be skipped
191191
for _ in range(4999):
@@ -290,7 +290,7 @@ def test_read_cgroup_v2_anon_bytes_parses_anon_field() -> None:
290290
"""Correctly parses the 'anon' field from cgroup v2 memory.stat."""
291291
with patch.object(Path, "read_text", return_value=_MOCK_MEMORY_STAT_ANON_HIGH):
292292
result = _read_cgroup_v2_anon_bytes()
293-
assert result == 860000000
293+
assert result == 840000000
294294

295295

296296
def test_read_cgroup_v2_anon_bytes_returns_none_when_anon_absent() -> None:
@@ -314,7 +314,7 @@ def raise_oserror(self: Path) -> str:
314314
# ---------------------------------------------------------------------------
315315

316316

317-
def _v2_full_mock(usage: str = _MOCK_USAGE_AT_98, memory_stat: str = _MOCK_MEMORY_STAT_ANON_HIGH):
317+
def _v2_full_mock(usage: str = _MOCK_USAGE_AT_96, memory_stat: str = _MOCK_MEMORY_STAT_ANON_HIGH):
318318
"""Return a mock read_text that serves cgroup v2 current/max AND memory.stat."""
319319

320320
def mock_read_text(self: Path) -> str:
@@ -330,7 +330,7 @@ def mock_read_text(self: Path) -> str:
330330

331331

332332
def test_raises_when_cgroup_critical_and_anon_share_of_usage_above_threshold() -> None:
333-
"""Fail-fast raises when cgroup >= 98% and anon >= 85% of current usage."""
333+
"""Fail-fast raises when cgroup >= 95% and anon >= 85% of current usage."""
334334
monitor = MemoryMonitor(check_interval=1)
335335
with (
336336
patch.object(Path, "exists", _v2_exists),
@@ -340,14 +340,14 @@ def test_raises_when_cgroup_critical_and_anon_share_of_usage_above_threshold() -
340340
monitor.check_memory_usage()
341341
assert exc_info.value.failure_type == FailureType.system_error
342342
assert "critical threshold" in (exc_info.value.message or "")
343-
assert "98%" in (exc_info.value.message or "")
343+
assert "96%" in (exc_info.value.message or "")
344344
assert "anon share of usage" in (exc_info.value.internal_message or "")
345345

346346

347347
def test_no_raise_when_cgroup_critical_but_anon_share_of_usage_below_threshold(
348348
caplog: pytest.LogCaptureFixture,
349349
) -> None:
350-
"""No exception when cgroup >= 98% but anon < 85% of usage; logs once then silences."""
350+
"""No exception when cgroup >= 95% but anon < 85% of usage; logs once then silences."""
351351
monitor = MemoryMonitor(check_interval=1)
352352
with (
353353
caplog.at_level(logging.INFO, logger="airbyte"),
@@ -368,7 +368,7 @@ def test_falls_back_to_process_rssanon_when_cgroup_v2_anon_unavailable() -> None
368368

369369
def mock_read_text(self: Path) -> str:
370370
if self == _CGROUP_V2_CURRENT:
371-
return _MOCK_USAGE_AT_98
371+
return _MOCK_USAGE_AT_96
372372
if self == _CGROUP_V2_MAX:
373373
return _MOCK_LIMIT
374374
if self == _CGROUP_V2_STAT:
@@ -394,13 +394,13 @@ def test_falls_back_to_process_rssanon_low_and_does_not_raise(
394394

395395
def mock_read_text(self: Path) -> str:
396396
if self == _CGROUP_V2_CURRENT:
397-
return _MOCK_USAGE_AT_98
397+
return _MOCK_USAGE_AT_96
398398
if self == _CGROUP_V2_MAX:
399399
return _MOCK_LIMIT
400400
if self == _CGROUP_V2_STAT:
401401
return _MOCK_MEMORY_STAT_NO_ANON # anon line missing
402402
if self == _PROC_SELF_STATUS:
403-
return _MOCK_PROC_ANON_LOW # ~307 MB — 31.3% of 980 MB usage (below 85%)
403+
return _MOCK_PROC_ANON_LOW # ~307 MB — 32.0% of 960 MB usage (below 85%)
404404
return ""
405405

406406
monitor = MemoryMonitor(check_interval=1)
@@ -421,7 +421,7 @@ def test_no_raise_when_anonymous_memory_signal_unavailable_at_critical_usage(
421421

422422
def mock_read_text(self: Path) -> str:
423423
if self == _CGROUP_V2_CURRENT:
424-
return _MOCK_USAGE_AT_98
424+
return _MOCK_USAGE_AT_96
425425
if self == _CGROUP_V2_MAX:
426426
return _MOCK_LIMIT
427427
if self == _CGROUP_V2_STAT:
@@ -444,17 +444,17 @@ def mock_read_text(self: Path) -> str:
444444
assert monitor._critical_logged
445445

446446

447-
def test_switches_to_high_pressure_check_interval_after_crossing_95_percent(
447+
def test_switches_to_high_pressure_check_interval_after_crossing_90_percent(
448448
caplog: pytest.LogCaptureFixture,
449449
) -> None:
450-
"""Once usage crosses 95%, the monitor tightens polling from 5000 to 100 messages."""
450+
"""Once usage crosses 90%, the monitor tightens polling from 5000 to 100 messages."""
451451
monitor = MemoryMonitor(check_interval=5000)
452452
assert not monitor._high_pressure_mode
453453

454454
with (
455455
caplog.at_level(logging.INFO, logger="airbyte"),
456456
patch.object(Path, "exists", _v2_exists),
457-
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_95)),
457+
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_92)),
458458
):
459459
# Pump 5000 messages to trigger the first real check
460460
for _ in range(5000):

0 commit comments

Comments
 (0)