Skip to content

Commit c96825f

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
refactor(cdk): remove MemoryLimitExceeded subclass, raise AirbyteTracedException directly
Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 4dda57c commit c96825f

3 files changed

Lines changed: 17 additions & 52 deletions

File tree

airbyte_cdk/utils/memory_monitor.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,6 @@
2929
_DEFAULT_CHECK_INTERVAL = 1000
3030

3131

32-
class MemoryLimitExceeded(AirbyteTracedException):
33-
"""Raised when connector memory usage exceeds critical threshold."""
34-
35-
pass
36-
37-
3832
class MemoryMonitor:
3933
"""Monitors container memory usage via cgroup files and emits warnings before OOM kills.
4034
@@ -119,8 +113,8 @@ def check_memory_usage(self) -> None:
119113
messages (default 1000) to minimise I/O overhead.
120114
121115
At the warning threshold (default 85%), logs a warning message.
122-
At the critical threshold (default 95%), raises MemoryLimitExceeded to
123-
trigger a graceful shutdown with an actionable error message.
116+
At the critical threshold (default 95%), raises ``AirbyteTracedException``
117+
to trigger a graceful shutdown with an actionable error message.
124118
125119
Each threshold triggers at most once per sync to avoid log spam.
126120
This method is a no-op if cgroup files are unavailable.
@@ -145,7 +139,7 @@ def check_memory_usage(self) -> None:
145139

146140
if usage_ratio >= self._critical_threshold and not self._critical_raised:
147141
self._critical_raised = True
148-
raise MemoryLimitExceeded(
142+
raise AirbyteTracedException(
149143
internal_message=f"Memory usage is {usage_percent}% ({usage_gb:.2f} / {limit_gb:.2f} GB). "
150144
f"Critical threshold is {int(self._critical_threshold * 100)}%.",
151145
message=f"Source exceeded memory limit ({usage_percent}% used) and must shut down to avoid an out-of-memory crash.",

unit_tests/test_entrypoint.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -836,10 +836,10 @@ def test_handle_record_counts(
836836

837837

838838
def test_memory_limit_exceeded_flushes_queued_messages(mocker, spec_mock, config_mock):
839-
"""When MemoryLimitExceeded is raised mid-read, queued messages should still be flushed.
839+
"""When AirbyteTracedException is raised mid-read, queued messages should still be flushed.
840840
841841
The read() try/finally ensures _emit_queued_messages runs even when
842-
MemoryLimitExceeded propagates. The exception still surfaces to the
842+
AirbyteTracedException propagates. The exception still surfaces to the
843843
caller, but all messages yielded before (records) and during (finally-
844844
block state messages) the exception are available to the consumer.
845845
"""
@@ -879,15 +879,15 @@ def test_memory_limit_exceeded_flushes_queued_messages(mocker, spec_mock, config
879879
mocker.patch.object(MockSource, "read_catalog", return_value={})
880880
mocker.patch.object(MockSource, "read", return_value=[record, record])
881881

882-
from airbyte_cdk.utils.memory_monitor import MemoryLimitExceeded
882+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
883883

884884
call_count = 0
885885

886886
def _raise_on_second_call() -> None:
887887
nonlocal call_count
888888
call_count += 1
889889
if call_count >= 2:
890-
raise MemoryLimitExceeded(
890+
raise AirbyteTracedException(
891891
internal_message="Memory at 96%",
892892
message="Source exceeded memory limit (96% used) and must shut down to avoid an out-of-memory crash.",
893893
failure_type=FailureType.system_error,
@@ -901,22 +901,22 @@ def _raise_on_second_call() -> None:
901901
command="read", config="config_path", state="statepath", catalog="catalogpath"
902902
)
903903

904-
# The generator yields messages until MemoryLimitExceeded propagates.
904+
# The generator yields messages until AirbyteTracedException propagates.
905905
# Collect everything yielded before the exception surfaces.
906906
messages: list[str] = []
907-
with pytest.raises(MemoryLimitExceeded):
907+
with pytest.raises(AirbyteTracedException):
908908
for msg in entrypoint.run(parsed_args):
909909
messages.append(msg)
910910

911911
# 1. Both records were yielded before the exception — the memory check
912912
# runs after yield so every message pulled from the source is emitted.
913913
record_messages = [m for m in messages if "RECORD" in m]
914-
assert len(record_messages) == 2, "Both records should be yielded before MemoryLimitExceeded"
914+
assert len(record_messages) == 2, "Both records should be yielded before AirbyteTracedException"
915915

916916
# 2. The queued state message was flushed by the finally block
917917
state_messages = [m for m in messages if "STATE" in m]
918918
assert len(state_messages) >= 1, (
919-
"Queued state message should be flushed even after MemoryLimitExceeded"
919+
"Queued state message should be flushed even after AirbyteTracedException"
920920
)
921921

922922
# 3. The flushed state has sourceStats.recordCount set by handle_record_counts.

unit_tests/utils/test_memory_monitor.py

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
_CGROUP_V1_USAGE,
1515
_CGROUP_V2_CURRENT,
1616
_CGROUP_V2_MAX,
17-
MemoryLimitExceeded,
1817
MemoryMonitor,
1918
)
19+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
2020

2121
_MOCK_USAGE_BELOW = "500000000\n" # 50% of 1 GB
2222
_MOCK_USAGE_WARNING = "870000000\n" # 87% of 1 GB
@@ -160,27 +160,27 @@ def test_custom_thresholds_warning(caplog: pytest.LogCaptureFixture) -> None:
160160

161161

162162
def test_critical_at_95_percent_raises() -> None:
163-
"""MemoryLimitExceeded should be raised at 96% usage."""
163+
"""AirbyteTracedException should be raised at 96% usage."""
164164
monitor = MemoryMonitor(check_interval=1)
165165
with (
166166
patch.object(Path, "exists", _v2_exists),
167167
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_CRITICAL)),
168168
):
169-
with pytest.raises(MemoryLimitExceeded) as exc_info:
169+
with pytest.raises(AirbyteTracedException) as exc_info:
170170
monitor.check_memory_usage()
171171

