From 6ae55795cda10cd36b3c6726589c834eec61cd70 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 10:04:44 +0000 Subject: [PATCH] fix(cdk): prevent deadlock when main thread puts on full queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The main thread is the sole consumer of the shared Queue(maxsize=10000). When it also produces into the queue via emit_message() or log_message() and the queue is full, queue.put() blocks indefinitely — deadlock. Fix: capture the consumer thread ID at construction. On the consumer thread use non-blocking put; overflow is buffered in a deque and drained via consume_queue(), which the main thread already calls after every queue item. Worker threads still use blocking put for back-pressure. Co-Authored-By: bot_apk --- .../sources/message/concurrent_repository.py | 65 +++++-- .../message/test_concurrent_repository.py | 178 ++++++++++++++++++ 2 files changed, 224 insertions(+), 19 deletions(-) create mode 100644 unit_tests/sources/message/test_concurrent_repository.py diff --git a/airbyte_cdk/sources/message/concurrent_repository.py b/airbyte_cdk/sources/message/concurrent_repository.py index e3bc7116a..e19e1f461 100644 --- a/airbyte_cdk/sources/message/concurrent_repository.py +++ b/airbyte_cdk/sources/message/concurrent_repository.py @@ -1,7 +1,8 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. import logging -import os -from queue import Queue +import threading +from collections import deque +from queue import Full, Queue from typing import Callable, Iterable from airbyte_cdk.models import AirbyteMessage, Level @@ -13,35 +14,61 @@ class ConcurrentMessageRepository(MessageRepository): - """ - Message repository that immediately loads messages onto the queue processed on the - main thread. This ensures that messages are processed in the correct order they are - received. The InMemoryMessageRepository implementation does not have guaranteed - ordering since whether to process the main thread vs. partitions is non-deterministic - and there can be a lag between reading the main-thread and consuming messages on the - MessageRepository. - - This is particularly important for the connector builder which relies on grouping - of messages to organize request/response, pages, and partitions. + """Message repository that loads messages onto the shared concurrent queue. + + Messages are placed directly onto the queue consumed by the main thread. + This ensures correct ordering, which is especially important for the + connector builder's grouping of request/response, pages, and partitions. + + Deadlock prevention: the main thread is the sole consumer of the queue. + If it also calls ``emit_message`` or ``log_message`` (e.g. when emitting + a final state via ``ensure_at_least_one_state_emitted``), a blocking + ``put`` on a full queue would deadlock because no other thread can drain + it. To avoid this, the repository captures the consumer (main) thread + ID at construction time. Calls from the main thread use non-blocking + ``put``; overflow is buffered in ``_pending`` and drained on the next + ``consume_queue`` call, which the main thread already invokes after + processing every queue item. Worker threads continue to use blocking + ``put`` for normal back-pressure. """ def __init__(self, queue: Queue[QueueItem], message_repository: MessageRepository): self._queue = queue self._decorated_message_repository = message_repository + self._consumer_thread_id: int = threading.get_ident() + self._pending: deque[AirbyteMessage] = deque() def emit_message(self, message: AirbyteMessage) -> None: self._decorated_message_repository.emit_message(message) - for message in self._decorated_message_repository.consume_queue(): - self._queue.put(message) + for queued_message in self._decorated_message_repository.consume_queue(): + self._put(queued_message) def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: self._decorated_message_repository.log_message(level, message_provider) - for message in self._decorated_message_repository.consume_queue(): - self._queue.put(message) + for queued_message in self._decorated_message_repository.consume_queue(): + self._put(queued_message) def consume_queue(self) -> Iterable[AirbyteMessage]: + """Drain any messages buffered because the queue was full. + + The main thread calls this after processing every queue item, so + buffered messages are delivered promptly without risking deadlock. """ - This method shouldn't need to be called because as part of emit_message() we are already - loading messages onto the queue processed on the main thread. + while self._pending: + yield self._pending.popleft() + + def _put(self, message: AirbyteMessage) -> None: + """Place a message on the shared queue. + + On the consumer (main) thread a non-blocking put is used; if the + queue is full the message is appended to ``_pending`` instead of + blocking. Worker threads use a normal blocking put so that + back-pressure is preserved. """ - yield from [] + if threading.get_ident() == self._consumer_thread_id: + try: + self._queue.put(message, block=False) + except Full: + self._pending.append(message) + else: + self._queue.put(message) diff --git a/unit_tests/sources/message/test_concurrent_repository.py b/unit_tests/sources/message/test_concurrent_repository.py new file mode 100644 index 000000000..4279b147d --- /dev/null +++ b/unit_tests/sources/message/test_concurrent_repository.py @@ -0,0 +1,178 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +import threading +from queue import Queue + +import pytest + +from airbyte_cdk.models import ( + AirbyteControlConnectorConfigMessage, + AirbyteControlMessage, + AirbyteMessage, + AirbyteStateMessage, + AirbyteStateType, + Level, + OrchestratorType, + Type, +) +from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository +from airbyte_cdk.sources.message.repository import InMemoryMessageRepository + + +def _make_state_message(stream_name: str = "test_stream") -> AirbyteMessage: + return AirbyteMessage( + type=Type.STATE, + state=AirbyteStateMessage(type=AirbyteStateType.STREAM, data={"stream_name": stream_name}), + ) + + +def _make_control_message() -> AirbyteMessage: + return AirbyteMessage( + type=Type.CONTROL, + control=AirbyteControlMessage( + type=OrchestratorType.CONNECTOR_CONFIG, + emitted_at=0, + connectorConfig=AirbyteControlConnectorConfigMessage(config={"key": "value"}), + ), + ) + + +@pytest.fixture() +def small_queue() -> Queue: + return Queue(maxsize=2) + + +@pytest.fixture() +def repo(small_queue: Queue) -> ConcurrentMessageRepository: + return ConcurrentMessageRepository(small_queue, InMemoryMessageRepository()) + + +def test_emit_message_puts_on_queue_when_space_available() -> None: + """When the queue has space, emit_message places the message directly on it.""" + queue: Queue = Queue(maxsize=100) + repo = ConcurrentMessageRepository(queue, InMemoryMessageRepository()) + + msg = _make_control_message() + repo.emit_message(msg) + + assert not queue.empty() + assert queue.get_nowait() == msg + + +def test_emit_message_buffers_when_queue_full_on_consumer_thread( + small_queue: Queue, repo: ConcurrentMessageRepository +) -> None: + """When called from the consumer thread with a full queue, the message goes to _pending.""" + small_queue.put("filler_1") + small_queue.put("filler_2") + assert small_queue.full() + + msg = _make_state_message() + repo.emit_message(msg) + + assert len(repo._pending) == 1 + assert repo._pending[0] == msg + + +def test_consume_queue_drains_pending_buffer( + small_queue: Queue, repo: ConcurrentMessageRepository +) -> None: + """consume_queue yields messages that were buffered due to a full queue.""" + small_queue.put("filler_1") + small_queue.put("filler_2") + + msg1 = _make_state_message("stream_1") + msg2 = _make_state_message("stream_2") + repo.emit_message(msg1) + repo.emit_message(msg2) + + drained = list(repo.consume_queue()) + assert drained == [msg1, msg2] + assert len(repo._pending) == 0 + + +def test_consume_queue_empty_when_no_pending(repo: ConcurrentMessageRepository) -> None: + """consume_queue yields nothing when there are no pending messages.""" + assert list(repo.consume_queue()) == [] + + +def test_log_message_buffers_when_queue_full_on_consumer_thread( + small_queue: Queue, repo: ConcurrentMessageRepository +) -> None: + """log_message also uses non-blocking put on the consumer thread.""" + small_queue.put("filler_1") + small_queue.put("filler_2") + + repo.log_message(Level.INFO, lambda: {"message": "test log"}) + + assert len(repo._pending) == 1 + + +def test_worker_thread_uses_blocking_put() -> None: + """Worker threads (non-consumer) should use blocking put for back-pressure.""" + queue: Queue = Queue(maxsize=1) + repo = ConcurrentMessageRepository(queue, InMemoryMessageRepository()) + + queue.put("filler") + + worker_started = threading.Event() + worker_done = threading.Event() + + def worker_emit() -> None: + worker_started.set() + repo.emit_message(_make_state_message()) + worker_done.set() + + t = threading.Thread(target=worker_emit, daemon=True) + t.start() + + worker_started.wait(timeout=1.0) + assert not worker_done.wait(timeout=0.5), "Worker should be blocked on full queue" + + queue.get() + assert worker_done.wait(timeout=2.0), "Worker should complete after queue space freed" + t.join(timeout=2.0) + + +def test_main_thread_does_not_deadlock_on_full_queue( + small_queue: Queue, repo: ConcurrentMessageRepository +) -> None: + """Simulate the deadlock: main thread emits on a full queue. + + Without the fix this would hang forever because the main thread + (sole consumer) blocks on queue.put() and nobody drains the queue. + """ + small_queue.put("record_1") + small_queue.put("record_2") + assert small_queue.full() + + state_msg = _make_state_message("contact_lists") + repo.emit_message(state_msg) # Must return immediately + + pending = list(repo.consume_queue()) + assert len(pending) == 1 + assert pending[0] == state_msg + + +def test_ordering_preserved_across_queue_and_pending( + small_queue: Queue, repo: ConcurrentMessageRepository +) -> None: + """Messages maintain order: first queued directly, overflow to pending.""" + msg1 = _make_state_message("stream_1") + msg2 = _make_state_message("stream_2") + msg3 = _make_state_message("stream_3") + + repo.emit_message(msg1) + repo.emit_message(msg2) + assert small_queue.qsize() == 2 + + repo.emit_message(msg3) + assert len(repo._pending) == 1 + + from_queue = [small_queue.get_nowait(), small_queue.get_nowait()] + from_pending = list(repo.consume_queue()) + all_messages = from_queue + from_pending + + assert all_messages == [msg1, msg2, msg3]