Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
13 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,17 @@ def read(
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
for message in self.source.read(self.logger, config, catalog, state):
yield self.handle_record_counts(message, stream_message_counter)
self._memory_monitor.check_memory_usage()
try:
self._memory_monitor.check_memory_usage()
except AirbyteTracedException:
# Flush queued messages (state checkpoints, logs) before propagating
# the memory fail-fast exception, so the platform receives the last
# committed state for the next sync.
for queued_message in self._emit_queued_messages(self.source):
yield self.handle_record_counts(queued_message, stream_message_counter)
raise

# Flush queued messages after normal completion of the read loop.
for message in self._emit_queued_messages(self.source):
yield self.handle_record_counts(message, stream_message_counter)

Expand Down
177 changes: 158 additions & 19 deletions airbyte_cdk/utils/memory_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,122 @@
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
#

"""Source-side memory introspection to log memory usage approaching container limits."""
"""Source-side memory introspection with fail-fast shutdown on memory threshold."""

import logging
from pathlib import Path
from typing import Optional

from airbyte_cdk.models import FailureType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

logger = logging.getLogger("airbyte")

# cgroup v2 paths
_CGROUP_V2_CURRENT = Path("/sys/fs/cgroup/memory.current")
_CGROUP_V2_MAX = Path("/sys/fs/cgroup/memory.max")
_CGROUP_V2_STAT = Path("/sys/fs/cgroup/memory.stat")

# cgroup v1 paths — TODO: remove if all deployments are confirmed cgroup v2
_CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
_CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")

# Log when usage is at or above 90%
_MEMORY_THRESHOLD = 0.90
# Process-level anonymous RSS from /proc/self/status (Linux only, no extra dependency)
_PROC_SELF_STATUS = Path("/proc/self/status")

# Raise AirbyteTracedException when BOTH conditions are met:
# 1. cgroup usage >= critical threshold
# 2. anonymous memory >= anon-share threshold of *current cgroup usage*
# Comparing anon to usage (not limit) answers the more relevant question:
# "is most of the near-OOM memory actually process-owned anonymous memory?"
_CRITICAL_THRESHOLD = 0.98
_ANON_SHARE_OF_USAGE_THRESHOLD = 0.85

# Check interval (every N messages)
# Check interval (every N messages) — tightens after crossing high-pressure threshold
_DEFAULT_CHECK_INTERVAL = 5000
_HIGH_PRESSURE_CHECK_INTERVAL = 100
_HIGH_PRESSURE_THRESHOLD = 0.95


def _read_cgroup_v2_anon_bytes() -> Optional[int]:
"""Read cgroup-level anonymous memory from ``/sys/fs/cgroup/memory.stat``.

The ``anon`` field in ``memory.stat`` accounts for all anonymous pages
charged to the cgroup, which is a more accurate view of process-private
memory pressure than per-process ``RssAnon`` in multi-process containers.

Returns anonymous bytes, or ``None`` if unavailable or malformed.
"""
try:
for line in _CGROUP_V2_STAT.read_text().splitlines():
if line.startswith("anon "):
return int(line.split()[1])
except (OSError, ValueError):
return None
return None


def _read_process_anon_rss_bytes() -> Optional[int]:
"""Read process-private anonymous resident memory from /proc/self/status.

Parses the ``RssAnon`` field which represents private anonymous pages — the
closest proxy for Python-heap memory pressure. Unlike ``VmRSS`` (which is
``RssAnon + RssFile + RssShmem``), ``RssAnon`` is not inflated by mmap'd
file-backed or shared resident pages.

Returns anonymous RSS in bytes, or None if unavailable (non-Linux,
permission error, or ``RssAnon`` field not present in the kernel).
"""
try:
status_text = _PROC_SELF_STATUS.read_text()
for line in status_text.splitlines():
if line.startswith("RssAnon:"):
# Format: "RssAnon: 12345 kB"
parts = line.split()
if len(parts) >= 2:
return int(parts[1]) * 1024 # Convert kB to bytes
return None
except (OSError, ValueError):
return None


class MemoryMonitor:
"""Monitors container memory usage via cgroup files and logs warnings when usage is high.
"""Monitors container memory usage via cgroup files and raises on critical pressure.

Lazily probes cgroup v2 then v1 files on the first call to
``check_memory_usage()``. Caches which version exists.
If neither is found (local dev / CI), all subsequent calls are instant no-ops.

Logs a WARNING on every check interval (default 5000 messages) when memory
usage is at or above 90% of the container limit. This gives breadcrumb
trails showing whether memory is climbing, plateauing, or sawtoothing.
**Logging (event-based, not periodic):**

- One INFO when high-pressure mode activates (usage first crosses 95%)
- One INFO/WARNING when critical threshold (98%) is crossed but we do
*not* raise (either anon share is below the fail-fast gate or the
anonymous memory signal is unavailable)
- No repeated per-check warnings — logging is driven by state
transitions, not periodic sampling

**High-pressure polling:** Once cgroup usage first crosses 95%, the check
interval permanently tightens from 5000 to 100 messages to narrow the race
window near OOM.

**Fail-fast:** Raises ``AirbyteTracedException`` with
``FailureType.system_error`` when *both*:

1. Cgroup usage >= 98% of the container limit (container is near OOM-kill)
2. Anonymous memory >= 85% of *current cgroup usage* (most of the charged
memory is process-private anonymous pages, not file-backed cache)

The anonymous memory signal is read from cgroup v2 ``memory.stat`` (``anon``
field) when available, falling back to ``/proc/self/status`` ``RssAnon``.
Comparing anonymous memory to current usage (not the container limit) answers
the more relevant question: "is most of the near-OOM memory actually
process-owned?" This avoids the brittleness of comparing to the full limit
where anonymous memory can dominate usage yet still fall short of a
limit-based percentage threshold.

If the anonymous memory signal is unavailable, the monitor logs a warning
and skips fail-fast rather than falling back to cgroup-only raising.
"""

def __init__(
Expand All @@ -47,6 +130,8 @@ def __init__(
self._message_count = 0
self._cgroup_version: Optional[int] = None
self._probed = False
self._high_pressure_mode = False
self._critical_logged = False

def _probe_cgroup(self) -> None:
"""Detect which cgroup version (if any) is available.
Expand Down Expand Up @@ -101,15 +186,33 @@ def _read_memory(self) -> Optional[tuple[int, int]]:
logger.debug("Failed to read cgroup memory files; skipping memory check.")
return None

def _read_anon_bytes(self) -> Optional[tuple[int, str]]:
"""Read anonymous memory bytes from the best available source.

Tries cgroup v2 ``memory.stat`` (``anon`` field) first, then falls back
to ``/proc/self/status`` ``RssAnon``. Returns ``(bytes, source_label)``
or ``None`` if neither is available.
"""
if self._cgroup_version == 2:
cgroup_anon = _read_cgroup_v2_anon_bytes()
if cgroup_anon is not None:
return cgroup_anon, "cgroup memory.stat anon"

proc_anon = _read_process_anon_rss_bytes()
if proc_anon is not None:
return proc_anon, "process RssAnon"

return None

def check_memory_usage(self) -> None:
"""Check memory usage and log when above 90%.
"""Check memory usage and raise at critical dual-condition.

Intended to be called on every message. The monitor internally tracks
a message counter and only reads cgroup files every ``check_interval``
messages (default 5000) to minimise I/O overhead.
messages (default 5000). Once usage crosses 95%, the interval tightens
to 100 messages for the remainder of the sync.

Logs a WARNING on every check above 90% to provide breadcrumb trails
showing memory trends over the sync lifetime.
Logging is event-based (one-shot on state transitions), not periodic.

This method is a no-op if cgroup files are unavailable.
"""
Expand All @@ -118,7 +221,10 @@ def check_memory_usage(self) -> None:
return

self._message_count += 1
if self._message_count % self._check_interval != 0:
interval = (
_HIGH_PRESSURE_CHECK_INTERVAL if self._high_pressure_mode else self._check_interval
)
if self._message_count % interval != 0:
return

memory_info = self._read_memory()
Expand All @@ -131,10 +237,43 @@ def check_memory_usage(self) -> None:
usage_gb = usage_bytes / (1024**3)
limit_gb = limit_bytes / (1024**3)

if usage_ratio >= _MEMORY_THRESHOLD:
logger.warning(
"Source memory usage at %d%% of container limit (%.2f / %.2f GB).",
usage_percent,
usage_gb,
limit_gb,
if usage_ratio >= _HIGH_PRESSURE_THRESHOLD and not self._high_pressure_mode:
self._high_pressure_mode = True
logger.info(
"Memory usage crossed %d%%; tightening check interval from %d to %d messages.",
int(_HIGH_PRESSURE_THRESHOLD * 100),
self._check_interval,
_HIGH_PRESSURE_CHECK_INTERVAL,
)

# Fail-fast: dual-condition check
if usage_ratio >= _CRITICAL_THRESHOLD:
anon_info = self._read_anon_bytes()
if anon_info is not None:
anon_bytes, anon_source = anon_info
anon_share = anon_bytes / usage_bytes
if anon_share >= _ANON_SHARE_OF_USAGE_THRESHOLD:
raise AirbyteTracedException(
message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).",
internal_message=(
f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). "
f"Anonymous memory ({anon_source}): {anon_bytes} bytes "
f"({int(anon_share * 100)}% of current cgroup usage). "
f"Thresholds: cgroup >= {int(_CRITICAL_THRESHOLD * 100)}%, "
f"anon share of usage >= {int(_ANON_SHARE_OF_USAGE_THRESHOLD * 100)}%."
),
failure_type=FailureType.system_error,
)
elif not self._critical_logged:
self._critical_logged = True
logger.info(
"Cgroup usage crossed %d%% but anonymous memory is only %d%% of current cgroup usage; not raising.",
int(_CRITICAL_THRESHOLD * 100),
int(anon_share * 100),
)
elif not self._critical_logged:
self._critical_logged = True
logger.warning(
"Cgroup usage crossed %d%% but anonymous memory signal unavailable; skipping fail-fast.",
int(_CRITICAL_THRESHOLD * 100),
)
69 changes: 69 additions & 0 deletions unit_tests/test_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,3 +856,72 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json(
# There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here
record_messages = list(filter(lambda message: "RECORD" in message, messages))
assert len(record_messages) == 2


def test_memory_failfast_flushes_queued_state_before_raising(mocker):
"""Record emitted → check_memory_usage raises → queued STATE flushed with recordCount → exception propagates."""
queued_state = AirbyteMessage(
type=Type.STATE,
state=AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="users", namespace=None),
stream_state=AirbyteStateBlob({"cursor": "abc123"}),
),
),
)

message_repository = MagicMock()
message_repository.consume_queue.side_effect = [
[queued_state], # flush during fail-fast exception handling
[], # normal end-of-loop flush (not reached)
]
mocker.patch.object(
MockSource,
"message_repository",
new_callable=mocker.PropertyMock,
return_value=message_repository,
)

record = AirbyteMessage(
record=AirbyteRecordMessage(stream="users", data={"id": 1}, emitted_at=1),
type=Type.RECORD,
)
mocker.patch.object(MockSource, "read_state", return_value={})
mocker.patch.object(MockSource, "read_catalog", return_value={})
mocker.patch.object(MockSource, "read", return_value=[record])

fail_fast_exc = AirbyteTracedException(
message="Memory usage exceeded critical threshold (98%)",
failure_type=FailureType.system_error,
)

entrypoint_obj = AirbyteEntrypoint(MockSource())
mocker.patch.object(
entrypoint_obj._memory_monitor, "check_memory_usage", side_effect=fail_fast_exc
)

spec = ConnectorSpecification(connectionSpecification={})
config: dict[str, str] = {}

# Call read() directly to get AirbyteMessage objects (not serialised strings)
gen = entrypoint_obj.read(spec, config, {}, [])

# 1. First yielded message is the RECORD
first = next(gen)
assert first.type == Type.RECORD
assert first.record.stream == "users" # type: ignore[union-attr]

# 2. Second yielded message is the queued STATE (flushed before exception)
second = next(gen)
assert second.type == Type.STATE
assert second.state.stream.stream_state == AirbyteStateBlob({"cursor": "abc123"}) # type: ignore[union-attr]

# 3. The STATE passed through handle_record_counts, so sourceStats.recordCount == 1.0
assert second.state.sourceStats is not None # type: ignore[union-attr]
assert second.state.sourceStats.recordCount == 1.0 # type: ignore[union-attr]

# 4. Next iteration re-raises the AirbyteTracedException
with pytest.raises(AirbyteTracedException) as exc_info:
next(gen)
assert exc_info.value is fail_fast_exc
Loading
Loading