
feat(cdk): enable fail-fast shutdown on memory threshold with dual-condition check #962

Merged
Patrick Nilan (pnilan) merged 13 commits into main from devin/1774478445-memory-failfast on Apr 2, 2026


@devin-ai-integration
Contributor

@devin-ai-integration devin-ai-integration bot commented Mar 25, 2026

Summary

Adds fail-fast memory shutdown to the CDK's MemoryMonitor. When container memory pressure is critical, the monitor now raises AirbyteTracedException with FailureType.system_error instead of letting the process hit an opaque OOM-kill.

Dual-condition logic: The exception only fires when both:

  1. Cgroup memory usage ≥ 98% of the container limit
  2. Anonymous memory ≥ 85% of current cgroup usage (i.e. most of the near-OOM memory is process-owned anonymous pages, not file-backed cache)
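The two gates can be sketched as a single predicate. This is a minimal illustration, not the CDK's MemoryMonitor code: the function name should_fail_fast and its byte-valued arguments are hypothetical. Only the 0.98 and 0.85 thresholds and the warn-only behavior when the anonymous-memory signal is missing come from this PR.

```python
from typing import Optional

# Thresholds as described in this PR summary.
CRITICAL_USAGE_THRESHOLD = 0.98  # cgroup usage / container limit
ANON_SHARE_OF_USAGE_THRESHOLD = 0.85  # anonymous memory / current cgroup usage


def should_fail_fast(usage_bytes: int, limit_bytes: int, anon_bytes: Optional[int]) -> bool:
    """Hypothetical sketch of the dual-condition decision (not the CDK implementation)."""
    if limit_bytes <= 0:
        return False  # no usable limit, so no ratio to evaluate
    usage_ratio = usage_bytes / limit_bytes
    if usage_ratio < CRITICAL_USAGE_THRESHOLD:
        return False  # gate 1 not met
    if anon_bytes is None:
        return False  # signal unavailable: warn only, never raise on cgroup usage alone
    # usage_ratio >= 0.98 with limit_bytes > 0 implies usage_bytes > 0, so this is safe.
    anon_share = anon_bytes / usage_bytes
    return anon_share >= ANON_SHARE_OF_USAGE_THRESHOLD
```

For example, 990 MiB used of a 1000 MiB limit with 900 MiB anonymous memory gives an anon share of about 0.91 and would raise; the same usage with 500 MiB anonymous memory would not.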

The anonymous memory signal is read from cgroup v2 memory.stat (anon field) when available, falling back to /proc/self/status RssAnon. Comparing anonymous memory to current usage (not the container limit) answers the more relevant question: "is most of the near-OOM memory actually process-owned?" This avoids the brittleness of a limit-based denominator, where anonymous memory can dominate usage yet still fall short of a percentage-of-limit threshold — as observed in real leak scenarios where RssAnon reached 87–88% of the limit but didn't cross a 90%-of-limit gate.
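The source-selection order can be sketched over file contents rather than live paths, which keeps it runnable outside a container. The helper names here are hypothetical (the PR's own helpers, such as _read_cgroup_v2_anon_bytes(), read the files directly). The field formats are assumptions based on the Linux cgroup v2 and /proc documentation: memory.stat reports anon in bytes, while RssAnon in /proc/self/status is reported in kB.

```python
from typing import Optional, Tuple


def parse_memory_stat_anon(memory_stat_text: str) -> Optional[int]:
    """Return the `anon` field (bytes) from cgroup v2 memory.stat content, or None."""
    for line in memory_stat_text.splitlines():
        # The trailing space skips neighboring fields such as "anon_thp".
        if line.startswith("anon "):
            try:
                return int(line.split()[1])
            except (IndexError, ValueError):
                return None  # malformed line: treat as unavailable
    return None


def parse_status_rss_anon(status_text: str) -> Optional[int]:
    """Return RssAnon from /proc/<pid>/status content, converted from kB to bytes, or None."""
    for line in status_text.splitlines():
        if line.startswith("RssAnon:"):
            try:
                return int(line.split()[1]) * 1024  # the kernel reports this field in kB
            except (IndexError, ValueError):
                return None
    return None


def read_anon_bytes(
    memory_stat_text: Optional[str], status_text: Optional[str]
) -> Optional[Tuple[int, str]]:
    """Prefer cgroup v2 memory.stat, fall back to RssAnon; None means 'signal unavailable'."""
    if memory_stat_text is not None:
        anon = parse_memory_stat_anon(memory_stat_text)
        if anon is not None:
            return anon, "cgroup memory.stat anon"
    if status_text is not None:
        rss_anon = parse_status_rss_anon(status_text)
        if rss_anon is not None:
            return rss_anon, "/proc/self/status RssAnon"
    return None
```

Returning the source label alongside the value lets log messages say which signal drove the decision.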

High-pressure polling: Once cgroup usage first crosses 95%, the check interval permanently tightens from 5000 to 100 messages, narrowing the race window near OOM.
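A minimal sketch of the one-way interval switch, assuming a per-message counter. The class PollingCadence and its methods are hypothetical; the 5000/100 intervals and the 95% trigger come from this PR.

```python
NORMAL_CHECK_INTERVAL = 5000
HIGH_PRESSURE_CHECK_INTERVAL = 100
HIGH_PRESSURE_THRESHOLD = 0.95


class PollingCadence:
    """Hypothetical sketch: decide when to sample memory, tightening permanently at 95%."""

    def __init__(self) -> None:
        self._message_count = 0
        self.high_pressure_mode = False

    @property
    def interval(self) -> int:
        return HIGH_PRESSURE_CHECK_INTERVAL if self.high_pressure_mode else NORMAL_CHECK_INTERVAL

    def should_check(self) -> bool:
        """Call once per emitted message; True when this message is due for a memory check."""
        self._message_count += 1
        return self._message_count % self.interval == 0

    def record_usage(self, usage_ratio: float) -> None:
        # One-way latch: never reset, even if usage later drops below 95%.
        if usage_ratio >= HIGH_PRESSURE_THRESHOLD:
            self.high_pressure_mode = True
```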

Event-based logging (one-shot, not periodic): All logging is driven by state transitions rather than periodic sampling:

  • One INFO when high-pressure mode activates (usage first crosses 95%)
  • One INFO/WARNING when the critical threshold (98%) is crossed but we do not raise (either anon share is below the fail-fast gate or the anonymous memory signal is unavailable)
  • No repeated per-check warnings — _critical_logged flag ensures the "not raising" / "signal unavailable" messages emit exactly once
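The one-shot behavior is essentially a boolean latch checked before logging. In this minimal sketch only the _critical_logged flag name comes from this PR; the class, method, logger name, and message text are hypothetical. Calling it twice at critical usage and counting log records mirrors the one-shot logging test listed under Notes.

```python
import logging

logger = logging.getLogger("memory_monitor_sketch")


class CriticalLogLatch:
    """Hypothetical sketch of the one-shot _critical_logged flag (never reset during a sync)."""

    def __init__(self) -> None:
        self._critical_logged = False

    def on_critical_without_raise(self, reason: str) -> None:
        # Called on every check that crosses 98% but does not raise.
        if self._critical_logged:
            return  # already logged once; stay quiet for the rest of the sync
        self._critical_logged = True
        logger.warning("Memory usage is critical but not raising: %s", reason)
```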

Unconditional fail-fast (no kill switch): Fail-fast is always active — there is no env var or constructor parameter to disable it. If a runtime control surface is needed later, it should be wired to something the platform actually supports rather than an env var the runtime cannot set.

Failure-path contract (entrypoint.py): When check_memory_usage() raises AirbyteTracedException, the exception is caught inline, queued state messages are flushed via yield, and the exception is re-raised. The catch is scoped specifically to AirbyteTracedException (not broad Exception) so that unrelated monitor bugs do not participate in the checkpoint-flush path. Normal-completion flush happens after the read loop.
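The failure-path contract can be illustrated with a toy generator. Assumptions: FailFastError stands in for AirbyteTracedException, plain strings stand in for Airbyte messages, and a list stands in for the message repository's queue. None of this is the CDK's AirbyteEntrypoint.read() code.

```python
from typing import Callable, Iterable, Iterator, List


class FailFastError(Exception):
    """Stand-in for AirbyteTracedException in this self-contained sketch."""


def read_with_failfast(
    records: Iterable[str],
    check_memory_usage: Callable[[], None],
    queue: List[str],
) -> Iterator[str]:
    """Hypothetical sketch of the flush-before-reraise contract."""
    for record in records:
        yield record
        try:
            check_memory_usage()
        except FailFastError:
            # Only the intentional fail-fast exception flushes queued messages
            # (e.g. state checkpoints) before propagating.
            while queue:
                yield queue.pop(0)
            raise
    # Normal completion: flush whatever is still queued after the read loop.
    while queue:
        yield queue.pop(0)
```

Because the except clause names only the fail-fast type, a TypeError from a buggy monitor propagates immediately and leaves the queue untouched.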

Resolves https://github.com/airbytehq/airbyte-internal-issues/issues/15982.

Updates since last revision

  • Switched second gate denominator from anon / limit to anon / usage (commit 82fa6721): Threshold is now 85% of current cgroup usage instead of 90% of container limit. This fixes the real leak scenario where anonymous memory dominated usage but didn't cross a limit-based gate.
  • Added cgroup v2 memory.stat as primary anonymous memory source: New _read_cgroup_v2_anon_bytes() reads the anon field from memory.stat, falling back to /proc/self/status RssAnon when unavailable.
  • Added high-pressure polling mode: Once usage crosses 95%, check interval tightens permanently from 5000 → 100 messages.
  • Removed AIRBYTE_MEMORY_FAIL_FAST env var kill switch entirely (commit 6c2cd9cc): Fail-fast is unconditional.
  • Simplified logging to event-based one-shot (commit c819363a): Removed the periodic _MEMORY_THRESHOLD warning that fired every check interval above 95%. All logging is now driven by state transitions — _critical_logged flag ensures the "not raising" and "signal unavailable" messages emit at most once. No repeated per-check log spam.

Review & Testing Checklist for Human

  • Anon-share-of-usage denominator: The second gate now computes anon_bytes / usage_bytes (not anon_bytes / limit_bytes). Verify with real production telemetry that the 85% threshold catches actual heap-driven OOMs without false-positiving on containers with high file-backed/kernel overhead. The anon_bytes / usage_bytes division is safe because the code only reaches this path when usage_ratio >= 0.98, guaranteeing usage_bytes > 0.
  • memory.stat parsing on production kernels: The parser matches lines starting with "anon " (note the trailing space). Confirm this matches the actual format across kernel versions in your fleet. If memory.stat is unavailable or malformed, the code falls back to /proc/self/status RssAnon, then to warning-only — verify this graceful degradation.
  • One-shot logging flags never reset: Both _high_pressure_mode and _critical_logged are permanent for the lifetime of the sync — even if memory drops below 95%/98% and rises again, the state-transition logs will not re-fire. This is intentional (a sync that spiked once is likely to spike again, and re-logging would be noise). However, it means a single log line is the only breadcrumb for memory pressure below the raise threshold. Confirm this is acceptable observability for production debugging.
  • High-pressure polling is permanent: Once _high_pressure_mode flips at 95%, it never resets. This raises the memory-check frequency (and the cgroup file reads each check performs) from once every 5000 messages to once every 100 messages for the remainder of the sync. Confirm this doesn't cause excessive overhead in long-running syncs with transient memory spikes.
  • Entrypoint flush-before-raise path: The test_memory_failfast_flushes_queued_state_before_raising test validates the contract with mocks (including sourceStats.recordCount). Verify in a real container that the platform actually receives the flushed state checkpoint when the exception fires.
  • No runtime disable mechanism: There is no env var or flag to turn fail-fast off. If a connector triggers false positives in production, the only recourse is a CDK rollback or code change.

Suggested test plan: Deploy a connector with artificially low memory limits (e.g., 256 MB container running a large sync) and verify: (a) the AirbyteTracedException appears in logs with the expected message referencing anonymous memory share of usage, (b) the last state checkpoint is received by the platform, (c) re-sync resumes from the flushed checkpoint, (d) high-pressure polling activates at 95% and the one-shot INFO log appears.

Notes

  • This is not a breaking change: the behavior only triggers under conditions that would otherwise result in an opaque OOM-kill.
  • Anonymous memory is read from cgroup v2 memory.stat (anon field) first, then /proc/self/status (RssAnon in kB) as fallback. No new dependencies added.
  • 25 unit tests cover: memory.stat anon parsing, RssAnon parsing (including a test proving VmRSS-only content returns None), anon-share-of-usage raise/no-raise paths, memory.stat → RssAnon fallback, anonymous-signal-unavailable graceful skip, high-pressure polling mode activation, one-shot logging verification (call twice at critical, assert exactly one log record), and the entrypoint flush-before-raise contract with sourceStats.recordCount verification.
  • If the normal-state check interval is ever made configurable or reduced below 100 messages, re-evaluate the fixed high-pressure interval so that entering high-pressure mode cannot become less frequent than the baseline polling cadence.
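If the baseline interval does become configurable, one hedged way to honor this note is to clamp the high-pressure interval so it can never exceed the baseline. The function name is hypothetical; 100 is the PR's current high-pressure value.

```python
HIGH_PRESSURE_CHECK_INTERVAL = 100


def effective_high_pressure_interval(baseline_interval: int) -> int:
    """Hypothetical guard: high-pressure mode must poll at least as often as normal mode."""
    if baseline_interval <= 0:
        raise ValueError("baseline_interval must be positive")
    return min(HIGH_PRESSURE_CHECK_INTERVAL, baseline_interval)
```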

Link to Devin session: https://app.devin.ai/sessions/be2f3d2a1505478891a9859e41ca1cb9

…ndition check

Add critical memory threshold (95% cgroup + 80% process RSS) that raises
AirbyteTracedException with FailureType.system_error when both conditions
are met. This gives connectors a clean error message instead of an opaque
OOM kill.

Key design decisions:
- Dual-condition: only raises when BOTH cgroup >= 95% AND process RSS >= 80%
  of container limit. This avoids false positives from reclaimable kernel
  page cache (e.g., SQLite file cache).
- Process RSS read from /proc/self/status (no new dependencies).
- Kill switch: AIRBYTE_MEMORY_FAIL_FAST env var (default: true, set to
  'false' to disable).
- Failure-path contract: entrypoint read loop wrapped in try/finally to
  flush queued state messages before exception propagates.

Resolves airbytehq/airbyte-internal-issues#15982

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.


@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.


Testing This CDK Version

You can test this version of the CDK using the following:

```shell
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1774478445-memory-failfast#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1774478445-memory-failfast
```

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

@github-actions

github-actions bot commented Mar 25, 2026

PyTest Results (Fast)

  • 3 987 tests (+12): 3 976 ✅ passed (+12), 11 💤 skipped (± 0), 0 ❌ failed (± 0)
  • 1 suite (± 0), 1 file (± 0)
  • Run time: 7m 44s ⏱️ (+12s)

Results for commit b856e7c. ± Comparison against base commit 69cd63d.

This pull request removes 4 and adds 16 tests. Note that renamed tests count towards both.

Removed:
unit_tests.utils.test_memory_monitor ‑ test_cgroup_v1_emits_warning
unit_tests.utils.test_memory_monitor ‑ test_logs_at_90_percent
unit_tests.utils.test_memory_monitor ‑ test_logs_on_every_check_above_90_percent
unit_tests.utils.test_memory_monitor ‑ test_no_warning_below_threshold

Added:
unit_tests.test_entrypoint ‑ test_memory_failfast_flushes_queued_state_before_raising
unit_tests.utils.test_memory_monitor ‑ test_cgroup_v1_activates_high_pressure_mode
unit_tests.utils.test_memory_monitor ‑ test_falls_back_to_process_rssanon_low_and_does_not_raise
unit_tests.utils.test_memory_monitor ‑ test_falls_back_to_process_rssanon_when_cgroup_v2_anon_unavailable
unit_tests.utils.test_memory_monitor ‑ test_no_log_below_threshold
unit_tests.utils.test_memory_monitor ‑ test_no_raise_when_anonymous_memory_signal_unavailable_at_critical_usage
unit_tests.utils.test_memory_monitor ‑ test_no_raise_when_cgroup_critical_but_anon_share_of_usage_below_threshold
unit_tests.utils.test_memory_monitor ‑ test_raises_when_cgroup_critical_and_anon_share_of_usage_above_threshold
unit_tests.utils.test_memory_monitor ‑ test_read_cgroup_v2_anon_bytes_parses_anon_field
unit_tests.utils.test_memory_monitor ‑ test_read_cgroup_v2_anon_bytes_returns_none_on_oserror
…

♻️ This comment has been updated with latest results.

@github-actions

github-actions bot commented Mar 25, 2026

PyTest Results (Full)

  • 3 990 tests (+12): 3 978 ✅ passed (+12), 12 💤 skipped (± 0), 0 ❌ failed (± 0)
  • 1 suite (± 0), 1 file (± 0)
  • Run time: 11m 12s ⏱️ (-8s)

Results for commit b856e7c. ± Comparison against base commit 69cd63d.


♻️ This comment has been updated with latest results.

@pnilan
Contributor

I think the second signal should be RssAnon instead of VmRSS, and if the goal is to fail fast only under very strong evidence of imminent process-driven OOM, I would also move _CRITICAL_THRESHOLD from 95% to 98%.

Why:

  • VmRSS is still RssAnon + RssFile + RssShmem, so it can be pulled up by file-backed / shared resident pages. That means mmap'd SQLite pages and other resident file-backed mappings can still satisfy the second threshold even though the PR is trying to discount that kind of pressure.
  • RssAnon is not identical to "Python heap", but it is a closer proxy for private / anonymous process memory than total RSS.
  • If RssAnon is unavailable, I would skip fail-fast and log rather than falling back to cgroup-only raising; otherwise the implementation stops being truly dual-condition.
  • With a 98% cgroup threshold, the behavior becomes more conservative: only fail fast when container pressure is extremely high and anonymous process memory is also high.
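To see why VmRSS overstates process-private memory, parse the memory fields of a /proc/self/status snapshot. The sample values are invented for illustration: they mirror the "VmRSS at 90% but RssAnon at 50%" no-raise test described later in this thread, against a hypothetical 1,000,000 kB limit. parse_status_kb is a hypothetical helper.

```python
from typing import Dict


def parse_status_kb(status_text: str) -> Dict[str, int]:
    """Parse 'Key:   <n> kB' lines from /proc/<pid>/status content into a dict of kB values."""
    values: Dict[str, int] = {}
    for line in status_text.splitlines():
        key, _, rest = line.partition(":")
        parts = rest.split()
        # Keep only memory fields of the form "<digits> kB"; skip e.g. Name, Threads.
        if len(parts) == 2 and parts[1] == "kB" and parts[0].isdigit():
            values[key] = int(parts[0])
    return values
```

With VmRSS = RssAnon + RssFile + RssShmem, a VmRSS-based second gate would see 90% of the limit in this sample, while RssAnon shows only 50%. The other 40% is file-backed or shared pages.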

Concrete changes I'd suggest:

  1. Parse RssAnon: from /proc/self/status instead of VmRSS:.
  2. Rename the helper / constants / log messages so they say anon explicitly.
  3. Set _CRITICAL_THRESHOLD = 0.98.
  4. On missing RssAnon, log and return instead of raising.
  5. Update the tests to:
    • parse RssAnon
    • use 98% / 97% boundary values
    • prove the metric choice matters by making VmRSS high but RssAnon low in the no-raise case.
  6. Update the comments/docstrings to say anonymous resident memory / process-private anonymous memory, not just RSS or Python heap.

That would make the implementation line up more closely with the design described in the PR.

…8%, log+skip on missing RssAnon

Address review feedback:
- Parse RssAnon from /proc/self/status instead of VmRSS to avoid
  inflation from file-backed/shared resident pages
- Raise _CRITICAL_THRESHOLD from 95% to 98% for more conservative
  fail-fast behavior
- When RssAnon is unavailable, log a warning and skip fail-fast
  instead of falling back to cgroup-only raising (stays truly
  dual-condition)
- Add test proving metric choice matters (VmRSS high but RssAnon low)
- Update all docstrings/comments to say 'anonymous resident memory'

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Contributor Author

Great points — all addressed in 748212f:

  1. RssAnon instead of VmRSS — now parses RssAnon: from /proc/self/status. Docstrings updated to say "process-private anonymous memory".
  2. _CRITICAL_THRESHOLD raised to 0.98 — more conservative, only fires under extreme container pressure.
  3. Missing RssAnon → log + skip — when RssAnon is unavailable, logs a warning and returns instead of falling back to cgroup-only raising. The implementation stays truly dual-condition.
  4. Renamed helper to _read_process_anon_rss_bytes(), constant to _ANON_RSS_THRESHOLD, and all log messages / comments to reference "anonymous RSS" explicitly.
  5. Tests updated:
    • Parse RssAnon in all mocks
    • Boundary values at 98%/97% for cgroup threshold
    • New test_read_process_anon_rss_bytes_ignores_vmrss proves the metric choice matters (VmRSS-only content → returns None)
    • test_no_raise_when_cgroup_high_but_anon_rss_low uses mock with VmRSS at 90% but RssAnon at 50% to prove file-backed pages are correctly excluded
    • test_no_raise_when_anon_rss_unavailable_and_cgroup_critical verifies log-and-skip behavior


@pnilan Patrick Nilan (pnilan) marked this pull request as ready for review March 26, 2026 23:46
Copilot AI review requested due to automatic review settings March 26, 2026 23:46
Contributor

Copilot AI left a comment


Pull request overview

Adds a dual-signal “fail-fast” memory shutdown path to the CDK so connectors can fail with a clear AirbyteTracedException(FailureType.system_error) under near-OOM conditions, instead of being OOM-killed.

Changes:

  • Extend MemoryMonitor to optionally raise when cgroup usage is critical and process RssAnon is high (with env-var + constructor gating).
  • Add unit tests for RssAnon parsing, dual-condition behavior, and feature-flag behavior.
  • Wrap the read() loop to attempt flushing queued messages even when a fail-fast exception interrupts a sync.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

  • airbyte_cdk/utils/memory_monitor.py: Implements dual-condition fail-fast using cgroup + /proc/self/status RssAnon, plus the AIRBYTE_MEMORY_FAIL_FAST switch.
  • unit_tests/utils/test_memory_monitor.py: Adds coverage for RssAnon parsing and the new fail-fast decision logic + toggles.
  • airbyte_cdk/entrypoint.py: Adds a try/finally around the read loop to flush queued messages on interruption.


Address Copilot review feedback:
- Restructure entrypoint.py to catch the fail-fast exception around
  check_memory_usage(), flush queued messages, then re-raise — avoids
  yielding inside a finally block which can trigger RuntimeError on
  GeneratorExit.
- Use clear=True in patch.dict for test_fail_fast_enabled_by_default
  to ensure AIRBYTE_MEMORY_FAIL_FAST is truly unset in test env.

Co-Authored-By: bot_apk <apk@cognition.ai>
Per review feedback, increase _MEMORY_THRESHOLD from 0.90 to 0.95
to reduce noise from the warning log. Update all related docstrings,
comments, and test assertions accordingly.

Co-Authored-By: bot_apk <apk@cognition.ai>
@pnilan
Contributor

Thanks for making the RssAnon / 98% changes.

One follow-up: I would set _ANON_RSS_THRESHOLD = 0.90, not 0.80.

Now that the logic is intentionally very conservative on the cgroup side (98%), an anonymous-RSS threshold of 80% still leaves a fairly large gap between "container is almost full" and "anonymous/process-private memory is almost full". At 98% cgroup + 80% anon RSS, you can still have up to 18% of the container limit in other categories and trip fail-fast.

If the design goal is "only fail fast when there is very strong evidence of imminent process-driven OOM", 0.90 feels like the better default:

  • still leaves room for some RssFile / RssShmem / runtime overhead
  • makes the second signal meaningfully stronger than the current 0.80
  • lines up better with the new conservative 98% cgroup threshold

Absent fleet data, I would rather start at 0.90 and relax later than start at 0.80 and discover we are still a bit too eager.

Per review feedback, increase _ANON_RSS_THRESHOLD from 0.80 to 0.90
to align with the conservative 98% cgroup threshold. Start stricter
and relax later once fleet data is available.

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Contributor Author

Patrick Nilan (@pnilan) Agreed — _ANON_RSS_THRESHOLD bumped to 0.90 in 10fc815. Start strict, relax later with fleet data.

Current thresholds summary:

  • Logging warning: ≥ 95% cgroup usage
  • Fail-fast: ≥ 98% cgroup AND ≥ 90% RssAnon
  • Kill switch: AIRBYTE_MEMORY_FAIL_FAST=false to disable


@pnilan
Contributor

Patrick Nilan (pnilan) commented Mar 27, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/23659658197

@pnilan
Contributor

I think the exception scope in AirbyteEntrypoint.read() should be narrowed.

At airbyte_cdk/entrypoint.py:284-292, the new logic does this:

```python
try:
    self._memory_monitor.check_memory_usage()
except Exception:
    for queued_message in self._emit_queued_messages(self.source):
        yield self.handle_record_counts(queued_message, stream_message_counter)
    raise
```

The intended exception from the new fail-fast path is AirbyteTracedException, raised by MemoryMonitor.check_memory_usage() when the dual-condition threshold is met. Catching Exception here is broader than that contract.

Why this matters:

  • It makes unrelated monitor bugs follow the same checkpoint-flush path as the intentional fail-fast case.
  • For example, if a future regression in check_memory_usage() raises TypeError, KeyError, or another unexpected exception, we will still flush queued messages before re-raising.
  • Because queued messages can include state checkpoints, that means an internal bug in the monitor can affect resume state behavior, not just the deliberate memory fail-fast path.
  • The code still re-raises, so this is not swallowing errors, but it is still changing observable behavior for exception types that were probably not meant to participate in this flow.

I think the safer and more self-documenting version is:

```python
try:
    self._memory_monitor.check_memory_usage()
except AirbyteTracedException:
    # Flush queued messages (state checkpoints, logs) before propagating
    # the memory fail-fast exception, so the platform receives the last
    # committed state for the next sync.
    for queued_message in self._emit_queued_messages(self.source):
        yield self.handle_record_counts(queued_message, stream_message_counter)
    raise
```

That keeps the new flush-and-reraise behavior tightly scoped to the intentional fail-fast mechanism introduced in this PR.

If the broader behavior is intentional, I’d suggest documenting that explicitly in a comment, because except Exception currently reads more like an over-broad catch than a deliberate policy decision.

I’d also recommend a targeted test for this path:

  • queue a state message,
  • make check_memory_usage() raise AirbyteTracedException,
  • assert the queued state is emitted before the exception propagates.

That would lock in the exact behavior this PR is trying to guarantee.

…-before-raise test

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Contributor Author

Good call — narrowed the catch to except AirbyteTracedException in 6cd3467 so only the intentional fail-fast path triggers the flush-and-reraise. Unexpected monitor bugs (e.g. TypeError, KeyError) will now propagate without touching queued messages.

Also added test_memory_failfast_flushes_queued_state_before_raising which:

  1. Queues a state message in the message repository
  2. Makes check_memory_usage() raise AirbyteTracedException
  3. Asserts the queued state is emitted before the exception propagates


@pnilan
Contributor

I think this change also needs a targeted regression test for the new exception path in AirbyteEntrypoint.read().

Right now, the PR adds special behavior at airbyte_cdk/entrypoint.py:284-292:

  • call check_memory_usage() after yielding a record,
  • if that call raises,
  • flush queued messages,
  • then re-raise.

That behavior is important because it is the mechanism that preserves the latest queued state checkpoint before the sync fails fast. I do not see a test in this PR that locks that contract in.

The existing read tests in unit_tests/test_entrypoint.py cover:

  • normal read success,
  • exceptions raised directly from source.read().

But they do not cover the new case where:

  • source.read() has already yielded at least one record,
  • check_memory_usage() raises AirbyteTracedException,
  • queued STATE messages must be flushed before the exception propagates.

I think the regression test should prove all of the following in one flow:

  • a record is emitted first,
  • check_memory_usage() raises AirbyteTracedException,
  • a queued STATE message is emitted before the exception is re-raised,
  • that queued STATE still goes through handle_record_counts(), so sourceStats.recordCount is populated correctly.

Suggested shape:

  1. Patch MockSource.read_state and MockSource.read_catalog as the existing read tests already do.
  2. Patch MockSource.read to return one RECORD message.
  3. Patch entrypoint._memory_monitor.check_memory_usage to raise AirbyteTracedException.
  4. Patch message_repository.consume_queue so the pre-read flush is empty, the exception-path flush returns one queued STATE message, and the final cleanup flush is empty.
  5. Iterate the generator manually and assert:
    • first output is the RECORD,
    • second output is the queued STATE,
    • the next iteration raises AirbyteTracedException.
  6. Assert the emitted STATE message has sourceStats.recordCount == 1.0 so the test verifies the flush path is not bypassing handle_record_counts().

That would protect the exact behavior this PR is introducing and would catch future regressions in the flush-before-reraise path.
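The suggested test shape can be prototyped against a toy read loop before touching unit_tests/test_entrypoint.py. Everything here is a hypothetical stand-in (toy_read, FailFastError, dict messages, and a simplified handle_record_counts). It only demonstrates the mock sequencing from steps 3-6, using unittest.mock.MagicMock with a list-valued side_effect so each consume_queue() call returns the next iterator.

```python
from typing import Any, Dict, Iterator, List
from unittest.mock import MagicMock

Message = Dict[str, Any]


class FailFastError(Exception):
    """Stand-in for AirbyteTracedException in this self-contained sketch."""


def toy_read(source_messages: List[Message], monitor: Any, repository: Any) -> Iterator[Message]:
    """Toy read loop: pre-read flush, record + memory check, flush-and-reraise, final flush."""
    record_count = 0.0

    def handle_record_counts(message: Message) -> Message:
        # Simplified: attach the running record count to STATE messages.
        if message["type"] == "STATE":
            message["sourceStats"] = {"recordCount": record_count}
        return message

    for queued in repository.consume_queue():  # pre-read flush
        yield handle_record_counts(queued)
    for message in source_messages:
        if message["type"] == "RECORD":
            record_count += 1
        yield handle_record_counts(message)
        try:
            monitor.check_memory_usage()
        except FailFastError:
            for queued in repository.consume_queue():  # exception-path flush
                yield handle_record_counts(queued)
            raise
    for queued in repository.consume_queue():  # final cleanup flush
        yield handle_record_counts(queued)


def test_toy_flush_before_raise() -> None:
    monitor = MagicMock()
    monitor.check_memory_usage.side_effect = FailFastError("memory critical")
    repository = MagicMock()
    # One iterator per consume_queue() call: pre-read, exception path, final cleanup.
    repository.consume_queue.side_effect = [
        iter([]),
        iter([{"type": "STATE", "stream_state": {"cursor": "abc123"}}]),
        iter([]),
    ]
    gen = toy_read([{"type": "RECORD", "stream": "users"}], monitor, repository)

    record = next(gen)
    assert record["type"] == "RECORD" and record["stream"] == "users"

    state = next(gen)
    assert state["stream_state"] == {"cursor": "abc123"}
    assert state["sourceStats"]["recordCount"] == 1.0  # went through handle_record_counts

    try:
        next(gen)
    except FailFastError:
        pass
    else:
        raise AssertionError("expected the fail-fast exception to propagate")
```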

@pnilan
Contributor

I also think the AIRBYTE_MEMORY_FAIL_FAST env-var kill switch should be removed if the runtime environment for these syncs cannot actually provide it.

Right now the implementation presents fail-fast as runtime-configurable:

  • _ENV_FAIL_FAST = "AIRBYTE_MEMORY_FAIL_FAST"
  • MemoryMonitor.__init__() resolves fail_fast from the constructor arg or environment
  • the docstrings describe fail-fast as controlled by that env var
  • the tests validate env-var on/off behavior

If the production environment cannot set that env var, then this is not a real operational control. In practice that means:

  • the code advertises a rollback lever that does not exist,
  • the docs overstate deploy-time flexibility,
  • the env-var-specific branches and tests add maintenance surface without giving operators an actionable switch.

In that case, I think the cleaner implementation is to remove the env-var path entirely and make the behavior explicit in code.

Concretely, I would suggest:

  • remove _ENV_FAIL_FAST and the os.environ lookup from MemoryMonitor.__init__(),
  • remove the fail_fast parameter too unless there is a real non-default call site that needs it,
  • update the MemoryMonitor docstrings so they describe fail-fast as unconditional behavior rather than env-controlled behavior,
  • delete the env-var-specific tests in unit_tests/utils/test_memory_monitor.py.

That leaves the team with a simpler and more honest contract: either fail-fast is part of the product behavior, or it is not. If a kill switch is still desired, it should be wired to a control surface the runtime actually supports rather than an env var that operators cannot set.

…nt and step-by-step generator iteration

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

Agreed — rewrote the test in e649a70 to match your suggested shape exactly. It now calls read() directly (not run()) and iterates the generator step-by-step:

  1. next(gen) → asserts RECORD with stream="users"
  2. next(gen) → asserts queued STATE with stream_state={"cursor": "abc123"}
  3. Asserts sourceStats.recordCount == 1.0 on that STATE (proves it went through handle_record_counts)
  4. next(gen) → raises AirbyteTracedException

This locks in the full flush-before-reraise contract including the record count population.


@pnilan
Contributor

Answering your question directly: the runtime environment for these syncs will not support AIRBYTE_MEMORY_FAIL_FAST.

Given that, I do think the env-var path should be removed from this PR rather than kept as a kill switch.

Concretely, please:

  • remove _ENV_FAIL_FAST
  • remove the os.environ lookup in MemoryMonitor.__init__()
  • remove the fail_fast constructor parameter too unless there is a real non-test caller that needs it
  • update the docstrings/comments so fail-fast is described as unconditional behavior, not env-controlled behavior
  • delete the env-var-specific tests

My concern is not just code cleanliness. If the runtime cannot actually provide this env var, then leaving it in the implementation creates a control surface that looks operationally real but is not usable in practice. That is worse than having no kill switch at all, because it overstates rollback flexibility.

If the platform wants a real rollout lever later, it should be introduced through a control surface the runtime actually supports. I would not block this PR on designing that alternative. For this change, I would simplify the implementation and remove the dead env-var path.

…ail-fast unconditional

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Contributor Author

Done in 6c2cd9c. Removed the env-var kill switch entirely:

  • Deleted _ENV_FAIL_FAST constant and import os
  • Removed fail_fast constructor parameter and self._fail_fast field
  • Removed the self._fail_fast guard from the raise condition in check_memory_usage(); fail-fast is now unconditional when the dual condition is met
  • Updated all docstrings to describe fail-fast as unconditional behavior
  • Deleted all 4 env-var/kill-switch tests (test_fail_fast_disabled_via_constructor, test_fail_fast_disabled_via_env_var, test_fail_fast_enabled_by_default, test_fail_fast_constructor_overrides_env_var)

Agree that if a real rollout lever is needed later, it should be wired to a control surface the runtime actually supports.


@pnilan
Contributor

I dug through the current PR head (6c2cd9cc, March 27, 2026) and the intentional-leak test PR (#969 / 5aac7208, March 30, 2026). I think the current dual-condition is missing a real heap-driven OOM because the second gate is using the wrong denominator.

Current behavior in this PR:

  • First gate: cgroup usage >= 98% of container limit
  • Second gate: RssAnon >= 90% of container limit
  • Polling cadence: every 5000 messages

Why I think this misses the leak case:

  • In the failing run, the actor reaches 98-99% cgroup usage, but RssAnon is only 87-88% of the container limit, so we hit the "pressure likely from file-backed or reclaimable pages — not raising" path.
  • That does not necessarily mean the memory pressure is mostly reclaimable cache. It only means anonymous memory did not cross 90% of the full limit.
  • Near OOM, memory.current includes anon + file + kernel-accounted memory, so anonymous memory can be the dominant share of the memory actually in use and still never reach 90% of limit. In other words, the current second gate is brittle because it compares anon to the full limit instead of to the memory that is actually charged right now.
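Plugging representative numbers from the failing run into both versions of the second gate makes the denominator problem concrete. The byte values are illustrative picks inside the reported ranges (98-99% usage, 87-88% RssAnon of limit), not actual telemetry, and the function names are hypothetical.

```python
MiB = 1024 * 1024


def gate_anon_of_limit(anon_bytes: int, limit_bytes: int) -> bool:
    """Second gate at this PR head: anonymous memory vs. the container limit."""
    return anon_bytes / limit_bytes >= 0.90


def gate_anon_of_usage(anon_bytes: int, usage_bytes: int) -> bool:
    """Proposed second gate: anonymous memory vs. memory currently charged to the cgroup."""
    return anon_bytes / usage_bytes >= 0.85


limit, usage, anon = 1000 * MiB, 985 * MiB, 875 * MiB  # 98.5% usage, 87.5% anon of limit

print(f"anon/limit = {anon / limit:.3f}, raises: {gate_anon_of_limit(anon, limit)}")
# anon/limit = 0.875, raises: False
print(f"anon/usage = {anon / usage:.3f}, raises: {gate_anon_of_usage(anon, usage)}")
# anon/usage = 0.888, raises: True
```

The same anonymous memory fails the limit-based gate but clears the usage-based one, because about 89% of what is actually charged is anonymous.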

Concrete change I would recommend:

  1. Keep the first gate exactly as-is: usage_ratio >= 0.98.
  2. On cgroup v2, read anon from /sys/fs/cgroup/memory.stat; fall back to /proc/self/status:RssAnon only if memory.stat is unavailable or malformed.
  3. Replace anon_ratio = anon_bytes / limit_bytes with anon_share = anon_bytes / usage_bytes.
  4. Raise when anon_share >= 0.85.
  5. Once usage first crosses 0.95, permanently tighten polling from every 5000 messages to every 100 messages for the remainder of the sync.
  6. Keep the warning-only path when the anonymous-memory signal is unavailable.
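The gates in the list above can be expressed as a pure, side-effect-free decision function, which is easy to unit-test in isolation. This is a minimal sketch: the names Decision and decide are hypothetical, not part of the CDK, and the stateful parts (the permanent polling switch and one-shot logging) are left to the caller.

```python
from enum import Enum
from typing import Optional

# Thresholds from the recommendation above.
HIGH_PRESSURE_USAGE_RATIO = 0.95
CRITICAL_USAGE_RATIO = 0.98
ANON_SHARE_OF_USAGE_THRESHOLD = 0.85


class Decision(Enum):
    OK = "ok"
    HIGH_PRESSURE = "high_pressure"            # tighten polling, do not raise
    CRITICAL_NO_SIGNAL = "critical_no_signal"  # warn only, anon signal missing
    CRITICAL_NOT_ANON = "critical_not_anon"    # log only, mostly non-anon memory
    FAIL_FAST = "fail_fast"                    # raise AirbyteTracedException


def decide(usage_bytes: int, limit_bytes: int, anon_bytes: Optional[int]) -> Decision:
    usage_ratio = usage_bytes / limit_bytes
    if usage_ratio < HIGH_PRESSURE_USAGE_RATIO:
        return Decision.OK
    if usage_ratio < CRITICAL_USAGE_RATIO:
        return Decision.HIGH_PRESSURE
    if anon_bytes is None:
        return Decision.CRITICAL_NO_SIGNAL
    # Second gate: anonymous share of current usage, not of the limit.
    if anon_bytes / usage_bytes >= ANON_SHARE_OF_USAGE_THRESHOLD:
        return Decision.FAIL_FAST
    return Decision.CRITICAL_NOT_ANON
```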

Why this is a better fit for the intent of the PR:

  • It preserves the core design goal: do not fail fast just because total cgroup memory is high from file-backed/cache-heavy pressure.
  • It fixes the current leak scenario without hard-coding a container-size-specific threshold.
  • It makes the second gate answer the more relevant question: "is most of the near-OOM memory anonymous / process-owned right now?"
  • It narrows the current race window created by the 5000-message polling interval.

I would not just lower _ANON_RSS_THRESHOLD from 0.90 to 0.80. That would probably fix this one case, but it keeps the wrong denominator and will still be sensitive to container size and non-anon overhead.

Sketch of the code change:

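from pathlib import Path
from typing import Optional

_CGROUP_V2_STAT = Path("/sys/fs/cgroup/memory.stat")
_HIGH_PRESSURE_THRESHOLD = 0.95
_CRITICAL_THRESHOLD = 0.98
_ANON_SHARE_OF_USAGE_THRESHOLD = 0.85
_HIGH_PRESSURE_CHECK_INTERVAL = 100

def _read_cgroup_v2_anon_bytes() -> Optional[int]:
    # memory.stat holds one "<key> <value>" pair per line; "anon" is in bytes.
    # The trailing space keeps keys like "anon_thp" from matching.
    try:
        for line in _CGROUP_V2_STAT.read_text().splitlines():
            if line.startswith("anon "):
                return int(line.split()[1])
    except (OSError, ValueError, IndexError):
        return None
    return None

# MemoryMonitor method: prefer the cgroup-level anon counter on cgroup v2,
# then fall back to the process-level RssAnon from /proc/self/status.
def _read_anon_bytes(self) -> Optional[tuple[int, str]]:
    if self._cgroup_version == 2:
        cgroup_anon = _read_cgroup_v2_anon_bytes()
        if cgroup_anon is not None:
            return cgroup_anon, "cgroup memory.stat anon"

    proc_anon = _read_process_anon_rss_bytes()
    if proc_anon is not None:
        return proc_anon, "process RssAnon"

    return None

# inside check_memory_usage()
interval = _HIGH_PRESSURE_CHECK_INTERVAL if self._high_pressure_mode else self._check_interval
if self._message_count % interval != 0:
    return

# read usage_bytes and usage_ratio (memory.current / memory.max) as the existing code does

# One-way latch: once usage crosses 95%, keep the tighter interval for the rest of the sync.
if usage_ratio >= _HIGH_PRESSURE_THRESHOLD:
    self._high_pressure_mode = True

if usage_ratio >= _CRITICAL_THRESHOLD:
    anon_info = self._read_anon_bytes()
    if anon_info is not None:
        anon_bytes, anon_source = anon_info
        # Share of the memory actually charged right now, not of the limit.
        anon_share = anon_bytes / usage_bytes
        if anon_share >= _ANON_SHARE_OF_USAGE_THRESHOLD:
            raise AirbyteTracedException(...)
        else:
            logger.info(
                "Cgroup usage at %d%% but anonymous memory (%s) only %d%% of current cgroup usage; "
                "pressure likely from file-backed or kernel memory; not raising.",
                int(usage_ratio * 100),
                anon_source,
                int(anon_share * 100),
            )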

Tests I would want with this change:

  • test_read_cgroup_v2_anon_bytes_parses_anon_field
  • test_falls_back_to_process_rssanon_when_cgroup_v2_anon_unavailable
  • test_raises_when_cgroup_critical_and_anon_share_of_usage_above_threshold
  • test_no_raise_when_cgroup_critical_but_anon_share_of_usage_below_threshold
  • test_switches_to_high_pressure_check_interval_after_crossing_95_percent
  • test_no_raise_when_anonymous_memory_signal_unavailable_at_critical_usage
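One way to make the memory.stat parsing easy to cover with the first test above is to separate the text parsing from the file read, so the test never touches /sys. The helper parse_cgroup_v2_anon is hypothetical; the test names mirror the list, but the bodies are only a sketch.

```python
from typing import Optional


def parse_cgroup_v2_anon(stat_text: str) -> Optional[int]:
    """Return the `anon` value (bytes) from memory.stat text, or None if absent/malformed."""
    for line in stat_text.splitlines():
        fields = line.split()
        # Exact key match so "anon_thp" and similar keys are ignored.
        if len(fields) == 2 and fields[0] == "anon":
            try:
                return int(fields[1])
            except ValueError:
                return None
    return None


def test_read_cgroup_v2_anon_bytes_parses_anon_field() -> None:
    sample = "anon 1073741824\nfile 268435456\nkernel 8388608\n"
    assert parse_cgroup_v2_anon(sample) == 1073741824


def test_returns_none_when_anon_missing_or_malformed() -> None:
    assert parse_cgroup_v2_anon("file 1\nkernel 2\n") is None
    assert parse_cgroup_v2_anon("anon not-a-number\n") is None
    assert parse_cgroup_v2_anon("anon_thp 4096\n") is None
```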

If you want to keep the change smaller, I think the minimum safe version is:

  • switch the second gate to anon_bytes / usage_bytes
  • keep the current 98% first gate
  • add the tighter polling mode after 95%

That gets you most of the benefit without broadening the surface area much.

…mory.stat source, add high-pressure polling

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Contributor Author

All addressed in 82fa672. Great analysis — the wrong-denominator issue is exactly the kind of thing that would have caused a miss in production.

Changes:

  1. Second gate denominator switched to anon_bytes / usage_bytes — threshold is now 85% of current cgroup usage, not of the container limit. This answers "is most of the near-OOM memory actually process-owned?" instead of "is anonymous memory high relative to the full limit?" — which, as you pointed out, can miss real leaks where anon dominates usage but doesn't reach 90% of limit.

  2. Cgroup v2 memory.stat anon field as primary source — new _read_cgroup_v2_anon_bytes() function parses the anon line from /sys/fs/cgroup/memory.stat. The _read_anon_bytes() method tries this first (on cgroup v2), then falls back to /proc/self/status RssAnon. This gives a cgroup-level view of anonymous memory rather than just the process-level one.

  3. High-pressure polling mode — once usage first crosses 95%, _high_pressure_mode flips permanently and the check interval tightens from 5000 to 100 messages. This narrows the race window near OOM.

  4. Tests match your requested list:

    • test_read_cgroup_v2_anon_bytes_parses_anon_field
    • test_falls_back_to_process_rssanon_when_cgroup_v2_anon_unavailable
    • test_raises_when_cgroup_critical_and_anon_share_of_usage_above_threshold
    • test_no_raise_when_cgroup_critical_but_anon_share_of_usage_below_threshold
    • test_switches_to_high_pressure_check_interval_after_crossing_95_percent
    • test_no_raise_when_anonymous_memory_signal_unavailable_at_critical_usage
    • Plus 3 additional unit tests for _read_cgroup_v2_anon_bytes edge cases

All 75 tests pass locally (memory monitor + entrypoint), lint/format/mypy clean.



…W fallback path

Co-Authored-By: bot_apk <apk@cognition.ai>
@pnilan
Contributor

Patrick Nilan (pnilan) commented Mar 31, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/23820904857

@devin-ai-integration
Contributor Author

Reviewed the latest changes — this looks solid. Here's my summary of what changed and a few observations:

Changes implemented (all align with the analysis from the memory leak test investigation):

  1. anon / usage denominator (was anon / limit): The second gate now computes anon_bytes / usage_bytes at 85% threshold. This directly fixes the real leak scenario where RssAnon was 87–88% of the limit but 88.8% of usage — the old 90%-of-limit gate was mathematically unreachable in a 2 GB container with ~220 MB of non-anonymous overhead.

  2. cgroup v2 memory.stat as primary anon source: New _read_cgroup_v2_anon_bytes() reads the anon field, with graceful fallback to /proc/self/status RssAnon. This is the most accurate signal since it's the same accounting level as memory.current.

  3. High-pressure polling: Once cgroup crosses 95%, check interval permanently tightens from 5000 → 100 messages. Narrows the race window near OOM.

  4. Unconditional fail-fast: No env var kill switch — clean design.

Test coverage looks comprehensive: 16 tests covering memory.stat parsing, RssAnon fallback (both raise and no-raise paths), anon-share-of-usage boundary conditions, high-pressure mode activation, and the entrypoint flush-before-raise contract.

One observation on the _read_anon_bytes fallback path: When cgroup v2 is detected but memory.stat is malformed/missing the anon line, the code correctly falls back to process-level RssAnon. Worth noting that RssAnon is process-level only (not cgroup-level), so in a multi-process container the fallback gives a lower bound. This is fine for now since CDK connectors are single-process, and the fallback path is already tested (test_falls_back_to_process_rssanon_when_cgroup_v2_anon_unavailable).
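For reference, the process-level fallback amounts to parsing the RssAnon line of /proc/self/status, which the kernel reports in kB (units of 1024 bytes). This is a sketch, not the PR's _read_process_anon_rss_bytes; the name parse_rss_anon_bytes is hypothetical. Because it only covers the calling process, it undercounts cgroup-wide anonymous memory whenever other processes share the container.

```python
from typing import Optional


def parse_rss_anon_bytes(status_text: str) -> Optional[int]:
    """Parse RssAnon from /proc/<pid>/status text and return bytes, or None."""
    for line in status_text.splitlines():
        if line.startswith("RssAnon:"):
            fields = line.split()
            # Expected form: "RssAnon:    123456 kB"
            if len(fields) >= 3 and fields[2] == "kB":
                try:
                    return int(fields[1]) * 1024
                except ValueError:
                    return None
            return None
    return None
```

In real code the input would come from Path("/proc/self/status").read_text(), wrapped in the same OSError handling as the memory.stat reader.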

Monitoring the prerelease publish now — will update the airbyte PRs with the new CDK version once it's available.

@pnilan
Contributor

I’d simplify the logging behavior further.

Right now, once _high_pressure_mode is enabled, the monitor checks every 100 messages and logs on every sampled check while usage remains above the warning/critical thresholds. In practice that produces repeated logs every 100 messages, which feels noisier than the signal we actually need.

I think the simpler behavior is:

  • Log once when high-pressure mode turns on (>= 95%): "we crossed the high-pressure threshold and tightened the polling interval"
  • Log once when the critical threshold is crossed (>= 98%) but we do not raise:
    • either anon share is below the fail-fast threshold
    • or the anonymous-memory signal is unavailable
  • If anon share is high enough at >= 98%, just raise AirbyteTracedException as we do now; no extra repeated logging needed

Concretely, I would remove the repeated warning at the current >= 95% check and make logging event-based instead of periodic-threshold-based.

Something along these lines:

# in __init__
self._high_pressure_mode = False
self._critical_logged = False

# in check_memory_usage(), after reading usage_bytes / usage_ratio
if usage_ratio >= _HIGH_PRESSURE_THRESHOLD and not self._high_pressure_mode:
    self._high_pressure_mode = True
    logger.info(
        "Memory usage crossed %d%%; tightening check interval from %d to %d messages.",
        int(_HIGH_PRESSURE_THRESHOLD * 100),
        self._check_interval,
        _HIGH_PRESSURE_CHECK_INTERVAL,
    )

if usage_ratio >= _CRITICAL_THRESHOLD:
    anon_info = self._read_anon_bytes()
    if anon_info is not None:
        anon_bytes, anon_source = anon_info
        anon_share = anon_bytes / usage_bytes
        if anon_share >= _ANON_SHARE_OF_USAGE_THRESHOLD:
            raise AirbyteTracedException(...)
        elif not self._critical_logged:
            self._critical_logged = True
            logger.info(
                "Cgroup usage crossed %d%% but anonymous memory is only %d%% of current cgroup usage; not raising.",
                int(_CRITICAL_THRESHOLD * 100),
                int(anon_share * 100),
            )
    elif not self._critical_logged:
        self._critical_logged = True
        logger.warning(
            "Cgroup usage crossed %d%% but anonymous memory signal unavailable; skipping fail-fast.",
            int(_CRITICAL_THRESHOLD * 100),
        )

That would keep the behavior easy to reason about:

  • one log when we enter high-pressure mode
  • one log when we hit critical pressure but intentionally do not fail fast
  • one exception when we hit critical pressure and do fail fast

It also keeps the logs focused on state transitions / decisions, rather than producing a repeated breadcrumb every 100 messages once memory is already known to be high.

…5% warnings)

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Contributor Author

Patrick Nilan (@pnilan) Implemented your logging simplification in c819363. Logging is now fully event-based:

  • One INFO when high-pressure mode activates (usage first crosses 95%)
  • One INFO when critical threshold (98%) is crossed but anon share is below the fail-fast gate
  • One WARNING when critical threshold is crossed but anonymous memory signal is unavailable
  • One exception when both conditions are met — no extra logging before the raise

Removed the periodic _MEMORY_THRESHOLD warning entirely. Added self._critical_logged flag so the critical-but-not-raising logs fire exactly once.

Tests updated to verify one-shot behavior (call check_memory_usage() twice at critical, assert exactly one log record).
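The one-shot test described above can be sketched without the CDK by using a stand-in monitor and a list-backed logging handler (pytest's caplog fixture would do the same job). OneShotCriticalMonitor and MemoryPressureError are hypothetical: the error stands in for AirbyteTracedException, and raw byte counts are passed in instead of being read from the cgroup.

```python
import logging
from typing import List, Optional

logger = logging.getLogger("memory_monitor_sketch")

CRITICAL_THRESHOLD = 0.98
ANON_SHARE_OF_USAGE_THRESHOLD = 0.85


class MemoryPressureError(Exception):
    """Stand-in for AirbyteTracedException in this sketch."""


class OneShotCriticalMonitor:
    def __init__(self) -> None:
        self._critical_logged = False

    def check(self, usage_bytes: int, limit_bytes: int, anon_bytes: Optional[int]) -> None:
        if usage_bytes / limit_bytes < CRITICAL_THRESHOLD:
            return
        if anon_bytes is not None and anon_bytes / usage_bytes >= ANON_SHARE_OF_USAGE_THRESHOLD:
            raise MemoryPressureError("critical memory pressure from anonymous memory")
        if self._critical_logged:
            return  # the critical-but-not-raising log fires exactly once
        self._critical_logged = True
        if anon_bytes is None:
            logger.warning("Critical usage but anonymous memory signal unavailable; skipping fail-fast.")
        else:
            logger.info("Critical usage but anon is only %d%% of usage; not raising.",
                        int(anon_bytes / usage_bytes * 100))


class ListHandler(logging.Handler):
    def __init__(self) -> None:
        super().__init__()
        self.records: List[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def test_critical_not_raising_logs_once() -> None:
    handler = ListHandler()
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    try:
        monitor = OneShotCriticalMonitor()
        monitor.check(usage_bytes=99, limit_bytes=100, anon_bytes=50)
        monitor.check(usage_bytes=99, limit_bytes=100, anon_bytes=50)
        assert len(handler.records) == 1
        assert handler.records[0].levelno == logging.INFO
    finally:
        logger.removeHandler(handler)
```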



@pnilan
Contributor

Patrick Nilan (pnilan) commented Mar 31, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/23823332086

@pnilan Patrick Nilan (pnilan) merged commit 0b94cbe into main Apr 2, 2026
26 of 27 checks passed
@pnilan Patrick Nilan (pnilan) deleted the devin/1774478445-memory-failfast branch April 2, 2026 01:00