
fix(cdk): Preserve error message in concurrent source exception handling (AI-Triage PR) #937

Merged
Patrick Nilan (pnilan) merged 4 commits into main from
devin/1772822472-fix-concurrent-source-error-message
Mar 9, 2026


@devin-ai-integration devin-ai-integration bot commented Mar 6, 2026

Summary

Fixes the generic fallback error message "Something went wrong in the connector. See the logs for more details." that appears when a non-AirbyteTracedException propagates through the concurrent source pipeline.

Root cause: In ConcurrentReadProcessor.on_exception(), the else branch (Path B, line 175) was passing the StreamThreadException wrapper to AirbyteTracedException.from_exception() instead of the inner exception. Since StreamThreadException is not an AirbyteTracedException, from_exception() never sets the message field, resulting in message=None → generic fallback in as_airbyte_message(). This bug has existed since the concurrent source was built — Path B was the legacy default before Path A (the ATE-aware branch) was added in airbytehq/airbyte#37443.
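The Path B defect can be reproduced outside the CDK with a small model. In this sketch, TracedError and StreamThreadWrapper are hypothetical stand-ins for AirbyteTracedException and StreamThreadException, and from_exception() is assumed to inherit a user-facing message only when its input is already a traced error, as described above. None of this is the CDK's actual code.

```python
from typing import Optional

GENERIC_FALLBACK = "Something went wrong in the connector. See the logs for more details."


class TracedError(Exception):
    """Hypothetical stand-in for AirbyteTracedException (not the real CDK class)."""

    def __init__(self, internal_message: Optional[str] = None, message: Optional[str] = None) -> None:
        super().__init__(internal_message)
        self.internal_message = internal_message
        self.message = message

    @classmethod
    def from_exception(cls, exc: BaseException, message: Optional[str] = None) -> "TracedError":
        # Assumed behavior: a user-facing message is only inherited from
        # inputs that are already traced errors; other inputs get none.
        if message is None and isinstance(exc, TracedError):
            message = exc.message
        return cls(internal_message=str(exc), message=message)

    def user_facing_message(self) -> str:
        # Models the fallback applied when message is None
        return self.message or GENERIC_FALLBACK


class StreamThreadWrapper(Exception):
    """Hypothetical stand-in for StreamThreadException."""

    def __init__(self, exception: BaseException, stream_name: str) -> None:
        super().__init__(f"Exception while syncing stream {stream_name}: {exception}")
        self.exception = exception
        self.stream_name = stream_name


wrapped = StreamThreadWrapper(ValueError("connection refused"), "users")

# Old Path B: the wrapper itself is traced, so message stays None
old = TracedError.from_exception(wrapped)
print(old.message)                # None
print(old.user_facing_message())  # the generic fallback
```

In this model, passing wrapped.exception alone would not help either, because the inner ValueError is not a traced error; that is why the fix also supplies an explicit message.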

Fix (3 changes):

  1. Pass the inner exception (exception.exception) instead of the StreamThreadException wrapper to from_exception()
  2. Use a safe templated message — f"An unexpected error occurred in stream {stream_name}: {ExceptionType}" — that exposes stream name and exception type without leaking raw exception text (e.g., SQL, credentials, internal hostnames)
  3. failure_type remains system_error (the default) since non-ATE exceptions reaching Path B are by definition unexpected/unhandled
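A minimal sketch of the three changes together. The path_b_error() helper, the PathBError container, and the FailureType enum are hypothetical (the enum only mirrors the failure type names used in this PR); the real logic lives inline in ConcurrentReadProcessor.on_exception() and builds an AirbyteTracedException.

```python
from dataclasses import dataclass
from enum import Enum


class FailureType(Enum):
    """Hypothetical mirror of the failure types discussed in this PR."""

    system_error = "system_error"
    transient_error = "transient_error"
    config_error = "config_error"


@dataclass
class PathBError:
    """Hypothetical container for what Path B reports."""

    message: str               # user-facing
    internal_message: str      # logs / Sentry only
    failure_type: FailureType
    traced: BaseException      # exception whose stack trace is reported


def path_b_error(inner: BaseException, stream_name: str) -> PathBError:
    """Sketch of the fixed Path B: trace the inner exception, not the wrapper."""
    return PathBError(
        # Change 2: stream name + exception class only, no raw exception text
        message=f"An unexpected error occurred in stream {stream_name}: {type(inner).__name__}",
        # Raw text is kept for engineers, not shown to users
        internal_message=str(inner),
        # Change 3: non-ATE exceptions reaching Path B are unexpected
        failure_type=FailureType.system_error,
        # Change 1: the inner exception, not the StreamThreadException wrapper
        traced=inner,
    )


err = path_b_error(ValueError("SELECT * FROM secrets"), "users")
print(err.message)  # An unexpected error occurred in stream users: ValueError
```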

Scope note: This fix addresses Path B only (non-ATE inner exceptions). The originally reported Sentry scenario (Asana 429 → retry exhaustion) hits Path A, where _send_with_retry wraps the error as an AirbyteTracedException. That path is likely already resolved by #927 (merged). This PR fixes a separate, longstanding defect on Path B.

Related to https://github.com/airbytehq/airbyte-internal-issues/issues/15942

Updates since last revision

  • Safe templated message: Replaced str(exception.exception) with f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}" per reviewer recommendation. This prevents leaking raw exception text (URLs with credentials, SQL snippets, etc.) into user-facing messages.
  • Regression test added: test_on_exception_non_ate_uses_templated_message_with_correct_failure_type validates that:
    • User-facing message uses the safe template (not generic fallback, not raw str())
    • Stack trace comes from the inner exception (not the StreamThreadException wrapper)
    • failure_type is system_error
    • Raw exception text (e.g., "SELECT * FROM secrets") does not leak into the message
  • Ruff format fix: Applied ruff formatting to long assertion lines in tests.
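The assertions listed above could look roughly like the following. The sketch is self-contained, so StreamThreadWrapper and on_exception_path_b() are hypothetical simplifications rather than the CDK classes, and this is not the test added in this PR.

```python
import traceback

GENERIC_FALLBACK = "Something went wrong in the connector. See the logs for more details."


class StreamThreadWrapper(Exception):
    """Hypothetical stand-in for StreamThreadException."""

    def __init__(self, exception: BaseException, stream_name: str) -> None:
        super().__init__(f"Exception while syncing stream {stream_name}")
        self.exception = exception
        self.stream_name = stream_name


def on_exception_path_b(wrapper: StreamThreadWrapper) -> dict:
    """Hypothetical, simplified Path B after the fix."""
    inner = wrapper.exception
    return {
        "message": f"An unexpected error occurred in stream {wrapper.stream_name}: {type(inner).__name__}",
        # Format the trace of the inner exception, not the wrapper
        "stack_trace": "".join(traceback.format_exception(type(inner), inner, inner.__traceback__)),
        "failure_type": "system_error",
    }


def test_non_ate_uses_templated_message() -> None:
    try:
        raise ValueError("some internal detail: SELECT * FROM secrets")
    except ValueError as inner:
        wrapper = StreamThreadWrapper(inner, "users")

    result = on_exception_path_b(wrapper)

    # Templated message: not the generic fallback, and no raw exception text
    assert result["message"] == "An unexpected error occurred in stream users: ValueError"
    assert result["message"] != GENERIC_FALLBACK
    assert "SELECT * FROM secrets" not in result["message"]
    # The trace describes the inner ValueError, not the wrapper
    assert "ValueError: some internal detail" in result["stack_trace"]
    assert "StreamThreadWrapper" not in result["stack_trace"]
    assert result["failure_type"] == "system_error"
```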

Review & Testing Checklist for Human

  • Confirm the templated message format is acceptable for all connectors. The message "An unexpected error occurred in stream {name}: {ExceptionType}" only reveals the exception class name. Verify this is informative enough for debugging without being too opaque.
  • Verify the stack trace change is acceptable. The stack trace now comes from the inner exception rather than the StreamThreadException wrapper, losing some context about the concurrent pipeline location. The updated connector builder test assertions ("Error while refreshing access token", "Invalid Protocol Scheme") reflect this.
  • Evaluate whether from_exception() itself should handle this more broadly. Currently from_exception() never sets message for non-ATE inputs. A fix there would protect all call sites, not just this one. This PR's approach is more targeted/safer but leaves the broader gap open.

Suggested test plan: Trigger a sync with a connector using the concurrent source path against an endpoint that causes a non-ATE exception (e.g., a connection timeout or DNS resolution failure that isn't wrapped as ATE). Verify the user-facing error message shows "An unexpected error occurred in stream {name}: {ExceptionType}" rather than the generic fallback.

Notes

  • This is a CDK-level change affecting all connectors using the concurrent source path.
  • Unit tests pass locally (21 tests in test_concurrent_read_processor.py, 6 tests in test_connector_builder_handler.py). No end-to-end integration testing with a real connector was performed.

Requested by: bot_apk
Devin session

Important

Auto-merge enabled.

This PR is set to merge automatically when all requirements are met.

In ConcurrentReadProcessor.on_exception(), when the inner exception is
not an AirbyteTracedException, the code was passing the StreamThreadException
wrapper to from_exception() instead of the inner exception. Since
from_exception() does not set a user-facing message for non-ATE inputs,
this resulted in message=None, triggering the generic fallback:
'Something went wrong in the connector. See the logs for more details.'

Fix: pass the inner exception and explicitly provide the message parameter
so the actual error text surfaces to users instead of the generic fallback.

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

github-actions bot commented Mar 6, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.


Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1772822472-fix-concurrent-source-error-message#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1772822472-fix-concurrent-source-error-message

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

github-actions bot commented Mar 6, 2026

PyTest Results (Fast)

3 892 tests (+2), 3 880 ✅ passed (+2), 12 💤 skipped (±0), 0 ❌ failed (±0)
1 suite (±0), 1 file (±0), ⏱️ 6m 46s (-5s)

Results for commit 3cdf557. ± Comparison against base commit 6136336.

♻️ This comment has been updated with latest results.

github-actions bot commented Mar 6, 2026

PyTest Results (Full)

3 895 tests (+2), 3 883 ✅ passed (+2), 12 💤 skipped (±0), 0 ❌ failed (±0)
1 suite (±0), 1 file (±0), ⏱️ 11m 13s (+41s)

Results for commit 3cdf557. ± Comparison against base commit 6136336.

♻️ This comment has been updated with latest results.

Co-Authored-By: bot_apk <apk@cognition.ai>
@pnilan Patrick Nilan (pnilan) left a comment


Principal Engineer Review

This PR fixes a real code defect (passing the StreamThreadException wrapper instead of the inner exception on Path B of on_exception()), but there are significant gaps that should be addressed before merging.

Key Concern: Does this actually fix the reported Sentry issue?

The triage assumes the Sentry scenario (Asana 429 → retry exhaustion) hits Path B (non-ATE inner exception, line 174). But _send_with_retry in http_client.py wraps retry exhaustion as MessageRepresentationAirbyteTracedErrors — which is an AirbyteTracedException subclass — meaning Path A (line 172) should handle it. If that's the case, this PR is a no-op for the reported issue.

Before this can be merged, someone needs to pull the actual Sentry stack trace and confirm which path is hit.

Investigation needed

  1. If Path A is the real path: Why does it still produce the generic fallback? The ATE from _send_with_retry sets both message and internal_message. If the message is being lost, the bug is upstream in the wrapping chain. Note: PR #927 (now merged) fixed from_exception() to preserve message for ATE inputs — this may have already resolved the issue. Verify by reproducing against the latest CDK.

  2. If Path B is the real path: Why isn't the exception wrapped as an ATE? That's the actual bug to fix — not papering over it in on_exception().

Issues with the current fix

  1. message=str(exception.exception) is unsafe for user-facing display. Raw exception strings can contain SQL, class names, stack fragments, connection strings, or other internals that don't belong in a user-facing message. A safer alternative would be a templated message, e.g.:

    f"Stream {exception.stream_name} encountered an error: {type(exception.exception).__name__}"
  2. failure_type is not addressed. The issue correctly notes that 429 rate limits should be transient_error, not system_error. This PR still defaults to system_error for all non-ATE exceptions on Path B. Consider detecting rate-limit/timeout exceptions and setting failure_type accordingly.

  3. No end-to-end regression test. There should be a test that simulates 429 retry exhaustion through the concurrent source pipeline and asserts the user-facing message is not the generic fallback.

Recommendation

Hold this PR until the investigation above is completed. If Path A is the real path and PR #927 already fixed it, this PR can be closed. If Path B is genuinely hit, fix the upstream wrapping first, then use this PR as defense-in-depth with the str() safety and failure_type concerns addressed.


This review was created by Claude Code.

@devin-ai-integration

Thank you for the thorough review — these are valid concerns and I appreciate the rigor.

Path A vs Path B: You're right

After re-examining the code path more carefully:

  1. _send_with_retry (http_client.py:295-303) catches BaseBackoffException on retry exhaustion and raises AirbyteTracedException directly — with both message and internal_message set
  2. partition_reader.process_partition (line 84) catches this and wraps it in StreamThreadException
  3. on_exception() checks isinstance(exception.exception, AirbyteTracedException) → True → Path A (line 172)
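The three steps can be modeled as follows. TracedError, RetryExhausted, StreamThreadWrapper, and the functions are hypothetical stand-ins for AirbyteTracedException, BaseBackoffException, StreamThreadException, _send_with_retry, process_partition, and the type check in on_exception(); the message strings are made up. A non-ATE failure is included for contrast.

```python
from typing import Callable


class TracedError(Exception):
    """Hypothetical stand-in for AirbyteTracedException."""

    def __init__(self, message: str, internal_message: str) -> None:
        super().__init__(internal_message)
        self.message = message
        self.internal_message = internal_message


class RetryExhausted(Exception):
    """Hypothetical stand-in for BaseBackoffException."""


class StreamThreadWrapper(Exception):
    """Hypothetical stand-in for StreamThreadException."""

    def __init__(self, exception: BaseException, stream_name: str) -> None:
        super().__init__(f"Exception while syncing stream {stream_name}")
        self.exception = exception
        self.stream_name = stream_name


def send_with_retry() -> None:
    # Step 1: retry exhaustion is re-raised as a traced error with both messages set
    try:
        raise RetryExhausted("HTTP 429 after max retries")
    except RetryExhausted as e:
        raise TracedError(message="Rate limit exceeded", internal_message=str(e)) from e


def dns_failure() -> None:
    # A failure that nothing upstream converts into a traced error
    raise ConnectionError("Name or service not known")


def process_partition(read: Callable[[], None], stream_name: str) -> None:
    # Step 2: the partition reader wraps whatever escapes
    try:
        read()
    except Exception as e:
        raise StreamThreadWrapper(e, stream_name) from e


def path_taken(read: Callable[[], None]) -> str:
    # Step 3: dispatch on the type of the *inner* exception
    try:
        process_partition(read, "tasks")
    except StreamThreadWrapper as wrapper:
        return "A" if isinstance(wrapper.exception, TracedError) else "B"
    return "no error"


print(path_taken(send_with_retry))  # A
print(path_taken(dns_failure))      # B
```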

So for the reported 429 retry exhaustion scenario, Path A handles it, and this PR (which only fixes Path B) would indeed be a no-op for the specific Sentry issue.

PR #927 likely already resolves the reported issue

PR #927 (now merged) fixed from_exception() to preserve message for ATE inputs and moved the __str__ override to the base class. Since Path A calls exception.exception.as_airbyte_message() directly, and the ATE from _send_with_retry already has message set, this should mean the generic fallback no longer occurs for the 429 scenario.

What this PR still fixes

This PR does fix a real defect on Path B — previously passing the StreamThreadException wrapper (instead of the inner exception) to from_exception(). Path B is hit when non-ATE exceptions propagate through the concurrent pipeline (e.g., the OAuth token refresh failures for private endpoints / unsecured protocols that the connector builder tests exercise). However, this is not the defect causing the reported Sentry issue.

Recommendation

Given the above, I agree with your recommendation to hold this PR. The options are:

  1. Close this PR if PR #927 (fix: update AirbyteTracedException.__str__ to show user-facing message) has already resolved the reported issue and the Path B defect is low-priority
  2. Keep as defense-in-depth but address the concerns you raised (str() safety, failure_type, end-to-end test) before merging

I'll defer to your judgment on which path to take. If you'd like me to verify that PR #927 resolved the Sentry issue (by checking recent error rates), I can do that.


Devin session · Requested by bot_apk (apk@cognition.ai)

@pnilan

Follow-up: Recommended changes

After further investigation (including reviewing the original PR that introduced this code path — airbytehq/airbyte#37443), here's a refined recommendation for this PR.

Context

Path B was never deliberately designed — it's the legacy default from before Maxime Carbonneau-Leclerc (@maxi297) added the ATE-aware Path A in #37443. The bug (passing the StreamThreadException wrapper instead of the inner exception) has been there since the concurrent source was built. This PR is fixing a real, longstanding defect and should be completed — not closed.

Recommended changes

  1. Pass inner exception instead of wrapper — already done, this is correct ✅

  2. Use a safe templated message instead of str(exception.exception) — Raw str() on arbitrary exceptions can leak internals. Replace with:

    message=f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}"

    This is specific enough to be useful (stream name + exception type) without exposing raw exception text to users.

  3. Keep failure_type as system_error (default) — no change needed. If a non-ATE exception reaches Path B, it is by definition unexpected/unhandled. system_error is the correct classification. If a specific error type (like 429 rate limits) needs a different failure_type, the fix belongs upstream — ensuring it's properly wrapped as an ATE so it hits Path A.

  4. Add an end-to-end regression test — A test that simulates a non-ATE exception propagating through the concurrent source pipeline and asserts:

    • The user-facing message is the templated format (not the generic fallback)
    • The stack trace comes from the inner exception (not the StreamThreadException wrapper)
    • failure_type is system_error

This comment was created by Claude Code.

@pnilan Patrick Nilan (pnilan) marked this pull request as ready for review March 9, 2026 19:43
Copilot AI review requested due to automatic review settings March 9, 2026 19:43
…edback

Co-Authored-By: bot_apk <apk@cognition.ai>

Copilot AI left a comment


Pull request overview

Fixes loss of the underlying exception message when non-AirbyteTracedException errors propagate through the concurrent source pipeline, preventing a generic fallback error from being shown to users.

Changes:

  • Passes the inner exception from StreamThreadException into AirbyteTracedException.from_exception() and explicitly sets the message.
  • Updates concurrent source unit tests to assert on the inner exception’s stack trace/message.
  • Updates connector builder tests to assert on inner exception strings instead of "StreamThreadException".

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py: Preserves the user-facing error message by tracing the inner exception and setting message.
  • unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py: Adjusts assertions to match the new stack trace/message behavior.
  • unit_tests/connector_builder/test_connector_builder_handler.py: Updates expected error strings to reflect inner exception traces.


Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration

Addressed all three recommended changes:

  1. Safe templated message — Replaced str(exception.exception) with:

    f"An unexpected error occurred in stream {exception.stream_name}: {type(exception.exception).__name__}"

    This exposes stream name + exception type without leaking raw exception text.

  2. failure_type stays system_error — No change needed per your guidance; non-ATE exceptions on Path B are by definition unexpected.

  3. Regression test added: test_on_exception_non_ate_uses_templated_message_with_correct_failure_type asserts:

    • User-facing message uses the safe template (not generic fallback, not raw str())
    • Stack trace comes from inner exception (not StreamThreadException wrapper)
    • failure_type is system_error
    • Test uses ValueError("some internal detail: SELECT * FROM secrets") to prove raw text doesn't leak into the message

All 21 concurrent read processor unit tests and all 6 connector builder tests pass locally.


Devin session · Requested by bot_apk (apk@cognition.ai)

@pnilan

(aside)

Impact Analysis: What this PR actually improves

The meaningful improvements

The primary value of this PR is debuggability, not the user-facing message.

By passing exception.exception (the inner exception) instead of exception (the StreamThreadException wrapper) to from_exception():

  1. internal_message goes from str(StreamThreadException(...)) → the actual exception text (e.g., "connection refused", "Error while refreshing access token"). This is what engineers see in logs and Sentry.

  2. Stack trace now originates from the real exception, not the concurrent infrastructure wrapper. In Sentry and sync logs, you'll see where the error actually happened — not where partition_reader.py caught and re-wrapped it. This is a significant improvement for diagnosing production issues.

  3. User-facing message improves marginally. The exception type name isn't meaningful to end users, so the recommendation is to use a stream-specific but honest fallback:

    "An unexpected error occurred in stream {stream_name}. See logs for details."
    

    This is still better than the current generic message because it tells the user which stream failed — useful when syncing dozens of streams.
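The debuggability gain in points 1 and 2 can be seen with the standard library's traceback module. In this sketch StreamThreadWrapper, refresh_token(), and process_partition() are hypothetical; the point is only that the wrapper's innermost frame is where it was re-raised, while the inner exception's innermost frame is where the failure actually happened.

```python
import traceback


class StreamThreadWrapper(Exception):
    """Hypothetical stand-in for StreamThreadException."""

    def __init__(self, exception: BaseException, stream_name: str) -> None:
        super().__init__(f"Exception while syncing stream {stream_name}")
        self.exception = exception
        self.stream_name = stream_name


def refresh_token() -> None:
    # Hypothetical origin of the real failure
    raise ConnectionError("Error while refreshing access token")


def process_partition(stream_name: str) -> None:
    # Hypothetical partition reader that catches and re-wraps
    try:
        refresh_token()
    except Exception as e:
        raise StreamThreadWrapper(e, stream_name) from e


def origin(exc: BaseException) -> str:
    """Name of the function in which `exc` was raised (innermost frame)."""
    return traceback.extract_tb(exc.__traceback__)[-1].name


try:
    process_partition("users")
except StreamThreadWrapper as wrapper:
    caught = wrapper  # keep a reference; `wrapper` is unbound after the block

print(str(caught))               # Exception while syncing stream users
print(str(caught.exception))     # Error while refreshing access token
print(origin(caught))            # process_partition
print(origin(caught.exception))  # refresh_token
```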

Side effects to be aware of

Sentry grouping will change. Today, all non-ATE exceptions on Path B produce stack traces rooted in StreamThreadException. After this PR, each exception will produce a stack trace from its actual origin. This means:

  • Existing Sentry issue groups that aggregate multiple different root causes under StreamThreadException will split into separate issues grouped by their actual cause
  • Historical trend continuity on those Sentry issues will break — you'll see the old issue stop and new issues appear
  • Any Sentry alert rules or oncall automation that keys on specific Sentry issue IDs from Path B exceptions may need to be updated

This is a one-time disruption and the resulting grouping is more correct — errors from different root causes will no longer be lumped together. But it's worth being aware of during the rollout.
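A toy model of the grouping change, assuming a fingerprint of exception type plus the function where it was raised. Sentry's real grouping algorithm is more involved, and all names here are hypothetical; the sketch only shows why distinct root causes stop collapsing into a single group once the inner exception is traced.

```python
import traceback
from collections import Counter
from typing import Callable


class StreamThreadWrapper(Exception):
    """Hypothetical stand-in for StreamThreadException."""

    def __init__(self, exception: BaseException, stream_name: str) -> None:
        super().__init__(f"Exception while syncing stream {stream_name}")
        self.exception = exception


def refresh_token() -> None:
    raise ConnectionError("Error while refreshing access token")


def parse_record() -> None:
    raise KeyError("id")


def open_socket() -> None:
    raise TimeoutError("timed out")


def process_partition(read: Callable[[], None], stream_name: str) -> None:
    try:
        read()
    except Exception as e:
        raise StreamThreadWrapper(e, stream_name) from e


def group_key(exc: BaseException) -> tuple:
    """Toy fingerprint: exception type + function where it was raised."""
    frame = traceback.extract_tb(exc.__traceback__)[-1]
    return (type(exc).__name__, frame.name)


before: Counter = Counter()
after: Counter = Counter()
for read in (refresh_token, parse_record, open_socket, refresh_token):
    try:
        process_partition(read, "users")
    except StreamThreadWrapper as wrapper:
        before[group_key(wrapper)] += 1           # old: the wrapper is traced
        after[group_key(wrapper.exception)] += 1  # new: the inner exception is traced

print(len(before))  # 1 group: every cause lumped under the wrapper
print(len(after))   # 3 groups: one per root cause
```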


This comment was created by Claude Code.

@pnilan Patrick Nilan (pnilan) enabled auto-merge (squash) March 9, 2026 20:14
@pnilan Patrick Nilan (pnilan) merged commit f550424 into main Mar 9, 2026
29 checks passed
@pnilan Patrick Nilan (pnilan) deleted the devin/1772822472-fix-concurrent-source-error-message branch March 9, 2026 20:45