fix: prevent deadlock by returning messages directly instead of queueing#978
Open
devin-ai-integration[bot] wants to merge 5 commits into main from devin/1775123731-deadlock-fix-direct-return
Conversation
Changes the `Cursor` ABC and all implementations so that `ensure_at_least_one_state_emitted()` returns `Iterable[AirbyteMessage]` instead of putting messages on the shared queue. The main thread (consumer) yields these messages directly, eliminating the deadlock where it would block on `queue.put()` into its own full queue. Also changes `on_partition()` to yield slice log messages directly instead of emitting through the message repository.

Modified files:
- `cursor.py`: ABC + `FinalStateCursor` + `ConcurrentCursor`
- `concurrent_partition_cursor.py`: Added `_create_state_message()`
- `file_based_concurrent_cursor.py`
- `file_based_final_state_cursor.py`
- `abstract_concurrent_file_based_cursor.py`
- `concurrent_read_processor.py`: `on_partition()` and `_on_stream_is_done()`
- `concurrent_source.py`: `yield from on_partition()`
- `substream_partition_router.py`: consume returned iterator

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
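The direct-return pattern described above can be sketched minimally as follows. The class and message types here are illustrative stand-ins, not the real CDK classes: before the fix, the cursor pushed its final state message onto the bounded shared queue, and since the main thread is that queue's only consumer, a `put()` from the main thread into a full queue could never be drained. After the fix, the cursor returns the messages and the main thread yields them straight through.

```python
from typing import Iterable, Iterator

class CursorSketch:
    """Illustrative stand-in for the Cursor ABC change (not the real class)."""

    def __init__(self, stream_name: str) -> None:
        self._stream_name = stream_name

    def ensure_at_least_one_state_emitted(self) -> Iterable[str]:
        # Old: self._message_repository.emit_message(state)  # ends in queue.put()
        # New: hand the message back to the caller instead.
        yield f"STATE:{self._stream_name}"

def handle_stream_done(cursor: CursorSketch) -> Iterator[str]:
    # Main-thread consumer path: yield directly, never touch the shared queue.
    yield from cursor.ensure_at_least_one_state_emitted()

print(list(handle_stream_done(CursorSketch("users"))))  # ['STATE:users']
```

The key property is that no code on the main thread calls `put()` on the queue it is responsible for draining.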
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1775123731-deadlock-fix-direct-return#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1775123731-deadlock-fix-direct-return

PR Slash Commands

Airbyte Maintainers can execute slash commands on this PR.
Summary
Alternative approach to fixing the concurrent source deadlock (see also PR #977 for the non-blocking put approach).
Root cause: The main thread is the sole consumer of the shared `Queue(maxsize=10_000)`. In three code paths inside `_handle_item`, the main thread also tries to produce into the same queue via `ConcurrentMessageRepository.emit_message()` → `queue.put()`. When the queue is full, the main thread deadlocks on itself.

This fix: Changes the `Cursor` ABC so `ensure_at_least_one_state_emitted()` returns `Iterable[AirbyteMessage]` instead of `-> None`. The main thread yields these messages directly to stdout, never touching the shared queue. Similarly, `on_partition()` now yields slice log messages directly.

Files changed:
- `cursor.py` — ABC + `FinalStateCursor` + `ConcurrentCursor` signature change
- `concurrent_partition_cursor.py` — New `_create_state_message()` method (parallel to existing `_emit_state_message()`)
- `file_based_concurrent_cursor.py`, `file_based_final_state_cursor.py`, `abstract_concurrent_file_based_cursor.py` — Updated implementations
- `concurrent_read_processor.py` — `on_partition()` returns `Iterable[AirbyteMessage]`; `_on_stream_is_done()` uses `yield from`
- `concurrent_source.py` — `yield from` on `on_partition()`
- `substream_partition_router.py` — Consumes (and discards) the returned iterator on worker thread

Updates since last revision
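One recurring failure mode in the test updates below stems from how `unittest.mock.Mock` behaves: calling a mock attribute returns another `Mock`, which is not iterable, so any caller that now iterates `ensure_at_least_one_state_emitted()` breaks. A small reproduction of the pitfall and the fix used in the tests:

```python
from unittest.mock import Mock

mock_cursor = Mock()

# By default, calling a Mock attribute returns another Mock, which has no
# __iter__ — iterating it raises TypeError.
try:
    list(mock_cursor.ensure_at_least_one_state_emitted())
    raised = False
except TypeError:
    raised = True
print("non-iterable Mock raised TypeError:", raised)

# The fix applied in the tests: give the mocked method an explicit iterable.
mock_cursor.ensure_at_least_one_state_emitted.return_value = []
assert list(mock_cursor.ensure_at_least_one_state_emitted()) == []
```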
- Updated `test_concurrent_source_yields_the_same_messages_as_abstract_source_when_no_exceptions_are_raised`. Previously, `FinalStateCursor.ensure_at_least_one_state_emitted()` emitted state via `InMemoryMessageRepository`, which was a dead end — those state messages never reached the output. With the direct-return approach, state messages are now properly yielded to the caller. The test now expects these STATE messages (one per stream), which is a behavior change — state is now actually surfaced for `FinalStateCursor` streams.
- Fixed tests in `test_concurrent_perpartitioncursor.py` that called `ensure_at_least_one_state_emitted()` without consuming the returned generator. Updated the assertion in `test_given_all_partitions_finished_when_close_partition_then_final_state_emitted` from `emit_message.call_count == 2` to `== 1` (since `ensure_at_least_one_state_emitted` now yields directly instead of going through the message repository).
- Fixed tests in `test_substream_partition_router.py` where `Mock()` cursors returned a non-iterable object from `ensure_at_least_one_state_emitted()`. Set `mock_cursor.ensure_at_least_one_state_emitted.return_value = []`.

Review & Testing Checklist for Human
- `on_partition()` is now a generator — verify ALL callers iterate it. If any caller forgets `list()` or `yield from`, the method body (partition tracking, thread submission) silently won't execute. Search the full codebase for `.on_partition(` calls beyond what's in this diff.
- `FileBasedConcurrentCursor.ensure_at_least_one_state_emitted()` yields inside `with self._state_lock`. Because this is a generator, the lock is held across the yield boundary until the caller consumes the value. If a worker thread tries to acquire `_state_lock` concurrently, it will block until the main thread's caller chain finishes processing the yielded message. Verify this cannot cause contention or a secondary deadlock.
- `substream_partition_router.py` discards returned state messages. The comment claims `close_partition()` already emitted state via the queue. Verify this is always true — are there edge cases where the discarded messages from `ensure_at_least_one_state_emitted()` are the only state for that stream?
- `_create_state_message()` vs `_emit_state_message()` duplication in `concurrent_partition_cursor.py`. These share throttling + global-cursor-skip logic. Confirm they're consistent and consider whether the duplication is acceptable long-term.
- `FinalStateCursor` streams now emit STATE messages. Previously these were silently swallowed by `InMemoryMessageRepository`. Verify downstream consumers (platform, orchestrator) handle the additional state messages correctly.

Notes
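The note below explains why both `_create_state_message()` and `_emit_state_message()` exist. Roughly, the two paths look like this (only the method names come from the PR; the queue plumbing and return types here are a hypothetical simplification — the real class also carries the shared throttling and global-cursor logic):

```python
import queue

class PartitionCursorSketch:
    """Illustrative two-path split, not the real concurrent_partition_cursor.py."""

    def __init__(self, shared_queue: queue.Queue) -> None:
        self._queue = shared_queue

    def _create_state_message(self) -> str:
        # Shared message construction used by both emission paths.
        return "STATE"

    def _emit_state_message(self) -> None:
        # Worker-thread path (close_partition): blocking on the bounded
        # queue is safe because the main thread is draining it.
        self._queue.put(self._create_state_message())

    def ensure_at_least_one_state_emitted(self):
        # Main-thread path: return the message instead of queueing it,
        # since the main thread must never put() into its own queue.
        yield self._create_state_message()

q: queue.Queue = queue.Queue(maxsize=10)
cursor = PartitionCursorSketch(q)
cursor._emit_state_message()
assert q.get() == "STATE"
assert list(cursor.ensure_at_least_one_state_emitted()) == ["STATE"]
```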
_create_state_message/_emit_state_messagesplit exists because worker threads (viaclose_partition()) still need to emit through the queue, while the main thread (viaensure_at_least_one_state_emitted()) must bypass it.Link to Devin session: https://app.devin.ai/sessions/ad184113df474f0ba37ede09cdac7eaf
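One checklist item above flags that `FileBasedConcurrentCursor.ensure_at_least_one_state_emitted()` yields inside `with self._state_lock`. The hazard is easy to reproduce in isolation: a generator suspended at a `yield` inside a `with` block keeps the lock held until it is resumed or closed (the lock name below is illustrative).

```python
import threading

state_lock = threading.Lock()

def emit_state_under_lock():
    # Same shape as yielding inside `with self._state_lock`: the lock is
    # acquired on the first next() and stays held while the generator is
    # suspended at the yield.
    with state_lock:
        yield "STATE"

gen = emit_state_under_lock()
next(gen)                      # suspended at the yield; lock still held
assert state_lock.locked()     # a worker acquiring it now would block
gen.close()                    # GeneratorExit unwinds the with-block
assert not state_lock.locked() # lock released only on close/exhaustion
```

This is why the review checklist asks to verify that no worker thread can contend on `_state_lock` while the main thread's caller chain is still consuming the yielded message.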