11# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
22import logging
3- import os
4- from queue import Queue
3+ import threading
4+ from collections import deque
5+ from queue import Full , Queue
56from typing import Callable , Iterable
67
78from airbyte_cdk .models import AirbyteMessage , Level
8- from airbyte_cdk .models import Type as MessageType
99from airbyte_cdk .sources .message .repository import LogMessage , MessageRepository
1010from airbyte_cdk .sources .streams .concurrent .partitions .types import QueueItem
1111
@@ -23,25 +23,61 @@ class ConcurrentMessageRepository(MessageRepository):
2323
2424 This is particularly important for the connector builder which relies on grouping
2525 of messages to organize request/response, pages, and partitions.
26+
27+ DEADLOCK PREVENTION:
28+ The main thread is the sole consumer of the shared queue. If it calls queue.put()
29+ while the queue is full, it deadlocks — nobody else will drain the queue.
30+ This happens in 3 code paths from _handle_item:
31+ 1. PartitionCompleteSentinel → _on_stream_is_done → ensure_at_least_one_state_emitted → emit_message → queue.put(state)
32+ 2. PartitionGenerationCompletedSentinel → _on_stream_is_done → same path
33+ 3. Partition → on_partition → emit_message(slice_log) → queue.put(log)
34+ To prevent this, the main thread uses non-blocking put(block=False). If the queue
35+ is full, messages are buffered in _pending and drained via consume_queue(), which
36+ the main thread calls after processing every queue item.
37+ Worker threads continue using blocking put() for normal backpressure.
2638 """
2739
def __init__(self, queue: Queue[QueueItem], message_repository: MessageRepository) -> None:
    """Wrap *message_repository* so that its messages land on the shared *queue*.

    Args:
        queue: Shared queue that the consumer thread reads items from.
        message_repository: Decorated repository that actually produces the messages.
    """
    self._queue = queue
    self._decorated_message_repository = message_repository
    # Remember which thread built this repository; _put_on_queue treats that thread
    # as the queue's consumer. NOTE(review): per the original author this is always
    # the main thread, because the source that creates the repository is constructed
    # there -- confirm if the repository is ever built elsewhere.
    self._consumer_thread_id = threading.get_ident()
    # Overflow buffer for messages the consumer thread could not enqueue because the
    # queue was full; drained by consume_queue(). deque.append()/popleft() are
    # thread-safe in CPython, so no extra lock is taken here.
    self._pending: deque[AirbyteMessage] = deque()
51+
def _put_on_queue(self, message: AirbyteMessage) -> None:
    """Place *message* on the shared queue without ever blocking the consumer thread.

    Worker threads use a normal blocking ``put`` so that a full queue applies
    backpressure. The consumer thread (the one that constructed this repository)
    must not block: it is the thread that drains the queue, so waiting for free
    space would deadlock. Its messages are buffered in ``self._pending`` when the
    queue is full and handed out later by ``consume_queue()``.

    Args:
        message: The message to enqueue.
    """
    if threading.get_ident() != self._consumer_thread_id:
        # Worker thread: blocking put for normal backpressure.
        self._queue.put(message)
        return

    # Consumer thread: non-blocking attempt, falling back to the overflow buffer.
    try:
        self._queue.put(message, block=False)
    except Full:
        self._pending.append(message)
3164
def emit_message(self, message: AirbyteMessage) -> None:
    """Emit *message* through the decorated repository and forward its output.

    The decorated repository may buffer or transform what it is given, so after
    emitting, everything it currently has queued is drained and passed to
    ``_put_on_queue`` in the order it was yielded.

    Args:
        message: The message to emit.
    """
    self._decorated_message_repository.emit_message(message)
    # A distinct loop name keeps the ``message`` parameter from being shadowed.
    for msg in self._decorated_message_repository.consume_queue():
        self._put_on_queue(msg)
3669
def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
    """Log through the decorated repository and forward whatever it produced.

    Args:
        level: Log level, passed to the decorated repository unchanged.
        message_provider: Zero-argument callable that builds the log payload; it is
            handed to the decorated repository, which decides whether to invoke it.
    """
    self._decorated_message_repository.log_message(level, message_provider)
    # Drain the decorated repository right away so log messages reach the shared
    # queue in the order they were produced.
    for msg in self._decorated_message_repository.consume_queue():
        self._put_on_queue(msg)
4174
def consume_queue(self) -> Iterable[AirbyteMessage]:
    """Yield, oldest first, the messages buffered while the shared queue was full.

    Only messages that the consumer thread failed to enqueue end up in the buffer;
    everything else travels through the shared queue. Each yielded message is
    removed from the buffer, so a second call yields nothing new.

    NOTE(review): per the original author's note, the main thread calls this after
    processing every queue item -- verify against the callers.

    Returns:
        A lazy iterator over the buffered messages.
    """
    pending = self._pending
    while pending:
        yield pending.popleft()
0 commit comments