Skip to content

Commit 312a994

Browse files
fix: redirect logging handler to non-blocking buffer to prevent deadlock
The logging handler's stream is PrintBuffer, which writes to sys.__stdout__ while holding an RLock. When the platform pauses reading from the stdout pipe, PrintBuffer.flush() blocks on the pipe while holding the lock. Any thread that tries to log (including the main thread's heartbeat) also blocks waiting for the RLock. This commit redirects all logging handler streams to a _QueueStream that puts messages into the same unbounded buffer used by the stdout writer thread. The writer thread is now the ONLY thing that touches sys.__stdout__, so no other thread can block on the pipe. Co-Authored-By: unknown <>
1 parent 63fbd7e commit 312a994

1 file changed

Lines changed: 57 additions & 6 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import argparse
66
import importlib
7+
import io
78
import ipaddress
89
import json
910
import logging
@@ -382,14 +383,49 @@ def launch(source: Source, args: List[str]) -> None:
382383
_buffered_write_to_stdout(source_entrypoint.run(parsed_args))
383384

384385

386+
class _QueueStream(io.TextIOBase):
387+
"""A file-like stream that puts each write into an unbounded queue.
388+
389+
This is used to replace the logging handler's stream (and optionally
390+
``sys.stdout`` / ``sys.stderr``) so that **no thread** performs
391+
blocking writes to the real stdout pipe. A single background writer
392+
thread drains the queue and is the only thing that touches
393+
``sys.__stdout__``.
394+
"""
395+
396+
def __init__(self, buffer: "Queue[Optional[str]]") -> None:
397+
self._buffer = buffer
398+
399+
def write(self, data: str) -> int: # type: ignore[override]
400+
# StreamHandler writes the formatted message, then the terminator
401+
# ("\n"). We strip trailing newlines because the writer thread
402+
# adds its own.
403+
stripped = data.rstrip("\n")
404+
if stripped:
405+
self._buffer.put(stripped)
406+
return len(data)
407+
408+
def flush(self) -> None:
409+
pass # No-op: the writer thread handles actual I/O.
410+
411+
def writable(self) -> bool:
412+
return True
413+
414+
385415
def _buffered_write_to_stdout(messages: Iterable[str]) -> None:
386416
"""Drain *messages* through a background writer thread.
387417
388418
The main thread puts serialised messages into an in-memory queue.
389419
A dedicated daemon thread reads from that queue and performs the
390-
blocking ``print()`` calls. This prevents stdout pipe backpressure
391-
from stalling the main thread (and, by extension, the CDK's
392-
internal record queue).
420+
blocking ``sys.__stdout__`` writes. This prevents stdout pipe
421+
backpressure from stalling the main thread (and, by extension, the
422+
CDK's internal record queue).
423+
424+
**Critically**, this function also redirects the Python logging
425+
handler's stream to the same queue. Without this, every
426+
``logger.info()`` call still goes through ``PrintBuffer.flush()``
427+
→ ``sys.__stdout__.write()`` which blocks on the pipe. The main
428+
thread logs heartbeat messages, so it deadlocks the same way.
393429
394430
If the background writer encounters an error the exception is
395431
re-raised in the main thread after the generator is exhausted.
@@ -404,19 +440,34 @@ def _writer() -> None:
404440
item = buffer.get()
405441
if item is _SENTINEL:
406442
return
407-
# Adding `\n` to the message ensures both the payload and
408-
# the line break are printed in a single write call.
409-
print(f"{item}\n", end="")
443+
sys.__stdout__.write(f"{item}\n") # type: ignore[union-attr]
444+
sys.__stdout__.flush() # type: ignore[union-attr]
410445
except Exception as exc:
411446
writer_error.append(exc)
412447

413448
writer_thread = threading.Thread(target=_writer, daemon=True, name="stdout-writer")
414449
writer_thread.start()
415450

451+
# Redirect the root logger's handler stream so that logger.info() etc.
452+
# go through the non-blocking buffer instead of PrintBuffer → pipe.
453+
queue_stream = _QueueStream(buffer)
454+
original_handler_streams: List[Any] = []
455+
root_logger = logging.getLogger()
456+
for handler in root_logger.handlers:
457+
if isinstance(handler, logging.StreamHandler):
458+
original_handler_streams.append(handler.stream)
459+
handler.stream = queue_stream # type: ignore[assignment]
460+
416461
try:
417462
for message in messages:
418463
buffer.put(message)
419464
finally:
465+
# Restore original handler streams before shutting down the writer.
466+
idx = 0
467+
for handler in root_logger.handlers:
468+
if isinstance(handler, logging.StreamHandler) and idx < len(original_handler_streams):
469+
handler.stream = original_handler_streams[idx]
470+
idx += 1
420471
buffer.put(_SENTINEL)
421472
writer_thread.join(timeout=300)
422473

0 commit comments

Comments
 (0)