feat(cdk): add source-side memory monitoring (logging-only trial) #941
Patrick Nilan (pnilan) merged 16 commits into `main` from `devin/1773091366-add-memory-monitor`
Conversation
…ror messages before OOM kills

Add `MemoryMonitor` class that reads cgroup v2/v1 memory files to detect memory pressure in containerized environments. Integrates into the `AirbyteEntrypoint.read()` loop to check memory every 1000 messages.

- At 85% usage: logs a warning message
- At 95% usage: raises `MemoryLimitExceeded` (an `AirbyteTracedException` with `transient_error` failure type) for graceful shutdown
- No-op when cgroup files are unavailable (local dev / CI)
- No new dependencies required

Related to airbytehq/airbyte-internal-issues#15938

Co-Authored-By: bot_apk <apk@cognition.ai>
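For context, the cgroup v2 read the description refers to can be sketched roughly as below. This is an illustrative sketch only, not the PR's actual code: the function name, the parameterized paths, and the exact error handling are assumptions based on the description (cgroup v2 exposes `memory.current` / `memory.max`, and the monitor no-ops when the files are unavailable).

```python
from pathlib import Path
from typing import Optional, Tuple


def read_cgroup_v2_memory(
    current_file: Path = Path("/sys/fs/cgroup/memory.current"),
    max_file: Path = Path("/sys/fs/cgroup/memory.max"),
) -> Optional[Tuple[int, int]]:
    """Return (usage_bytes, limit_bytes), or None when monitoring should no-op."""
    try:
        usage = int(current_file.read_text().strip())
        raw_limit = max_file.read_text().strip()
        if raw_limit == "max":
            # cgroup v2 writes the literal string "max" when no limit is configured.
            return None
        return usage, int(raw_limit)
    except (OSError, ValueError):
        # Missing or unreadable cgroup files (local dev, CI) -> degrade to no-op.
        return None
```

With `(usage_bytes, limit_bytes)` in hand, the threshold check is just `usage_bytes / limit_bytes >= threshold`.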
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

**Testing This CDK Version**

You can test this version of the CDK using the following:

```bash
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1773091366-add-memory-monitor#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1773091366-add-memory-monitor
```

**PR Slash Commands**

Airbyte Maintainers can execute the following slash commands on your PR:
Patrick Nilan (@pnilan) Great catch — addressed in b2e233f. Added two new tests covering both cases:
All 20 tests pass, ruff clean.
Exception propagation —
… shutdown Co-Authored-By: bot_apk <apk@cognition.ai>
Patrick Nilan (@pnilan) Good catch — addressed in ee93565. The …

Regarding the trace message emission: you're right that the exception propagates through … The trace message (…) …

All 69 tests pass (20 memory monitor + 49 entrypoint), ruff clean.
Encapsulate check interval inside `MemoryMonitor`
Move message counting and modulo logic from the entrypoint into MemoryMonitor.check_memory_usage(). The caller now simply calls check_memory_usage() on every message and the monitor decides internally whether to read cgroup files based on check_interval. This removes the coupling between the entrypoint and the monitor's internal check cadence. Co-Authored-By: bot_apk <apk@cognition.ai>
Re: encapsulate check interval inside `MemoryMonitor` — good call, done in cca6c50. The monitor now owns the message counter and interval logic internally:

```python
# MemoryMonitor.__init__ now accepts check_interval (default 1000)
self._check_interval = check_interval
self._message_count = 0

# check_memory_usage() increments and checks internally
self._message_count += 1
if self._message_count % self._check_interval != 0:
    return
```

The entrypoint simplifies to:

```python
memory_monitor = MemoryMonitor()
try:
    for message in self.source.read(self.logger, config, catalog, state):
        yield self.handle_record_counts(message, stream_message_counter)
        memory_monitor.check_memory_usage()
finally:
    ...
```

Also added a …

Devin session (on behalf of Patrick Nilan (@pnilan))
Move `MemoryMonitor` instantiation to `AirbyteEntrypoint.__init__`
Instantiate MemoryMonitor in __init__ so it persists for the lifetime of the entrypoint instance. This makes it available as self._memory_monitor for read() and any future command that does significant work (e.g. check). Also avoids resetting warning/critical flags if read() were ever called more than once. Co-Authored-By: bot_apk <apk@cognition.ai>
Re: Move `MemoryMonitor` to `AirbyteEntrypoint.__init__` — done in ec56943.

All 70 tests pass, ruff clean.

Devin session (on behalf of Patrick Nilan (@pnilan))
**Lazy-initialize cgroup detection**

Suggestion: Defer cgroup version detection from the constructor to the first call to `check_memory_usage()`:

```python
def __init__(self, ...) -> None:
    self._warning_threshold = warning_threshold
    self._critical_threshold = critical_threshold
    self._warning_emitted = False
    self._critical_raised = False
    self._cgroup_version: Optional[int] = None
    self._probed = False

def _probe_cgroup(self) -> None:
    if self._probed:
        return
    self._probed = True
    if _CGROUP_V2_CURRENT.exists() and _CGROUP_V2_MAX.exists():
        self._cgroup_version = 2
    elif _CGROUP_V1_USAGE.exists() and _CGROUP_V1_LIMIT.exists():
        self._cgroup_version = 1

def check_memory_usage(self) -> None:
    self._probe_cgroup()
    if self._cgroup_version is None:
        return
    # ... rest of check logic
```

This way the filesystem probing only happens when monitoring is actually needed (during `read()`).
**Missing test coverage**

1. No test for I/O errors in …
**Missing integration test: graceful shutdown flushes queued state messages**

The core value proposition of this PR is graceful shutdown, but there's no test verifying that when … The existing test infrastructure makes this straightforward to add. Follow the pattern from …

**Steps**

…

**Example sketch**

```python
def test_memory_limit_exceeded_flushes_queued_messages(
    entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock
):
    parsed_args = Namespace(
        command="read", config="config_path", state="statepath", catalog="catalogpath"
    )
    records = [
        AirbyteMessage(record=AirbyteRecordMessage(stream="s", data={}, emitted_at=1), type=Type.RECORD),
        AirbyteMessage(record=AirbyteRecordMessage(stream="s", data={}, emitted_at=1), type=Type.RECORD),
    ]
    mocker.patch.object(MockSource, "read_state", return_value={})
    mocker.patch.object(MockSource, "read_catalog", return_value={})
    mocker.patch.object(MockSource, "read", return_value=iter(records))

    # Patch monitor to raise after first check
    mocker.patch.object(
        MemoryMonitor, "check_memory_usage",
        side_effect=[None, MemoryLimitExceeded(
            internal_message="Memory at 96%",
            message="Source exceeded memory limit.",
            failure_type=FailureType.transient_error,
        )],
    )

    yielded = []
    with pytest.raises(MemoryLimitExceeded):
        for msg in entrypoint.run(parsed_args):
            yielded.append(msg)

    # Assert queued messages were flushed (from the try/finally in read())
    # Assert state messages have correct sourceStats.recordCount
```

Note: This test depends on the try/finally fix from the earlier comment (wrapping the read loop so …).
…wn integration test

- Complete test_memory_monitor.py with 24 tests across 4 test classes
- TestMemoryMonitorInit: cgroup v1/v2 detection, lazy-init verification
- TestMemoryMonitorCheckMemory: thresholds, error degradation, intervals, limit_bytes==0
- TestMemoryLimitExceeded: exception type and attribute validation
- TestDefaultCheckInterval: constant value verification
- Add test_memory_limit_exceeded_flushes_queued_messages to test_entrypoint.py verifying that try/finally in read() flushes queued state messages even when MemoryLimitExceeded propagates

Co-Authored-By: bot_apk <apk@cognition.ai>
Patrick Nilan (@pnilan) Re: Lazy-initialize cgroup detection — done in 7f7ab40.

```python
def __init__(self, ...) -> None:
    self._cgroup_version: Optional[int] = None
    self._probed = False
    # ... thresholds and counters only

def _probe_cgroup(self) -> None:
    if self._probed:
        return
    self._probed = True
    if _CGROUP_V2_CURRENT.exists() and _CGROUP_V2_MAX.exists():
        self._cgroup_version = 2
    elif _CGROUP_V1_USAGE.exists() and _CGROUP_V1_LIMIT.exists():
        self._cgroup_version = 1
```

This means …
Patrick Nilan (@pnilan) Re: Missing test coverage — both gaps addressed in 7f7ab40.

1. I/O errors in …
2. …

All 24 unit tests pass in …
Patrick Nilan (@pnilan) Re: Missing integration test: graceful shutdown flushes queued state messages — done in 7f7ab40. Added …

This directly proves the … All 25 tests pass (24 unit + 1 integration), ruff clean.
Patrick Nilan (pnilan)
left a comment
Review Feedback
Change failure type from transient_error to system_error
The MemoryLimitExceeded exception should use FailureType.system_error, not transient_error. The goals of this feature are:
- Observability — we'll know when a source fails due to OOM before it happens, see it in logs, and triage/debug more easily
- Graceful shutdown — flush messages and shut down cleanly instead of silent OOM kills that cause cascading failures to the orchestrator/destination
This is an infrastructure/system issue, not something the end user caused. Using system_error ensures it surfaces to Sentry/oncall for investigation rather than being silently retried.
Update the user-facing message
The current message tells users to "Reduce the number of streams or increase memory allocation." — the user didn't do anything wrong here and can't fix this themselves. Change it to something like:
```python
message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down to avoid an out-of-memory crash.",
failure_type=FailureType.system_error,
```

Update tests accordingly

The tests that assert `FailureType.transient_error` will need to be updated to `FailureType.system_error`:

- `test_critical_at_95_percent_raises` in `test_memory_monitor.py`
- `test_memory_limit_exceeded_flushes_queued_messages` in `test_entrypoint.py`
- `test_default_attributes` in `test_memory_monitor.py`
…facing message Co-Authored-By: bot_apk <apk@cognition.ai>
Done — pushed
Make
…tests Co-Authored-By: bot_apk <apk@cognition.ai>
(aside) **End-to-End Verification: Graceful Shutdown Message Delivery**

Investigated whether the platform orchestrator fully consumes the finally-block messages when …

**Python side (generator semantics)**

When …

**Platform side (orchestrator)**

The orchestrator reads source output from a named pipe (FIFO) via …

**Conclusion**

All finally-block messages — including the last checkpoint state with correct …
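The generator semantics this comment relies on can be sanity-checked with a toy example (illustrative only, not CDK code): a `yield` inside a generator's `finally` block is delivered to the consumer before the pending exception resumes propagating, which is exactly how the finally-block flush can emit a last state message ahead of the failure.

```python
def source_like():
    """Toy generator: fails mid-stream, but flushes a final message in finally."""
    try:
        yield "record-1"
        raise RuntimeError("memory limit")
    finally:
        # This yield suspends the generator; the pending exception resumes
        # propagating only after the consumer advances past this value.
        yield "queued-state"


received = []
try:
    for msg in source_like():
        received.append(msg)
except RuntimeError:
    pass
# received == ["record-1", "queued-state"]
```

The consumer sees both messages, then the `RuntimeError` on the next advance — the flushed value is not lost.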
**Move memory check back to after yield**

The current order (check before yield) silently drops one message on critical shutdown:

```python
for message in self.source.read(self.logger, config, catalog, state):
    self._memory_monitor.check_memory_usage()  # raises here
    yield self.handle_record_counts(message, stream_message_counter)  # never reached
```

The dropped message was already read from the iterator into memory, so skipping the yield doesn't reclaim anything. You're paying the memory cost either way — the only difference is whether the downstream consumer gets the data.

Recommendation: Move the check back to after yield:

```python
for message in self.source.read(self.logger, config, catalog, state):
    yield self.handle_record_counts(message, stream_message_counter)
    self._memory_monitor.check_memory_usage()
```

This way: …

The integration test …
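The one-message difference described above can be made concrete with a toy harness (all names here are hypothetical, not CDK code):

```python
class MemLimit(Exception):
    pass


def failing_check(fail_on: int):
    """Return a check() callable that raises MemLimit on its fail_on-th call."""
    state = {"calls": 0}

    def check() -> None:
        state["calls"] += 1
        if state["calls"] == fail_on:
            raise MemLimit()

    return check


def read_check_before(messages, check):
    for m in messages:
        check()  # raises before the message is yielded -> message dropped
        yield m


def read_check_after(messages, check):
    for m in messages:
        yield m
        check()  # message already delivered if this raises


def drain(gen):
    out = []
    try:
        for m in gen:
            out.append(m)
    except MemLimit:
        pass
    return out


before = drain(read_check_before(["a", "b", "c"], failing_check(2)))
after = drain(read_check_after(["a", "b", "c"], failing_check(2)))
# before == ["a"], after == ["a", "b"]
```

Check-before drops the message that was already pulled into memory; check-after delivers it first and only then raises.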
Good call — moved the check back to after yield in 0485fa2. The message is already in memory either way, so emitting it ensures zero data loss and accurate …

```python
for message in self.source.read(self.logger, config, catalog, state):
    yield self.handle_record_counts(message, stream_message_counter)
    self._memory_monitor.check_memory_usage()
```

Integration test …
I am currently testing this w/ a dev image of source-sample-data in a personal workspace. In this dev image, I also attempt to force an OOM scenario. TBD how this goes.
Minor readability nit: the GB conversion could be computed once:

```python
usage_gb = usage_bytes / (1024 ** 3)
limit_gb = limit_bytes / (1024 ** 3)
```

Then use those in the messages, e.g.:

```python
internal_message=f"Memory usage is {usage_percent}% ({usage_gb:.2f} / {limit_gb:.2f} GB). "
f"Critical threshold is {int(self._critical_threshold * 100)}%.",
```

```python
logger.warning(
    "Source memory usage reached %d%% of container limit (%.2f / %.2f GB).",
    usage_percent,
    usage_gb,
    limit_gb,
)
```
Good suggestion — updated in 4dda57c. Both …
```python
raise AirbyteTracedException(
    internal_message=f"Memory usage is {usage_percent}% ...",
    message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down to avoid an out-of-memory crash.",
    failure_type=FailureType.system_error,
)
```

The …

Recommend removing the `MemoryLimitExceeded` subclass and raising `AirbyteTracedException` directly.
Pull request overview
Adds a source-side memory monitoring mechanism to the Python CDK to detect container memory pressure (via Linux cgroup v1/v2 files) and trigger a graceful shutdown before an OOM kill, integrated into the core AirbyteEntrypoint.read() loop.
Changes:
- Added `MemoryMonitor` + `MemoryLimitExceeded` to read cgroup memory usage/limit and enforce warning/critical thresholds.
- Integrated memory checks into `AirbyteEntrypoint.read()` and wrapped the read loop with `try/finally` to flush queued messages (including state) on early termination.
- Added comprehensive unit tests for `MemoryMonitor` and an integration test ensuring queued messages flush when `MemoryLimitExceeded` occurs mid-read.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `airbyte_cdk/utils/memory_monitor.py` | New memory monitoring utility reading cgroup files and raising `MemoryLimitExceeded` at critical threshold. |
| `airbyte_cdk/entrypoint.py` | Instantiates and invokes `MemoryMonitor` during `read()`; ensures queued messages are flushed in `finally`. |
| `unit_tests/utils/test_memory_monitor.py` | New unit tests covering thresholds, v1/v2 behavior, no-op paths, degradation on read errors, and check interval behavior. |
| `unit_tests/test_entrypoint.py` | Integration test validating queued state messages are flushed and record counts are preserved when memory exception propagates. |
…edException directly Co-Authored-By: bot_apk <apk@cognition.ai>
Addressed both review items:
PR description also updated to reflect current semantics (after-yield check, no custom subclass, GB formatting).
Rodi Reich Zilberman (rodireich)
left a comment
Looks good. I'd just consider simplifying in case V1 is something that's no longer used (I don't know if that's the case)
**Proposed changes: logging-only trial period**

After discussion, we'd like to adjust the memory monitor behavior for an initial trial period before enabling fail-fast. The goal is to collect production data on memory usage patterns without risking premature sync failures.

**Why**

…
**Requested changes**

1. **Remove the exception at 95% — log only (ERROR level) for now.** Replace the …
2. **Change check interval from 1000 to 5000 messages.** Reduces I/O overhead and log volume.
3. **Log on every check above 90% (not just once).** Instead of the current one-shot …, remove the …:

```python
def check_memory_usage(self) -> None:
    self._probe_cgroup()
    if self._cgroup_version is None:
        return
    self._message_count += 1
    if self._message_count % self._check_interval != 0:
        return
    memory_info = self._read_memory()
    if memory_info is None:
        return
    usage_bytes, limit_bytes = memory_info
    usage_ratio = usage_bytes / limit_bytes
    usage_percent = int(usage_ratio * 100)
    usage_gb = usage_bytes / (1024**3)
    limit_gb = limit_bytes / (1024**3)
    if usage_ratio >= 0.90:
        logger.warning(
            "Source memory usage at %d%% of container limit (%.2f / %.2f GB).",
            usage_percent,
            usage_gb,
            limit_gb,
        )
```

4. **Simplify constructor.** With the above changes: …
5. **Update tests.** …

**Future follow-up (not for this PR)**

Once we have production data: …
Per pnilan's review feedback:

- Remove AirbyteTracedException raise at critical threshold — log only
- Change check_interval default from 1000 to 5000 messages
- Log WARNING on every check above 90% (remove one-shot flags)
- Simplify constructor (remove threshold params and state flags)
- Remove try/finally from entrypoint read() (no longer raising exceptions)
- Add TODO comment for cgroup v1 removal
- Update tests: remove exception/one-shot tests, add repeated logging test

Co-Authored-By: bot_apk <apk@cognition.ai>
Summary
Adds a `MemoryMonitor` class to the Python CDK that reads Linux cgroup v2/v1 memory files to detect memory pressure inside containers. Integrated into `AirbyteEntrypoint.read()`, it checks memory every 5000 messages and logs a `WARNING` on every check interval above threshold (e.g., `Source memory usage at 91% of container limit (0.85 / 0.93 GB).`).

This is a logging-only trial — no exceptions are raised and no syncs are terminated. The goal is to collect production data on memory usage patterns (climbing, plateauing, sawtoothing) before deciding on fail-fast thresholds. cgroup memory includes reclaimable kernel page cache, so aggressive thresholds could prematurely kill healthy syncs.
Warnings are logged on every check interval above 90% (not one-shot), providing breadcrumb trails for post-mortem analysis.
When cgroup files are absent (local dev, CI), all calls are instant no-ops. Zero new dependencies.
Related to https://github.com/airbytehq/airbyte-internal-issues/issues/15938
Updates since last revision
Switched from fail-fast (exception at 95%) to logging-only trial per reviewer feedback:
- Removed `AirbyteTracedException` raise — `WARNING` log only at ≥90%
- Changed `check_interval` from 1000 → 5000 to reduce I/O overhead
- Removed one-shot flags (`_warning_emitted`, `_critical_raised`) — logs on every check above threshold
- Removed `warning_threshold` and `critical_threshold` parameters
- Removed `try/finally` from `entrypoint.py` read loop (no exceptions to catch)

Key design decisions
- `check_interval` validated >= 1: prevents `ZeroDivisionError` from `message_count % check_interval`.

Review & Testing Checklist for Human
- **Affects all Python CDK connectors**: Integrated into `AirbyteEntrypoint.read()`. Every Python source connector will now have memory monitoring active in containers with cgroup files. Verify no performance regression — the no-op path (no cgroup files, or between check intervals) must be effectively zero-cost.
- **No real cgroup integration tests**: All tests mock `Path.exists()` and `Path.read_text()`. Verify mocking assumptions match actual cgroup behavior (e.g., `memory.max` = `"max"` for unlimited). Manual testing in a memory-constrained container is recommended.
- **Log volume at scale**: At 5000-message intervals with usage above 90%, a sync processing millions of records could produce many warning logs. Verify this is acceptable for log aggregation infrastructure.
Suggested test plan: Deploy a Python CDK connector in a memory-constrained container and run a sync against a high-volume source. Verify that (a) the warning log appears when usage crosses 90%, (b) warnings repeat at each 5000-message interval while above threshold, (c) syncs are not interrupted by the monitor, and (d) no performance degradation is observable compared to a baseline without the monitor.
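The `check_interval >= 1` validation called out under key design decisions is easy to sanity-check. The sketch below is illustrative only — `checked_interval` is a hypothetical helper, not the PR's code — and confirms that Python's modulo raises `ZeroDivisionError` for a zero divisor, so an unvalidated `check_interval=0` would crash on the first message.

```python
def checked_interval(check_interval: int) -> int:
    # Reject non-positive intervals up front, mirroring the validation
    # described above; `x % 0` raises ZeroDivisionError at check time.
    if check_interval < 1:
        raise ValueError("check_interval must be >= 1")
    return check_interval


try:
    1 % 0
    modulo_by_zero_raises = False
except ZeroDivisionError:
    modulo_by_zero_raises = True
# modulo_by_zero_raises is True
```

Validating once in the constructor keeps the per-message hot path free of defensive checks.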
Notes
- `test_memory_monitor.py` + entrypoint integration tests remain unchanged

Important
Auto-merge enabled.
This PR is set to merge automatically when all requirements are met.