Skip to content

Commit 0494fd4

Browse files
devin-ai-integration[bot], bot_apk, and suisuixia42
authored
fix: fail fast on non-JSON-serializable types in serialization fallback (AI-Triage PR) (#954)
Co-authored-by: bot_apk <apk@cognition.ai>
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: suisuixia42 <suisui.xia@airbyte.io>
1 parent 0b94cbe commit 0494fd4

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

airbyte_cdk/entrypoint.py

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -352,7 +352,14 @@ def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str:
352352
f"There was an error during the serialization of an AirbyteMessage: `{exception}`. This might impact the sync performances."
353353
)
354354
_HAS_LOGGED_FOR_SERIALIZATION_ERROR = True
355-
return json.dumps(serialized_message)
355+
try:
356+
return json.dumps(serialized_message)
357+
except Exception as json_exception:
358+
raise AirbyteTracedException(
359+
internal_message=f"Failed to serialize AirbyteMessage to JSON: `{json_exception}`",
360+
failure_type=FailureType.system_error,
361+
message="A record returned from the API failed to be serialized to JSON.",
362+
) from json_exception
356363

357364
@classmethod
358365
def extract_state(cls, args: List[str]) -> Optional[Any]:

unit_tests/test_entrypoint.py

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -858,6 +858,25 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json(
858858
assert len(record_messages) == 2
859859

860860

861+
def test_given_non_json_serializable_type_then_raise_traced_exception(
862+
entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock
863+
):
864+
"""Test that types which both orjson and json cannot serialize (like complex) raise AirbyteTracedException to prevent data corruption."""
865+
parsed_args = Namespace(
866+
command="read", config="config_path", state="statepath", catalog="catalogpath"
867+
)
868+
record = AirbyteMessage(
869+
record=AirbyteRecordMessage(stream="stream", data={"value": complex(1, 2)}, emitted_at=1),
870+
type=Type.RECORD,
871+
)
872+
mocker.patch.object(MockSource, "read_state", return_value={})
873+
mocker.patch.object(MockSource, "read_catalog", return_value={})
874+
mocker.patch.object(MockSource, "read", return_value=[record])
875+
876+
with pytest.raises(AirbyteTracedException, match="failed to be serialized to JSON"):
877+
list(entrypoint.run(parsed_args))
878+
879+
861880
def test_memory_failfast_flushes_queued_state_before_raising(mocker):
862881
"""Record emitted → check_memory_usage raises → queued STATE flushed with recordCount → exception propagates."""
863882
queued_state = AirbyteMessage(

0 commit comments

Comments
 (0)