diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index d6c92f9fa..c090f5a44 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -7,10 +7,14 @@ import ipaddress import json import logging +import os import os.path import socket import sys import tempfile +import threading +import time +import traceback from collections import defaultdict from functools import wraps from typing import Any, DefaultDict, Iterable, List, Mapping, Optional @@ -374,13 +378,103 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]: def launch(source: Source, args: List[str]) -> None: source_entrypoint = AirbyteEntrypoint(source) parsed_args = source_entrypoint.parse_args(args) + + # Heartbeat state — shared with the background heartbeat thread. + _HEARTBEAT_INTERVAL_S = 30.0 + messages_written = 0 + bytes_written = 0 + print_blocked = False + print_blocked_since = 0.0 + heartbeat_stop = threading.Event() + + def _heartbeat() -> None: + """Emit periodic status to stderr to diagnose stdout pipe blocking and deadlocks. + + Writes directly to fd 2 (stderr) which the Kubernetes container + runtime collects independently of the orchestrator reading stdout. + + When a stall is detected (message count frozen for 3+ intervals = 90s), + a full thread dump is emitted to help diagnose deadlocks in the + concurrent source worker pool. + """ + from airbyte_cdk.sources.concurrent_source.queue_registry import get_queue + + start = time.monotonic() + stderr_fd = 2 + last_msgs = 0 + stall_count = 0 + _STALL_THRESHOLD = 3 # intervals before triggering thread dump (3 * 30s = 90s) + _DUMP_REPEAT_INTERVAL = 10 # re-dump every 10 intervals (~5 min) during ongoing stall + + while not heartbeat_stop.wait(timeout=_HEARTBEAT_INTERVAL_S): + now = time.monotonic() + elapsed = now - start + + # Detect stall: same message count for multiple consecutive intervals + if messages_written == last_msgs and messages_written > 0: + stall_count += 1 + else: + stall_count = 0 + last_msgs = messages_written + + blocked_str = "YES" if print_blocked else "NO" + blocked_dur = ( + f" blocked_since={now - print_blocked_since:.0f}s" if print_blocked else "" + ) + + # Include queue stats if concurrent source is active + queue_stats = "" + q = get_queue() + if q is not None: + try: + queue_stats = f" queue_size={q.qsize()} queue_full={q.full()}" + except Exception: + pass # Queue methods are best-effort + + line = ( + f"STDOUT_HEARTBEAT: t={elapsed:.0f}s " + f"msgs={messages_written} bytes={bytes_written} " + f"print_blocked={blocked_str}{blocked_dur}" + f"{queue_stats}\n" + ) + + # Dump all thread stacks when stall is detected, then periodically during ongoing stall + if stall_count == _STALL_THRESHOLD or ( + stall_count > _STALL_THRESHOLD + and (stall_count - _STALL_THRESHOLD) % _DUMP_REPEAT_INTERVAL == 0 + ): + line += "=== THREAD DUMP (stall detected) ===\n" + thread_names = {t.ident: t.name for t in threading.enumerate()} + for thread_id, frame in sys._current_frames().items(): + thread_name = thread_names.get(thread_id, "unknown") + line += f"\nThread {thread_name} ({thread_id}):\n" + line += "".join(traceback.format_stack(frame)) + line += "=== END THREAD DUMP ===\n" + + try: + os.write(stderr_fd, line.encode()) + except OSError: + pass # Best-effort diagnostic — if stderr (fd 2) is broken, silently give up. + + heartbeat_thread = threading.Thread(target=_heartbeat, name="stdout-heartbeat", daemon=True) + heartbeat_thread.start() + # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs # Refer to: https://github.com/airbytehq/oncall/issues/6235 - with PRINT_BUFFER: - for message in source_entrypoint.run(parsed_args): - # simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and - # the other for the break line. Adding `\n` to the message ensure that both are printed at the same time - print(f"{message}\n", end="") + try: + with PRINT_BUFFER: + for message in source_entrypoint.run(parsed_args): + # simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and + # the other for the break line. Adding `\n` to the message ensure that both are printed at the same time + data = f"{message}\n" + print_blocked = True + print_blocked_since = time.monotonic() + print(data, end="") + print_blocked = False + messages_written += 1 + bytes_written += len(data) + finally: + heartbeat_stop.set() def _init_internal_request_filter() -> None: diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index de2d93523..6ffbdb0cd 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -12,6 +12,7 @@ from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import ( PartitionGenerationCompletedSentinel, ) +from airbyte_cdk.sources.concurrent_source.queue_registry import register_queue, unregister_queue from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository @@ -106,29 +107,34 @@ def read( streams: List[AbstractStream], ) -> Iterator[AirbyteMessage]: self._logger.info("Starting syncing") - concurrent_stream_processor = ConcurrentReadProcessor( - streams, - PartitionEnqueuer(self._queue, self._threadpool), - self._threadpool, - self._logger, - self._slice_logger, - self._message_repository, - PartitionReader( - self._queue, - PartitionLogger(self._slice_logger, self._logger, self._message_repository), - ), - ) + # Register queue so the heartbeat thread can report queue stats for deadlock diagnosis + register_queue(self._queue) + try: + concurrent_stream_processor = ConcurrentReadProcessor( + streams, + PartitionEnqueuer(self._queue, self._threadpool), + self._threadpool, + self._logger, + self._slice_logger, + self._message_repository, + PartitionReader( + self._queue, + PartitionLogger(self._slice_logger, self._logger, self._message_repository), + ), + ) - # Enqueue initial partition generation tasks - yield from self._submit_initial_partition_generators(concurrent_stream_processor) + # Enqueue initial partition generation tasks + yield from self._submit_initial_partition_generators(concurrent_stream_processor) - # Read from the queue until all partitions were generated and read - yield from self._consume_from_queue( - self._queue, - concurrent_stream_processor, - ) - self._threadpool.check_for_errors_and_shutdown() - self._logger.info("Finished syncing") + # Read from the queue until all partitions were generated and read + yield from self._consume_from_queue( + self._queue, + concurrent_stream_processor, + ) + self._threadpool.check_for_errors_and_shutdown() + self._logger.info("Finished syncing") + finally: + unregister_queue() def _submit_initial_partition_generators( self, concurrent_stream_processor: ConcurrentReadProcessor diff --git a/airbyte_cdk/sources/concurrent_source/queue_registry.py b/airbyte_cdk/sources/concurrent_source/queue_registry.py new file mode 100644 index 000000000..569ee4edf --- /dev/null +++ b/airbyte_cdk/sources/concurrent_source/queue_registry.py @@ -0,0 +1,41 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""Module-level registry for the concurrent source queue. + +The heartbeat thread in entrypoint.py needs to report queue stats (size, full/empty) +to help diagnose deadlocks. Since the queue is created deep inside ConcurrentSource, +this registry provides a lightweight way to expose it without threading the queue +object through the entire call chain. + +Usage: + # In ConcurrentSource.read(): + register_queue(self._queue) + + # In the heartbeat thread: + q = get_queue() + if q is not None: + print(f"queue_size={q.qsize()} queue_full={q.full()}") +""" + +from queue import Queue +from typing import Optional + +from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem + +_queue: Optional[Queue[QueueItem]] = None + + +def register_queue(queue: Queue[QueueItem]) -> None: + """Register the concurrent source queue for heartbeat monitoring.""" + global _queue + _queue = queue + + +def get_queue() -> Optional[Queue[QueueItem]]: + """Return the registered queue, or None if no concurrent source is active.""" + return _queue + + +def unregister_queue() -> None: + """Clear the registered queue.""" + global _queue + _queue = None diff --git a/airbyte_cdk/sources/message/concurrent_repository.py b/airbyte_cdk/sources/message/concurrent_repository.py index e3bc7116a..ede405fba 100644 --- a/airbyte_cdk/sources/message/concurrent_repository.py +++ b/airbyte_cdk/sources/message/concurrent_repository.py @@ -1,11 +1,11 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. import logging -import os -from queue import Queue +import threading +from collections import deque +from queue import Full, Queue from typing import Callable, Iterable from airbyte_cdk.models import AirbyteMessage, Level -from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.message.repository import LogMessage, MessageRepository from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem @@ -23,25 +23,61 @@ class ConcurrentMessageRepository(MessageRepository): This is particularly important for the connector builder which relies on grouping of messages to organize request/response, pages, and partitions. + + DEADLOCK PREVENTION: + The main thread is the sole consumer of the shared queue. If it calls queue.put() + while the queue is full, it deadlocks — nobody else will drain the queue. + This happens in 3 code paths from _handle_item: + 1. PartitionCompleteSentinel → _on_stream_is_done → ensure_at_least_one_state_emitted → emit_message → queue.put(state) + 2. PartitionGenerationCompletedSentinel → _on_stream_is_done → same path + 3. Partition → on_partition → emit_message(slice_log) → queue.put(log) + To prevent this, the main thread uses non-blocking put(block=False). If the queue + is full, messages are buffered in _pending and drained via consume_queue(), which + the main thread calls after processing every queue item. + Worker threads continue using blocking put() for normal backpressure. """ def __init__(self, queue: Queue[QueueItem], message_repository: MessageRepository): self._queue = queue self._decorated_message_repository = message_repository + # Capture the thread ID of the consumer (main thread) at construction time. + # This is always the main thread because ConcurrentSource.__init__ (and the + # declarative source that creates this repository) runs on the main thread. + self._consumer_thread_id = threading.get_ident() + # Buffer for messages that couldn't be put on the queue from the main thread + # because the queue was full. Drained by consume_queue(). + # deque.append() and deque.popleft() are atomic in CPython (GIL-protected). + self._pending: deque[AirbyteMessage] = deque() + + def _put_on_queue(self, message: AirbyteMessage) -> None: + """Put a message on the shared queue, with deadlock prevention for the main thread.""" + if threading.get_ident() == self._consumer_thread_id: + # Main thread (consumer): non-blocking to prevent self-deadlock. + # If queue is full, buffer the message — it will be drained via consume_queue(). + try: + self._queue.put(message, block=False) + except Full: + self._pending.append(message) + else: + # Worker thread: blocking put for normal backpressure. + self._queue.put(message) def emit_message(self, message: AirbyteMessage) -> None: self._decorated_message_repository.emit_message(message) - for message in self._decorated_message_repository.consume_queue(): - self._queue.put(message) + for msg in self._decorated_message_repository.consume_queue(): + self._put_on_queue(msg) def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: self._decorated_message_repository.log_message(level, message_provider) - for message in self._decorated_message_repository.consume_queue(): - self._queue.put(message) + for msg in self._decorated_message_repository.consume_queue(): + self._put_on_queue(msg) def consume_queue(self) -> Iterable[AirbyteMessage]: """ - This method shouldn't need to be called because as part of emit_message() we are already - loading messages onto the queue processed on the main thread. + Drain any messages that were buffered because the queue was full when the + main thread tried to put them. This is called by the main thread after + processing every queue item (in on_record, on_partition_complete_sentinel, + _on_stream_is_done), ensuring buffered messages are yielded promptly. """ - yield from [] + while self._pending: + yield self._pending.popleft()