Skip to content

fix(cdk): prevent deadlock when main thread puts on full queue#979

Draft
devin-ai-integration[bot] wants to merge 1 commit intomainfrom
devin/1775123959-fix-concurrent-queue-deadlock
Draft

fix(cdk): prevent deadlock when main thread puts on full queue#979
devin-ai-integration[bot] wants to merge 1 commit intomainfrom
devin/1775123959-fix-concurrent-queue-deadlock

Conversation

@devin-ai-integration
Copy link
Copy Markdown
Contributor

Summary

Resolves https://github.com/airbytehq/oncall/issues/11852

Fixes a production deadlock in ConcurrentMessageRepository where the main thread (sole queue consumer) blocks on queue.put() when the queue is full, causing a deadlock since no other thread can drain it. Confirmed via thread dump from Job 77457394.

The fix captures the consumer thread ID at construction. When the main thread calls emit_message/log_message, it uses non-blocking put(block=False); overflow goes to a deque (_pending) drained by consume_queue(), which the main thread already calls after every queue item. Worker threads still use blocking put for back-pressure.

Review & Testing Checklist for Human

  • Verify thread ID assumption: The fix assumes ConcurrentMessageRepository is constructed on the consumer (main) thread (threading.get_ident() in __init__). Confirm this holds in ConcurrentDeclarativeSource and any other construction sites. If constructed on a different thread, the blocking/non-blocking logic would be inverted.
  • Verify consume_queue callers: consume_queue() previously yielded nothing. It now yields buffered messages from _pending. Check that callers in concurrent_read_processor.py (lines ~187, ~217) correctly handle these newly-yielded messages (they should — they iterate and emit them).
  • Unbounded _pending buffer: The overflow deque is unbounded. In practice only a few messages land here per drain cycle (state messages, logs from _handle_item), but confirm this assumption holds under edge cases.
  • Threading test flakiness: test_worker_thread_uses_blocking_put uses wait(timeout=0.5) to assert the worker is blocked — could be flaky in slow CI. Consider whether this is acceptable.

Recommended test plan: Run a connector that previously hit this deadlock (e.g., the HubSpot source with contact_lists stream that triggered Job 77457394) and verify the sync completes without hanging.

Notes

  • Breaking change evaluation: Not breaking. No API, schema, spec, or state changes. Worker thread behavior is unchanged.
  • The import os removal is intentional — it was unused.
  • deque.append()/popleft() are GIL-atomic in CPython, but thread safety of _pending relies on the fact that only the main thread ever touches it (workers always take the blocking put path).

Link to Devin session: https://app.devin.ai/sessions/484590a2d8af4c51a978c6f23ee870c7

The main thread is the sole consumer of the shared Queue(maxsize=10000).
When it also produces into the queue via emit_message() or log_message()
and the queue is full, queue.put() blocks indefinitely — deadlock.

Fix: capture the consumer thread ID at construction. On the consumer
thread use non-blocking put; overflow is buffered in a deque and drained
via consume_queue(), which the main thread already calls after every
queue item. Worker threads still use blocking put for back-pressure.

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 2, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1775123959-fix-concurrent-queue-deadlock#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1775123959-fix-concurrent-queue-deadlock

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 2, 2026

PyTest Results (Fast)

3 997 tests  +8   3 986 ✅ +8   7m 33s ⏱️ +26s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 6ae5579. ± Comparison against base commit 0b94cbe.

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 2, 2026

PyTest Results (Full)

4 000 tests  +8   3 988 ✅ +8   10m 41s ⏱️ -32s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 6ae5579. ± Comparison against base commit 0b94cbe.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

0 participants