Skip to content

fix(stream-bridge): guarantee END sentinel delivery to prevent resource leaks#1792

Closed
JasonOA888 wants to merge 1 commit into
bytedance:mainfrom
JasonOA888:fix/issue-1689-stream-bridge-end-sentinel
Closed

fix(stream-bridge): guarantee END sentinel delivery to prevent resource leaks#1792
JasonOA888 wants to merge 1 commit into
bytedance:mainfrom
JasonOA888:fix/issue-1689-stream-bridge-end-sentinel

Conversation

@JasonOA888
Copy link
Copy Markdown
Contributor

Fixes #1689

Problem

When the stream bridge queue is full (256 events), discards the END sentinel after a 30s timeout. Since only exits upon receiving , losing it causes:

  • SSE connections to hang indefinitely
  • and never cleaned up
  • Accumulating resource leak under sustained load (long agent runs with slow consumers)

Root Cause

used the same strategy as . But END sentinel is not a normal event — it is the only way subscribe() knows to exit.

Fix

  • : use instead of — drops events immediately when queue is full, no 30s stall per dropped event
  • : retry loop that blocks until queue has space, with periodic warning logs every 10 seconds. END sentinel is never dropped.

Testing

  • Syntax validated via ast.parse
  • Single file change, no new dependencies

…ce leaks

When the stream bridge queue is full, publish_end() could discard the
END sentinel after a 30s timeout. Since subscribe() only exits when it
receives END_SENTINEL, losing it causes:
- SSE connections to hang indefinitely
- _queues[run_id] and _counters[run_id] to never be cleaned up
- Accumulating resource leak under sustained load

Changes:
- publish(): use put_nowait() instead of wait_for() — drops events
  immediately when queue is full (no 30s stall per dropped event)
- publish_end(): retry loop that blocks until queue has space, with
  periodic warning logs. END sentinel is NEVER dropped.

Fixes bytedance#1689
@JasonOA888
Copy link
Copy Markdown
Contributor Author

Closing this PR — the code it modifies no longer exists.

The original StreamBridge used asyncio.Queue with a put(timeout=30s) strategy that could drop the END sentinel. This was fixed by #1695 (merged), and then the entire StreamBridge was rewritten in #1403 to use a list + asyncio.Condition architecture (MemoryStreamBridge) that does not have this issue.

The current publish_end sets stream.ended = True and notifies waiters — there is no queue to fill, no timeout to expire, and no sentinel to drop.

@JasonOA888 JasonOA888 closed this Apr 18, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

# BUG-002: Stream Bridge 队列满时静默丢事件,END 哨兵可能丢失

1 participant