Skip to content

Commit 7466576

Browse files
Merge branch 'devin/1773159423-fix-start-date-format-parsing' of https://git-manager.devin.ai/proxy/github.com/airbytehq/airbyte-python-cdk into devin/1773159423-fix-start-date-format-parsing
2 parents 15e17a5 + 228096d commit 7466576

3 files changed

Lines changed: 390 additions & 0 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from airbyte_cdk.utils import is_cloud_environment, message_utils
4242
from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets, update_secrets
4343
from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH
44+
from airbyte_cdk.utils.memory_monitor import MemoryMonitor
4445
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
4546

4647
logger = init_logger("airbyte")
@@ -60,6 +61,7 @@ def __init__(self, source: Source):
6061

6162
self.source = source
6263
self.logger = logging.getLogger(f"airbyte.{getattr(source, 'name', '')}")
64+
self._memory_monitor = MemoryMonitor()
6365

6466
@staticmethod
6567
def parse_args(args: List[str]) -> argparse.Namespace:
@@ -279,6 +281,7 @@ def read(
279281
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
280282
for message in self.source.read(self.logger, config, catalog, state):
281283
yield self.handle_record_counts(message, stream_message_counter)
284+
self._memory_monitor.check_memory_usage()
282285
for message in self._emit_queued_messages(self.source):
283286
yield self.handle_record_counts(message, stream_message_counter)
284287

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
#
2+
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
"""Source-side memory introspection to log memory usage approaching container limits."""
6+
7+
import logging
8+
from pathlib import Path
9+
from typing import Optional
10+
11+
logger = logging.getLogger("airbyte")
12+
13+
# cgroup v2 paths
14+
_CGROUP_V2_CURRENT = Path("/sys/fs/cgroup/memory.current")
15+
_CGROUP_V2_MAX = Path("/sys/fs/cgroup/memory.max")
16+
17+
# cgroup v1 paths — TODO: remove if all deployments are confirmed cgroup v2
18+
_CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
19+
_CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")
20+
21+
# Log when usage is at or above 90%
22+
_MEMORY_THRESHOLD = 0.90
23+
24+
# Check interval (every N messages)
25+
_DEFAULT_CHECK_INTERVAL = 5000
26+
27+
28+
class MemoryMonitor:
29+
"""Monitors container memory usage via cgroup files and logs warnings when usage is high.
30+
31+
Lazily probes cgroup v2 then v1 files on the first call to
32+
``check_memory_usage()``. Caches which version exists.
33+
If neither is found (local dev / CI), all subsequent calls are instant no-ops.
34+
35+
Logs a WARNING on every check interval (default 5000 messages) when memory
36+
usage is at or above 90% of the container limit. This gives breadcrumb
37+
trails showing whether memory is climbing, plateauing, or sawtoothing.
38+
"""
39+
40+
def __init__(
41+
self,
42+
check_interval: int = _DEFAULT_CHECK_INTERVAL,
43+
) -> None:
44+
if check_interval < 1:
45+
raise ValueError(f"check_interval must be >= 1, got {check_interval}")
46+
self._check_interval = check_interval
47+
self._message_count = 0
48+
self._cgroup_version: Optional[int] = None
49+
self._probed = False
50+
51+
def _probe_cgroup(self) -> None:
52+
"""Detect which cgroup version (if any) is available.
53+
54+
Called lazily on the first ``check_memory_usage()`` invocation so
55+
that ``spec`` and ``discover`` commands never incur filesystem I/O.
56+
"""
57+
if self._probed:
58+
return
59+
self._probed = True
60+
61+
if _CGROUP_V2_CURRENT.exists() and _CGROUP_V2_MAX.exists():
62+
self._cgroup_version = 2
63+
elif _CGROUP_V1_USAGE.exists() and _CGROUP_V1_LIMIT.exists():
64+
self._cgroup_version = 1
65+
66+
if self._cgroup_version is None:
67+
logger.debug(
68+
"No cgroup memory files found. Memory monitoring disabled (likely local dev / CI)."
69+
)
70+
71+
def _read_memory(self) -> Optional[tuple[int, int]]:
72+
"""Read current memory usage and limit from cgroup files.
73+
74+
Returns a tuple of (usage_bytes, limit_bytes) or None if unavailable.
75+
Best-effort: failures to read memory info never crash a sync.
76+
"""
77+
if self._cgroup_version is None:
78+
return None
79+
80+
try:
81+
if self._cgroup_version == 2:
82+
usage_path = _CGROUP_V2_CURRENT
83+
limit_path = _CGROUP_V2_MAX
84+
else:
85+
usage_path = _CGROUP_V1_USAGE
86+
limit_path = _CGROUP_V1_LIMIT
87+
88+
limit_text = limit_path.read_text().strip()
89+
# cgroup v2 memory.max can be the literal string "max" (unlimited)
90+
if limit_text == "max":
91+
return None
92+
93+
usage_bytes = int(usage_path.read_text().strip())
94+
limit_bytes = int(limit_text)
95+
96+
if limit_bytes <= 0:
97+
return None
98+
99+
return usage_bytes, limit_bytes
100+
except (OSError, ValueError):
101+
logger.debug("Failed to read cgroup memory files; skipping memory check.")
102+
return None
103+
104+
def check_memory_usage(self) -> None:
105+
"""Check memory usage and log when above 90%.
106+
107+
Intended to be called on every message. The monitor internally tracks
108+
a message counter and only reads cgroup files every ``check_interval``
109+
messages (default 5000) to minimise I/O overhead.
110+
111+
Logs a WARNING on every check above 90% to provide breadcrumb trails
112+
showing memory trends over the sync lifetime.
113+
114+
This method is a no-op if cgroup files are unavailable.
115+
"""
116+
self._probe_cgroup()
117+
if self._cgroup_version is None:
118+
return
119+
120+
self._message_count += 1
121+
if self._message_count % self._check_interval != 0:
122+
return
123+
124+
memory_info = self._read_memory()
125+
if memory_info is None:
126+
return
127+
128+
usage_bytes, limit_bytes = memory_info
129+
usage_ratio = usage_bytes / limit_bytes
130+
usage_percent = int(usage_ratio * 100)
131+
usage_gb = usage_bytes / (1024**3)
132+
limit_gb = limit_bytes / (1024**3)
133+
134+
if usage_ratio >= _MEMORY_THRESHOLD:
135+
logger.warning(
136+
"Source memory usage at %d%% of container limit (%.2f / %.2f GB).",
137+
usage_percent,
138+
usage_gb,
139+
limit_gb,
140+
)

0 commit comments

Comments
 (0)