Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 46 additions & 10 deletions airbyte_cdk/sources/message/concurrent_repository.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
import logging
import os
from queue import Queue
import threading
from collections import deque
from queue import Full, Queue
from typing import Callable, Iterable

from airbyte_cdk.models import AirbyteMessage, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.message.repository import LogMessage, MessageRepository
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem

Expand All @@ -23,25 +23,61 @@ class ConcurrentMessageRepository(MessageRepository):

This is particularly important for the connector builder which relies on grouping
of messages to organize request/response, pages, and partitions.

DEADLOCK PREVENTION:
The main thread is the sole consumer of the shared queue. If it calls queue.put()
while the queue is full, it deadlocks — nobody else will drain the queue.
This happens in 3 code paths from _handle_item:
1. PartitionCompleteSentinel → _on_stream_is_done → ensure_at_least_one_state_emitted → emit_message → queue.put(state)
2. PartitionGenerationCompletedSentinel → _on_stream_is_done → same path
3. Partition → on_partition → emit_message(slice_log) → queue.put(log)
To prevent this, the main thread uses non-blocking put(block=False). If the queue
is full, messages are buffered in _pending and drained via consume_queue(), which
the main thread calls after processing every queue item.
Worker threads continue using blocking put() for normal backpressure.
"""

def __init__(self, queue: Queue[QueueItem], message_repository: MessageRepository):
    """Wrap *message_repository*, forwarding its buffered output onto *queue*."""
    self._queue = queue
    self._decorated_message_repository = message_repository
    # Remember which thread constructed us: ConcurrentSource.__init__ (and the
    # declarative source that builds this repository) run on the main thread,
    # which is also the sole consumer of the shared queue.
    self._consumer_thread_id = threading.get_ident()
    # Overflow buffer for main-thread puts that found the queue full; it is
    # flushed by consume_queue(). deque.append()/popleft() are GIL-atomic in
    # CPython, so no additional locking is required.
    self._pending: deque[AirbyteMessage] = deque()

def _put_on_queue(self, message: AirbyteMessage) -> None:
    """Enqueue *message*, guarding the consumer thread against self-deadlock."""
    if threading.get_ident() != self._consumer_thread_id:
        # Producer (worker) thread: a blocking put provides the normal
        # backpressure against the bounded queue.
        self._queue.put(message)
        return
    # Consumer (main) thread: never block on our own queue — a full queue
    # would never be drained by anyone else. Spill into the local buffer,
    # which consume_queue() flushes.
    try:
        self._queue.put_nowait(message)
    except Full:
        self._pending.append(message)

def emit_message(self, message: AirbyteMessage) -> None:
    """
    Delegate emission to the wrapped repository, then move everything it
    buffered onto the shared queue.

    Forwarding goes through _put_on_queue() (never a direct, blocking
    self._queue.put()) so that a call from the main thread — the queue's sole
    consumer — cannot deadlock on a full queue.

    Args:
        message: The AirbyteMessage to emit.
    """
    self._decorated_message_repository.emit_message(message)
    # Use a name distinct from the `message` parameter to avoid shadowing it.
    for msg in self._decorated_message_repository.consume_queue():
        self._put_on_queue(msg)

def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
    """
    Delegate log creation to the wrapped repository, then move everything it
    buffered onto the shared queue.

    Forwarding goes through _put_on_queue() (never a direct, blocking
    self._queue.put()) so that a call from the main thread — the queue's sole
    consumer — cannot deadlock on a full queue.

    Args:
        level: Severity level of the log message.
        message_provider: Lazily-evaluated callable producing the log payload.
    """
    self._decorated_message_repository.log_message(level, message_provider)
    for msg in self._decorated_message_repository.consume_queue():
        self._put_on_queue(msg)

def consume_queue(self) -> Iterable[AirbyteMessage]:
    """
    Drain messages that were buffered by _put_on_queue() because the shared
    queue was full when the main thread tried to put them.

    The main thread calls this after processing every queue item (in
    on_record, on_partition_complete_sentinel, _on_stream_is_done), ensuring
    buffered messages are yielded promptly, oldest first.
    """
    # popleft() is GIL-atomic in CPython, matching the append() done by
    # _put_on_queue() — no extra lock needed for this single-consumer drain.
    while self._pending:
        yield self._pending.popleft()
Loading