
feat(cdk): add source-side memory monitoring (logging-only trial) #941

Merged
Patrick Nilan (pnilan) merged 16 commits into main from devin/1773091366-add-memory-monitor
Mar 10, 2026

Conversation

@devin-ai-integration devin-ai-integration bot commented Mar 9, 2026

Summary

Adds a MemoryMonitor class to the Python CDK that reads Linux cgroup v2/v1 memory files to detect memory pressure inside containers. Integrated into AirbyteEntrypoint.read(), it checks memory every 5000 messages:

  • ≥90% usage → logs a WARNING on every check interval (e.g., Source memory usage at 91% of container limit (0.85 / 0.93 GB).)

This is a logging-only trial — no exceptions are raised and no syncs are terminated. The goal is to collect production data on memory usage patterns (climbing, plateauing, sawtoothing) before deciding on fail-fast thresholds. cgroup memory includes reclaimable kernel page cache, so aggressive thresholds could prematurely kill healthy syncs.

Warnings are logged on every check interval above 90% (not one-shot), providing breadcrumb trails for post-mortem analysis.

When cgroup files are absent (local dev, CI), all calls are instant no-ops. Zero new dependencies.
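The described behavior can be sketched roughly as follows (class name and defaults follow the PR summary; the paths are the standard cgroup v2 locations, and the internals here are an illustration, not the actual implementation):

```python
import logging
from pathlib import Path
from typing import Optional, Tuple

logger = logging.getLogger("airbyte")

_CGROUP_V2_CURRENT = Path("/sys/fs/cgroup/memory.current")
_CGROUP_V2_MAX = Path("/sys/fs/cgroup/memory.max")


class MemoryMonitor:
    """Logging-only monitor: warns at >= 90% of the container memory limit."""

    def __init__(self, check_interval: int = 5000) -> None:
        if check_interval < 1:  # guards message_count % check_interval
            raise ValueError("check_interval must be >= 1")
        self._check_interval = check_interval
        self._message_count = 0

    def check_memory_usage(self) -> None:
        self._message_count += 1
        if self._message_count % self._check_interval != 0:
            return  # between intervals: no filesystem I/O at all
        reading = self._read_memory()
        if reading is None:  # no cgroup files (local dev / CI): no-op
            return
        usage, limit = reading
        if usage / limit >= 0.90:
            logger.warning(
                "Source memory usage at %d%% of container limit (%.2f / %.2f GB).",
                round(100 * usage / limit),
                usage / 1e9,
                limit / 1e9,
            )

    def _read_memory(self) -> Optional[Tuple[int, int]]:
        try:
            if not (_CGROUP_V2_CURRENT.exists() and _CGROUP_V2_MAX.exists()):
                return None  # cgroup v1 fallback omitted in this sketch
            limit_text = _CGROUP_V2_MAX.read_text().strip()
            if limit_text == "max":  # no memory limit configured
                return None
            usage = int(_CGROUP_V2_CURRENT.read_text().strip())
            limit = int(limit_text)
            return (usage, limit) if limit > 0 else None
        except (OSError, ValueError):
            return None  # best-effort: never crash the sync
```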

Related to https://github.com/airbytehq/airbyte-internal-issues/issues/15938

Updates since last revision

Switched from fail-fast (exception at 95%) to logging-only trial per reviewer feedback:

  • Removed AirbyteTracedException raise — WARNING log only at ≥90%
  • Changed default check_interval from 1000 → 5000 to reduce I/O overhead
  • Removed one-shot flags (_warning_emitted, _critical_raised) — logs on every check above threshold
  • Simplified constructor — removed warning_threshold and critical_threshold parameters
  • Removed try/finally from entrypoint.py read loop (no exceptions to catch)
  • Added TODO on cgroup v1 paths for future removal once all deployments confirmed v2

Key design decisions

  • Logging-only, no fail-fast (yet): cgroup memory includes reclaimable page cache, making current thresholds unreliable for deciding when to kill a sync. This trial collects data to inform future thresholds.
  • Log on every check, not one-shot: Repeated warnings create breadcrumb trails showing memory trends over the sync lifetime — critical for diagnosing whether memory is climbing (leak) vs. plateauing (page cache) vs. sawtoothing (GC).
  • Check interval = 5000: Reduces filesystem I/O overhead and log volume while still providing useful signal.
  • Check after yield, not before: Every message pulled from the source iterator is emitted to the consumer before the next check. Logging cannot drop messages.
  • check_interval validated >= 1: Prevents ZeroDivisionError from message_count % check_interval.
  • cgroup v1 retained (with TODO): Minimal overhead; useful for older container runtimes / self-hosted setups.

Review & Testing Checklist for Human

  • Affects all Python CDK connectors: Integrated into AirbyteEntrypoint.read(). Every Python source connector will now have memory monitoring active in containers with cgroup files. Verify no performance regression — the no-op path (no cgroup files, or between check intervals) must be effectively zero-cost.

  • No real cgroup integration tests: All tests mock Path.exists() and Path.read_text(). Verify mocking assumptions match actual cgroup behavior (e.g., memory.max = "max" for unlimited). Manual testing in a memory-constrained container is recommended.

  • Log volume at scale: At 5000-message intervals with usage above 90%, a sync processing millions of records could produce many warning logs. Verify this is acceptable for log aggregation infrastructure.

Suggested test plan: Deploy a Python CDK connector in a memory-constrained container and run a sync against a high-volume source. Verify that (a) the warning log appears when usage crosses 90%, (b) warnings repeat at each 5000-message interval while above threshold, (c) syncs are not interrupted by the monitor, and (d) no performance degradation is observable compared to a baseline without the monitor.

Notes

  • Test coverage: 15 tests in test_memory_monitor.py, plus existing entrypoint integration tests (unchanged)
    • Unit tests cover: cgroup v1/v2 detection and usage reading, threshold behavior (logs at ≥90%), repeated logging (not one-shot), check interval logic (including validation >= 1), graceful degradation on I/O errors, no-op when cgroup unavailable
    • Removed tests: exception raising at critical threshold, one-shot flag behavior, custom thresholds, try/finally flush behavior (no longer applicable)
  • cgroup v1 support included with TODO comment for future removal once all deployments confirmed v2
  • Link to Devin Session: https://app.devin.ai/sessions/50b4da3217384b01b395ba1192be0f80
  • Requested by: bot_apk

Important

Auto-merge enabled.

This PR is set to merge automatically when all requirements are met.

…ror messages before OOM kills

Add MemoryMonitor class that reads cgroup v2/v1 memory files to detect
memory pressure in containerized environments. Integrates into the
AirbyteEntrypoint.read() loop to check memory every 1000 messages.

- At 85% usage: logs a warning message
- At 95% usage: raises MemoryLimitExceeded (AirbyteTracedException with
  transient_error failure type) for graceful shutdown
- No-op when cgroup files unavailable (local dev / CI)
- No new dependencies required

Related to airbytehq/airbyte-internal-issues#15938

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring


github-actions bot commented Mar 9, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.


Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1773091366-add-memory-monitor#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1773091366-add-memory-monitor

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment


github-actions bot commented Mar 9, 2026

PyTest Results (Fast)

3 905 tests  +13   3 893 ✅ +13   6m 17s ⏱️ -29s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit bf9db5f. ± Comparison against base commit f550424.

♻️ This comment has been updated with latest results.


github-actions bot commented Mar 9, 2026

PyTest Results (Full)

3 908 tests  +13   3 896 ✅ +13   11m 16s ⏱️ +2s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit bf9db5f. ± Comparison against base commit f550424.

♻️ This comment has been updated with latest results.

@pnilan

_read_memory() has no error handling — sync will crash on malformed cgroup files

In memory_monitor.py (lines 82-93), the read_text() and int() calls are completely unguarded. If cgroup files exist but contain unexpected content (kernel quirks, permission issues, race conditions during container shutdown), this will crash the sync instead of degrading gracefully. The issue spec explicitly calls for "best-effort — failures to read memory info never crash a sync."

# Current (will crash on any I/O or parse error):
usage_text = usage_path.read_text().strip()
limit_text = limit_path.read_text().strip()

Suggestion: Wrap _read_memory() internals in a try/except that catches OSError and ValueError, logs a debug message, and returns None:

def _read_memory(self) -> Optional[tuple[int, int]]:
    if self._cgroup_version is None:
        return None
    try:
        if self._cgroup_version == 2:
            usage_path, limit_path = _CGROUP_V2_CURRENT, _CGROUP_V2_MAX
        else:
            usage_path, limit_path = _CGROUP_V1_USAGE, _CGROUP_V1_LIMIT

        limit_text = limit_path.read_text().strip()
        if limit_text == "max":
            return None

        usage_bytes = int(usage_path.read_text().strip())
        limit_bytes = int(limit_text)
        if limit_bytes <= 0:
            return None
        return usage_bytes, limit_bytes
    except (OSError, ValueError):
        logger.debug("Failed to read cgroup memory files; skipping memory check.")
        return None

This is low-risk and directly aligned with the issue spec's "failures to read memory info never crash a sync" requirement.

@devin-ai-integration

Patrick Nilan (@pnilan) Great catch — addressed in b2e233f. _read_memory() now wraps all I/O and parsing in a try/except (OSError, ValueError) that logs at debug level and returns None, matching the issue spec's "best-effort — failures to read memory info never crash a sync" requirement.

Added two new tests covering both cases:

  • test_malformed_cgroup_file_degrades_gracefully — malformed content (e.g. "not_a_number")
  • test_os_error_degrades_gracefully — I/O errors (e.g. permission denied)

All 20 tests pass, ruff clean.


Devin session

@pnilan

Exception propagation — MemoryLimitExceeded crashes the process instead of producing a trace message

When MemoryLimitExceeded is raised inside the read() generator, it propagates through run() which has no try/except for AirbyteTracedException in the read path (lines 207-210). It bubbles up to launch() and crashes the process.

The whole point is a graceful shutdown — ideally the CDK should:

  1. Catch the exception
  2. Emit the last queued state messages (so the sync can resume)
  3. Emit an AirbyteTraceMessage with the user-facing error
  4. Then exit

Queued messages (including final state) are lost on shutdown

This is directly related: when MemoryLimitExceeded is raised inside the first for loop in read(), execution jumps out entirely. The second loop (_emit_queued_messages on line 282-283) is skipped, so any queued messages — including checkpoint state — are never yielded.

The finally block in run() (line 213-217) does flush the queue, but those messages bypass handle_record_counts, so state messages may have incorrect sourceStats.recordCount.

Since the InMemoryMessageRepository is just an unbounded deque with no backpressure, draining it in a finally is safe and will always complete immediately.
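To illustrate why that holds, here is a minimal stand-in (the real InMemoryMessageRepository API in the CDK may differ; this only demonstrates the unbounded-deque drain argument):

```python
from collections import deque


class InMemoryQueue:
    """Toy model of an in-memory message repository backed by a deque."""

    def __init__(self) -> None:
        self._queue = deque()  # unbounded: emit() never blocks

    def emit(self, message) -> None:
        self._queue.append(message)

    def consume(self):
        # Drains whatever is currently queued; with no producer running
        # during shutdown, this loop always terminates immediately.
        while self._queue:
            yield self._queue.popleft()


repo = InMemoryQueue()
repo.emit({"type": "STATE"})
repo.emit({"type": "LOG"})
drained = list(repo.consume())  # safe to call from a finally block
```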

Suggestion

Wrap the read loop in read() with try/finally to ensure queued messages are always flushed with correct record counts:

# In read():
try:
    for message in self.source.read(self.logger, config, catalog, state):
        yield self.handle_record_counts(message, stream_message_counter)
        message_count += 1
        if message_count % DEFAULT_CHECK_INTERVAL == 0:
            memory_monitor.check_memory_usage()
finally:
    for message in self._emit_queued_messages(self.source):
        yield self.handle_record_counts(message, stream_message_counter)

The trace message emission may already be handled by the platform runner wrapping launch(), but that should be explicitly verified. If not, a catch in run() that calls traced_exc.as_airbyte_message() and yields it would close the loop entirely within the CDK.

… shutdown

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration

Patrick Nilan (@pnilan) Good catch — addressed in ee93565. The read() loop is now wrapped in try/finally so that _emit_queued_messages (with handle_record_counts) always runs, even when MemoryLimitExceeded interrupts the loop. This ensures checkpoint state is flushed with correct sourceStats.recordCount.

Regarding the trace message emission: you're right that the exception propagates through run() (which has no AirbyteTracedException catch in the read path). The run() finally block (lines 214-218) does flush the message repository, but those messages bypass handle_record_counts. With this fix, queued messages are now properly flushed in read() before the exception propagates, so the run() finally block will see an empty queue — the important messages (including state) have already been yielded with correct record counts.

The trace message (as_airbyte_message()) emission is a separate concern — it likely needs to happen at the run() or launch() level. I think that's worth a follow-up discussion with the team to confirm whether the platform runner already handles this, rather than adding it in this PR. Happy to open a tracking issue if you'd prefer.

All 69 tests pass (20 memory monitor + 49 entrypoint), ruff clean.


Devin session

@pnilan

Encapsulate check interval inside MemoryMonitor

Currently the entrypoint imports DEFAULT_CHECK_INTERVAL and manages the message counting / modulo logic itself:

from airbyte_cdk.utils.memory_monitor import DEFAULT_CHECK_INTERVAL, MemoryMonitor

message_count += 1
if message_count % DEFAULT_CHECK_INTERVAL == 0:
    memory_monitor.check_memory_usage()

This leaks an implementation detail to the caller. The monitor should own the "how often to check" logic internally. The caller should just call check_memory_usage() on every message and let the monitor decide whether to actually read the cgroup files:

# In MemoryMonitor.__init__:
self._check_interval = check_interval
self._message_count = 0

# In check_memory_usage:
def check_memory_usage(self) -> None:
    if self._cgroup_version is None:
        return
    self._message_count += 1
    if self._message_count % self._check_interval != 0:
        return
    # ... actual memory check logic

Then the entrypoint simplifies to:

memory_monitor = MemoryMonitor()
for message in self.source.read(self.logger, config, catalog, state):
    yield self.handle_record_counts(message, stream_message_counter)
    memory_monitor.check_memory_usage()

This keeps the interval easy to find and change (constructor parameter with a default const), and removes the coupling between the entrypoint and the monitor's internal check cadence.

Move message counting and modulo logic from the entrypoint into
MemoryMonitor.check_memory_usage(). The caller now simply calls
check_memory_usage() on every message and the monitor decides
internally whether to read cgroup files based on check_interval.

This removes the coupling between the entrypoint and the monitor's
internal check cadence.

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration

Re: encapsulate check interval inside MemoryMonitor — good call, done in cca6c50.

The monitor now owns the message counter and interval logic internally:

# MemoryMonitor.__init__ now accepts check_interval (default 1000)
self._check_interval = check_interval
self._message_count = 0

# check_memory_usage() increments and checks internally
self._message_count += 1
if self._message_count % self._check_interval != 0:
    return

The entrypoint simplifies to:

memory_monitor = MemoryMonitor()
try:
    for message in self.source.read(self.logger, config, catalog, state):
        yield self.handle_record_counts(message, stream_message_counter)
        memory_monitor.check_memory_usage()
finally:
    ...

Also added a test_check_interval_skips_intermediate_calls test to verify the interval behavior. All 70 tests pass, ruff clean.


Devin session (on behalf of Patrick Nilan (@pnilan))

@pnilan

Move MemoryMonitor to AirbyteEntrypoint.__init__

The MemoryMonitor is currently created inside read(), scoping it to just the read loop. But memory pressure can also occur during check commands — some connectors actually read records to validate the connection. With the current design, those check-time reads have no memory monitoring at all.

Suggestion: Instantiate MemoryMonitor in AirbyteEntrypoint.__init__ so it's available as self._memory_monitor for the lifetime of the sync. This way check_memory_usage() can be called from both read() and check() (and any future command that does significant work). It also avoids the warning/critical flag reset issue if read() were ever called more than once per entrypoint instance.
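A sketch of the suggested placement (MemoryMonitor is stubbed here; class and method names follow the PR discussion, not the actual CDK source):

```python
class MemoryMonitor:
    """Stub standing in for the real monitor."""

    def check_memory_usage(self) -> None:
        pass  # the real implementation reads cgroup files


class AirbyteEntrypoint:
    def __init__(self) -> None:
        # One monitor per entrypoint instance: shared by read(), check(),
        # and any future command, and never reset between calls.
        self._memory_monitor = MemoryMonitor()

    def read(self, messages):
        for message in messages:
            yield message
            self._memory_monitor.check_memory_usage()
```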

Instantiate MemoryMonitor in __init__ so it persists for the lifetime
of the entrypoint instance. This makes it available as
self._memory_monitor for read() and any future command that does
significant work (e.g. check). Also avoids resetting warning/critical
flags if read() were ever called more than once.

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration

Re: Move MemoryMonitor to AirbyteEntrypoint.__init__ — done in ec56943.

MemoryMonitor is now instantiated as self._memory_monitor in __init__, so it persists for the lifetime of the entrypoint and is available across read(), check(), and any future command. This also avoids resetting warning/critical flags if read() were ever called more than once per instance.

All 70 tests pass, ruff clean.


Devin session (on behalf of Patrick Nilan (@pnilan))

@pnilan

Lazy-initialize cgroup detection

MemoryMonitor.__init__ calls Path.exists() 2-4 times to probe for cgroup v2/v1 files. Combined with the suggestion to move instantiation to AirbyteEntrypoint.__init__, this I/O would run on every connector invocation — including spec and discover where memory monitoring is irrelevant.

Suggestion: Defer cgroup version detection to the first call to check_memory_usage():

def __init__(self, ...) -> None:
    self._warning_threshold = warning_threshold
    self._critical_threshold = critical_threshold
    self._warning_emitted = False
    self._critical_raised = False
    self._cgroup_version: Optional[int] = None
    self._probed = False

def _probe_cgroup(self) -> None:
    if self._probed:
        return
    self._probed = True
    if _CGROUP_V2_CURRENT.exists() and _CGROUP_V2_MAX.exists():
        self._cgroup_version = 2
    elif _CGROUP_V1_USAGE.exists() and _CGROUP_V1_LIMIT.exists():
        self._cgroup_version = 1

def check_memory_usage(self) -> None:
    self._probe_cgroup()
    if self._cgroup_version is None:
        return
    # ... rest of check logic

This way the filesystem probing only happens when monitoring is actually needed (during read() or check()), not on every connector invocation.

@pnilan

Missing test coverage

1. No test for I/O errors in _read_memory()

Once the try/except is added (per the earlier comment), there should be tests proving graceful degradation:

  • read_text() raises OSError (permission denied, file disappeared mid-read)
  • File contents are non-numeric (e.g., empty string, garbage data) causing ValueError from int()
  • Both cases should result in check_memory_usage() being a silent no-op, not a crash

2. No test for limit_bytes == 0

The <= 0 guard exists in _read_memory() but is never exercised. Add a test where the cgroup limit file contains "0" and verify it returns None / is a no-op.
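Such a test could look roughly like this (using a mocked Path rather than the repo's actual fixtures, with a stand-in for the guard logic):

```python
from pathlib import Path
from unittest import mock


def read_limit(limit_path: Path):
    """Stand-in for the <= 0 guard in _read_memory()."""
    limit_bytes = int(limit_path.read_text().strip())
    if limit_bytes <= 0:
        return None  # a zero/negative limit means "no usable limit"
    return limit_bytes


def test_limit_bytes_zero_is_noop():
    fake_path = mock.Mock(spec=Path)
    fake_path.read_text.return_value = "0\n"
    assert read_limit(fake_path) is None
```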

@pnilan

Missing integration test: graceful shutdown flushes queued state messages

The core value proposition of this PR is graceful shutdown, but there's no test verifying that when MemoryLimitExceeded fires mid-read, queued messages (including the last checkpoint state) are flushed with correct sourceStats.recordCount.

The existing test infrastructure makes this straightforward to add. Follow the pattern from test_run_read in test_entrypoint.py (~line 472):

Steps

  1. Set up the mock source using the existing MockSource and entrypoint fixture. Have source.read() return a controlled sequence: record, record, state, record, record.

  2. Queue a message in the InMemoryMessageRepository mock (same pattern as the existing fixture) — this simulates a pending state/log message that should be flushed on shutdown.

  3. Patch MemoryMonitor.check_memory_usage with a side_effect that returns None for the first few calls, then raises MemoryLimitExceeded. This triggers the exception at a known point mid-stream.

  4. Consume entrypoint.run(parsed_args) collecting yielded messages until the exception propagates.

  5. Assert:

    • All messages yielded before the exception point are present
    • The queued messages from _emit_queued_messages (the finally block) were flushed
    • State messages have correct sourceStats.recordCount (proving they went through handle_record_counts, not just the raw run() finally block)

Example sketch

def test_memory_limit_exceeded_flushes_queued_messages(
    entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock
):
    parsed_args = Namespace(
        command="read", config="config_path", state="statepath", catalog="catalogpath"
    )
    records = [
        AirbyteMessage(record=AirbyteRecordMessage(stream="s", data={}, emitted_at=1), type=Type.RECORD),
        AirbyteMessage(record=AirbyteRecordMessage(stream="s", data={}, emitted_at=1), type=Type.RECORD),
    ]
    mocker.patch.object(MockSource, "read_state", return_value={})
    mocker.patch.object(MockSource, "read_catalog", return_value={})
    mocker.patch.object(MockSource, "read", return_value=iter(records))

    # Patch monitor to raise after first check
    mocker.patch.object(
        MemoryMonitor, "check_memory_usage",
        side_effect=[None, MemoryLimitExceeded(
            internal_message="Memory at 96%",
            message="Source exceeded memory limit.",
            failure_type=FailureType.transient_error,
        )],
    )

    yielded = []
    with pytest.raises(MemoryLimitExceeded):
        for msg in entrypoint.run(parsed_args):
            yielded.append(msg)

    # Assert queued messages were flushed (from the try/finally in read())
    # Assert state messages have correct sourceStats.recordCount

Note: This test depends on the try/finally fix from the earlier comment (wrapping the read loop so _emit_queued_messages runs on exception). Without that fix, this test would fail — which is actually a good way to prove the fix works. Consider writing the test first as a failing test, then applying the fix.

…wn integration test

- Complete test_memory_monitor.py with 24 tests across 4 test classes
- TestMemoryMonitorInit: cgroup v1/v2 detection, lazy-init verification
- TestMemoryMonitorCheckMemory: thresholds, error degradation, intervals, limit_bytes==0
- TestMemoryLimitExceeded: exception type and attribute validation
- TestDefaultCheckInterval: constant value verification
- Add test_memory_limit_exceeded_flushes_queued_messages to test_entrypoint.py
  verifying that try/finally in read() flushes queued state messages even
  when MemoryLimitExceeded propagates

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration

Patrick Nilan (@pnilan) Re: Lazy-initialize cgroup detection — done in 7f7ab40.

MemoryMonitor.__init__ now does zero I/O. Cgroup probing is deferred to _probe_cgroup(), which runs once on the first check_memory_usage() call:

def __init__(self, ...) -> None:
    self._cgroup_version: Optional[int] = None
    self._probed = False
    # ... thresholds and counters only

def _probe_cgroup(self) -> None:
    if self._probed:
        return
    self._probed = True
    if _CGROUP_V2_CURRENT.exists() and _CGROUP_V2_MAX.exists():
        self._cgroup_version = 2
    elif _CGROUP_V1_USAGE.exists() and _CGROUP_V1_LIMIT.exists():
        self._cgroup_version = 1

This means spec and discover commands pay zero cost for the monitor sitting in __init__. Tests include test_no_io_on_construction and test_probing_happens_once_on_first_check to verify the lazy-init behavior.


Devin session

@devin-ai-integration

Patrick Nilan (@pnilan) Re: Missing test coverage — both gaps addressed in 7f7ab40.

1. I/O errors in _read_memory() — already covered by test_malformed_cgroup_file_degrades_gracefully and test_os_error_degrades_gracefully (added in an earlier commit). Both confirm check_memory_usage() is a silent no-op when read_text() raises OSError or contents cause ValueError.

2. limit_bytes == 0 guard — new test test_limit_bytes_zero_is_noop explicitly covers this: cgroup limit file returns "0\n", monitor confirms no warning emitted and no critical raised.

All 24 unit tests pass in test_memory_monitor.py.


Devin session

@devin-ai-integration

Patrick Nilan (@pnilan) Re: Missing integration test: graceful shutdown flushes queued state messages — done in 7f7ab40.

Added test_memory_limit_exceeded_flushes_queued_messages in test_entrypoint.py following your sketch. The test:

  1. Sets up MockSource.read() returning 2 records
  2. Queues a state message in message_repository (simulating pending checkpoint)
  3. Patches check_memory_usage to raise MemoryLimitExceeded on the 2nd call
  4. Collects messages with pytest.raises(MemoryLimitExceeded) — the exception propagates as expected
  5. Asserts:
    • At least 1 record yielded before the exception
    • Queued state message flushed by the finally block in read()
    • State message has correct sourceStats.recordCount == 2.0 (both records counted before exception)

This directly proves the try/finally fix works — without it, the queued state would be lost.

All 25 tests pass (24 unit + 1 integration), ruff clean.


Devin session


@pnilan Patrick Nilan (pnilan) left a comment


Review Feedback

Change failure type from transient_error to system_error

The MemoryLimitExceeded exception should use FailureType.system_error, not transient_error. The goals of this feature are:

  1. Observability — we'll know when a source fails due to OOM before it happens, see it in logs, and triage/debug more easily
  2. Graceful shutdown — flush messages and shut down cleanly instead of silent OOM kills that cause cascading failures to the orchestrator/destination

This is an infrastructure/system issue, not something the end user caused. Using system_error ensures it surfaces to Sentry/oncall for investigation rather than being silently retried.

Update the user-facing message

The current message tells users to "Reduce the number of streams or increase memory allocation." — the user didn't do anything wrong here and can't fix this themselves. Change it to something like:

message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down to avoid an out-of-memory crash.",
failure_type=FailureType.system_error,

Update tests accordingly

The tests that assert FailureType.transient_error will need to be updated to FailureType.system_error:

  • test_critical_at_95_percent_raises in test_memory_monitor.py
  • test_memory_limit_exceeded_flushes_queued_messages in test_entrypoint.py
  • test_default_attributes in test_memory_monitor.py

…facing message

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration

Done — pushed a021eb7d:

  1. FailureType.system_error — MemoryLimitExceeded now uses system_error so it surfaces to Sentry/oncall instead of being silently retried.
  2. User-facing message updated — removed remediation advice; now reads: "Source exceeded memory limit ({usage_percent}% used) and must shut down to avoid an out-of-memory crash."
  3. Tests updated — test_critical_at_95_percent_raises, test_default_attributes, test_is_airbyte_traced_exception, and test_memory_limit_exceeded_flushes_queued_messages all assert system_error. All 25 tests passing.

Devin session

@pnilan

Make DEFAULT_CHECK_INTERVAL private and drop circular tests

DEFAULT_CHECK_INTERVAL is exported as a public module-level constant, but nothing outside of tests uses it. The MemoryMonitor constructor already accepts check_interval as a parameter with a default value — that's the proper public API for configuring this.

The only consumer of DEFAULT_CHECK_INTERVAL is test_memory_monitor.py, which imports it to assert == 1000. That test is circular — it's testing that a constant equals itself. It doesn't validate any behavior.

Recommendation: Rename to _DEFAULT_CHECK_INTERVAL (private) and drop the two tests in TestDefaultCheckInterval entirely. They add no value. If someone changes the default interval, the relevant behavioral tests (like test_check_interval_skips_intermediate_calls) will catch any regressions.

…tests

Co-Authored-By: bot_apk <apk@cognition.ai>
@pnilan

(aside)

End-to-End Verification: Graceful Shutdown Message Delivery

Investigated whether the platform orchestrator fully consumes the finally-block messages when MemoryLimitExceeded is raised. Confirmed: it works end-to-end.

Python side (generator semantics)

When MemoryLimitExceeded is raised inside read()'s try block, Python's generator protocol allows the finally block to yield messages while the exception is "pending". The generator suspends, the caller (launch()'s for loop) receives the yielded value, and only after the finally block completes does the exception propagate. So:

  1. read()'s finally → yields queued state messages (with correct recordCount) → printed to stdout
  2. run()'s finally → yields additional queued messages → printed to stdout
  3. Exception propagates → uncaught exception handler emits TRACE message → printed to stdout
  4. Process exits with non-zero code
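That generator behavior is easy to verify in isolation:

```python
def source_read():
    try:
        yield "record"
        raise RuntimeError("memory limit exceeded")  # stand-in for the raise
    finally:
        # Like read()'s finally block: this yield is delivered to the caller
        # while the exception is pending; it propagates only afterwards.
        yield "queued state"


received = []
try:
    for message in source_read():
        received.append(message)
except RuntimeError:
    pass

# received == ["record", "queued state"]: the flush happened before the
# exception reached the caller.
```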

Platform side (orchestrator)

The orchestrator reads source output from a named pipe (FIFO) via BufferedReader.lines() in VersionedAirbyteStreamFactory, which reads until EOF — not until a specific message type or error condition. The SourceReader loop in ReplicationTask.kt consumes all messages before checking the exit code:

  1. Named pipe produces EOF only when the source process exits
  2. SourceReader drains all messages from the pipe first
  3. Exit code is written atomically by the shell wrapper (ContainerCommandFactory.kt) after the process exits
  4. Orchestrator checks exit code only after the iterator is exhausted

Conclusion

All finally-block messages — including the last checkpoint state with correct sourceStats.recordCount and the TRACE message from the uncaught exception handler — are written to stdout, consumed by the platform via the named pipe, and processed before the exit code is checked. No messages are lost.

@pnilan

Move memory check back to after yield

The current order (check before yield) silently drops one message on critical shutdown:

for message in self.source.read(self.logger, config, catalog, state):
    self._memory_monitor.check_memory_usage()  # raises here
    yield self.handle_record_counts(message, stream_message_counter)  # never reached

The dropped message was already read from the iterator into memory, so skipping the yield doesn't reclaim anything. You're paying the memory cost either way — the only difference is whether the downstream consumer gets the data.

Recommendation: Move the check back to after yield:

for message in self.source.read(self.logger, config, catalog, state):
    yield self.handle_record_counts(message, stream_message_counter)
    self._memory_monitor.check_memory_usage()

This way:

  • Every message pulled from the source is emitted — zero data loss
  • recordCount on state messages is always accurate
  • The finally block still guarantees queued state messages are flushed with correct counts on shutdown
  • One extra yielded message at 95% memory won't cause OOM — it's just passing a reference to the caller

The integration test recordCount should go back to 2.0.
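The difference between the two orderings can be checked with a minimal sketch (`read_with_check` and `failing_check` are hypothetical stand-ins for the entrypoint loop and the monitor):

```python
class MemoryLimitExceeded(Exception):
    """Illustrative stand-in for the monitor's exception."""

def read_with_check(messages, check):
    # check-after-yield: each message is emitted before the monitor can raise
    for m in messages:
        yield m
        check()

calls = {"n": 0}
def failing_check():
    calls["n"] += 1
    if calls["n"] == 2:  # pretend memory crosses the threshold on the 2nd check
        raise MemoryLimitExceeded()

received = []
try:
    for m in read_with_check(["a", "b", "c"], failing_check):
        received.append(m)
except MemoryLimitExceeded:
    pass

print(received)  # ['a', 'b']: the message in flight when the check fired still arrived
```

With check-before-yield, `"b"` would already have been pulled from the source iterator but never emitted, which is exactly the silent drop described above.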

@devin-ai-integration
Contributor Author

Good call — moved the check back to after yield in 0485fa2. The message is already in memory either way, so emitting it ensures zero data loss and accurate recordCount.

for message in self.source.read(self.logger, config, catalog, state):
    yield self.handle_record_counts(message, stream_message_counter)
    self._memory_monitor.check_memory_usage()

Integration test recordCount is back to 2.0.




Co-Authored-By: bot_apk <apk@cognition.ai>
@pnilan
Contributor

I am currently testing this w/ a dev image of source-sample-data in a personal workspace. In this dev image, I also attempt to force an OOM scenario. TBD how this goes.

@pnilan
Contributor

Minor readability nit: the internal_message and warning log report memory in raw bytes (e.g., 960000000 / 1000000000 bytes), which is hard to parse at a glance. Since our usage, requested allocation, and limit allocation are typically on the order of gigabytes, consider converting to GB in the formatted output:

usage_gb = usage_bytes / (1024 ** 3)
limit_gb = limit_bytes / (1024 ** 3)

Then use those in the messages, e.g.:

internal_message=f"Memory usage is {usage_percent}% ({usage_gb:.2f} / {limit_gb:.2f} GB). "
    f"Critical threshold is {int(self._critical_threshold * 100)}%.",
logger.warning(
    "Source memory usage reached %d%% of container limit (%.2f / %.2f GB).",
    usage_percent,
    usage_gb,
    limit_gb,
)

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Contributor Author

Good suggestion — updated in 4dda57c. Both internal_message and warning log now display GB:

Source memory usage reached 87% of container limit (0.81 / 0.93 GB).
Memory usage is 96% (0.89 / 0.93 GB). Critical threshold is 95%.


@pnilan
Contributor

MemoryLimitExceeded subclassing AirbyteTracedException deviates from the established pattern in the codebase. Subclassing AirbyteTracedException is rare (only 3-4 cases), and the dominant convention for known error conditions is to raise AirbyteTracedException directly with message, internal_message, and failure_type:

raise AirbyteTracedException(
    internal_message=f"Memory usage is {usage_percent}% ...",
    message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down to avoid an out-of-memory crash.",
    failure_type=FailureType.system_error,
)

The MemoryLimitExceeded subclass only adds value if callers need to catch it specifically, but the existing sys.excepthook in exception_handler.py already handles any AirbyteTracedException — so a plain AirbyteTracedException would work the same way and stay consistent with the rest of the codebase.

Recommend removing the MemoryLimitExceeded class and raising AirbyteTracedException directly.

@pnilan Patrick Nilan (pnilan) marked this pull request as ready for review March 10, 2026 04:52
Copilot AI review requested due to automatic review settings March 10, 2026 04:52
Contributor

Copilot AI left a comment


Pull request overview

Adds a source-side memory monitoring mechanism to the Python CDK to detect container memory pressure (via Linux cgroup v1/v2 files) and trigger a graceful shutdown before an OOM kill, integrated into the core AirbyteEntrypoint.read() loop.

Changes:

  • Added MemoryMonitor + MemoryLimitExceeded to read cgroup memory usage/limit and enforce warning/critical thresholds.
  • Integrated memory checks into AirbyteEntrypoint.read() and wrapped the read loop with try/finally to flush queued messages (including state) on early termination.
  • Added comprehensive unit tests for MemoryMonitor and an integration test ensuring queued messages flush when MemoryLimitExceeded occurs mid-read.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

File Description
airbyte_cdk/utils/memory_monitor.py New memory monitoring utility reading cgroup files and raising MemoryLimitExceeded at critical threshold.
airbyte_cdk/entrypoint.py Instantiates and invokes MemoryMonitor during read(); ensures queued messages are flushed in finally.
unit_tests/utils/test_memory_monitor.py New unit tests covering thresholds, v1/v2 behavior, no-op paths, degradation on read errors, and check interval behavior.
unit_tests/test_entrypoint.py Integration test validating queued state messages are flushed and record counts are preserved when memory exception propagates.


devin-ai-integration bot and others added 2 commits March 10, 2026 04:56
…edException directly

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Contributor Author

Addressed both review items:

  1. MemoryLimitExceeded removed (c96825f) — now raises AirbyteTracedException directly, consistent with the dominant codebase pattern. All tests updated.
  2. check_interval validation (cdc518c) — __init__ now raises ValueError if check_interval < 1, with two new tests covering zero and negative values.

PR description also updated to reflect current semantics (after-yield check, no custom subclass, GB formatting).



Contributor


Looks good. I'd just consider simplifying by dropping cgroup v1 support if it's no longer used (I don't know whether that's the case).

@pnilan
Contributor

Proposed changes: logging-only trial period

After discussion, we'd like to adjust the memory monitor behavior for an initial trial period before enabling fail-fast. The goal is to collect production data on memory usage patterns without risking premature sync failures.

Why

  • memory.current (cgroup v2) / memory.usage_in_bytes (cgroup v1) includes kernel page cache, not just RSS. Connectors using ResponseToFileExtractor or file-based sources can show 95%+ cgroup usage where much of it is reclaimable page cache the kernel will happily drop under pressure.
  • Anatolii Yatsuk (@tolik0) observed syncs that hit the memory limit and then dropped back down — this is likely kernel page cache reclaim in action, which is normal and healthy behavior.
  • Connector pods have restart policy "Never", so an OOM kill is already fatal. The value of this monitor is giving a clean error message instead of an opaque OOMKilled — but only once we're confident the threshold reflects genuine pressure.
  • We suspect most syncs that repeatedly hit the ceiling are destined to fail eventually, but we want data to confirm before enabling fail-fast.
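For reference, the cgroup v2 read described above can be sketched roughly as follows (the paths are the standard v2 locations; the function name and parameterization are illustrative, not the PR's actual implementation). Note that the `memory.current` figure read here is the one that includes reclaimable page cache:

```python
from pathlib import Path
from typing import Optional, Tuple

def read_cgroup_v2_memory(
    usage_path: str = "/sys/fs/cgroup/memory.current",
    limit_path: str = "/sys/fs/cgroup/memory.max",
) -> Optional[Tuple[int, int]]:
    """Return (usage_bytes, limit_bytes), or None when not in a limited cgroup."""
    try:
        usage = int(Path(usage_path).read_text().strip())
        limit_raw = Path(limit_path).read_text().strip()
    except (OSError, ValueError):
        return None  # no cgroup v2 files (local dev, CI) or unreadable contents
    if limit_raw == "max":  # "max" means no memory limit is configured
        return None
    return usage, int(limit_raw)
```

Outside a limited container the function returns `None` on the first file read, which is what makes the monitor a cheap no-op in local dev and CI.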

Requested changes

1. Remove the exception at 95% — log only (WARNING level) for now

Replace the AirbyteTracedException raise at the critical threshold with a WARNING-level log. This gives us visibility in monitoring without killing syncs.

2. Change check interval from 1000 to 5000 messages

Reduces I/O overhead and log volume.

3. Log on every check above 90% (not just once)

Instead of the current one-shot _warning_emitted / _critical_raised flags, log at every check interval (every 5000 messages) when usage is above 90%. This gives us breadcrumb trails showing whether memory is climbing, plateauing, or sawtoothing — critical for understanding real-world behavior.

Remove the _warning_emitted and _critical_raised flags. The simplified logic would be:

def check_memory_usage(self) -> None:
    self._probe_cgroup()
    if self._cgroup_version is None:
        return

    self._message_count += 1
    if self._message_count % self._check_interval != 0:
        return

    memory_info = self._read_memory()
    if memory_info is None:
        return

    usage_bytes, limit_bytes = memory_info
    usage_ratio = usage_bytes / limit_bytes
    usage_percent = int(usage_ratio * 100)
    usage_gb = usage_bytes / (1024**3)
    limit_gb = limit_bytes / (1024**3)

    if usage_ratio >= 0.90:
        logger.warning(
            "Source memory usage at %d%% of container limit (%.2f / %.2f GB).",
            usage_percent,
            usage_gb,
            limit_gb,
        )

4. Simplify constructor

With the above changes:

  • Remove warning_threshold and critical_threshold params (hardcode 90% for now)
  • Remove _warning_emitted and _critical_raised state
  • Default check_interval to 5000

5. Update tests

  • Remove all AirbyteTracedException / critical-threshold tests
  • Remove one-shot flag tests (test_warning_emitted_only_once, test_critical_raised_only_once)
  • Add test that logging occurs on every check above 90%
  • Update check interval tests to use 5000

Future follow-up (not for this PR)

Once we have production data:

  • If most 90%+ syncs eventually OOM → re-enable fail-fast with confidence
  • Consider reading memory.stat and subtracting inactive_file (reclaimable page cache) for more accurate pressure detection
  • Consider whether cgroup v1 support can be dropped (checked platform — GKE is on v2, but EKS Paris dataplane may still be on v1, and OSS users could be on either)
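The `inactive_file` adjustment from the follow-up list could look roughly like this (a sketch only; the field name follows the cgroup v2 `memory.stat` format of space-separated key/value lines, while the function name is made up):

```python
def usage_minus_reclaimable(usage_bytes: int, memory_stat_text: str) -> int:
    """Subtract inactive_file (reclaimable page cache) from raw cgroup usage.

    memory_stat_text is the contents of the cgroup's memory.stat file, which
    holds space-separated "key value" pairs, one per line.
    """
    for line in memory_stat_text.splitlines():
        key, _, value = line.partition(" ")
        if key == "inactive_file":
            return max(0, usage_bytes - int(value))
    return usage_bytes  # field missing: fall back to the raw figure
```

A sync showing 95% raw usage with a large `inactive_file` component would report much lower adjusted pressure, which is the distinction the trial-period data is meant to surface.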

Per pnilan's review feedback:
- Remove AirbyteTracedException raise at critical threshold — log only
- Change check_interval default from 1000 to 5000 messages
- Log WARNING on every check above 90% (remove one-shot flags)
- Simplify constructor (remove threshold params and state flags)
- Remove try/finally from entrypoint read() (no longer raising exceptions)
- Add TODO comment for cgroup v1 removal
- Update tests: remove exception/one-shot tests, add repeated logging test

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration devin-ai-integration bot changed the title feat(cdk): add memory monitor for graceful shutdown before OOM kill feat(cdk): add source-side memory monitoring (logging-only trial) Mar 10, 2026
@pnilan Patrick Nilan (pnilan) enabled auto-merge (squash) March 10, 2026 21:15
@pnilan Patrick Nilan (pnilan) merged commit 3e65ad5 into main Mar 10, 2026
29 of 30 checks passed
@pnilan Patrick Nilan (pnilan) deleted the devin/1773091366-add-memory-monitor branch March 10, 2026 21:20