Skip to content

Commit b6f3f36

Browse files
feat: add diagnostic logging to stdout writer thread to track pipe write timing
Logs when os.write() blocks for >5s (indicates platform paused reading), and logs every 30s when write_queue is full. This will help validate whether the platform ever resumes reading from the pipe after a pause. Co-Authored-By: unknown <>
1 parent 583e6ee commit b6f3f36

1 file changed

Lines changed: 46 additions & 0 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,17 +438,46 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None:
438438
write_queue: queue.Queue[Optional[bytes]] = queue.Queue(maxsize=_WRITE_QUEUE_SIZE)
439439
writer_error: List[Exception] = []
440440

441+
logger = logging.getLogger("airbyte_cdk.stdout_writer")
442+
_BLOCK_LOG_THRESHOLD_S = 5.0 # log when a single write blocks longer than this
443+
messages_written = 0
444+
bytes_written = 0
445+
last_write_ts = time.monotonic()
446+
total_blocked_s = 0.0
447+
block_count = 0
448+
441449
def _stdout_writer() -> None:
442450
"""Dedicated thread that writes queued messages to stdout."""
451+
nonlocal messages_written, bytes_written, last_write_ts, total_blocked_s, block_count
443452
try:
444453
while True:
445454
data = write_queue.get()
446455
if data is None:
447456
break
448457
total = 0
449458
while total < len(data):
459+
before = time.monotonic()
450460
written = os.write(real_fd, data[total:])
461+
elapsed = time.monotonic() - before
451462
total += written
463+
if elapsed >= _BLOCK_LOG_THRESHOLD_S:
464+
block_count += 1
465+
total_blocked_s += elapsed
466+
logger.warning(
467+
"STDOUT_WRITER: os.write() blocked for %.1fs "
468+
"(wrote %d bytes). block_count=%d total_blocked=%.1fs "
469+
"messages_written=%d bytes_written=%d queue_size=%d",
470+
elapsed,
471+
written,
472+
block_count,
473+
total_blocked_s,
474+
messages_written,
475+
bytes_written,
476+
write_queue.qsize(),
477+
)
478+
messages_written += 1
479+
bytes_written += len(data)
480+
last_write_ts = time.monotonic()
452481
except (KeyboardInterrupt, SystemExit):
453482
raise
454483
except Exception as exc:
@@ -472,10 +501,27 @@ def _stdout_writer() -> None:
472501
if writer_error:
473502
raise writer_error[0]
474503
elapsed = time.monotonic() - last_progress
504+
if int(elapsed) % 30 == 0 and int(elapsed) > 0:
505+
logger.warning(
506+
"STDOUT_WRITER: write_queue full for %.0fs. "
507+
"writer_messages=%d writer_bytes=%d "
508+
"writer_blocks=%d writer_total_blocked=%.1fs "
509+
"writer_last_write=%.1fs_ago queue_size=%d",
510+
elapsed,
511+
messages_written,
512+
bytes_written,
513+
block_count,
514+
total_blocked_s,
515+
time.monotonic() - last_write_ts,
516+
write_queue.qsize(),
517+
)
475518
if elapsed > _WATCHDOG_TIMEOUT_S:
476519
raise RuntimeError(
477520
f"stdout pipe blocked for {elapsed:.0f}s with no progress "
478521
f"(watchdog timeout={_WATCHDOG_TIMEOUT_S}s). "
522+
f"Writer stats: messages={messages_written} bytes={bytes_written} "
523+
f"blocks={block_count} total_blocked={total_blocked_s:.1f}s "
524+
f"last_write={time.monotonic() - last_write_ts:.1f}s ago. "
479525
"Terminating process to prevent indefinite hang."
480526
)
481527
finally:

0 commit comments

Comments
 (0)