Skip to content

Commit 6ae5579

Browse files
devin-ai-integration[bot] and bot_apk committed
fix(cdk): prevent deadlock when main thread puts on full queue
The main thread is the sole consumer of the shared Queue(maxsize=10000). When it also produces into the queue via emit_message() or log_message() and the queue is full, queue.put() blocks indefinitely — deadlock. Fix: capture the consumer thread ID at construction. On the consumer thread use non-blocking put; overflow is buffered in a deque and drained via consume_queue(), which the main thread already calls after every queue item. Worker threads still use blocking put for back-pressure. Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 69cd63d commit 6ae5579

2 files changed

Lines changed: 224 additions & 19 deletions

File tree

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
22
import logging
3-
import os
4-
from queue import Queue
3+
import threading
4+
from collections import deque
5+
from queue import Full, Queue
56
from typing import Callable, Iterable
67

78
from airbyte_cdk.models import AirbyteMessage, Level
@@ -13,35 +14,61 @@
1314

1415

1516
class ConcurrentMessageRepository(MessageRepository):
    """Repository that forwards messages straight onto the shared concurrent queue.

    Placing messages directly on the queue consumed by the main thread keeps
    them in the order they were produced, which the connector builder relies
    on when grouping request/response pairs, pages, and partitions.

    Deadlock avoidance: the main thread is the queue's only consumer. If it
    also produces (e.g. ``emit_message`` or ``log_message`` while emitting a
    final state via ``ensure_at_least_one_state_emitted``), a blocking ``put``
    on a full queue could never be drained. The repository therefore records
    the consumer thread's ID at construction time. Producer calls made on that
    thread use a non-blocking put and spill overflow into ``_pending``, which
    ``consume_queue`` drains — the main thread already calls it after handling
    each queue item. Every other thread keeps the blocking put so normal
    back-pressure is preserved.
    """

    def __init__(self, queue: Queue[QueueItem], message_repository: MessageRepository):
        self._queue = queue
        self._decorated_message_repository = message_repository
        # ID of the thread that constructed the repository; assumed to be the
        # sole consumer of the shared queue.
        self._consumer_thread_id: int = threading.get_ident()
        # Overflow buffer, touched only from the consumer thread when the
        # shared queue is full.
        self._pending: deque[AirbyteMessage] = deque()

    def emit_message(self, message: AirbyteMessage) -> None:
        """Emit *message* and move anything the decorated repository staged onto the shared queue."""
        self._decorated_message_repository.emit_message(message)
        self._drain_decorated()

    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
        """Log lazily through the decorated repository, then flush its staged messages."""
        self._decorated_message_repository.log_message(level, message_provider)
        self._drain_decorated()

    def consume_queue(self) -> Iterable[AirbyteMessage]:
        """Yield messages that were buffered because the shared queue was full.

        The main thread invokes this after processing every queue item, so
        spilled messages are delivered promptly and without deadlock risk.
        """
        pending = self._pending
        while pending:
            yield pending.popleft()

    def _drain_decorated(self) -> None:
        # Transfer every message staged in the decorated repository onto the
        # shared queue (or the overflow buffer, when on the consumer thread).
        for staged_message in self._decorated_message_repository.consume_queue():
            self._put(staged_message)

    def _put(self, message: AirbyteMessage) -> None:
        """Place *message* on the shared queue without ever blocking the consumer thread."""
        if threading.get_ident() != self._consumer_thread_id:
            # Worker thread: block on a full queue to preserve back-pressure.
            self._queue.put(message)
            return
        try:
            self._queue.put(message, block=False)
        except Full:
            # The queue is full and no other thread can drain it — spill into
            # the overflow buffer instead of deadlocking.
            self._pending.append(message)
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import threading
6+
from queue import Queue
7+
8+
import pytest
9+
10+
from airbyte_cdk.models import (
11+
AirbyteControlConnectorConfigMessage,
12+
AirbyteControlMessage,
13+
AirbyteMessage,
14+
AirbyteStateMessage,
15+
AirbyteStateType,
16+
Level,
17+
OrchestratorType,
18+
Type,
19+
)
20+
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
21+
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
22+
23+
24+
def _make_state_message(stream_name: str = "test_stream") -> AirbyteMessage:
    """Build a per-stream STATE message carrying *stream_name* in its data payload."""
    state = AirbyteStateMessage(type=AirbyteStateType.STREAM, data={"stream_name": stream_name})
    return AirbyteMessage(type=Type.STATE, state=state)
29+
30+
31+
def _make_control_message() -> AirbyteMessage:
    """Build a CONNECTOR_CONFIG control message with a minimal config payload."""
    connector_config = AirbyteControlConnectorConfigMessage(config={"key": "value"})
    control = AirbyteControlMessage(
        type=OrchestratorType.CONNECTOR_CONFIG,
        emitted_at=0,
        connectorConfig=connector_config,
    )
    return AirbyteMessage(type=Type.CONTROL, control=control)
