Skip to content

Commit bf9db5f

Browse files
devin-ai-integration[bot] and bot_apk committed
refactor(cdk): switch memory monitor to logging-only trial mode
Per pnilan's review feedback:
- Remove AirbyteTracedException raise at critical threshold — log only
- Change check_interval default from 1000 to 5000 messages
- Log WARNING on every check above 90% (remove one-shot flags)
- Simplify constructor (remove threshold params and state flags)
- Remove try/finally from entrypoint read() (no longer raising exceptions)
- Add TODO comment for cgroup v1 removal
- Update tests: remove exception/one-shot tests, add repeated logging test

Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent cdc518c commit bf9db5f

4 files changed

Lines changed: 44 additions & 220 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -279,13 +279,11 @@ def read(
279279

280280
# The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows
281281
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
282-
try:
283-
for message in self.source.read(self.logger, config, catalog, state):
284-
yield self.handle_record_counts(message, stream_message_counter)
285-
self._memory_monitor.check_memory_usage()
286-
finally:
287-
for message in self._emit_queued_messages(self.source):
288-
yield self.handle_record_counts(message, stream_message_counter)
282+
for message in self.source.read(self.logger, config, catalog, state):
283+
yield self.handle_record_counts(message, stream_message_counter)
284+
self._memory_monitor.check_memory_usage()
285+
for message in self._emit_queued_messages(self.source):
286+
yield self.handle_record_counts(message, stream_message_counter)
289287

290288
@staticmethod
291289
def handle_record_counts(

airbyte_cdk/utils/memory_monitor.py

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,55 +2,49 @@
22
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
33
#
44

5-
"""Source-side memory introspection to emit controlled error messages before OOM kills."""
5+
"""Source-side memory introspection to log memory usage approaching container limits."""
66

77
import logging
88
from pathlib import Path
99
from typing import Optional
1010

11-
from airbyte_cdk.models import FailureType
12-
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
13-
1411
logger = logging.getLogger("airbyte")
1512

1613
# cgroup v2 paths
1714
_CGROUP_V2_CURRENT = Path("/sys/fs/cgroup/memory.current")
1815
_CGROUP_V2_MAX = Path("/sys/fs/cgroup/memory.max")
1916

20-
# cgroup v1 paths
17+
# cgroup v1 paths — TODO: remove if all deployments are confirmed cgroup v2
2118
_CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
2219
_CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")
2320

24-
# Default thresholds
25-
_DEFAULT_WARNING_THRESHOLD = 0.85
26-
_DEFAULT_CRITICAL_THRESHOLD = 0.95
21+
# Log when usage is at or above 90%
22+
_MEMORY_THRESHOLD = 0.90
2723

2824
# Check interval (every N messages)
29-
_DEFAULT_CHECK_INTERVAL = 1000
25+
_DEFAULT_CHECK_INTERVAL = 5000
3026

3127

3228
class MemoryMonitor:
33-
"""Monitors container memory usage via cgroup files and emits warnings before OOM kills.
29+
"""Monitors container memory usage via cgroup files and logs warnings when usage is high.
3430
3531
Lazily probes cgroup v2 then v1 files on the first call to
3632
``check_memory_usage()``. Caches which version exists.
3733
If neither is found (local dev / CI), all subsequent calls are instant no-ops.
34+
35+
Logs a WARNING on every check interval (default 5000 messages) when memory
36+
usage is at or above 90% of the container limit. This gives breadcrumb
37+
trails showing whether memory is climbing, plateauing, or sawtoothing.
3838
"""
3939

4040
def __init__(
4141
self,
42-
warning_threshold: float = _DEFAULT_WARNING_THRESHOLD,
43-
critical_threshold: float = _DEFAULT_CRITICAL_THRESHOLD,
4442
check_interval: int = _DEFAULT_CHECK_INTERVAL,
4543
) -> None:
4644
if check_interval < 1:
4745
raise ValueError(f"check_interval must be >= 1, got {check_interval}")
48-
self._warning_threshold = warning_threshold
49-
self._critical_threshold = critical_threshold
5046
self._check_interval = check_interval
5147
self._message_count = 0
52-
self._warning_emitted = False
53-
self._critical_raised = False
5448
self._cgroup_version: Optional[int] = None
5549
self._probed = False
5650

@@ -108,17 +102,15 @@ def _read_memory(self) -> Optional[tuple[int, int]]:
108102
return None
109103

110104
def check_memory_usage(self) -> None:
111-
"""Check memory usage against thresholds.
105+
"""Check memory usage and log when above 90%.
112106
113107
Intended to be called on every message. The monitor internally tracks
114108
a message counter and only reads cgroup files every ``check_interval``
115-
messages (default 1000) to minimise I/O overhead.
109+
messages (default 5000) to minimise I/O overhead.
116110
117-
At the warning threshold (default 85%), logs a warning message.
118-
At the critical threshold (default 95%), raises ``AirbyteTracedException``
119-
to trigger a graceful shutdown with an actionable error message.
111+
Logs a WARNING on every check above 90% to provide breadcrumb trails
112+
showing memory trends over the sync lifetime.
120113
121-
Each threshold triggers at most once per sync to avoid log spam.
122114
This method is a no-op if cgroup files are unavailable.
123115
"""
124116
self._probe_cgroup()
@@ -139,19 +131,9 @@ def check_memory_usage(self) -> None:
139131
usage_gb = usage_bytes / (1024**3)
140132
limit_gb = limit_bytes / (1024**3)
141133

142-
if usage_ratio >= self._critical_threshold and not self._critical_raised:
143-
self._critical_raised = True
144-
raise AirbyteTracedException(
145-
internal_message=f"Memory usage is {usage_percent}% ({usage_gb:.2f} / {limit_gb:.2f} GB). "
146-
f"Critical threshold is {int(self._critical_threshold * 100)}%.",
147-
message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down to avoid an out-of-memory crash.",
148-
failure_type=FailureType.system_error,
149-
)
150-
151-
if usage_ratio >= self._warning_threshold and not self._warning_emitted:
152-
self._warning_emitted = True
134+
if usage_ratio >= _MEMORY_THRESHOLD:
153135
logger.warning(
154-
"Source memory usage reached %d%% of container limit (%.2f / %.2f GB).",
136+
"Source memory usage at %d%% of container limit (%.2f / %.2f GB).",
155137
usage_percent,
156138
usage_gb,
157139
limit_gb,

unit_tests/test_entrypoint.py

Lines changed: 0 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -835,97 +835,6 @@ def test_handle_record_counts(
835835
)
836836

837837

838-
def test_memory_limit_exceeded_flushes_queued_messages(mocker, spec_mock, config_mock):
839-
"""When AirbyteTracedException is raised mid-read, queued messages should still be flushed.
840-
841-
The read() try/finally ensures _emit_queued_messages runs even when
842-
AirbyteTracedException propagates. The exception still surfaces to the
843-
caller, but all messages yielded before (records) and during (finally-
844-
block state messages) the exception are available to the consumer.
845-
"""
846-
queued_state = AirbyteMessage(
847-
type=Type.STATE,
848-
state=AirbyteStateMessage(
849-
type=AirbyteStateType.STREAM,
850-
stream=AirbyteStreamState(
851-
stream_descriptor=StreamDescriptor(name="stream"),
852-
stream_state=AirbyteStateBlob(updated_at="2026-01-01"),
853-
),
854-
),
855-
)
856-
message_repository = MagicMock()
857-
# consume_queue calls:
858-
# 1. run() preamble → initial queued control message
859-
# 2. read() finally block → queued state (the key assertion)
860-
# 3. run() outer finally → nothing
861-
message_repository.consume_queue.side_effect = [
862-
[MESSAGE_FROM_REPOSITORY],
863-
[queued_state],
864-
[],
865-
]
866-
mocker.patch.object(
867-
MockSource,
868-
"message_repository",
869-
new_callable=mocker.PropertyMock,
870-
return_value=message_repository,
871-
)
872-
entrypoint = AirbyteEntrypoint(MockSource())
873-
874-
record = AirbyteMessage(
875-
type=Type.RECORD,
876-
record=AirbyteRecordMessage(stream="stream", data={"id": "1"}, emitted_at=1),
877-
)
878-
mocker.patch.object(MockSource, "read_state", return_value={})
879-
mocker.patch.object(MockSource, "read_catalog", return_value={})
880-
mocker.patch.object(MockSource, "read", return_value=[record, record])
881-
882-
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
883-
884-
call_count = 0
885-
886-
def _raise_on_second_call() -> None:
887-
nonlocal call_count
888-
call_count += 1
889-
if call_count >= 2:
890-
raise AirbyteTracedException(
891-
internal_message="Memory at 96%",
892-
message="Source exceeded memory limit (96% used) and must shut down to avoid an out-of-memory crash.",
893-
failure_type=FailureType.system_error,
894-
)
895-
896-
mocker.patch.object(
897-
entrypoint._memory_monitor, "check_memory_usage", side_effect=_raise_on_second_call
898-
)
899-
900-
parsed_args = Namespace(
901-
command="read", config="config_path", state="statepath", catalog="catalogpath"
902-
)
903-
904-
# The generator yields messages until AirbyteTracedException propagates.
905-
# Collect everything yielded before the exception surfaces.
906-
messages: list[str] = []
907-
with pytest.raises(AirbyteTracedException):
908-
for msg in entrypoint.run(parsed_args):
909-
messages.append(msg)
910-
911-
# 1. Both records were yielded before the exception — the memory check
912-
# runs after yield so every message pulled from the source is emitted.
913-
record_messages = [m for m in messages if "RECORD" in m]
914-
assert len(record_messages) == 2, "Both records should be yielded before AirbyteTracedException"
915-
916-
# 2. The queued state message was flushed by the finally block
917-
state_messages = [m for m in messages if "STATE" in m]
918-
assert len(state_messages) >= 1, (
919-
"Queued state message should be flushed even after AirbyteTracedException"
920-
)
921-
922-
# 3. The flushed state has sourceStats.recordCount set by handle_record_counts.
923-
# Both records are yielded (and counted) before the second check_memory_usage
924-
# raises, so the counter is 2.0 at flush time.
925-
state_json = orjson.loads(state_messages[0])
926-
assert state_json["state"]["sourceStats"]["recordCount"] == 2.0
927-
928-
929838
def test_given_serialization_error_using_orjson_then_fallback_on_json(
930839
entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock
931840
):

0 commit comments

Comments
 (0)