Skip to content

Commit aefbc19

Browse files
bot_apk and devin-ai-integration[bot] committed
fix: handle non-JSON-serializable types in serialization fallback
The json.dumps() fallback in airbyte_message_to_string() could itself fail for types that neither orjson nor the stdlib json module can serialize (e.g. complex numbers), raising an unhandled exception that leads to deadlocks in the concurrent source pipeline.

Add a second fallback using json.dumps(..., default=str) to ensure serialization never raises an unhandled exception.

Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 0e57414 commit aefbc19

2 files changed

Lines changed: 28 additions & 1 deletion

File tree

airbyte_cdk/entrypoint.py

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -342,7 +342,10 @@ def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str:
342342
f"There was an error during the serialization of an AirbyteMessage: `{exception}`. This might impact the sync performances."
343343
)
344344
_HAS_LOGGED_FOR_SERIALIZATION_ERROR = True
345-
return json.dumps(serialized_message)
345+
try:
346+
return json.dumps(serialized_message)
347+
except Exception:
348+
return json.dumps(serialized_message, default=str)
346349

347350
@classmethod
348351
def extract_state(cls, args: List[str]) -> Optional[Any]:

unit_tests/test_entrypoint.py

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -856,3 +856,27 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json(
856856
# There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here
857857
record_messages = list(filter(lambda message: "RECORD" in message, messages))
858858
assert len(record_messages) == 2
859+
860+
861+
def test_given_non_json_serializable_type_then_fallback_with_default_str(
862+
entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock
863+
):
864+
"""Test that types which both orjson and json cannot serialize (like complex) are handled via default=str fallback."""
865+
parsed_args = Namespace(
866+
command="read", config="config_path", state="statepath", catalog="catalogpath"
867+
)
868+
record = AirbyteMessage(
869+
record=AirbyteRecordMessage(
870+
stream="stream", data={"value": complex(1, 2)}, emitted_at=1
871+
),
872+
type=Type.RECORD,
873+
)
874+
mocker.patch.object(MockSource, "read_state", return_value={})
875+
mocker.patch.object(MockSource, "read_catalog", return_value={})
876+
mocker.patch.object(MockSource, "read", return_value=[record])
877+
878+
messages = list(entrypoint.run(parsed_args))
879+
880+
record_messages = list(filter(lambda message: "RECORD" in message, messages))
881+
assert len(record_messages) == 1
882+
assert "(1+2j)" in record_messages[0]

0 commit comments

Comments
 (0)