Skip to content

Commit 33ff81f

Browse files
fix: add watchdog thread to detect stdout backpressure deadlocks
When the platform stops reading from the source container's stdout/stderr pipes (e.g. destination backpressure), all threads block on I/O and no in-process timeout can fire. The watchdog monitors main-thread progress and calls os._exit(1) after 10 minutes of no activity. Also fixes exception handlers in PartitionReader and PartitionEnqueuer that used blocking queue.put() without timeout, which would deadlock when the queue is full. Co-Authored-By: unknown <>
1 parent d8065e5 commit 33ff81f

3 files changed

Lines changed: 91 additions & 24 deletions

File tree

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 79 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import concurrent
66
import logging
7+
import os
8+
import sys
79
import threading
810
import time
911
from queue import Empty, Queue
@@ -38,6 +40,11 @@ class ConcurrentSource:
3840
"""
3941

4042
DEFAULT_TIMEOUT_SECONDS = 900
43+
# If the main thread makes no progress for this long, the watchdog
44+
# terminates the process. This breaks deadlocks caused by stdout/stderr
45+
# pipe blockage where no in-process timeout can fire because I/O itself
46+
# is blocked at the OS level.
47+
_WATCHDOG_TIMEOUT_SECONDS = 600.0 # 10 minutes
4148

4249
@staticmethod
4350
def create(
@@ -108,29 +115,44 @@ def read(
108115
streams: List[AbstractStream],
109116
) -> Iterator[AirbyteMessage]:
110117
self._logger.info("Starting syncing")
111-
concurrent_stream_processor = ConcurrentReadProcessor(
112-
streams,
113-
PartitionEnqueuer(self._queue, self._threadpool),
114-
self._threadpool,
115-
self._logger,
116-
self._slice_logger,
117-
self._message_repository,
118-
PartitionReader(
119-
self._queue,
120-
PartitionLogger(self._slice_logger, self._logger, self._message_repository),
121-
),
118+
# Shared timestamp updated every time the main thread makes progress
119+
# (consumes an item from the queue). The watchdog reads this to
120+
# detect when the main thread is stuck.
121+
self._last_progress_time = time.monotonic()
122+
self._watchdog_should_run = True
123+
watchdog = threading.Thread(
124+
target=self._watchdog_loop,
125+
daemon=True,
126+
name="progress-watchdog",
122127
)
128+
watchdog.start()
129+
130+
try:
131+
concurrent_stream_processor = ConcurrentReadProcessor(
132+
streams,
133+
PartitionEnqueuer(self._queue, self._threadpool),
134+
self._threadpool,
135+
self._logger,
136+
self._slice_logger,
137+
self._message_repository,
138+
PartitionReader(
139+
self._queue,
140+
PartitionLogger(self._slice_logger, self._logger, self._message_repository),
141+
),
142+
)
123143

124-
# Enqueue initial partition generation tasks
125-
yield from self._submit_initial_partition_generators(concurrent_stream_processor)
144+
# Enqueue initial partition generation tasks
145+
yield from self._submit_initial_partition_generators(concurrent_stream_processor)
126146

127-
# Read from the queue until all partitions were generated and read
128-
yield from self._consume_from_queue(
129-
self._queue,
130-
concurrent_stream_processor,
131-
)
132-
self._threadpool.check_for_errors_and_shutdown()
133-
self._logger.info("Finished syncing")
147+
# Read from the queue until all partitions were generated and read
148+
yield from self._consume_from_queue(
149+
self._queue,
150+
concurrent_stream_processor,
151+
)
152+
self._threadpool.check_for_errors_and_shutdown()
153+
self._logger.info("Finished syncing")
154+
finally:
155+
self._watchdog_should_run = False
134156

135157
def _submit_initial_partition_generators(
136158
self, concurrent_stream_processor: ConcurrentReadProcessor
@@ -179,6 +201,7 @@ def _consume_from_queue(
179201
type(airbyte_message_or_record_or_exception).__name__,
180202
)
181203
items_since_last_heartbeat = 0
204+
self._last_progress_time = now
182205
last_item_time = now
183206

184207
yield from self._handle_item(
@@ -192,6 +215,42 @@ def _consume_from_queue(
192215
# all partitions were generated and processed. we're done here
193216
break
194217

218+
def _watchdog_loop(self) -> None:
219+
"""Daemon thread that terminates the process when the main thread stalls.
220+
221+
In Airbyte Cloud the source container's stdout and stderr are read by
222+
the platform (replication-orchestrator). If the platform stops reading
223+
(e.g. destination backpressure), both pipes fill up and *all* threads
224+
block on I/O — including the main thread's ``yield`` and every worker
225+
thread's ``logger.*()`` call. No in-process timeout can fire because
226+
the timeout's own log/write call also blocks.
227+
228+
This watchdog does **not** perform any I/O. It simply checks a shared
229+
monotonic timestamp that the main thread updates whenever it consumes a
230+
queue item. If no progress is observed for ``_WATCHDOG_TIMEOUT_SECONDS``,
231+
it calls ``os._exit(1)`` which is a raw syscall that terminates the
232+
process immediately regardless of I/O state.
233+
"""
234+
while self._watchdog_should_run:
235+
time.sleep(30) # check every 30 seconds
236+
if not self._watchdog_should_run:
237+
return
238+
elapsed = time.monotonic() - self._last_progress_time
239+
if elapsed >= self._WATCHDOG_TIMEOUT_SECONDS:
240+
# Write directly to stderr fd to bypass Python buffering
241+
# which may be blocked. This is best-effort; if the fd is
242+
# blocked the write will simply fail and we still exit.
243+
try:
244+
msg = (
245+
f"WATCHDOG: Main thread made no progress for "
246+
f"{elapsed:.0f}s (threshold={self._WATCHDOG_TIMEOUT_SECONDS:.0f}s). "
247+
f"Terminating process to prevent indefinite hang.\n"
248+
)
249+
os.write(sys.stderr.fileno(), msg.encode())
250+
except Exception:
251+
pass
252+
os._exit(1)
253+
195254
def _handle_item(
196255
self,
197256
queue_item: QueueItem,

airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,12 @@ def generate_partitions(self, stream: AbstractStream) -> None:
9696
partition_count,
9797
str(e)[:200],
9898
)
99-
self._queue.put(StreamThreadException(e, stream.name))
100-
self._queue.put(PartitionGenerationCompletedSentinel(stream))
99+
self._put_with_timeout(StreamThreadException(e, stream.name), stream.name, logger)
100+
self._put_with_timeout(
101+
PartitionGenerationCompletedSentinel(stream),
102+
stream.name,
103+
logger,
104+
)
101105

102106
def _put_with_timeout(
103107
self,

airbyte_cdk/sources/streams/concurrent/partition_reader.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,12 @@ def process_partition(self, partition: Partition, cursor: Cursor) -> None:
129129
str(e)[:200],
130130
slice_info,
131131
)
132-
self._queue.put(StreamThreadException(e, stream_name))
133-
self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL))
132+
self._put_with_timeout(StreamThreadException(e, stream_name), stream_name, logger)
133+
self._put_with_timeout(
134+
PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL),
135+
stream_name,
136+
logger,
137+
)
134138

135139
def _put_with_timeout(
136140
self,

0 commit comments

Comments
 (0)