Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
ef5d8f6
feat: non-blocking stdout writes to prevent deadlock on pipe backpressure
devin-ai-integration[bot] Mar 13, 2026
0e32124
fix: handle UnsupportedOperation from fileno() in test environments a…
devin-ai-integration[bot] Mar 13, 2026
e676457
fix: use sys.__stdout__ to get real fd when PRINT_BUFFER replaces sys…
devin-ai-integration[bot] Mar 13, 2026
1b95a87
fix: fall back to print() when sys.stdout is replaced (capsys/PRINT_B…
devin-ai-integration[bot] Mar 13, 2026
4d2c36f
fix: detect capsys/redirected stdout via fileno comparison, remove PR…
devin-ai-integration[bot] Mar 13, 2026
649e0e0
fix: use dedicated writer thread instead of non-blocking fd to avoid …
devin-ai-integration[bot] Mar 13, 2026
583e6ee
fix: catch Exception instead of BaseException in writer thread, re-ra…
devin-ai-integration[bot] Mar 13, 2026
b6f3f36
feat: add diagnostic logging to stdout writer thread to track pipe wr…
devin-ai-integration[bot] Mar 13, 2026
75bd890
feat: add stderr heartbeat thread to prove platform never resumes rea…
devin-ai-integration[bot] Mar 17, 2026
2fc96fa
refactor: simplify to heartbeat-only diagnostic (remove writer thread…
devin-ai-integration[bot] Mar 17, 2026
17b4a6f
style: fix ruff formatting
devin-ai-integration[bot] Mar 17, 2026
3a41aaa
style: add explanatory comment to empty except clause
devin-ai-integration[bot] Mar 17, 2026
bf80db5
feat: add thread dumps and queue stats to heartbeat for deadlock diagnosis
devin-ai-integration[bot] Mar 31, 2026
b7818b0
fix: prevent deadlock when main thread puts on full queue
devin-ai-integration[bot] Apr 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 99 additions & 5 deletions airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
import ipaddress
import json
import logging
import os
import os.path
import socket
import sys
import tempfile
import threading
import time
import traceback
from collections import defaultdict
from functools import wraps
from typing import Any, DefaultDict, Iterable, List, Mapping, Optional
Expand Down Expand Up @@ -374,13 +378,103 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]:
def launch(source: Source, args: List[str]) -> None:
    """Run the connector entrypoint: parse args, execute the sync, print messages.

    Also starts a daemon heartbeat thread (see ``_heartbeat`` below) that
    periodically reports progress to stderr so stalls in stdout consumption
    can be diagnosed from container logs.
    """
    source_entrypoint = AirbyteEntrypoint(source)
    parsed_args = source_entrypoint.parse_args(args)

    # Heartbeat state — shared with the background heartbeat thread.
    # The main thread is the only writer; the heartbeat thread only reads.
    # Simple int/bool/float rebinds are safe to share this way under CPython's
    # GIL — no lock is taken, the values are diagnostic best-effort only.
    _HEARTBEAT_INTERVAL_S = 30.0
    messages_written = 0  # count of messages printed to stdout so far
    bytes_written = 0  # total bytes printed to stdout so far
    print_blocked = False  # True while a print() call is in flight (possibly blocked on the pipe)
    print_blocked_since = 0.0  # time.monotonic() timestamp when the in-flight print started
    heartbeat_stop = threading.Event()  # set in launch()'s finally block to stop the heartbeat

    def _heartbeat() -> None:
        """Emit periodic status to stderr to diagnose stdout pipe blocking and deadlocks.

        Writes directly to fd 2 (stderr) which the Kubernetes container
        runtime collects independently of the orchestrator reading stdout.

        When a stall is detected (message count frozen for 3+ intervals = 90s),
        a full thread dump is emitted to help diagnose deadlocks in the
        concurrent source worker pool.
        """
        # NOTE(review): imported lazily — presumably to avoid an import cycle
        # at module load time; confirm, otherwise move to the top of the file.
        from airbyte_cdk.sources.concurrent_source.queue_registry import get_queue

        start = time.monotonic()
        stderr_fd = 2
        last_msgs = 0  # messages_written as of the previous interval
        stall_count = 0  # consecutive intervals with no message progress
        _STALL_THRESHOLD = 3  # intervals before triggering thread dump (3 * 30s = 90s)
        _DUMP_REPEAT_INTERVAL = 10  # re-dump every 10 intervals (~5 min) during ongoing stall

        # Event.wait doubles as the sleep: it returns True (exit the loop) once
        # launch() sets heartbeat_stop, or False after the 30s timeout elapses.
        while not heartbeat_stop.wait(timeout=_HEARTBEAT_INTERVAL_S):
            now = time.monotonic()
            elapsed = now - start

            # Detect stall: same message count for multiple consecutive intervals.
            # messages_written / print_blocked / bytes_written are read from the
            # enclosing launch() scope (closure reads, no lock — diagnostics only).
            if messages_written == last_msgs and messages_written > 0:
                stall_count += 1
            else:
                stall_count = 0
            last_msgs = messages_written

            blocked_str = "YES" if print_blocked else "NO"
            blocked_dur = (
                f" blocked_since={now - print_blocked_since:.0f}s" if print_blocked else ""
            )

            # Include queue stats if concurrent source is active
            queue_stats = ""
            q = get_queue()
            if q is not None:
                try:
                    queue_stats = f" queue_size={q.qsize()} queue_full={q.full()}"
                except Exception:
                    pass  # Queue methods are best-effort

            line = (
                f"STDOUT_HEARTBEAT: t={elapsed:.0f}s "
                f"msgs={messages_written} bytes={bytes_written} "
                f"print_blocked={blocked_str}{blocked_dur}"
                f"{queue_stats}\n"
            )

            # Dump all thread stacks when stall is detected, then periodically during ongoing stall
            if stall_count == _STALL_THRESHOLD or (
                stall_count > _STALL_THRESHOLD
                and (stall_count - _STALL_THRESHOLD) % _DUMP_REPEAT_INTERVAL == 0
            ):
                line += "=== THREAD DUMP (stall detected) ===\n"
                thread_names = {t.ident: t.name for t in threading.enumerate()}
                # sys._current_frames() is a CPython-specific (underscore) API
                # mapping thread id -> current frame; acceptable here because
                # the output is purely diagnostic.
                for thread_id, frame in sys._current_frames().items():
                    thread_name = thread_names.get(thread_id, "unknown")
                    line += f"\nThread {thread_name} ({thread_id}):\n"
                    line += "".join(traceback.format_stack(frame))
                line += "=== END THREAD DUMP ===\n"

            try:
                # os.write bypasses Python-level buffering and any sys.stderr
                # replacement, so the heartbeat reaches the real fd 2.
                os.write(stderr_fd, line.encode())
            except OSError:
                pass  # Best-effort diagnostic — if stderr (fd 2) is broken, silently give up.

heartbeat_thread = threading.Thread(target=_heartbeat, name="stdout-heartbeat", daemon=True)
heartbeat_thread.start()

# temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs
# Refer to: https://github.com/airbytehq/oncall/issues/6235
with PRINT_BUFFER:
for message in source_entrypoint.run(parsed_args):
# simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and
# the other for the break line. Adding `\n` to the message ensure that both are printed at the same time
print(f"{message}\n", end="")
try:
with PRINT_BUFFER:
for message in source_entrypoint.run(parsed_args):
# simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and
# the other for the break line. Adding `\n` to the message ensure that both are printed at the same time
data = f"{message}\n"
print_blocked = True
print_blocked_since = time.monotonic()
print(data, end="")
print_blocked = False
messages_written += 1
bytes_written += len(data)
finally:
heartbeat_stop.set()


def _init_internal_request_filter() -> None:
Expand Down
48 changes: 27 additions & 21 deletions airbyte_cdk/sources/concurrent_source/concurrent_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
PartitionGenerationCompletedSentinel,
)
from airbyte_cdk.sources.concurrent_source.queue_registry import register_queue, unregister_queue
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
Expand Down Expand Up @@ -106,29 +107,34 @@ def read(
streams: List[AbstractStream],
) -> Iterator[AirbyteMessage]:
self._logger.info("Starting syncing")
concurrent_stream_processor = ConcurrentReadProcessor(
streams,
PartitionEnqueuer(self._queue, self._threadpool),
self._threadpool,
self._logger,
self._slice_logger,
self._message_repository,
PartitionReader(
self._queue,
PartitionLogger(self._slice_logger, self._logger, self._message_repository),
),
)
# Register queue so the heartbeat thread can report queue stats for deadlock diagnosis
register_queue(self._queue)
try:
concurrent_stream_processor = ConcurrentReadProcessor(
streams,
PartitionEnqueuer(self._queue, self._threadpool),
self._threadpool,
self._logger,
self._slice_logger,
self._message_repository,
PartitionReader(
self._queue,
PartitionLogger(self._slice_logger, self._logger, self._message_repository),
),
)

