Skip to content

Commit 6aa5377

Browse files
chore: merge main into scopes-array-schema to resolve conflicts
2 parents 1e1f8f0 + f550424 commit 6aa5377

5 files changed

Lines changed: 69 additions & 6 deletions

File tree

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMess
173173
yield exception.exception.as_airbyte_message(stream_descriptor=stream_descriptor)
174174
else:
175175
yield AirbyteTracedException.from_exception(
176-
exception, stream_descriptor=stream_descriptor
176+
exception.exception,
177+
stream_descriptor=stream_descriptor,
178+
message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}",
177179
).as_airbyte_message()
178180

179181
def _flag_exception(self, stream_name: str, exception: Exception) -> None:

airbyte_cdk/sources/declarative/interpolation/jinja.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ def eval(
124124

125125
def _literal_eval(self, result: Optional[str], valid_types: Optional[Tuple[Type[Any]]]) -> Any:
126126
try:
127-
evaluated = ast.literal_eval(result) # type: ignore # literal_eval is able to handle None
128-
except (ValueError, SyntaxError):
127+
evaluated = ast.literal_eval(result) # type: ignore # result may be None; on error we return it unchanged
128+
except (ValueError, SyntaxError, TypeError):
129129
return result
130130
if (not valid_types and not isinstance(evaluated, complex)) or (
131131
valid_types and isinstance(evaluated, valid_types)

unit_tests/connector_builder/test_connector_builder_handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1288,13 +1288,13 @@ def test_handle_read_external_requests(deployment_mode, url_base, expected_error
12881288
pytest.param(
12891289
"CLOUD",
12901290
"https://10.0.27.27/tokens/bearer",
1291-
"StreamThreadException",
1291+
"Error while refreshing access token",
12921292
id="test_cloud_read_with_private_endpoint",
12931293
),
12941294
pytest.param(
12951295
"CLOUD",
12961296
"http://unsecured.protocol/tokens/bearer",
1297-
"StreamThreadException",
1297+
"Invalid Protocol Scheme",
12981298
id="test_cloud_read_with_unsecured_endpoint",
12991299
),
13001300
pytest.param(

unit_tests/sources/declarative/interpolation/test_jinja.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,16 @@ def test_interpolation_private_partition_attribute():
345345
assert actual_output == expected_output
346346

347347

348+
def test_literal_eval_handles_unhashable_set_typeerror():
349+
"""Test that _literal_eval gracefully handles TypeError from ast.literal_eval for nested sets."""
350+
# ast.literal_eval("{{'web'}, {'discover'}}") raises TypeError: unhashable type: 'set'
351+
# The interpolation should return the string as-is instead of propagating the error.
352+
config = {"query": "{{'web'}, {'discover'}}"}
353+
s = "{{ config['query'] }}"
354+
val = interpolation.eval(s, config)
355+
assert val == "{{'web'}, {'discover'}}"
356+
357+
348358
def test_given_complex_when_eval_then_return_string():
349359
s = "9173710294242221J"
350360
config = {}

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
AirbyteStreamStatus,
1717
AirbyteStreamStatusTraceMessage,
1818
AirbyteTraceMessage,
19+
FailureType,
1920
StreamDescriptor,
2021
SyncMode,
2122
TraceType,
@@ -576,7 +577,11 @@ def test_on_exception_return_trace_message_and_on_stream_complete_return_stream_
576577

577578
exception_messages = list(handler.on_exception(exception))
578579
assert len(exception_messages) == 1
579-
assert "StreamThreadException" in exception_messages[0].trace.error.stack_trace
580+
assert "RuntimeError" in exception_messages[0].trace.error.stack_trace
581+
assert (
582+
exception_messages[0].trace.error.message
583+
== f"An unexpected error occurred in stream {_STREAM_NAME}: RuntimeError"
584+
)
580585

581586
assert list(
582587
handler.on_partition_complete_sentinel(
@@ -761,6 +766,52 @@ def test_is_done_is_true_if_all_partitions_are_closed_and_no_streams_are_generat
761766

762767
assert handler.is_done()
763768

769+
@freezegun.freeze_time("2020-01-01T00:00:00")
770+
def test_on_exception_non_ate_uses_templated_message_with_correct_failure_type(self):
771+
"""Regression test: non-ATE exceptions on Path B produce a safe templated message, not the generic fallback."""
772+
stream_instances_to_read_from = [self._stream, self._another_stream]
773+
774+
handler = ConcurrentReadProcessor(
775+
stream_instances_to_read_from,
776+
self._partition_enqueuer,
777+
self._thread_pool_manager,
778+
self._logger,
779+
self._slice_logger,
780+
self._message_repository,
781+
self._partition_reader,
782+
)
783+
784+
handler.start_next_partition_generator()
785+
handler.on_partition(self._an_open_partition)
786+
list(
787+
handler.on_partition_generation_completed(
788+
PartitionGenerationCompletedSentinel(self._stream)
789+
)
790+
)
791+
list(
792+
handler.on_partition_generation_completed(
793+
PartitionGenerationCompletedSentinel(self._another_stream)
794+
)
795+
)
796+
797+
inner_exception = ValueError("some internal detail: SELECT * FROM secrets")
798+
exception = StreamThreadException(inner_exception, _STREAM_NAME)
799+
800+
exception_messages = list(handler.on_exception(exception))
801+
assert len(exception_messages) == 1
802+
803+
trace_error = exception_messages[0].trace.error
804+
# User-facing message uses safe template (no raw exception text)
805+
assert (
806+
trace_error.message
807+
== f"An unexpected error occurred in stream {_STREAM_NAME}: ValueError"
808+
)
809+
# Stack trace comes from the inner exception, not the StreamThreadException wrapper
810+
assert "ValueError" in trace_error.stack_trace
811+
assert "StreamThreadException" not in trace_error.stack_trace
812+
# failure_type defaults to system_error for unhandled exceptions
813+
assert trace_error.failure_type == FailureType.system_error
814+
764815
@freezegun.freeze_time("2020-01-01T00:00:00")
765816
def test_start_next_partition_generator(self):
766817
stream_instances_to_read_from = [self._stream]

0 commit comments

Comments
 (0)