40+
41+
42+
@pytest.fixture()
def small_queue() -> Queue:
    """A tiny queue (capacity 2) so tests can fill it with minimal setup."""
    return Queue(maxsize=2)
45+
46+
47+
@pytest.fixture()
def repo(small_queue: Queue) -> ConcurrentMessageRepository:
    """Repository under test, constructed on the test (consumer) thread."""
    return ConcurrentMessageRepository(small_queue, InMemoryMessageRepository())
50+
51+
52+
def test_emit_message_puts_on_queue_when_space_available() -> None:
    """With room in the queue, emit_message delivers the message directly onto it."""
    shared_queue: Queue = Queue(maxsize=100)
    repository = ConcurrentMessageRepository(shared_queue, InMemoryMessageRepository())

    emitted = _make_control_message()
    repository.emit_message(emitted)

    assert not shared_queue.empty()
    assert shared_queue.get_nowait() == emitted
62+
63+
64+
def test_emit_message_buffers_when_queue_full_on_consumer_thread(
    small_queue: Queue, repo: ConcurrentMessageRepository
) -> None:
    """On the consumer thread with a full queue, the message lands in _pending."""
    for filler in ("filler_1", "filler_2"):
        small_queue.put(filler)
    assert small_queue.full()

    buffered = _make_state_message()
    repo.emit_message(buffered)

    assert list(repo._pending) == [buffered]
77+
78+
79+
def test_consume_queue_drains_pending_buffer(
    small_queue: Queue, repo: ConcurrentMessageRepository
) -> None:
    """consume_queue yields, in order, every message buffered due to a full queue."""
    small_queue.put("filler_1")
    small_queue.put("filler_2")

    first = _make_state_message("stream_1")
    second = _make_state_message("stream_2")
    for state in (first, second):
        repo.emit_message(state)

    assert list(repo.consume_queue()) == [first, second]
    assert not repo._pending
94+
95+
96+
def test_consume_queue_empty_when_no_pending(repo: ConcurrentMessageRepository) -> None:
    """With nothing buffered, consume_queue is an empty iterator."""
    assert list(repo.consume_queue()) == []
99+
100+
101+
def test_log_message_buffers_when_queue_full_on_consumer_thread(
    small_queue: Queue, repo: ConcurrentMessageRepository
) -> None:
    """log_message likewise falls back to the pending buffer on the consumer thread."""
    for filler in ("filler_1", "filler_2"):
        small_queue.put(filler)

    repo.log_message(Level.INFO, lambda: {"message": "test log"})

    assert len(repo._pending) == 1
111+
112+
113+
def test_worker_thread_uses_blocking_put() -> None:
    """Worker threads (non-consumer) should use blocking put for back-pressure."""
    queue: Queue = Queue(maxsize=1)
    repo = ConcurrentMessageRepository(queue, InMemoryMessageRepository())

    queue.put("filler")

    worker_started = threading.Event()
    worker_done = threading.Event()

    def worker_emit() -> None:
        worker_started.set()
        repo.emit_message(_make_state_message())
        worker_done.set()

    t = threading.Thread(target=worker_emit, daemon=True)
    t.start()

    # Fix: the original discarded this wait's return value, so a worker that
    # never started would make the "blocked" assertion below pass vacuously.
    assert worker_started.wait(timeout=1.0), "Worker thread failed to start"
    assert not worker_done.wait(timeout=0.5), "Worker should be blocked on full queue"

    queue.get()
    assert worker_done.wait(timeout=2.0), "Worker should complete after queue space freed"
    t.join(timeout=2.0)
    # join(timeout=...) returns None; verify termination explicitly.
    assert not t.is_alive(), "Worker thread did not terminate"
137+
138+
139+
def test_main_thread_does_not_deadlock_on_full_queue(
    small_queue: Queue, repo: ConcurrentMessageRepository
) -> None:
    """Simulate the deadlock scenario: the main thread emits onto a full queue.

    Without the fix this would hang forever, since the main thread (the sole
    consumer) would block in queue.put() with nobody left to drain the queue.
    """
    small_queue.put("record_1")
    small_queue.put("record_2")
    assert small_queue.full()

    state_msg = _make_state_message("contact_lists")
    repo.emit_message(state_msg)  # Must return immediately

    buffered = list(repo.consume_queue())
    assert buffered == [state_msg]
157+
158+
159+
def test_ordering_preserved_across_queue_and_pending(
    small_queue: Queue, repo: ConcurrentMessageRepository
) -> None:
    """Messages keep their order: direct queue entries first, then overflow."""
    first, second, third = (
        _make_state_message(name) for name in ("stream_1", "stream_2", "stream_3")
    )

    repo.emit_message(first)
    repo.emit_message(second)
    assert small_queue.qsize() == 2

    repo.emit_message(third)
    assert len(repo._pending) == 1

    delivered = [small_queue.get_nowait(), small_queue.get_nowait()]
    delivered.extend(repo.consume_queue())

    assert delivered == [first, second, third]

0 commit comments

Comments (0)