Skip to content

Commit bf80db5

Browse files
feat: add thread dumps and queue stats to heartbeat for deadlock diagnosis
When the heartbeat detects a stall (message count frozen for 90s), it now:

- Dumps all thread stack traces via sys._current_frames() to show exactly what each worker thread is doing (blocked on HTTP, queue.put, lock, etc.)
- Reports queue_size and queue_full on every heartbeat line
- Re-dumps thread stacks every ~5 min during ongoing stalls

The queue stats instantly reveal the deadlock type:

- queue_size=0 + msgs frozen → workers stuck on HTTP/IO (Scenario B)
- queue_size=10000 + print_blocked=YES → classic pipe deadlock (Scenario A)
- queue_size=10000 + print_blocked=NO → main thread not consuming (bug)

Adds a lightweight queue_registry module so the heartbeat thread in entrypoint.py can access the ConcurrentSource queue without threading it through the call chain.

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
1 parent 3a41aaa commit bf80db5

3 files changed

Lines changed: 115 additions & 23 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import tempfile
1515
import threading
1616
import time
17+
import traceback
1718
from collections import defaultdict
1819
from functools import wraps
1920
from typing import Any, DefaultDict, Iterable, List, Mapping, Optional
@@ -387,25 +388,69 @@ def launch(source: Source, args: List[str]) -> None:
387388
heartbeat_stop = threading.Event()
388389

