fix(cdk): Preserve error message in concurrent source exception handling (AI-Triage PR)#937
In ConcurrentReadProcessor.on_exception(), when the inner exception is not an AirbyteTracedException, the code was passing the StreamThreadException wrapper to from_exception() instead of the inner exception. Since from_exception() does not set a user-facing message for non-ATE inputs, this resulted in message=None, triggering the generic fallback: 'Something went wrong in the connector. See the logs for more details.' Fix: pass the inner exception and explicitly provide the message parameter so the actual error text surfaces to users instead of the generic fallback. Co-Authored-By: bot_apk <apk@cognition.ai>
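The failure mode described above can be reproduced in isolation. The following is a minimal sketch using stand-in classes, not the real CDK types: `TracedError` and `ThreadWrapper` are hypothetical substitutes for `AirbyteTracedException` and `StreamThreadException`, and `user_facing_message()` is an assumed helper that models the fallback behavior. The `str()`-based message reflects the first revision of this fix as described in the commit message.

```python
from typing import Optional

GENERIC_FALLBACK = "Something went wrong in the connector. See the logs for more details."


class TracedError(Exception):
    """Hypothetical stand-in for AirbyteTracedException (not the CDK class)."""

    def __init__(self, internal_message: Optional[str] = None, message: Optional[str] = None):
        super().__init__(internal_message)
        self.internal_message = internal_message
        self.message = message

    @classmethod
    def from_exception(cls, exc: Exception, message: Optional[str] = None) -> "TracedError":
        # Models the behavior described above: no user-facing message is
        # derived from a non-traced input unless the caller passes one.
        return cls(internal_message=str(exc), message=message)

    def user_facing_message(self) -> str:
        # Assumed helper: falls back to the generic text when message is None.
        return self.message or GENERIC_FALLBACK


class ThreadWrapper(Exception):
    """Hypothetical stand-in for StreamThreadException."""

    def __init__(self, exception: Exception, stream_name: str):
        super().__init__(f"Exception while syncing stream {stream_name}: {exception}")
        self.exception = exception
        self.stream_name = stream_name


wrapped = ThreadWrapper(ValueError("bad cursor value"), "tasks")

# Before: the wrapper is passed and no message is provided.
before = TracedError.from_exception(wrapped)

# After (first revision): the inner exception is passed with an explicit message.
after = TracedError.from_exception(wrapped.exception, message=str(wrapped.exception))
```

In this model, `before.user_facing_message()` yields the generic fallback, while `after.user_facing_message()` surfaces the inner error text.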
👋 Greetings, Airbyte Team Member!
Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1772822472-fix-concurrent-source-error-message#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1772822472-fix-concurrent-source-error-message
Co-Authored-By: bot_apk <apk@cognition.ai>
Patrick Nilan (pnilan)
left a comment
Principal Engineer Review
This PR fixes a real code defect (passing the StreamThreadException wrapper instead of the inner exception on Path B of on_exception()), but there are significant gaps that should be addressed before merging.
Key Concern: Does this actually fix the reported Sentry issue?
The triage assumes the Sentry scenario (Asana 429 → retry exhaustion) hits Path B (non-ATE inner exception, line 174). But _send_with_retry in http_client.py wraps retry exhaustion as MessageRepresentationAirbyteTracedErrors — which is an AirbyteTracedException subclass — meaning Path A (line 172) should handle it. If that's the case, this PR is a no-op for the reported issue.
Before this can be merged, someone needs to pull the actual Sentry stack trace and confirm which path is hit.
Investigation needed
- If Path A is the real path: Why does it still produce the generic fallback? The ATE from _send_with_retry sets both message and internal_message. If the message is being lost, the bug is upstream in the wrapping chain. Note: PR #927 (now merged) fixed from_exception() to preserve message for ATE inputs, so this may have already resolved the issue. Verify by reproducing against the latest CDK.
- If Path B is the real path: Why isn't the exception wrapped as an ATE? That's the actual bug to fix, rather than papering over it in on_exception().
Issues with the current fix
- message=str(exception.exception) is unsafe for user-facing display. Raw exception strings can contain SQL, class names, stack fragments, connection strings, or other internals that don't belong in a user-facing message. A safer alternative would be a templated message, e.g.: f"Stream {exception.stream_name} encountered an error: {type(exception.exception).__name__}"
- failure_type is not addressed. The issue correctly notes that 429 rate limits should be transient_error, not system_error. This PR still defaults to system_error for all non-ATE exceptions on Path B. Consider detecting rate-limit/timeout exceptions and setting failure_type accordingly.
- No end-to-end regression test. There should be a test that simulates 429 retry exhaustion through the concurrent source pipeline and asserts the user-facing message is not the generic fallback.
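The first two points above could look roughly like the following sketch. Everything here is an assumption for illustration: `FailureType` is a stand-in enum rather than the CDK's own type, `safe_message` and `classify_failure_type` are hypothetical helpers, and the choice of which built-in exception types count as transient is a policy decision the review leaves open. HTTP-specific rate-limit detection (the 429 case) is not covered, since it depends on how the HTTP layer surfaces those errors.

```python
from enum import Enum


class FailureType(Enum):
    """Stand-in for the failure types named in the review (not the CDK enum)."""

    system_error = "system_error"
    transient_error = "transient_error"


# Assumption: timeouts and connection failures are treated as transient.
TRANSIENT_TYPES = (TimeoutError, ConnectionError)


def safe_message(stream_name: str, exc: BaseException) -> str:
    # Only the stream name and exception class name are exposed;
    # the raw exception text never reaches the user-facing message.
    return f"Stream {stream_name} encountered an error: {type(exc).__name__}"


def classify_failure_type(exc: BaseException) -> FailureType:
    # Hypothetical helper: map known-transient exception types to
    # transient_error and everything else to system_error.
    if isinstance(exc, TRANSIENT_TYPES):
        return FailureType.transient_error
    return FailureType.system_error


leaky = RuntimeError("SELECT * FROM secrets WHERE token='abc'")
message = safe_message("tasks", leaky)
```

With this template, an exception whose text contains SQL still produces a message that names only the stream and the exception class.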
Recommendation
Hold this PR until the investigation above is completed. If Path A is the real path and PR #927 already fixed it, this PR can be closed. If Path B is genuinely hit, fix the upstream wrapping first, then use this PR as defense-in-depth with the str() safety and failure_type concerns addressed.
This review was created by Claude Code.
Thank you for the thorough review. These are valid concerns and I appreciate the rigor.
Path A vs Path B: You're right
After re-examining the code path more carefully: […]
So for the reported 429 retry exhaustion scenario, Path A handles it, and this PR (which only fixes Path B) would indeed be a no-op for the specific Sentry issue.
PR #927 likely already resolves the reported issue
PR #927 (now merged) fixed […]
What this PR still fixes
This PR does fix a real defect on Path B: previously passing the […]
Recommendation
Given the above, I agree with your recommendation to hold this PR. The options are: […]
I'll defer to your judgment on which path to take. If you'd like me to verify that PR #927 resolved the Sentry issue (by checking recent error rates), I can do that.
Devin session · Requested by bot_apk (apk@cognition.ai)
Follow-up: Recommended changes
After further investigation (including reviewing the original PR that introduced this code path, airbytehq/airbyte#37443), here's a refined recommendation for this PR.
Context
Path B was never deliberately designed; it's the legacy default from before Maxime Carbonneau-Leclerc (@maxi297) added the ATE-aware Path A in #37443. The bug (passing the […]
Recommended changes
[…]
This comment was created by Claude Code.
…edback Co-Authored-By: bot_apk <apk@cognition.ai>
Pull request overview
Fixes loss of the underlying exception message when non-AirbyteTracedException errors propagate through the concurrent source pipeline, preventing a generic fallback error from being shown to users.
Changes:
- Passes the inner exception from StreamThreadException into AirbyteTracedException.from_exception() and explicitly sets the message.
- Updates concurrent source unit tests to assert on the inner exception’s stack trace/message.
- Updates connector builder tests to assert on inner exception strings instead of "StreamThreadException".
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py | Preserves user-facing error message by tracing the inner exception and setting message. |
| unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py | Adjusts assertions to match the new stack trace/message behavior. |
| unit_tests/connector_builder/test_connector_builder_handler.py | Updates expected error strings to reflect inner exception traces. |
Co-Authored-By: bot_apk <apk@cognition.ai>
Addressed all three recommended changes:
All 21 concurrent read processor unit tests and 6 connector builder tests pass locally.
(aside) Impact Analysis: What this PR actually improves
The meaningful improvements
The primary value of this PR is debuggability, not the user-facing message. By passing […]
Side effects to be aware of
Sentry grouping will change. Today, all non-ATE exceptions on Path B produce stack traces rooted in […]
This is a one-time disruption and the resulting grouping is more correct: errors from different root causes will no longer be lumped together. But it's worth being aware of during the rollout.
This comment was created by Claude Code.
Summary
Fixes the generic fallback error message "Something went wrong in the connector. See the logs for more details." that appears when a non-AirbyteTracedException propagates through the concurrent source pipeline.
Root cause: In ConcurrentReadProcessor.on_exception(), the else branch (Path B, line 175) was passing the StreamThreadException wrapper to AirbyteTracedException.from_exception() instead of the inner exception. Since StreamThreadException is not an AirbyteTracedException, from_exception() never sets the message field, resulting in message=None → generic fallback in as_airbyte_message(). This bug has existed since the concurrent source was built; Path B was the legacy default before Path A (the ATE-aware branch) was added in airbytehq/airbyte#37443.
Fix (3 changes):
- Pass the inner exception (exception.exception) instead of the StreamThreadException wrapper to from_exception()
- Use a templated message, f"An unexpected error occurred in stream {stream_name}: {ExceptionType}", that exposes stream name and exception type without leaking raw exception text (e.g., SQL, credentials, internal hostnames)
- failure_type remains system_error (the default) since non-ATE exceptions reaching Path B are by definition unexpected/unhandled
Scope note: This fix addresses Path B only (non-ATE inner exceptions). The originally reported Sentry scenario (Asana 429 → retry exhaustion) hits Path A, where _send_with_retry wraps the error as an AirbyteTracedException. That path is likely already resolved by #927 (merged). This PR fixes a separate, longstanding defect on Path B.
Related to https://github.com/airbytehq/airbyte-internal-issues/issues/15942:
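The two-path dispatch and the properties the fix is meant to guarantee can be sketched as follows. This is an illustration under stated assumptions, not the CDK implementation: `TracedError` and `ThreadWrapper` are hypothetical stand-ins for `AirbyteTracedException` and `StreamThreadException`, `to_traced` is a made-up helper, Path A is assumed to pass the already-traced exception through unchanged, and keeping the raw text in `internal_message` is likewise an assumption.

```python
from typing import Optional


class TracedError(Exception):
    """Hypothetical stand-in for AirbyteTracedException."""

    def __init__(self, message: Optional[str], internal_message: str,
                 failure_type: str = "system_error"):
        super().__init__(internal_message)
        self.message = message
        self.internal_message = internal_message
        self.failure_type = failure_type


class ThreadWrapper(Exception):
    """Hypothetical stand-in for StreamThreadException."""

    def __init__(self, exception: Exception, stream_name: str):
        super().__init__(f"{stream_name}: {exception!r}")
        self.exception = exception
        self.stream_name = stream_name


def to_traced(wrapper: ThreadWrapper) -> TracedError:
    inner = wrapper.exception
    if isinstance(inner, TracedError):
        # Path A (assumed): an already-traced exception is passed through.
        return inner
    # Path B: trace the inner exception, not the wrapper, and attach a
    # templated message that exposes only stream name and exception type.
    return TracedError(
        message=(
            f"An unexpected error occurred in stream {wrapper.stream_name}: "
            f"{type(inner).__name__}"
        ),
        internal_message=str(inner),  # raw text stays out of the user-facing field
        failure_type="system_error",
    )


path_b = to_traced(ThreadWrapper(RuntimeError("SELECT * FROM secrets"), "tasks"))

already_traced = TracedError("Rate limit exceeded.", "HTTP 429 after retries", "transient_error")
path_a = to_traced(ThreadWrapper(already_traced, "tasks"))
```

In this sketch the Path B result carries the templated message and `system_error`, while the Path A result is the original traced exception with its own message and failure type intact.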
Updates since last revision
- Replaced str(exception.exception) with f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}" per reviewer recommendation. This prevents leaking raw exception text (URLs with credentials, SQL snippets, etc.) into user-facing messages.
- test_on_exception_non_ate_uses_templated_message_with_correct_failure_type validates that:
  - […] str())
  - […] StreamThreadException wrapper)
  - failure_type is system_error
  - […] "SELECT * FROM secrets") does not leak into the message
Review & Testing Checklist for Human
- […] "An unexpected error occurred in stream {name}: {ExceptionType}" only reveals the exception class name. Verify this is informative enough for debugging without being too opaque.
- […] StreamThreadException wrapper, losing some context about the concurrent pipeline location. The updated connector builder test assertions ("Error while refreshing access token", "Invalid Protocol Scheme") reflect this.
- […] from_exception() itself should handle this more broadly. Currently from_exception() never sets message for non-ATE inputs. A fix there would protect all call sites, not just this one. This PR's approach is more targeted/safer but leaves the broader gap open.
Suggested test plan: Trigger a sync with a connector using the concurrent source path against an endpoint that causes a non-ATE exception (e.g., a connection timeout or DNS resolution failure that isn't wrapped as ATE). Verify the user-facing error message shows "An unexpected error occurred in stream {name}: {ExceptionType}" rather than the generic fallback.
Notes
- […] test_concurrent_read_processor.py, 6 tests in test_connector_builder_handler.py). No end-to-end integration testing with a real connector was performed.
Requested by: bot_apk
Devin session
Important
Auto-merge enabled.
This PR is set to merge automatically when all requirements are met.