
feat: add stderr heartbeat with deadlock diagnosis and prevention#953

Draft
devin-ai-integration[bot] wants to merge 14 commits into main from devin/1773412868-nonblocking-stdout

Conversation


@devin-ai-integration devin-ai-integration bot commented Mar 13, 2026

feat: add stderr heartbeat with deadlock diagnosis and prevention

Summary

Adds a lightweight background heartbeat thread to launch() that writes periodic diagnostic status to stderr (fd 2) every 30 seconds, plus a fix for a confirmed deadlock in ConcurrentMessageRepository where the main thread (sole queue consumer) could block on queue.put() when the queue was full.

Purpose: In Airbyte Cloud, the orchestrator controls reading from the source container's stdout pipe. When it pauses reading, print() blocks in the kernel. We need to prove whether the platform ever resumes reading or permanently stops. Additionally, when using concurrent sources with multiple workers, syncs can stall due to deadlocks between the worker threads, the shared queue, and the main thread. Since stderr is collected independently by the Kubernetes container runtime, heartbeat lines will appear in logs even when stdout is blocked.

Heartbeat & Diagnostics (entrypoint.py, queue_registry.py, concurrent_source.py)

Heartbeat output (normal and blocked examples):

STDOUT_HEARTBEAT: t=30s msgs=1042 bytes=5242880 print_blocked=NO queue_size=42 queue_full=False
STDOUT_HEARTBEAT: t=60s msgs=1042 bytes=5242880 print_blocked=YES blocked_since=28s queue_size=10000 queue_full=True

Heartbeat output (stall detected — after 90s of frozen message count):

STDOUT_HEARTBEAT: t=120s msgs=1042 bytes=5242880 print_blocked=NO queue_size=0 queue_full=False
=== THREAD DUMP (stall detected) ===

Thread workerpool_0 (140234567890):
  File ".../http_client.py", line 328, in _send
    response = self._session.send(request, **request_kwargs)
...
=== END THREAD DUMP ===
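The heartbeat and thread-dump mechanism described above can be sketched as follows. This is a minimal illustration, not the PR's actual code; the function names, the `stats` dict keys, and the stall threshold of 3 intervals are assumptions mirroring the description:

```python
import os
import sys
import threading
import time
import traceback


def format_thread_dump() -> str:
    """Render a stack trace for every live thread via sys._current_frames()."""
    names = {t.ident: t.name for t in threading.enumerate()}
    lines = ["=== THREAD DUMP (stall detected) ==="]
    for ident, frame in sys._current_frames().items():
        lines.append(f"\nThread {names.get(ident, '?')} ({ident}):")
        lines.extend(line.rstrip() for line in traceback.format_stack(frame))
    lines.append("=== END THREAD DUMP ===")
    return "\n".join(lines)


def heartbeat_loop(stats: dict, stop_event: threading.Event, interval: float = 30.0) -> None:
    """Write a status line to fd 2 every `interval` seconds; dump threads on stall."""
    start = time.monotonic()
    last_count = -1
    frozen_intervals = 0
    while not stop_event.wait(interval):
        t = int(time.monotonic() - start)
        msgs = stats["messages_written"]
        frozen_intervals = frozen_intervals + 1 if msgs == last_count else 0
        last_count = msgs
        line = f"STDOUT_HEARTBEAT: t={t}s msgs={msgs} bytes={stats['bytes_written']}\n"
        os.write(2, line.encode())  # fd 2 = stderr, independent of the stdout pipe
        if frozen_intervals >= 3:  # message count frozen for ~3 intervals (90s)
            os.write(2, (format_thread_dump() + "\n").encode())
```

Because the thread is a daemon and writes with raw `os.write(2, ...)`, it stays alive and visible in container logs even while the main thread is stuck inside a blocking `print()`.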

The queue stats instantly reveal the deadlock type:

  • queue_size=0 + msgs frozen → workers stuck on HTTP/IO (all threads blocked on API calls)
  • queue_size=10000 + print_blocked=YES → classic pipe deadlock (stdout backpressure)
  • queue_size=10000 + print_blocked=NO → main thread not consuming (the deadlock this PR fixes)

What changed for diagnostics:

  • Added a daemon thread (stdout-heartbeat) that writes to fd 2 every 30s
  • Wrapped the existing print() loop with print_blocked / messages_written / bytes_written tracking
  • Added try/finally to signal the heartbeat thread to stop on exit
  • No changes to the stdout write path itself — still uses print() with PRINT_BUFFER
  • Added queue_size and queue_full fields to heartbeat output when a concurrent source is active
  • Added automatic thread dump via sys._current_frames() when a stall is detected (message count frozen for 3+ intervals = 90s), repeated every ~5 min during ongoing stalls
  • Added queue_registry.py module so the heartbeat thread can access the ConcurrentSource queue without threading it through the call chain. ConcurrentSource.read() registers/unregisters the queue via try/finally.
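The registry pattern in the last bullet can be sketched as below. This is a simplified model under the assumptions stated in the PR (a single module-level slot, reads/writes atomic under the GIL); the function names are taken from the description, not verified against the actual module:

```python
# queue_registry.py -- minimal sketch of the registry described above.
import queue
from typing import Optional

_queue: Optional[queue.Queue] = None  # single module-level slot (last writer wins)


def register_queue(q: queue.Queue) -> None:
    """Called by ConcurrentSource.read() before it starts consuming."""
    global _queue
    _queue = q


def unregister_queue() -> None:
    """Called in the matching finally block when the read loop exits."""
    global _queue
    _queue = None


def get_queue() -> Optional[queue.Queue]:
    """Called by the heartbeat thread; a plain reference read is atomic under the GIL."""
    return _queue
```

The heartbeat thread only ever reads the slot (`get_queue()` followed by `qsize()`/`full()`), so no lock is needed for a single active source; the known limitation with multiple simultaneous sources is called out in the review checklist below.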

Deadlock Fix (concurrent_repository.py)

Root cause (confirmed via thread dump from a production sync): The main thread is the sole consumer of the shared queue. Three code paths in _handle_item cause it to call queue.put() via ConcurrentMessageRepository.emit_message():

  1. PartitionCompleteSentinel → _on_stream_is_done → ensure_at_least_one_state_emitted → queue.put(state)
  2. PartitionGenerationCompletedSentinel → _on_stream_is_done → same path
  3. Partition → on_partition → emit_message(slice_log) → queue.put(log)

If the queue is full (10,000 items from workers), the main thread blocks on put() and nobody drains the queue → deadlock.

Fix: ConcurrentMessageRepository captures the consumer thread ID at construction. When emit_message / log_message is called:

  • Main thread: uses put(block=False). On Full, buffers the message in a deque (_pending). These are drained via consume_queue(), which the main thread already calls after processing every queue item.
  • Worker threads: unchanged — blocking put() for normal backpressure.
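A minimal single-consumer sketch of this fix is shown below. It is simplified for illustration (the real repository wraps another repository and emits Airbyte messages); the class shape and the drain-on-`consume_queue` behavior follow the description above:

```python
import collections
import queue
import threading


class ConcurrentMessageRepositorySketch:
    """Sketch of the fix: non-blocking put plus a local overflow buffer
    on the consumer thread. Not the CDK's actual class."""

    def __init__(self, q: queue.Queue) -> None:
        self._queue = q
        # Assumes construction happens on the consumer (main) thread.
        self._consumer_thread_id = threading.get_ident()
        self._pending: collections.deque = collections.deque()  # consumer-thread only

    def emit_message(self, message) -> None:
        if threading.get_ident() == self._consumer_thread_id:
            try:
                self._queue.put(message, block=False)  # never block the sole consumer
            except queue.Full:
                self._pending.append(message)  # drained later by consume_queue()
        else:
            self._queue.put(message)  # workers keep blocking backpressure

    def consume_queue(self):
        """Yield queued items, re-feeding buffered messages whenever the queue has room."""
        while True:
            while self._pending and not self._queue.full():
                item = self._pending.popleft()
                try:
                    self._queue.put(item, block=False)
                except queue.Full:  # a worker won the race for the free slot
                    self._pending.appendleft(item)
                    break
            try:
                yield self._queue.get(block=False)
            except queue.Empty:
                if self._pending:
                    yield self._pending.popleft()
                else:
                    return
```

The key invariant is that `_pending` is appended to and popped from only on the consumer thread, so no lock is required around the deque.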

Why ordering is preserved: _on_stream_is_done is only called when all PartitionCompleteSentinels for a stream have been processed. Since each sentinel is enqueued after all records for that partition (FIFO), all records for the stream are already consumed by the main thread before the final state is emitted.

Review & Testing Checklist for Human

  • Deadlock fix thread-ID assumption: _consumer_thread_id = threading.get_ident() is captured in __init__. This assumes ConcurrentMessageRepository is always constructed on the main (consumer) thread. Verify this holds for all instantiation paths (currently ConcurrentDeclarativeSource at line 171). If it's ever constructed on a different thread, the main thread would get blocking put() and still deadlock.
  • _pending deque is only accessed by the main thread: _pending.append() happens only when threading.get_ident() == self._consumer_thread_id (main thread). _pending.popleft() happens in consume_queue() which is called by the main thread. So no cross-thread access. Confirm no code path violates this.
  • Path B behavioral change (on_partition log): on_partition() returns None (not a generator), so when its log goes to _pending, it's drained on the next consume_queue() call (e.g., in on_record or on_partition_complete_sentinel). This may shift the log's position slightly in stdout output compared to before. Verify this is acceptable.
  • Thread safety of queue_registry global: register_queue / get_queue use a module-level global (_queue). Under CPython's GIL, simple reference assignments and reads are atomic, but if multiple ConcurrentSource instances run simultaneously, only the last one's queue would be visible. Confirm this is acceptable.
  • sys._current_frames() availability: This is a CPython implementation detail (underscore-prefixed). It has been stable for years and is present in all CPython versions Airbyte supports, but it is not guaranteed by the Python language spec.
  • End-to-end test in Cloud: Deploy a connector prerelease pinned to this CDK version against a high-throughput concurrent source (e.g., HubSpot with 10 workers). Verify:
    • STDOUT_HEARTBEAT lines appear with queue_size and queue_full fields
    • The sync completes without deadlocking (previously it would stall when a stream's final state was emitted with a full queue)
    • If a stall does occur, the === THREAD DUMP === section appears in stderr logs

Notes

Related issues:

Pre-release published: airbyte-cdk==7.13.0.post13.dev23806774478 (PyPI) and airbyte/source-declarative-manifest:7.13.0.post13.dev23806774478 (DockerHub). Testing PR: airbytehq/airbyte#75589

Deadlock confirmed in production: Thread dump from HubSpot sync (job 77457394) showed all 10 workers blocked on queue.put(record) and the main thread blocked on queue.put(state_message) inside ConcurrentMessageRepository.emit_message(), reached via _on_stream_is_done("contact_lists") → cursor.ensure_at_least_one_state_emitted(). The queue was full at 10,000 items.

Link to Devin session: https://app.devin.ai/sessions/ad184113df474f0ba37ede09cdac7eaf

…sure

When the Airbyte platform pauses reading from the source container's
stdout pipe, the main thread's print() call blocks in an OS-level
write() syscall. This stalls the record queue consumer, filling the
bounded queue and blocking all worker threads — a complete deadlock.

This change replaces blocking print() with non-blocking os.write()
using select() to wait for the pipe to become writable. The main
thread stays in a Python-level retry loop instead of getting stuck
in a kernel syscall. When the platform resumes reading, select()
returns, the write completes, and the pipeline resumes automatically.

Key properties:
- Memory stays bounded (queue maxsize=10,000 unchanged)
- No deadlock (main thread never stuck in blocking syscall)
- Automatic recovery when platform resumes reading
- 600s watchdog raises RuntimeError if pipe stays blocked
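The select()-based retry loop this commit describes (an approach later revised in this PR, since making the fd non-blocking is process-wide) can be sketched roughly as follows; the function name and watchdog wiring are illustrative:

```python
import os
import select


def write_with_select(fd: int, data: bytes, watchdog_s: float = 600.0) -> None:
    """Write to a pipe whose fd has been set non-blocking, waiting in a
    Python-level select() loop instead of a kernel write() syscall.
    Raises RuntimeError if the pipe stays unwritable for watchdog_s seconds."""
    view = memoryview(data)
    while view:
        _, writable, _ = select.select([], [fd], [], watchdog_s)
        if not writable:
            raise RuntimeError(f"stdout pipe blocked for more than {watchdog_s}s")
        try:
            n = os.write(fd, view)
        except BlockingIOError:
            continue  # the pipe filled between select() and write(); retry
        view = view[n:]  # os.write may complete only partially on a pipe
```

Because the wait happens in `select()` rather than inside `write()`, the thread is interruptible and can enforce the watchdog timeout itself.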

Co-Authored-By: unknown <>
@devin-ai-integration
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.


Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1773412868-nonblocking-stdout#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1773412868-nonblocking-stdout

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

@github-actions

github-actions bot commented Mar 13, 2026

PyTest Results (Fast)

3 975 tests  +41   3 964 ✅ +42   7m 9s ⏱️ ±0s
    1 suites ± 0      11 💤  -  1 
    1 files   ± 0       0 ❌ ± 0 

Results for commit b7818b0. ± Comparison against base commit 0e57414.

♻️ This comment has been updated with latest results.

…nd log restore failures

Co-Authored-By: unknown <>
@github-actions

github-actions bot commented Mar 13, 2026

PyTest Results (Full)

3 978 tests  +41   3 966 ✅ +41   10m 39s ⏱️ -35s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit b7818b0. ± Comparison against base commit 0e57414.

♻️ This comment has been updated with latest results.

…global BlockingIOError

Setting os.set_blocking(fd, False) is a process-wide change that causes
BlockingIOError in other threads (logging via print_buffer, worker threads).
Instead, use a dedicated stdout-writer thread that does blocking os.write()
calls. If the pipe is full, only the writer thread stalls - the main thread
continues draining the record queue.
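The dedicated-writer-thread design this commit describes can be sketched as below; the fd parameter, queue wiring, and `None` shutdown sentinel are illustrative assumptions, not the commit's exact code:

```python
import os
import queue
import threading


def start_stdout_writer(fd: int, write_queue: queue.Queue) -> threading.Thread:
    """Start a dedicated writer thread. Only this thread blocks in os.write()
    when the pipe is full; the main thread keeps draining the record queue
    and feeding write_queue instead."""

    def writer() -> None:
        while True:
            chunk = write_queue.get()
            if chunk is None:  # shutdown sentinel
                return
            view = memoryview(chunk)
            while view:  # os.write may do a partial write on a pipe
                n = os.write(fd, view)
                view = view[n:]

    thread = threading.Thread(target=writer, name="stdout-writer", daemon=True)
    thread.start()
    return thread
```

If the pipe stalls, backpressure moves from the kernel syscall into the bounded `write_queue`, where the main thread can observe it (and log about it) rather than being frozen.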

Co-Authored-By: unknown <>
…ise KeyboardInterrupt/SystemExit

Co-Authored-By: unknown <>
…ite timing

Logs when os.write() blocks for >5s (indicates platform paused reading),
and logs every 30s when write_queue is full. This will help validate
whether the platform ever resumes reading from the pipe after a pause.

Co-Authored-By: unknown <>
…ding from stdout pipe

Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
@devin-ai-integration devin-ai-integration bot changed the title feat: non-blocking stdout writes to prevent deadlock on pipe backpressure feat: add stderr heartbeat to diagnose stdout pipe blocking Mar 17, 2026
…nosis

When the heartbeat detects a stall (message count frozen for 90s), it now:
- Dumps all thread stack traces via sys._current_frames() to show exactly
  what each worker thread is doing (blocked on HTTP, queue.put, lock, etc.)
- Reports queue_size and queue_full on every heartbeat line
- Re-dumps thread stacks every ~5 min during ongoing stalls

The queue stats instantly reveal the deadlock type:
- queue_size=0 + msgs frozen → workers stuck on HTTP/IO (Scenario B)
- queue_size=10000 + print_blocked=YES → classic pipe deadlock (Scenario A)
- queue_size=10000 + print_blocked=NO → main thread not consuming (bug)

Adds a lightweight queue_registry module so the heartbeat thread in
entrypoint.py can access the ConcurrentSource queue without threading
it through the call chain.

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
@devin-ai-integration devin-ai-integration bot changed the title feat: add stderr heartbeat to diagnose stdout pipe blocking feat: add stderr heartbeat with thread dumps and queue stats for deadlock diagnosis Mar 31, 2026
@devin-ai-integration
Contributor Author

/prerelease

The main thread is the sole consumer of the shared queue. Three code
paths in _handle_item cause it to call queue.put() via
ConcurrentMessageRepository.emit_message():
1. PartitionCompleteSentinel → _on_stream_is_done → ensure_at_least_one_state_emitted
2. PartitionGenerationCompletedSentinel → _on_stream_is_done → same
3. Partition → on_partition → emit_message(slice_log)

If the queue is full (10,000 items from workers), the main thread
blocks on put() and nobody drains the queue → deadlock.

Fix: detect the consumer thread (main thread) via thread ID captured
at construction time. Main thread uses non-blocking put(block=False);
if Full, messages are buffered in a deque and drained via
consume_queue(), which the main thread already calls after processing
every queue item. Worker threads continue using blocking put() for
normal backpressure.

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
@devin-ai-integration devin-ai-integration bot changed the title feat: add stderr heartbeat with thread dumps and queue stats for deadlock diagnosis feat: add stderr heartbeat with deadlock diagnosis and prevention Apr 1, 2026
@devin-ai-integration
Contributor Author

/prerelease

@tolik0
Contributor

Anatolii Yatsuk (tolik0) commented Apr 1, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/23855054325

@devin-ai-integration
Contributor Author

❌ Cannot revive Devin session - the session is too old. Please start a new session instead.
