feat: add stderr heartbeat with deadlock diagnosis and prevention (#953)
…sure

When the Airbyte platform pauses reading from the source container's stdout pipe, the main thread's print() call blocks in an OS-level write() syscall. This stalls the record queue consumer, filling the bounded queue and blocking all worker threads: a complete deadlock.

This change replaces blocking print() with non-blocking os.write(), using select() to wait for the pipe to become writable. The main thread stays in a Python-level retry loop instead of getting stuck in a kernel syscall. When the platform resumes reading, select() returns, the write completes, and the pipeline resumes automatically.

Key properties:
- Memory stays bounded (queue maxsize=10,000 unchanged)
- No deadlock (the main thread is never stuck in a blocking syscall)
- Automatic recovery when the platform resumes reading
- 600s watchdog raises RuntimeError if the pipe stays blocked

Co-Authored-By: unknown <>
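The retry loop described in this commit can be sketched roughly as follows. This is a minimal illustration, not the CDK's actual code: the function name `write_message`, the 1-second `select()` timeout, and the watchdog wiring are all assumptions.

```python
import os
import select
import time

WATCHDOG_SECONDS = 600  # matches the 600s watchdog described above


def write_message(line: bytes, fd: int = 1) -> None:
    """Write one serialized message to stdout without blocking forever.

    The fd is put in non-blocking mode; when the pipe buffer is full we
    wait inside select() (a Python-level wait, not a stuck kernel
    write()) until the reader drains the pipe or the watchdog fires.
    """
    os.set_blocking(fd, False)
    deadline = time.monotonic() + WATCHDOG_SECONDS
    view = memoryview(line)
    while view:
        try:
            written = os.write(fd, view)
            view = view[written:]
        except BlockingIOError:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise RuntimeError("stdout pipe blocked for >600s")
            # Wait (up to 1s at a time) for the pipe to become writable.
            select.select([], [fd], [], min(remaining, 1.0))
```

Note that a later commit in this PR abandons this exact approach, because `os.set_blocking()` is a process-wide change on the fd.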
👋 Greetings, Airbyte Team Member! You can test this version of the CDK using the following:

```
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1773412868-nonblocking-stdout#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1773412868-nonblocking-stdout
```
…nd log restore failures Co-Authored-By: unknown <>
….stdout Co-Authored-By: unknown <>
…UFFER) Co-Authored-By: unknown <>
…INT_BUFFER wrapper Co-Authored-By: unknown <>
…global BlockingIOError Setting os.set_blocking(fd, False) is a process-wide change that causes BlockingIOError in other threads (logging via print_buffer, worker threads). Instead, use a dedicated stdout-writer thread that does blocking os.write() calls. If the pipe is full, only the writer thread stalls - the main thread continues draining the record queue. Co-Authored-By: unknown <>
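The dedicated stdout-writer thread described in this commit can be sketched as below. This is a simplified illustration under assumed names (`StdoutWriter`, the sentinel, the queue size); only the writer thread performs blocking `os.write()`, so a full pipe stalls it alone while producers keep running until the bounded write queue itself fills.

```python
import os
import queue
import threading

_SENTINEL = object()


class StdoutWriter:
    """Only this thread performs blocking os.write(); if the pipe is
    full, only the writer thread stalls while producers continue
    draining the record queue (until write_queue itself fills)."""

    def __init__(self, fd: int = 1, maxsize: int = 1000) -> None:
        self._fd = fd
        self._queue: queue.Queue = queue.Queue(maxsize=maxsize)
        self._thread = threading.Thread(
            target=self._run, name="stdout-writer", daemon=True
        )
        self._thread.start()

    def write(self, line: bytes) -> None:
        # Producers block only when the bounded write queue is full.
        self._queue.put(line)

    def close(self) -> None:
        self._queue.put(_SENTINEL)
        self._thread.join()

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is _SENTINEL:
                return
            view = memoryview(item)
            while view:  # blocking write; stalls only this thread
                view = view[os.write(self._fd, view):]
```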
…ise KeyboardInterrupt/SystemExit Co-Authored-By: unknown <>
…ite timing Logs when os.write() blocks for >5s (indicates platform paused reading), and logs every 30s when write_queue is full. This will help validate whether the platform ever resumes reading from the pipe after a pause. Co-Authored-By: unknown <>
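The write-timing instrumentation might look roughly like this sketch (the function name, threshold constant, and log wording are assumptions, not the PR's actual code): time the possibly-blocking write and report to stderr, which the platform collects independently, whenever it exceeds the threshold.

```python
import os
import time

SLOW_WRITE_SECONDS = 5.0  # threshold from the commit message


def timed_write(fd: int, data: bytes) -> float:
    """Perform the (possibly blocking) write and return how long it
    took; log to stderr (fd 2) when it blocked longer than 5s, which
    indicates the platform paused reading the pipe."""
    start = time.monotonic()
    view = memoryview(data)
    while view:
        view = view[os.write(fd, view):]
    elapsed = time.monotonic() - start
    if elapsed > SLOW_WRITE_SECONDS:
        os.write(2, f"stdout write blocked for {elapsed:.1f}s\n".encode())
    return elapsed
```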
…ding from stdout pipe Co-Authored-By: unknown <>
…/queue/watchdog) Co-Authored-By: unknown <>
…nosis

When the heartbeat detects a stall (message count frozen for 90s), it now:
- Dumps all thread stack traces via sys._current_frames() to show exactly what each worker thread is doing (blocked on HTTP, queue.put, lock, etc.)
- Reports queue_size and queue_full on every heartbeat line
- Re-dumps thread stacks every ~5 min during ongoing stalls

The queue stats instantly reveal the deadlock type:
- queue_size=0 + msgs frozen → workers stuck on HTTP/IO (Scenario B)
- queue_size=10000 + print_blocked=YES → classic pipe deadlock (Scenario A)
- queue_size=10000 + print_blocked=NO → main thread not consuming (bug)

Adds a lightweight queue_registry module so the heartbeat thread in entrypoint.py can access the ConcurrentSource queue without threading it through the call chain.

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
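The thread-dump part of the diagnostic can be sketched as follows. This is an illustrative helper (the name `dump_all_threads` and exact output layout are assumptions); as the review checklist notes, `sys._current_frames()` is a CPython implementation detail, though a long-stable one.

```python
import sys
import threading
import traceback


def dump_all_threads() -> str:
    """Render every live thread's current stack so a frozen sync shows
    where each worker is blocked (HTTP call, queue.put, lock, ...)."""
    names = {t.ident: t.name for t in threading.enumerate()}
    lines = ["=== THREAD DUMP ==="]
    for ident, frame in sys._current_frames().items():
        lines.append(f"--- {names.get(ident, '?')} (ident={ident}) ---")
        lines.extend(traceback.format_stack(frame))
    # format_stack entries already end in newlines; headers do not.
    return "".join(l if l.endswith("\n") else l + "\n" for l in lines)
```

In the PR this output is written to stderr (fd 2) so it survives a blocked stdout pipe.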
/prerelease
The main thread is the sole consumer of the shared queue. Three code paths in _handle_item cause it to call queue.put() via ConcurrentMessageRepository.emit_message():

1. PartitionCompleteSentinel → _on_stream_is_done → ensure_at_least_one_state_emitted
2. PartitionGenerationCompletedSentinel → _on_stream_is_done → same
3. Partition → on_partition → emit_message(slice_log)

If the queue is full (10,000 items from workers), the main thread blocks on put() and nobody drains the queue → deadlock.

Fix: detect the consumer thread (main thread) via a thread ID captured at construction time. The main thread uses non-blocking put(block=False); on Full, messages are buffered in a deque and drained via consume_queue(), which the main thread already calls after processing every queue item. Worker threads continue using blocking put() for normal backpressure.

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
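The fix described above can be sketched as a simplified repository (the class name is hypothetical and the real `ConcurrentMessageRepository` wraps Airbyte messages, but the thread-ID check, the overflow deque, and the drain loop follow the description):

```python
import queue
import threading
from collections import deque
from typing import Any


class ConsumerAwareRepository:
    """The consumer (main) thread never blocks on put(); overflow goes
    to a local deque that only the consumer touches, drained on its
    next consume_queue() call. Workers keep blocking backpressure."""

    def __init__(self, q: queue.Queue) -> None:
        self._queue = q
        self._consumer_thread_id = threading.get_ident()  # construction thread
        self._pending: deque = deque()  # accessed only by the consumer thread

    def emit_message(self, message: Any) -> None:
        if threading.get_ident() == self._consumer_thread_id:
            try:
                self._queue.put(message, block=False)
            except queue.Full:
                self._pending.append(message)  # buffer instead of deadlocking
        else:
            self._queue.put(message)  # workers: normal blocking backpressure

    def consume_queue(self) -> None:
        # Called by the consumer after processing each queue item: move
        # buffered messages back into the queue as space frees up.
        while self._pending:
            try:
                self._queue.put(self._pending[0], block=False)
            except queue.Full:
                return
            self._pending.popleft()
```

Peeking at `self._pending[0]` before `popleft()` keeps the message in the buffer if the queue is still full, preserving FIFO order.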
/prerelease

/prerelease
Summary
Adds a lightweight background heartbeat thread to `launch()` that writes periodic diagnostic status to stderr (fd 2) every 30 seconds, plus a fix for a confirmed deadlock in `ConcurrentMessageRepository` where the main thread (sole queue consumer) could block on `queue.put()` when the queue was full.

Purpose: In Airbyte Cloud, the orchestrator controls reading from the source container's stdout pipe. When it pauses reading, `print()` blocks in the kernel. We need to prove whether the platform ever resumes reading or permanently stops. Additionally, when using concurrent sources with multiple workers, syncs can stall due to deadlocks between the worker threads, the shared queue, and the main thread. Since stderr is collected independently by the Kubernetes container runtime, heartbeat lines will appear in logs even when stdout is blocked.

Heartbeat & Diagnostics (entrypoint.py, queue_registry.py, concurrent_source.py)
Heartbeat output (normal):
Heartbeat output (stall detected — after 90s of frozen message count):
The queue stats instantly reveal the deadlock type:
- `queue_size=0` + msgs frozen → workers stuck on HTTP/IO (all threads blocked on API calls)
- `queue_size=10000` + `print_blocked=YES` → classic pipe deadlock (stdout backpressure)
- `queue_size=10000` + `print_blocked=NO` → main thread not consuming (the deadlock this PR fixes)

What changed for diagnostics:
- New background thread (`stdout-heartbeat`) that writes to fd 2 every 30s
- Instrumented the `print()` loop with `print_blocked`/`messages_written`/`bytes_written` tracking
- `try/finally` to signal the heartbeat thread to stop on exit
- Replaced `print()` with the `PRINT_BUFFER` wrapper
- Adds `queue_size` and `queue_full` fields to the heartbeat output when a concurrent source is active
- Dumps all thread stacks via `sys._current_frames()` when a stall is detected (message count frozen for 3+ intervals = 90s), repeated every ~5 min during ongoing stalls
- New lightweight `queue_registry.py` module so the heartbeat thread can access the `ConcurrentSource` queue without threading it through the call chain; `ConcurrentSource.read()` registers/unregisters the queue via `try/finally`

Deadlock Fix (concurrent_repository.py)
Root cause (confirmed via thread dump from a production sync): The main thread is the sole consumer of the shared queue. Three code paths in `_handle_item` cause it to call `ConcurrentMessageRepository.emit_message()` → `queue.put()`:

- `PartitionCompleteSentinel` → `_on_stream_is_done` → `ensure_at_least_one_state_emitted` → `queue.put(state)`
- `PartitionGenerationCompletedSentinel` → `_on_stream_is_done` → same path
- `Partition` → `on_partition` → `emit_message(slice_log)` → `queue.put(log)`

If the queue is full (10,000 items from workers), the main thread blocks on `put()` and nobody drains the queue → deadlock.

Fix: `ConcurrentMessageRepository` captures the consumer thread ID at construction. When `emit_message`/`log_message` is called:

- On the consumer (main) thread: non-blocking `put(block=False)`. On `Full`, the message is buffered in a deque (`_pending`). These are drained via `consume_queue()`, which the main thread already calls after processing every queue item.
- On worker threads: blocking `put()` for normal backpressure.

Why ordering is preserved:
`_on_stream_is_done` is only called when all `PartitionCompleteSentinel`s for a stream have been processed. Since each sentinel is enqueued after all records for that partition (FIFO), all records for the stream are already consumed by the main thread before the final state is emitted.

Review & Testing Checklist for Human
- `_consumer_thread_id = threading.get_ident()` is captured in `__init__`. This assumes `ConcurrentMessageRepository` is always constructed on the main (consumer) thread. Verify this holds for all instantiation paths (currently `ConcurrentDeclarativeSource` at line 171). If it's ever constructed on a different thread, the main thread would get blocking `put()` and still deadlock.
- The `_pending` deque is only accessed by the main thread: `_pending.append()` happens only when `threading.get_ident() == self._consumer_thread_id` (main thread), and `_pending.popleft()` happens in `consume_queue()`, which is called by the main thread. So there is no cross-thread access. Confirm no code path violates this.
- Ordering (`on_partition` log): `on_partition()` returns `None` (not a generator), so when its log goes to `_pending`, it's drained on the next `consume_queue()` call (e.g., in `on_record` or `on_partition_complete_sentinel`). This may shift the log's position slightly in stdout output compared to before. Verify this is acceptable.
- `queue_registry` global: `register_queue`/`get_queue` use a module-level global (`_queue`). Under CPython's GIL, simple reference assignments and reads are atomic, but if multiple `ConcurrentSource` instances run simultaneously, only the last one's queue would be visible. Confirm this is acceptable.
- `sys._current_frames()` availability: This is a CPython implementation detail (underscore-prefixed). It has been stable for years and is present in all CPython versions Airbyte supports, but it is not guaranteed by the Python language spec.
- Verify `STDOUT_HEARTBEAT` lines appear with `queue_size` and `queue_full` fields.
- Verify the `=== THREAD DUMP ===` section appears in stderr logs.

Notes
Related issues:
Pre-release published: `airbyte-cdk==7.13.0.post13.dev23806774478` (PyPI) and `airbyte/source-declarative-manifest:7.13.0.post13.dev23806774478` (DockerHub). Testing PR: airbytehq/airbyte#75589

Deadlock confirmed in production: A thread dump from a HubSpot sync (job 77457394) showed all 10 workers blocked on `queue.put(record)` and the main thread blocked on `queue.put(state_message)` inside `ConcurrentMessageRepository.emit_message()` → `cursor.ensure_at_least_one_state_emitted()` → `_on_stream_is_done("contact_lists")`. The queue was full at 10,000 items.

Link to Devin session: https://app.devin.ai/sessions/ad184113df474f0ba37ede09cdac7eaf