diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 82136b65f..57820f005 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -352,7 +352,14 @@ def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str: f"There was an error during the serialization of an AirbyteMessage: `{exception}`. This might impact the sync performances." ) _HAS_LOGGED_FOR_SERIALIZATION_ERROR = True - return json.dumps(serialized_message) + try: + return json.dumps(serialized_message) + except Exception as json_exception: + raise AirbyteTracedException( + internal_message=f"Failed to serialize AirbyteMessage to JSON: `{json_exception}`", + failure_type=FailureType.system_error, + message="A record returned from the API failed to be serialized to JSON.", + ) from json_exception @classmethod def extract_state(cls, args: List[str]) -> Optional[Any]: diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 35cd608a3..fcfb44915 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -858,6 +858,25 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json( assert len(record_messages) == 2 +def test_given_non_json_serializable_type_then_raise_traced_exception( + entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock +): + """Test that types which both orjson and json cannot serialize (like complex) raise AirbyteTracedException to prevent data corruption.""" + parsed_args = Namespace( + command="read", config="config_path", state="statepath", catalog="catalogpath" + ) + record = AirbyteMessage( + record=AirbyteRecordMessage(stream="stream", data={"value": complex(1, 2)}, emitted_at=1), + type=Type.RECORD, + ) + mocker.patch.object(MockSource, "read_state", return_value={}) + mocker.patch.object(MockSource, "read_catalog", return_value={}) + mocker.patch.object(MockSource, "read", return_value=[record]) + + with pytest.raises(AirbyteTracedException, match="failed to be serialized to JSON"): + list(entrypoint.run(parsed_args)) + + def test_memory_failfast_flushes_queued_state_before_raising(mocker): """Record emitted → check_memory_usage raises → queued STATE flushed with recordCount → exception propagates.""" queued_state = AirbyteMessage(