-
Notifications
You must be signed in to change notification settings - Fork 44
feat(cdk): add source-side memory monitoring (logging-only trial) #941
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Patrick Nilan (pnilan)
merged 16 commits into
main
from
devin/1773091366-add-memory-monitor
Mar 10, 2026
Merged
Changes from 13 commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
caa0c82
feat(cdk): add source-side memory introspection to emit controlled er…
devin-ai-integration[bot] 8d059cf
style: fix ruff format and import sorting
devin-ai-integration[bot] b2e233f
fix(cdk): add error handling to _read_memory() for graceful degradation
devin-ai-integration[bot] ee93565
fix(cdk): wrap read() loop in try/finally to flush queued messages on…
devin-ai-integration[bot] cca6c50
refactor(cdk): encapsulate check interval inside MemoryMonitor
devin-ai-integration[bot] ec56943
refactor(cdk): move MemoryMonitor to AirbyteEntrypoint.__init__
devin-ai-integration[bot] 7f7ab40
test(cdk): add comprehensive memory monitor tests and graceful shutdo…
devin-ai-integration[bot] a021eb7
fix(cdk): change MemoryLimitExceeded to system_error and update user-…
devin-ai-integration[bot] fc71aba
refactor(cdk): make DEFAULT_CHECK_INTERVAL private and drop circular …
devin-ai-integration[bot] 5f7e16d
refactor(cdk): move memory check before yield, test observable behavi…
devin-ai-integration[bot] 0485fa2
refactor(cdk): move memory check back to after yield for zero data loss
devin-ai-integration[bot] 1621a39
style: fix ruff format in test_entrypoint.py
devin-ai-integration[bot] 4dda57c
refactor(cdk): display memory usage in GB instead of raw bytes
devin-ai-integration[bot] c96825f
refactor(cdk): remove MemoryLimitExceeded subclass, raise AirbyteTrac…
devin-ai-integration[bot] cdc518c
fix(cdk): validate check_interval >= 1 to prevent ZeroDivisionError
devin-ai-integration[bot] bf9db5f
refactor(cdk): switch memory monitor to logging-only trial mode
devin-ai-integration[bot] File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,162 @@ | ||
| # | ||
| # Copyright (c) 2026 Airbyte, Inc., all rights reserved. | ||
| # | ||
|
|
||
| """Source-side memory introspection to emit controlled error messages before OOM kills.""" | ||
|
|
||
| import logging | ||
| from pathlib import Path | ||
| from typing import Optional | ||
|
|
||
| from airbyte_cdk.models import FailureType | ||
| from airbyte_cdk.utils.traced_exception import AirbyteTracedException | ||
|
|
||
| logger = logging.getLogger("airbyte") | ||
|
|
||
| # cgroup v2 paths | ||
| _CGROUP_V2_CURRENT = Path("/sys/fs/cgroup/memory.current") | ||
| _CGROUP_V2_MAX = Path("/sys/fs/cgroup/memory.max") | ||
|
|
||
| # cgroup v1 paths | ||
|
pnilan marked this conversation as resolved.
Outdated
|
||
| _CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes") | ||
| _CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes") | ||
|
|
||
| # Default thresholds | ||
| _DEFAULT_WARNING_THRESHOLD = 0.85 | ||
| _DEFAULT_CRITICAL_THRESHOLD = 0.95 | ||
|
|
||
| # Check interval (every N messages) | ||
| _DEFAULT_CHECK_INTERVAL = 1000 | ||
|
|
||
|
|
||
| class MemoryLimitExceeded(AirbyteTracedException): | ||
| """Raised when connector memory usage exceeds critical threshold.""" | ||
|
|
||
| pass | ||
|
|
||
|
|
||
| class MemoryMonitor: | ||
| """Monitors container memory usage via cgroup files and emits warnings before OOM kills. | ||
|
|
||
| Lazily probes cgroup v2 then v1 files on the first call to | ||
| ``check_memory_usage()``. Caches which version exists. | ||
| If neither is found (local dev / CI), all subsequent calls are instant no-ops. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| warning_threshold: float = _DEFAULT_WARNING_THRESHOLD, | ||
| critical_threshold: float = _DEFAULT_CRITICAL_THRESHOLD, | ||
| check_interval: int = _DEFAULT_CHECK_INTERVAL, | ||
| ) -> None: | ||
| self._warning_threshold = warning_threshold | ||
| self._critical_threshold = critical_threshold | ||
| self._check_interval = check_interval | ||
| self._message_count = 0 | ||
| self._warning_emitted = False | ||
| self._critical_raised = False | ||
| self._cgroup_version: Optional[int] = None | ||
| self._probed = False | ||
|
|
||
| def _probe_cgroup(self) -> None: | ||
| """Detect which cgroup version (if any) is available. | ||
|
|
||
| Called lazily on the first ``check_memory_usage()`` invocation so | ||
| that ``spec`` and ``discover`` commands never incur filesystem I/O. | ||
| """ | ||
| if self._probed: | ||
| return | ||
| self._probed = True | ||
|
|
||
| if _CGROUP_V2_CURRENT.exists() and _CGROUP_V2_MAX.exists(): | ||
| self._cgroup_version = 2 | ||
| elif _CGROUP_V1_USAGE.exists() and _CGROUP_V1_LIMIT.exists(): | ||
| self._cgroup_version = 1 | ||
|
|
||
| if self._cgroup_version is None: | ||
| logger.debug( | ||
| "No cgroup memory files found. Memory monitoring disabled (likely local dev / CI)." | ||
| ) | ||
|
|
||
| def _read_memory(self) -> Optional[tuple[int, int]]: | ||
| """Read current memory usage and limit from cgroup files. | ||
|
|
||
| Returns a tuple of (usage_bytes, limit_bytes) or None if unavailable. | ||
| Best-effort: failures to read memory info never crash a sync. | ||
| """ | ||
| if self._cgroup_version is None: | ||
| return None | ||
|
|
||
| try: | ||
| if self._cgroup_version == 2: | ||
| usage_path = _CGROUP_V2_CURRENT | ||
| limit_path = _CGROUP_V2_MAX | ||
| else: | ||
| usage_path = _CGROUP_V1_USAGE | ||
| limit_path = _CGROUP_V1_LIMIT | ||
|
|
||
| limit_text = limit_path.read_text().strip() | ||
| # cgroup v2 memory.max can be the literal string "max" (unlimited) | ||
| if limit_text == "max": | ||
| return None | ||
|
|
||
| usage_bytes = int(usage_path.read_text().strip()) | ||
| limit_bytes = int(limit_text) | ||
|
|
||
| if limit_bytes <= 0: | ||
| return None | ||
|
|
||
| return usage_bytes, limit_bytes | ||
| except (OSError, ValueError): | ||
| logger.debug("Failed to read cgroup memory files; skipping memory check.") | ||
| return None | ||
|
|
||
| def check_memory_usage(self) -> None: | ||
| """Check memory usage against thresholds. | ||
|
|
||
| Intended to be called on every message. The monitor internally tracks | ||
| a message counter and only reads cgroup files every ``check_interval`` | ||
| messages (default 1000) to minimise I/O overhead. | ||
|
|
||
| At the warning threshold (default 85%), logs a warning message. | ||
| At the critical threshold (default 95%), raises MemoryLimitExceeded to | ||
| trigger a graceful shutdown with an actionable error message. | ||
|
|
||
| Each threshold triggers at most once per sync to avoid log spam. | ||
| This method is a no-op if cgroup files are unavailable. | ||
| """ | ||
| self._probe_cgroup() | ||
| if self._cgroup_version is None: | ||
| return | ||
|
|
||
| self._message_count += 1 | ||
| if self._message_count % self._check_interval != 0: | ||
| return | ||
|
pnilan marked this conversation as resolved.
|
||
|
|
||
| memory_info = self._read_memory() | ||
| if memory_info is None: | ||
| return | ||
|
|
||
| usage_bytes, limit_bytes = memory_info | ||
| usage_ratio = usage_bytes / limit_bytes | ||
| usage_percent = int(usage_ratio * 100) | ||
| usage_gb = usage_bytes / (1024**3) | ||
| limit_gb = limit_bytes / (1024**3) | ||
|
|
||
| if usage_ratio >= self._critical_threshold and not self._critical_raised: | ||
| self._critical_raised = True | ||
| raise MemoryLimitExceeded( | ||
| internal_message=f"Memory usage is {usage_percent}% ({usage_gb:.2f} / {limit_gb:.2f} GB). " | ||
| f"Critical threshold is {int(self._critical_threshold * 100)}%.", | ||
| message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down to avoid an out-of-memory crash.", | ||
| failure_type=FailureType.system_error, | ||
| ) | ||
|
|
||
| if usage_ratio >= self._warning_threshold and not self._warning_emitted: | ||
| self._warning_emitted = True | ||
| logger.warning( | ||
| "Source memory usage reached %d%% of container limit (%.2f / %.2f GB).", | ||
| usage_percent, | ||
| usage_gb, | ||
| limit_gb, | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.