From ef5d8f687afc30fca0a48d01d0645f010e01a283 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 14:43:01 +0000 Subject: [PATCH 01/14] feat: non-blocking stdout writes to prevent deadlock on pipe backpressure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the Airbyte platform pauses reading from the source container's stdout pipe, the main thread's print() call blocks in an OS-level write() syscall. This stalls the record queue consumer, filling the bounded queue and blocking all worker threads — a complete deadlock. This change replaces blocking print() with non-blocking os.write() using select() to wait for the pipe to become writable. The main thread stays in a Python-level retry loop instead of getting stuck in a kernel syscall. When the platform resumes reading, select() returns, the write completes, and the pipeline resumes automatically. Key properties: - Memory stays bounded (queue maxsize=10,000 unchanged) - No deadlock (main thread never stuck in blocking syscall) - Automatic recovery when platform resumes reading - 600s watchdog raises RuntimeError if pipe stays blocked Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 77 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 72 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index d6c92f9fa..45e4c542e 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -7,10 +7,13 @@ import ipaddress import json import logging +import os import os.path +import select import socket import sys import tempfile +import time from collections import defaultdict from functools import wraps from typing import Any, DefaultDict, Iterable, List, Mapping, Optional @@ -374,13 +377,77 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]: def launch(source: Source, args: List[str]) -> None: source_entrypoint = AirbyteEntrypoint(source) parsed_args = source_entrypoint.parse_args(args) - # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs - # Refer to: https://github.com/airbytehq/oncall/issues/6235 with PRINT_BUFFER: - for message in source_entrypoint.run(parsed_args): - # simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and - # the other for the break line. Adding `\n` to the message ensure that both are printed at the same time + _nonblocking_write_to_stdout(source_entrypoint.run(parsed_args)) + + +def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: + """Write messages to stdout using non-blocking I/O to prevent deadlocks. + + When the Airbyte platform pauses reading from the source container's + stdout pipe, a blocking ``write()`` stalls the main thread. Since the + main thread is also responsible for draining the internal record queue, + this causes a cascading deadlock: the queue fills, worker threads block + on ``queue.put()``, and the entire process hangs. + + This function sets stdout to non-blocking mode so that ``os.write()`` + raises ``BlockingIOError`` instead of blocking when the pipe buffer is + full. It then uses ``select()`` to wait (with a timeout) until the fd + is writable again. The main thread remains in a Python-level retry + loop it controls, so it never gets stuck in a kernel-level syscall. + + Memory stays bounded because the upstream record queue keeps its + default bounded size (10,000 items). When stdout is blocked the main + thread pauses here, the queue fills naturally, and worker threads + block on ``queue.put()`` with their own timeouts. When the platform + resumes reading, ``select()`` returns, the write completes, the main + thread resumes draining the queue, and workers unblock automatically. + """ + stdout_fd = sys.stdout.fileno() + original_blocking = os.get_blocking(stdout_fd) + + try: + os.set_blocking(stdout_fd, False) + except OSError: + # Fallback: if we cannot set non-blocking (e.g. redirected to + # a file or in a test environment), just write normally. + for message in messages: print(f"{message}\n", end="") + return + + try: + for message in messages: + data = f"{message}\n".encode() + _write_all_nonblocking(stdout_fd, data) + finally: + try: + os.set_blocking(stdout_fd, original_blocking) + except OSError: + pass + + +def _write_all_nonblocking(fd: int, data: bytes) -> None: + """Write all bytes to a non-blocking fd, retrying with select on EAGAIN.""" + total_written = 0 + last_progress = time.monotonic() + + while total_written < len(data): + try: + written = os.write(fd, data[total_written:]) + total_written += written + last_progress = time.monotonic() + except BlockingIOError: + # Pipe buffer is full. Wait up to 1 second for it to become + # writable, then retry. The short timeout keeps the main + # thread responsive and allows periodic stall detection. + _, writable, _ = select.select([], [fd], [], 1.0) + if not writable: + elapsed = time.monotonic() - last_progress + if elapsed > 600: + raise RuntimeError( + f"stdout pipe blocked for {elapsed:.0f}s with no progress. " + "The platform is not reading from the source container pipe." + ) def _init_internal_request_filter() -> None: From 0e3212402ad3685e93507cf829531d5875facd9d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 14:49:10 +0000 Subject: [PATCH 02/14] fix: handle UnsupportedOperation from fileno() in test environments and log restore failures Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 45e4c542e..8e5972425 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -403,14 +403,14 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: resumes reading, ``select()`` returns, the write completes, the main thread resumes draining the queue, and workers unblock automatically. """ - stdout_fd = sys.stdout.fileno() - original_blocking = os.get_blocking(stdout_fd) - try: + stdout_fd = sys.stdout.fileno() + original_blocking = os.get_blocking(stdout_fd) os.set_blocking(stdout_fd, False) except OSError: - # Fallback: if we cannot set non-blocking (e.g. redirected to - # a file or in a test environment), just write normally. + # Fallback: if we cannot set non-blocking (e.g. pytest captures + # stdout with a StringIO that has no fileno, or the fd does not + # support non-blocking mode), just write normally. for message in messages: print(f"{message}\n", end="") return @@ -423,7 +423,7 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: try: os.set_blocking(stdout_fd, original_blocking) except OSError: - pass + logger.debug("Failed to restore stdout blocking mode", exc_info=True) def _write_all_nonblocking(fd: int, data: bytes) -> None: From e676457fdd0051742478d7c58a72741ce6798e07 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:06:06 +0000 Subject: [PATCH 03/14] fix: use sys.__stdout__ to get real fd when PRINT_BUFFER replaces sys.stdout Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 8e5972425..ad8300a67 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -403,13 +403,21 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: resumes reading, ``select()`` returns, the write completes, the main thread resumes draining the queue, and workers unblock automatically. """ + # Use the *real* stdout (sys.__stdout__) rather than sys.stdout, + # because PRINT_BUFFER replaces sys.stdout with a StringIO wrapper + # that has no fileno(). + real_stdout = sys.__stdout__ + if real_stdout is None or not hasattr(real_stdout, "fileno"): + for message in messages: + print(f"{message}\n", end="") + return + try: - stdout_fd = sys.stdout.fileno() + stdout_fd = real_stdout.fileno() original_blocking = os.get_blocking(stdout_fd) os.set_blocking(stdout_fd, False) except OSError: - # Fallback: if we cannot set non-blocking (e.g. pytest captures - # stdout with a StringIO that has no fileno, or the fd does not + # Fallback: if we cannot set non-blocking (e.g. the fd does not # support non-blocking mode), just write normally. for message in messages: print(f"{message}\n", end="") From 1b95a875e46843315eb06af4f3089404f273b587 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:12:10 +0000 Subject: [PATCH 04/14] fix: fall back to print() when sys.stdout is replaced (capsys/PRINT_BUFFER) Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index ad8300a67..54ddc4345 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -403,11 +403,12 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: resumes reading, ``select()`` returns, the write completes, the main thread resumes draining the queue, and workers unblock automatically. """ - # Use the *real* stdout (sys.__stdout__) rather than sys.stdout, - # because PRINT_BUFFER replaces sys.stdout with a StringIO wrapper - # that has no fileno(). + # Only use non-blocking I/O when stdout is the real file descriptor. + # In test environments (pytest capsys) or when PRINT_BUFFER is active, + # sys.stdout is replaced with a wrapper. Writing to sys.__stdout__ + # via os.write() would bypass the capture, so fall back to print(). real_stdout = sys.__stdout__ - if real_stdout is None or not hasattr(real_stdout, "fileno"): + if real_stdout is None or not hasattr(real_stdout, "fileno") or sys.stdout is not real_stdout: for message in messages: print(f"{message}\n", end="") return From 4d2c36f9a9e719b2048816bd9c6463a71aae5266 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:13:50 +0000 Subject: [PATCH 05/14] fix: detect capsys/redirected stdout via fileno comparison, remove PRINT_BUFFER wrapper Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 45 ++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 54ddc4345..cc590fd3b 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -377,8 +377,7 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]: def launch(source: Source, args: List[str]) -> None: source_entrypoint = AirbyteEntrypoint(source) parsed_args = source_entrypoint.parse_args(args) - with PRINT_BUFFER: - _nonblocking_write_to_stdout(source_entrypoint.run(parsed_args)) + _nonblocking_write_to_stdout(source_entrypoint.run(parsed_args)) def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: @@ -403,20 +402,42 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: resumes reading, ``select()`` returns, the write completes, the main thread resumes draining the queue, and workers unblock automatically. """ - # Only use non-blocking I/O when stdout is the real file descriptor. - # In test environments (pytest capsys) or when PRINT_BUFFER is active, - # sys.stdout is replaced with a wrapper. Writing to sys.__stdout__ - # via os.write() would bypass the capture, so fall back to print(). + # We need to write to the *real* stdout fd for non-blocking I/O. + # However, in test environments (pytest capsys) or other wrappers, + # sys.stdout may have been replaced. If sys.stdout.fileno() fails + # or doesn't match sys.__stdout__.fileno(), something is capturing + # output and we must fall back to print() so it goes through the + # capture layer. + try: + current_fd = sys.stdout.fileno() + except (OSError, AttributeError, ValueError): + # capsys, PRINT_BUFFER, or other wrapper — no real fd available. + for message in messages: + print(f"{message}\n", end="") + return + real_stdout = sys.__stdout__ - if real_stdout is None or not hasattr(real_stdout, "fileno") or sys.stdout is not real_stdout: + if real_stdout is None or not hasattr(real_stdout, "fileno"): + for message in messages: + print(f"{message}\n", end="") + return + + try: + real_fd = real_stdout.fileno() + except (OSError, AttributeError, ValueError): + for message in messages: + print(f"{message}\n", end="") + return + + if current_fd != real_fd: + # stdout has been redirected; fall back to print(). for message in messages: print(f"{message}\n", end="") return try: - stdout_fd = real_stdout.fileno() - original_blocking = os.get_blocking(stdout_fd) - os.set_blocking(stdout_fd, False) + original_blocking = os.get_blocking(real_fd) + os.set_blocking(real_fd, False) except OSError: # Fallback: if we cannot set non-blocking (e.g. the fd does not # support non-blocking mode), just write normally. @@ -427,10 +448,10 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: try: for message in messages: data = f"{message}\n".encode() - _write_all_nonblocking(stdout_fd, data) + _write_all_nonblocking(real_fd, data) finally: try: - os.set_blocking(stdout_fd, original_blocking) + os.set_blocking(real_fd, original_blocking) except OSError: logger.debug("Failed to restore stdout blocking mode", exc_info=True) From 649e0e051814d805759ad322ddcaa9c0e5318c3f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 18:02:50 +0000 Subject: [PATCH 06/14] fix: use dedicated writer thread instead of non-blocking fd to avoid global BlockingIOError Setting os.set_blocking(fd, False) is a process-wide change that causes BlockingIOError in other threads (logging via print_buffer, worker threads). Instead, use a dedicated stdout-writer thread that does blocking os.write() calls. If the pipe is full, only the writer thread stalls - the main thread continues draining the record queue. Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 132 +++++++++++++++++++------------------- 1 file changed, 67 insertions(+), 65 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index cc590fd3b..b500c7e17 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -9,10 +9,11 @@ import logging import os import os.path -import select +import queue import socket import sys import tempfile +import threading import time from collections import defaultdict from functools import wraps @@ -381,37 +382,32 @@ def launch(source: Source, args: List[str]) -> None: def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: - """Write messages to stdout using non-blocking I/O to prevent deadlocks. + """Write messages to stdout via a dedicated writer thread to prevent deadlocks. When the Airbyte platform pauses reading from the source container's - stdout pipe, a blocking ``write()`` stalls the main thread. Since the - main thread is also responsible for draining the internal record queue, - this causes a cascading deadlock: the queue fills, worker threads block - on ``queue.put()``, and the entire process hangs. - - This function sets stdout to non-blocking mode so that ``os.write()`` - raises ``BlockingIOError`` instead of blocking when the pipe buffer is - full. It then uses ``select()`` to wait (with a timeout) until the fd - is writable again. The main thread remains in a Python-level retry - loop it controls, so it never gets stuck in a kernel-level syscall. - - Memory stays bounded because the upstream record queue keeps its - default bounded size (10,000 items). When stdout is blocked the main - thread pauses here, the queue fills naturally, and worker threads - block on ``queue.put()`` with their own timeouts. When the platform - resumes reading, ``select()`` returns, the write completes, the main - thread resumes draining the queue, and workers unblock automatically. + stdout pipe, a blocking ``write()`` in the main thread stalls message + processing. Since the main thread is also responsible for draining the + internal record queue, this causes a cascading deadlock: the queue + fills, worker threads block on ``queue.put()``, and the entire process + hangs. + + This function decouples stdout writing from the main thread by routing + messages through a bounded internal queue to a dedicated writer thread. + The writer thread performs normal blocking ``os.write()`` calls; if the + pipe is full, only the writer thread stalls — the main thread continues + iterating the message generator (which drains the record queue). + + When the internal write queue fills (because the writer is blocked on + the pipe), the main thread retries ``queue.put()`` with a 1-second + timeout. A watchdog detects if no message has been accepted for 600 + seconds and raises ``RuntimeError`` to terminate the process cleanly. """ - # We need to write to the *real* stdout fd for non-blocking I/O. - # However, in test environments (pytest capsys) or other wrappers, - # sys.stdout may have been replaced. If sys.stdout.fileno() fails - # or doesn't match sys.__stdout__.fileno(), something is capturing - # output and we must fall back to print() so it goes through the - # capture layer. + # In test environments (pytest capsys) or wrappers like PRINT_BUFFER, + # sys.stdout may have been replaced. Detect this via fileno() and + # fall back to print() so output goes through the capture layer. try: current_fd = sys.stdout.fileno() except (OSError, AttributeError, ValueError): - # capsys, PRINT_BUFFER, or other wrapper — no real fd available. for message in messages: print(f"{message}\n", end="") return @@ -430,54 +426,60 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: return if current_fd != real_fd: - # stdout has been redirected; fall back to print(). for message in messages: print(f"{message}\n", end="") return - try: - original_blocking = os.get_blocking(real_fd) - os.set_blocking(real_fd, False) - except OSError: - # Fallback: if we cannot set non-blocking (e.g. the fd does not - # support non-blocking mode), just write normally. - for message in messages: - print(f"{message}\n", end="") - return + # Bounded queue decouples the main thread from stdout I/O. + # The writer thread does blocking writes; if the pipe is full only the + # writer stalls — the main thread keeps draining the record queue. + _WRITE_QUEUE_SIZE = 1000 + _WATCHDOG_TIMEOUT_S = 600 + write_queue: queue.Queue[Optional[bytes]] = queue.Queue(maxsize=_WRITE_QUEUE_SIZE) + writer_error: List[BaseException] = [] + + def _stdout_writer() -> None: + """Dedicated thread that writes queued messages to stdout.""" + try: + while True: + data = write_queue.get() + if data is None: + break + total = 0 + while total < len(data): + written = os.write(real_fd, data[total:]) + total += written + except BaseException as exc: + writer_error.append(exc) + + writer = threading.Thread(target=_stdout_writer, name="stdout-writer", daemon=True) + writer.start() try: + last_progress = time.monotonic() for message in messages: + if writer_error: + raise writer_error[0] data = f"{message}\n".encode() - _write_all_nonblocking(real_fd, data) + while True: + try: + write_queue.put(data, timeout=1.0) + last_progress = time.monotonic() + break + except queue.Full: + if writer_error: + raise writer_error[0] + elapsed = time.monotonic() - last_progress + if elapsed > _WATCHDOG_TIMEOUT_S: + raise RuntimeError( + f"stdout pipe blocked for {elapsed:.0f}s with no progress " + f"(watchdog timeout={_WATCHDOG_TIMEOUT_S}s). " + "Terminating process to prevent indefinite hang." + ) finally: - try: - os.set_blocking(real_fd, original_blocking) - except OSError: - logger.debug("Failed to restore stdout blocking mode", exc_info=True) - - -def _write_all_nonblocking(fd: int, data: bytes) -> None: - """Write all bytes to a non-blocking fd, retrying with select on EAGAIN.""" - total_written = 0 - last_progress = time.monotonic() - - while total_written < len(data): - try: - written = os.write(fd, data[total_written:]) - total_written += written - last_progress = time.monotonic() - except BlockingIOError: - # Pipe buffer is full. Wait up to 1 second for it to become - # writable, then retry. The short timeout keeps the main - # thread responsive and allows periodic stall detection. - _, writable, _ = select.select([], [fd], [], 1.0) - if not writable: - elapsed = time.monotonic() - last_progress - if elapsed > 600: - raise RuntimeError( - f"stdout pipe blocked for {elapsed:.0f}s with no progress. " - "The platform is not reading from the source container pipe." - ) + # Signal writer to drain remaining messages and exit. + write_queue.put(None) + writer.join(timeout=30) def _init_internal_request_filter() -> None: From 583e6ee633806eb335301868151eb37f97acba2f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 18:05:20 +0000 Subject: [PATCH 07/14] fix: catch Exception instead of BaseException in writer thread, re-raise KeyboardInterrupt/SystemExit Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index b500c7e17..b6313850b 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -436,7 +436,7 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: _WRITE_QUEUE_SIZE = 1000 _WATCHDOG_TIMEOUT_S = 600 write_queue: queue.Queue[Optional[bytes]] = queue.Queue(maxsize=_WRITE_QUEUE_SIZE) - writer_error: List[BaseException] = [] + writer_error: List[Exception] = [] def _stdout_writer() -> None: """Dedicated thread that writes queued messages to stdout.""" @@ -449,7 +449,9 @@ def _stdout_writer() -> None: while total < len(data): written = os.write(real_fd, data[total:]) total += written - except BaseException as exc: + except (KeyboardInterrupt, SystemExit): + raise + except Exception as exc: writer_error.append(exc) writer = threading.Thread(target=_stdout_writer, name="stdout-writer", daemon=True) From b6f3f36efe8c8e606419f8e2e1866930b397a765 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 19:08:12 +0000 Subject: [PATCH 08/14] feat: add diagnostic logging to stdout writer thread to track pipe write timing Logs when os.write() blocks for >5s (indicates platform paused reading), and logs every 30s when write_queue is full. This will help validate whether the platform ever resumes reading from the pipe after a pause. Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 46 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index b6313850b..e0c672906 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -438,8 +438,17 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: write_queue: queue.Queue[Optional[bytes]] = queue.Queue(maxsize=_WRITE_QUEUE_SIZE) writer_error: List[Exception] = [] + logger = logging.getLogger("airbyte_cdk.stdout_writer") + _BLOCK_LOG_THRESHOLD_S = 5.0 # log when a single write blocks longer than this + messages_written = 0 + bytes_written = 0 + last_write_ts = time.monotonic() + total_blocked_s = 0.0 + block_count = 0 + def _stdout_writer() -> None: """Dedicated thread that writes queued messages to stdout.""" + nonlocal messages_written, bytes_written, last_write_ts, total_blocked_s, block_count try: while True: data = write_queue.get() @@ -447,8 +456,28 @@ def _stdout_writer() -> None: break total = 0 while total < len(data): + before = time.monotonic() written = os.write(real_fd, data[total:]) + elapsed = time.monotonic() - before total += written + if elapsed >= _BLOCK_LOG_THRESHOLD_S: + block_count += 1 + total_blocked_s += elapsed + logger.warning( + "STDOUT_WRITER: os.write() blocked for %.1fs " + "(wrote %d bytes). block_count=%d total_blocked=%.1fs " + "messages_written=%d bytes_written=%d queue_size=%d", + elapsed, + written, + block_count, + total_blocked_s, + messages_written, + bytes_written, + write_queue.qsize(), + ) + messages_written += 1 + bytes_written += len(data) + last_write_ts = time.monotonic() except (KeyboardInterrupt, SystemExit): raise except Exception as exc: @@ -472,10 +501,27 @@ def _stdout_writer() -> None: if writer_error: raise writer_error[0] elapsed = time.monotonic() - last_progress + if int(elapsed) % 30 == 0 and int(elapsed) > 0: + logger.warning( + "STDOUT_WRITER: write_queue full for %.0fs. " + "writer_messages=%d writer_bytes=%d " + "writer_blocks=%d writer_total_blocked=%.1fs " + "writer_last_write=%.1fs_ago queue_size=%d", + elapsed, + messages_written, + bytes_written, + block_count, + total_blocked_s, + time.monotonic() - last_write_ts, + write_queue.qsize(), + ) if elapsed > _WATCHDOG_TIMEOUT_S: raise RuntimeError( f"stdout pipe blocked for {elapsed:.0f}s with no progress " f"(watchdog timeout={_WATCHDOG_TIMEOUT_S}s). " + f"Writer stats: messages={messages_written} bytes={bytes_written} " + f"blocks={block_count} total_blocked={total_blocked_s:.1f}s " + f"last_write={time.monotonic() - last_write_ts:.1f}s ago. " "Terminating process to prevent indefinite hang." ) finally: From 75bd8909d22f2ddf1adc5871e77437d482906026 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 13:52:44 +0000 Subject: [PATCH 09/14] feat: add stderr heartbeat thread to prove platform never resumes reading from stdout pipe Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 49 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index e0c672906..f2bb99565 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -440,15 +440,51 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: logger = logging.getLogger("airbyte_cdk.stdout_writer") _BLOCK_LOG_THRESHOLD_S = 5.0 # log when a single write blocks longer than this + _HEARTBEAT_INTERVAL_S = 30.0 # emit a heartbeat to stderr every 30s messages_written = 0 bytes_written = 0 last_write_ts = time.monotonic() total_blocked_s = 0.0 block_count = 0 + pipe_blocked = False # True while os.write() is in progress + pipe_blocked_since = 0.0 # monotonic timestamp when current os.write() started + heartbeat_stop = threading.Event() + + def _heartbeat() -> None: + """Emit periodic status to stderr so we can prove pipe-blocking in Cloud logs. + + This thread writes directly to fd 2 (stderr) which is collected by the + Kubernetes container runtime independently of the orchestrator that reads + stdout. Even when the orchestrator stops reading stdout, these heartbeat + lines should still appear in the Cloud job logs. + """ + start = time.monotonic() + stderr_fd = 2 # write directly to fd 2, bypassing Python buffering + while not heartbeat_stop.wait(timeout=_HEARTBEAT_INTERVAL_S): + now = time.monotonic() + elapsed_total = now - start + blocked_str = "YES" if pipe_blocked else "NO" + blocked_dur = f" blocked_since={now - pipe_blocked_since:.0f}s" if pipe_blocked else "" + line = ( + f"STDOUT_WRITER_HEARTBEAT: t={elapsed_total:.0f}s " + f"msgs={messages_written} bytes={bytes_written} " + f"pipe_blocked={blocked_str}{blocked_dur} " + f"queue={write_queue.qsize()}/{_WRITE_QUEUE_SIZE}\n" + ) + try: + os.write(stderr_fd, line.encode()) + except OSError: + pass # stderr itself might be broken; nothing we can do + + heartbeat_thread = threading.Thread( + target=_heartbeat, name="stdout-writer-heartbeat", daemon=True + ) + heartbeat_thread.start() def _stdout_writer() -> None: """Dedicated thread that writes queued messages to stdout.""" nonlocal messages_written, bytes_written, last_write_ts, total_blocked_s, block_count + nonlocal pipe_blocked, pipe_blocked_since try: while True: data = write_queue.get() @@ -456,9 +492,12 @@ def _stdout_writer() -> None: break total = 0 while total < len(data): - before = time.monotonic() + pipe_blocked = True + pipe_blocked_since = time.monotonic() + before = pipe_blocked_since written = os.write(real_fd, data[total:]) elapsed = time.monotonic() - before + pipe_blocked = False total += written if elapsed >= _BLOCK_LOG_THRESHOLD_S: block_count += 1 @@ -485,6 +524,13 @@ def _stdout_writer() -> None: writer = threading.Thread(target=_stdout_writer, name="stdout-writer", daemon=True) writer.start() + logger.info( + "STDOUT_WRITER: started writer_thread fd=%d queue_size=%d watchdog=%ds heartbeat=%ds", + real_fd, + _WRITE_QUEUE_SIZE, + _WATCHDOG_TIMEOUT_S, + int(_HEARTBEAT_INTERVAL_S), + ) try: last_progress = time.monotonic() @@ -525,6 +571,7 @@ def _stdout_writer() -> None: "Terminating process to prevent indefinite hang." ) finally: + heartbeat_stop.set() # Signal writer to drain remaining messages and exit. write_queue.put(None) writer.join(timeout=30) From 2fc96fa4bf09e9da10214e539a48c8b9c400db96 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 21:30:59 +0000 Subject: [PATCH 10/14] refactor: simplify to heartbeat-only diagnostic (remove writer thread/queue/watchdog) Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 203 ++++++-------------------------------- 1 file changed, 31 insertions(+), 172 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index f2bb99565..98018e0f4 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -9,7 +9,6 @@ import logging import os import os.path -import queue import socket import sys import tempfile @@ -378,203 +377,63 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]: def launch(source: Source, args: List[str]) -> None: source_entrypoint = AirbyteEntrypoint(source) parsed_args = source_entrypoint.parse_args(args) - _nonblocking_write_to_stdout(source_entrypoint.run(parsed_args)) - -def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: - """Write messages to stdout via a dedicated writer thread to prevent deadlocks. - - When the Airbyte platform pauses reading from the source container's - stdout pipe, a blocking ``write()`` in the main thread stalls message - processing. Since the main thread is also responsible for draining the - internal record queue, this causes a cascading deadlock: the queue - fills, worker threads block on ``queue.put()``, and the entire process - hangs. - - This function decouples stdout writing from the main thread by routing - messages through a bounded internal queue to a dedicated writer thread. - The writer thread performs normal blocking ``os.write()`` calls; if the - pipe is full, only the writer thread stalls — the main thread continues - iterating the message generator (which drains the record queue). - - When the internal write queue fills (because the writer is blocked on - the pipe), the main thread retries ``queue.put()`` with a 1-second - timeout. A watchdog detects if no message has been accepted for 600 - seconds and raises ``RuntimeError`` to terminate the process cleanly. - """ - # In test environments (pytest capsys) or wrappers like PRINT_BUFFER, - # sys.stdout may have been replaced. Detect this via fileno() and - # fall back to print() so output goes through the capture layer. - try: - current_fd = sys.stdout.fileno() - except (OSError, AttributeError, ValueError): - for message in messages: - print(f"{message}\n", end="") - return - - real_stdout = sys.__stdout__ - if real_stdout is None or not hasattr(real_stdout, "fileno"): - for message in messages: - print(f"{message}\n", end="") - return - - try: - real_fd = real_stdout.fileno() - except (OSError, AttributeError, ValueError): - for message in messages: - print(f"{message}\n", end="") - return - - if current_fd != real_fd: - for message in messages: - print(f"{message}\n", end="") - return - - # Bounded queue decouples the main thread from stdout I/O. - # The writer thread does blocking writes; if the pipe is full only the - # writer stalls — the main thread keeps draining the record queue. - _WRITE_QUEUE_SIZE = 1000 - _WATCHDOG_TIMEOUT_S = 600 - write_queue: queue.Queue[Optional[bytes]] = queue.Queue(maxsize=_WRITE_QUEUE_SIZE) - writer_error: List[Exception] = [] - - logger = logging.getLogger("airbyte_cdk.stdout_writer") - _BLOCK_LOG_THRESHOLD_S = 5.0 # log when a single write blocks longer than this - _HEARTBEAT_INTERVAL_S = 30.0 # emit a heartbeat to stderr every 30s + # Heartbeat state — shared with the background heartbeat thread. + _HEARTBEAT_INTERVAL_S = 30.0 messages_written = 0 bytes_written = 0 - last_write_ts = time.monotonic() - total_blocked_s = 0.0 - block_count = 0 - pipe_blocked = False # True while os.write() is in progress - pipe_blocked_since = 0.0 # monotonic timestamp when current os.write() started + print_blocked = False + print_blocked_since = 0.0 heartbeat_stop = threading.Event() def _heartbeat() -> None: - """Emit periodic status to stderr so we can prove pipe-blocking in Cloud logs. + """Emit periodic status to stderr to diagnose stdout pipe blocking. - This thread writes directly to fd 2 (stderr) which is collected by the - Kubernetes container runtime independently of the orchestrator that reads - stdout. Even when the orchestrator stops reading stdout, these heartbeat - lines should still appear in the Cloud job logs. + Writes directly to fd 2 (stderr) which the Kubernetes container + runtime collects independently of the orchestrator reading stdout. """ start = time.monotonic() - stderr_fd = 2 # write directly to fd 2, bypassing Python buffering + stderr_fd = 2 while not heartbeat_stop.wait(timeout=_HEARTBEAT_INTERVAL_S): now = time.monotonic() - elapsed_total = now - start - blocked_str = "YES" if pipe_blocked else "NO" - blocked_dur = f" blocked_since={now - pipe_blocked_since:.0f}s" if pipe_blocked else "" + elapsed = now - start + blocked_str = "YES" if print_blocked else "NO" + blocked_dur = ( + f" blocked_since={now - print_blocked_since:.0f}s" + if print_blocked + else "" + ) line = ( - f"STDOUT_WRITER_HEARTBEAT: t={elapsed_total:.0f}s " + f"STDOUT_HEARTBEAT: t={elapsed:.0f}s " f"msgs={messages_written} bytes={bytes_written} " - f"pipe_blocked={blocked_str}{blocked_dur} " - f"queue={write_queue.qsize()}/{_WRITE_QUEUE_SIZE}\n" + f"print_blocked={blocked_str}{blocked_dur}\n" ) try: os.write(stderr_fd, line.encode()) except OSError: - pass # stderr itself might be broken; nothing we can do + pass heartbeat_thread = threading.Thread( - target=_heartbeat, name="stdout-writer-heartbeat", daemon=True + target=_heartbeat, name="stdout-heartbeat", daemon=True ) heartbeat_thread.start() - def _stdout_writer() -> None: - """Dedicated thread that writes queued messages to stdout.""" - nonlocal messages_written, bytes_written, last_write_ts, total_blocked_s, block_count - nonlocal pipe_blocked, pipe_blocked_since - try: - while True: - data = write_queue.get() - if data is None: - break - total = 0 - while total < len(data): - pipe_blocked = True - pipe_blocked_since = time.monotonic() - before = pipe_blocked_since - written = os.write(real_fd, data[total:]) - elapsed = time.monotonic() - before - pipe_blocked = False - total += written - if elapsed >= _BLOCK_LOG_THRESHOLD_S: - block_count += 1 - total_blocked_s += elapsed - logger.warning( - "STDOUT_WRITER: os.write() blocked for %.1fs " - "(wrote %d bytes). block_count=%d total_blocked=%.1fs " - "messages_written=%d bytes_written=%d queue_size=%d", - elapsed, - written, - block_count, - total_blocked_s, - messages_written, - bytes_written, - write_queue.qsize(), - ) + # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs + # Refer to: https://github.com/airbytehq/oncall/issues/6235 + try: + with PRINT_BUFFER: + for message in source_entrypoint.run(parsed_args): + # simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and + # the other for the break line. Adding `\n` to the message ensure that both are printed at the same time + data = f"{message}\n" + print_blocked = True + print_blocked_since = time.monotonic() + print(data, end="") + print_blocked = False messages_written += 1 bytes_written += len(data) - last_write_ts = time.monotonic() - except (KeyboardInterrupt, SystemExit): - raise - except Exception as exc: - writer_error.append(exc) - - writer = threading.Thread(target=_stdout_writer, name="stdout-writer", daemon=True) - writer.start() - logger.info( - "STDOUT_WRITER: started writer_thread fd=%d queue_size=%d watchdog=%ds heartbeat=%ds", - real_fd, - _WRITE_QUEUE_SIZE, - _WATCHDOG_TIMEOUT_S, - int(_HEARTBEAT_INTERVAL_S), - ) - - try: - last_progress = time.monotonic() - for message in messages: - if writer_error: - raise writer_error[0] - data = f"{message}\n".encode() - while True: - try: - write_queue.put(data, timeout=1.0) - last_progress = time.monotonic() - break - except queue.Full: - if writer_error: - raise writer_error[0] - elapsed = time.monotonic() - last_progress - if int(elapsed) % 30 == 0 and int(elapsed) > 0: - logger.warning( - "STDOUT_WRITER: write_queue full for %.0fs. " - "writer_messages=%d writer_bytes=%d " - "writer_blocks=%d writer_total_blocked=%.1fs " - "writer_last_write=%.1fs_ago queue_size=%d", - elapsed, - messages_written, - bytes_written, - block_count, - total_blocked_s, - time.monotonic() - last_write_ts, - write_queue.qsize(), - ) - if elapsed > _WATCHDOG_TIMEOUT_S: - raise RuntimeError( - f"stdout pipe blocked for {elapsed:.0f}s with no progress " - f"(watchdog timeout={_WATCHDOG_TIMEOUT_S}s). " - f"Writer stats: messages={messages_written} bytes={bytes_written} " - f"blocks={block_count} total_blocked={total_blocked_s:.1f}s " - f"last_write={time.monotonic() - last_write_ts:.1f}s ago. " - "Terminating process to prevent indefinite hang." - ) finally: heartbeat_stop.set() - # Signal writer to drain remaining messages and exit. - write_queue.put(None) - writer.join(timeout=30) def _init_internal_request_filter() -> None: From 17b4a6f7596e2542a51887710a276b723471161b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 21:32:20 +0000 Subject: [PATCH 11/14] style: fix ruff formatting Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 98018e0f4..0205d95cf 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -399,9 +399,7 @@ def _heartbeat() -> None: elapsed = now - start blocked_str = "YES" if print_blocked else "NO" blocked_dur = ( - f" blocked_since={now - print_blocked_since:.0f}s" - if print_blocked - else "" + f" blocked_since={now - print_blocked_since:.0f}s" if print_blocked else "" ) line = ( f"STDOUT_HEARTBEAT: t={elapsed:.0f}s " @@ -413,9 +411,7 @@ def _heartbeat() -> None: except OSError: pass - heartbeat_thread = threading.Thread( - target=_heartbeat, name="stdout-heartbeat", daemon=True - ) + heartbeat_thread = threading.Thread(target=_heartbeat, name="stdout-heartbeat", daemon=True) heartbeat_thread.start() # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs From 3a41aaa2b50f8e654ebc4372cfdbb80973952cf8 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 21:35:23 +0000 Subject: [PATCH 12/14] style: add explanatory comment to empty except clause Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 0205d95cf..c771a96bd 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -409,7 +409,7 @@ def _heartbeat() -> None: try: os.write(stderr_fd, line.encode()) except OSError: - pass + pass # Best-effort diagnostic — if stderr (fd 2) is broken, silently give up. heartbeat_thread = threading.Thread(target=_heartbeat, name="stdout-heartbeat", daemon=True) heartbeat_thread.start() From bf80db5d064e1a32ddbbc1cc73f0240a9bffbb88 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 31 Mar 2026 15:33:40 +0000 Subject: [PATCH 13/14] feat: add thread dumps and queue stats to heartbeat for deadlock diagnosis MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the heartbeat detects a stall (message count frozen for 90s), it now: - Dumps all thread stack traces via sys._current_frames() to show exactly what each worker thread is doing (blocked on HTTP, queue.put, lock, etc.) - Reports queue_size and queue_full on every heartbeat line - Re-dumps thread stacks every ~5 min during ongoing stalls The queue stats instantly reveal the deadlock type: - queue_size=0 + msgs frozen → workers stuck on HTTP/IO (Scenario B) - queue_size=10000 + print_blocked=YES → classic pipe deadlock (Scenario A) - queue_size=10000 + print_blocked=NO → main thread not consuming (bug) Adds a lightweight queue_registry module so the heartbeat thread in entrypoint.py can access the ConcurrentSource queue without threading it through the call chain. Co-Authored-By: gl_anatolii.yatsuk --- airbyte_cdk/entrypoint.py | 49 ++++++++++++++++++- .../concurrent_source/concurrent_source.py | 48 ++++++++++-------- .../concurrent_source/queue_registry.py | 41 ++++++++++++++++ 3 files changed, 115 insertions(+), 23 deletions(-) create mode 100644 airbyte_cdk/sources/concurrent_source/queue_registry.py diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index c771a96bd..c090f5a44 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -14,6 +14,7 @@ import tempfile import threading import time +import traceback from collections import defaultdict from functools import wraps from typing import Any, DefaultDict, Iterable, List, Mapping, Optional @@ -387,25 +388,69 @@ def launch(source: Source, args: List[str]) -> None: heartbeat_stop = threading.Event() def _heartbeat() -> None: - """Emit periodic status to stderr to diagnose stdout pipe blocking. + """Emit periodic status to stderr to diagnose stdout pipe blocking and deadlocks. Writes directly to fd 2 (stderr) which the Kubernetes container runtime collects independently of the orchestrator reading stdout. + + When a stall is detected (message count frozen for 3+ intervals = 90s), + a full thread dump is emitted to help diagnose deadlocks in the + concurrent source worker pool. """ + from airbyte_cdk.sources.concurrent_source.queue_registry import get_queue + start = time.monotonic() stderr_fd = 2 + last_msgs = 0 + stall_count = 0 + _STALL_THRESHOLD = 3 # intervals before triggering thread dump (3 * 30s = 90s) + _DUMP_REPEAT_INTERVAL = 10 # re-dump every 10 intervals (~5 min) during ongoing stall + while not heartbeat_stop.wait(timeout=_HEARTBEAT_INTERVAL_S): now = time.monotonic() elapsed = now - start + + # Detect stall: same message count for multiple consecutive intervals + if messages_written == last_msgs and messages_written > 0: + stall_count += 1 + else: + stall_count = 0 + last_msgs = messages_written + blocked_str = "YES" if print_blocked else "NO" blocked_dur = ( f" blocked_since={now - print_blocked_since:.0f}s" if print_blocked else "" ) + + # Include queue stats if concurrent source is active + queue_stats = "" + q = get_queue() + if q is not None: + try: + queue_stats = f" queue_size={q.qsize()} queue_full={q.full()}" + except Exception: + pass # Queue methods are best-effort + line = ( f"STDOUT_HEARTBEAT: t={elapsed:.0f}s " f"msgs={messages_written} bytes={bytes_written} " - f"print_blocked={blocked_str}{blocked_dur}\n" + f"print_blocked={blocked_str}{blocked_dur}" + f"{queue_stats}\n" ) + + # Dump all thread stacks when stall is detected, then periodically during ongoing stall + if stall_count == _STALL_THRESHOLD or ( + stall_count > _STALL_THRESHOLD + and (stall_count - _STALL_THRESHOLD) % _DUMP_REPEAT_INTERVAL == 0 + ): + line += "=== THREAD DUMP (stall detected) ===\n" + thread_names = {t.ident: t.name for t in threading.enumerate()} + for thread_id, frame in sys._current_frames().items(): + thread_name = thread_names.get(thread_id, "unknown") + line += f"\nThread {thread_name} ({thread_id}):\n" + line += "".join(traceback.format_stack(frame)) + line += "=== END THREAD DUMP ===\n" + try: os.write(stderr_fd, line.encode()) except OSError: diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index de2d93523..6ffbdb0cd 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -12,6 +12,7 @@ from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import ( PartitionGenerationCompletedSentinel, ) +from airbyte_cdk.sources.concurrent_source.queue_registry import register_queue, unregister_queue from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository @@ -106,29 +107,34 @@ def read( streams: List[AbstractStream], ) -> Iterator[AirbyteMessage]: self._logger.info("Starting syncing") - concurrent_stream_processor = ConcurrentReadProcessor( - streams, - PartitionEnqueuer(self._queue, self._threadpool), - self._threadpool, - self._logger, - self._slice_logger, - self._message_repository, - PartitionReader( - self._queue, - PartitionLogger(self._slice_logger, self._logger, self._message_repository), - ), - ) + # Register queue so the heartbeat thread can report queue stats for deadlock diagnosis + register_queue(self._queue) + try: + concurrent_stream_processor = ConcurrentReadProcessor( + streams, + PartitionEnqueuer(self._queue, self._threadpool), + self._threadpool, + self._logger, + self._slice_logger, + self._message_repository, + PartitionReader( + self._queue, + PartitionLogger(self._slice_logger, self._logger, self._message_repository), + ), + ) - # Enqueue initial partition generation tasks - yield from self._submit_initial_partition_generators(concurrent_stream_processor) + # Enqueue initial partition generation tasks + yield from self._submit_initial_partition_generators(concurrent_stream_processor) - # Read from the queue until all partitions were generated and read - yield from self._consume_from_queue( - self._queue, - concurrent_stream_processor, - ) - self._threadpool.check_for_errors_and_shutdown() - self._logger.info("Finished syncing") + # Read from the queue until all partitions were generated and read + yield from self._consume_from_queue( + self._queue, + concurrent_stream_processor, + ) + self._threadpool.check_for_errors_and_shutdown() + self._logger.info("Finished syncing") + finally: + unregister_queue() def _submit_initial_partition_generators( self, concurrent_stream_processor: ConcurrentReadProcessor diff --git a/airbyte_cdk/sources/concurrent_source/queue_registry.py b/airbyte_cdk/sources/concurrent_source/queue_registry.py new file mode 100644 index 000000000..569ee4edf --- /dev/null +++ b/airbyte_cdk/sources/concurrent_source/queue_registry.py @@ -0,0 +1,41 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""Module-level registry for the concurrent source queue. + +The heartbeat thread in entrypoint.py needs to report queue stats (size, full/empty) +to help diagnose deadlocks. Since the queue is created deep inside ConcurrentSource, +this registry provides a lightweight way to expose it without threading the queue +object through the entire call chain. + +Usage: + # In ConcurrentSource.read(): + register_queue(self._queue) + + # In the heartbeat thread: + q = get_queue() + if q is not None: + print(f"queue_size={q.qsize()} queue_full={q.full()}") +""" + +from queue import Queue +from typing import Optional + +from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem + +_queue: Optional[Queue[QueueItem]] = None + + +def register_queue(queue: Queue[QueueItem]) -> None: + """Register the concurrent source queue for heartbeat monitoring.""" + global _queue + _queue = queue + + +def get_queue() -> Optional[Queue[QueueItem]]: + """Return the registered queue, or None if no concurrent source is active.""" + return _queue + + +def unregister_queue() -> None: + """Clear the registered queue.""" + global _queue + _queue = None From b7818b090b53378094e0eb14f3de1e1cfabf8b83 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 13:52:50 +0000 Subject: [PATCH 14/14] fix: prevent deadlock when main thread puts on full queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The main thread is the sole consumer of the shared queue. Three code paths in _handle_item cause it to call queue.put() via ConcurrentMessageRepository.emit_message(): 1. PartitionCompleteSentinel → _on_stream_is_done → ensure_at_least_one_state_emitted 2. PartitionGenerationCompletedSentinel → _on_stream_is_done → same 3. Partition → on_partition → emit_message(slice_log) If the queue is full (10,000 items from workers), the main thread blocks on put() and nobody drains the queue → deadlock. Fix: detect the consumer thread (main thread) via thread ID captured at construction time. Main thread uses non-blocking put(block=False); if Full, messages are buffered in a deque and drained via consume_queue(), which the main thread already calls after processing every queue item. Worker threads continue using blocking put() for normal backpressure. Co-Authored-By: gl_anatolii.yatsuk --- .../sources/message/concurrent_repository.py | 56 +++++++++++++++---- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/airbyte_cdk/sources/message/concurrent_repository.py b/airbyte_cdk/sources/message/concurrent_repository.py index e3bc7116a..ede405fba 100644 --- a/airbyte_cdk/sources/message/concurrent_repository.py +++ b/airbyte_cdk/sources/message/concurrent_repository.py @@ -1,11 +1,11 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. import logging -import os -from queue import Queue +import threading +from collections import deque +from queue import Full, Queue from typing import Callable, Iterable from airbyte_cdk.models import AirbyteMessage, Level -from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.message.repository import LogMessage, MessageRepository from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem @@ -23,25 +23,61 @@ class ConcurrentMessageRepository(MessageRepository): This is particularly important for the connector builder which relies on grouping of messages to organize request/response, pages, and partitions. + + DEADLOCK PREVENTION: + The main thread is the sole consumer of the shared queue. If it calls queue.put() + while the queue is full, it deadlocks — nobody else will drain the queue. + This happens in 3 code paths from _handle_item: + 1. PartitionCompleteSentinel → _on_stream_is_done → ensure_at_least_one_state_emitted → emit_message → queue.put(state) + 2. PartitionGenerationCompletedSentinel → _on_stream_is_done → same path + 3. Partition → on_partition → emit_message(slice_log) → queue.put(log) + To prevent this, the main thread uses non-blocking put(block=False). If the queue + is full, messages are buffered in _pending and drained via consume_queue(), which + the main thread calls after processing every queue item. + Worker threads continue using blocking put() for normal backpressure. """ def __init__(self, queue: Queue[QueueItem], message_repository: MessageRepository): self._queue = queue self._decorated_message_repository = message_repository + # Capture the thread ID of the consumer (main thread) at construction time. + # This is always the main thread because ConcurrentSource.__init__ (and the + # declarative source that creates this repository) runs on the main thread. + self._consumer_thread_id = threading.get_ident() + # Buffer for messages that couldn't be put on the queue from the main thread + # because the queue was full. Drained by consume_queue(). + # deque.append() and deque.popleft() are atomic in CPython (GIL-protected). + self._pending: deque[AirbyteMessage] = deque() + + def _put_on_queue(self, message: AirbyteMessage) -> None: + """Put a message on the shared queue, with deadlock prevention for the main thread.""" + if threading.get_ident() == self._consumer_thread_id: + # Main thread (consumer): non-blocking to prevent self-deadlock. + # If queue is full, buffer the message — it will be drained via consume_queue(). + try: + self._queue.put(message, block=False) + except Full: + self._pending.append(message) + else: + # Worker thread: blocking put for normal backpressure. + self._queue.put(message) def emit_message(self, message: AirbyteMessage) -> None: self._decorated_message_repository.emit_message(message) - for message in self._decorated_message_repository.consume_queue(): - self._queue.put(message) + for msg in self._decorated_message_repository.consume_queue(): + self._put_on_queue(msg) def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: self._decorated_message_repository.log_message(level, message_provider) - for message in self._decorated_message_repository.consume_queue(): - self._queue.put(message) + for msg in self._decorated_message_repository.consume_queue(): + self._put_on_queue(msg) def consume_queue(self) -> Iterable[AirbyteMessage]: """ - This method shouldn't need to be called because as part of emit_message() we are already - loading messages onto the queue processed on the main thread. + Drain any messages that were buffered because the queue was full when the + main thread tried to put them. This is called by the main thread after + processing every queue item (in on_record, on_partition_complete_sentinel, + _on_stream_is_done), ensuring buffered messages are yielded promptly. """ - yield from [] + while self._pending: + yield self._pending.popleft()