Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
13 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,17 @@ def read(
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
for message in self.source.read(self.logger, config, catalog, state):
yield self.handle_record_counts(message, stream_message_counter)
self._memory_monitor.check_memory_usage()
try:
self._memory_monitor.check_memory_usage()
except Exception:
# Flush queued messages (state checkpoints, logs) before propagating
# a memory fail-fast (or other) exception, so the platform receives
# the last committed state for the next sync.
for queued_message in self._emit_queued_messages(self.source):
yield self.handle_record_counts(queued_message, stream_message_counter)
raise

# Flush queued messages after normal completion of the read loop.
for message in self._emit_queued_messages(self.source):
yield self.handle_record_counts(message, stream_message_counter)

Expand Down
119 changes: 111 additions & 8 deletions airbyte_cdk/utils/memory_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
#

"""Source-side memory introspection to log memory usage approaching container limits."""
"""Source-side memory introspection with fail-fast shutdown on memory threshold."""

import logging
import os
from pathlib import Path
from typing import Optional

from airbyte_cdk.models import FailureType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

logger = logging.getLogger("airbyte")

# cgroup v2 paths
Expand All @@ -18,28 +22,82 @@
_CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
_CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")

# Process-level anonymous RSS from /proc/self/status (Linux only, no extra dependency)
_PROC_SELF_STATUS = Path("/proc/self/status")

# Log a WARNING when cgroup memory usage is at or above 90% of the container limit
_MEMORY_THRESHOLD = 0.90
Comment thread
pnilan marked this conversation as resolved.
Outdated

# Raise AirbyteTracedException when BOTH conditions are met:
# 1. cgroup usage >= critical threshold
# 2. process anonymous RSS (RssAnon) >= anon threshold of the container limit
# This dual-condition avoids false positives from reclaimable kernel page cache
# and file-backed / shared resident pages that inflate VmRSS.
_CRITICAL_THRESHOLD = 0.98
_ANON_RSS_THRESHOLD = 0.80

# Check interval (every N messages)
_DEFAULT_CHECK_INTERVAL = 5000

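# Environment variable that controls fail-fast when no explicit ``fail_fast``
# argument is given. Only the value "false" (case-insensitive, surrounding
# whitespace ignored) disables it; unset or any other value leaves it enabled.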
_ENV_FAIL_FAST = "AIRBYTE_MEMORY_FAIL_FAST"


def _read_process_anon_rss_bytes() -> Optional[int]:
    """Return this process's anonymous resident memory, in bytes.

    The value comes from the ``RssAnon`` line of ``/proc/self/status``, i.e.
    private anonymous pages. ``VmRSS`` is ``RssAnon + RssFile + RssShmem``, so
    ``RssAnon`` alone is not inflated by mmap'd file-backed or shared resident
    pages and is the closest proxy for Python-heap memory pressure.

    Returns:
        The anonymous RSS converted from kB to bytes, or ``None`` when the
        value cannot be obtained: the status file cannot be read (non-Linux,
        permission error), no usable ``RssAnon`` line is present, or its
        numeric field does not parse as an integer.
    """
    try:
        contents = _PROC_SELF_STATUS.read_text()
    except (OSError, ValueError):
        # ValueError also covers a decoding failure while reading the file.
        return None

    for entry in contents.splitlines():
        if not entry.startswith("RssAnon:"):
            continue
        # Expected shape: "RssAnon:    12345 kB"
        fields = entry.split()
        if len(fields) < 2:
            # Label without a value: keep scanning the remaining lines.
            continue
        try:
            kilobytes = int(fields[1])
        except ValueError:
            return None
        return kilobytes * 1024  # kB -> bytes
    return None


class MemoryMonitor:
"""Monitors container memory usage via cgroup files and logs warnings when usage is high.
"""Monitors container memory usage via cgroup files and raises on critical pressure.

Lazily probes cgroup v2 then v1 files on the first call to
``check_memory_usage()``. Caches which version exists.
If neither is found (local dev / CI), all subsequent calls are instant no-ops.

Logs a WARNING on every check interval (default 5000 messages) when memory
usage is at or above 90% of the container limit. This gives breadcrumb
trails showing whether memory is climbing, plateauing, or sawtoothing.
**Logging (always active):** Logs a WARNING on every check interval (default
5000 messages) when cgroup memory usage is at or above 90% of the container
limit.

**Fail-fast (controlled by ``AIRBYTE_MEMORY_FAIL_FAST`` env var, default
enabled):** Raises ``AirbyteTracedException`` with
``FailureType.system_error`` when *both*:

1. Cgroup usage >= 98% of the container limit (container is near OOM-kill)
2. Process anonymous RSS (``RssAnon``) >= 80% of the container limit
(pressure is from process-private anonymous memory, not elastic kernel
page cache or file-backed resident pages)

This dual-condition avoids false positives from SQLite mmap'd pages, shared
memory, or other kernel-reclaimable memory that inflates cgroup usage but
does not represent real process memory pressure. If ``RssAnon`` is not
available, the monitor logs a warning and skips fail-fast rather than
falling back to cgroup-only raising.
"""

def __init__(
self,
check_interval: int = _DEFAULT_CHECK_INTERVAL,
fail_fast: Optional[bool] = None,
) -> None:
if check_interval < 1:
raise ValueError(f"check_interval must be >= 1, got {check_interval}")
Expand All @@ -48,6 +106,13 @@ def __init__(
self._cgroup_version: Optional[int] = None
self._probed = False

# Resolve fail-fast setting: explicit arg > env var > default (True)
if fail_fast is not None:
self._fail_fast = fail_fast
else:
env_val = os.environ.get(_ENV_FAIL_FAST, "true").strip().lower()
self._fail_fast = env_val != "false"

def _probe_cgroup(self) -> None:
"""Detect which cgroup version (if any) is available.

Expand Down Expand Up @@ -102,14 +167,19 @@ def _read_memory(self) -> Optional[tuple[int, int]]:
return None

def check_memory_usage(self) -> None:
"""Check memory usage and log when above 90%.
"""Check memory usage; log at 90% and raise at critical dual-condition.

Intended to be called on every message. The monitor internally tracks
a message counter and only reads cgroup files every ``check_interval``
messages (default 5000) to minimise I/O overhead.

Logs a WARNING on every check above 90% to provide breadcrumb trails
showing memory trends over the sync lifetime.
**Logging:** WARNING on every check above 90%.

**Fail-fast (when enabled):** If cgroup usage >= 98% *and* process
anonymous RSS (``RssAnon``) >= 80% of the container limit, raises
``AirbyteTracedException`` with ``FailureType.system_error`` so the
platform receives a clear error message instead of an opaque OOM-kill.
If ``RssAnon`` is unavailable, logs a warning and skips fail-fast.

This method is a no-op if cgroup files are unavailable.
"""
Expand Down Expand Up @@ -138,3 +208,36 @@ def check_memory_usage(self) -> None:
usage_gb,
limit_gb,
)

# Fail-fast: dual-condition check
if self._fail_fast and usage_ratio >= _CRITICAL_THRESHOLD:
anon_rss_bytes = _read_process_anon_rss_bytes()
if anon_rss_bytes is not None:
anon_ratio = anon_rss_bytes / limit_bytes
anon_percent = int(anon_ratio * 100)
if anon_ratio >= _ANON_RSS_THRESHOLD:
raise AirbyteTracedException(
message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).",
internal_message=(
f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). "
f"Process anonymous RSS (RssAnon): {anon_rss_bytes} bytes ({anon_percent}% of limit). "
f"Thresholds: cgroup >= {int(_CRITICAL_THRESHOLD * 100)}%, "
f"anonymous RSS >= {int(_ANON_RSS_THRESHOLD * 100)}%."
),
failure_type=FailureType.system_error,
)
else:
logger.info(
"Cgroup usage at %d%% but process anonymous RSS only %d%% of limit; "
"pressure likely from file-backed or reclaimable pages — not raising.",
usage_percent,
anon_percent,
)
else:
# RssAnon unavailable — log and skip rather than cgroup-only raising,
# so the implementation stays truly dual-condition.
logger.warning(
"Cgroup usage at %d%% but RssAnon unavailable from /proc/self/status; "
"skipping fail-fast (cannot confirm anonymous memory pressure).",
usage_percent,
)
Loading
Loading