Skip to content

Commit e4ce817

Browse files
fix: set stderr fd 2 to non-blocking mode to prevent deadlock from pipe backpressure
Co-Authored-By: unknown <>
1 parent 4c6b4b1 commit e4ce817

2 files changed

Lines changed: 62 additions & 4 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44

55
import argparse
6+
import fcntl
67
import importlib
78
import io
89
import ipaddress
@@ -442,18 +443,40 @@ def __getattr__(self, name: str) -> Any:
442443
return getattr(self._original, name)
443444

444445

446+
def _ensure_stderr_nonblock() -> None:
    """Put stderr (fd 2) into non-blocking mode; safe to call repeatedly.

    When the Airbyte platform stops reading from the source container's
    stderr pipe, the pipe buffer fills and any ``os.write(2, ...)`` call
    blocks the calling thread.  If that thread is the main thread, the
    CDK's record queue fills and all workers deadlock.

    With ``O_NONBLOCK`` set, ``os.write(2, ...)`` raises
    ``BlockingIOError`` (EAGAIN) instead of blocking, which
    ``_stderr_diag`` already swallows.

    The current flags are read first and ``F_SETFL`` is only issued when
    ``O_NONBLOCK`` is missing, so every call after the first successful
    one is a no-op beyond a single ``F_GETFL`` query.

    Returns:
        None.  Any failure is swallowed: this is a best-effort tweak that
        must never prevent the caller from continuing.

    NOTE(review): ``O_NONBLOCK`` is a file *status* flag, which POSIX ties
    to the open file description rather than to the descriptor number.  If
    stdout and stderr were duplicated from one description (e.g. ``2>&1``)
    stdout would presumably become non-blocking as well -- confirm this
    cannot happen in supported deployments.
    """
    try:
        flags = fcntl.fcntl(2, fcntl.F_GETFL)
        if flags & os.O_NONBLOCK:
            # Already non-blocking (earlier call, or inherited that way).
            return
        fcntl.fcntl(2, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    except Exception:
        # Best-effort; some environments may not support fcntl on fd 2.
        pass
464+
465+
445466
def _stderr_diag(msg: str) -> None:
    """Emit ``DIAG: <msg>`` straight to the stderr file descriptor.

    The line is handed to ``os.write()`` on raw fd 2, so no Python-level
    buffering (``sys.stderr``, ``PrintBuffer``, logging handlers) is
    involved.  Once ``_ensure_stderr_nonblock`` has switched fd 2 to
    non-blocking mode, a full pipe makes the write fail with
    ``BlockingIOError`` instead of stalling the calling thread; the
    message is then silently dropped.

    Args:
        msg: Text to report.  It is prefixed with ``DIAG: `` and terminated
            with a newline before being encoded and written.
    """
    stderr_fd = 2
    try:
        line = f"DIAG: {msg}\n"
        os.write(stderr_fd, line.encode())
    except Exception:
        # Diagnostics are best-effort: a full pipe (BlockingIOError /
        # EAGAIN) or any other I/O error must not disturb the caller.
        return
457480

458481

459482
def _buffered_write_to_stdout(messages: Iterable[str]) -> None:
@@ -490,6 +513,8 @@ def _buffered_write_to_stdout(messages: Iterable[str]) -> None:
490513
print(message)
491514
return
492515

516+
_ensure_stderr_nonblock()
517+
493518
_SENTINEL = None # signals the writer to stop
494519
buffer: Queue[Optional[str]] = Queue()
495520
writer_error: List[Exception] = []

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44

55
import concurrent
6+
import fcntl
67
import logging
78
import os
89
import sys
@@ -119,6 +120,7 @@ def read(
119120
# (consumes an item from the queue). The watchdog reads this to
120121
# detect when the main thread is stuck.
121122
self._last_progress_time = time.monotonic()
123+
self._ensure_stderr_nonblock()
122124
self._watchdog_should_run = True
123125
watchdog = threading.Thread(
124126
target=self._watchdog_loop,
@@ -162,18 +164,49 @@ def _submit_initial_partition_generators(
162164
if status_message:
163165
yield status_message
164166

167+
# Class-level latch: flipped to True after fd 2 has been switched to
# non-blocking mode, so later calls return without touching the fd.
_stderr_nonblock_set = False

@classmethod
def _ensure_stderr_nonblock(cls) -> None:
    """Switch stderr (fd 2) to non-blocking mode the first time it is called.

    In Airbyte Cloud the platform reads the source container's stdout and
    stderr pipes.  If it pauses reading (e.g. destination backpressure),
    both pipe buffers fill up, and a blocking ``os.write(2, ...)`` stalls
    whichever thread issued it.  When that is the main thread, the CDK
    queue fills and all workers deadlock.

    With ``O_NONBLOCK`` set on fd 2, ``os.write(2, ...)`` fails
    immediately with ``BlockingIOError`` (EAGAIN) instead of blocking.
    ``_diag`` swallows every exception, so the message is simply dropped
    while the pipe is full.

    The ``_stderr_nonblock_set`` flag is raised only after both ``fcntl``
    calls succeed; a failed attempt leaves it unset, so the next call
    retries.
    """
    if not cls._stderr_nonblock_set:
        try:
            current_flags = fcntl.fcntl(2, fcntl.F_GETFL)
            wanted_flags = current_flags | os.O_NONBLOCK
            fcntl.fcntl(2, fcntl.F_SETFL, wanted_flags)
            cls._stderr_nonblock_set = True
        except Exception:
            # Best-effort; some environments may not support fcntl on fd 2.
            return
194+
165195
@staticmethod
def _diag(msg: str) -> None:
    """Emit ``DIAG: <msg>`` directly on stderr fd 2.

    The write goes through ``os.write()`` on the raw descriptor and so
    skips all Python buffering (sys.stderr, logging, PrintBuffer).  After
    ``_ensure_stderr_nonblock`` has put the fd into non-blocking mode, a
    full pipe raises ``BlockingIOError`` instead of stalling the calling
    thread, and the message is dropped.

    Args:
        msg: Diagnostic text; written as ``DIAG: <msg>`` plus a newline.
    """
    stderr_fd = 2
    try:
        line = f"DIAG: {msg}\n"
        os.write(stderr_fd, line.encode())
    except Exception:
        # Deliberately swallowed: diagnostics are best-effort and must
        # never interfere with program execution.  In non-blocking mode
        # this covers BlockingIOError (EAGAIN) when the pipe is full.
        return
178211

179212
def _consume_from_queue(

0 commit comments

Comments
 (0)