Skip to content

Commit ea995b9

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
fix: improve serialization error messages and add default=str fallback
- Replace misleading warning message with clear, guideline-compliant text - Add json.dumps(default=str) fallback to prevent deadlocks when both orjson and json.dumps fail on non-serializable types (e.g. complex) - Catch specific TypeError instead of broad Exception for json fallback - Log technical details at DEBUG level, user-facing messages at WARNING - Add separate flag to log second fallback warning only once Resolves airbytehq/airbyte-internal-issues#16049 Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent acafc75 commit ea995b9

2 files changed

Lines changed: 48 additions & 3 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
VALID_URL_SCHEMES = ["https"]
5050
CLOUD_DEPLOYMENT_MODE = "cloud"
5151
_HAS_LOGGED_FOR_SERIALIZATION_ERROR = False
52+
_HAS_LOGGED_FOR_SERIALIZATION_FALLBACK = False
5253

5354

5455
class AirbyteEntrypoint(object):
@@ -333,16 +334,28 @@ def set_up_secret_filter(config: TConfig, connection_specification: Mapping[str,
333334
@staticmethod
334335
def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str:
335336
global _HAS_LOGGED_FOR_SERIALIZATION_ERROR
337+
global _HAS_LOGGED_FOR_SERIALIZATION_FALLBACK
336338
serialized_message = AirbyteMessageSerializer.dump(airbyte_message)
337339
try:
338340
return orjson.dumps(serialized_message).decode()
339-
except Exception as exception:
341+
except Exception as orjson_error:
340342
if not _HAS_LOGGED_FOR_SERIALIZATION_ERROR:
341343
logger.warning(
342-
f"There was an error during the serialization of an AirbyteMessage: `{exception}`. This might impact the sync performances."
344+
"Record serialization fell back to slower method. Sync will continue with reduced performance."
343345
)
346+
logger.debug("orjson serialization error: %s", orjson_error)
344347
_HAS_LOGGED_FOR_SERIALIZATION_ERROR = True
345-
return json.dumps(serialized_message)
348+
try:
349+
return json.dumps(serialized_message)
350+
except TypeError as json_error:
351+
if not _HAS_LOGGED_FOR_SERIALIZATION_FALLBACK:
352+
logger.warning(
353+
"Record contains a value that could not be serialized to JSON. "
354+
"The value was converted to a string representation."
355+
)
356+
logger.debug("json serialization error: %s", json_error)
357+
_HAS_LOGGED_FOR_SERIALIZATION_FALLBACK = True
358+
return json.dumps(serialized_message, default=str)
346359

347360
@classmethod
348361
def extract_state(cls, args: List[str]) -> Optional[Any]:

unit_tests/test_entrypoint.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,10 @@ def test_handle_record_counts(
838838
def test_given_serialization_error_using_orjson_then_fallback_on_json(
839839
entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock
840840
):
841+
# Reset global flags to avoid test pollution
842+
entrypoint_module._HAS_LOGGED_FOR_SERIALIZATION_ERROR = False
843+
entrypoint_module._HAS_LOGGED_FOR_SERIALIZATION_FALLBACK = False
844+
841845
parsed_args = Namespace(
842846
command="read", config="config_path", state="statepath", catalog="catalogpath"
843847
)
@@ -856,3 +860,31 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json(
856860
# There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here
857861
record_messages = list(filter(lambda message: "RECORD" in message, messages))
858862
assert len(record_messages) == 2
863+
864+
865+
def test_given_non_json_serializable_type_then_fallback_with_default_str(
866+
entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock
867+
):
868+
"""Test that types which both orjson and json cannot serialize (like complex) are handled via default=str fallback."""
869+
# Reset global flags to avoid test pollution
870+
entrypoint_module._HAS_LOGGED_FOR_SERIALIZATION_ERROR = False
871+
entrypoint_module._HAS_LOGGED_FOR_SERIALIZATION_FALLBACK = False
872+
873+
parsed_args = Namespace(
874+
command="read", config="config_path", state="statepath", catalog="catalogpath"
875+
)
876+
record = AirbyteMessage(
877+
record=AirbyteRecordMessage(stream="stream", data={"value": complex(1, 2)}, emitted_at=1),
878+
type=Type.RECORD,
879+
)
880+
mocker.patch.object(MockSource, "read_state", return_value={})
881+
mocker.patch.object(MockSource, "read_catalog", return_value={})
882+
mocker.patch.object(MockSource, "read", return_value=[record])
883+
884+
messages = list(entrypoint.run(parsed_args))
885+
886+
record_messages = list(filter(lambda message: "RECORD" in message, messages))
887+
assert len(record_messages) == 1
888+
# Verify the complex value was converted to its string representation
889+
parsed_record = orjson.loads(record_messages[0])
890+
assert parsed_record["record"]["data"]["value"] == "(1+2j)"

0 commit comments

Comments
 (0)