Skip to content

Commit 5f7e16d

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
refactor(cdk): move memory check before yield, test observable behavior via caplog
Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent fc71aba commit 5f7e16d

3 files changed

Lines changed: 287 additions & 351 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,8 @@ def read(
281281
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
282282
try:
283283
for message in self.source.read(self.logger, config, catalog, state):
284-
yield self.handle_record_counts(message, stream_message_counter)
285284
self._memory_monitor.check_memory_usage()
285+
yield self.handle_record_counts(message, stream_message_counter)
286286
finally:
287287
for message in self._emit_queued_messages(self.source):
288288
yield self.handle_record_counts(message, stream_message_counter)

unit_tests/test_entrypoint.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -908,7 +908,8 @@ def _raise_on_second_call() -> None:
908908
for msg in entrypoint.run(parsed_args):
909909
messages.append(msg)
910910

911-
# 1. The first record was yielded before the exception
911+
# 1. Only the first record was yielded; the second triggered the exception
912+
# before its yield so it is handled in the finally block instead.
912913
record_messages = [m for m in messages if "RECORD" in m]
913914
assert len(record_messages) >= 1, (
914915
"At least the first record should be yielded before MemoryLimitExceeded"
@@ -921,10 +922,11 @@ def _raise_on_second_call() -> None:
921922
)
922923

923924
# 3. The flushed state has sourceStats.recordCount set by handle_record_counts.
924-
# Both records are yielded (and counted) before the second check_memory_usage
925-
# raises, so the counter is 2.0 at flush time.
925+
# Only the first record is yielded (and counted) because the memory check
926+
# now runs *before* yield — the second check raises before the second
927+
# record is handled, so the counter is 1.0 at flush time.
926928
state_json = orjson.loads(state_messages[0])
927-
assert state_json["state"]["sourceStats"]["recordCount"] == 2.0
929+
assert state_json["state"]["sourceStats"]["recordCount"] == 1.0
928930

929931

930932
def test_given_serialization_error_using_orjson_then_fallback_on_json(

0 commit comments

Comments
 (0)