389390
def _heartbeat() -> None:
390-
"""Emit periodic status to stderr to diagnose stdout pipe blocking.
391+
"""Emit periodic status to stderr to diagnose stdout pipe blocking and deadlocks.
391392
392393
Writes directly to fd 2 (stderr) which the Kubernetes container
393394
runtime collects independently of the orchestrator reading stdout.
395+
396+
When a stall is detected (message count frozen for 3+ intervals = 90s),
397+
a full thread dump is emitted to help diagnose deadlocks in the
398+
concurrent source worker pool.
394399
"""
400+
from airbyte_cdk.sources.concurrent_source.queue_registry import get_queue
401+
395402
start = time.monotonic()
396403
stderr_fd = 2
404+
last_msgs = 0
405+
stall_count = 0
406+
_STALL_THRESHOLD = 3 # intervals before triggering thread dump (3 * 30s = 90s)
407+
_DUMP_REPEAT_INTERVAL = 10 # re-dump every 10 intervals (~5 min) during ongoing stall
408+
397409
while not heartbeat_stop.wait(timeout=_HEARTBEAT_INTERVAL_S):
398410
now = time.monotonic()
399411
elapsed = now - start
412+
413+
# Detect stall: same message count for multiple consecutive intervals
414+
if messages_written == last_msgs and messages_written > 0:
415+
stall_count += 1
416+
else:
417+
stall_count = 0
418+
last_msgs = messages_written
419+
400420
blocked_str = "YES" if print_blocked else "NO"
401421
blocked_dur = (
402422
f" blocked_since={now - print_blocked_since:.0f}s" if print_blocked else ""
403423
)
424+
425+
# Include queue stats if concurrent source is active
426+
queue_stats = ""
427+
q = get_queue()
428+
if q is not None:
429+
try:
430+
queue_stats = f" queue_size={q.qsize()} queue_full={q.full()}"
431+
except Exception:
432+
pass # Queue methods are best-effort
433+
404434
line = (
405435
f"STDOUT_HEARTBEAT: t={elapsed:.0f}s "
406436
f"msgs={messages_written} bytes={bytes_written} "
407-
f"print_blocked={blocked_str}{blocked_dur}\n"
437+
f"print_blocked={blocked_str}{blocked_dur}"
438+
f"{queue_stats}\n"
408439
)
440+
441+
# Dump all thread stacks when stall is detected, then periodically during ongoing stall
442+
if stall_count == _STALL_THRESHOLD or (
443+
stall_count > _STALL_THRESHOLD
444+
and (stall_count - _STALL_THRESHOLD) % _DUMP_REPEAT_INTERVAL == 0
445+
):
446+
line += "=== THREAD DUMP (stall detected) ===\n"
447+
thread_names = {t.ident: t.name for t in threading.enumerate()}
448+
for thread_id, frame in sys._current_frames().items():
449+
thread_name = thread_names.get(thread_id, "unknown")
450+
line += f"\nThread {thread_name} ({thread_id}):\n"
451+
line += "".join(traceback.format_stack(frame))
452+
line += "=== END THREAD DUMP ===\n"
453+
409454
try:
410455
os.write(stderr_fd, line.encode())
411456
except OSError:

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import (
1313
PartitionGenerationCompletedSentinel,
1414
)
15+
from airbyte_cdk.sources.concurrent_source.queue_registry import register_queue, unregister_queue
1516
from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException
1617
from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager
1718
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
@@ -106,29 +107,34 @@ def read(
106107
streams: List[AbstractStream],
107108
) -> Iterator[AirbyteMessage]:
108109
self._logger.info("Starting syncing")
109-
concurrent_stream_processor = ConcurrentReadProcessor(
110-
streams,
111-
PartitionEnqueuer(self._queue, self._threadpool),
112-
self._threadpool,
113-
self._logger,
114-
self._slice_logger,
115-
self._message_repository,
116-
PartitionReader(
117-
self._queue,
118-
PartitionLogger(self._slice_logger, self._logger, self._message_repository),
119-
),
120-
)
110+
# Register queue so the heartbeat thread can report queue stats for deadlock diagnosis
111+
register_queue(self._queue)
112+
try:
113+
concurrent_stream_processor = ConcurrentReadProcessor(
114+
streams,
115+
PartitionEnqueuer(self._queue, self._threadpool),
116+
self._threadpool,
117+
self._logger,
118+
self._slice_logger,
119+
self._message_repository,
120+
PartitionReader(
121+
self._queue,
122+
PartitionLogger(self._slice_logger, self._logger, self._message_repository),
123+
),
124+
)
121125

122-
# Enqueue initial partition generation tasks
123-
yield from self._submit_initial_partition_generators(concurrent_stream_processor)
126+
# Enqueue initial partition generation tasks
127+
yield from self._submit_initial_partition_generators(concurrent_stream_processor)
124128

125-
# Read from the queue until all partitions were generated and read
126-
yield from self._consume_from_queue(
127-
self._queue,
128-
concurrent_stream_processor,
129-
)
130-
self._threadpool.check_for_errors_and_shutdown()
131-
self._logger.info("Finished syncing")
129+
# Read from the queue until all partitions were generated and read
130+
yield from self._consume_from_queue(
131+
self._queue,
132+
concurrent_stream_processor,
133+
)
134+
self._threadpool.check_for_errors_and_shutdown()
135+
self._logger.info("Finished syncing")
136+
finally:
137+
unregister_queue()
132138

133139
def _submit_initial_partition_generators(
134140
self, concurrent_stream_processor: ConcurrentReadProcessor
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Module-level registry for the concurrent source queue.

The heartbeat thread in entrypoint.py needs to report queue stats (size, full/empty)
to help diagnose deadlocks. Since the queue is created deep inside ConcurrentSource,
this registry provides a lightweight way to expose it without threading the queue
object through the entire call chain.

Usage:
    # In ConcurrentSource.read():
    register_queue(self._queue)

    # In the heartbeat thread:
    q = get_queue()
    if q is not None:
        print(f"queue_size={q.qsize()} queue_full={q.full()}")
"""

from queue import Queue
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Type-checking-only import: this registry is imported from both entrypoint.py
    # and the concurrent source package, so importing the partitions module at
    # runtime would add an avoidable dependency edge (and circular-import risk)
    # purely for an annotation. String annotations below keep it lazy.
    from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem

# Single registered queue, or None when no concurrent source is active.
# NOTE: rebinding a module-level name is atomic in CPython, so the heartbeat
# thread may read this concurrently with register/unregister without a lock;
# only one concurrent source is expected to be active per process.
_queue: "Optional[Queue[QueueItem]]" = None


def register_queue(queue: "Queue[QueueItem]") -> None:
    """Register the concurrent source queue for heartbeat monitoring."""
    global _queue
    _queue = queue


def get_queue() -> "Optional[Queue[QueueItem]]":
    """Return the registered queue, or None if no concurrent source is active."""
    return _queue


def unregister_queue() -> None:
    """Clear the registered queue."""
    global _queue
    _queue = None

0 commit comments

Comments
 (0)