Skip to content

Commit c5038c5

Browse files
fix: replace sys.stdout/stderr with non-blocking proxies to prevent deadlock
Previous fix only redirected logging handler streams to the non-blocking buffer queue. This was insufficient because other code paths (print() calls, direct sys.stdout.write()) still wrote to the real pipe and could block when the platform paused reading. This commit adds two additional layers of protection: 1. _StdoutProxy: A proxy object that intercepts write() and flush() calls, routing them through the unbounded buffer queue. All other attribute access (encoding, fileno, buffer, etc.) is delegated to the original stream object. 2. sys.stdout and sys.stderr are replaced with _StdoutProxy instances inside _buffered_write_to_stdout(). This ensures that ANY write from ANY thread goes through the non-blocking buffer. 3. Diagnostic logging via os.write(2, ...) confirms the buffered writer is active and reports how many handlers were redirected and the original stdout/stderr types. The three layers (handler redirection, stdout/stderr replacement, and the writer thread) together ensure no thread ever blocks on the stdout pipe except the dedicated writer thread. Co-Authored-By: unknown <>
1 parent 05796a5 commit c5038c5

1 file changed

Lines changed: 76 additions & 12 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 76 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,49 @@ def writable(self) -> bool:
412412
return True
413413

414414

415+
class _StdoutProxy:
416+
"""Proxy for ``sys.stdout`` / ``sys.stderr`` that intercepts writes.
417+
418+
Unlike ``_QueueStream`` (which extends ``io.TextIOBase``), this proxy
419+
delegates *all* attribute access to the original stream object. This
420+
means code that inspects ``sys.stdout.encoding``, calls
421+
``sys.stdout.fileno()``, or accesses ``sys.stdout.buffer`` continues
422+
to work. Only ``write()`` and ``flush()`` are overridden to route
423+
data through the non-blocking buffer queue.
424+
"""
425+
426+
def __init__(self, original: Any, buffer: "Queue[Optional[str]]") -> None:
427+
# Use object.__setattr__ to bypass our own __setattr__ if we ever add one.
428+
object.__setattr__(self, "_original", original)
429+
object.__setattr__(self, "_buffer", buffer)
430+
431+
def write(self, data: str) -> int:
432+
stripped = str(data).rstrip("\n")
433+
if stripped:
434+
self._buffer.put(stripped)
435+
return len(data)
436+
437+
def flush(self) -> None:
438+
pass # No-op: the writer thread handles actual I/O.
439+
440+
def __getattr__(self, name: str) -> Any:
441+
return getattr(self._original, name)
442+
443+
444+
def _stderr_diag(msg: str) -> None:
445+
"""Write a diagnostic message directly to stderr fd.
446+
447+
Uses ``os.write()`` on the raw file descriptor so the write bypasses
448+
*all* Python buffering (``sys.stderr``, ``PrintBuffer``, logging
449+
handlers). This works even when the stdout/stderr pipes are blocked
450+
because ``os.write(2, …)`` is a direct syscall on fd 2.
451+
"""
452+
try:
453+
os.write(2, f"DIAG: {msg}\n".encode())
454+
except Exception:
455+
pass # Best-effort; must not prevent caller from continuing.
456+
457+
415458
def _buffered_write_to_stdout(messages: Iterable[str]) -> None:
416459
"""Drain *messages* through a background writer thread.
417460
@@ -421,11 +464,17 @@ def _buffered_write_to_stdout(messages: Iterable[str]) -> None:
421464
backpressure from stalling the main thread (and, by extension, the
422465
CDK's internal record queue).
423466
424-
**Critically**, this function also redirects the Python logging
425-
handler's stream to the same queue. Without this, every
426-
``logger.info()`` call still goes through ``PrintBuffer.flush()``
427-
→ ``sys.__stdout__.write()`` which blocks on the pipe. The main
428-
thread logs heartbeat messages, so it deadlocks the same way.
467+
Three layers of protection are applied:
468+
469+
1. **Logging handler streams** on the root logger that target stdout,
470+
stderr, or a ``PrintBuffer`` are replaced with a ``_QueueStream``
471+
that writes into the buffer queue.
472+
2. **``sys.stdout`` and ``sys.stderr``** are replaced with
473+
``_StdoutProxy`` objects that also write into the buffer queue.
474+
This catches *any* ``print()`` call or direct ``sys.stdout.write()``
475+
from any thread.
476+
3. The **stdout writer thread** is the only code that touches the
477+
real ``sys.__stdout__`` pipe.
429478
430479
In test environments (pytest), the buffered writer is skipped because
431480
writing to ``sys.__stdout__`` bypasses pytest's ``capsys`` capture.
@@ -458,12 +507,7 @@ def _writer() -> None:
458507
writer_thread = threading.Thread(target=_writer, daemon=True, name="stdout-writer")
459508
writer_thread.start()
460509

461-
# Redirect the root logger's handler stream so that logger.info() etc.
462-
# go through the non-blocking buffer instead of PrintBuffer → pipe.
463-
# Only redirect handlers whose stream targets stdout/stderr (or a
464-
# PrintBuffer wrapping stdout). Pytest installs a _LiveLoggingStream-
465-
# Handler whose stream is a TerminalWriter with extra methods like
466-
# .section(); replacing that stream would break pytest.
510+
# --- Layer 1: redirect logging handler streams ---
467511
_STDOUT_STREAMS = (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__)
468512
queue_stream = _QueueStream(buffer)
469513
redirected_handlers: List[logging.StreamHandler] = [] # type: ignore[type-arg]
@@ -477,11 +521,31 @@ def _writer() -> None:
477521
original_handler_streams.append(stream)
478522
handler.stream = queue_stream # type: ignore[assignment]
479523

524+
# --- Layer 2: replace sys.stdout / sys.stderr ---
525+
# This catches print() calls and direct sys.stdout.write() from any
526+
# thread, routing them through the non-blocking buffer instead of the
527+
# pipe. _StdoutProxy delegates all other attribute access (encoding,
528+
# fileno, buffer, etc.) to the original stream object.
529+
original_stdout = sys.stdout
530+
original_stderr = sys.stderr
531+
sys.stdout = _StdoutProxy(original_stdout, buffer) # type: ignore[assignment]
532+
sys.stderr = _StdoutProxy(original_stderr, buffer) # type: ignore[assignment]
533+
534+
_stderr_diag(
535+
f"Buffered writer ACTIVE: "
536+
f"handlers_redirected={len(redirected_handlers)}, "
537+
f"stdout_type={type(original_stdout).__name__}, "
538+
f"stderr_type={type(original_stderr).__name__}"
539+
)
540+
480541
try:
481542
for message in messages:
482543
buffer.put(message)
483544
finally:
484-
# Restore original handler streams before shutting down the writer.
545+
# Restore sys.stdout / sys.stderr before shutting down.
546+
sys.stdout = original_stdout
547+
sys.stderr = original_stderr
548+
# Restore original handler streams.
485549
for handler, orig_stream in zip(redirected_handlers, original_handler_streams):
486550
handler.stream = orig_stream
487551
buffer.put(_SENTINEL)

0 commit comments

Comments
 (0)