172172
assert exc_info.value.failure_type == FailureType.system_error
173173
assert "96%" in (exc_info.value.message or "")
174174

175175

176176
def test_critical_raised_only_once() -> None:
177-
"""MemoryLimitExceeded should only be raised once."""
177+
"""AirbyteTracedException should only be raised once."""
178178
monitor = MemoryMonitor(check_interval=1)
179179
with (
180180
patch.object(Path, "exists", _v2_exists),
181181
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_CRITICAL)),
182182
):
183-
with pytest.raises(MemoryLimitExceeded):
183+
with pytest.raises(AirbyteTracedException):
184184
monitor.check_memory_usage()
185185
# Second call should NOT raise again
186186
monitor.check_memory_usage()
@@ -197,7 +197,7 @@ def test_custom_thresholds_critical() -> None:
197197
patch.object(Path, "exists", _v2_exists),
198198
patch.object(Path, "read_text", _v2_mock_read(usage="850000000\n")),
199199
):
200-
with pytest.raises(MemoryLimitExceeded):
200+
with pytest.raises(AirbyteTracedException):
201201
monitor.check_memory_usage()
202202

203203

@@ -293,32 +293,3 @@ def mock_read_text(self: Path) -> str:
293293
):
294294
monitor.check_memory_usage()
295295
assert not caplog.records
296-
297-
298-
# ---------------------------------------------------------------------------
299-
# MemoryLimitExceeded exception
300-
# ---------------------------------------------------------------------------
301-
302-
303-
def test_memory_limit_exceeded_is_airbyte_traced_exception() -> None:
304-
"""MemoryLimitExceeded should be a subclass of AirbyteTracedException."""
305-
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
306-
307-
exc = MemoryLimitExceeded(
308-
internal_message="test",
309-
message="test message",
310-
failure_type=FailureType.system_error,
311-
)
312-
assert isinstance(exc, AirbyteTracedException)
313-
314-
315-
def test_memory_limit_exceeded_attributes() -> None:
316-
"""MemoryLimitExceeded should have correct attributes."""
317-
exc = MemoryLimitExceeded(
318-
internal_message="Memory at 96%",
319-
message="Source exceeded memory limit.",
320-
failure_type=FailureType.system_error,
321-
)
322-
assert exc.failure_type == FailureType.system_error
323-
assert exc.message == "Source exceeded memory limit."
324-
assert exc.internal_message == "Memory at 96%"

0 commit comments

Comments
 (0)