feat(cdk): enable fail-fast shutdown on memory threshold with dual-condition check #962
…ndition check Add critical memory threshold (95% cgroup + 80% process RSS) that raises AirbyteTracedException with FailureType.system_error when both conditions are met. This gives connectors a clean error message instead of an opaque OOM kill. Key design decisions: - Dual-condition: only raises when BOTH cgroup >= 95% AND process RSS >= 80% of container limit. This avoids false positives from reclaimable kernel page cache (e.g., SQLite file cache). - Process RSS read from /proc/self/status (no new dependencies). - Kill switch: AIRBYTE_MEMORY_FAIL_FAST env var (default: true, set to 'false' to disable). - Failure-path contract: entrypoint read loop wrapped in try/finally to flush queued state messages before exception propagates. Resolves airbytehq/airbyte-internal-issues#15982 Co-Authored-By: bot_apk <apk@cognition.ai>
|
PyTest Results (Fast)3 987 tests +12 3 976 ✅ +12 7m 44s ⏱️ +12s Results for commit b856e7c. ± Comparison against base commit 69cd63d. This pull request removes 4 and adds 16 tests. Note that renamed tests count towards both.♻️ This comment has been updated with latest results. |
PyTest Results (Full)3 990 tests +12 3 978 ✅ +12 11m 12s ⏱️ -8s Results for commit b856e7c. ± Comparison against base commit 69cd63d. This pull request removes 4 and adds 16 tests. Note that renamed tests count towards both.♻️ This comment has been updated with latest results. |
|
I think the second signal should be Why:
Concrete changes I'd suggest:
That would make the implementation line up more closely with the design described in the PR. |
…8%, log+skip on missing RssAnon Address review feedback: - Parse RssAnon from /proc/self/status instead of VmRSS to avoid inflation from file-backed/shared resident pages - Raise _CRITICAL_THRESHOLD from 95% to 98% for more conservative fail-fast behavior - When RssAnon is unavailable, log a warning and skip fail-fast instead of falling back to cgroup-only raising (stays truly dual-condition) - Add test proving metric choice matters (VmRSS high but RssAnon low) - Update all docstrings/comments to say 'anonymous resident memory' Co-Authored-By: bot_apk <apk@cognition.ai>
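The `RssAnon` parsing described in this commit could look roughly like the sketch below. It is an illustration only: `parse_rss_anon_bytes` is a hypothetical name (not necessarily the CDK's helper), and it takes the file content as a string so it can be exercised without a real `/proc/self/status`. It assumes the usual `RssAnon:   <n> kB` line shape.

```python
from typing import Optional


def parse_rss_anon_bytes(status_text: str) -> Optional[int]:
    """Return RssAnon in bytes from /proc/self/status content, or None if absent."""
    for line in status_text.splitlines():
        if line.startswith("RssAnon:"):
            parts = line.split()
            # Expected shape: "RssAnon:   200000 kB"; the value is reported in kB.
            if len(parts) >= 2 and parts[1].isdigit():
                return int(parts[1]) * 1024
            return None
    return None


# VmRSS is high here, but most of it is file-backed; only RssAnon is returned.
sample = "VmRSS:\t  900000 kB\nRssAnon:\t  200000 kB\nRssFile:\t  700000 kB\n"
rss_anon = parse_rss_anon_bytes(sample)

# Content that only has VmRSS yields None, so the caller can log and skip.
vmrss_only = parse_rss_anon_bytes("VmRSS:\t  900000 kB\n")
```

Returning `None` rather than falling back to `VmRSS` keeps the check truly dual-condition, which is the behavior this commit message describes.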
|
Great points — all addressed in 748212f:
|
Pull request overview
Adds a dual-signal “fail-fast” memory shutdown path to the CDK so connectors can fail with a clear AirbyteTracedException(FailureType.system_error) under near-OOM conditions, instead of being OOM-killed.
Changes:
- Extend MemoryMonitor to optionally raise when cgroup usage is critical and process RssAnon is high (with env-var + constructor gating).
- Add unit tests for RssAnon parsing, dual-condition behavior, and feature-flag behavior.
- Wrap the read() loop to attempt flushing queued messages even when a fail-fast exception interrupts a sync.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| airbyte_cdk/utils/memory_monitor.py | Implements dual-condition fail-fast using cgroup + /proc/self/status RssAnon, plus the AIRBYTE_MEMORY_FAIL_FAST switch. |
| unit_tests/utils/test_memory_monitor.py | Adds coverage for RssAnon parsing and the new fail-fast decision logic + toggles. |
| airbyte_cdk/entrypoint.py | Adds a try/finally around the read loop to flush queued messages on interruption. |
Address Copilot review feedback: - Restructure entrypoint.py to catch the fail-fast exception around check_memory_usage(), flush queued messages, then re-raise — avoids yielding inside a finally block which can trigger RuntimeError on GeneratorExit. - Use clear=True in patch.dict for test_fail_fast_enabled_by_default to ensure AIRBYTE_MEMORY_FAIL_FAST is truly unset in test env. Co-Authored-By: bot_apk <apk@cognition.ai>
Per review feedback, increase _MEMORY_THRESHOLD from 0.90 to 0.95 to reduce noise from the warning log. Update all related docstrings, comments, and test assertions accordingly. Co-Authored-By: bot_apk <apk@cognition.ai>
|
Thanks for making the One follow-up: I would set Now that the logic is intentionally very conservative on the cgroup side ( If the design goal is "only fail fast when there is very strong evidence of imminent process-driven OOM",
Absent fleet data, I would rather start at |
Per review feedback, increase _ANON_RSS_THRESHOLD from 0.80 to 0.90 to align with the conservative 98% cgroup threshold. Start stricter and relax later once fleet data is available. Co-Authored-By: bot_apk <apk@cognition.ai>
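At this point in the review, the decision rule uses the container limit as the denominator for both signals (98% cgroup usage, 90% anonymous RSS). A minimal sketch of that rule follows; the function name and the byte values are hypothetical and only the two thresholds come from the commit messages above.

```python
from typing import Optional

CRITICAL_THRESHOLD = 0.98  # cgroup usage as a fraction of the container limit
ANON_RSS_THRESHOLD = 0.90  # anonymous RSS as a fraction of the container limit


def should_fail_fast(usage_bytes: int, limit_bytes: int, anon_bytes: Optional[int]) -> bool:
    """Raise only when BOTH signals are high; a missing signal never raises."""
    if limit_bytes <= 0 or anon_bytes is None:
        return False
    cgroup_critical = usage_bytes / limit_bytes >= CRITICAL_THRESHOLD
    anon_high = anon_bytes / limit_bytes >= ANON_RSS_THRESHOLD
    return cgroup_critical and anon_high


LIMIT = 1000  # illustrative units, not real byte counts

# Heap-driven pressure: both conditions hold.
heap_driven = should_fail_fast(usage_bytes=990, limit_bytes=LIMIT, anon_bytes=950)
# Cgroup is nearly full, but mostly with reclaimable page cache.
page_cache_driven = should_fail_fast(usage_bytes=990, limit_bytes=LIMIT, anon_bytes=300)
# Anonymous-memory signal unavailable: skip instead of raising on cgroup alone.
signal_missing = should_fail_fast(usage_bytes=990, limit_bytes=LIMIT, anon_bytes=None)
```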
|
Patrick Nilan (@pnilan) Agreed — Current thresholds summary:
|
Co-Authored-By: bot_apk <apk@cognition.ai>
|
/prerelease
|
|
I think the exception scope in At

```python
try:
    self._memory_monitor.check_memory_usage()
except Exception:
    for queued_message in self._emit_queued_messages(self.source):
        yield self.handle_record_counts(queued_message, stream_message_counter)
    raise
```

The intended exception from the new fail-fast path is Why this matters:
I think the safer and more self-documenting version is:

```python
try:
    self._memory_monitor.check_memory_usage()
except AirbyteTracedException:
    # Flush queued messages (state checkpoints, logs) before propagating
    # the memory fail-fast exception, so the platform receives the last
    # committed state for the next sync.
    for queued_message in self._emit_queued_messages(self.source):
        yield self.handle_record_counts(queued_message, stream_message_counter)
    raise
```

That keeps the new flush-and-reraise behavior tightly scoped to the intentional fail-fast mechanism introduced in this PR. If the broader behavior is intentional, I’d suggest documenting that explicitly in a comment, because I’d also recommend a targeted test for this path:
That would lock in the exact behavior this PR is trying to guarantee. |
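The flush-before-reraise pattern discussed in this comment can be demonstrated in isolation. The sketch below is not the CDK entrypoint: `MemoryLimitError` is a hypothetical stand-in for the fail-fast `AirbyteTracedException`, and the records, queue, and `check_memory` callable are plain placeholders.

```python
from typing import Callable, Iterator, List


class MemoryLimitError(Exception):
    """Hypothetical stand-in for the fail-fast AirbyteTracedException."""


def read_with_flush(
    records: List[str], queued: List[str], check_memory: Callable[[], None]
) -> Iterator[str]:
    for record in records:
        try:
            check_memory()
        except MemoryLimitError:
            # Drain queued messages (e.g. state checkpoints) so the consumer
            # receives them before the exception propagates.
            while queued:
                yield queued.pop(0)
            raise
        yield record


calls = {"n": 0}


def check() -> None:
    # Simulate memory becoming critical on the second check.
    calls["n"] += 1
    if calls["n"] == 2:
        raise MemoryLimitError("memory critical")


seen: List[str] = []
raised = False
try:
    for message in read_with_flush(["record-1", "record-2"], ["STATE-1"], check):
        seen.append(message)
except MemoryLimitError:
    raised = True
```

Because the `yield` happens inside an `except` clause for one specific exception type rather than in a `finally` block, closing the generator early does not force it to yield during `GeneratorExit` handling.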
…-before-raise test Co-Authored-By: bot_apk <apk@cognition.ai>
|
Good call — narrowed the catch to Also added
|
|
I think this change also needs a targeted regression test for the new exception path in Right now, the PR adds special behavior at
That behavior is important because it is the mechanism that preserves the latest queued state checkpoint before the sync fails fast. I do not see a test in this PR that locks that contract in. The existing read tests in
But they do not cover the new case where:
I think the regression test should prove all of the following in one flow:
Suggested shape:
That would protect the exact behavior this PR is introducing and would catch future regressions in the flush-before-reraise path. |
|
I also think the Right now the implementation presents fail-fast as runtime-configurable:
If the production environment cannot set that env var, then this is not a real operational control. In practice that means:
In that case, I think the cleaner implementation is to remove the env-var path entirely and make the behavior explicit in code. Concretely, I would suggest:
That leaves the team with a simpler and more honest contract: either fail-fast is part of the product behavior, or it is not. If a kill switch is still desired, it should be wired to a control surface the runtime actually supports rather than an env var that operators cannot set. |
…nt and step-by-step generator iteration Co-Authored-By: bot_apk <apk@cognition.ai>
|
Agreed — rewrote the test in e649a70 to match your suggested shape exactly. It now calls
This locks in the full flush-before-reraise contract including the record count population. |
|
Answering your question directly: the runtime environment for these syncs will not support Given that, I do think the env-var path should be removed from this PR rather than kept as a kill switch. Concretely, please:
My concern is not just code cleanliness. If the runtime cannot actually provide this env var, then leaving it in the implementation creates a control surface that looks operationally real but is not usable in practice. That is worse than having no kill switch at all, because it overstates rollback flexibility. If the platform wants a real rollout lever later, it should be introduced through a control surface the runtime actually supports. I would not block this PR on designing that alternative. For this change, I would simplify the implementation and remove the dead env-var path. |
…ail-fast unconditional Co-Authored-By: bot_apk <apk@cognition.ai>
|
Done in 6c2cd9c. Removed the env-var kill switch entirely:
Agree that if a real rollout lever is needed later, it should be wired to a control surface the runtime actually supports. |
|
I dug through the current PR head ( Current behavior in this PR:
Why I think this misses the leak case:
Concrete change I would recommend:
Why this is a better fit for the intent of the PR:
I would not just lower Sketch of the code change:

```python
_CGROUP_V2_STAT = Path("/sys/fs/cgroup/memory.stat")
_ANON_SHARE_OF_USAGE_THRESHOLD = 0.85
_HIGH_PRESSURE_CHECK_INTERVAL = 100


def _read_cgroup_v2_anon_bytes() -> Optional[int]:
    try:
        for line in _CGROUP_V2_STAT.read_text().splitlines():
            if line.startswith("anon "):
                return int(line.split()[1])
    except (OSError, ValueError):
        return None
    return None


def _read_anon_bytes(self) -> Optional[tuple[int, str]]:
    if self._cgroup_version == 2:
        cgroup_anon = _read_cgroup_v2_anon_bytes()
        if cgroup_anon is not None:
            return cgroup_anon, "cgroup memory.stat anon"
    proc_anon = _read_process_anon_rss_bytes()
    if proc_anon is not None:
        return proc_anon, "process RssAnon"
    return None


# inside check_memory_usage()
interval = 100 if self._high_pressure_mode else self._check_interval
if self._message_count % interval != 0:
    return
if usage_ratio >= 0.95:
    self._high_pressure_mode = True
if usage_ratio >= 0.98:
    anon_info = self._read_anon_bytes()
    if anon_info is not None:
        anon_bytes, anon_source = anon_info
        anon_share = anon_bytes / usage_bytes
        if anon_share >= 0.85:
            raise AirbyteTracedException(...)
        else:
            logger.info(
                "Cgroup usage at %d%% but anonymous memory only %d%% of current cgroup usage; "
                "pressure likely from file-backed or kernel memory — not raising.",
                usage_percent,
                int(anon_share * 100),
            )
```

Tests I would want with this change:
If you want to keep the change smaller, I think the minimum safe version is:
That gets you most of the benefit without broadening the surface area much. |
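The denominator issue raised in this comment is easiest to see with numbers. The byte counts below are invented for illustration (chosen so that anonymous memory sits at 87.5% of the limit, in the range the PR description later cites); only the 90%-of-limit and 85%-of-usage thresholds come from the thread.

```python
# Hypothetical container: 1 GB limit, cgroup usage at 98.5% of the limit,
# anonymous memory at 87.5% of the limit.
limit_bytes = 1_000_000_000
usage_bytes = 985_000_000
anon_bytes = 875_000_000

share_of_limit = anon_bytes / limit_bytes  # 0.875
share_of_usage = anon_bytes / usage_bytes  # about 0.888

# Old gate: anonymous memory must reach 90% of the container limit.
limit_gate_fires = share_of_limit >= 0.90
# Proposed gate: anonymous memory must be 85% of what the cgroup is using now.
usage_gate_fires = share_of_usage >= 0.85
```

With these figures the limit-based gate stays silent even though anonymous memory accounts for nearly all of the near-OOM usage, while the usage-based gate fires.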
…mory.stat source, add high-pressure polling Co-Authored-By: bot_apk <apk@cognition.ai>
|
All addressed in 82fa672. Great analysis — the wrong-denominator issue is exactly the kind of thing that would have caused a miss in production. Changes:
All 75 tests pass locally (memory monitor + entrypoint), lint/format/mypy clean. |
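The high-pressure polling behavior can be sketched independently of the monitor. `PollingGate` and its method names are hypothetical; the 5000 and 100 message intervals and the 95% trigger are the values stated in this thread.

```python
class PollingGate:
    """Decides on which messages a memory check should run."""

    def __init__(self, check_interval: int = 5000, high_pressure_interval: int = 100) -> None:
        self._check_interval = check_interval
        self._high_pressure_interval = high_pressure_interval
        self._message_count = 0
        self.high_pressure_mode = False

    def should_check(self) -> bool:
        self._message_count += 1
        interval = (
            self._high_pressure_interval if self.high_pressure_mode else self._check_interval
        )
        return self._message_count % interval == 0

    def observe(self, usage_ratio: float) -> None:
        # The flag is sticky: it is set once and never cleared.
        if usage_ratio >= 0.95:
            self.high_pressure_mode = True


gate = PollingGate()
checks_before = sum(gate.should_check() for _ in range(5000))  # messages 1..5000
gate.observe(0.96)
checks_after = sum(gate.should_check() for _ in range(1000))  # messages 5001..6000
gate.observe(0.50)  # usage dropping does not relax the interval
still_high_pressure = gate.high_pressure_mode
```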
…W fallback path Co-Authored-By: bot_apk <apk@cognition.ai>
|
/prerelease
|
|
Reviewed the latest changes — this looks solid. Here's my summary of what changed and a few observations: Changes implemented (all align with the analysis from the memory leak test investigation):
Test coverage looks comprehensive: 16 tests covering One observation on the Monitoring the prerelease publish now — will update the airbyte PRs with the new CDK version once it's available. |
|
I’d simplify the logging behavior further. Right now, once I think the simpler behavior is:
Concretely, I would remove the repeated warning at the current Something along these lines:

```python
self._high_pressure_mode = False
self._critical_logged = False

if usage_ratio >= _HIGH_PRESSURE_THRESHOLD and not self._high_pressure_mode:
    self._high_pressure_mode = True
    logger.info(
        "Memory usage crossed %d%%; tightening check interval from %d to %d messages.",
        int(_HIGH_PRESSURE_THRESHOLD * 100),
        self._check_interval,
        _HIGH_PRESSURE_CHECK_INTERVAL,
    )
if usage_ratio >= _CRITICAL_THRESHOLD:
    anon_info = self._read_anon_bytes()
    if anon_info is not None:
        anon_bytes, anon_source = anon_info
        anon_share = anon_bytes / usage_bytes
        if anon_share >= _ANON_SHARE_OF_USAGE_THRESHOLD:
            raise AirbyteTracedException(...)
        elif not self._critical_logged:
            self._critical_logged = True
            logger.info(
                "Cgroup usage crossed %d%% but anonymous memory is only %d%% of current cgroup usage; not raising.",
                int(_CRITICAL_THRESHOLD * 100),
                int(anon_share * 100),
            )
    elif not self._critical_logged:
        self._critical_logged = True
        logger.warning(
            "Cgroup usage crossed %d%% but anonymous memory signal unavailable; skipping fail-fast.",
            int(_CRITICAL_THRESHOLD * 100),
        )
```

That would keep the behavior easy to reason about:
It also keeps the logs focused on state transitions / decisions, rather than producing a repeated breadcrumb every 100 messages once memory is already known to be high. |
…5% warnings) Co-Authored-By: bot_apk <apk@cognition.ai>
|
Patrick Nilan (@pnilan) Implemented your logging simplification in c819363. Logging is now fully event-based:
Removed the periodic Tests updated to verify one-shot behavior (call |
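The one-shot behavior described here (call twice at critical, expect a single log record) can be checked with a small stand-alone sketch. `OneShotReporter`, `ListHandler`, and the logger name are hypothetical; only the idea of a `_critical_logged` flag comes from the thread.

```python
import logging
from typing import List


class ListHandler(logging.Handler):
    """Collects log records in memory so they can be counted."""

    def __init__(self) -> None:
        super().__init__()
        self.records: List[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


class OneShotReporter:
    def __init__(self, logger: logging.Logger) -> None:
        self._logger = logger
        self._critical_logged = False

    def report_not_raising(self, anon_share: float) -> None:
        # Log only on the first call; later calls are silent.
        if self._critical_logged:
            return
        self._critical_logged = True
        self._logger.info(
            "Cgroup usage critical but anonymous memory is only %d%% of usage; not raising.",
            int(anon_share * 100),
        )


logger = logging.getLogger("memory_monitor_sketch")
logger.setLevel(logging.INFO)
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

reporter = OneShotReporter(logger)
reporter.report_not_raising(0.40)
reporter.report_not_raising(0.42)  # suppressed by the flag
```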
|
/prerelease
|
Summary
Adds fail-fast memory shutdown to the CDK's MemoryMonitor. When container memory pressure is critical, the monitor now raises AirbyteTracedException with FailureType.system_error instead of letting the process hit an opaque OOM-kill.

Dual-condition logic: The exception only fires when both:

The anonymous memory signal is read from cgroup v2 memory.stat (anon field) when available, falling back to /proc/self/status RssAnon. Comparing anonymous memory to current usage (not the container limit) answers the more relevant question: "is most of the near-OOM memory actually process-owned?" This avoids the brittleness of a limit-based denominator, where anonymous memory can dominate usage yet still fall short of a percentage-of-limit threshold, as observed in real leak scenarios where RssAnon reached 87–88% of the limit but didn't cross a 90%-of-limit gate.

High-pressure polling: Once cgroup usage first crosses 95%, the check interval permanently tightens from 5000 to 100 messages, narrowing the race window near OOM.

Event-based logging (one-shot, not periodic): All logging is driven by state transitions rather than periodic sampling:

_critical_logged flag ensures the "not raising" / "signal unavailable" messages emit exactly once

Unconditional fail-fast (no kill switch): Fail-fast is always active; there is no env var or constructor parameter to disable it. If a runtime control surface is needed later, it should be wired to something the platform actually supports rather than an env var the runtime cannot set.

Failure-path contract (entrypoint.py): When check_memory_usage() raises AirbyteTracedException, the exception is caught inline, queued state messages are flushed via yield, and the exception is re-raised. The catch is scoped specifically to AirbyteTracedException (not broad Exception) so that unrelated monitor bugs do not participate in the checkpoint-flush path. Normal-completion flush happens after the read loop.

Resolves https://github.com/airbytehq/airbyte-internal-issues/issues/15982
Updates since last revision
- anon / limit to anon / usage (commit 82fa6721): Threshold is now 85% of current cgroup usage instead of 90% of container limit. This fixes the real leak scenario where anonymous memory dominated usage but didn't cross a limit-based gate.
- memory.stat as primary anonymous memory source: New _read_cgroup_v2_anon_bytes() reads the anon field from memory.stat, falling back to /proc/self/status RssAnon when unavailable.
- AIRBYTE_MEMORY_FAIL_FAST env var kill switch entirely (commit 6c2cd9cc): Fail-fast is unconditional.
- c819363a): Removed the periodic _MEMORY_THRESHOLD warning that fired every check interval above 95%. All logging is now driven by state transitions: the _critical_logged flag ensures the "not raising" and "signal unavailable" messages emit at most once. No repeated per-check log spam.

Review & Testing Checklist for Human
- anon_bytes / usage_bytes (not anon_bytes / limit_bytes). Verify with real production telemetry that the 85% threshold catches actual heap-driven OOMs without false-positiving on containers with high file-backed/kernel overhead. The anon_bytes / usage_bytes division is safe because the code only reaches this path when usage_ratio >= 0.98, guaranteeing usage_bytes > 0.
- memory.stat parsing on production kernels: The parser matches lines starting with "anon " (note the trailing space). Confirm this matches the actual format across kernel versions in your fleet. If memory.stat is unavailable or malformed, the code falls back to /proc/self/status RssAnon, then to warning-only. Verify this graceful degradation.
- _high_pressure_mode and _critical_logged are permanent for the lifetime of the sync: even if memory drops below 95%/98% and rises again, the state-transition logs will not re-fire. This is intentional (a sync that spiked once is likely to spike again, and re-logging would be noise). However, it means a single log line is the only breadcrumb for memory pressure below the raise threshold. Confirm this is acceptable observability for production debugging.
- _high_pressure_mode flips at 95%, it never resets. This tightens I/O from every 5000 messages to every 100 messages for the remainder of the sync. Confirm this doesn't cause excessive overhead in long-running syncs with transient memory spikes.
- test_memory_failfast_flushes_queued_state_before_raising test validates the contract with mocks (including sourceStats.recordCount). Verify in a real container that the platform actually receives the flushed state checkpoint when the exception fires.

Suggested test plan: Deploy a connector with artificially low memory limits (e.g., 256 MB container running a large sync) and verify: (a) the AirbyteTracedException appears in logs with the expected message referencing anonymous memory share of usage, (b) the last state checkpoint is received by the platform, (c) re-sync resumes from the flushed checkpoint, (d) high-pressure polling activates at 95% and the one-shot INFO log appears.

Notes
- memory.stat (anon field) first, then /proc/self/status (RssAnon in kB) as fallback. No new dependencies added.
- memory.stat anon parsing, RssAnon parsing (including a test proving VmRSS-only content returns None), anon-share-of-usage raise/no-raise paths, memory.stat → RssAnon fallback, anonymous-signal-unavailable graceful skip, high-pressure polling mode activation, one-shot logging verification (call twice at critical, assert exactly one log record), and the entrypoint flush-before-raise contract with sourceStats.recordCount verification.

Link to Devin session: https://app.devin.ai/sessions/be2f3d2a1505478891a9859e41ca1cb9
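The checklist item about the `"anon "` prefix (with its trailing space) can be illustrated with a text-only parser. This is a sketch under assumptions: `parse_cgroup_v2_anon_bytes` is a hypothetical name, the sample content is invented, and the real helper in the PR reads the file itself rather than taking a string.

```python
from typing import Optional


def parse_cgroup_v2_anon_bytes(stat_text: str) -> Optional[int]:
    """Return the `anon` counter (bytes) from cgroup v2 memory.stat content."""
    for line in stat_text.splitlines():
        # The trailing space keeps keys such as "anon_thp" from matching.
        if line.startswith("anon "):
            try:
                return int(line.split()[1])
            except (IndexError, ValueError):
                return None
    return None


sample = "anon 734003200\nfile 104857600\nanon_thp 2097152\ninactive_anon 700000000\n"
anon_value = parse_cgroup_v2_anon_bytes(sample)

# Even if "anon_thp" appears first, only the exact "anon" key is used.
reordered = parse_cgroup_v2_anon_bytes("anon_thp 2097152\nanon 10\n")

# No "anon" key at all: the caller would fall back to RssAnon.
missing = parse_cgroup_v2_anon_bytes("file 104857600\n")
```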