Skip to content

Commit b3ee3c2

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
fix(cdk): preserve error message in concurrent source exception handling
In ConcurrentReadProcessor.on_exception(), when the inner exception is not an AirbyteTracedException, the code was passing the StreamThreadException wrapper to from_exception() instead of the inner exception. Since from_exception() does not set a user-facing message for non-ATE inputs, this resulted in message=None, triggering the generic fallback: 'Something went wrong in the connector. See the logs for more details.' Fix: pass the inner exception and explicitly provide the message parameter so the actual error text surfaces to users instead of the generic fallback. Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 6136336 commit b3ee3c2

2 files changed

Lines changed: 5 additions & 2 deletions

File tree

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMess
173173
yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
174174
else:
175175
yield AirbyteTracedException.from_exception(
176-
exception, stream_descriptor=stream_descriptor
176+
exception.exception,
177+
stream_descriptor=stream_descriptor,
178+
message=str(exception.exception),
177179
).as_airbyte_message()
178180

179181
def _flag_exception(self, stream_name: str, exception: Exception) -> None:

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,8 @@ def test_on_exception_return_trace_message_and_on_stream_complete_return_stream_
576576

577577
exception_messages = list(handler.on_exception(exception))
578578
assert len(exception_messages) == 1
579-
assert "StreamThreadException" in exception_messages[0].trace.error.stack_trace
579+
assert "RuntimeError" in exception_messages[0].trace.error.stack_trace
580+
assert exception_messages[0].trace.error.message == "Something went wrong"
580581

581582
assert list(
582583
handler.on_partition_complete_sentinel(

0 commit comments

Comments
 (0)