Skip to content

Commit f550424

Browse files
devin-ai-integration[bot]bot_apk
andauthored
fix(cdk): Preserve error message in concurrent source exception handling (AI-Triage PR) (#937)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: bot_apk <apk@cognition.ai>
1 parent 171054f commit f550424

File tree

3 files changed

+57
-4
lines changed

3 files changed

+57
-4
lines changed

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMess
173173
yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
174174
else:
175175
yield AirbyteTracedException.from_exception(
176-
exception, stream_descriptor=stream_descriptor
176+
exception.exception,
177+
stream_descriptor=stream_descriptor,
178+
message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
177179
).as_airbyte_message()
178180

179181
def _flag_exception(self, stream_name: str, exception: Exception) -> None:

unit_tests/connector_builder/test_connector_builder_handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1288,13 +1288,13 @@ def test_handle_read_external_requests(deployment_mode, url_base, expected_error
12881288
pytest.param(
12891289
"CLOUD",
12901290
"https://10.0.27.27/tokens/bearer",
1291-
"StreamThreadException",
1291+
"Error while refreshing access token",
12921292
id="test_cloud_read_with_private_endpoint",
12931293
),
12941294
pytest.param(
12951295
"CLOUD",
12961296
"http://unsecured.protocol/tokens/bearer",
1297-
"StreamThreadException",
1297+
"Invalid Protocol Scheme",
12981298
id="test_cloud_read_with_unsecured_endpoint",
12991299
),
13001300
pytest.param(

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
AirbyteStreamStatus,
1717
AirbyteStreamStatusTraceMessage,
1818
AirbyteTraceMessage,
19+
FailureType,
1920
StreamDescriptor,
2021
SyncMode,
2122
TraceType,
@@ -576,7 +577,11 @@ def test_on_exception_return_trace_message_and_on_stream_complete_return_stream_
576577

577578
exception_messages = list(handler.on_exception(exception))
578579
assert len(exception_messages) == 1
579-
assert "StreamThreadException" in exception_messages[0].trace.error.stack_trace
580+
assert "RuntimeError" in exception_messages[0].trace.error.stack_trace
581+
assert (
582+
exception_messages[0].trace.error.message
583+
== f"An unexpected error occurred in stream {_STREAM_NAME}: RuntimeError"
584+
)
580585

581586
assert list(
582587
handler.on_partition_complete_sentinel(
@@ -761,6 +766,52 @@ def test_is_done_is_true_if_all_partitions_are_closed_and_no_streams_are_generat
761766

762767
assert handler.is_done()
763768

769+
@freezegun.freeze_time("2020-01-01T00:00:00")
770+
def test_on_exception_non_ate_uses_templated_message_with_correct_failure_type(self):
771+
"""Regression test: non-ATE exceptions on Path B produce a safe templated message, not the generic fallback."""
772+
stream_instances_to_read_from = [self._stream, self._another_stream]
773+
774+
handler = ConcurrentReadProcessor(
775+
stream_instances_to_read_from,
776+
self._partition_enqueuer,
777+
self._thread_pool_manager,
778+
self._logger,
779+
self._slice_logger,
780+
self._message_repository,
781+
self._partition_reader,
782+
)
783+
784+
handler.start_next_partition_generator()
785+
handler.on_partition(self._an_open_partition)
786+
list(
787+
handler.on_partition_generation_completed(
788+
PartitionGenerationCompletedSentinel(self._stream)
789+
)
790+
)
791+
list(
792+
handler.on_partition_generation_completed(
793+
PartitionGenerationCompletedSentinel(self._another_stream)
794+
)
795+
)
796+
797+
inner_exception = ValueError("some internal detail: SELECT * FROM secrets")
798+
exception = StreamThreadException(inner_exception, _STREAM_NAME)
799+
800+
exception_messages = list(handler.on_exception(exception))
801+
assert len(exception_messages) == 1
802+
803+
trace_error = exception_messages[0].trace.error
804+
# User-facing message uses safe template (no raw exception text)
805+
assert (
806+
trace_error.message
807+
== f"An unexpected error occurred in stream {_STREAM_NAME}: ValueError"
808+
)
809+
# Stack trace comes from the inner exception, not the StreamThreadException wrapper
810+
assert "ValueError" in trace_error.stack_trace
811+
assert "StreamThreadException" not in trace_error.stack_trace
812+
# failure_type defaults to system_error for unhandled exceptions
813+
assert trace_error.failure_type == FailureType.system_error
814+
764815
@freezegun.freeze_time("2020-01-01T00:00:00")
765816
def test_start_next_partition_generator(self):
766817
stream_instances_to_read_from = [self._stream]

0 commit comments

Comments
 (0)