fix: fail fast on non-JSON-serializable types in serialization fallback (AI-Triage PR) #954
The json.dumps() fallback in airbyte_message_to_string() could also fail for types that neither orjson nor stdlib json can serialize (e.g. complex numbers), causing an unhandled exception that leads to deadlocks in the concurrent source pipeline. Add a second fallback using json.dumps(default=str) to ensure serialization never raises an unhandled exception.
Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
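A minimal, stdlib-only sketch of the fallback this commit describes: plain `json.dumps()` first, then `json.dumps(default=str)`. The helper name `dump_with_str_fallback` and the sample record are hypothetical, and the `orjson` step is omitted. The printed output shows the cost of this approach: the `complex` value reaches the destination as the string `"(0.0423+0j)"` rather than as a number, which is why a later commit in this PR switched to failing fast.

```python
import json
from typing import Any


def dump_with_str_fallback(payload: Any) -> str:
    """Hypothetical helper: try plain json.dumps, then retry with default=str."""
    try:
        return json.dumps(payload)
    except TypeError:
        # default=str is called for any object json cannot encode natively,
        # so the value is emitted as its str() form instead of raising.
        return json.dumps(payload, default=str)


record = {"clicks": 12, "ctr": complex(0.0423, 0)}
print(dump_with_str_fallback(record))  # {"clicks": 12, "ctr": "(0.0423+0j)"}
```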
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1773523110-fix-serialization-fallback-complex-types#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1773523110-fix-serialization-fallback-complex-types
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Pull request overview
This PR hardens AirbyteEntrypoint.airbyte_message_to_string() against record payloads containing Python types that cannot be serialized by either orjson or stdlib json, to avoid worker-thread crashes that can lead to deadlocks in concurrent pipelines.
Changes:
- Add a second serialization fallback: if `json.dumps()` fails, retry with `json.dumps(..., default=str)`.
- Add a unit test covering a `complex` value that triggers the new fallback.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `airbyte_cdk/entrypoint.py` | Adds a nested JSON serialization fallback using `default=str` when both `orjson` and stdlib `json` fail. |
| `unit_tests/test_entrypoint.py` | Adds coverage for the new fallback path using a `complex` value in record data. |
Suggestion: Pair this with a connector-level fix in source-google-search-console
This CDK-level fix prevents the deadlock, which is good, but the … I'd recommend pairing this with a defensive transformation in the connector itself (…):
from dataclasses import dataclass
from typing import Any, Dict, Optional

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class SanitizeNumericFields(RecordTransformation):
    """
    Ensures numeric metric fields are JSON-serializable by guarding against non-standard
    Python numeric types (e.g., complex) that can cause serialization failures and deadlocks
    in the concurrent read pipeline.
    See: https://github.com/airbytehq/airbyte/issues/74883
    """

    NUMERIC_FIELDS = ("clicks", "impressions", "ctr", "position")

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        for field in self.NUMERIC_FIELDS:
            value = record.get(field)
            # Keep only the real part; isinstance() is already False for None.
            if isinstance(value, complex):
                record[field] = value.real
Then wire it into all search analytics streams in …:
- type: CustomTransformation
  class_name: source_declarative_manifest.components.SanitizeNumericFields
Why both fixes?
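The suggested transform's core logic can be checked without the CDK. The sketch below is a hypothetical standalone function (`sanitize_numeric_fields`), not the connector component itself: it drops the `RecordTransformation` base class and the unused `config`/`stream_state`/`stream_slice` parameters, and shows that a sanitized record then passes through stdlib `json.dumps()` without error.

```python
import json
from typing import Any, Dict, Tuple

# Field list copied from the suggested SanitizeNumericFields component.
NUMERIC_FIELDS: Tuple[str, ...] = ("clicks", "impressions", "ctr", "position")


def sanitize_numeric_fields(record: Dict[str, Any]) -> None:
    """Replace complex values in NUMERIC_FIELDS with their real part, in place."""
    for field in NUMERIC_FIELDS:
        value = record.get(field)
        # isinstance() is already False for None, so no separate None check is needed.
        if isinstance(value, complex):
            record[field] = value.real


record = {"clicks": 3, "ctr": complex(0.0423, 0), "query": "airbyte"}
sanitize_numeric_fields(record)
print(json.dumps(record))  # {"clicks": 3, "ctr": 0.0423, "query": "airbyte"}
```

Taking `.real` silently discards any nonzero imaginary part, which is the gap the follow-up suggestion in this thread addresses with a warning.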
Great suggestion; you're right that the … I looked at the connector and can confirm:
Want me to create a companion Draft PR in
Updated suggestion: SanitizeNumericFields with warning logging
Building on my previous comment, the connector-level transformation should also log a warning when a complex value is encountered, so the data issue is visible rather than silently discarded:
import logging
from dataclasses import dataclass
from typing import Any, Dict, Optional

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

logger = logging.getLogger("airbyte")  # module-level logger; the name is an assumption


@dataclass
class SanitizeNumericFields(RecordTransformation):
    """
    Ensures numeric metric fields are JSON-serializable by guarding against non-standard
    Python numeric types (e.g., complex) that can cause serialization failures and deadlocks
    in the concurrent read pipeline.
    See: https://github.com/airbytehq/airbyte/issues/74883
    """

    NUMERIC_FIELDS = ("clicks", "impressions", "ctr", "position")

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        for field in self.NUMERIC_FIELDS:
            value = record.get(field)
            if isinstance(value, complex):
                logger.warning(
                    f"Complex value encountered for field '{field}': {value}. "
                    "Using real component only."
                )
                record[field] = value.real
Why this is preferable to the
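One way to verify that the warning actually fires, sketched with only the standard library: attach a list-backed `logging.Handler` to the logger and inspect what it collected. The logger name `"airbyte"`, the helper `sanitize_with_warning`, and the `ListHandler` class are assumptions for illustration, not connector code. The sketch passes `%`-style arguments to `logger.warning()`, so string formatting is deferred until the record is handled and skipped entirely when WARNING is disabled.

```python
import logging
from typing import Any, Dict, List

logger = logging.getLogger("airbyte")  # assumption: the connector's logger name

NUMERIC_FIELDS = ("clicks", "impressions", "ctr", "position")


def sanitize_with_warning(record: Dict[str, Any]) -> None:
    """Hypothetical standalone version of the transform, with the warning."""
    for field in NUMERIC_FIELDS:
        value = record.get(field)
        if isinstance(value, complex):
            logger.warning(
                "Complex value encountered for field '%s': %s. Using real component only.",
                field,
                value,
            )
            record[field] = value.real


class ListHandler(logging.Handler):
    """Collects formatted log messages in memory so they can be asserted on."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


handler = ListHandler()
logger.addHandler(handler)
try:
    row = {"position": complex(4.5, 0), "clicks": 7}
    sanitize_with_warning(row)
finally:
    logger.removeHandler(handler)

print(row)  # {'position': 4.5, 'clicks': 7}
print(handler.messages)
```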
Thanks for the refined suggestion with the warning logging; that's a good addition. I'll create a companion Draft PR in
Created the companion connector-level Draft PR per Patrick Nilan (@pnilan)'s suggestion: airbytehq/airbyte#75426. This adds a
…r review
Fail fast with a clear error instead of silently corrupting data by converting non-serializable types to strings.
Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-Authored-By: suisui.xia <suisui.xia@airbyte.io>
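A rough sketch of the fail-fast chain this commit moves to, under stated assumptions: `MessageSerializationError` is a hypothetical stand-in for `AirbyteTracedException` (which, per the PR description, carries `failure_type=system_error`), the error message text is invented, and `orjson` is treated as optional so the sketch runs with the standard library alone. It relies on `orjson.JSONEncodeError` being a subclass of `TypeError`.

```python
import json
from typing import Any

try:
    import orjson  # optional fast path
except ImportError:  # the sketch still runs with only the standard library
    orjson = None  # type: ignore[assignment]


class MessageSerializationError(Exception):
    """Hypothetical stand-in for AirbyteTracedException(failure_type=system_error)."""


def message_to_string(payload: Any) -> str:
    """Serialize with orjson, fall back to json, and fail fast if both refuse."""
    if orjson is not None:
        try:
            return orjson.dumps(payload).decode()
        except TypeError:  # orjson.JSONEncodeError subclasses TypeError
            pass
    try:
        return json.dumps(payload)
    except TypeError as exc:
        # Raise a clear, typed error instead of coercing the value to a string.
        raise MessageSerializationError(
            f"Could not serialize message to JSON: {exc}"
        ) from exc
```

Chaining with `from exc` keeps the original `TypeError` available as `__cause__`, so the underlying type problem stays visible in the traceback.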
Patrick Nilan (pnilan)
left a comment
Approving
Co-Authored-By: suisui.xia <suisui.xia@airbyte.io>
Summary
Hardens `airbyte_message_to_string()` to raise a clear `AirbyteTracedException` when record data contains types that neither `orjson` nor stdlib `json` can serialize, instead of letting an unhandled `TypeError` silently kill worker threads and deadlock the concurrent source pipeline.
Problem: The existing fallback chain is `orjson.dumps()` → `json.dumps()`. But `json.dumps()` can also raise `TypeError` for types like Python `complex` numbers. When this unhandled exception occurs in a worker thread of the concurrent source pipeline, the worker silently dies and the main thread deadlocks waiting on `queue.get()`.
Fix: Wrap the `json.dumps()` fallback in its own try/except and raise an `AirbyteTracedException` with `failure_type=system_error` and a clear message identifying the serialization failure. This stops the sync immediately with an actionable error rather than deadlocking or silently corrupting data.
Resolves https://github.com/airbytehq/oncall/issues/11654
Related: airbytehq/airbyte#74883
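The deadlock described under Problem comes from a consumer blocking on `queue.get()` after its producer thread has died. The sketch below illustrates that failure mode and one general mitigation, assuming a single producer thread; `run_worker`, `consume`, and `_WorkerFailed` are hypothetical names, and this is not how the CDK's concurrent read pipeline is actually structured. The worker forwards its exception through the queue, so the consumer re-raises it instead of waiting forever.

```python
import json
import queue
import threading
from typing import Any, Callable, Iterable, List

_DONE = object()  # sentinel: the worker finished every item


class _WorkerFailed:
    """Envelope that carries a worker's exception across the queue."""

    def __init__(self, exc: BaseException) -> None:
        self.exc = exc


def run_worker(items: Iterable[Any], serialize: Callable[[Any], str], out: "queue.Queue[Any]") -> None:
    try:
        for item in items:
            out.put(serialize(item))
    except Exception as exc:
        # Forward the failure; without this the thread would simply exit and
        # a consumer blocked in out.get() would wait forever.
        out.put(_WorkerFailed(exc))
        return
    out.put(_DONE)


def consume(items: Iterable[Any], serialize: Callable[[Any], str]) -> List[str]:
    out: "queue.Queue[Any]" = queue.Queue()
    worker = threading.Thread(target=run_worker, args=(items, serialize, out), daemon=True)
    worker.start()
    results: List[str] = []
    while True:
        msg = out.get()  # blocks until the worker puts something
        if msg is _DONE:
            return results
        if isinstance(msg, _WorkerFailed):
            raise msg.exc
        results.append(msg)


print(consume([{"a": 1}], json.dumps))  # ['{"a": 1}']
```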
Review & Testing Checklist for Human
- `AirbyteTracedException` raised from `airbyte_message_to_string()` propagates correctly through the concurrent read pipeline and does not itself cause a deadlock. The function is called via `map()` in `run()`; confirm the exception surfaces cleanly to the platform.
- `system_error` is the correct failure type: this will cause the platform to retry the sync. Confirm this is preferable over `config_error` (which would not retry) for non-serializable data types in records.
- `complex` type issue in `source-google-search-console` is handled at the connector level in airbytehq/airbyte#75426. This CDK change is defense-in-depth: test that the connector fix prevents the exception from firing, and that without the connector fix, this CDK change produces a clear error instead of a deadlock.
Suggested test plan: Run `source-google-search-console` with the `search_analytics_by_query` stream against a real account. With the companion connector fix, the sync should complete normally. Without it, the sync should fail with a clear `AirbyteTracedException` message mentioning JSON serialization, rather than deadlocking.
Notes
- `complex` values entering the data flow was separately fixed in CDK PR airbytehq/airbyte-python-cdk#579 (Jinja interpolation rejecting complex types, merged in v6.54.5). This PR hardens the serialization layer as defense-in-depth against any future non-serializable type reaching this code path.
- `orjson` → `json` fallback was added in airbytehq/airbyte-python-cdk#210 (v6.16.1, by Artem Inzhyyants (@artem1205) / Maxime Carbonneau-Leclerc (@maxi297)).
- `SanitizeNumericFields` transformation to extract `.real` from complex values before serialization.
Updates since last revision
- `default=str` silent fallback with `AirbyteTracedException` per reviewer feedback from suisuixia42: fail fast with a clear error instead of silently corrupting numeric fields into strings like `"(0.0423+0j)"`.
- `AirbyteTracedException` is raised with the expected message.
Link to Devin session: https://app.devin.ai/sessions/d8317c1f4ce64f70b5e807425b72aca2
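For the first checklist item (the serializer being called via `map()` in `run()`), a small stdlib sketch of why an exception raised by the serializer should reach the caller: `map()` is lazy, so the failure is raised from the caller's loop at the offending message, after earlier messages have already been emitted and before later ones are touched. `run`, `to_string`, and `SerializationFailure` are hypothetical stand-ins, not the CDK's entrypoint code.

```python
import json
from typing import Any, Iterable, Iterator, List


class SerializationFailure(Exception):
    """Hypothetical stand-in for the AirbyteTracedException raised by the serializer."""


def to_string(message: Any) -> str:
    try:
        return json.dumps(message)
    except TypeError as exc:
        raise SerializationFailure(f"Could not serialize message to JSON: {exc}") from exc


def run(messages: Iterable[Any]) -> Iterator[str]:
    # map() is lazy: to_string() runs only when the caller pulls the next item.
    return map(to_string, messages)


emitted: List[str] = []
try:
    for line in run([{"n": 1}, {"n": complex(0, 1)}, {"n": 3}]):
        emitted.append(line)
except SerializationFailure as exc:
    print("sync stopped:", exc)

print(emitted)  # ['{"n": 1}']
```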