Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions airbyte_cdk/utils/memory_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@ def _format_bytes(num_bytes: int) -> str:
# a container past any gate directly into kernel OOM-kill. Firing the fail-
# fast trace well before the cliff is what makes the failure visible to the
# platform instead of appearing as a silent exit.
_CRITICAL_THRESHOLD = 0.95
_ANON_SHARE_OF_USAGE_THRESHOLD = 0.85
# WARNING: These values are deliberately aggressive for the test branch only.
# DO NOT MERGE. See PR description for the paired production values.
_CRITICAL_THRESHOLD = 0.80
_ANON_SHARE_OF_USAGE_THRESHOLD = 0.75

# Check interval (every N messages) — tightens after crossing high-pressure threshold
_DEFAULT_CHECK_INTERVAL = 5000
_HIGH_PRESSURE_CHECK_INTERVAL = 100
_HIGH_PRESSURE_THRESHOLD = 0.90
_DEFAULT_CHECK_INTERVAL = 100
_HIGH_PRESSURE_CHECK_INTERVAL = 5
_HIGH_PRESSURE_THRESHOLD = 0.50


def _read_cgroup_v2_anon_bytes() -> Optional[int]:
Expand Down Expand Up @@ -114,22 +116,22 @@ class MemoryMonitor:

**Logging (event-based, not periodic):**

- One INFO when high-pressure mode activates (usage first crosses 90%)
- One INFO/WARNING when critical threshold (95%) is crossed but we do
- One INFO when high-pressure mode activates (usage first crosses 50%)
- One INFO/WARNING when critical threshold (80%) is crossed but we do
*not* raise (either anon share is below the fail-fast gate or the
anonymous memory signal is unavailable)
- No repeated per-check warnings — logging is driven by state
transitions, not periodic sampling

**High-pressure polling:** Once cgroup usage first crosses 90%, the check
**High-pressure polling:** Once cgroup usage first crosses 50%, the check
interval permanently tightens from the configured ``check_interval``
(default 5000) to 100 messages to narrow the race window near OOM.
(default 100) to 5 messages to narrow the race window near OOM.

**Fail-fast:** Raises ``AirbyteTracedException`` with
``FailureType.system_error`` when *both*:

1. Cgroup usage >= 95% of the container limit (container is near OOM-kill)
2. Anonymous memory >= 85% of *current cgroup usage* (most of the charged
1. Cgroup usage >= 80% of the container limit (container is near OOM-kill)
2. Anonymous memory >= 75% of *current cgroup usage* (most of the charged
memory is process-private anonymous pages, not file-backed cache)

The anonymous memory signal is read from cgroup v2 ``memory.stat`` (``anon``
Expand Down Expand Up @@ -243,8 +245,8 @@ def check_memory_usage(self) -> None:

Intended to be called on every message. The monitor internally tracks
a message counter and only reads cgroup files every ``check_interval``
messages (default 5000). Once usage crosses 90%, the interval tightens
to 100 messages for the remainder of the sync regardless of the
messages (default 100). Once usage crosses 50%, the interval tightens
to 5 messages for the remainder of the sync regardless of the
configured ``check_interval``.

Logging is event-based (one-shot on state transitions), not periodic.
Expand Down
50 changes: 25 additions & 25 deletions unit_tests/utils/test_memory_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

_MOCK_USAGE_BELOW = "500000000\n" # 50% of 1 GB
_MOCK_USAGE_AT_85 = "850000000\n" # 85% of 1 GB (below 90% high-pressure threshold)
_MOCK_USAGE_AT_92 = "920000000\n" # 92% of 1 GB (above 90% high-pressure, below 95% critical)
_MOCK_USAGE_AT_96 = "960000000\n" # 96% of 1 GB (above 95% critical threshold)
_MOCK_USAGE_BELOW = "400000000\n" # 40% of 1 GB (below 50% high-pressure threshold)
_MOCK_USAGE_AT_40 = "400000000\n" # 40% of 1 GB (below 50% high-pressure threshold)
_MOCK_USAGE_AT_60 = "600000000\n" # 60% of 1 GB (above 50% high-pressure, below 80% critical)
_MOCK_USAGE_AT_96 = "960000000\n" # 96% of 1 GB (above 80% critical threshold)
_MOCK_LIMIT = "1000000000\n" # 1 GB

# cgroup v2 memory.stat mock content.
# The "anon" field is what we parse for the cgroup-level anonymous memory signal.
_MOCK_MEMORY_STAT_ANON_HIGH = (
"anon 840000000\n" # 840 MB — 87.5% of 960 MB usage (above 85% threshold)
"anon 840000000\n" # 840 MB — 87.5% of 960 MB usage (above 75% threshold)
"file 100000000\n"
"kernel 20000000\n"
)
_MOCK_MEMORY_STAT_ANON_LOW = (
"anon 300000000\n" # 300 MB — 31.25% of 960 MB usage (below 85% threshold)
"anon 300000000\n" # 300 MB — 31.25% of 960 MB usage (below 75% threshold)
"file 630000000\n"
"kernel 30000000\n"
)
Expand All @@ -47,8 +47,8 @@
)

# /proc/self/status mock values (fallback when cgroup v2 memory.stat is unavailable).
_MOCK_PROC_ANON_HIGH = "RssAnon:\t 820313 kB\n" # ~840 MB — 87.5% of 960 MB usage
_MOCK_PROC_ANON_LOW = "RssAnon:\t 300000 kB\n" # ~307 MB — 32.0% of 960 MB usage
_MOCK_PROC_ANON_HIGH = "RssAnon:\t 820313 kB\n" # ~840 MB — 87.5% of 960 MB usage (above 75%)
_MOCK_PROC_ANON_LOW = "RssAnon:\t 300000 kB\n" # ~307 MB — 32.0% of 960 MB usage (below 75%)


def _v2_exists(self: Path) -> bool:
Expand Down Expand Up @@ -96,11 +96,11 @@ def test_init_logs_configured_thresholds(caplog: pytest.LogCaptureFixture) -> No
init_logs = [r for r in caplog.records if "MemoryMonitor instantiated" in r.message]
assert len(init_logs) == 1
msg = init_logs[0].message
assert "critical threshold: 95%" in msg
assert "anon share of usage threshold: 85%" in msg
assert "high-pressure threshold: 90%" in msg
assert "critical threshold: 80%" in msg
assert "anon share of usage threshold: 75%" in msg
assert "high-pressure threshold: 50%" in msg
assert "check interval: 1234 messages" in msg
assert "tightens to 100 under high pressure" in msg
assert "tightens to 5 under high pressure" in msg


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -178,13 +178,13 @@ def test_noop_when_limit_is_zero(caplog: pytest.LogCaptureFixture) -> None:


def test_no_log_below_threshold(caplog: pytest.LogCaptureFixture) -> None:
"""No log should be emitted when usage is below the 90% high-pressure threshold."""
"""No log should be emitted when usage is below the 50% high-pressure threshold."""
monitor = MemoryMonitor(check_interval=1)
caplog.clear()
with (
caplog.at_level(logging.DEBUG, logger="airbyte"),
patch.object(Path, "exists", _v2_exists),
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_85)),
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_40)),
):
monitor.check_memory_usage()
# Only the debug probe message, no info/warning
Expand All @@ -197,11 +197,11 @@ def test_no_log_below_threshold(caplog: pytest.LogCaptureFixture) -> None:


