Skip to content

Commit c088a51

Browse files
diag: add os.write(2,...) diagnostics to trace main thread blocking point
Adds periodic diagnostic messages written directly to stderr fd 2 (bypassing all Python buffering) at key points: 1. _consume_from_queue: every 10s logs total_items, total_yields, qsize, and last_get_wait. Also logs SLOW yields (>5s). 2. _buffered_write_to_stdout: every 10s logs msg_count, buffer_qsize, and writer_alive status. These diagnostics will be visible in platform logs even when the stdout pipe is blocked, allowing us to pinpoint exactly where the main thread gets stuck. Co-Authored-By: unknown <>
1 parent c5038c5 commit c088a51

2 files changed

Lines changed: 76 additions & 2 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import sys
1414
import tempfile
1515
import threading
16+
import time
1617
from collections import defaultdict
1718
from functools import wraps
1819
from queue import Queue
@@ -539,8 +540,23 @@ def _writer() -> None:
539540
)
540541

541542
try:
543+
msg_count = 0
544+
last_diag = time.monotonic()
542545
for message in messages:
543546
buffer.put(message)
547+
msg_count += 1
548+
now = time.monotonic()
549+
if now - last_diag >= 10.0:
550+
_stderr_diag(
551+
f"_buffered_write_to_stdout: alive, "
552+
f"msg_count={msg_count}, buffer_qsize={buffer.qsize()}, "
553+
f"writer_alive={writer_thread.is_alive()}"
554+
)
555+
last_diag = now
556+
_stderr_diag(
557+
f"_buffered_write_to_stdout: generator exhausted, "
558+
f"msg_count={msg_count}, buffer_qsize={buffer.qsize()}"
559+
)
544560
finally:
545561
# Restore sys.stdout / sys.stderr before shutting down.
546562
sys.stdout = original_stdout

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,18 @@ def _submit_initial_partition_generators(
162162
if status_message:
163163
yield status_message
164164

165+
@staticmethod
166+
def _diag(msg: str) -> None:
167+
"""Write diagnostic message directly to stderr fd 2.
168+
169+
Bypasses all Python buffering (sys.stderr, logging, PrintBuffer)
170+
so the message is visible even when stdout/stderr pipes are blocked.
171+
"""
172+
try:
173+
os.write(2, f"DIAG: {msg}\n".encode())
174+
except Exception:
175+
pass
176+
165177
def _consume_from_queue(
166178
self,
167179
queue: Queue[QueueItem],
@@ -170,12 +182,24 @@ def _consume_from_queue(
170182
last_item_time = time.monotonic()
171183
heartbeat_interval = 60.0 # Log heartbeat every 60 seconds
172184
items_since_last_heartbeat = 0
185+
total_items = 0
186+
total_yields = 0
187+
last_diag_time = time.monotonic()
188+
diag_interval = 10.0 # Diagnostic log every 10 seconds
189+
190+
self._diag("_consume_from_queue: ENTER")
173191

174192
while True:
193+
now_pre_get = time.monotonic()
175194
try:
176195
airbyte_message_or_record_or_exception = queue.get(timeout=heartbeat_interval)
177196
except Empty:
178197
elapsed = time.monotonic() - last_item_time
198+
self._diag(
199+
f"_consume_from_queue: EMPTY after {elapsed:.0f}s, "
200+
f"qsize={queue.qsize()}, pool_done={self._threadpool.is_done()}, "
201+
f"threads={threading.active_count()}"
202+
)
179203
self._logger.info(
180204
"Queue heartbeat: no items received for %.0fs. "
181205
"queue_size=%d, threadpool_done=%s, active_threads=%d",
@@ -187,10 +211,26 @@ def _consume_from_queue(
187211
continue
188212

189213
if not airbyte_message_or_record_or_exception:
214+
self._diag("_consume_from_queue: got sentinel, breaking")
190215
break
191216

192217
now = time.monotonic()
218+
get_wait = now - now_pre_get
219+
total_items += 1
193220
items_since_last_heartbeat += 1
221+
222+
# Periodic diagnostic via os.write(2,...) — visible even when
223+
# stdout pipe is blocked.
224+
if now - last_diag_time >= diag_interval:
225+
item_type = type(airbyte_message_or_record_or_exception).__name__
226+
self._diag(
227+
f"_consume_from_queue: alive, "
228+
f"total_items={total_items}, total_yields={total_yields}, "
229+
f"qsize={queue.qsize()}, last_get_wait={get_wait:.3f}s, "
230+
f"item_type={item_type}"
231+
)
232+
last_diag_time = now
233+
194234
if now - last_item_time >= heartbeat_interval:
195235
self._logger.info(
196236
"Queue heartbeat: processed %d items in last %.0fs. "
@@ -204,17 +244,35 @@ def _consume_from_queue(
204244
self._last_progress_time = now
205245
last_item_time = now
206246

207-
yield from self._handle_item(
247+
pre_yield = time.monotonic()
248+
for msg in self._handle_item(
208249
airbyte_message_or_record_or_exception,
209250
concurrent_stream_processor,
210-
)
251+
):
252+
total_yields += 1
253+
yield msg
254+
post_yield = time.monotonic()
255+
yield_dur = post_yield - pre_yield
256+
if yield_dur > 5.0:
257+
self._diag(
258+
f"_consume_from_queue: SLOW yield, "
259+
f"duration={yield_dur:.1f}s, "
260+
f"item_type={type(airbyte_message_or_record_or_exception).__name__}"
261+
)
262+
211263
# In the event that a partition raises an exception, anything remaining in
212264
# the queue will be missed because is_done() can raise an exception and exit
213265
# out of this loop before remaining items are consumed
214266
if queue.empty() and concurrent_stream_processor.is_done():
215267
# all partitions were generated and processed. we're done here
268+
self._diag("_consume_from_queue: all done, breaking")
216269
break
217270

271+
self._diag(
272+
f"_consume_from_queue: EXIT, total_items={total_items}, "
273+
f"total_yields={total_yields}"
274+
)
275+
218276
def _watchdog_loop(self) -> None:
219277
"""Daemon thread that terminates the process when the main thread stalls.
220278

0 commit comments

Comments
 (0)