diff --git a/event-gateway/gateway-runtime/go.mod b/event-gateway/gateway-runtime/go.mod index fe2f29b4d..766a19f06 100644 --- a/event-gateway/gateway-runtime/go.mod +++ b/event-gateway/gateway-runtime/go.mod @@ -10,6 +10,7 @@ require ( github.com/knadh/koanf/providers/env v1.1.0 github.com/knadh/koanf/providers/file v1.2.1 github.com/knadh/koanf/v2 v2.3.0 + github.com/stretchr/testify v1.11.1 github.com/twmb/franz-go v1.18.1 github.com/twmb/franz-go/pkg/kadm v1.14.0 github.com/wso2/api-platform/common v0.0.0 @@ -26,6 +27,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/envoyproxy/protoc-gen-validate v1.3.0 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect @@ -39,6 +41,7 @@ require ( github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go index 526a27c34..bda53d5a0 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "log/slog" + "sync" "time" "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/binding" @@ -55,6 +56,7 @@ type WebSubReceiver struct { syncProducer *subscription.SyncProducer brokerDriver connectors.BrokerDriver channel connectors.ChannelInfo + channelMu sync.RWMutex opts Options } @@ -98,17 +100,28 @@ func NewReceiver(cfg connectors.ReceiverConfig, opts Options) (connectors.Receiv verificationTimeout := time.Duration(opts.VerificationTimeoutSeconds) * time.Second + receiver := &WebSubReceiver{ + deliverer: deliverer, + topics: topics, + store: store, + consumerMgr: consumerMgr, + syncProducer: syncProducer, + brokerDriver: cfg.BrokerDriver, + channel: cfg.Channel, + opts: opts, + } + // Create HubHandler for subscribe/unsubscribe on {context}/{version}/hub. hubHandler := NewHubHandler( topics, store, verificationTimeout, opts.DefaultLeaseSeconds, cfg.Processor, cfg.BrokerDriver, cfg.Channel.Name, - cfg.Channel.Channels, consumerMgr, syncProducer, + cfg.Channel.Channels, &receiver.channelMu, consumerMgr, syncProducer, ) // Create WebhookReceiverHandler for ingress on {context}/{version}/webhook-receiver. webhookHandler := NewWebhookReceiverHandler( topics, cfg.Processor, cfg.BrokerDriver, - cfg.Channel.Name, cfg.Channel.Channels, + cfg.Channel.Name, cfg.Channel.Channels, &receiver.channelMu, ) // Register handlers on shared mux. @@ -116,18 +129,10 @@ func NewReceiver(cfg connectors.ReceiverConfig, opts Options) (connectors.Receiv cfg.Mux.Handle(basePath+"/hub", hubHandler) cfg.Mux.Handle(basePath+"/webhook-receiver", webhookHandler) - return &WebSubReceiver{ - hubHandler: hubHandler, - webhookHandler: webhookHandler, - deliverer: deliverer, - topics: topics, - store: store, - consumerMgr: consumerMgr, - syncProducer: syncProducer, - brokerDriver: cfg.BrokerDriver, - channel: cfg.Channel, - opts: opts, - }, nil + receiver.hubHandler = hubHandler + receiver.webhookHandler = webhookHandler + + return receiver, nil } // Start ensures Kafka topics exist and sets up the consumer manager context. diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go new file mode 100644 index 000000000..e203534bd --- /dev/null +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package websub + +import ( + "context" + "errors" + "fmt" + "log/slog" +) + +// ApplyBindingDelta mutates the live receiver for channel add/remove changes +// without recreating the receiver or the subscription sync topic. +func (e *WebSubReceiver) ApplyBindingDelta(ctx context.Context, removedChannels map[string]string, addedChannels map[string]string) error { + var tombstoneErrs []error + + for channelName := range removedChannels { + subscriptions := e.store.GetByTopic(channelName) + for _, sub := range subscriptions { + if e.syncProducer != nil { + if err := e.syncProducer.PublishTombstone(ctx, channelName, sub.CallbackURL); err != nil { + slog.Error("Failed to tombstone subscription for removed WebSub channel", + "api", e.channel.Name, + "channel", channelName, + "callback", sub.CallbackURL, + "error", err) + tombstoneErrs = append(tombstoneErrs, + fmt.Errorf("failed to tombstone subscription for removed channel %q callback %q: %w", channelName, sub.CallbackURL, err)) + } + } + } + } + + for channelName, kafkaTopic := range removedChannels { + e.topics.Deregister(channelName) + + subscriptions := e.store.GetByTopic(channelName) + for _, sub := range subscriptions { + if err := e.consumerMgr.RemoveSubscription(sub.CallbackURL, kafkaTopic); err != nil { + slog.Error("Failed to remove consumer for deleted WebSub channel", + "api", e.channel.Name, + "channel", channelName, + "callback", sub.CallbackURL, + "error", err) + } + if err := e.store.Remove(channelName, sub.CallbackURL); err != nil { + slog.Error("Failed to remove subscription for deleted WebSub channel", + "api", e.channel.Name, + "channel", channelName, + "callback", sub.CallbackURL, + "error", err) + } + } + + e.channelMu.Lock() + delete(e.channel.Channels, channelName) + e.channelMu.Unlock() + } + + for channelName, kafkaTopic := range addedChannels { + e.channelMu.Lock() + e.channel.Channels[channelName] = kafkaTopic + e.channelMu.Unlock() + e.topics.Register(channelName) + } + + if len(tombstoneErrs) > 0 { + return errors.Join(tombstoneErrs...) + } + + return nil +} diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta_test.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta_test.go new file mode 100644 index 000000000..0a62b6f40 --- /dev/null +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta_test.go @@ -0,0 +1,162 @@ +package websub + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors" + "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/subscription" +) + +type deltaTestBrokerDriver struct { + publishErrs map[string]error + published []string +} + +func (d *deltaTestBrokerDriver) Publish(_ context.Context, topic string, msg *connectors.Message) error { + key := string(msg.Key) + d.published = append(d.published, key) + if err, ok := d.publishErrs[key]; ok { + return err + } + return nil +} + +func (d *deltaTestBrokerDriver) Subscribe(string, []string, connectors.MessageHandler) (connectors.Receiver, error) { + return nil, nil +} + +func (d *deltaTestBrokerDriver) SubscribeManual(string, []string, connectors.MessageHandler) (connectors.Receiver, error) { + return nil, nil +} + +func (d *deltaTestBrokerDriver) Replay(context.Context, string, connectors.MessageHandler) error { + return nil +} + +func (d *deltaTestBrokerDriver) TopicExists(context.Context, string) (bool, error) { + return false, nil +} + +func (d *deltaTestBrokerDriver) EnsureTopics(context.Context, []string) error { + return nil +} + +func (d *deltaTestBrokerDriver) EnsureCompactedTopic(context.Context, string) error { + return nil +} + +func (d *deltaTestBrokerDriver) DeleteTopics(context.Context, []string) error { + return nil +} + +func (d *deltaTestBrokerDriver) Close() error { + return nil +} + +type deltaTestReceiver struct { + stopCalls int +} + +func (r *deltaTestReceiver) Start(context.Context) error { + return nil +} + +func (r *deltaTestReceiver) Stop(context.Context) error { + r.stopCalls++ + return nil +} + +func TestApplyBindingDeltaContinuesAfterTombstoneFailure(t *testing.T) { + store := subscription.NewInMemoryStore("runtime-1") + require.NoError(t, store.Add(&subscription.Subscription{ + ID: "sub-1", + Topic: "removed-a", + CallbackURL: "https://callback-a.example", + State: subscription.StateActive, + })) + require.NoError(t, store.Add(&subscription.Subscription{ + ID: "sub-2", + Topic: "removed-b", + CallbackURL: "https://callback-b.example", + State: subscription.StateActive, + })) + + topics := NewTopicRegistry() + topics.Register("removed-a") + topics.Register("removed-b") + + consumerA := &deltaTestReceiver{} + consumerB := &deltaTestReceiver{} + consumerMgr := &ConsumerManager{ + consumers: map[string]*managedConsumer{ + "https://callback-a.example": { + consumer: consumerA, + topics: map[string]bool{"kafka-a": true}, + }, + "https://callback-b.example": { + consumer: consumerB, + topics: map[string]bool{"kafka-b": true}, + }, + }, + } + + driver := &deltaTestBrokerDriver{ + publishErrs: map[string]error{ + "removed-a:https://callback-a.example": errors.New("boom"), + }, + } + + receiver := &WebSubReceiver{ + topics: topics, + store: store, + consumerMgr: consumerMgr, + syncProducer: subscription.NewSyncProducer( + driver, + "runtime-1", + "internal-sub-topic", + ), + channel: connectors.ChannelInfo{ + Name: "test-api", + Channels: map[string]string{ + "removed-a": "kafka-a", + "removed-b": "kafka-b", + }, + }, + } + + err := receiver.ApplyBindingDelta(context.Background(), + map[string]string{ + "removed-a": "kafka-a", + "removed-b": "kafka-b", + }, + map[string]string{ + "added-c": "kafka-c", + }, + ) + + require.Error(t, err) + require.ErrorContains(t, err, `failed to tombstone subscription for removed channel "removed-a"`) + + require.ElementsMatch(t, []string{ + "removed-a:https://callback-a.example", + "removed-b:https://callback-b.example", + }, driver.published) + + require.Nil(t, store.GetByTopic("removed-a")) + require.Nil(t, store.GetByTopic("removed-b")) + + require.Equal(t, 1, consumerA.stopCalls) + require.Equal(t, 1, consumerB.stopCalls) + require.Empty(t, consumerMgr.consumers) + + require.NotContains(t, receiver.channel.Channels, "removed-a") + require.NotContains(t, receiver.channel.Channels, "removed-b") + require.Equal(t, "kafka-c", receiver.channel.Channels["added-c"]) + + require.False(t, topics.IsRegistered("removed-a")) + require.False(t, topics.IsRegistered("removed-b")) + require.True(t, topics.IsRegistered("added-c")) +} diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go index 5ac7b5871..d53fe0690 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go @@ -25,6 +25,7 @@ import ( "log/slog" "net/http" "strconv" + "sync" "time" "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors" @@ -41,6 +42,7 @@ type HubHandler struct { brokerDriver connectors.BrokerDriver bindingName string channels map[string]string // channel-name → Kafka topic + channelMu *sync.RWMutex consumerMgr *ConsumerManager syncProducer *subscription.SyncProducer defaultLease int @@ -56,6 +58,7 @@ func NewHubHandler( brokerDriver connectors.BrokerDriver, bindingName string, channels map[string]string, + channelMu *sync.RWMutex, consumerMgr *ConsumerManager, syncProducer *subscription.SyncProducer, ) *HubHandler { @@ -67,6 +70,7 @@ func NewHubHandler( brokerDriver: brokerDriver, bindingName: bindingName, channels: channels, + channelMu: channelMu, consumerMgr: consumerMgr, syncProducer: syncProducer, defaultLease: defaultLease, @@ -131,7 +135,9 @@ func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { } // Resolve the Kafka topic for this channel. + h.channelMu.RLock() kafkaTopic, ok := h.channels[topic] + h.channelMu.RUnlock() if !ok { http.Error(w, fmt.Sprintf("no kafka topic for channel: %s", topic), http.StatusNotFound) return @@ -239,7 +245,9 @@ func (h *HubHandler) handleUnsubscribe(w http.ResponseWriter, r *http.Request) { } // Stop/update per-callback consumer. + h.channelMu.RLock() kafkaTopic := h.channels[topic] + h.channelMu.RUnlock() if kafkaTopic != "" { if err := h.consumerMgr.RemoveSubscription(callback, kafkaTopic); err != nil { slog.Error("Failed to update consumer on unsubscribe", "callback", callback, "error", err) @@ -271,6 +279,7 @@ type WebhookReceiverHandler struct { brokerDriver connectors.BrokerDriver bindingName string channels map[string]string // channel-name → Kafka topic + channelMu *sync.RWMutex } // NewWebhookReceiverHandler creates a new webhook receiver handler. @@ -280,6 +289,7 @@ func NewWebhookReceiverHandler( brokerDriver connectors.BrokerDriver, bindingName string, channels map[string]string, + channelMu *sync.RWMutex, ) *WebhookReceiverHandler { return &WebhookReceiverHandler{ topics: topics, @@ -287,6 +297,7 @@ func NewWebhookReceiverHandler( brokerDriver: brokerDriver, bindingName: bindingName, channels: channels, + channelMu: channelMu, } } @@ -309,7 +320,9 @@ func (h *WebhookReceiverHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques return } + h.channelMu.RLock() kafkaTopic, ok := h.channels[channelName] + h.channelMu.RUnlock() if !ok { http.Error(w, fmt.Sprintf("no kafka topic for channel: %s", channelName), http.StatusNotFound) return diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime.go b/event-gateway/gateway-runtime/internal/runtime/runtime.go index d7ac5d330..00881b181 100644 --- a/event-gateway/gateway-runtime/internal/runtime/runtime.go +++ b/event-gateway/gateway-runtime/internal/runtime/runtime.go @@ -20,11 +20,14 @@ package runtime import ( "context" + "encoding/json" "fmt" "log/slog" "net/http" "os" "path" + "reflect" + "sort" "sync" "time" @@ -63,6 +66,8 @@ type Runtime struct { activeBrokerDrivers map[string]connectors.BrokerDriver bindingPaths map[string][]string // name → registered mux paths bindingTopics map[string][]string // name → Kafka topics (data + internal sub) + bindingMutationMu sync.Mutex + bindingMutationLocks map[string]*sync.Mutex websubMux *DynamicMux websubServer *managedServer webSubServersCreated bool // true if LoadChannels created WebSub servers @@ -78,6 +83,11 @@ type managedServer struct { keyFile string } +type webSubBindingUpdater interface { + connectors.Receiver + ApplyBindingDelta(ctx context.Context, removedChannels map[string]string, addedChannels map[string]string) error +} + // New creates a new Runtime. After creation: // 1. Call Engine() to register policies // 2. Call LoadChannels() to parse bindings and create per-channel receiver+broker-driver pairs @@ -105,6 +115,7 @@ func New(cfg *config.Config, rawConfig map[string]interface{}, registry *connect activeBrokerDrivers: make(map[string]connectors.BrokerDriver), bindingPaths: make(map[string][]string), bindingTopics: make(map[string][]string), + bindingMutationLocks: make(map[string]*sync.Mutex), websubMux: NewDynamicMux(), }, nil } @@ -691,8 +702,182 @@ func (r *Runtime) unregisterBindingChains(b *hub.ChannelBinding) { } } +func webSubChannelTopicMap(wsb binding.WebSubApiBinding) map[string]string { + channels := make(map[string]string, len(wsb.Channels)) + for _, ch := range wsb.Channels { + channels[ch.Name] = binding.WebSubApiTopicName(wsb.Name, wsb.Version, ch.Name) + } + return channels +} + +func webSubTopicList(channels map[string]string, subscriptionTopic string) []string { + topics := make([]string, 0, len(channels)+1) + for _, kafkaTopic := range channels { + topics = append(topics, kafkaTopic) + } + if subscriptionTopic != "" { + topics = append(topics, subscriptionTopic) + } + return topics +} + +func diffChannelTopics(oldChannels, newChannels map[string]string) (map[string]string, map[string]string) { + removed := make(map[string]string) + added := make(map[string]string) + + for channelName, kafkaTopic := range oldChannels { + if _, exists := newChannels[channelName]; !exists { + removed[channelName] = kafkaTopic + } + } + for channelName, kafkaTopic := range newChannels { + if _, exists := oldChannels[channelName]; !exists { + added[channelName] = kafkaTopic + } + } + + return removed, added +} + +func webSubActiveChainKeys(wsb binding.WebSubApiBinding, vhost string) map[string]bool { + active := make(map[string]bool) + basePath := binding.WebSubApiBasePath(wsb.Context, wsb.Version) + hubPath := basePath + "/hub" + + if len(wsb.Policies.Subscribe) > 0 { + active[binding.GenerateRouteKey("SUBSCRIBE", hubPath, vhost)] = true + } + if len(wsb.Policies.Unsubscribe) > 0 { + active[binding.GenerateRouteKey("UNSUBSCRIBE", hubPath, vhost)] = true + } + if len(wsb.Policies.Inbound) > 0 { + active[binding.GenerateRouteKey("SUB", basePath+"/webhook-receiver", vhost)] = true + } + if len(wsb.Policies.Outbound) > 0 { + active[binding.GenerateRouteKey("DELIVER", hubPath, vhost)] = true + } + + for _, ch := range wsb.Channels { + chPath := hubPath + "/" + ch.Name + if len(ch.Policies.Subscribe) > 0 { + active[binding.GenerateRouteKey("SUBSCRIBE", chPath, vhost)] = true + } + if len(ch.Policies.Unsubscribe) > 0 { + active[binding.GenerateRouteKey("UNSUBSCRIBE", chPath, vhost)] = true + } + if len(ch.Policies.Inbound) > 0 { + active[binding.GenerateRouteKey("SUB", chPath, vhost)] = true + } + if len(ch.Policies.Outbound) > 0 { + active[binding.GenerateRouteKey("DELIVER", chPath, vhost)] = true + } + } + + return active +} + +func (r *Runtime) unregisterStaleBindingChains(b *hub.ChannelBinding, activeKeys map[string]bool) { + if b == nil { + return + } + keys := []string{b.SubscribeChainKey, b.UnsubscribeChainKey, b.InboundChainKey, b.OutboundChainKey} + for _, key := range keys { + if key == "" { + continue + } + if !activeKeys[key] { + r.engine.UnregisterChain(key) + } + } + for _, chKeys := range b.ChannelChainKeys { + channelKeys := []string{chKeys.SubscribeChainKey, chKeys.UnsubscribeChainKey, chKeys.InboundChainKey, chKeys.OutboundChainKey} + for _, key := range channelKeys { + if key == "" { + continue + } + if !activeKeys[key] { + r.engine.UnregisterChain(key) + } + } + } +} + +func canDeltaUpdateWebSubBinding(oldWSB, newWSB binding.WebSubApiBinding) bool { + if oldWSB.Name != newWSB.Name { + return false + } + if oldWSB.Context != newWSB.Context { + return false + } + if oldWSB.Version != newWSB.Version { + return false + } + if !reflect.DeepEqual(oldWSB.Receiver, newWSB.Receiver) { + return false + } + if !equalBrokerDriverSpec(oldWSB.BrokerDriver, newWSB.BrokerDriver) { + return false + } + return true +} + +func equalBrokerDriverSpec(oldSpec, newSpec binding.BrokerDriverSpec) bool { + if oldSpec.Type != newSpec.Type || oldSpec.Topic != newSpec.Topic || oldSpec.Ordering != newSpec.Ordering { + return false + } + + oldConfigJSON, oldErr := json.Marshal(oldSpec.Config) + newConfigJSON, newErr := json.Marshal(newSpec.Config) + if oldErr != nil || newErr != nil { + return reflect.DeepEqual(oldSpec.Config, newSpec.Config) + } + + return string(oldConfigJSON) == string(newConfigJSON) +} + +func (r *Runtime) lockBindingMutations(names ...string) func() { + seen := make(map[string]bool, len(names)) + ordered := make([]string, 0, len(names)) + for _, name := range names { + if name == "" || seen[name] { + continue + } + seen[name] = true + ordered = append(ordered, name) + } + sort.Strings(ordered) + + locks := make([]*sync.Mutex, 0, len(ordered)) + r.bindingMutationMu.Lock() + for _, name := range ordered { + lock, ok := r.bindingMutationLocks[name] + if !ok { + lock = &sync.Mutex{} + r.bindingMutationLocks[name] = lock + } + locks = append(locks, lock) + } + r.bindingMutationMu.Unlock() + + for _, lock := range locks { + lock.Lock() + } + + return func() { + for i := len(locks) - 1; i >= 0; i-- { + locks[i].Unlock() + } + } +} + // AddWebSubApiBinding dynamically adds a WebSubApi binding at runtime (xDS mode). func (r *Runtime) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { + unlock := r.lockBindingMutations(wsb.Name) + defer unlock() + return r.addWebSubApiBindingLocked(wsb) +} + +func (r *Runtime) addWebSubApiBindingLocked(wsb binding.WebSubApiBinding) error { r.mu.Lock() vhost := defaultVhost(wsb.Vhost) @@ -744,12 +929,7 @@ func (r *Runtime) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { r.bindingPaths[wsb.Name] = []string{basePath + "/hub", basePath + "/webhook-receiver"} // Track all Kafka topics for cleanup on removal. - allTopics := make([]string, 0, len(channels)+1) - for _, kafkaTopic := range channels { - allTopics = append(allTopics, kafkaTopic) - } - allTopics = append(allTopics, internalSubTopic) - r.bindingTopics[wsb.Name] = allTopics + r.bindingTopics[wsb.Name] = webSubTopicList(channels, internalSubTopic) ch := connectors.ChannelInfo{ Name: wsb.Name, @@ -798,8 +978,136 @@ func (r *Runtime) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { return nil } +// UpdateWebSubApiBinding dynamically updates an existing WebSubApi binding at runtime (xDS mode). +func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding) error { + unlock := r.lockBindingMutations(oldWSB.Name, newWSB.Name) + defer unlock() + + if !canDeltaUpdateWebSubBinding(oldWSB, newWSB) { + if err := r.removeWebSubApiBindingLocked(oldWSB.Name); err != nil { + return err + } + return r.addWebSubApiBindingLocked(newWSB) + } + + r.mu.Lock() + + receiver, ok := r.activeReceivers[oldWSB.Name] + if !ok { + r.mu.Unlock() + return fmt.Errorf("active receiver not found for WebSubApi %q", oldWSB.Name) + } + updater, ok := receiver.(webSubBindingUpdater) + if !ok { + r.mu.Unlock() + return fmt.Errorf("receiver for WebSubApi %q does not support delta updates", oldWSB.Name) + } + + brokerDriver, ok := r.activeBrokerDrivers[oldWSB.Name] + if !ok { + r.mu.Unlock() + return fmt.Errorf("active broker-driver not found for WebSubApi %q", oldWSB.Name) + } + + oldChannels := webSubChannelTopicMap(oldWSB) + newChannels := webSubChannelTopicMap(newWSB) + removedChannels, addedChannels := diffChannelTopics(oldChannels, newChannels) + oldBinding := r.hub.GetBinding(oldWSB.Name) + updateCtx := r.runCtx + r.mu.Unlock() + + vhost := defaultVhost(newWSB.Vhost) + subKey, unsubKey, inKey, outKey, chChainKeys, err := r.buildWebSubApiPolicyChains(newWSB, vhost) + if err != nil { + return fmt.Errorf("failed to build chains for updated WebSubApi %q: %w", newWSB.Name, err) + } + + if len(addedChannels) > 0 { + topicsToEnsure := make([]string, 0, len(addedChannels)) + for _, kafkaTopic := range addedChannels { + topicsToEnsure = append(topicsToEnsure, kafkaTopic) + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := brokerDriver.EnsureTopics(ctx, topicsToEnsure); err != nil { + cancel() + return fmt.Errorf("failed to ensure broker-driver topics during update for WebSubApi %q: %w", newWSB.Name, err) + } + cancel() + } + + if updateCtx == nil { + updateCtx = context.Background() + } + if err := updater.ApplyBindingDelta(updateCtx, removedChannels, addedChannels); err != nil { + return fmt.Errorf("failed to apply WebSub delta update for %q: %w", newWSB.Name, err) + } + + if len(removedChannels) > 0 { + topicsToDelete := make([]string, 0, len(removedChannels)) + for _, kafkaTopic := range removedChannels { + topicsToDelete = append(topicsToDelete, kafkaTopic) + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := brokerDriver.DeleteTopics(ctx, topicsToDelete); err != nil { + slog.Error("Failed to delete broker-driver topics during delta update", + "name", newWSB.Name, + "error", err) + } + cancel() + } + + r.mu.Lock() + defer r.mu.Unlock() + + currentReceiver, ok := r.activeReceivers[oldWSB.Name] + if !ok || currentReceiver != receiver { + return fmt.Errorf("active receiver changed during WebSubApi update for %q", newWSB.Name) + } + currentBrokerDriver, ok := r.activeBrokerDrivers[oldWSB.Name] + if !ok || currentBrokerDriver != brokerDriver { + return fmt.Errorf("active broker-driver changed during WebSubApi update for %q", newWSB.Name) + } + if r.hub.GetBinding(oldWSB.Name) != oldBinding { + return fmt.Errorf("hub binding changed during WebSubApi update for %q", newWSB.Name) + } + + r.unregisterStaleBindingChains(oldBinding, webSubActiveChainKeys(newWSB, vhost)) + r.hub.RegisterBinding(hub.ChannelBinding{ + APIID: newWSB.APIID, + Name: newWSB.Name, + Mode: "websub", + Context: newWSB.Context, + Version: newWSB.Version, + Vhost: vhost, + SubscribeChainKey: subKey, + UnsubscribeChainKey: unsubKey, + InboundChainKey: inKey, + OutboundChainKey: outKey, + Channels: newChannels, + ChannelChainKeys: chChainKeys, + }) + + internalSubTopic := r.webSubSubscriptionSyncTopic(newWSB.Name, newWSB.Version) + r.bindingTopics[newWSB.Name] = webSubTopicList(newChannels, internalSubTopic) + + slog.Info("Dynamically updated WebSubApi binding", + "name", newWSB.Name, + "added_channels", len(addedChannels), + "removed_channels", len(removedChannels), + "channels", len(newWSB.Channels), + ) + + return nil +} + // RemoveWebSubApiBinding dynamically removes a WebSubApi binding at runtime (xDS mode). func (r *Runtime) RemoveWebSubApiBinding(name string) error { + unlock := r.lockBindingMutations(name) + defer unlock() + return r.removeWebSubApiBindingLocked(name) +} + +func (r *Runtime) removeWebSubApiBindingLocked(name string) error { r.mu.Lock() defer r.mu.Unlock() diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime_test.go b/event-gateway/gateway-runtime/internal/runtime/runtime_test.go index 8b8350c1b..89cfa09cc 100644 --- a/event-gateway/gateway-runtime/internal/runtime/runtime_test.go +++ b/event-gateway/gateway-runtime/internal/runtime/runtime_test.go @@ -164,6 +164,116 @@ func TestWebSubSubscriptionSyncTopic_FallsBackToDerivedTopic(t *testing.T) { } } +func TestWebSubActiveChainKeys_IncludesUnsubscribeKeys(t *testing.T) { + wsb := binding.WebSubApiBinding{ + Context: "/repos", + Version: "v1.0", + Policies: binding.PolicyBindings{ + Subscribe: []binding.PolicyRef{{Name: "sub", Version: "v1.0.0"}}, + Unsubscribe: []binding.PolicyRef{{Name: "unsub", Version: "v1.0.0"}}, + }, + Channels: []binding.ChannelDef{ + { + Name: "issues", + Policies: binding.PolicyBindings{ + Subscribe: []binding.PolicyRef{{Name: "sub", Version: "v1.0.0"}}, + Unsubscribe: []binding.PolicyRef{{Name: "unsub", Version: "v1.0.0"}}, + }, + }, + }, + } + + vhost := defaultVhost("") + basePath := binding.WebSubApiBasePath(wsb.Context, wsb.Version) + hubPath := basePath + "/hub" + channelPath := hubPath + "/issues" + + active := webSubActiveChainKeys(wsb, vhost) + + if !active[binding.GenerateRouteKey("SUBSCRIBE", hubPath, vhost)] { + t.Fatal("expected hub-level subscribe key to be active") + } + if !active[binding.GenerateRouteKey("UNSUBSCRIBE", hubPath, vhost)] { + t.Fatal("expected hub-level unsubscribe key to be active") + } + if !active[binding.GenerateRouteKey("SUBSCRIBE", channelPath, vhost)] { + t.Fatal("expected channel-level subscribe key to be active") + } + if !active[binding.GenerateRouteKey("UNSUBSCRIBE", channelPath, vhost)] { + t.Fatal("expected channel-level unsubscribe key to be active") + } +} + +func TestUnregisterStaleBindingChains_RemovesStaleUnsubscribeKeys(t *testing.T) { + eng, err := enginepkg.New(nil) + if err != nil { + t.Fatalf("failed to create engine: %v", err) + } + if err := eng.RegisterPolicy(&policy.PolicyDefinition{ + Name: "test-noop", + Version: "v1.0.0", + }, newTestNoopPolicy); err != nil { + t.Fatalf("failed to register test policy: %v", err) + } + + registerChain := func(routeKey string) { + chain, err := eng.BuildChain(routeKey, []enginepkg.PolicySpec{{ + Name: "test-noop", + Version: "v1", + Enabled: true, + }}) + if err != nil { + t.Fatalf("failed to build chain %q: %v", routeKey, err) + } + eng.RegisterChain(routeKey, chain) + } + + bindingState := &hub.ChannelBinding{ + SubscribeChainKey: "hub-sub", + UnsubscribeChainKey: "hub-unsub", + InboundChainKey: "hub-in", + OutboundChainKey: "hub-out", + ChannelChainKeys: map[string]hub.ChannelChainKeySet{ + "issues": { + SubscribeChainKey: "ch-sub", + UnsubscribeChainKey: "ch-unsub", + InboundChainKey: "ch-in", + OutboundChainKey: "ch-out", + }, + }, + } + + for _, key := range []string{ + bindingState.SubscribeChainKey, + bindingState.UnsubscribeChainKey, + bindingState.InboundChainKey, + bindingState.OutboundChainKey, + "ch-sub", "ch-unsub", "ch-in", "ch-out", + } { + registerChain(key) + } + + rt := &Runtime{engine: eng} + rt.unregisterStaleBindingChains(bindingState, map[string]bool{ + "hub-sub": true, + "hub-in": true, + "hub-out": true, + "ch-sub": true, + "ch-in": true, + "ch-out": true, + }) + + if eng.GetChain("hub-unsub") != nil { + t.Fatal("expected stale hub unsubscribe chain to be removed") + } + if eng.GetChain("ch-unsub") != nil { + t.Fatal("expected stale channel unsubscribe chain to be removed") + } + if eng.GetChain("hub-sub") == nil || eng.GetChain("ch-sub") == nil { + t.Fatal("expected active subscribe chains to remain registered") + } +} + func TestStartReceiverWithRetry_RetriesUntilSuccess(t *testing.T) { previousInitial := initialReceiverStartBackoff previousMax := maxReceiverStartBackoff diff --git a/event-gateway/gateway-runtime/internal/xdsclient/handler.go b/event-gateway/gateway-runtime/internal/xdsclient/handler.go index c80fc96fd..d15cff6de 100644 --- a/event-gateway/gateway-runtime/internal/xdsclient/handler.go +++ b/event-gateway/gateway-runtime/internal/xdsclient/handler.go @@ -38,6 +38,7 @@ import ( type BindingManager interface { AddWebSubApiBinding(wsb binding.WebSubApiBinding) error RemoveWebSubApiBinding(name string) error + UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding) error } // KafkaConfig holds local Kafka broker settings used as defaults. @@ -188,15 +189,14 @@ func (h *Handler) HandleResources(ctx context.Context, resources []*discoveryv3. if reflect.DeepEqual(old, ecr) { continue } - // Update: remove then re-add slog.Info("Updating binding via xDS", "name", ecr.Name, "uuid", uuid) - if err := h.manager.RemoveWebSubApiBinding(old.Name); err != nil { - slog.Error("Failed to remove binding for update", "name", old.Name, "error", err) + if err := h.manager.UpdateWebSubApiBinding(h.toWebSubApiBinding(old), h.toWebSubApiBinding(ecr)); err != nil { + return fmt.Errorf("failed to update binding %q: %w", ecr.Name, err) } - } else { - slog.Info("Adding binding via xDS", "name", ecr.Name, "uuid", uuid) + continue } + slog.Info("Adding binding via xDS", "name", ecr.Name, "uuid", uuid) wsb := h.toWebSubApiBinding(ecr) if err := h.manager.AddWebSubApiBinding(wsb); err != nil { return fmt.Errorf("failed to add binding %q: %w", ecr.Name, err) diff --git a/event-gateway/gateway-runtime/internal/xdsclient/handler_test.go b/event-gateway/gateway-runtime/internal/xdsclient/handler_test.go index c497b174c..f9cc1deac 100644 --- a/event-gateway/gateway-runtime/internal/xdsclient/handler_test.go +++ b/event-gateway/gateway-runtime/internal/xdsclient/handler_test.go @@ -141,11 +141,17 @@ func TestHandlerHandleResources_UpdatesOnlyChangedBinding(t *testing.T) { t.Fatalf("updated HandleResources returned error: %v", err) } - if got := manager.removedNames(); len(got) != 1 || got[0] != "githubser" { - t.Fatalf("expected only githubser to be removed on change, got %#v", got) + if got := len(manager.updated); got != 1 { + t.Fatalf("expected exactly one update call, got %d", got) } - if got := manager.addedCount("githubser"); got != 2 { - t.Fatalf("expected githubser to be added twice, got %d", got) + if manager.updated[0].old.Name != "githubser" || manager.updated[0].new.Name != "githubser" { + t.Fatalf("unexpected update binding names: %#v", manager.updated[0]) + } + if got := manager.removedNames(); len(got) != 0 { + t.Fatalf("expected no removals on delta update, got %#v", got) + } + if got := manager.addedCount("githubser"); got != 1 { + t.Fatalf("expected githubser to be added once, got %d", got) } if got := manager.addedCount("gitlabser"); got != 1 { t.Fatalf("expected gitlabser to be added once, got %d", got) @@ -155,6 +161,12 @@ func TestHandlerHandleResources_UpdatesOnlyChangedBinding(t *testing.T) { type recordingBindingManager struct { added []string removed []string + updated []updatedBindingCall +} + +type updatedBindingCall struct { + old binding.WebSubApiBinding + new binding.WebSubApiBinding } func (m *recordingBindingManager) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { @@ -167,6 +179,11 @@ func (m *recordingBindingManager) RemoveWebSubApiBinding(name string) error { return nil } +func (m *recordingBindingManager) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding) error { + m.updated = append(m.updated, updatedBindingCall{old: oldWSB, new: newWSB}) + return nil +} + func (m *recordingBindingManager) addedNames() []string { out := append([]string(nil), m.added...) return out diff --git a/gateway/gateway-controller/pkg/api/handlers/handlers.go b/gateway/gateway-controller/pkg/api/handlers/handlers.go index cf71a5a05..e5dcb7711 100644 --- a/gateway/gateway-controller/pkg/api/handlers/handlers.go +++ b/gateway/gateway-controller/pkg/api/handlers/handlers.go @@ -46,6 +46,7 @@ import ( "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/secrets" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/restapi" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/websubapi" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/xds" @@ -56,6 +57,7 @@ type APIServer struct { *RestAPIHandler // embedded — promotes CreateRestAPI, ListRestAPIs, GetRestAPIById, UpdateRestAPI, DeleteRestAPI restAPIService *restapi.RestAPIService + webSubAPIService *websubapi.WebSubAPIService store *storage.ConfigStore db storage.Storage snapshotManager *xds.SnapshotManager @@ -151,6 +153,17 @@ func NewAPIServer( subscriptionResourceService: subscriptionResourceService, } server.restAPIService = restAPIService + server.webSubAPIService = websubapi.NewWebSubAPIService( + db, + deploymentService, + controlPlaneClient, + systemConfig, + parser, + validator, + logger, + eventHub, + secretService, + ) server.RestAPIHandler = NewRestAPIHandler(restAPIService, logger) // Register status update callback diff --git a/gateway/gateway-controller/pkg/api/handlers/handlers_test.go b/gateway/gateway-controller/pkg/api/handlers/handlers_test.go index 6cacbf025..b58cf04f2 100644 --- a/gateway/gateway-controller/pkg/api/handlers/handlers_test.go +++ b/gateway/gateway-controller/pkg/api/handlers/handlers_test.go @@ -49,6 +49,7 @@ import ( policybuilder "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policy" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/restapi" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/websubapi" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" ) @@ -1032,6 +1033,17 @@ func createTestAPIServerWithDB(db storage.Storage) *APIServer { httpClient, parser, validator, logger, hub, nil, ) server.restAPIService = restAPIService + server.webSubAPIService = websubapi.NewWebSubAPIService( + db, + deploymentService, + nil, + systemCfg, + parser, + validator, + logger, + hub, + nil, + ) server.RestAPIHandler = NewRestAPIHandler(restAPIService, logger) return server @@ -1251,6 +1263,17 @@ func attachTestEventHub(server *APIServer, hub eventhub.EventHub, gatewayID stri server.restAPIService = restAPIService server.RestAPIHandler = NewRestAPIHandler(restAPIService, server.logger) } + server.webSubAPIService = websubapi.NewWebSubAPIService( + server.db, + server.deploymentService, + nil, + server.systemConfig, + server.parser, + server.validator, + server.logger, + hub, + nil, + ) } func seedAPIForAPIKeyHandlerTests(t *testing.T, server *APIServer, handle string) *models.StoredConfig { @@ -3012,6 +3035,69 @@ func TestGetLLMProxyByIdWithDeployedAt(t *testing.T) { assert.Equal(t, http.StatusOK, w.Code) } +func TestUpdateWebSubAPIUndeployUsesDedicatedServicePath(t *testing.T) { + server := createTestAPIServer() + mockDB := server.db.(*MockStorage) + mockHub := &mockEventHub{} + attachTestEventHub(server, mockHub, "test-gateway") + + cfg := &models.StoredConfig{ + UUID: "0000-websub-id-0000-000000000000", + Kind: string(api.WebSubAPIKindWebSubApi), + Handle: "repo-watcher-v1-0", + DisplayName: "repo-watcher", + Version: "v1.0", + SourceConfiguration: api.WebSubAPI{ + ApiVersion: api.WebSubAPIApiVersionGatewayApiPlatformWso2Comv1alpha1, + Kind: api.WebSubAPIKindWebSubApi, + Metadata: api.Metadata{ + Name: "repo-watcher-v1-0", + }, + Spec: api.WebhookAPIData{ + DisplayName: "repo-watcher", + Version: "v1.0", + Context: "/repos", + Channels: &map[string]api.WebSubChannel{ + "issues": {}, + }, + }, + }, + DesiredState: models.StateDeployed, + Origin: models.OriginGatewayAPI, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + require.NoError(t, mockDB.SaveConfig(cfg)) + + body := []byte(`{ + "apiVersion":"gateway.api-platform.wso2.com/v1alpha1", + "kind":"WebSubApi", + "metadata":{"name":"repo-watcher-v1-0"}, + "spec":{ + "displayName":"repo-watcher", + "version":"v1.0", + "context":"/repos", + "deploymentState":"undeployed", + "channels":{"issues":{}} + } + }`) + + c, w := createTestContext("PUT", "/websub-apis/repo-watcher-v1-0", body) + c.Request.Header.Set("Content-Type", "application/json") + + server.UpdateWebSubAPI(c, "repo-watcher-v1-0") + + assert.Equal(t, http.StatusOK, w.Code) + + updatedCfg, err := mockDB.GetConfig("0000-websub-id-0000-000000000000") + require.NoError(t, err) + assert.Equal(t, models.StateUndeployed, updatedCfg.DesiredState) + require.Len(t, mockHub.publishedEvents, 1) + assert.Equal(t, "UPDATE", mockHub.publishedEvents[0].event.Action) + assert.Equal(t, eventhub.EventTypeAPI, mockHub.publishedEvents[0].event.EventType) + assert.Equal(t, "0000-websub-id-0000-000000000000", mockHub.publishedEvents[0].event.EntityID) +} + // TestHandleStatusUpdateStoreError tests handleStatusUpdate with store error func TestHandleStatusUpdateStoreError(t *testing.T) { server := createTestAPIServer() diff --git a/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go b/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go index 66a54e2aa..15b46a414 100644 --- a/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go +++ b/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go @@ -20,6 +20,7 @@ package handlers import ( "context" + "errors" "fmt" "io" "log/slog" @@ -32,10 +33,13 @@ import ( api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/management" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/middleware" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/websubapi" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" ) +//TODO: Refactor to move business logic to service layer and keep handlers thin, focusing on request parsing, response formatting, and error handling. + // CreateWebSubAPI implements ServerInterface.CreateWebSubAPI // (POST /websub-apis) func (s *APIServer) CreateWebSubAPI(c *gin.Context) { @@ -182,31 +186,24 @@ func (s *APIServer) UpdateWebSubAPI(c *gin.Context, id string) { return } - existing, err := s.db.GetConfigByKindAndHandle(models.KindWebSubApi, handle) - if err != nil { - log.Warn("WebSub API configuration not found", - slog.String("handle", handle)) - c.JSON(http.StatusNotFound, api.ErrorResponse{ - Status: "error", - Message: fmt.Sprintf("WebSub API configuration with handle '%s' not found", handle), - }) - return - } - correlationID := middleware.GetCorrelationID(c) - result, err := s.deploymentService.DeployAPIConfiguration(utils.APIDeploymentParams{ - Data: body, + result, err := s.webSubAPIService.Update(websubapi.UpdateParams{ + Handle: handle, + Body: body, ContentType: c.GetHeader("Content-Type"), - Kind: "WebSubApi", - APIID: existing.UUID, - Origin: models.OriginGatewayAPI, CorrelationID: correlationID, Logger: log, }) if err != nil { log.Error("Failed to update WebSub API configuration", slog.Any("error", err)) - if storage.IsConflictError(err) { + if errors.Is(err, websubapi.ErrNotFound) { + c.JSON(http.StatusNotFound, api.ErrorResponse{ + Status: "error", + Message: fmt.Sprintf("WebSub API configuration with handle '%s' not found", handle), + }) + return + } else if storage.IsConflictError(err) { c.JSON(http.StatusConflict, api.ErrorResponse{ Status: "error", Message: err.Error(), @@ -223,7 +220,7 @@ func (s *APIServer) UpdateWebSubAPI(c *gin.Context, id string) { return } - updated := result.StoredConfig + updated := result.Config log.Info("WebSub API configuration updated", slog.String("id", updated.UUID), diff --git a/gateway/gateway-controller/pkg/service/websubapi/errors.go b/gateway/gateway-controller/pkg/service/websubapi/errors.go new file mode 100644 index 000000000..3a44cba9d --- /dev/null +++ b/gateway/gateway-controller/pkg/service/websubapi/errors.go @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package websubapi + +import ( + "errors" + "fmt" + + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" +) + +var ( + // ErrNotFound is returned when a WebSub API is not found. + ErrNotFound = errors.New("websub api not found") +) + +// ValidationError wraps configuration validation errors. +type ValidationError struct { + Errors []config.ValidationError +} + +func (e *ValidationError) Error() string { + return fmt.Sprintf("configuration validation failed (%d errors)", len(e.Errors)) +} + +// ParseError wraps a configuration parse failure. +type ParseError struct { + Cause error +} + +func (e *ParseError) Error() string { + return fmt.Sprintf("failed to parse configuration: %v", e.Cause) +} + +func (e *ParseError) Unwrap() error { + return e.Cause +} + +// HandleMismatchError is returned when the path handle doesn't match metadata.name. +type HandleMismatchError struct { + PathHandle string + YAMLHandle string +} + +func (e *HandleMismatchError) Error() string { + return fmt.Sprintf("handle mismatch: path has '%s' but YAML metadata.name has '%s'", e.PathHandle, e.YAMLHandle) +} diff --git a/gateway/gateway-controller/pkg/service/websubapi/service.go b/gateway/gateway-controller/pkg/service/websubapi/service.go new file mode 100644 index 000000000..4a9208aca --- /dev/null +++ b/gateway/gateway-controller/pkg/service/websubapi/service.go @@ -0,0 +1,265 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package websubapi + +import ( + "fmt" + "log/slog" + "strings" + "time" + + "github.com/wso2/api-platform/common/eventhub" + api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/management" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/controlplane" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/templateengine" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/templateengine/funcs" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" +) + +// UpdateParams holds parameters for the Update operation. +type UpdateParams struct { + Handle string + Body []byte + ContentType string + CorrelationID string + Logger *slog.Logger +} + +// UpdateResult holds the result of an Update operation. +type UpdateResult struct { + Config *models.StoredConfig +} + +// WebSubAPIService encapsulates WebSub API update behavior. +type WebSubAPIService struct { + db storage.Storage + deploymentService *utils.APIDeploymentService + controlPlaneClient controlplane.ControlPlaneClient + systemConfig *config.Config + parser *config.Parser + validator config.Validator + logger *slog.Logger + eventHub eventhub.EventHub + secretResolver funcs.SecretResolver +} + +// NewWebSubAPIService creates a new WebSubAPIService. +func NewWebSubAPIService( + db storage.Storage, + deploymentService *utils.APIDeploymentService, + controlPlaneClient controlplane.ControlPlaneClient, + systemConfig *config.Config, + parser *config.Parser, + validator config.Validator, + logger *slog.Logger, + eventHub eventhub.EventHub, + secretResolver funcs.SecretResolver, +) *WebSubAPIService { + if db == nil { + panic("WebSubAPIService requires non-nil storage") + } + if deploymentService == nil { + panic("WebSubAPIService requires APIDeploymentService") + } + if eventHub == nil { + panic("WebSubAPIService requires non-nil EventHub") + } + if systemConfig == nil { + panic("WebSubAPIService requires non-nil system config") + } + if strings.TrimSpace(systemConfig.Controller.Server.GatewayID) == "" { + panic("WebSubAPIService requires non-empty gateway ID") + } + + return &WebSubAPIService{ + db: db, + deploymentService: deploymentService, + controlPlaneClient: controlPlaneClient, + systemConfig: systemConfig, + parser: parser, + validator: validator, + logger: logger, + eventHub: eventHub, + secretResolver: secretResolver, + } +} + +// Update modifies an existing WebSub API configuration. +func (s *WebSubAPIService) Update(params UpdateParams) (*UpdateResult, error) { + log := params.Logger + + var apiConfig api.WebSubAPI + if err := s.parser.Parse(params.Body, params.ContentType, &apiConfig); err != nil { + return nil, &ParseError{Cause: err} + } + + if apiConfig.Metadata.Name != "" && apiConfig.Metadata.Name != params.Handle { + return nil, &HandleMismatchError{ + PathHandle: params.Handle, + YAMLHandle: apiConfig.Metadata.Name, + } + } + + existing, err := s.db.GetConfigByKindAndHandle(models.KindWebSubApi, params.Handle) + if err != nil { + if storage.IsNotFoundError(err) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("failed to retrieve existing websub api: %w", err) + } + + desiredState := models.StateDeployed + if apiConfig.Spec.DeploymentState != nil && + *apiConfig.Spec.DeploymentState == api.WebhookAPIDataDeploymentStateUndeployed { + desiredState = models.StateUndeployed + } + + if desiredState != models.StateUndeployed { + result, err := s.deploymentService.DeployAPIConfiguration(utils.APIDeploymentParams{ + Data: params.Body, + ContentType: params.ContentType, + Kind: "WebSubApi", + APIID: existing.UUID, + Origin: models.OriginGatewayAPI, + CorrelationID: params.CorrelationID, + Logger: log, + }) + if err != nil { + return nil, err + } + return &UpdateResult{Config: result.StoredConfig}, nil + } + + if apiConfig.Metadata.Name == "" { + apiConfig.Metadata.Name = params.Handle + } + + existing.Configuration = apiConfig + existing.SourceConfiguration = apiConfig + + renderedExisting := *existing + if err := templateengine.RenderSpec(&renderedExisting, s.secretResolver, log); err != nil { + return nil, err + } + + renderedConfig, ok := renderedExisting.Configuration.(api.WebSubAPI) + if !ok { + return nil, fmt.Errorf("failed to render websub api configuration") + } + + validationErrors := s.validator.Validate(&renderedConfig) + if len(validationErrors) > 0 { + return nil, &ValidationError{Errors: validationErrors} + } + + if err := s.validateArtifactConflicts(existing.UUID, renderedConfig.Spec.DisplayName, renderedConfig.Spec.Version, existing.Handle); err != nil { + return nil, err + } + + now := time.Now() + existing.DisplayName = renderedConfig.Spec.DisplayName + existing.Version = renderedConfig.Spec.Version + existing.DesiredState = desiredState + existing.UpdatedAt = now + + truncatedNow := now.Truncate(time.Millisecond) + existing.DeployedAt = &truncatedNow + + if existing.Origin == models.OriginGatewayAPI { + existing.CPSyncStatus = models.CPSyncStatusPending + } + + if err := s.db.UpdateConfig(existing); err != nil { + log.Error("Failed to update WebSub API config in database", slog.Any("error", err)) + return nil, fmt.Errorf("failed to persist configuration update: %w", err) + } + + s.publishEvent(eventhub.EventTypeAPI, "UPDATE", existing.UUID, params.CorrelationID, log) + + if existing.Origin == models.OriginGatewayAPI && s.controlPlaneClient != nil && s.controlPlaneClient.IsConnected() && s.controlPlaneClient.IsOnPrem() { + go func() { + if err := s.controlPlaneClient.SyncArtifactsToOnPremAPIM(s.controlPlaneClient.GetAPIMConfig()); err != nil { + log.Error("Failed to sync WebSub API to on-prem APIM", slog.Any("error", err)) + } + }() + } + + log.Info("WebSub API configuration updated", + slog.String("id", existing.UUID), + slog.String("handle", params.Handle), + slog.String("desired_state", string(desiredState))) + + return &UpdateResult{Config: existing}, nil +} + +func (s *WebSubAPIService) validateArtifactConflicts(currentID, displayName, version, handle string) error { + existingByNameVersion, err := s.db.GetConfigByKindNameAndVersion(models.KindWebSubApi, displayName, version) + if err == nil { + if existingByNameVersion != nil && existingByNameVersion.UUID != currentID { + return fmt.Errorf("%w: configuration with name '%s' and version '%s' already exists", + storage.ErrConflict, displayName, version) + } + } else if !storage.IsNotFoundError(err) { + return fmt.Errorf("failed to check existing WebSubApi name/version conflict: %w", err) + } + + existingByHandle, err := s.db.GetConfigByKindAndHandle(models.KindWebSubApi, handle) + if err == nil { + if existingByHandle != nil && existingByHandle.UUID != currentID { + return fmt.Errorf("%w: configuration with handle '%s' already exists", + storage.ErrConflict, handle) + } + } else if !storage.IsNotFoundError(err) { + return fmt.Errorf("failed to check existing WebSubApi handle conflict: %w", err) + } + + return nil +} + +func (s *WebSubAPIService) publishEvent(eventType eventhub.EventType, action, entityID, correlationID string, logger *slog.Logger) { + gatewayID := strings.TrimSpace(s.systemConfig.Controller.Server.GatewayID) + event := eventhub.Event{ + GatewayID: gatewayID, + OriginatedTimestamp: time.Now(), + EventType: eventType, + Action: action, + EntityID: entityID, + EventID: correlationID, + EventData: eventhub.EmptyEventData, + } + + if err := s.eventHub.PublishEvent(gatewayID, event); err != nil { + logger.Warn("Failed to publish event to event hub", + slog.String("gateway_id", gatewayID), + slog.String("event_type", string(eventType)), + slog.String("action", action), + slog.String("entity_id", entityID), + slog.Any("error", err)) + return + } + + logger.Debug("Published event to event hub", + slog.String("gateway_id", gatewayID), + slog.String("event_type", string(eventType)), + slog.String("action", action), + slog.String("entity_id", entityID)) +}