fix(cdk): prevent deadlock when main thread puts on full queue #979
Draft · devin-ai-integration[bot] wants to merge 1 commit into main from devin/1775123959-fix-concurrent-queue-deadlock
Conversation
The main thread is the sole consumer of the shared Queue(maxsize=10000). When it also produces into the queue via emit_message() or log_message() and the queue is full, queue.put() blocks indefinitely — deadlock.

Fix: capture the consumer thread ID at construction. On the consumer thread, use a non-blocking put; overflow is buffered in a deque and drained via consume_queue(), which the main thread already calls after every queue item. Worker threads still use a blocking put for back-pressure.

Co-Authored-By: bot_apk <apk@cognition.ai>
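A minimal sketch of the mechanism described above. The class shape, the _put method name, and the _pending attribute are assumptions drawn from this description, not the actual CDK code:

```python
import queue
import threading
from collections import deque


class ConcurrentMessageRepositorySketch:
    """Hypothetical sketch of the fix; not the actual CDK implementation."""

    def __init__(self, maxsize: int = 10000) -> None:
        self._queue: queue.Queue = queue.Queue(maxsize=maxsize)
        # Overflow buffer; only ever touched by the consumer thread.
        self._pending: deque = deque()
        # Capture the consumer thread ID at construction
        # (assumed to run on the main thread).
        self._consumer_thread_id = threading.get_ident()

    def _put(self, message: object) -> None:
        if threading.get_ident() == self._consumer_thread_id:
            # The consumer must never block on its own queue: try a
            # non-blocking put and buffer the overflow instead of deadlocking.
            try:
                self._queue.put(message, block=False)
            except queue.Full:
                self._pending.append(message)
        else:
            # Worker threads keep the blocking put for back-pressure.
            self._queue.put(message)

    def consume_queue(self):
        # Called by the main thread after each queue item; drains overflow.
        while self._pending:
            yield self._pending.popleft()
```

With this shape, a full queue on the consumer thread routes the message into _pending instead of blocking, and the next consume_queue() call on the main thread yields it.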
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1775123959-fix-concurrent-queue-deadlock#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1775123959-fix-concurrent-queue-deadlock

PR Slash Commands

Airbyte Maintainers can execute slash commands on this PR.
Summary
Resolves https://github.com/airbytehq/oncall/issues/11852
Fixes a production deadlock in ConcurrentMessageRepository where the main thread (the sole queue consumer) blocks on queue.put() when the queue is full. No other thread can drain the queue, so the sync hangs. Confirmed via a thread dump from Job 77457394.

The fix captures the consumer thread ID at construction. When the main thread calls emit_message or log_message, it uses a non-blocking put(block=False); overflow goes to a deque (_pending) that is drained by consume_queue(), which the main thread already calls after every queue item. Worker threads still use a blocking put for back-pressure.

Review & Testing Checklist for Human
- ConcurrentMessageRepository is constructed on the consumer (main) thread (threading.get_ident() in __init__). Confirm this holds in ConcurrentDeclarativeSource and any other construction sites. If it were constructed on a different thread, the blocking/non-blocking logic would be inverted.
- consume_queue() previously yielded nothing; it now yields buffered messages from _pending. Check that callers in concurrent_read_processor.py (lines ~187, ~217) correctly handle these newly-yielded messages (they should — they iterate and emit them).
- The overflow deque _pending is unbounded. In practice only a few messages land there per drain cycle (state messages, logs from _handle_item), but confirm this assumption holds under edge cases.
- test_worker_thread_uses_blocking_put uses wait(timeout=0.5) to assert that the worker is blocked — this could be flaky in slow CI. Consider whether that is acceptable.

Recommended test plan: run a connector that previously hit this deadlock (e.g., the HubSpot source with the contact_lists stream that triggered Job 77457394) and verify the sync completes without hanging.
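The timing-based worker test mentioned in the checklist might look roughly like the following sketch. The names and the bare-queue stand-in are hypothetical, not the actual CDK test:

```python
import queue
import threading


def test_worker_thread_uses_blocking_put() -> None:
    q: queue.Queue = queue.Queue(maxsize=1)
    q.put("fill")  # queue is now full
    entered = threading.Event()
    done = threading.Event()

    def worker() -> None:
        entered.set()
        q.put("blocked")  # blocking put: waits until the consumer drains
        done.set()

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    assert entered.wait(timeout=1.0)
    # The worker should still be blocked after a short wait
    # (timing-based, hence the flakiness concern in slow CI).
    assert not done.wait(timeout=0.5)
    q.get()  # consumer drains one item, unblocking the worker
    assert done.wait(timeout=1.0)
    t.join(timeout=1.0)
```

The `assert not done.wait(timeout=0.5)` line is exactly the pattern the checklist flags: it proves "still blocked" only for 0.5 s, so a heavily loaded runner could in principle produce false results.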
Notes

- The import os removal is intentional — it was unused.
- deque.append()/popleft() are GIL-atomic in CPython, but the thread safety of _pending relies on the fact that only the main thread ever touches it (workers always take the blocking put path).

Link to Devin session: https://app.devin.ai/sessions/484590a2d8af4c51a978c6f23ee870c7
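The deque-atomicity point in the notes can be exercised with a minimal sketch (hypothetical demo code, assuming CPython; the PR itself relies on the stronger invariant that only the main thread touches _pending):

```python
import threading
from collections import deque

# Many threads append to one deque with no lock; in CPython,
# deque.append() is thread-safe, so no items are lost.
d: deque = deque()


def appender(n: int) -> None:
    for i in range(n):
        d.append(i)


threads = [threading.Thread(target=appender, args=(10_000,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert len(d) == 8 * 10_000  # no lost appends
```

Atomic append/popleft alone would not make read-modify-write sequences safe, which is why the single-toucher invariant still matters.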