65 changes: 46 additions & 19 deletions airbyte_cdk/sources/message/concurrent_repository.py
@@ -1,7 +1,8 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
import logging
import os
from queue import Queue
import threading
from collections import deque
from queue import Full, Queue
from typing import Callable, Iterable

from airbyte_cdk.models import AirbyteMessage, Level
@@ -13,35 +14,61 @@


class ConcurrentMessageRepository(MessageRepository):
"""
Message repository that immediately loads messages onto the queue processed on the
main thread. This ensures that messages are processed in the correct order they are
received. The InMemoryMessageRepository implementation does not have guaranteed
ordering since whether to process the main thread vs. partitions is non-deterministic
and there can be a lag between reading the main-thread and consuming messages on the
MessageRepository.

This is particularly important for the connector builder which relies on grouping
of messages to organize request/response, pages, and partitions.
"""Message repository that loads messages onto the shared concurrent queue.

Messages are placed directly onto the queue consumed by the main thread.
This ensures correct ordering, which is especially important for the
connector builder's grouping of request/response, pages, and partitions.

Deadlock prevention: the main thread is the sole consumer of the queue.
If it also calls ``emit_message`` or ``log_message`` (e.g. when emitting
a final state via ``ensure_at_least_one_state_emitted``), a blocking
``put`` on a full queue would deadlock because no other thread can drain
it. To avoid this, the repository captures the consumer (main) thread
ID at construction time. Calls from the main thread use non-blocking
``put``; overflow is buffered in ``_pending`` and drained on the next
``consume_queue`` call, which the main thread already invokes after
processing every queue item. Worker threads continue to use blocking
``put`` for normal back-pressure.
"""

def __init__(self, queue: Queue[QueueItem], message_repository: MessageRepository):
self._queue = queue
self._decorated_message_repository = message_repository
self._consumer_thread_id: int = threading.get_ident()
self._pending: deque[AirbyteMessage] = deque()

def emit_message(self, message: AirbyteMessage) -> None:
self._decorated_message_repository.emit_message(message)
for message in self._decorated_message_repository.consume_queue():
self._queue.put(message)
for queued_message in self._decorated_message_repository.consume_queue():
self._put(queued_message)

def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
self._decorated_message_repository.log_message(level, message_provider)
for message in self._decorated_message_repository.consume_queue():
self._queue.put(message)
for queued_message in self._decorated_message_repository.consume_queue():
self._put(queued_message)

    def consume_queue(self) -> Iterable[AirbyteMessage]:
        """
        This method shouldn't need to be called because as part of emit_message() we are already
        loading messages onto the queue processed on the main thread.
        """
        yield from []
        """Drain any messages buffered because the queue was full.

        The main thread calls this after processing every queue item, so
        buffered messages are delivered promptly without risking deadlock.
        """
        while self._pending:
            yield self._pending.popleft()

    def _put(self, message: AirbyteMessage) -> None:
        """Place a message on the shared queue.

        On the consumer (main) thread a non-blocking put is used; if the
        queue is full the message is appended to ``_pending`` instead of
        blocking. Worker threads use a normal blocking put so that
        back-pressure is preserved.
        """
        if threading.get_ident() == self._consumer_thread_id:
            try:
                self._queue.put(message, block=False)
            except Full:
                self._pending.append(message)
        else:
            self._queue.put(message)
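
For context on how the docstring's consumer-thread contract plays out, here is a minimal sketch of a main-thread read loop that polls ``consume_queue()`` after each queue item, which is what lets buffered overflow reach the output promptly. The loop shape, the ``None`` sentinel, and the ``handle()`` helper are assumptions for illustration, not actual CDK code.

# Illustrative sketch only: the loop shape, the sentinel, and handle() are
# assumptions, not the CDK's actual read loop.
from queue import Queue
from typing import Any

from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository


def handle(item: Any) -> None:
    # Hypothetical downstream handler, e.g. serialize the AirbyteMessage to stdout.
    print(item)


def main_thread_read_loop(queue: "Queue[Any]", repository: ConcurrentMessageRepository) -> None:
    while True:
        item = queue.get()  # the main thread is the sole consumer of this queue
        if item is None:  # hypothetical sentinel marking end of work
            break
        handle(item)
        # Drain anything a main-thread emit_message()/log_message() call had to
        # buffer in _pending because the queue was full at that moment.
        for overflow_message in repository.consume_queue():
            handle(overflow_message)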
178 changes: 178 additions & 0 deletions unit_tests/sources/message/test_concurrent_repository.py
@@ -0,0 +1,178 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import threading
from queue import Queue

import pytest

from airbyte_cdk.models import (
AirbyteControlConnectorConfigMessage,
AirbyteControlMessage,
AirbyteMessage,
AirbyteStateMessage,
AirbyteStateType,
Level,
OrchestratorType,
Type,
)
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository


def _make_state_message(stream_name: str = "test_stream") -> AirbyteMessage:
return AirbyteMessage(
type=Type.STATE,
state=AirbyteStateMessage(type=AirbyteStateType.STREAM, data={"stream_name": stream_name}),
)


def _make_control_message() -> AirbyteMessage:
return AirbyteMessage(
type=Type.CONTROL,
control=AirbyteControlMessage(
type=OrchestratorType.CONNECTOR_CONFIG,
emitted_at=0,
connectorConfig=AirbyteControlConnectorConfigMessage(config={"key": "value"}),
),
)


@pytest.fixture()
def small_queue() -> Queue:
return Queue(maxsize=2)


@pytest.fixture()
def repo(small_queue: Queue) -> ConcurrentMessageRepository:
return ConcurrentMessageRepository(small_queue, InMemoryMessageRepository())


def test_emit_message_puts_on_queue_when_space_available() -> None:
"""When the queue has space, emit_message places the message directly on it."""
queue: Queue = Queue(maxsize=100)
repo = ConcurrentMessageRepository(queue, InMemoryMessageRepository())

msg = _make_control_message()
repo.emit_message(msg)

assert not queue.empty()
assert queue.get_nowait() == msg


def test_emit_message_buffers_when_queue_full_on_consumer_thread(
small_queue: Queue, repo: ConcurrentMessageRepository
) -> None:
"""When called from the consumer thread with a full queue, the message goes to _pending."""
small_queue.put("filler_1")
small_queue.put("filler_2")
assert small_queue.full()

msg = _make_state_message()
repo.emit_message(msg)

assert len(repo._pending) == 1
assert repo._pending[0] == msg


def test_consume_queue_drains_pending_buffer(
small_queue: Queue, repo: ConcurrentMessageRepository
) -> None:
"""consume_queue yields messages that were buffered due to a full queue."""
small_queue.put("filler_1")
small_queue.put("filler_2")

msg1 = _make_state_message("stream_1")
msg2 = _make_state_message("stream_2")
repo.emit_message(msg1)
repo.emit_message(msg2)

drained = list(repo.consume_queue())
assert drained == [msg1, msg2]
assert len(repo._pending) == 0


def test_consume_queue_empty_when_no_pending(repo: ConcurrentMessageRepository) -> None:
"""consume_queue yields nothing when there are no pending messages."""
assert list(repo.consume_queue()) == []


def test_log_message_buffers_when_queue_full_on_consumer_thread(
small_queue: Queue, repo: ConcurrentMessageRepository
) -> None:
"""log_message also uses non-blocking put on the consumer thread."""
small_queue.put("filler_1")
small_queue.put("filler_2")

repo.log_message(Level.INFO, lambda: {"message": "test log"})

assert len(repo._pending) == 1


def test_worker_thread_uses_blocking_put() -> None:
"""Worker threads (non-consumer) should use blocking put for back-pressure."""
queue: Queue = Queue(maxsize=1)
repo = ConcurrentMessageRepository(queue, InMemoryMessageRepository())

queue.put("filler")

worker_started = threading.Event()
worker_done = threading.Event()

def worker_emit() -> None:
worker_started.set()
repo.emit_message(_make_state_message())
worker_done.set()

t = threading.Thread(target=worker_emit, daemon=True)
t.start()

worker_started.wait(timeout=1.0)
assert not worker_done.wait(timeout=0.5), "Worker should be blocked on full queue"

queue.get()
assert worker_done.wait(timeout=2.0), "Worker should complete after queue space freed"
t.join(timeout=2.0)


def test_main_thread_does_not_deadlock_on_full_queue(
small_queue: Queue, repo: ConcurrentMessageRepository
) -> None:
"""Simulate the deadlock: main thread emits on a full queue.

Without the fix this would hang forever because the main thread
(sole consumer) blocks on queue.put() and nobody drains the queue.
"""
small_queue.put("record_1")
small_queue.put("record_2")
assert small_queue.full()

state_msg = _make_state_message("contact_lists")
repo.emit_message(state_msg) # Must return immediately

pending = list(repo.consume_queue())
assert len(pending) == 1
assert pending[0] == state_msg


def test_ordering_preserved_across_queue_and_pending(
small_queue: Queue, repo: ConcurrentMessageRepository
) -> None:
"""Messages maintain order: first queued directly, overflow to pending."""
msg1 = _make_state_message("stream_1")
msg2 = _make_state_message("stream_2")
msg3 = _make_state_message("stream_3")

repo.emit_message(msg1)
repo.emit_message(msg2)
assert small_queue.qsize() == 2

repo.emit_message(msg3)
assert len(repo._pending) == 1

from_queue = [small_queue.get_nowait(), small_queue.get_nowait()]
from_pending = list(repo.consume_queue())
all_messages = from_queue + from_pending

assert all_messages == [msg1, msg2, msg3]
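
Putting the last two tests together, a rough end-to-end sketch of the scenario the fix targets might look like the following; the tiny queue size, the string placeholder records, and the drain order shown are assumptions chosen for demonstration, not prescribed usage.

# Demonstration sketch; the tiny queue and synthetic messages are illustrative assumptions.
from queue import Queue

from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, AirbyteStateType, Type
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository

queue: Queue = Queue(maxsize=2)
repo = ConcurrentMessageRepository(queue, InMemoryMessageRepository())

# Worker partitions have already filled the queue with records.
queue.put("record_1")
queue.put("record_2")

# The main thread emits a final state (as ensure_at_least_one_state_emitted would);
# with the non-blocking put it returns immediately instead of deadlocking.
final_state = AirbyteMessage(
    type=Type.STATE,
    state=AirbyteStateMessage(type=AirbyteStateType.STREAM, data={"stream_name": "contact_lists"}),
)
repo.emit_message(final_state)

# The main thread keeps draining: queue items first, then the buffered overflow.
drained = [queue.get_nowait(), queue.get_nowait()] + list(repo.consume_queue())
assert drained == ["record_1", "record_2", final_state]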