Skip to content

Commit caa0c82

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
feat(cdk): add source-side memory introspection to emit controlled error messages before OOM kills
Add MemoryMonitor class that reads cgroup v2/v1 memory files to detect memory pressure in containerized environments. Integrates into the AirbyteEntrypoint.read() loop to check memory every 1000 messages. - At 85% usage: logs a warning message - At 95% usage: raises MemoryLimitExceeded (AirbyteTracedException with transient_error failure type) for graceful shutdown - No-op when cgroup files unavailable (local dev / CI) - No new dependencies required Related to airbytehq/airbyte-internal-issues#15938 Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent f550424 commit caa0c82

3 files changed

Lines changed: 440 additions & 0 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from airbyte_cdk.utils import is_cloud_environment, message_utils
4242
from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets, update_secrets
4343
from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH
44+
from airbyte_cdk.utils.memory_monitor import DEFAULT_CHECK_INTERVAL, MemoryMonitor
4445
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
4546

4647
logger = init_logger("airbyte")
@@ -277,8 +278,13 @@ def read(
277278

278279
# The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows
279280
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
281+
memory_monitor = MemoryMonitor()
282+
message_count = 0
280283
for message in self.source.read(self.logger, config, catalog, state):
281284
yield self.handle_record_counts(message, stream_message_counter)
285+
message_count += 1
286+
if message_count % DEFAULT_CHECK_INTERVAL == 0:
287+
memory_monitor.check_memory_usage()
282288
for message in self._emit_queued_messages(self.source):
283289
yield self.handle_record_counts(message, stream_message_counter)
284290

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
#
2+
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
"""Source-side memory introspection to emit controlled error messages before OOM kills."""
6+
7+
import logging
8+
from pathlib import Path
9+
from typing import Optional
10+
11+
from airbyte_cdk.models import FailureType
12+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
13+
14+
logger = logging.getLogger("airbyte")
15+
16+
# cgroup v2 paths
17+
_CGROUP_V2_CURRENT = Path("/sys/fs/cgroup/memory.current")
18+
_CGROUP_V2_MAX = Path("/sys/fs/cgroup/memory.max")
19+
20+
# cgroup v1 paths
21+
_CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
22+
_CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")
23+
24+
# Default thresholds
25+
_DEFAULT_WARNING_THRESHOLD = 0.85
26+
_DEFAULT_CRITICAL_THRESHOLD = 0.95
27+
28+
# Check interval (every N messages)
29+
DEFAULT_CHECK_INTERVAL = 1000
30+
31+
32+
class MemoryLimitExceeded(AirbyteTracedException):
33+
"""Raised when connector memory usage exceeds critical threshold."""
34+
35+
pass
36+
37+
38+
class MemoryMonitor:
39+
"""Monitors container memory usage via cgroup files and emits warnings before OOM kills.
40+
41+
On init, probes cgroup v2 then v1 files. Caches which version exists.
42+
If neither is found (local dev / CI), all subsequent calls are instant no-ops.
43+
"""
44+
45+
def __init__(
46+
self,
47+
warning_threshold: float = _DEFAULT_WARNING_THRESHOLD,
48+
critical_threshold: float = _DEFAULT_CRITICAL_THRESHOLD,
49+
) -> None:
50+
self._warning_threshold = warning_threshold
51+
self._critical_threshold = critical_threshold
52+
self._warning_emitted = False
53+
self._critical_raised = False
54+
self._cgroup_version: Optional[int] = None
55+
56+
# Probe cgroup version on init
57+
if _CGROUP_V2_CURRENT.exists() and _CGROUP_V2_MAX.exists():
58+
self._cgroup_version = 2
59+
elif _CGROUP_V1_USAGE.exists() and _CGROUP_V1_LIMIT.exists():
60+
self._cgroup_version = 1
61+
62+
if self._cgroup_version is None:
63+
logger.debug("No cgroup memory files found. Memory monitoring disabled (likely local dev / CI).")
64+
65+
def _read_memory(self) -> Optional[tuple[int, int]]:
66+
"""Read current memory usage and limit from cgroup files.
67+
68+
Returns a tuple of (usage_bytes, limit_bytes) or None if unavailable.
69+
"""
70+
if self._cgroup_version is None:
71+
return None
72+
73+
if self._cgroup_version == 2:
74+
usage_path = _CGROUP_V2_CURRENT
75+
limit_path = _CGROUP_V2_MAX
76+
else:
77+
usage_path = _CGROUP_V1_USAGE
78+
limit_path = _CGROUP_V1_LIMIT
79+
80+
usage_text = usage_path.read_text().strip()
81+
limit_text = limit_path.read_text().strip()
82+
83+
# cgroup v2 memory.max can be the literal string "max" (unlimited)
84+
if limit_text == "max":
85+
return None
86+
87+
usage_bytes = int(usage_text)
88+
limit_bytes = int(limit_text)
89+
90+
if limit_bytes <= 0:
91+
return None
92+
93+
return usage_bytes, limit_bytes
94+
95+
def check_memory_usage(self) -> None:
96+
"""Check memory usage against thresholds.
97+
98+
At the warning threshold (default 85%), logs a warning message.
99+
At the critical threshold (default 95%), raises MemoryLimitExceeded to
100+
trigger a graceful shutdown with an actionable error message.
101+
102+
Each threshold triggers at most once per sync to avoid log spam.
103+
This method is a no-op if cgroup files are unavailable.
104+
"""
105+
if self._cgroup_version is None:
106+
return
107+
108+
memory_info = self._read_memory()
109+
if memory_info is None:
110+
return
111+
112+
usage_bytes, limit_bytes = memory_info
113+
usage_ratio = usage_bytes / limit_bytes
114+
usage_percent = int(usage_ratio * 100)
115+
116+
if usage_ratio >= self._critical_threshold and not self._critical_raised:
117+
self._critical_raised = True
118+
raise MemoryLimitExceeded(
119+
internal_message=f"Memory usage is {usage_percent}% ({usage_bytes} / {limit_bytes} bytes). "
120+
f"Critical threshold is {int(self._critical_threshold * 100)}%.",
121+
message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down. "
122+
f"Reduce the number of streams or increase memory allocation.",
123+
failure_type=FailureType.transient_error,
124+
)
125+
126+
if usage_ratio >= self._warning_threshold and not self._warning_emitted:
127+
self._warning_emitted = True
128+
logger.warning(
129+
"Source memory usage reached %d%% of container limit (%d / %d bytes).",
130+
usage_percent,
131+
usage_bytes,
132+
limit_bytes,
133+
)

0 commit comments

Comments
 (0)