
fix: prevent deadlock by returning messages directly instead of queueing #978

Open
devin-ai-integration[bot] wants to merge 5 commits into main from devin/1775123731-deadlock-fix-direct-return

@devin-ai-integration devin-ai-integration bot commented Apr 2, 2026

Summary

Alternative approach to fixing the concurrent source deadlock (see also PR #977 for the non-blocking put approach).

Root cause: The main thread is the sole consumer of the shared Queue(maxsize=10,000). In 3 code paths inside _handle_item, the main thread also tries to produce into the same queue via ConcurrentMessageRepository.emit_message() → queue.put(). When the queue is full, the main thread deadlocks on itself.
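The self-deadlock can be reproduced in miniature. The sketch below is illustrative only: `shared_queue` and `main_thread_produce` are hypothetical names, the queue is shrunk to two slots, and a timeout is used so the example terminates instead of hanging the way a plain blocking put() would.

```python
import queue

# Hypothetical stand-in for the shared queue (the real one uses maxsize=10,000).
shared_queue: "queue.Queue[str]" = queue.Queue(maxsize=2)


def main_thread_produce(message: str, timeout: float = 0.1) -> bool:
    """Try to enqueue from the consumer thread; return False if put() would block."""
    try:
        shared_queue.put(message, timeout=timeout)
        return True
    except queue.Full:
        # Without the timeout this call would wait forever: the only thread
        # that could drain the queue is the one stuck inside put().
        return False


# Workers have filled the queue while the consumer was busy.
shared_queue.put("record-1")
shared_queue.put("record-2")

# The consumer now tries to produce into its own full queue.
blocked = not main_thread_produce("state-message")
```

With no other consumer, nothing ever frees a slot, so the blocking form of the same call never returns.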

This fix: Changes the Cursor ABC so ensure_at_least_one_state_emitted() returns Iterable[AirbyteMessage] instead of -> None. The main thread yields these messages directly to stdout, never touching the shared queue. Similarly, on_partition() now yields slice log messages directly.
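A minimal sketch of the direct-return shape, assuming simplified stand-ins: `Message` replaces AirbyteMessage, and `SketchCursor` and `on_stream_is_done` are hypothetical names, not the CDK classes.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class Message:
    """Hypothetical stand-in for AirbyteMessage."""
    type: str
    stream: str


class SketchCursor:
    """Hypothetical cursor that returns its final state instead of queueing it."""

    def __init__(self, stream_name: str) -> None:
        self.stream_name = stream_name

    def ensure_at_least_one_state_emitted(self) -> Iterable[Message]:
        # The message is handed back to the caller; no queue is involved.
        yield Message(type="STATE", stream=self.stream_name)


def on_stream_is_done(cursor: SketchCursor) -> Iterator[Message]:
    # The main thread forwards the messages straight to its own output.
    yield from cursor.ensure_at_least_one_state_emitted()


emitted = list(on_stream_is_done(SketchCursor("users")))
```

Because the main thread only yields, it never waits on queue capacity in this path.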

Files changed:

  • cursor.py — ABC + FinalStateCursor + ConcurrentCursor signature change
  • concurrent_partition_cursor.py — New _create_state_message() method (parallel to existing _emit_state_message())
  • file_based_concurrent_cursor.py, file_based_final_state_cursor.py, abstract_concurrent_file_based_cursor.py — Updated implementations
  • concurrent_read_processor.py — on_partition() returns Iterable[AirbyteMessage]; _on_stream_is_done() uses yield from
  • concurrent_source.py — uses yield from on on_partition()
  • substream_partition_router.py — Consumes (and discards) the returned iterator on worker thread

Updates since last revision

  • Fixed integration test test_concurrent_source_yields_the_same_messages_as_abstract_source_when_no_exceptions_are_raised. Previously, FinalStateCursor.ensure_at_least_one_state_emitted() emitted state via InMemoryMessageRepository which was a dead end — those state messages never reached the output. With the direct-return approach, state messages are now properly yielded to the caller. The test now expects these STATE messages (one per stream), which is a behavior change — state is now actually surfaced for FinalStateCursor streams.
  • Fixed 4 tests in test_concurrent_perpartitioncursor.py that called ensure_at_least_one_state_emitted() without consuming the returned generator. Updated assertion in test_given_all_partitions_finished_when_close_partition_then_final_state_emitted from emit_message.call_count == 2 to == 1 (since ensure_at_least_one_state_emitted now yields directly instead of going through the message repository).
  • Fixed 2 tests in test_substream_partition_router.py where Mock() cursors returned a non-iterable object from ensure_at_least_one_state_emitted(). Set mock_cursor.ensure_at_least_one_state_emitted.return_value = [].
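The Mock() failure mode in the last item can be shown in isolation. This is a sketch using only unittest.mock; `drain` is a hypothetical helper standing in for any caller that iterates the returned value.

```python
from unittest.mock import Mock


def drain(cursor) -> list:
    """Hypothetical caller that consumes the returned iterable."""
    return list(cursor.ensure_at_least_one_state_emitted())


# A bare Mock() returns another Mock from any method call, and a plain Mock
# does not support iteration, so consuming the result raises TypeError.
bare_cursor = Mock()
try:
    drain(bare_cursor)
    bare_failed = False
except TypeError:
    bare_failed = True

# Giving the method an iterable return value restores the contract.
fixed_cursor = Mock()
fixed_cursor.ensure_at_least_one_state_emitted.return_value = []
fixed_result = drain(fixed_cursor)
```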

Review & Testing Checklist for Human

  • on_partition() is now a generator — verify ALL callers iterate it. If any caller forgets list() or yield from, the method body (partition tracking, thread submission) silently won't execute. Search the full codebase for .on_partition( calls beyond what's in this diff.
  • FileBasedConcurrentCursor.ensure_at_least_one_state_emitted() yields inside with self._state_lock. Because this is a generator, the lock is held across the yield boundary until the caller consumes the value. If a worker thread tries to acquire _state_lock concurrently, it will block until the main thread's caller chain finishes processing the yielded message. Verify this cannot cause contention or secondary deadlock.
  • substream_partition_router.py discards returned state messages. The comment claims close_partition() already emitted state via the queue. Verify this is always true — are there edge cases where the discarded messages from ensure_at_least_one_state_emitted() are the only state for that stream?
  • _create_state_message() vs _emit_state_message() duplication in concurrent_partition_cursor.py. These share throttling + global cursor skip logic. Confirm they're consistent and consider whether the duplication is acceptable long-term.
  • Behavior change: FinalStateCursor streams now emit STATE messages. Previously these were silently swallowed by InMemoryMessageRepository. Verify downstream consumers (platform, orchestrator) handle the additional state messages correctly.
  • Run a real sync with a multi-stream connector (e.g., HubSpot with 34 streams against a noop destination) to validate no deadlock and correct state emission. Unit tests only cover mocked interactions.
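The first two checklist items come down to standard generator semantics. The sketch below uses hypothetical, simplified functions (not the CDK implementations) to show that a generator body does not run until it is iterated, and that a lock taken inside a generator stays held while the generator is suspended at a yield.

```python
import threading
from typing import Iterator, List

calls: List[str] = []
state_lock = threading.Lock()


def on_partition(partition: str) -> Iterator[str]:
    """Hypothetical generator with a side effect before its first yield."""
    calls.append(partition)  # stands in for partition tracking / thread submission
    yield f"slice log for {partition}"


def state_under_lock() -> Iterator[str]:
    """Hypothetical generator that yields while holding a lock."""
    with state_lock:
        yield "state"


# Calling without iterating only creates the generator object; the body never runs.
on_partition("p1")
ran_without_iteration = list(calls)

# Iterating (list() or yield from) executes the body.
list(on_partition("p2"))
ran_with_iteration = list(calls)

# The lock remains held for as long as the generator is paused at its yield.
gen = state_under_lock()
next(gen)
locked_while_suspended = state_lock.locked()
gen.close()  # finishing or closing the generator exits the with-block
locked_after_close = state_lock.locked()
```

Whether the second behavior matters in practice depends on how long the caller takes between receiving the yielded message and resuming the generator.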

Notes

  • This is a breaking change to the Cursor ABC. Any out-of-tree cursor implementations will need updating.
  • The _create_state_message / _emit_state_message split exists because worker threads (via close_partition()) still need to emit through the queue, while the main thread (via ensure_at_least_one_state_emitted()) must bypass it.
  • Compare with PR #977 (fix: prevent deadlock when main thread puts on full queue), which takes the non-blocking put + overflow buffer approach and achieves the same fix with a single-file change and no API changes.
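One possible way to keep the two state-message paths consistent is to route both through a single builder. This is a sketch under assumptions, not the code in this PR: `SketchPartitionCursor` and `_build_state_message` are hypothetical, and the throttling and global-cursor logic is omitted.

```python
import queue
from typing import Any, Dict


class SketchPartitionCursor:
    """Hypothetical cursor illustrating the emit/create split."""

    def __init__(self, message_queue: "queue.Queue[Dict[str, Any]]") -> None:
        self._queue = message_queue
        self._state: Dict[str, Any] = {"cursor": 0}

    def _build_state_message(self) -> Dict[str, Any]:
        # Shared logic lives here so both paths produce identical messages.
        return {"type": "STATE", "state": dict(self._state)}

    def _emit_state_message(self) -> None:
        # Worker-thread path: goes through the shared queue and may block.
        self._queue.put(self._build_state_message())

    def _create_state_message(self) -> Dict[str, Any]:
        # Main-thread path: returns the message and never touches the queue.
        return self._build_state_message()


q: "queue.Queue[Dict[str, Any]]" = queue.Queue(maxsize=1)
cursor = SketchPartitionCursor(q)
cursor._emit_state_message()             # fills the single-slot queue
direct = cursor._create_state_message()  # still returns immediately
```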

Link to Devin session: https://app.devin.ai/sessions/ad184113df474f0ba37ede09cdac7eaf

Changes the Cursor ABC and all implementations so that
ensure_at_least_one_state_emitted() returns Iterable[AirbyteMessage]
instead of putting messages on the shared queue. The main thread
(consumer) yields these messages directly, eliminating the deadlock
where it would block on queue.put() into its own full queue.

Also changes on_partition() to yield slice log messages directly
instead of emitting through the message repository.

Modified files:
- cursor.py: ABC + FinalStateCursor + ConcurrentCursor
- concurrent_partition_cursor.py: Added _create_state_message()
- file_based_concurrent_cursor.py
- file_based_final_state_cursor.py
- abstract_concurrent_file_based_cursor.py
- concurrent_read_processor.py: on_partition() and _on_stream_is_done()
- concurrent_source.py: yield from on_partition()
- substream_partition_router.py: consume returned iterator

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
@devin-ai-integration
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions

github-actions bot commented Apr 2, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.


Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1775123731-deadlock-fix-direct-return#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1775123731-deadlock-fix-direct-return

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>

 @abstractmethod
-def ensure_at_least_one_state_emitted(self) -> None: ...
+def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: ...
@github-actions

github-actions bot commented Apr 2, 2026

PyTest Results (Fast)

3 989 tests  ±0   3 978 ✅ ±0   7m 36s ⏱️ +29s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit e154d08. ± Comparison against base commit 0b94cbe.

♻️ This comment has been updated with latest results.

devin-ai-integration bot and others added 3 commits April 2, 2026 10:14
…n approach

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
…ion cursor tests

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
…le in substream tests

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
@github-actions

github-actions bot commented Apr 2, 2026

PyTest Results (Full)

3 992 tests  ±0   3 980 ✅ ±0   11m 52s ⏱️ +39s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit e154d08. ± Comparison against base commit 0b94cbe.
