Skip to content

Commit 0485fa2

Browse files
devin-ai-integration[bot] and bot_apk committed
refactor(cdk): move memory check back to after yield for zero data loss
Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 5f7e16d commit 0485fa2

2 files changed

Lines changed: 8 additions & 9 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -281,8 +281,8 @@ def read(
281281
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
282282
try:
283283
for message in self.source.read(self.logger, config, catalog, state):
284-
self._memory_monitor.check_memory_usage()
285284
yield self.handle_record_counts(message, stream_message_counter)
285+
self._memory_monitor.check_memory_usage()
286286
finally:
287287
for message in self._emit_queued_messages(self.source):
288288
yield self.handle_record_counts(message, stream_message_counter)

unit_tests/test_entrypoint.py

Lines changed: 7 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -908,11 +908,11 @@ def _raise_on_second_call() -> None:
908908
for msg in entrypoint.run(parsed_args):
909909
messages.append(msg)
910910

911-
# 1. Only the first record was yielded; the second triggered the exception
912-
# before its yield so it is handled in the finally block instead.
911+
# 1. Both records were yielded before the exception — the memory check
912+
# runs after yield so every message pulled from the source is emitted.
913913
record_messages = [m for m in messages if "RECORD" in m]
914-
assert len(record_messages) >= 1, (
915-
"At least the first record should be yielded before MemoryLimitExceeded"
914+
assert len(record_messages) == 2, (
915+
"Both records should be yielded before MemoryLimitExceeded"
916916
)
917917

918918
# 2. The queued state message was flushed by the finally block
@@ -922,11 +922,10 @@ def _raise_on_second_call() -> None:
922922
)
923923

924924
# 3. The flushed state has sourceStats.recordCount set by handle_record_counts.
925-
# Only the first record is yielded (and counted) because the memory check
926-
# now runs *before* yield — the second check raises before the second
927-
# record is handled, so the counter is 1.0 at flush time.
925+
# Both records are yielded (and counted) before the second check_memory_usage
926+
# raises, so the counter is 2.0 at flush time.
928927
state_json = orjson.loads(state_messages[0])
929-
assert state_json["state"]["sourceStats"]["recordCount"] == 1.0
928+
assert state_json["state"]["sourceStats"]["recordCount"] == 2.0
930929

931930

932931
def test_given_serialization_error_using_orjson_then_fallback_on_json(

0 commit comments

Comments
 (0)