diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index d6c92f9fa..82136b65f 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -281,7 +281,17 @@ def read( stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) for message in self.source.read(self.logger, config, catalog, state): yield self.handle_record_counts(message, stream_message_counter) - self._memory_monitor.check_memory_usage() + try: + self._memory_monitor.check_memory_usage() + except AirbyteTracedException: + # Flush queued messages (state checkpoints, logs) before propagating + # the memory fail-fast exception, so the platform receives the last + # committed state for the next sync. + for queued_message in self._emit_queued_messages(self.source): + yield self.handle_record_counts(queued_message, stream_message_counter) + raise + + # Flush queued messages after normal completion of the read loop. for message in self._emit_queued_messages(self.source): yield self.handle_record_counts(message, stream_message_counter) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 0767ce3bf..5609b1acd 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -2,39 +2,122 @@ # Copyright (c) 2026 Airbyte, Inc., all rights reserved. 
# -"""Source-side memory introspection to log memory usage approaching container limits.""" +"""Source-side memory introspection with fail-fast shutdown on memory threshold.""" import logging from pathlib import Path from typing import Optional +from airbyte_cdk.models import FailureType +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + logger = logging.getLogger("airbyte") # cgroup v2 paths _CGROUP_V2_CURRENT = Path("/sys/fs/cgroup/memory.current") _CGROUP_V2_MAX = Path("/sys/fs/cgroup/memory.max") +_CGROUP_V2_STAT = Path("/sys/fs/cgroup/memory.stat") # cgroup v1 paths — TODO: remove if all deployments are confirmed cgroup v2 _CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes") _CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes") -# Log when usage is at or above 90% -_MEMORY_THRESHOLD = 0.90 +# Process-level anonymous RSS from /proc/self/status (Linux only, no extra dependency) +_PROC_SELF_STATUS = Path("/proc/self/status") + +# Raise AirbyteTracedException when BOTH conditions are met: +# 1. cgroup usage >= critical threshold +# 2. anonymous memory >= anon-share threshold of *current cgroup usage* +# Comparing anon to usage (not limit) answers the more relevant question: +# "is most of the near-OOM memory actually process-owned anonymous memory?" +_CRITICAL_THRESHOLD = 0.98 +_ANON_SHARE_OF_USAGE_THRESHOLD = 0.85 -# Check interval (every N messages) +# Check interval (every N messages) — tightens after crossing high-pressure threshold _DEFAULT_CHECK_INTERVAL = 5000 +_HIGH_PRESSURE_CHECK_INTERVAL = 100 +_HIGH_PRESSURE_THRESHOLD = 0.95 + + +def _read_cgroup_v2_anon_bytes() -> Optional[int]: + """Read cgroup-level anonymous memory from ``/sys/fs/cgroup/memory.stat``. + + The ``anon`` field in ``memory.stat`` accounts for all anonymous pages + charged to the cgroup, which is a more accurate view of process-private + memory pressure than per-process ``RssAnon`` in multi-process containers. 
+ + Returns anonymous bytes, or ``None`` if unavailable or malformed. + """ + try: + for line in _CGROUP_V2_STAT.read_text().splitlines(): + if line.startswith("anon "): + return int(line.split()[1]) + except (OSError, ValueError, IndexError): # IndexError: an "anon" line with no value is malformed too + return None + return None + + +def _read_process_anon_rss_bytes() -> Optional[int]: + """Read process-private anonymous resident memory from /proc/self/status. + + Parses the ``RssAnon`` field which represents private anonymous pages — the + closest proxy for Python-heap memory pressure. Unlike ``VmRSS`` (which is + ``RssAnon + RssFile + RssShmem``), ``RssAnon`` is not inflated by mmap'd + file-backed or shared resident pages. + + Returns anonymous RSS in bytes, or None if unavailable (non-Linux, + permission error, or ``RssAnon`` field not present in the kernel). + """ + try: + status_text = _PROC_SELF_STATUS.read_text() + for line in status_text.splitlines(): + if line.startswith("RssAnon:"): + # Format: "RssAnon: 12345 kB" + parts = line.split() + if len(parts) >= 2: + return int(parts[1]) * 1024 # Convert kB to bytes + return None + except (OSError, ValueError): + return None class MemoryMonitor: - """Monitors container memory usage via cgroup files and logs warnings when usage is high. + """Monitors container memory usage via cgroup files and raises on critical pressure. Lazily probes cgroup v2 then v1 files on the first call to ``check_memory_usage()``. Caches which version exists. If neither is found (local dev / CI), all subsequent calls are instant no-ops. - Logs a WARNING on every check interval (default 5000 messages) when memory - usage is at or above 90% of the container limit. This gives breadcrumb - trails showing whether memory is climbing, plateauing, or sawtoothing. 
+ **Logging (event-based, not periodic):** + + - One INFO when high-pressure mode activates (usage first crosses 95%) + - One INFO/WARNING when critical threshold (98%) is crossed but we do + *not* raise (either anon share is below the fail-fast gate or the + anonymous memory signal is unavailable) + - No repeated per-check warnings — logging is driven by state + transitions, not periodic sampling + + **High-pressure polling:** Once cgroup usage first crosses 95%, the check + interval permanently tightens from 5000 to 100 messages to narrow the race + window near OOM. + + **Fail-fast:** Raises ``AirbyteTracedException`` with + ``FailureType.system_error`` when *both*: + + 1. Cgroup usage >= 98% of the container limit (container is near OOM-kill) + 2. Anonymous memory >= 85% of *current cgroup usage* (most of the charged + memory is process-private anonymous pages, not file-backed cache) + + The anonymous memory signal is read from cgroup v2 ``memory.stat`` (``anon`` + field) when available, falling back to ``/proc/self/status`` ``RssAnon``. + Comparing anonymous memory to current usage (not the container limit) answers + the more relevant question: "is most of the near-OOM memory actually + process-owned?" This avoids the brittleness of comparing to the full limit + where anonymous memory can dominate usage yet still fall short of a + limit-based percentage threshold. + + If the anonymous memory signal is unavailable, the monitor logs a warning + and skips fail-fast rather than falling back to cgroup-only raising. """ def __init__( @@ -47,6 +130,8 @@ def __init__( self._message_count = 0 self._cgroup_version: Optional[int] = None self._probed = False + self._high_pressure_mode = False + self._critical_logged = False def _probe_cgroup(self) -> None: """Detect which cgroup version (if any) is available. 
@@ -101,15 +186,33 @@ def _read_memory(self) -> Optional[tuple[int, int]]: logger.debug("Failed to read cgroup memory files; skipping memory check.") return None + def _read_anon_bytes(self) -> Optional[tuple[int, str]]: + """Read anonymous memory bytes from the best available source. + + Tries cgroup v2 ``memory.stat`` (``anon`` field) first, then falls back + to ``/proc/self/status`` ``RssAnon``. Returns ``(bytes, source_label)`` + or ``None`` if neither is available. + """ + if self._cgroup_version == 2: + cgroup_anon = _read_cgroup_v2_anon_bytes() + if cgroup_anon is not None: + return cgroup_anon, "cgroup memory.stat anon" + + proc_anon = _read_process_anon_rss_bytes() + if proc_anon is not None: + return proc_anon, "process RssAnon" + + return None + def check_memory_usage(self) -> None: - """Check memory usage and log when above 90%. + """Check memory usage and raise at critical dual-condition. Intended to be called on every message. The monitor internally tracks a message counter and only reads cgroup files every ``check_interval`` - messages (default 5000) to minimise I/O overhead. + messages (default 5000). Once usage crosses 95%, the interval tightens + to 100 messages for the remainder of the sync. - Logs a WARNING on every check above 90% to provide breadcrumb trails - showing memory trends over the sync lifetime. + Logging is event-based (one-shot on state transitions), not periodic. This method is a no-op if cgroup files are unavailable. 
""" @@ -118,7 +221,10 @@ def check_memory_usage(self) -> None: return self._message_count += 1 - if self._message_count % self._check_interval != 0: + interval = self._check_interval + if self._high_pressure_mode: + interval = min(interval, _HIGH_PRESSURE_CHECK_INTERVAL) # only ever tighten; never loosen a smaller caller-supplied interval + if self._message_count % interval != 0: return memory_info = self._read_memory() @@ -131,10 +237,43 @@ def check_memory_usage(self) -> None: usage_gb = usage_bytes / (1024**3) limit_gb = limit_bytes / (1024**3) - if usage_ratio >= _MEMORY_THRESHOLD: - logger.warning( - "Source memory usage at %d%% of container limit (%.2f / %.2f GB).", - usage_percent, - usage_gb, - limit_gb, + if usage_ratio >= _HIGH_PRESSURE_THRESHOLD and not self._high_pressure_mode: + self._high_pressure_mode = True + logger.info( + "Memory usage crossed %d%%; tightening check interval from %d to %d messages.", + int(_HIGH_PRESSURE_THRESHOLD * 100), + self._check_interval, + min(self._check_interval, _HIGH_PRESSURE_CHECK_INTERVAL), ) + + # Fail-fast: dual-condition check + if usage_ratio >= _CRITICAL_THRESHOLD: + anon_info = self._read_anon_bytes() + if anon_info is not None: + anon_bytes, anon_source = anon_info + anon_share = anon_bytes / usage_bytes + if anon_share >= _ANON_SHARE_OF_USAGE_THRESHOLD: + raise AirbyteTracedException( + message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).", + internal_message=( + f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). " + f"Anonymous memory ({anon_source}): {anon_bytes} bytes " + f"({int(anon_share * 100)}% of current cgroup usage). " + f"Thresholds: cgroup >= {int(_CRITICAL_THRESHOLD * 100)}%, " + f"anon share of usage >= {int(_ANON_SHARE_OF_USAGE_THRESHOLD * 100)}%." 
+ ), + failure_type=FailureType.system_error, + ) + elif not self._critical_logged: + self._critical_logged = True + logger.info( + "Cgroup usage crossed %d%% but anonymous memory is only %d%% of current cgroup usage; not raising.", + int(_CRITICAL_THRESHOLD * 100), + int(anon_share * 100), + ) + elif not self._critical_logged: + self._critical_logged = True + logger.warning( + "Cgroup usage crossed %d%% but anonymous memory signal unavailable; skipping fail-fast.", + int(_CRITICAL_THRESHOLD * 100), + ) diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 520131881..35cd608a3 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -856,3 +856,72 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json( # There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here record_messages = list(filter(lambda message: "RECORD" in message, messages)) assert len(record_messages) == 2 + + +def test_memory_failfast_flushes_queued_state_before_raising(mocker): + """Record emitted → check_memory_usage raises → queued STATE flushed with recordCount → exception propagates.""" + queued_state = AirbyteMessage( + type=Type.STATE, + state=AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="users", namespace=None), + stream_state=AirbyteStateBlob({"cursor": "abc123"}), + ), + ), + ) + + message_repository = MagicMock() + message_repository.consume_queue.side_effect = [ + [queued_state], # flush during fail-fast exception handling + [], # normal end-of-loop flush (not reached) + ] + mocker.patch.object( + MockSource, + "message_repository", + new_callable=mocker.PropertyMock, + return_value=message_repository, + ) + + record = AirbyteMessage( + record=AirbyteRecordMessage(stream="users", data={"id": 1}, emitted_at=1), + type=Type.RECORD, + ) + mocker.patch.object(MockSource, 
"read_state", return_value={}) + mocker.patch.object(MockSource, "read_catalog", return_value={}) + mocker.patch.object(MockSource, "read", return_value=[record]) + + fail_fast_exc = AirbyteTracedException( + message="Memory usage exceeded critical threshold (98%)", + failure_type=FailureType.system_error, + ) + + entrypoint_obj = AirbyteEntrypoint(MockSource()) + mocker.patch.object( + entrypoint_obj._memory_monitor, "check_memory_usage", side_effect=fail_fast_exc + ) + + spec = ConnectorSpecification(connectionSpecification={}) + config: dict[str, str] = {} + + # Call read() directly to get AirbyteMessage objects (not serialised strings) + gen = entrypoint_obj.read(spec, config, {}, []) + + # 1. First yielded message is the RECORD + first = next(gen) + assert first.type == Type.RECORD + assert first.record.stream == "users" # type: ignore[union-attr] + + # 2. Second yielded message is the queued STATE (flushed before exception) + second = next(gen) + assert second.type == Type.STATE + assert second.state.stream.stream_state == AirbyteStateBlob({"cursor": "abc123"}) # type: ignore[union-attr] + + # 3. The STATE passed through handle_record_counts, so sourceStats.recordCount == 1.0 + assert second.state.sourceStats is not None # type: ignore[union-attr] + assert second.state.sourceStats.recordCount == 1.0 # type: ignore[union-attr] + + # 4. 
Next iteration re-raises the AirbyteTracedException + with pytest.raises(AirbyteTracedException) as exc_info: + next(gen) + assert exc_info.value is fail_fast_exc diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index cf8250465..87d038220 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -8,18 +8,47 @@ import pytest +from airbyte_cdk.models import FailureType from airbyte_cdk.utils.memory_monitor import ( _CGROUP_V1_LIMIT, _CGROUP_V1_USAGE, _CGROUP_V2_CURRENT, _CGROUP_V2_MAX, + _CGROUP_V2_STAT, + _PROC_SELF_STATUS, MemoryMonitor, + _read_cgroup_v2_anon_bytes, + _read_process_anon_rss_bytes, ) +from airbyte_cdk.utils.traced_exception import AirbyteTracedException _MOCK_USAGE_BELOW = "500000000\n" # 50% of 1 GB -_MOCK_USAGE_AT_90 = "910000000\n" # 91% of 1 GB +_MOCK_USAGE_AT_90 = "910000000\n" # 91% of 1 GB (below 95% logging threshold) +_MOCK_USAGE_AT_95 = "960000000\n" # 96% of 1 GB (above 95% logging threshold) +_MOCK_USAGE_AT_98 = "980000000\n" # 98% of 1 GB (at critical threshold) _MOCK_LIMIT = "1000000000\n" # 1 GB +# cgroup v2 memory.stat mock content. +# The "anon" field is what we parse for the cgroup-level anonymous memory signal. +_MOCK_MEMORY_STAT_ANON_HIGH = ( + "anon 860000000\n" # 860 MB — 87.7% of 980 MB usage (above 85% threshold) + "file 100000000\n" + "kernel 20000000\n" +) +_MOCK_MEMORY_STAT_ANON_LOW = ( + "anon 300000000\n" # 300 MB — 30.6% of 980 MB usage (below 85% threshold) + "file 650000000\n" + "kernel 30000000\n" +) +_MOCK_MEMORY_STAT_NO_ANON = ( + "file 650000000\n" # malformed: missing anon line + "kernel 30000000\n" +) + +# /proc/self/status mock values (fallback when cgroup v2 memory.stat is unavailable). 
+_MOCK_PROC_ANON_HIGH = "RssAnon:\t 840000 kB\n" # ~860 MB — 87.7% of 980 MB usage +_MOCK_PROC_ANON_LOW = "RssAnon:\t 300000 kB\n" # ~307 MB — 31.3% of 980 MB usage + def _v2_exists(self: Path) -> bool: return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) @@ -104,53 +133,17 @@ def test_noop_when_limit_is_zero(caplog: pytest.LogCaptureFixture) -> None: # --------------------------------------------------------------------------- -def test_no_warning_below_threshold(caplog: pytest.LogCaptureFixture) -> None: - """No warning should be emitted when usage is below 90%.""" - monitor = MemoryMonitor(check_interval=1) - with ( - caplog.at_level(logging.WARNING, logger="airbyte"), - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_BELOW)), - ): - monitor.check_memory_usage() - assert not caplog.records - - -# --------------------------------------------------------------------------- -# check_memory_usage — at/above 90% threshold -# --------------------------------------------------------------------------- - - -def test_logs_at_90_percent(caplog: pytest.LogCaptureFixture) -> None: - """Warning log should be emitted at 91% usage (above 90% threshold).""" +def test_no_log_below_threshold(caplog: pytest.LogCaptureFixture) -> None: + """No log should be emitted when usage is below 95%.""" monitor = MemoryMonitor(check_interval=1) with ( - caplog.at_level(logging.WARNING, logger="airbyte"), - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), - ): - monitor.check_memory_usage() - - assert len(caplog.records) == 1 - assert "91%" in caplog.records[0].message - - -def test_logs_on_every_check_above_90_percent(caplog: pytest.LogCaptureFixture) -> None: - """Warning should be logged on EVERY check interval when above 90%, not just once.""" - monitor = MemoryMonitor(check_interval=1) - with ( - caplog.at_level(logging.WARNING, logger="airbyte"), + 
caplog.at_level(logging.DEBUG, logger="airbyte"), patch.object(Path, "exists", _v2_exists), patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), ): monitor.check_memory_usage() - monitor.check_memory_usage() - monitor.check_memory_usage() - - # All three checks should produce a warning (no one-shot flag) - assert len(caplog.records) == 3 - for record in caplog.records: - assert "91%" in record.message + # Only the debug probe message, no info/warning + assert all(r.levelno <= logging.DEBUG for r in caplog.records) # --------------------------------------------------------------------------- @@ -158,26 +151,27 @@ def test_logs_on_every_check_above_90_percent(caplog: pytest.LogCaptureFixture) # --------------------------------------------------------------------------- -def test_cgroup_v1_emits_warning(caplog: pytest.LogCaptureFixture) -> None: - """Memory reading should work with cgroup v1 paths (proves v1 detection works).""" +def test_cgroup_v1_activates_high_pressure_mode(caplog: pytest.LogCaptureFixture) -> None: + """Memory reading works with cgroup v1 paths and activates high-pressure mode at 95%.""" def mock_read_text(self: Path) -> str: if self == _CGROUP_V1_USAGE: - return _MOCK_USAGE_AT_90 + return _MOCK_USAGE_AT_95 if self == _CGROUP_V1_LIMIT: return _MOCK_LIMIT return "" monitor = MemoryMonitor(check_interval=1) with ( - caplog.at_level(logging.WARNING, logger="airbyte"), + caplog.at_level(logging.INFO, logger="airbyte"), patch.object(Path, "exists", _v1_exists), patch.object(Path, "read_text", mock_read_text), ): monitor.check_memory_usage() - assert len(caplog.records) == 1 - assert "91%" in caplog.records[0].message + assert monitor._high_pressure_mode + info_records = [r for r in caplog.records if r.levelno == logging.INFO] + assert any("tightening check interval" in r.message for r in info_records) # --------------------------------------------------------------------------- @@ -189,17 +183,18 @@ def 
test_check_interval_skips_intermediate_calls(caplog: pytest.LogCaptureFixtur """Monitor should only check cgroup files every check_interval messages.""" monitor = MemoryMonitor(check_interval=5000) with ( - caplog.at_level(logging.WARNING, logger="airbyte"), + caplog.at_level(logging.INFO, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_95)), ): # First 4999 calls should be skipped for _ in range(4999): monitor.check_memory_usage() - assert not caplog.records - # Call 5000 should trigger the actual check + info_records = [r for r in caplog.records if r.levelno >= logging.INFO] + assert not info_records + # Call 5000 should trigger the actual check and activate high-pressure mode monitor.check_memory_usage() - assert len(caplog.records) == 1 + assert monitor._high_pressure_mode # --------------------------------------------------------------------------- @@ -245,3 +240,236 @@ def mock_read_text(self: Path) -> str: ): monitor.check_memory_usage() assert not caplog.records + + +# --------------------------------------------------------------------------- +# _read_process_anon_rss_bytes — unit tests +# --------------------------------------------------------------------------- + + +def test_read_process_anon_rss_bytes_parses_rssanon() -> None: + """Correctly parses RssAnon from /proc/self/status content.""" + status_content = ( + "Name:\tpython3\nVmRSS:\t 1000000 kB\nRssAnon:\t 512000 kB\nRssShmem:\t 0 kB\n" + ) + with patch.object(Path, "read_text", return_value=status_content): + result = _read_process_anon_rss_bytes() + assert result == 512000 * 1024 + + +def test_read_process_anon_rss_bytes_returns_none_on_missing_file() -> None: + """Returns None when /proc/self/status is unreadable.""" + + def raise_oserror(self: Path) -> str: + raise OSError("No such file") + + with patch.object(Path, "read_text", 
raise_oserror): + assert _read_process_anon_rss_bytes() is None + + +def test_read_process_anon_rss_bytes_returns_none_when_rssanon_absent() -> None: + """Returns None when RssAnon line is not present (e.g. older kernel).""" + with patch.object(Path, "read_text", return_value="Name:\tpython3\nVmRSS:\t 512000 kB\n"): + assert _read_process_anon_rss_bytes() is None + + +def test_read_process_anon_rss_bytes_ignores_vmrss() -> None: + """Ensures the parser reads RssAnon specifically, not VmRSS.""" + # Only VmRSS present, no RssAnon — should return None + status_content = "VmRSS:\t 900000 kB\n" + with patch.object(Path, "read_text", return_value=status_content): + assert _read_process_anon_rss_bytes() is None + + +# --------------------------------------------------------------------------- +# _read_cgroup_v2_anon_bytes — unit tests +# --------------------------------------------------------------------------- + + +def test_read_cgroup_v2_anon_bytes_parses_anon_field() -> None: + """Correctly parses the 'anon' field from cgroup v2 memory.stat.""" + with patch.object(Path, "read_text", return_value=_MOCK_MEMORY_STAT_ANON_HIGH): + result = _read_cgroup_v2_anon_bytes() + assert result == 860000000 + + +def test_read_cgroup_v2_anon_bytes_returns_none_when_anon_absent() -> None: + """Returns None when memory.stat lacks the 'anon' line.""" + with patch.object(Path, "read_text", return_value=_MOCK_MEMORY_STAT_NO_ANON): + assert _read_cgroup_v2_anon_bytes() is None + + +def test_read_cgroup_v2_anon_bytes_returns_none_on_oserror() -> None: + """Returns None when memory.stat is unreadable.""" + + def raise_oserror(self: Path) -> str: + raise OSError("No such file") + + with patch.object(Path, "read_text", raise_oserror): + assert _read_cgroup_v2_anon_bytes() is None + + +# --------------------------------------------------------------------------- +# check_memory_usage — fail-fast (dual-condition: anon share of usage) +# 
--------------------------------------------------------------------------- + + +def _v2_full_mock(usage: str = _MOCK_USAGE_AT_98, memory_stat: str = _MOCK_MEMORY_STAT_ANON_HIGH): + """Return a mock read_text that serves cgroup v2 current/max AND memory.stat.""" + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return usage + if self == _CGROUP_V2_MAX: + return _MOCK_LIMIT + if self == _CGROUP_V2_STAT: + return memory_stat + return "" + + return mock_read_text + + +def test_raises_when_cgroup_critical_and_anon_share_of_usage_above_threshold() -> None: + """Fail-fast raises when cgroup >= 98% and anon >= 85% of current usage.""" + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_full_mock()), + ): + with pytest.raises(AirbyteTracedException) as exc_info: + monitor.check_memory_usage() + assert exc_info.value.failure_type == FailureType.system_error + assert "critical threshold" in (exc_info.value.message or "") + assert "98%" in (exc_info.value.message or "") + assert "anon share of usage" in (exc_info.value.internal_message or "") + + +def test_no_raise_when_cgroup_critical_but_anon_share_of_usage_below_threshold( + caplog: pytest.LogCaptureFixture, +) -> None: + """No exception when cgroup >= 98% but anon < 85% of usage; logs once then silences.""" + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.INFO, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_full_mock(memory_stat=_MOCK_MEMORY_STAT_ANON_LOW)), + ): + monitor.check_memory_usage() # Should NOT raise — logs one-shot info + monitor.check_memory_usage() # Should NOT log again (_critical_logged is True) + info_records = [r for r in caplog.records if r.levelno == logging.INFO] + # Exactly one critical-not-raising log (one-shot), plus one high-pressure-mode log + critical_logs = [r for r in info_records if "not raising" in 
r.message] + assert len(critical_logs) == 1 + assert monitor._critical_logged + + +def test_falls_back_to_process_rssanon_when_cgroup_v2_anon_unavailable() -> None: + """Uses /proc/self/status RssAnon when memory.stat anon is missing, and still raises.""" + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return _MOCK_USAGE_AT_98 + if self == _CGROUP_V2_MAX: + return _MOCK_LIMIT + if self == _CGROUP_V2_STAT: + return _MOCK_MEMORY_STAT_NO_ANON # anon line missing + if self == _PROC_SELF_STATUS: + return _MOCK_PROC_ANON_HIGH + return "" + + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): + with pytest.raises(AirbyteTracedException) as exc_info: + monitor.check_memory_usage() + assert "process RssAnon" in (exc_info.value.internal_message or "") + + +def test_falls_back_to_process_rssanon_low_and_does_not_raise( + caplog: pytest.LogCaptureFixture, +) -> None: + """Uses /proc/self/status RssAnon when memory.stat anon is missing, but does not raise when below threshold.""" + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return _MOCK_USAGE_AT_98 + if self == _CGROUP_V2_MAX: + return _MOCK_LIMIT + if self == _CGROUP_V2_STAT: + return _MOCK_MEMORY_STAT_NO_ANON # anon line missing + if self == _PROC_SELF_STATUS: + return _MOCK_PROC_ANON_LOW # ~307 MB — 31.3% of 980 MB usage (below 85%) + return "" + + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.INFO, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): + monitor.check_memory_usage() # Should NOT raise + info_records = [r for r in caplog.records if r.levelno == logging.INFO] + assert any("not raising" in r.message for r in info_records) + + +def test_no_raise_when_anonymous_memory_signal_unavailable_at_critical_usage( + caplog: pytest.LogCaptureFixture, +) -> None: + 
"""Logs warning once and skips fail-fast when neither anon source is available.""" + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return _MOCK_USAGE_AT_98 + if self == _CGROUP_V2_MAX: + return _MOCK_LIMIT + if self == _CGROUP_V2_STAT: + raise OSError("No such file") + if self == _PROC_SELF_STATUS: + raise OSError("No such file") + return "" + + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): + monitor.check_memory_usage() # Should NOT raise — logs one-shot warning + monitor.check_memory_usage() # Should NOT log again + warning_records = [r for r in caplog.records if r.levelno == logging.WARNING] + assert len(warning_records) == 1 + assert "anonymous memory signal unavailable" in warning_records[0].message + assert monitor._critical_logged + + +def test_switches_to_high_pressure_check_interval_after_crossing_95_percent( + caplog: pytest.LogCaptureFixture, +) -> None: + """Once usage crosses 95%, the monitor tightens polling from 5000 to 100 messages.""" + monitor = MemoryMonitor(check_interval=5000) + assert not monitor._high_pressure_mode + + with ( + caplog.at_level(logging.INFO, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_95)), + ): + # Pump 5000 messages to trigger the first real check + for _ in range(5000): + monitor.check_memory_usage() + + assert monitor._high_pressure_mode + info_records = [r for r in caplog.records if r.levelno == logging.INFO] + assert any("tightening check interval" in r.message for r in info_records) + + # After switching to high-pressure mode, checks happen every 100 messages. + # Verify by pumping 100 messages at critical usage with high anon — should raise. 
+ with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_full_mock()), + ): + with pytest.raises(AirbyteTracedException): + for _ in range(100): + monitor.check_memory_usage()