def test_cgroup_v1_activates_high_pressure_mode(caplog: pytest.LogCaptureFixture) -> None:
"""Memory reading works with cgroup v1 paths and activates high-pressure mode at 90%."""
"""Memory reading works with cgroup v1 paths and activates high-pressure mode at 50%."""

def mock_read_text(self: Path) -> str:
if self == _CGROUP_V1_USAGE:
return _MOCK_USAGE_AT_92
return _MOCK_USAGE_AT_60
if self == _CGROUP_V1_LIMIT:
return _MOCK_LIMIT
return ""
Expand Down Expand Up @@ -231,7 +231,7 @@ def test_check_interval_skips_intermediate_calls(caplog: pytest.LogCaptureFixtur
with (
caplog.at_level(logging.INFO, logger="airbyte"),
patch.object(Path, "exists", _v2_exists),
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_92)),
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_60)),
):
# First 4999 calls should be skipped
for _ in range(4999):
Expand Down Expand Up @@ -379,7 +379,7 @@ def mock_read_text(self: Path) -> str:


def test_raises_when_cgroup_critical_and_anon_share_of_usage_above_threshold() -> None:
"""Fail-fast raises when cgroup >= 95% and anon >= 85% of current usage."""
"""Fail-fast raises when cgroup >= 80% and anon >= 75% of current usage."""
monitor = MemoryMonitor(check_interval=1)
with (
patch.object(Path, "exists", _v2_exists),
Expand All @@ -404,7 +404,7 @@ def test_raises_when_cgroup_critical_and_anon_share_of_usage_above_threshold() -
def test_no_raise_when_cgroup_critical_but_anon_share_of_usage_below_threshold(
caplog: pytest.LogCaptureFixture,
) -> None:
"""No exception when cgroup >= 95% but anon < 85% of usage; logs once then silences."""
"""No exception when cgroup >= 80% but anon < 75% of usage; logs once then silences."""
monitor = MemoryMonitor(check_interval=1)
with (
caplog.at_level(logging.INFO, logger="airbyte"),
Expand Down Expand Up @@ -501,17 +501,17 @@ def mock_read_text(self: Path) -> str:
assert monitor._critical_logged


def test_switches_to_high_pressure_check_interval_after_crossing_90_percent(
def test_switches_to_high_pressure_check_interval_after_crossing_hp_threshold(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Once usage crosses 90%, the monitor tightens polling from 5000 to 100 messages."""
"""Once usage crosses 50%, the monitor tightens polling from 5000 to 5 messages."""
monitor = MemoryMonitor(check_interval=5000)
assert not monitor._high_pressure_mode

with (
caplog.at_level(logging.INFO, logger="airbyte"),
patch.object(Path, "exists", _v2_exists),
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_92)),
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_60)),
):
# Pump 5000 messages to trigger the first real check
for _ in range(5000):
Expand All @@ -521,12 +521,12 @@ def test_switches_to_high_pressure_check_interval_after_crossing_90_percent(
info_records = [r for r in caplog.records if r.levelno == logging.INFO]
assert any("tightening check interval" in r.message for r in info_records)

# After switching to high-pressure mode, checks happen every 100 messages.
# Verify by pumping 100 messages at critical usage with high anon — should raise.
# After switching to high-pressure mode, checks happen every 5 messages.
# Verify by pumping 5 messages at critical usage with high anon — should raise.
with (
patch.object(Path, "exists", _v2_exists),
patch.object(Path, "read_text", _v2_full_mock()),
):
with pytest.raises(AirbyteTracedException):
for _ in range(100):
for _ in range(5):
monitor.check_memory_usage()
Loading