Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 54 additions & 16 deletions airbyte_cdk/utils/memory_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,42 @@
# Path to the Linux procfs status file for the current process. Used as the
# source of process-level anonymous RSS (Linux only, no extra dependency).
_PROC_SELF_STATUS = Path("/proc/self/status")


def _format_bytes(num_bytes: int) -> str:
    """Render a byte count as a short human-readable string.

    Decimal (SI) units are used on purpose: GB = 10^9, MB = 10^6, KB = 10^3.
    That keeps raw cgroup byte values close to the way operators describe
    container limits (e.g. a 2_147_483_648-byte limit renders as ``2.15 GB``
    rather than the binary ``2.00 GiB``).

    Args:
        num_bytes: The byte count to render.

    Returns:
        The value scaled to the largest unit it reaches, with two decimals and
        the unit suffix (``"1.50 MB"``). Values below 1 KB are returned as a
        plain integer byte count (``"999 B"``).

    Note:
        The unit is chosen from the unrounded value, so a count just under a
        unit boundary can render as e.g. ``"1000.00 KB"``.
    """
    # Ordered largest-first so the first unit the value reaches is used.
    units = (
        (1_000_000_000, "GB"),
        (1_000_000, "MB"),
        (1_000, "KB"),
    )
    for divisor, suffix in units:
        if num_bytes >= divisor:
            return f"{num_bytes / divisor:.2f} {suffix}"
    return f"{num_bytes} B"


# Raise AirbyteTracedException when BOTH conditions are met:
#   1. cgroup usage >= critical threshold
#   2. anonymous memory >= anon-share threshold of *current cgroup usage*
# Comparing anon to usage (not limit) answers the more relevant question:
# "is most of the near-OOM memory actually process-owned anonymous memory?"
#
# Thresholds are deliberately set below the OOM cliff to leave headroom for
# the check-interval race window: between two checks, allocations can jump
# a container past any gate directly into kernel OOM-kill. Firing the
# fail-fast trace well before the cliff is what makes the failure visible to
# the platform instead of appearing as a silent exit.
_CRITICAL_THRESHOLD = 0.95
_ANON_SHARE_OF_USAGE_THRESHOLD = 0.85

# Check interval (every N messages) — tightens after crossing high-pressure threshold
_DEFAULT_CHECK_INTERVAL = 5000
_HIGH_PRESSURE_CHECK_INTERVAL = 100
_HIGH_PRESSURE_THRESHOLD = 0.90


def _read_cgroup_v2_anon_bytes() -> Optional[int]:
Expand Down Expand Up @@ -90,21 +114,21 @@ class MemoryMonitor:

**Logging (event-based, not periodic):**

- One INFO when high-pressure mode activates (usage first crosses 95%)
- One INFO/WARNING when critical threshold (98%) is crossed but we do
- One INFO when high-pressure mode activates (usage first crosses 90%)
- One INFO/WARNING when critical threshold (95%) is crossed but we do
*not* raise (either anon share is below the fail-fast gate or the
anonymous memory signal is unavailable)
- No repeated per-check warnings — logging is driven by state
transitions, not periodic sampling

**High-pressure polling:** Once cgroup usage first crosses 95%, the check
interval permanently tightens from 5000 to 100 messages to narrow the race
window near OOM.
**High-pressure polling:** Once cgroup usage first crosses 90%, the check
interval permanently tightens from the configured ``check_interval``
(default 5000) to 100 messages to narrow the race window near OOM.

**Fail-fast:** Raises ``AirbyteTracedException`` with
``FailureType.system_error`` when *both*:

1. Cgroup usage >= 98% of the container limit (container is near OOM-kill)
1. Cgroup usage >= 95% of the container limit (container is near OOM-kill)
2. Anonymous memory >= 85% of *current cgroup usage* (most of the charged
memory is process-private anonymous pages, not file-backed cache)

Expand Down Expand Up @@ -132,6 +156,16 @@ def __init__(
self._probed = False
self._high_pressure_mode = False
self._critical_logged = False
logger.info(
"MemoryMonitor instantiated with critical threshold: %d%%, "
"anon share of usage threshold: %d%%, high-pressure threshold: %d%%, "
"check interval: %d messages (tightens to %d under high pressure).",
int(_CRITICAL_THRESHOLD * 100),
int(_ANON_SHARE_OF_USAGE_THRESHOLD * 100),
int(_HIGH_PRESSURE_THRESHOLD * 100),
self._check_interval,
_HIGH_PRESSURE_CHECK_INTERVAL,
)

def _probe_cgroup(self) -> None:
"""Detect which cgroup version (if any) is available.
Expand Down Expand Up @@ -209,8 +243,9 @@ def check_memory_usage(self) -> None:

Intended to be called on every message. The monitor internally tracks
a message counter and only reads cgroup files every ``check_interval``
messages (default 5000). Once usage crosses 95%, the interval tightens
to 100 messages for the remainder of the sync.
messages (default 5000). Once usage crosses 90%, the interval tightens
to 100 messages for the remainder of the sync regardless of the
configured ``check_interval``.

Logging is event-based (one-shot on state transitions), not periodic.

Expand All @@ -234,8 +269,6 @@ def check_memory_usage(self) -> None:
usage_bytes, limit_bytes = memory_info
usage_ratio = usage_bytes / limit_bytes
usage_percent = int(usage_ratio * 100)
usage_gb = usage_bytes / (1024**3)
limit_gb = limit_bytes / (1024**3)

if usage_ratio >= _HIGH_PRESSURE_THRESHOLD and not self._high_pressure_mode:
self._high_pressure_mode = True
Expand All @@ -256,8 +289,9 @@ def check_memory_usage(self) -> None:
raise AirbyteTracedException(
message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).",
internal_message=(
f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). "
f"Anonymous memory ({anon_source}): {anon_bytes} bytes "
f"Cgroup memory: {_format_bytes(usage_bytes)} / "
f"{_format_bytes(limit_bytes)} ({usage_percent}%). "
f"Anonymous memory ({anon_source}): {_format_bytes(anon_bytes)} "
f"({int(anon_share * 100)}% of current cgroup usage). "
f"Thresholds: cgroup >= {int(_CRITICAL_THRESHOLD * 100)}%, "
f"anon share of usage >= {int(_ANON_SHARE_OF_USAGE_THRESHOLD * 100)}%."
Expand All @@ -267,13 +301,17 @@ def check_memory_usage(self) -> None:
elif not self._critical_logged:
self._critical_logged = True
logger.info(
"Cgroup usage crossed %d%% but anonymous memory is only %d%% of current cgroup usage; not raising.",
"Cgroup usage crossed %d%% (%s of %s) but anonymous memory is only %d%% of current cgroup usage; not raising.",
int(_CRITICAL_THRESHOLD * 100),
_format_bytes(usage_bytes),
_format_bytes(limit_bytes),
int(anon_share * 100),
)
elif not self._critical_logged:
self._critical_logged = True
logger.warning(
"Cgroup usage crossed %d%% but anonymous memory signal unavailable; skipping fail-fast.",
"Cgroup usage crossed %d%% (%s of %s) but anonymous memory signal unavailable; skipping fail-fast.",
int(_CRITICAL_THRESHOLD * 100),
_format_bytes(usage_bytes),
_format_bytes(limit_bytes),
)
Loading
Loading