diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 5609b1acd..e9f620563 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -25,18 +25,42 @@ # Process-level anonymous RSS from /proc/self/status (Linux only, no extra dependency) _PROC_SELF_STATUS = Path("/proc/self/status") + +def _format_bytes(num_bytes: int) -> str: + """Render a byte count as a short human-readable string with 2 decimals. + + Uses decimal units (GB = 10^9, MB = 10^6, KB = 10^3) so that raw cgroup + byte values render close to the way operators describe container limits + (e.g. a 2_147_483_648-byte limit renders as ``2.15 GB`` rather than the + binary ``2.00 GiB``). Values below 1 KB are rendered as plain bytes. + """ + if num_bytes >= 1_000_000_000: + return f"{num_bytes / 1_000_000_000:.2f} GB" + if num_bytes >= 1_000_000: + return f"{num_bytes / 1_000_000:.2f} MB" + if num_bytes >= 1_000: + return f"{num_bytes / 1_000:.2f} KB" + return f"{num_bytes} B" + + # Raise AirbyteTracedException when BOTH conditions are met: # 1. cgroup usage >= critical threshold # 2. anonymous memory >= anon-share threshold of *current cgroup usage* # Comparing anon to usage (not limit) answers the more relevant question: # "is most of the near-OOM memory actually process-owned anonymous memory?" -_CRITICAL_THRESHOLD = 0.98 +# +# Thresholds are deliberately set below the OOM cliff to leave headroom for +# the check-interval race window: between two checks, allocations can jump +# a container past any gate directly into kernel OOM-kill. Firing the fail- +# fast trace well before the cliff is what makes the failure visible to the +# platform instead of appearing as a silent exit. +_CRITICAL_THRESHOLD = 0.95 _ANON_SHARE_OF_USAGE_THRESHOLD = 0.85 # Check interval (every N messages) — tightens after crossing high-pressure threshold _DEFAULT_CHECK_INTERVAL = 5000 _HIGH_PRESSURE_CHECK_INTERVAL = 100 -_HIGH_PRESSURE_THRESHOLD = 0.95 +_HIGH_PRESSURE_THRESHOLD = 0.90 def _read_cgroup_v2_anon_bytes() -> Optional[int]: @@ -90,21 +114,21 @@ class MemoryMonitor: **Logging (event-based, not periodic):** - - One INFO when high-pressure mode activates (usage first crosses 95%) - - One INFO/WARNING when critical threshold (98%) is crossed but we do + - One INFO when high-pressure mode activates (usage first crosses 90%) + - One INFO/WARNING when critical threshold (95%) is crossed but we do *not* raise (either anon share is below the fail-fast gate or the anonymous memory signal is unavailable) - No repeated per-check warnings — logging is driven by state transitions, not periodic sampling - **High-pressure polling:** Once cgroup usage first crosses 95%, the check - interval permanently tightens from 5000 to 100 messages to narrow the race - window near OOM. + **High-pressure polling:** Once cgroup usage first crosses 90%, the check + interval permanently tightens from the configured ``check_interval`` + (default 5000) to 100 messages to narrow the race window near OOM. **Fail-fast:** Raises ``AirbyteTracedException`` with - ``FailureType.system_error`` when *both*: + ``FailureType.transient_error`` when *both*: - 1. Cgroup usage >= 98% of the container limit (container is near OOM-kill) + 1. Cgroup usage >= 95% of the container limit (container is near OOM-kill) 2. Anonymous memory >= 85% of *current cgroup usage* (most of the charged memory is process-private anonymous pages, not file-backed cache) @@ -132,6 +156,16 @@ def __init__( self._probed = False self._high_pressure_mode = False self._critical_logged = False + logger.info( + "MemoryMonitor instantiated with critical threshold: %d%%, " + "anon share of usage threshold: %d%%, high-pressure threshold: %d%%, " + "check interval: %d messages (tightens to %d under high pressure).", + int(_CRITICAL_THRESHOLD * 100), + int(_ANON_SHARE_OF_USAGE_THRESHOLD * 100), + int(_HIGH_PRESSURE_THRESHOLD * 100), + self._check_interval, + _HIGH_PRESSURE_CHECK_INTERVAL, + ) def _probe_cgroup(self) -> None: """Detect which cgroup version (if any) is available. @@ -209,8 +243,9 @@ def check_memory_usage(self) -> None: Intended to be called on every message. The monitor internally tracks a message counter and only reads cgroup files every ``check_interval`` - messages (default 5000). Once usage crosses 95%, the interval tightens - to 100 messages for the remainder of the sync. + messages (default 5000). Once usage crosses 90%, the interval tightens + to 100 messages for the remainder of the sync regardless of the + configured ``check_interval``. Logging is event-based (one-shot on state transitions), not periodic. @@ -234,8 +269,6 @@ def check_memory_usage(self) -> None: usage_bytes, limit_bytes = memory_info usage_ratio = usage_bytes / limit_bytes usage_percent = int(usage_ratio * 100) - usage_gb = usage_bytes / (1024**3) - limit_gb = limit_bytes / (1024**3) if usage_ratio >= _HIGH_PRESSURE_THRESHOLD and not self._high_pressure_mode: self._high_pressure_mode = True @@ -256,24 +289,29 @@ def check_memory_usage(self) -> None: raise AirbyteTracedException( message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).", internal_message=( - f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). " - f"Anonymous memory ({anon_source}): {anon_bytes} bytes " + f"Cgroup memory: {_format_bytes(usage_bytes)} / " + f"{_format_bytes(limit_bytes)} ({usage_percent}%). " + f"Anonymous memory ({anon_source}): {_format_bytes(anon_bytes)} " f"({int(anon_share * 100)}% of current cgroup usage). " f"Thresholds: cgroup >= {int(_CRITICAL_THRESHOLD * 100)}%, " f"anon share of usage >= {int(_ANON_SHARE_OF_USAGE_THRESHOLD * 100)}%." ), - failure_type=FailureType.system_error, + failure_type=FailureType.transient_error, ) elif not self._critical_logged: self._critical_logged = True logger.info( - "Cgroup usage crossed %d%% but anonymous memory is only %d%% of current cgroup usage; not raising.", + "Cgroup usage crossed %d%% (%s of %s) but anonymous memory is only %d%% of current cgroup usage; not raising.", int(_CRITICAL_THRESHOLD * 100), + _format_bytes(usage_bytes), + _format_bytes(limit_bytes), int(anon_share * 100), ) elif not self._critical_logged: self._critical_logged = True logger.warning( - "Cgroup usage crossed %d%% but anonymous memory signal unavailable; skipping fail-fast.", + "Cgroup usage crossed %d%% (%s of %s) but anonymous memory signal unavailable; skipping fail-fast.", int(_CRITICAL_THRESHOLD * 100), + _format_bytes(usage_bytes), + _format_bytes(limit_bytes), ) diff --git a/unit_tests/sources/file_based/test_scenarios.py b/unit_tests/sources/file_based/test_scenarios.py index d70b7f4ef..d1e88eb27 100644 --- a/unit_tests/sources/file_based/test_scenarios.py +++ b/unit_tests/sources/file_based/test_scenarios.py @@ -84,7 +84,10 @@ def assert_exception(expected_exception: type[BaseException], output: Entrypoint def _verify_read_output(output: EntrypointOutput, scenario: TestScenario[AbstractSource]) -> None: records_and_state_messages, log_messages = output.records_and_state_messages, output.logs logs = [ - message.log for message in log_messages if message.log.level.value in scenario.log_levels + message.log + for message in log_messages + if message.log.level.value in scenario.log_levels + and not message.log.message.startswith("MemoryMonitor instantiated") ] if scenario.expected_records is None: return diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index 87d038220..6fdd8e8fe 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -17,37 +17,38 @@ _CGROUP_V2_STAT, _PROC_SELF_STATUS, MemoryMonitor, + _format_bytes, _read_cgroup_v2_anon_bytes, _read_process_anon_rss_bytes, ) from airbyte_cdk.utils.traced_exception import AirbyteTracedException _MOCK_USAGE_BELOW = "500000000\n" # 50% of 1 GB -_MOCK_USAGE_AT_90 = "910000000\n" # 91% of 1 GB (below 95% logging threshold) -_MOCK_USAGE_AT_95 = "960000000\n" # 96% of 1 GB (above 95% logging threshold) -_MOCK_USAGE_AT_98 = "980000000\n" # 98% of 1 GB (at critical threshold) +_MOCK_USAGE_AT_85 = "850000000\n" # 85% of 1 GB (below 90% high-pressure threshold) +_MOCK_USAGE_AT_92 = "920000000\n" # 92% of 1 GB (above 90% high-pressure, below 95% critical) +_MOCK_USAGE_AT_96 = "960000000\n" # 96% of 1 GB (above 95% critical threshold) _MOCK_LIMIT = "1000000000\n" # 1 GB # cgroup v2 memory.stat mock content. # The "anon" field is what we parse for the cgroup-level anonymous memory signal. _MOCK_MEMORY_STAT_ANON_HIGH = ( - "anon 860000000\n" # 860 MB — 87.7% of 980 MB usage (above 85% threshold) + "anon 840000000\n" # 840 MB — 87.5% of 960 MB usage (above 85% threshold) "file 100000000\n" "kernel 20000000\n" ) _MOCK_MEMORY_STAT_ANON_LOW = ( - "anon 300000000\n" # 300 MB — 30.6% of 980 MB usage (below 85% threshold) - "file 650000000\n" + "anon 300000000\n" # 300 MB — 31.25% of 960 MB usage (below 85% threshold) + "file 630000000\n" "kernel 30000000\n" ) _MOCK_MEMORY_STAT_NO_ANON = ( - "file 650000000\n" # malformed: missing anon line + "file 630000000\n" # malformed: missing anon line "kernel 30000000\n" ) # /proc/self/status mock values (fallback when cgroup v2 memory.stat is unavailable). -_MOCK_PROC_ANON_HIGH = "RssAnon:\t 840000 kB\n" # ~860 MB — 87.7% of 980 MB usage -_MOCK_PROC_ANON_LOW = "RssAnon:\t 300000 kB\n" # ~307 MB — 31.3% of 980 MB usage +_MOCK_PROC_ANON_HIGH = "RssAnon:\t 820313 kB\n" # ~840 MB — 87.5% of 960 MB usage +_MOCK_PROC_ANON_LOW = "RssAnon:\t 300000 kB\n" # ~307 MB — 32.0% of 960 MB usage def _v2_exists(self: Path) -> bool: @@ -88,6 +89,46 @@ def test_check_interval_negative_raises() -> None: MemoryMonitor(check_interval=-1) +def test_init_logs_configured_thresholds(caplog: pytest.LogCaptureFixture) -> None: + """Instantiation should emit one INFO line with the configured thresholds and intervals.""" + with caplog.at_level(logging.INFO, logger="airbyte"): + MemoryMonitor(check_interval=1234) + init_logs = [r for r in caplog.records if "MemoryMonitor instantiated" in r.message] + assert len(init_logs) == 1 + msg = init_logs[0].message + assert "critical threshold: 95%" in msg + assert "anon share of usage threshold: 85%" in msg + assert "high-pressure threshold: 90%" in msg + assert "check interval: 1234 messages" in msg + assert "tightens to 100 under high pressure" in msg + + +# --------------------------------------------------------------------------- +# _format_bytes — unit tests +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + ("num_bytes", "expected"), + [ + (0, "0 B"), + (999, "999 B"), + (1_000, "1.00 KB"), + (1_500, "1.50 KB"), + (999_999, "1000.00 KB"), + (1_000_000, "1.00 MB"), + (960_000_000, "960.00 MB"), + (999_999_999, "1000.00 MB"), + (1_000_000_000, "1.00 GB"), + (2_109_915_136, "2.11 GB"), + (2_147_483_648, "2.15 GB"), + ], +) +def test_format_bytes(num_bytes: int, expected: str) -> None: + """`_format_bytes` renders byte counts with 2 decimals using decimal units.""" + assert _format_bytes(num_bytes) == expected + + # --------------------------------------------------------------------------- # check_memory_usage — no-op paths # --------------------------------------------------------------------------- @@ -96,6 +137,7 @@ def test_check_interval_negative_raises() -> None: def test_noop_when_no_cgroup(caplog: pytest.LogCaptureFixture) -> None: """check_memory_usage should be a no-op when cgroup is unavailable.""" monitor = MemoryMonitor() + caplog.clear() # discard the one-shot instantiation log with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", return_value=False), @@ -107,6 +149,7 @@ def test_noop_when_no_cgroup(caplog: pytest.LogCaptureFixture) -> None: def test_noop_when_limit_is_max(caplog: pytest.LogCaptureFixture) -> None: """When cgroup v2 memory.max is 'max' (unlimited), should be a no-op.""" monitor = MemoryMonitor(check_interval=1) + caplog.clear() with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), @@ -119,6 +162,7 @@ def test_noop_when_limit_is_max(caplog: pytest.LogCaptureFixture) -> None: def test_noop_when_limit_is_zero(caplog: pytest.LogCaptureFixture) -> None: """When cgroup limit file contains '0', should be a no-op.""" monitor = MemoryMonitor(check_interval=1) + caplog.clear() with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), @@ -134,12 +178,13 @@ def test_noop_when_limit_is_zero(caplog: pytest.LogCaptureFixture) -> None: def test_no_log_below_threshold(caplog: pytest.LogCaptureFixture) -> None: - """No log should be emitted when usage is below 95%.""" + """No log should be emitted when usage is below the 90% high-pressure threshold.""" monitor = MemoryMonitor(check_interval=1) + caplog.clear() with ( caplog.at_level(logging.DEBUG, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_85)), ): monitor.check_memory_usage() # Only the debug probe message, no info/warning @@ -152,11 +197,11 @@ def test_no_log_below_threshold(caplog: pytest.LogCaptureFixture) -> None: def test_cgroup_v1_activates_high_pressure_mode(caplog: pytest.LogCaptureFixture) -> None: - """Memory reading works with cgroup v1 paths and activates high-pressure mode at 95%.""" + """Memory reading works with cgroup v1 paths and activates high-pressure mode at 90%.""" def mock_read_text(self: Path) -> str: if self == _CGROUP_V1_USAGE: - return _MOCK_USAGE_AT_95 + return _MOCK_USAGE_AT_92 if self == _CGROUP_V1_LIMIT: return _MOCK_LIMIT return "" @@ -182,10 +227,11 @@ def mock_read_text(self: Path) -> str: def test_check_interval_skips_intermediate_calls(caplog: pytest.LogCaptureFixture) -> None: """Monitor should only check cgroup files every check_interval messages.""" monitor = MemoryMonitor(check_interval=5000) + caplog.clear() with ( caplog.at_level(logging.INFO, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_95)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_92)), ): # First 4999 calls should be skipped for _ in range(4999): @@ -205,6 +251,7 @@ def test_check_interval_skips_intermediate_calls(caplog: pytest.LogCaptureFixtur def test_malformed_cgroup_file_degrades_gracefully(caplog: pytest.LogCaptureFixture) -> None: """Malformed cgroup files should not crash the sync.""" monitor = MemoryMonitor(check_interval=1) + caplog.clear() with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), @@ -217,6 +264,7 @@ def test_malformed_cgroup_file_degrades_gracefully(caplog: pytest.LogCaptureFixt def test_empty_cgroup_file_degrades_gracefully(caplog: pytest.LogCaptureFixture) -> None: """Empty cgroup file content should not crash the sync.""" monitor = MemoryMonitor(check_interval=1) + caplog.clear() with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), @@ -233,6 +281,7 @@ def mock_read_text(self: Path) -> str: raise OSError("Permission denied") monitor = MemoryMonitor(check_interval=1) + caplog.clear() with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), @@ -290,7 +339,7 @@ def test_read_cgroup_v2_anon_bytes_parses_anon_field() -> None: """Correctly parses the 'anon' field from cgroup v2 memory.stat.""" with patch.object(Path, "read_text", return_value=_MOCK_MEMORY_STAT_ANON_HIGH): result = _read_cgroup_v2_anon_bytes() - assert result == 860000000 + assert result == 840000000 def test_read_cgroup_v2_anon_bytes_returns_none_when_anon_absent() -> None: @@ -314,7 +363,7 @@ def raise_oserror(self: Path) -> str: # --------------------------------------------------------------------------- -def _v2_full_mock(usage: str = _MOCK_USAGE_AT_98, memory_stat: str = _MOCK_MEMORY_STAT_ANON_HIGH): +def _v2_full_mock(usage: str = _MOCK_USAGE_AT_96, memory_stat: str = _MOCK_MEMORY_STAT_ANON_HIGH): """Return a mock read_text that serves cgroup v2 current/max AND memory.stat.""" def mock_read_text(self: Path) -> str: @@ -330,7 +379,7 @@ def mock_read_text(self: Path) -> str: def test_raises_when_cgroup_critical_and_anon_share_of_usage_above_threshold() -> None: - """Fail-fast raises when cgroup >= 98% and anon >= 85% of current usage.""" + """Fail-fast raises when cgroup >= 95% and anon >= 85% of current usage.""" monitor = MemoryMonitor(check_interval=1) with ( patch.object(Path, "exists", _v2_exists), @@ -338,16 +387,24 @@ def test_raises_when_cgroup_critical_and_anon_share_of_usage_above_threshold() - ): with pytest.raises(AirbyteTracedException) as exc_info: monitor.check_memory_usage() - assert exc_info.value.failure_type == FailureType.system_error + assert exc_info.value.failure_type == FailureType.transient_error assert "critical threshold" in (exc_info.value.message or "") - assert "98%" in (exc_info.value.message or "") + assert "96%" in (exc_info.value.message or "") assert "anon share of usage" in (exc_info.value.internal_message or "") + # Human-readable byte formatting: 960 MB usage, 1.00 GB limit, 840 MB anon. + internal = exc_info.value.internal_message or "" + assert "960.00 MB" in internal + assert "1.00 GB" in internal + assert "840.00 MB" in internal + # Raw byte counts should no longer appear in the message. + assert "960000000" not in internal + assert "1000000000" not in internal def test_no_raise_when_cgroup_critical_but_anon_share_of_usage_below_threshold( caplog: pytest.LogCaptureFixture, ) -> None: - """No exception when cgroup >= 98% but anon < 85% of usage; logs once then silences.""" + """No exception when cgroup >= 95% but anon < 85% of usage; logs once then silences.""" monitor = MemoryMonitor(check_interval=1) with ( caplog.at_level(logging.INFO, logger="airbyte"), @@ -368,7 +425,7 @@ def test_falls_back_to_process_rssanon_when_cgroup_v2_anon_unavailable() -> None def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return _MOCK_USAGE_AT_98 + return _MOCK_USAGE_AT_96 if self == _CGROUP_V2_MAX: return _MOCK_LIMIT if self == _CGROUP_V2_STAT: @@ -394,13 +451,13 @@ def test_falls_back_to_process_rssanon_low_and_does_not_raise( def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return _MOCK_USAGE_AT_98 + return _MOCK_USAGE_AT_96 if self == _CGROUP_V2_MAX: return _MOCK_LIMIT if self == _CGROUP_V2_STAT: return _MOCK_MEMORY_STAT_NO_ANON # anon line missing if self == _PROC_SELF_STATUS: - return _MOCK_PROC_ANON_LOW # ~307 MB — 31.3% of 980 MB usage (below 85%) + return _MOCK_PROC_ANON_LOW # ~307 MB — 32.0% of 960 MB usage (below 85%) return "" monitor = MemoryMonitor(check_interval=1) @@ -421,7 +478,7 @@ def test_no_raise_when_anonymous_memory_signal_unavailable_at_critical_usage( def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return _MOCK_USAGE_AT_98 + return _MOCK_USAGE_AT_96 if self == _CGROUP_V2_MAX: return _MOCK_LIMIT if self == _CGROUP_V2_STAT: @@ -444,17 +501,17 @@ def mock_read_text(self: Path) -> str: assert monitor._critical_logged -def test_switches_to_high_pressure_check_interval_after_crossing_95_percent( +def test_switches_to_high_pressure_check_interval_after_crossing_90_percent( caplog: pytest.LogCaptureFixture, ) -> None: - """Once usage crosses 95%, the monitor tightens polling from 5000 to 100 messages.""" + """Once usage crosses 90%, the monitor tightens polling from 5000 to 100 messages.""" monitor = MemoryMonitor(check_interval=5000) assert not monitor._high_pressure_mode with ( caplog.at_level(logging.INFO, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_95)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_92)), ): # Pump 5000 messages to trigger the first real check for _ in range(5000):