diff --git a/pkg/config/app/admin.go b/pkg/config/app/admin.go index ee943a668..f07daf6de 100644 --- a/pkg/config/app/admin.go +++ b/pkg/config/app/admin.go @@ -27,6 +27,7 @@ import ( "github.com/apache/dubbo-admin/pkg/config/diagnostics" "github.com/apache/dubbo-admin/pkg/config/discovery" "github.com/apache/dubbo-admin/pkg/config/engine" + "github.com/apache/dubbo-admin/pkg/config/eventbus" "github.com/apache/dubbo-admin/pkg/config/log" "github.com/apache/dubbo-admin/pkg/config/observability" "github.com/apache/dubbo-admin/pkg/config/store" @@ -48,11 +49,14 @@ type AdminConfig struct { Discovery []*discovery.Config `json:"discovery" yaml:"discovery"` // Engine configuration Engine *engine.Config `json:"engine" yaml:"engine"` + // EventBus configuration + EventBus *eventbus.Config `json:"eventBus,omitempty" yaml:"eventBus,omitempty"` } var _ = &AdminConfig{} var DefaultAdminConfig = func() AdminConfig { + eventBusCfg := eventbus.Default() return AdminConfig{ Log: log.DefaultLogConfig(), Store: store.DefaultStoreConfig(), @@ -60,10 +64,11 @@ var DefaultAdminConfig = func() AdminConfig { Observability: observability.DefaultObservabilityConfig(), Diagnostics: diagnostics.DefaultDiagnosticsConfig(), Console: console.DefaultConsoleConfig(), + EventBus: &eventBusCfg, } } -func (c AdminConfig) Sanitize() { +func (c *AdminConfig) Sanitize() { c.Engine.Sanitize() for _, d := range c.Discovery { d.Sanitize() @@ -75,7 +80,7 @@ func (c AdminConfig) Sanitize() { c.Log.Sanitize() } -func (c AdminConfig) PreProcess() error { +func (c *AdminConfig) PreProcess() error { discoveryPreProcess := func() error { for _, d := range c.Discovery { if err := d.PreProcess(); err != nil { @@ -95,7 +100,7 @@ func (c AdminConfig) PreProcess() error { ) } -func (c AdminConfig) PostProcess() error { +func (c *AdminConfig) PostProcess() error { discoveryPostProcess := func() error { for _, d := range c.Discovery { if err := d.PostProcess(); err != nil { @@ -115,7 +120,7 @@ func (c AdminConfig) PostProcess() error { ) } -func (c AdminConfig) Validate() error { +func (c *AdminConfig) Validate() error { if c.Log == nil { c.Log = log.DefaultLogConfig() } else if err := c.Log.Validate(); err != nil { @@ -160,6 +165,12 @@ func (c AdminConfig) Validate() error { } else if err := c.Engine.Validate(); err != nil { return bizerror.Wrap(err, bizerror.ConfigError, "engine config validation failed") } + if c.EventBus == nil { + cfg := eventbus.Default() + c.EventBus = &cfg + } else if err := c.EventBus.Validate(); err != nil { + return bizerror.Wrap(err, bizerror.ConfigError, "event bus config validation failed") + } return nil } diff --git a/pkg/config/eventbus/config.go b/pkg/config/eventbus/config.go index 4cb84b319..4c912ed35 100644 --- a/pkg/config/eventbus/config.go +++ b/pkg/config/eventbus/config.go @@ -29,6 +29,6 @@ func (c Config) Validate() error { func Default() Config { return Config{ - BufferSize: 100, + BufferSize: 1024, } } diff --git a/pkg/console/counter/manager.go b/pkg/console/counter/manager.go index 0c8fc0c09..0cef62e8c 100644 --- a/pkg/console/counter/manager.go +++ b/pkg/console/counter/manager.go @@ -355,6 +355,10 @@ func (s *counterEventSubscriber) Name() string { return s.name } +func (s *counterEventSubscriber) AsyncEnabled() bool { + return false +} + func (s *counterEventSubscriber) ProcessEvent(event events.Event) error { if s.handler == nil { return nil diff --git a/pkg/core/discovery/subscriber/instance.go b/pkg/core/discovery/subscriber/instance.go index bb60fb8ea..529e0465e 100644 --- a/pkg/core/discovery/subscriber/instance.go +++ b/pkg/core/discovery/subscriber/instance.go @@ -55,6 +55,10 @@ func (s *InstanceEventSubscriber) Name() string { return "Discovery-" + s.ResourceKind().ToString() } +func (s *InstanceEventSubscriber) AsyncEnabled() bool { + return true +} + func (s *InstanceEventSubscriber) ProcessEvent(event events.Event) error { newObj, ok := event.NewObj().(*meshresource.InstanceResource) if !ok && event.NewObj() != nil { diff --git a/pkg/core/discovery/subscriber/nacos_service.go b/pkg/core/discovery/subscriber/nacos_service.go index 07c23f108..4d8495cec 100644 --- a/pkg/core/discovery/subscriber/nacos_service.go +++ b/pkg/core/discovery/subscriber/nacos_service.go @@ -57,6 +57,10 @@ func (n *NacosServiceEventSubscriber) Name() string { return "Nacos2Discovery-" + n.ResourceKind().ToString() } +func (n *NacosServiceEventSubscriber) AsyncEnabled() bool { + return true +} + func (n *NacosServiceEventSubscriber) ProcessEvent(event events.Event) error { newObj, ok := event.NewObj().(*meshresource.NacosServiceResource) if !ok && event.NewObj() != nil { diff --git a/pkg/core/discovery/subscriber/rpc_instance.go b/pkg/core/discovery/subscriber/rpc_instance.go index 4879d1b8f..411f454d5 100644 --- a/pkg/core/discovery/subscriber/rpc_instance.go +++ b/pkg/core/discovery/subscriber/rpc_instance.go @@ -63,6 +63,10 @@ func (s *RPCInstanceEventSubscriber) ResourceKind() coremodel.ResourceKind { return meshresource.RPCInstanceKind } +func (s *RPCInstanceEventSubscriber) AsyncEnabled() bool { + return true +} + func (s *RPCInstanceEventSubscriber) ProcessEvent(event events.Event) error { newObj, ok := event.NewObj().(*meshresource.RPCInstanceResource) if !ok && event.NewObj() != nil { diff --git a/pkg/core/discovery/subscriber/service_consumer_metadata.go b/pkg/core/discovery/subscriber/service_consumer_metadata.go index 21e70e14a..042728743 100644 --- a/pkg/core/discovery/subscriber/service_consumer_metadata.go +++ b/pkg/core/discovery/subscriber/service_consumer_metadata.go @@ -53,6 +53,10 @@ func (s *ServiceConsumerMetadataEventSubscriber) Name() string { return "Discovery-" + s.ResourceKind().ToString() } +func (s *ServiceConsumerMetadataEventSubscriber) AsyncEnabled() bool { + return true +} + func (s *ServiceConsumerMetadataEventSubscriber) ProcessEvent(event events.Event) error { newObj, ok := event.NewObj().(*meshresource.ServiceConsumerMetadataResource) if !ok && event.NewObj() != nil { diff --git a/pkg/core/discovery/subscriber/service_provider_metadata.go b/pkg/core/discovery/subscriber/service_provider_metadata.go index 3c508e486..77ebe7342 100644 --- a/pkg/core/discovery/subscriber/service_provider_metadata.go +++ b/pkg/core/discovery/subscriber/service_provider_metadata.go @@ -53,6 +53,10 @@ func (s *ServiceProviderMetadataEventSubscriber) Name() string { return "Discovery-" + s.ResourceKind().ToString() } +func (s *ServiceProviderMetadataEventSubscriber) AsyncEnabled() bool { + return true +} + func (s *ServiceProviderMetadataEventSubscriber) ProcessEvent(event events.Event) error { newObj, ok := event.NewObj().(*meshresource.ServiceProviderMetadataResource) if !ok && event.NewObj() != nil { diff --git a/pkg/core/discovery/subscriber/zk_config.go b/pkg/core/discovery/subscriber/zk_config.go index 71598682f..07f9a2620 100644 --- a/pkg/core/discovery/subscriber/zk_config.go +++ b/pkg/core/discovery/subscriber/zk_config.go @@ -53,6 +53,10 @@ func (z *ZKConfigEventSubscriber) Name() string { return "Discovery-" + z.ResourceKind().ToString() } +func (z *ZKConfigEventSubscriber) AsyncEnabled() bool { + return true +} + func (z *ZKConfigEventSubscriber) ProcessEvent(event events.Event) error { newObj, ok := event.NewObj().(*meshresource.ZKConfigResource) if !ok && event.NewObj() != nil { diff --git a/pkg/core/discovery/subscriber/zk_metadata.go b/pkg/core/discovery/subscriber/zk_metadata.go index 784b61f43..4a744119e 100644 --- a/pkg/core/discovery/subscriber/zk_metadata.go +++ b/pkg/core/discovery/subscriber/zk_metadata.go @@ -53,6 +53,10 @@ func (z *ZKMetadataEventSubscriber) Name() string { return "Discovery-" + z.ResourceKind().ToString() } +func (z *ZKMetadataEventSubscriber) AsyncEnabled() bool { + return true +} + func (z *ZKMetadataEventSubscriber) ProcessEvent(event events.Event) error { newObj, ok := event.NewObj().(*meshresource.ZKMetadataResource) if !ok && event.NewObj() != nil { diff --git a/pkg/core/engine/subscriber/runtime_instance.go b/pkg/core/engine/subscriber/runtime_instance.go index b39a09888..f4325e229 100644 --- a/pkg/core/engine/subscriber/runtime_instance.go +++ b/pkg/core/engine/subscriber/runtime_instance.go @@ -56,6 +56,10 @@ func (s *RuntimeInstanceEventSubscriber) Name() string { return "Engine-" + s.ResourceKind().ToString() } +func (s *RuntimeInstanceEventSubscriber) AsyncEnabled() bool { + return true +} + func (s *RuntimeInstanceEventSubscriber) ProcessEvent(event events.Event) error { newObj, ok := event.NewObj().(*meshresource.RuntimeInstanceResource) if !ok && event.NewObj() != nil { diff --git a/pkg/core/events/async.go b/pkg/core/events/async.go new file mode 100644 index 000000000..9364aa414 --- /dev/null +++ b/pkg/core/events/async.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package events + +import "sync/atomic" + +type subscriberState struct { + subscriber Subscriber + async bool + ch chan Event + done chan struct{} + closed atomic.Bool + drainerStarted atomic.Bool +} diff --git a/pkg/core/events/component.go b/pkg/core/events/component.go index dc2b6db70..7592dbaa6 100644 --- a/pkg/core/events/component.go +++ b/pkg/core/events/component.go @@ -21,7 +21,9 @@ import ( "fmt" "math" "sync" + "sync/atomic" + eventbusconfig "github.com/apache/dubbo-admin/pkg/config/eventbus" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/core/resource/model" "github.com/apache/dubbo-admin/pkg/core/runtime" @@ -37,10 +39,15 @@ type EventBusComponent interface { } var _ EventBusComponent = &eventBus{} +var _ runtime.GracefulComponent = &eventBus{} type eventBus struct { rwMutex sync.RWMutex - subscriberDir map[model.ResourceKind]Subscribers + subscriberDir map[model.ResourceKind][]*subscriberState + bufferSize int + started atomic.Bool + stopped atomic.Bool + wg sync.WaitGroup } func (b *eventBus) RequiredDependencies() []runtime.ComponentType { @@ -55,12 +62,29 @@ func (b *eventBus) Order() int { return math.MaxInt } -func (b *eventBus) Init(_ runtime.BuilderContext) error { - b.subscriberDir = make(map[model.ResourceKind]Subscribers) +func (b *eventBus) Init(ctx runtime.BuilderContext) error { + b.subscriberDir = make(map[model.ResourceKind][]*subscriberState) + if cfg := ctx.Config().EventBus; cfg != nil { + b.bufferSize = int(cfg.BufferSize) + } else { + b.bufferSize = int(eventbusconfig.Default().BufferSize) + } return nil } func (b *eventBus) Start(_ runtime.Runtime, _ <-chan struct{}) error { + if !b.started.CompareAndSwap(false, true) { + return fmt.Errorf("eventBus already started") + } + b.rwMutex.RLock() + for _, states := range b.subscriberDir { + for _, st := range states { + if st.async { + b.launchDrainer(st) + } + } + } + b.rwMutex.RUnlock() return nil } @@ -68,39 +92,82 @@ func (b *eventBus) Start(_ runtime.Runtime, _ <-chan struct{}) error { func (b *eventBus) Subscribe(subscriber Subscriber) error { b.rwMutex.Lock() defer b.rwMutex.Unlock() - subs, exists := b.subscriberDir[subscriber.ResourceKind()] + if b.stopped.Load() { + return fmt.Errorf("eventBus already stopped") + } + rk := subscriber.ResourceKind() + states, exists := b.subscriberDir[rk] if !exists { - subs = make(Subscribers, 0) + states = make([]*subscriberState, 0) } // check name if is unique - for _, sub := range subs { - if sub.Name() == subscriber.Name() { + for _, st := range states { + if st.subscriber.Name() == subscriber.Name() { return fmt.Errorf("duplicated subscriber name %s, skipped subscribing", subscriber.Name()) } } - b.subscriberDir[subscriber.ResourceKind()] = append(subs, subscriber) + isAsync := false + if b.bufferSize > 0 { + if subscriber.AsyncEnabled() { + isAsync = true + } + } + state := &subscriberState{ + subscriber: subscriber, + async: isAsync, + } + if isAsync { + state.ch = make(chan Event, b.bufferSize) + state.done = make(chan struct{}) + } + b.subscriberDir[rk] = append(states, state) + if isAsync && b.started.Load() { + b.launchDrainer(state) + } return nil } func (b *eventBus) Unsubscribe(subscriber Subscriber) error { + var asyncState *subscriberState + var drainerRunning bool + b.rwMutex.Lock() - defer b.rwMutex.Unlock() rk := subscriber.ResourceKind() name := subscriber.Name() - subs, exists := b.subscriberDir[rk] + states, exists := b.subscriberDir[rk] if !exists { + b.rwMutex.Unlock() return fmt.Errorf("no subscriber for resource %s, skipped unsubscribing", rk) } - for i, sub := range subs { - if sub.Name() == name { - b.subscriberDir[rk] = append(subs[:i], subs[i+1:]...) - return nil + found := false + for i, st := range states { + if st.subscriber.Name() == name { + b.subscriberDir[rk] = append(states[:i], states[i+1:]...) + if st.async && st.ch != nil { + st.closed.Store(true) + close(st.ch) + asyncState = st + drainerRunning = st.drainerStarted.Load() + } + found = true + break } } - return fmt.Errorf("no subscriber named %s for resource %s, skipped unsubscribing", name, rk) + b.rwMutex.Unlock() + if !found { + return fmt.Errorf("no subscriber named %s for resource %s, skipped unsubscribing", name, rk) + } + if asyncState != nil && drainerRunning { + <-asyncState.done + } + return nil } func (b *eventBus) Send(event Event) { + if b.stopped.Load() { + return + } + b.rwMutex.RLock() defer b.rwMutex.RUnlock() var rk model.ResourceKind @@ -109,15 +176,65 @@ func (b *eventBus) Send(event Event) { } else if event.OldObj() != nil { rk = event.OldObj().ResourceKind() } - subs, exists := b.subscriberDir[rk] + states, exists := b.subscriberDir[rk] if !exists { logger.Infof("no subscriber for resource %s, skipped sending event%v", rk, event) return } - for _, sub := range subs { - // TODO Do we need to support reprocess - if err := sub.ProcessEvent(event); err != nil { - logger.Errorf("failed to process event in %s, cause: %s, event: %v", sub.Name(), err.Error(), event) + for _, st := range states { + if st.async && !st.closed.Load() { + select { + case st.ch <- event: + default: + logger.Warnf("async subscriber %s channel full (cap=%d), event dropped: %v", + st.subscriber.Name(), cap(st.ch), event) + } + continue + } + if !st.async { + // TODO Do we need to support reprocess + if err := st.subscriber.ProcessEvent(event); err != nil { + logger.Errorf("failed to process event in %s, cause: %s, event: %v", + st.subscriber.Name(), err.Error(), event) + } + } + } +} + +func (b *eventBus) WaitForDone() { + b.stopped.Store(true) + + b.rwMutex.Lock() + for _, states := range b.subscriberDir { + for _, st := range states { + if st.async && st.ch != nil && !st.closed.Load() { + st.closed.Store(true) + close(st.ch) + } } } + b.rwMutex.Unlock() + + b.wg.Wait() +} + +func (b *eventBus) launchDrainer(st *subscriberState) { + if b.stopped.Load() { + return + } + if !st.drainerStarted.CompareAndSwap(false, true) { + return + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + defer close(st.done) + for event := range st.ch { + if err := st.subscriber.ProcessEvent(event); err != nil { + logger.Errorf("async: failed to process event in %s, cause: %s, event: %v", + st.subscriber.Name(), err.Error(), event) + } + } + }() } diff --git a/pkg/core/events/eventbus.go b/pkg/core/events/eventbus.go index adec15ca7..51393c4d2 100644 --- a/pkg/core/events/eventbus.go +++ b/pkg/core/events/eventbus.go @@ -45,6 +45,16 @@ type Subscriber interface { Name() string ProcessEvent(event Event) error + + // AsyncEnabled controls whether this subscriber should be dispatched asynchronously. + // + // Return true when the subscriber may execute relatively slow logic and should not + // block EventBus.Send(). In async mode, EventBus enqueues events into a buffered + // channel and processes them in a dedicated drainer goroutine. + // + // Async dispatch is best-effort: when the subscriber queue is full, new events are + // dropped with a warning log. + AsyncEnabled() bool } type Subscribers []Subscriber diff --git a/pkg/core/runtime/runtime.go b/pkg/core/runtime/runtime.go index 260451ec5..3c4c589c1 100644 --- a/pkg/core/runtime/runtime.go +++ b/pkg/core/runtime/runtime.go @@ -136,6 +136,11 @@ func (rt *runtime) Start(stop <-chan struct{}) error { logger.Info("Admin started successfully") select { case <-stop: + for _, com := range components { + if gc, ok := com.(GracefulComponent); ok { + gc.WaitForDone() + } + } return nil } }