Skip to content

Commit 731ff7e

Browse files
feat: add diagnostic logging for deadlock debugging
- Add heartbeat logging to _consume_from_queue (logs every 60s when no items received)
- Add STARTED/PROGRESS/COMPLETED/FAILED logging to PartitionReader.process_partition
- Add STARTED/COMPLETED/FAILED logging to PartitionEnqueuer.generate_partitions
- Use queue.get(timeout=60) instead of blocking queue.get() to enable heartbeat

Co-Authored-By: unknown <>
1 parent 4ea2fe0 commit 731ff7e

3 files changed

Lines changed: 113 additions & 8 deletions

File tree

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
import concurrent
66
import logging
7-
from queue import Queue
7+
import threading
8+
import time
9+
from queue import Empty, Queue
810
from typing import Iterable, Iterator, List, Optional
911

1012
from airbyte_cdk.models import AirbyteMessage
@@ -143,7 +145,42 @@ def _consume_from_queue(
143145
queue: Queue[QueueItem],
144146
concurrent_stream_processor: ConcurrentReadProcessor,
145147
) -> Iterable[AirbyteMessage]:
146-
while airbyte_message_or_record_or_exception := queue.get():
148+
last_item_time = time.monotonic()
149+
heartbeat_interval = 60.0 # Log heartbeat every 60 seconds
150+
items_since_last_heartbeat = 0
151+
152+
while True:
153+
try:
154+
airbyte_message_or_record_or_exception = queue.get(timeout=heartbeat_interval)
155+
except Empty:
156+
elapsed = time.monotonic() - last_item_time
157+
self._logger.info(
158+
"Queue heartbeat: no items received for %.0fs. "
159+
"queue_size=%d, threadpool_done=%s, active_threads=%d",
160+
elapsed,
161+
queue.qsize(),
162+
self._threadpool.is_done(),
163+
threading.active_count(),
164+
)
165+
continue
166+
167+
if not airbyte_message_or_record_or_exception:
168+
break
169+
170+
now = time.monotonic()
171+
items_since_last_heartbeat += 1
172+
if now - last_item_time >= heartbeat_interval:
173+
self._logger.info(
174+
"Queue heartbeat: processed %d items in last %.0fs. "
175+
"queue_size=%d, item_type=%s",
176+
items_since_last_heartbeat,
177+
now - last_item_time,
178+
queue.qsize(),
179+
type(airbyte_message_or_record_or_exception).__name__,
180+
)
181+
items_since_last_heartbeat = 0
182+
last_item_time = now
183+
147184
yield from self._handle_item(
148185
airbyte_message_or_record_or_exception,
149186
concurrent_stream_processor,

airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4+
import logging
45
import time
56
from queue import Queue
67

@@ -33,17 +34,30 @@ def __init__(
3334
self._sleep_time_in_seconds = sleep_time_in_seconds
3435

3536
def generate_partitions(self, stream: AbstractStream) -> None:
36-
"""
37-
Generate partitions from a partition generator and put them in a queue.
38-
When all the partitions are added to the queue, a sentinel is added to the queue to indicate that all the partitions have been generated.
37+
"""Generate partitions from a partition generator and put them in a queue.
38+
39+
When all the partitions are added to the queue, a sentinel is added to the queue to indicate
40+
that all the partitions have been generated.
3941
40-
If an exception is encountered, the exception will be caught and put in the queue. This is very important because if we don't, the
41-
main thread will have no way to know that something when wrong and will wait until the timeout is reached
42+
If an exception is encountered, the exception will be caught and put in the queue. This is
43+
very important because if we don't, the main thread will have no way to know that something
44+
went wrong and will wait until the timeout is reached.
4245
4346
This method is meant to be called in a separate thread.
4447
"""
48+
logger = logging.getLogger(f"airbyte.partition_enqueuer.{stream.name}")
49+
logger.info("Partition generation STARTED for stream=%s", stream.name)
50+
partition_count = 0
51+
start_time = time.monotonic()
4552
try:
4653
for partition in stream.generate_partitions():
54+
partition_count += 1
55+
logger.info(
56+
"Partition generation: enqueuing partition #%d for stream=%s, slice=%s",
57+
partition_count,
58+
stream.name,
59+
partition.to_slice(),
60+
)
4761
# Adding partitions to the queue generates futures. To avoid having too many futures, we throttle here. We understand that
4862
# we might add more futures than the limit by throttling in the threads while it is the main thread that actual adds the
4963
# future but we expect the delta between the max futures length and the actual to be small enough that it would not be an
@@ -58,7 +72,22 @@ def generate_partitions(self, stream: AbstractStream) -> None:
5872
while self._thread_pool_manager.prune_to_validate_has_reached_futures_limit():
5973
time.sleep(self._sleep_time_in_seconds)
6074
self._queue.put(partition)
75+
elapsed = time.monotonic() - start_time
76+
logger.info(
77+
"Partition generation COMPLETED for stream=%s: %d partitions in %.1fs",
78+
stream.name,
79+
partition_count,
80+
elapsed,
81+
)
6182
self._queue.put(PartitionGenerationCompletedSentinel(stream))
6283
except Exception as e:
84+
elapsed = time.monotonic() - start_time
85+
logger.info(
86+
"Partition generation FAILED for stream=%s after %.1fs with %d partitions: %s",
87+
stream.name,
88+
elapsed,
89+
partition_count,
90+
str(e)[:200],
91+
)
6392
self._queue.put(StreamThreadException(e, stream.name))
6493
self._queue.put(PartitionGenerationCompletedSentinel(stream))

airbyte_cdk/sources/streams/concurrent/partition_reader.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
22

33
import logging
4+
import time
45
from queue import Queue
56
from typing import Optional
67

@@ -72,15 +73,53 @@ def process_partition(self, partition: Partition, cursor: Cursor) -> None:
7273
:param partition: The partition to read data from
7374
:return: None
7475
"""
76+
partition_start = time.monotonic()
77+
stream_name = partition.stream_name()
78+
slice_info = partition.to_slice()
79+
logger = logging.getLogger(f"airbyte.partition_reader.{stream_name}")
80+
logger.info(
81+
"Partition read STARTED for stream=%s, slice=%s",
82+
stream_name,
83+
slice_info,
84+
)
7585
try:
7686
if self._partition_logger:
7787
self._partition_logger.log(partition)
7888

89+
record_count = 0
90+
last_progress_time = partition_start
7991
for record in partition.read():
8092
self._queue.put(record)
8193
cursor.observe(record)
94+
record_count += 1
95+
now = time.monotonic()
96+
if now - last_progress_time >= 30.0:
97+
logger.info(
98+
"Partition read PROGRESS for stream=%s: %d records read so far (%.0fs elapsed), slice=%s",
99+
stream_name,
100+
record_count,
101+
now - partition_start,
102+
slice_info,
103+
)
104+
last_progress_time = now
82105
cursor.close_partition(partition)
106+
elapsed = time.monotonic() - partition_start
107+
logger.info(
108+
"Partition read COMPLETED for stream=%s: %d records in %.1fs, slice=%s",
109+
stream_name,
110+
record_count,
111+
elapsed,
112+
slice_info,
113+
)
83114
self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
84115
except Exception as e:
85-
self._queue.put(StreamThreadException(e, partition.stream_name()))
116+
elapsed = time.monotonic() - partition_start
117+
logger.info(
118+
"Partition read FAILED for stream=%s after %.1fs: %s, slice=%s",
119+
stream_name,
120+
elapsed,
121+
str(e)[:200],
122+
slice_info,
123+
)
124+
self._queue.put(StreamThreadException(e, stream_name))
86125
self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))

0 commit comments

Comments (0)