feat(eventbus): support per-subscriber async dispatch with graceful drain#1455
feat(eventbus): support per-subscriber async dispatch with graceful drain#1455WyRainBow wants to merge 4 commits intoapache:developfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds opt-in asynchronous dispatching to the core EventBus to prevent slow subscribers from blocking the event pipeline, and wires graceful draining into runtime shutdown.
Changes:
- Introduces
AsyncSubscriberand per-subscriber runtime state with per-subscriber buffered queues + drainer goroutines. - Integrates EventBus graceful drain into runtime stop handling via
runtime.GracefulComponent. - Adds
eventBusconfiguration toAdminConfigand increases the default EventBus buffer size to1024.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/core/runtime/runtime.go | On stop, waits for GracefulComponent implementations to finish (enables graceful drains). |
| pkg/core/events/component.go | Refactors subscriber directory to track per-subscriber state; supports async enqueue + drainer lifecycle; adds graceful drain (WaitForDone). |
| pkg/core/events/async.go | Adds AsyncSubscriber interface and subscriberState for async runtime bookkeeping. |
| pkg/core/engine/subscriber/runtime_instance.go | Marks runtime instance subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/zk_metadata.go | Marks ZK metadata subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/zk_config.go | Marks ZK config subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/service_provider_metadata.go | Marks service provider metadata subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/service_consumer_metadata.go | Marks service consumer metadata subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/rpc_instance.go | Marks RPC instance subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/nacos_service.go | Marks Nacos service subscriber as async-capable (AsyncEnabled). |
| pkg/core/discovery/subscriber/instance.go | Marks instance subscriber as async-capable (AsyncEnabled). |
| pkg/config/eventbus/config.go | Raises default EventBus buffer size to 1024. |
| pkg/config/app/admin.go | Adds EventBus config field, defaults, and validation wiring. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
479c15d to
d8cf428
Compare
robocanic
left a comment
There was a problem hiding this comment.
Great Work! I've reviewd the whole PR and I left some comments, if there are problems you came into, just discuss with me.
| } | ||
|
|
||
| func (c AdminConfig) Validate() error { | ||
| func (c *AdminConfig) Validate() error { |
There was a problem hiding this comment.
这条:Validate() 保持指针接收者,FindDiscovery()/Meshes() 保持值接收者(否则会在 ctx.Config().FindDiscovery(...) 场景触发编译错误:cannot call pointer method on app.AdminConfig)这里就用会修改配置的方法用指针接收者、只读查询方法用值接收者的方式处理
|




Background
EventBus previously dispatched events synchronously. Slow subscribers could block
Send()and impact the whole event pipeline.What Changed
AsyncSubscriber.AdminConfig1024Core Files
pkg/core/events/async.gopkg/core/events/component.gopkg/config/app/admin.gopkg/config/eventbus/config.goBehavior Summary
select+default), drop with warn when channel is full.ProcessEventbehavior.Notes