# Enqueue initial partition generation tasks
yield from self._submit_initial_partition_generators(concurrent_stream_processor)
# Enqueue initial partition generation tasks
yield from self._submit_initial_partition_generators(concurrent_stream_processor)

# Read from the queue until all partitions were generated and read
yield from self._consume_from_queue(
self._queue,
concurrent_stream_processor,
)
self._threadpool.check_for_errors_and_shutdown()
self._logger.info("Finished syncing")
# Read from the queue until all partitions were generated and read
yield from self._consume_from_queue(
self._queue,
concurrent_stream_processor,
)
self._threadpool.check_for_errors_and_shutdown()
self._logger.info("Finished syncing")
finally:
unregister_queue()

def _submit_initial_partition_generators(
self, concurrent_stream_processor: ConcurrentReadProcessor
Expand Down
41 changes: 41 additions & 0 deletions airbyte_cdk/sources/concurrent_source/queue_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Process-wide registry exposing the concurrent source queue to diagnostics.

entrypoint.py runs a heartbeat thread that reports queue statistics (size,
fullness) when diagnosing deadlocks. The queue itself is created deep inside
ConcurrentSource, so rather than threading it through every intermediate
layer, the source registers it here and the heartbeat looks it up.

Usage:
    # In ConcurrentSource.read():
    register_queue(self._queue)

    # In the heartbeat thread:
    q = get_queue()
    if q is not None:
        print(f"queue_size={q.qsize()} queue_full={q.full()}")
"""

from queue import Queue
from typing import Optional

from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem

# Single module-level slot: at most one sync is active per process, so one
# registered queue (or None when no concurrent source is running) suffices.
_queue: Optional[Queue[QueueItem]] = None


def register_queue(queue: Queue[QueueItem]) -> None:
    """Make ``queue`` visible to the heartbeat thread for monitoring."""
    global _queue
    _queue = queue


def get_queue() -> Optional[Queue[QueueItem]]:
    """Return the currently registered queue, or None when no sync is active."""
    return _queue


def unregister_queue() -> None:
    """Forget the registered queue (called when the sync finishes)."""
    global _queue
    _queue = None
56 changes: 46 additions & 10 deletions airbyte_cdk/sources/message/concurrent_repository.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
import logging
import os
from queue import Queue
import threading
from collections import deque
from queue import Full, Queue
from typing import Callable, Iterable

from airbyte_cdk.models import AirbyteMessage, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.message.repository import LogMessage, MessageRepository
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem

Expand All @@ -23,25 +23,61 @@ class ConcurrentMessageRepository(MessageRepository):

This is particularly important for the connector builder which relies on grouping
of messages to organize request/response, pages, and partitions.

DEADLOCK PREVENTION:
The main thread is the sole consumer of the shared queue. If it calls queue.put()
while the queue is full, it deadlocks — nobody else will drain the queue.
This happens in 3 code paths from _handle_item:
1. PartitionCompleteSentinel → _on_stream_is_done → ensure_at_least_one_state_emitted → emit_message → queue.put(state)
2. PartitionGenerationCompletedSentinel → _on_stream_is_done → same path
3. Partition → on_partition → emit_message(slice_log) → queue.put(log)
To prevent this, the main thread uses non-blocking put(block=False). If the queue
is full, messages are buffered in _pending and drained via consume_queue(), which
the main thread calls after processing every queue item.
Worker threads continue using blocking put() for normal backpressure.
"""

def __init__(self, queue: Queue[QueueItem], message_repository: MessageRepository):
self._queue = queue
self._decorated_message_repository = message_repository
# Capture the thread ID of the consumer (main thread) at construction time.
# This is always the main thread because ConcurrentSource.__init__ (and the
# declarative source that creates this repository) runs on the main thread.
self._consumer_thread_id = threading.get_ident()
# Buffer for messages that couldn't be put on the queue from the main thread
# because the queue was full. Drained by consume_queue().
# deque.append() and deque.popleft() are atomic in CPython (GIL-protected).
self._pending: deque[AirbyteMessage] = deque()

def _put_on_queue(self, message: AirbyteMessage) -> None:
"""Put a message on the shared queue, with deadlock prevention for the main thread."""
if threading.get_ident() == self._consumer_thread_id:
# Main thread (consumer): non-blocking to prevent self-deadlock.
# If queue is full, buffer the message — it will be drained via consume_queue().
try:
self._queue.put(message, block=False)
except Full:
self._pending.append(message)
else:
# Worker thread: blocking put for normal backpressure.
self._queue.put(message)

def emit_message(self, message: AirbyteMessage) -> None:
self._decorated_message_repository.emit_message(message)
for message in self._decorated_message_repository.consume_queue():
self._queue.put(message)
for msg in self._decorated_message_repository.consume_queue():
self._put_on_queue(msg)

def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
self._decorated_message_repository.log_message(level, message_provider)
for message in self._decorated_message_repository.consume_queue():
self._queue.put(message)
for msg in self._decorated_message_repository.consume_queue():
self._put_on_queue(msg)

def consume_queue(self) -> Iterable[AirbyteMessage]:
"""
This method shouldn't need to be called because as part of emit_message() we are already
loading messages onto the queue processed on the main thread.
Drain any messages that were buffered because the queue was full when the
main thread tried to put them. This is called by the main thread after
processing every queue item (in on_record, on_partition_complete_sentinel,
_on_stream_is_done), ensuring buffered messages are yielded promptly.
"""
yield from []
while self._pending:
yield self._pending.popleft()
Loading