diff --git a/event-gateway/gateway-runtime/configs/config.toml b/event-gateway/gateway-runtime/configs/config.toml index 50ebe9681e..ff8f444615 100644 --- a/event-gateway/gateway-runtime/configs/config.toml +++ b/event-gateway/gateway-runtime/configs/config.toml @@ -26,6 +26,9 @@ metrics_port = 9003 # Default Kafka brokers. Channels can override with broker-driver.config.brokers. brokers = ["localhost:9092"] consumer_group_prefix = "event-gateway" +# Partitions and replication factor used for internal compacted topics. +compact_topic_partitions = 1 +compact_topic_replication_factor = 1 tls = false # Optional PEM CA file for self-signed or private Kafka CAs. # tls_ca_file = "/etc/event-gateway/kafka/ca.crt" @@ -70,4 +73,3 @@ allow_payloads = false enabled = true xds_address = "localhost:18001" # node_id = "" - diff --git a/event-gateway/gateway-runtime/internal/config/config.go b/event-gateway/gateway-runtime/internal/config/config.go index e7ae25cd3c..198decd3f8 100644 --- a/event-gateway/gateway-runtime/internal/config/config.go +++ b/event-gateway/gateway-runtime/internal/config/config.go @@ -57,16 +57,18 @@ type ServerConfig struct { // KafkaConfig holds Kafka connection settings. type KafkaConfig struct { - Brokers []string `koanf:"brokers"` - ConsumerGroupPrefix string `koanf:"consumer_group_prefix"` - TLS bool `koanf:"tls"` - TLSCAFile string `koanf:"tls_ca_file"` - TLSCertFile string `koanf:"tls_cert_file"` - TLSKeyFile string `koanf:"tls_key_file"` - TLSServerName string `koanf:"tls_server_name"` - SASLMechanism string `koanf:"sasl_mechanism"` - SASLUsername string `koanf:"sasl_username"` - SASLPassword string `koanf:"sasl_password"` + Brokers []string `koanf:"brokers"` + ConsumerGroupPrefix string `koanf:"consumer_group_prefix"` + CompactTopicPartitions int `koanf:"compact_topic_partitions"` + CompactTopicReplicationFactor int `koanf:"compact_topic_replication_factor"` + TLS bool `koanf:"tls"` + TLSCAFile string `koanf:"tls_ca_file"` + TLSCertFile string `koanf:"tls_cert_file"` + TLSKeyFile string `koanf:"tls_key_file"` + TLSServerName string `koanf:"tls_server_name"` + SASLMechanism string `koanf:"sasl_mechanism"` + SASLUsername string `koanf:"sasl_username"` + SASLPassword string `koanf:"sasl_password"` } // WebSubConfig holds WebSub-specific settings. @@ -111,8 +113,10 @@ func DefaultConfig() *Config { MetricsPort: 9003, }, Kafka: KafkaConfig{ - Brokers: []string{"localhost:9092"}, - ConsumerGroupPrefix: "event-gateway", + Brokers: []string{"localhost:9092"}, + ConsumerGroupPrefix: "event-gateway", + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, }, WebSub: WebSubConfig{ VerificationTimeoutSeconds: 10, @@ -223,6 +227,8 @@ func mapEnvValue(path, value string) interface{} { "server.websocket_port", "server.admin_port", "server.metrics_port", + "kafka.compact_topic_partitions", + "kafka.compact_topic_replication_factor", "websub.verification_timeout_seconds", "websub.delivery_max_retries", "websub.delivery_initial_delay_ms", @@ -295,6 +301,12 @@ func validateKafkaConfig(kafkaCfg KafkaConfig) error { if len(kafkaCfg.Brokers) == 0 { return fmt.Errorf("kafka.brokers must contain at least one broker") } + if kafkaCfg.CompactTopicPartitions <= 0 { + return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", kafkaCfg.CompactTopicPartitions) + } + if kafkaCfg.CompactTopicReplicationFactor <= 0 { + return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", kafkaCfg.CompactTopicReplicationFactor) + } if kafkaCfg.TLS { if strings.TrimSpace(kafkaCfg.TLSCAFile) != "" { diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go index 914fa6c85c..045ce595fc 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "math" "os" "strings" @@ -34,29 +35,39 @@ import ( // ConnectionConfig holds the Kafka connection settings used by the driver. type ConnectionConfig struct { - Brokers []string - TLS bool - TLSCAFile string - TLSCertFile string - TLSKeyFile string - TLSServerName string - SASLMechanism string - SASLUsername string - SASLPassword string + Brokers []string + CompactTopicPartitions int + CompactTopicReplicationFactor int + TLS bool + TLSCAFile string + TLSCertFile string + TLSKeyFile string + TLSServerName string + SASLMechanism string + SASLUsername string + SASLPassword string } // ResolveConnectionConfig merges global runtime config with per-binding overrides. func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]interface{}) (ConnectionConfig, error) { cfg := ConnectionConfig{ - Brokers: append([]string(nil), global.Brokers...), - TLS: global.TLS, - TLSCAFile: global.TLSCAFile, - TLSCertFile: global.TLSCertFile, - TLSKeyFile: global.TLSKeyFile, - TLSServerName: global.TLSServerName, - SASLMechanism: global.SASLMechanism, - SASLUsername: global.SASLUsername, - SASLPassword: global.SASLPassword, + Brokers: append([]string(nil), global.Brokers...), + CompactTopicPartitions: global.CompactTopicPartitions, + CompactTopicReplicationFactor: global.CompactTopicReplicationFactor, + TLS: global.TLS, + TLSCAFile: global.TLSCAFile, + TLSCertFile: global.TLSCertFile, + TLSKeyFile: global.TLSKeyFile, + TLSServerName: global.TLSServerName, + SASLMechanism: global.SASLMechanism, + SASLUsername: global.SASLUsername, + SASLPassword: global.SASLPassword, + } + if cfg.CompactTopicPartitions <= 0 { + return ConnectionConfig{}, fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", cfg.CompactTopicPartitions) + } + if cfg.CompactTopicReplicationFactor <= 0 { + return ConnectionConfig{}, fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", cfg.CompactTopicReplicationFactor) } if overrides != nil { @@ -65,6 +76,16 @@ func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]int } else if ok { cfg.Brokers = brokers } + if v, ok, err := intOverride(overrides["compact_topic_partitions"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.CompactTopicPartitions = v + } + if v, ok, err := intOverride(overrides["compact_topic_replication_factor"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.CompactTopicReplicationFactor = v + } if v, ok, err := boolOverride(overrides["tls"]); err != nil { return ConnectionConfig{}, err } else if ok { @@ -135,6 +156,18 @@ func validateConnectionConfig(cfg ConnectionConfig) error { if len(cfg.Brokers) == 0 { return fmt.Errorf("kafka brokers must not be empty") } + if cfg.CompactTopicPartitions <= 0 { + return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", cfg.CompactTopicPartitions) + } + if cfg.CompactTopicPartitions > math.MaxInt32 { + return fmt.Errorf("kafka.compact_topic_partitions must be <= %d, got %d", math.MaxInt32, cfg.CompactTopicPartitions) + } + if cfg.CompactTopicReplicationFactor <= 0 { + return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", cfg.CompactTopicReplicationFactor) + } + if cfg.CompactTopicReplicationFactor > math.MaxInt16 { + return fmt.Errorf("kafka.compact_topic_replication_factor must be <= %d, got %d", math.MaxInt16, cfg.CompactTopicReplicationFactor) + } if !cfg.TLS { if cfg.TLSCAFile != "" || cfg.TLSCertFile != "" || cfg.TLSKeyFile != "" || cfg.TLSServerName != "" { @@ -264,6 +297,29 @@ func boolOverride(value interface{}) (bool, bool, error) { return v, true, nil } +func intOverride(value interface{}) (int, bool, error) { + if value == nil { + return 0, false, nil + } + switch v := value.(type) { + case int: + return v, true, nil + case float64: + if math.IsNaN(v) || math.IsInf(v, 0) { + return 0, false, fmt.Errorf("expected integer Kafka config override, got non-finite float64 %v", v) + } + if v != math.Trunc(v) { + return 0, false, fmt.Errorf("expected integer Kafka config override, got non-integer float64 %v", v) + } + if v < math.MinInt32 || v > math.MaxInt32 { + return 0, false, fmt.Errorf("expected integer Kafka config override within [%d, %d], got float64 %v", math.MinInt32, math.MaxInt32, v) + } + return int(v), true, nil + default: + return 0, false, fmt.Errorf("expected int override, got %T", value) + } +} + func stringOverride(value interface{}) (string, bool, error) { if value == nil { return "", false, nil diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go index 3a46a34f71..abe611092d 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go @@ -1,9 +1,11 @@ package kafka import ( + "math" "os" "path/filepath" "reflect" + "strings" "testing" runtimeconfig "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/config" @@ -17,13 +19,15 @@ func TestResolveConnectionConfig_MergesGlobalAndOverrides(t *testing.T) { } global := runtimeconfig.KafkaConfig{ - Brokers: []string{"broker-1:9092"}, - TLS: true, - TLSCAFile: caPath, - TLSServerName: "global-kafka", - SASLMechanism: "plain", - SASLUsername: "global-user", - SASLPassword: "global-pass", + Brokers: []string{"broker-1:9092"}, + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + TLS: true, + TLSCAFile: caPath, + TLSServerName: "global-kafka", + SASLMechanism: "plain", + SASLUsername: "global-user", + SASLPassword: "global-pass", } resolved, err := ResolveConnectionConfig(global, map[string]interface{}{ @@ -57,7 +61,10 @@ func TestResolveConnectionConfig_MergesGlobalAndOverrides(t *testing.T) { } func TestResolveConnectionConfig_PreservesOpaqueCredentials(t *testing.T) { - resolved, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{}, map[string]interface{}{ + resolved, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + }, map[string]interface{}{ "brokers": []interface{}{"broker:9092"}, "sasl_mechanism": "plain", "sasl_username": " user-with-spaces ", @@ -77,8 +84,10 @@ func TestResolveConnectionConfig_PreservesOpaqueCredentials(t *testing.T) { func TestResolveConnectionConfig_RequiresTLSWhenTLSFilesAreConfigured(t *testing.T) { _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ - Brokers: []string{"broker:9092"}, - TLSCAFile: "/tmp/ca.crt", + Brokers: []string{"broker:9092"}, + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + TLSCAFile: "/tmp/ca.crt", }, nil) if err == nil { t.Fatalf("expected error when TLS files are set with TLS disabled") @@ -93,18 +102,22 @@ func TestResolveConnectionConfig_ValidatesReadableTLSFiles(t *testing.T) { } _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ - Brokers: []string{"broker:9092"}, - TLS: true, - TLSCAFile: caPath, + Brokers: []string{"broker:9092"}, + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + TLS: true, + TLSCAFile: caPath, }, nil) if err != nil { t.Fatalf("expected readable CA file to validate, got %v", err) } _, err = ResolveConnectionConfig(runtimeconfig.KafkaConfig{ - Brokers: []string{"broker:9092"}, - TLS: true, - TLSCAFile: filepath.Join(tempDir, "missing.crt"), + Brokers: []string{"broker:9092"}, + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + TLS: true, + TLSCAFile: filepath.Join(tempDir, "missing.crt"), }, nil) if err == nil { t.Fatalf("expected missing CA file to fail validation") @@ -112,7 +125,10 @@ func TestResolveConnectionConfig_ValidatesReadableTLSFiles(t *testing.T) { } func TestResolveConnectionConfig_RequiresSASLCredentials(t *testing.T) { - _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{}, map[string]interface{}{ + _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + }, map[string]interface{}{ "brokers": []interface{}{"broker:9092"}, "sasl_mechanism": "scram-sha-512", "sasl_username": "user", @@ -121,3 +137,55 @@ func TestResolveConnectionConfig_RequiresSASLCredentials(t *testing.T) { t.Fatalf("expected missing SASL password to fail validation") } } + +func TestIntOverride_AcceptsIntegerFloat64(t *testing.T) { + got, ok, err := intOverride(float64(3)) + if err != nil { + t.Fatalf("expected integer float64 override to succeed, got %v", err) + } + if !ok { + t.Fatalf("expected integer float64 override to be accepted") + } + if got != 3 { + t.Fatalf("expected integer float64 override to convert to 3, got %d", got) + } +} + +func TestIntOverride_RejectsInvalidFloat64(t *testing.T) { + tests := []struct { + name string + value float64 + wantErr string + }{ + { + name: "non integer", + value: 3.5, + wantErr: "non-integer", + }, + { + name: "out of bounds", + value: float64(math.MaxInt32) + 1, + wantErr: "within", + }, + { + name: "non finite", + value: math.NaN(), + wantErr: "non-finite", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, ok, err := intOverride(tt.value) + if err == nil { + t.Fatalf("expected float64 override %v to fail", tt.value) + } + if ok { + t.Fatalf("expected invalid float64 override %v to be rejected", tt.value) + } + if !strings.Contains(err.Error(), tt.wantErr) { + t.Fatalf("expected error %q to contain %q", err.Error(), tt.wantErr) + } + }) + } +} diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go index ffa77ffeb4..d8367526e4 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go @@ -123,9 +123,15 @@ func (e *KafkaBrokerDriver) EnsureTopics(ctx context.Context, topics []string) e // EnsureCompactedTopic creates a compacted topic if it does not already exist. func (e *KafkaBrokerDriver) EnsureCompactedTopic(ctx context.Context, topic string) error { - resp, err := e.admin.CreateTopics(ctx, 1, 1, map[string]*string{ - "cleanup.policy": kadm.StringPtr("compact"), - }, topic) + resp, err := e.admin.CreateTopics( + ctx, + int32(e.cfg.CompactTopicPartitions), + int16(e.cfg.CompactTopicReplicationFactor), + map[string]*string{ + "cleanup.policy": kadm.StringPtr("compact"), + }, + topic, + ) if err != nil { return fmt.Errorf("failed to create compacted topic %s: %w", topic, err) } diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go index 526a27c34c..bda53d5a0d 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "log/slog" + "sync" "time" "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/binding" @@ -55,6 +56,7 @@ type WebSubReceiver struct { syncProducer *subscription.SyncProducer brokerDriver connectors.BrokerDriver channel connectors.ChannelInfo + channelMu sync.RWMutex opts Options } @@ -98,17 +100,28 @@ func NewReceiver(cfg connectors.ReceiverConfig, opts Options) (connectors.Receiv verificationTimeout := time.Duration(opts.VerificationTimeoutSeconds) * time.Second + receiver := &WebSubReceiver{ + deliverer: deliverer, + topics: topics, + store: store, + consumerMgr: consumerMgr, + syncProducer: syncProducer, + brokerDriver: cfg.BrokerDriver, + channel: cfg.Channel, + opts: opts, + } + // Create HubHandler for subscribe/unsubscribe on {context}/{version}/hub. hubHandler := NewHubHandler( topics, store, verificationTimeout, opts.DefaultLeaseSeconds, cfg.Processor, cfg.BrokerDriver, cfg.Channel.Name, - cfg.Channel.Channels, consumerMgr, syncProducer, + cfg.Channel.Channels, &receiver.channelMu, consumerMgr, syncProducer, ) // Create WebhookReceiverHandler for ingress on {context}/{version}/webhook-receiver. webhookHandler := NewWebhookReceiverHandler( topics, cfg.Processor, cfg.BrokerDriver, - cfg.Channel.Name, cfg.Channel.Channels, + cfg.Channel.Name, cfg.Channel.Channels, &receiver.channelMu, ) // Register handlers on shared mux. @@ -116,18 +129,10 @@ func NewReceiver(cfg connectors.ReceiverConfig, opts Options) (connectors.Receiv cfg.Mux.Handle(basePath+"/hub", hubHandler) cfg.Mux.Handle(basePath+"/webhook-receiver", webhookHandler) - return &WebSubReceiver{ - hubHandler: hubHandler, - webhookHandler: webhookHandler, - deliverer: deliverer, - topics: topics, - store: store, - consumerMgr: consumerMgr, - syncProducer: syncProducer, - brokerDriver: cfg.BrokerDriver, - channel: cfg.Channel, - opts: opts, - }, nil + receiver.hubHandler = hubHandler + receiver.webhookHandler = webhookHandler + + return receiver, nil } // Start ensures Kafka topics exist and sets up the consumer manager context. diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go index f909444aa3..45e1598639 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go @@ -218,8 +218,8 @@ func (cm *ConsumerManager) createConsumer(groupID string, topics []string, callb } // consumerGroupID generates a unique, safe consumer group ID for a callback URL. -// Format: {prefix}-websub-{sha256(callbackURL)[:16]} +// Format: {prefix}-websub-{sha256(callbackURL)[:32]} func (cm *ConsumerManager) consumerGroupID(callbackURL string) string { h := sha256.Sum256([]byte(callbackURL)) - return cm.groupPrefix + "-websub-" + hex.EncodeToString(h[:])[:16] + return cm.groupPrefix + "-websub-" + hex.EncodeToString(h[:])[:32] } diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go new file mode 100644 index 0000000000..f687465c24 --- /dev/null +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package websub + +import ( + "context" + "fmt" + "log/slog" +) + +// ApplyBindingDelta mutates the live receiver for channel add/remove changes +// without recreating the receiver or the subscription sync topic. +func (e *WebSubReceiver) ApplyBindingDelta(ctx context.Context, removedChannels map[string]string, addedChannels map[string]string) error { + for channelName := range removedChannels { + subscriptions := e.store.GetByTopic(channelName) + for _, sub := range subscriptions { + if e.syncProducer != nil { + if err := e.syncProducer.PublishTombstone(ctx, channelName, sub.CallbackURL); err != nil { + return fmt.Errorf("failed to tombstone subscription for removed channel %q: %w", channelName, err) + } + } + } + } + + for channelName, kafkaTopic := range removedChannels { + e.topics.Deregister(channelName) + + subscriptions := e.store.GetByTopic(channelName) + for _, sub := range subscriptions { + if err := e.consumerMgr.RemoveSubscription(sub.CallbackURL, kafkaTopic); err != nil { + slog.Error("Failed to remove consumer for deleted WebSub channel", + "api", e.channel.Name, + "channel", channelName, + "callback", sub.CallbackURL, + "error", err) + } + if err := e.store.Remove(channelName, sub.CallbackURL); err != nil { + slog.Error("Failed to remove subscription for deleted WebSub channel", + "api", e.channel.Name, + "channel", channelName, + "callback", sub.CallbackURL, + "error", err) + } + } + + e.channelMu.Lock() + delete(e.channel.Channels, channelName) + e.channelMu.Unlock() + } + + for channelName, kafkaTopic := range addedChannels { + e.channelMu.Lock() + e.channel.Channels[channelName] = kafkaTopic + e.channelMu.Unlock() + e.topics.Register(channelName) + } + + return nil +} diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go index 02ac6a981e..b3c0e737e7 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go @@ -25,6 +25,7 @@ import ( "log/slog" "net/http" "strconv" + "sync" "time" "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors" @@ -41,6 +42,7 @@ type HubHandler struct { brokerDriver connectors.BrokerDriver bindingName string channels map[string]string // channel-name → Kafka topic + channelMu *sync.RWMutex consumerMgr *ConsumerManager syncProducer *subscription.SyncProducer defaultLease int @@ -56,6 +58,7 @@ func NewHubHandler( brokerDriver connectors.BrokerDriver, bindingName string, channels map[string]string, + channelMu *sync.RWMutex, consumerMgr *ConsumerManager, syncProducer *subscription.SyncProducer, ) *HubHandler { @@ -67,6 +70,7 @@ func NewHubHandler( brokerDriver: brokerDriver, bindingName: bindingName, channels: channels, + channelMu: channelMu, consumerMgr: consumerMgr, syncProducer: syncProducer, defaultLease: defaultLease, @@ -131,7 +135,9 @@ func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { } // Resolve the Kafka topic for this channel. + h.channelMu.RLock() kafkaTopic, ok := h.channels[topic] + h.channelMu.RUnlock() if !ok { http.Error(w, fmt.Sprintf("no kafka topic for channel: %s", topic), http.StatusNotFound) return @@ -239,7 +245,9 @@ func (h *HubHandler) handleUnsubscribe(w http.ResponseWriter, r *http.Request) { } // Stop/update per-callback consumer. + h.channelMu.RLock() kafkaTopic := h.channels[topic] + h.channelMu.RUnlock() if kafkaTopic != "" { if err := h.consumerMgr.RemoveSubscription(callback, kafkaTopic); err != nil { slog.Error("Failed to update consumer on unsubscribe", "callback", callback, "error", err) @@ -271,6 +279,7 @@ type WebhookReceiverHandler struct { brokerDriver connectors.BrokerDriver bindingName string channels map[string]string // channel-name → Kafka topic + channelMu *sync.RWMutex } // NewWebhookReceiverHandler creates a new webhook receiver handler. @@ -280,6 +289,7 @@ func NewWebhookReceiverHandler( brokerDriver connectors.BrokerDriver, bindingName string, channels map[string]string, + channelMu *sync.RWMutex, ) *WebhookReceiverHandler { return &WebhookReceiverHandler{ topics: topics, @@ -287,6 +297,7 @@ func NewWebhookReceiverHandler( brokerDriver: brokerDriver, bindingName: bindingName, channels: channels, + channelMu: channelMu, } } @@ -309,7 +320,9 @@ func (h *WebhookReceiverHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques return } + h.channelMu.RLock() kafkaTopic, ok := h.channels[channelName] + h.channelMu.RUnlock() if !ok { http.Error(w, fmt.Sprintf("no kafka topic for channel: %s", channelName), http.StatusNotFound) return diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime.go b/event-gateway/gateway-runtime/internal/runtime/runtime.go index 490d279e1e..a324620dc5 100644 --- a/event-gateway/gateway-runtime/internal/runtime/runtime.go +++ b/event-gateway/gateway-runtime/internal/runtime/runtime.go @@ -25,6 +25,7 @@ import ( "net/http" "os" "path" + "reflect" "sync" "time" @@ -78,6 +79,11 @@ type managedServer struct { keyFile string } +type webSubBindingUpdater interface { + connectors.Receiver + ApplyBindingDelta(ctx context.Context, removedChannels map[string]string, addedChannels map[string]string) error +} + // New creates a new Runtime. After creation: // 1. Call Engine() to register policies // 2. Call LoadChannels() to parse bindings and create per-channel receiver+broker-driver pairs @@ -674,6 +680,119 @@ func (r *Runtime) unregisterBindingChains(b *hub.ChannelBinding) { } } +func webSubChannelTopicMap(wsb binding.WebSubApiBinding) map[string]string { + channels := make(map[string]string, len(wsb.Channels)) + for _, ch := range wsb.Channels { + channels[ch.Name] = binding.WebSubApiTopicName(wsb.Name, wsb.Version, ch.Name) + } + return channels +} + +func webSubTopicList(channels map[string]string, subscriptionTopic string) []string { + topics := make([]string, 0, len(channels)+1) + for _, kafkaTopic := range channels { + topics = append(topics, kafkaTopic) + } + if subscriptionTopic != "" { + topics = append(topics, subscriptionTopic) + } + return topics +} + +func diffChannelTopics(oldChannels, newChannels map[string]string) (map[string]string, map[string]string) { + removed := make(map[string]string) + added := make(map[string]string) + + for channelName, kafkaTopic := range oldChannels { + if _, exists := newChannels[channelName]; !exists { + removed[channelName] = kafkaTopic + } + } + for channelName, kafkaTopic := range newChannels { + if _, exists := oldChannels[channelName]; !exists { + added[channelName] = kafkaTopic + } + } + + return removed, added +} + +func webSubActiveChainKeys(wsb binding.WebSubApiBinding, vhost string) map[string]bool { + active := make(map[string]bool) + basePath := binding.WebSubApiBasePath(wsb.Context, wsb.Version) + hubPath := basePath + "/hub" + + if len(wsb.Policies.Subscribe) > 0 { + active[binding.GenerateRouteKey("SUBSCRIBE", hubPath, vhost)] = true + } + if len(wsb.Policies.Inbound) > 0 { + active[binding.GenerateRouteKey("SUB", basePath+"/webhook-receiver", vhost)] = true + } + if len(wsb.Policies.Outbound) > 0 { + active[binding.GenerateRouteKey("DELIVER", hubPath, vhost)] = true + } + + for _, ch := range wsb.Channels { + chPath := hubPath + "/" + ch.Name + if len(ch.Policies.Subscribe) > 0 { + active[binding.GenerateRouteKey("SUBSCRIBE", chPath, vhost)] = true + } + if len(ch.Policies.Inbound) > 0 { + active[binding.GenerateRouteKey("SUB", chPath, vhost)] = true + } + if len(ch.Policies.Outbound) > 0 { + active[binding.GenerateRouteKey("DELIVER", chPath, vhost)] = true + } + } + + return active +} + +func (r *Runtime) unregisterStaleBindingChains(b *hub.ChannelBinding, activeKeys map[string]bool) { + if b == nil { + return + } + keys := []string{b.SubscribeChainKey, b.InboundChainKey, b.OutboundChainKey} + for _, key := range keys { + if key == "" { + continue + } + if !activeKeys[key] { + r.engine.UnregisterChain(key) + } + } + for _, chKeys := range b.ChannelChainKeys { + channelKeys := []string{chKeys.SubscribeChainKey, chKeys.InboundChainKey, chKeys.OutboundChainKey} + for _, key := range channelKeys { + if key == "" { + continue + } + if !activeKeys[key] { + r.engine.UnregisterChain(key) + } + } + } +} + +func canDeltaUpdateWebSubBinding(oldWSB, newWSB binding.WebSubApiBinding) bool { + if oldWSB.Name != newWSB.Name { + return false + } + if oldWSB.Context != newWSB.Context { + return false + } + if oldWSB.Version != newWSB.Version { + return false + } + if !reflect.DeepEqual(oldWSB.Receiver, newWSB.Receiver) { + return false + } + if !reflect.DeepEqual(oldWSB.BrokerDriver, newWSB.BrokerDriver) { + return false + } + return true +} + // AddWebSubApiBinding dynamically adds a WebSubApi binding at runtime (xDS mode). func (r *Runtime) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { r.mu.Lock() @@ -726,12 +845,7 @@ func (r *Runtime) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { r.bindingPaths[wsb.Name] = []string{basePath + "/hub", basePath + "/webhook-receiver"} // Track all Kafka topics for cleanup on removal. - allTopics := make([]string, 0, len(channels)+1) - for _, kafkaTopic := range channels { - allTopics = append(allTopics, kafkaTopic) - } - allTopics = append(allTopics, internalSubTopic) - r.bindingTopics[wsb.Name] = allTopics + r.bindingTopics[wsb.Name] = webSubTopicList(channels, internalSubTopic) ch := connectors.ChannelInfo{ Name: wsb.Name, @@ -780,6 +894,120 @@ func (r *Runtime) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { return nil } +// UpdateWebSubApiBinding dynamically updates an existing WebSubApi binding at runtime (xDS mode). +func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding) error { + if !canDeltaUpdateWebSubBinding(oldWSB, newWSB) { + if err := r.RemoveWebSubApiBinding(oldWSB.Name); err != nil { + return err + } + return r.AddWebSubApiBinding(newWSB) + } + + r.mu.Lock() + + receiver, ok := r.activeReceivers[oldWSB.Name] + if !ok { + r.mu.Unlock() + return fmt.Errorf("active receiver not found for WebSubApi %q", oldWSB.Name) + } + updater, ok := receiver.(webSubBindingUpdater) + if !ok { + r.mu.Unlock() + return fmt.Errorf("receiver for WebSubApi %q does not support delta updates", oldWSB.Name) + } + + brokerDriver, ok := r.activeBrokerDrivers[oldWSB.Name] + if !ok { + r.mu.Unlock() + return fmt.Errorf("active broker-driver not found for WebSubApi %q", oldWSB.Name) + } + + oldChannels := webSubChannelTopicMap(oldWSB) + newChannels := webSubChannelTopicMap(newWSB) + removedChannels, addedChannels := diffChannelTopics(oldChannels, newChannels) + oldBinding := r.hub.GetBinding(oldWSB.Name) + r.mu.Unlock() + + vhost := defaultVhost(newWSB.Vhost) + subKey, inKey, outKey, chChainKeys, err := r.buildWebSubApiPolicyChains(newWSB, vhost) + if err != nil { + return fmt.Errorf("failed to build chains for updated WebSubApi %q: %w", newWSB.Name, err) + } + + if len(addedChannels) > 0 { + topicsToEnsure := make([]string, 0, len(addedChannels)) + for _, kafkaTopic := range addedChannels { + topicsToEnsure = append(topicsToEnsure, kafkaTopic) + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := brokerDriver.EnsureTopics(ctx, topicsToEnsure); err != nil { + cancel() + return fmt.Errorf("failed to ensure broker-driver topics during update for WebSubApi %q: %w", newWSB.Name, err) + } + cancel() + } + + if err := updater.ApplyBindingDelta(context.Background(), removedChannels, addedChannels); err != nil { + return fmt.Errorf("failed to apply WebSub delta update for %q: %w", newWSB.Name, err) + } + + if len(removedChannels) > 0 { + topicsToDelete := make([]string, 0, len(removedChannels)) + for _, kafkaTopic := range removedChannels { + topicsToDelete = append(topicsToDelete, kafkaTopic) + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := brokerDriver.DeleteTopics(ctx, topicsToDelete); err != nil { + slog.Error("Failed to delete broker-driver topics during delta update", + "name", newWSB.Name, + "error", err) + } + cancel() + } + + r.mu.Lock() + defer r.mu.Unlock() + + currentReceiver, ok := r.activeReceivers[oldWSB.Name] + if !ok || currentReceiver != receiver { + return fmt.Errorf("active receiver changed during WebSubApi update for %q", newWSB.Name) + } + currentBrokerDriver, ok := r.activeBrokerDrivers[oldWSB.Name] + if !ok || currentBrokerDriver != brokerDriver { + return fmt.Errorf("active broker-driver changed during WebSubApi update for %q", newWSB.Name) + } + if r.hub.GetBinding(oldWSB.Name) != oldBinding { + return fmt.Errorf("hub binding changed during WebSubApi update for %q", newWSB.Name) + } + + r.unregisterStaleBindingChains(oldBinding, webSubActiveChainKeys(newWSB, vhost)) + r.hub.RegisterBinding(hub.ChannelBinding{ + APIID: newWSB.APIID, + Name: newWSB.Name, + Mode: "websub", + Context: newWSB.Context, + Version: newWSB.Version, + Vhost: vhost, + SubscribeChainKey: subKey, + InboundChainKey: inKey, + OutboundChainKey: outKey, + Channels: newChannels, + ChannelChainKeys: chChainKeys, + }) + + internalSubTopic := r.webSubSubscriptionSyncTopic(newWSB.Name, newWSB.Version) + r.bindingTopics[newWSB.Name] = webSubTopicList(newChannels, internalSubTopic) + + slog.Info("Dynamically updated WebSubApi binding", + "name", newWSB.Name, + "added_channels", len(addedChannels), + "removed_channels", len(removedChannels), + "channels", len(newWSB.Channels), + ) + + return nil +} + // RemoveWebSubApiBinding dynamically removes a WebSubApi binding at runtime (xDS mode). func (r *Runtime) RemoveWebSubApiBinding(name string) error { r.mu.Lock() diff --git a/event-gateway/gateway-runtime/internal/xdsclient/handler.go b/event-gateway/gateway-runtime/internal/xdsclient/handler.go index 81e80743f0..eca82ff956 100644 --- a/event-gateway/gateway-runtime/internal/xdsclient/handler.go +++ b/event-gateway/gateway-runtime/internal/xdsclient/handler.go @@ -38,6 +38,7 @@ import ( type BindingManager interface { AddWebSubApiBinding(wsb binding.WebSubApiBinding) error RemoveWebSubApiBinding(name string) error + UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding) error } // KafkaConfig holds local Kafka broker settings used as defaults. @@ -187,15 +188,14 @@ func (h *Handler) HandleResources(ctx context.Context, resources []*discoveryv3. if reflect.DeepEqual(old, ecr) { continue } - // Update: remove then re-add slog.Info("Updating binding via xDS", "name", ecr.Name, "uuid", uuid) - if err := h.manager.RemoveWebSubApiBinding(old.Name); err != nil { - slog.Error("Failed to remove binding for update", "name", old.Name, "error", err) + if err := h.manager.UpdateWebSubApiBinding(h.toWebSubApiBinding(old), h.toWebSubApiBinding(ecr)); err != nil { + return fmt.Errorf("failed to update binding %q: %w", ecr.Name, err) } - } else { - slog.Info("Adding binding via xDS", "name", ecr.Name, "uuid", uuid) + continue } + slog.Info("Adding binding via xDS", "name", ecr.Name, "uuid", uuid) wsb := h.toWebSubApiBinding(ecr) if err := h.manager.AddWebSubApiBinding(wsb); err != nil { return fmt.Errorf("failed to add binding %q: %w", ecr.Name, err) diff --git a/event-gateway/gateway-runtime/internal/xdsclient/handler_test.go b/event-gateway/gateway-runtime/internal/xdsclient/handler_test.go index c497b174ce..f9cc1deac6 100644 --- a/event-gateway/gateway-runtime/internal/xdsclient/handler_test.go +++ b/event-gateway/gateway-runtime/internal/xdsclient/handler_test.go @@ -141,11 +141,17 @@ func TestHandlerHandleResources_UpdatesOnlyChangedBinding(t *testing.T) { t.Fatalf("updated HandleResources returned error: %v", err) } - if got := manager.removedNames(); len(got) != 1 || got[0] != "githubser" { - t.Fatalf("expected only githubser to be removed on change, got %#v", got) + if got := len(manager.updated); got != 1 { + t.Fatalf("expected exactly one update call, got %d", got) } - if got := manager.addedCount("githubser"); got != 2 { - t.Fatalf("expected githubser to be added twice, got %d", got) + if manager.updated[0].old.Name != "githubser" || manager.updated[0].new.Name != "githubser" { + t.Fatalf("unexpected update binding names: %#v", manager.updated[0]) + } + if got := manager.removedNames(); len(got) != 0 { + t.Fatalf("expected no removals on delta update, got %#v", got) + } + if got := manager.addedCount("githubser"); got != 1 { + t.Fatalf("expected githubser to be added once, got %d", got) } if got := manager.addedCount("gitlabser"); got != 1 { t.Fatalf("expected gitlabser to be added once, got %d", got) @@ -155,6 +161,12 @@ func TestHandlerHandleResources_UpdatesOnlyChangedBinding(t *testing.T) { type recordingBindingManager struct { added []string removed []string + updated []updatedBindingCall +} + +type updatedBindingCall struct { + old binding.WebSubApiBinding + new binding.WebSubApiBinding } func (m *recordingBindingManager) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { @@ -167,6 +179,11 @@ func (m *recordingBindingManager) RemoveWebSubApiBinding(name string) error { return nil } +func (m *recordingBindingManager) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding) error { + m.updated = append(m.updated, updatedBindingCall{old: oldWSB, new: newWSB}) + return nil +} + func (m *recordingBindingManager) addedNames() []string { out := append([]string(nil), m.added...) return out diff --git a/gateway/gateway-controller/pkg/api/handlers/handlers.go b/gateway/gateway-controller/pkg/api/handlers/handlers.go index cf71a5a057..e5dcb77113 100644 --- a/gateway/gateway-controller/pkg/api/handlers/handlers.go +++ b/gateway/gateway-controller/pkg/api/handlers/handlers.go @@ -46,6 +46,7 @@ import ( "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/secrets" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/restapi" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/websubapi" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/xds" @@ -56,6 +57,7 @@ type APIServer struct { *RestAPIHandler // embedded — promotes CreateRestAPI, ListRestAPIs, GetRestAPIById, UpdateRestAPI, DeleteRestAPI restAPIService *restapi.RestAPIService + webSubAPIService *websubapi.WebSubAPIService store *storage.ConfigStore db storage.Storage snapshotManager *xds.SnapshotManager @@ -151,6 +153,17 @@ func NewAPIServer( subscriptionResourceService: subscriptionResourceService, } server.restAPIService = restAPIService + server.webSubAPIService = websubapi.NewWebSubAPIService( + db, + deploymentService, + controlPlaneClient, + systemConfig, + parser, + validator, + logger, + eventHub, + secretService, + ) server.RestAPIHandler = NewRestAPIHandler(restAPIService, logger) // Register status update callback diff --git a/gateway/gateway-controller/pkg/api/handlers/handlers_test.go b/gateway/gateway-controller/pkg/api/handlers/handlers_test.go index 7e24480bbe..d588f0a243 100644 --- a/gateway/gateway-controller/pkg/api/handlers/handlers_test.go +++ b/gateway/gateway-controller/pkg/api/handlers/handlers_test.go @@ -49,6 +49,7 @@ import ( policybuilder "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policy" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/restapi" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/websubapi" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" ) @@ -1019,6 +1020,17 @@ func createTestAPIServerWithDB(db storage.Storage) *APIServer { httpClient, parser, validator, logger, hub, nil, ) server.restAPIService = restAPIService + server.webSubAPIService = websubapi.NewWebSubAPIService( + db, + deploymentService, + nil, + systemCfg, + parser, + validator, + logger, + hub, + nil, + ) server.RestAPIHandler = NewRestAPIHandler(restAPIService, logger) return server @@ -1238,6 +1250,17 @@ func attachTestEventHub(server *APIServer, hub eventhub.EventHub, gatewayID stri server.restAPIService = restAPIService server.RestAPIHandler = NewRestAPIHandler(restAPIService, server.logger) } + server.webSubAPIService = websubapi.NewWebSubAPIService( + server.db, + server.deploymentService, + nil, + server.systemConfig, + server.parser, + server.validator, + server.logger, + hub, + nil, + ) } func seedAPIForAPIKeyHandlerTests(t *testing.T, server *APIServer, handle string) *models.StoredConfig { @@ -2999,6 +3022,84 @@ func TestGetLLMProxyByIdWithDeployedAt(t *testing.T) { assert.Equal(t, http.StatusOK, w.Code) } +func TestUpdateWebSubAPIUndeployUsesDedicatedServicePath(t *testing.T) { + server := createTestAPIServer() + mockDB := server.db.(*MockStorage) + mockHub := &mockEventHub{} + attachTestEventHub(server, mockHub, "test-gateway") + + cfg := &models.StoredConfig{ + UUID: "0000-websub-id-0000-000000000000", + Kind: string(api.WebSubAPIKindWebSubApi), + Handle: "repo-watcher-v1-0", + DisplayName: "repo-watcher", + Version: "v1.0", + Configuration: api.WebSubAPI{ + ApiVersion: api.WebSubAPIApiVersionGatewayApiPlatformWso2Comv1alpha1, + Kind: api.WebSubAPIKindWebSubApi, + Metadata: api.Metadata{ + Name: "repo-watcher-v1-0", + }, + Spec: api.WebhookAPIData{ + DisplayName: "repo-watcher", + Version: "v1.0", + Context: "/repos", + Hub: api.WebSubHub{ + Channels: []api.HubChannel{{Name: "issues"}}, + }, + }, + }, + SourceConfiguration: api.WebSubAPI{ + ApiVersion: api.WebSubAPIApiVersionGatewayApiPlatformWso2Comv1alpha1, + Kind: api.WebSubAPIKindWebSubApi, + Metadata: api.Metadata{ + Name: "repo-watcher-v1-0", + }, + Spec: api.WebhookAPIData{ + DisplayName: "repo-watcher", + Version: "v1.0", + Context: "/repos", + Hub: api.WebSubHub{ + Channels: []api.HubChannel{{Name: "issues"}}, + }, + }, + }, + DesiredState: models.StateDeployed, + Origin: models.OriginGatewayAPI, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + require.NoError(t, mockDB.SaveConfig(cfg)) + + body := []byte(`{ + "apiVersion":"gateway.api-platform.wso2.com/v1alpha1", + "kind":"WebSubApi", + "metadata":{"name":"repo-watcher-v1-0"}, + "spec":{ + "displayName":"repo-watcher", + "version":"v1.0", + "context":"/repos", + "deploymentState":"undeployed", + "hub":{"channels":[{"name":"issues"}]} + } + }`) + + c, w := createTestContext("PUT", "/websub-apis/repo-watcher-v1-0", body) + c.Request.Header.Set("Content-Type", "application/json") + + server.UpdateWebSubAPI(c, "repo-watcher-v1-0") + + assert.Equal(t, http.StatusOK, w.Code) + + updatedCfg, err := mockDB.GetConfig("0000-websub-id-0000-000000000000") + require.NoError(t, err) + assert.Equal(t, models.StateUndeployed, updatedCfg.DesiredState) + require.Len(t, mockHub.publishedEvents, 1) + assert.Equal(t, "UPDATE", mockHub.publishedEvents[0].event.Action) + assert.Equal(t, eventhub.EventTypeAPI, mockHub.publishedEvents[0].event.EventType) + assert.Equal(t, "0000-websub-id-0000-000000000000", mockHub.publishedEvents[0].event.EntityID) +} + // TestHandleStatusUpdateStoreError tests handleStatusUpdate with store error func TestHandleStatusUpdateStoreError(t *testing.T) { server := createTestAPIServer() diff --git a/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go b/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go index 66a54e2aa8..15b46a4144 100644 --- a/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go +++ b/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go @@ -20,6 +20,7 @@ package handlers import ( "context" + "errors" "fmt" "io" "log/slog" @@ -32,10 +33,13 @@ import ( api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/management" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/middleware" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/websubapi" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" ) +//TODO: Refactor to move business logic to service layer and keep handlers thin, focusing on request parsing, response formatting, and error handling. + // CreateWebSubAPI implements ServerInterface.CreateWebSubAPI // (POST /websub-apis) func (s *APIServer) CreateWebSubAPI(c *gin.Context) { @@ -182,31 +186,24 @@ func (s *APIServer) UpdateWebSubAPI(c *gin.Context, id string) { return } - existing, err := s.db.GetConfigByKindAndHandle(models.KindWebSubApi, handle) - if err != nil { - log.Warn("WebSub API configuration not found", - slog.String("handle", handle)) - c.JSON(http.StatusNotFound, api.ErrorResponse{ - Status: "error", - Message: fmt.Sprintf("WebSub API configuration with handle '%s' not found", handle), - }) - return - } - correlationID := middleware.GetCorrelationID(c) - result, err := s.deploymentService.DeployAPIConfiguration(utils.APIDeploymentParams{ - Data: body, + result, err := s.webSubAPIService.Update(websubapi.UpdateParams{ + Handle: handle, + Body: body, ContentType: c.GetHeader("Content-Type"), - Kind: "WebSubApi", - APIID: existing.UUID, - Origin: models.OriginGatewayAPI, CorrelationID: correlationID, Logger: log, }) if err != nil { log.Error("Failed to update WebSub API configuration", slog.Any("error", err)) - if storage.IsConflictError(err) { + if errors.Is(err, websubapi.ErrNotFound) { + c.JSON(http.StatusNotFound, api.ErrorResponse{ + Status: "error", + Message: fmt.Sprintf("WebSub API configuration with handle '%s' not found", handle), + }) + return + } else if storage.IsConflictError(err) { c.JSON(http.StatusConflict, api.ErrorResponse{ Status: "error", Message: err.Error(), @@ -223,7 +220,7 @@ func (s *APIServer) UpdateWebSubAPI(c *gin.Context, id string) { return } - updated := result.StoredConfig + updated := result.Config log.Info("WebSub API configuration updated", slog.String("id", updated.UUID), diff --git a/gateway/gateway-controller/pkg/service/websubapi/errors.go b/gateway/gateway-controller/pkg/service/websubapi/errors.go new file mode 100644 index 0000000000..3a44cba9d0 --- /dev/null +++ b/gateway/gateway-controller/pkg/service/websubapi/errors.go @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package websubapi + +import ( + "errors" + "fmt" + + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" +) + +var ( + // ErrNotFound is returned when a WebSub API is not found. + ErrNotFound = errors.New("websub api not found") +) + +// ValidationError wraps configuration validation errors. +type ValidationError struct { + Errors []config.ValidationError +} + +func (e *ValidationError) Error() string { + return fmt.Sprintf("configuration validation failed (%d errors)", len(e.Errors)) +} + +// ParseError wraps a configuration parse failure. +type ParseError struct { + Cause error +} + +func (e *ParseError) Error() string { + return fmt.Sprintf("failed to parse configuration: %v", e.Cause) +} + +func (e *ParseError) Unwrap() error { + return e.Cause +} + +// HandleMismatchError is returned when the path handle doesn't match metadata.name. +type HandleMismatchError struct { + PathHandle string + YAMLHandle string +} + +func (e *HandleMismatchError) Error() string { + return fmt.Sprintf("handle mismatch: path has '%s' but YAML metadata.name has '%s'", e.PathHandle, e.YAMLHandle) +} diff --git a/gateway/gateway-controller/pkg/service/websubapi/service.go b/gateway/gateway-controller/pkg/service/websubapi/service.go new file mode 100644 index 0000000000..4a9208aca5 --- /dev/null +++ b/gateway/gateway-controller/pkg/service/websubapi/service.go @@ -0,0 +1,265 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package websubapi + +import ( + "fmt" + "log/slog" + "strings" + "time" + + "github.com/wso2/api-platform/common/eventhub" + api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/management" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/controlplane" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/templateengine" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/templateengine/funcs" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" +) + +// UpdateParams holds parameters for the Update operation. +type UpdateParams struct { + Handle string + Body []byte + ContentType string + CorrelationID string + Logger *slog.Logger +} + +// UpdateResult holds the result of an Update operation. +type UpdateResult struct { + Config *models.StoredConfig +} + +// WebSubAPIService encapsulates WebSub API update behavior. +type WebSubAPIService struct { + db storage.Storage + deploymentService *utils.APIDeploymentService + controlPlaneClient controlplane.ControlPlaneClient + systemConfig *config.Config + parser *config.Parser + validator config.Validator + logger *slog.Logger + eventHub eventhub.EventHub + secretResolver funcs.SecretResolver +} + +// NewWebSubAPIService creates a new WebSubAPIService. +func NewWebSubAPIService( + db storage.Storage, + deploymentService *utils.APIDeploymentService, + controlPlaneClient controlplane.ControlPlaneClient, + systemConfig *config.Config, + parser *config.Parser, + validator config.Validator, + logger *slog.Logger, + eventHub eventhub.EventHub, + secretResolver funcs.SecretResolver, +) *WebSubAPIService { + if db == nil { + panic("WebSubAPIService requires non-nil storage") + } + if deploymentService == nil { + panic("WebSubAPIService requires APIDeploymentService") + } + if eventHub == nil { + panic("WebSubAPIService requires non-nil EventHub") + } + if systemConfig == nil { + panic("WebSubAPIService requires non-nil system config") + } + if strings.TrimSpace(systemConfig.Controller.Server.GatewayID) == "" { + panic("WebSubAPIService requires non-empty gateway ID") + } + + return &WebSubAPIService{ + db: db, + deploymentService: deploymentService, + controlPlaneClient: controlPlaneClient, + systemConfig: systemConfig, + parser: parser, + validator: validator, + logger: logger, + eventHub: eventHub, + secretResolver: secretResolver, + } +} + +// Update modifies an existing WebSub API configuration. +func (s *WebSubAPIService) Update(params UpdateParams) (*UpdateResult, error) { + log := params.Logger + + var apiConfig api.WebSubAPI + if err := s.parser.Parse(params.Body, params.ContentType, &apiConfig); err != nil { + return nil, &ParseError{Cause: err} + } + + if apiConfig.Metadata.Name != "" && apiConfig.Metadata.Name != params.Handle { + return nil, &HandleMismatchError{ + PathHandle: params.Handle, + YAMLHandle: apiConfig.Metadata.Name, + } + } + + existing, err := s.db.GetConfigByKindAndHandle(models.KindWebSubApi, params.Handle) + if err != nil { + if storage.IsNotFoundError(err) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("failed to retrieve existing websub api: %w", err) + } + + desiredState := models.StateDeployed + if apiConfig.Spec.DeploymentState != nil && + *apiConfig.Spec.DeploymentState == api.WebhookAPIDataDeploymentStateUndeployed { + desiredState = models.StateUndeployed + } + + if desiredState != models.StateUndeployed { + result, err := s.deploymentService.DeployAPIConfiguration(utils.APIDeploymentParams{ + Data: params.Body, + ContentType: params.ContentType, + Kind: "WebSubApi", + APIID: existing.UUID, + Origin: models.OriginGatewayAPI, + CorrelationID: params.CorrelationID, + Logger: log, + }) + if err != nil { + return nil, err + } + return &UpdateResult{Config: result.StoredConfig}, nil + } + + if apiConfig.Metadata.Name == "" { + apiConfig.Metadata.Name = params.Handle + } + + existing.Configuration = apiConfig + existing.SourceConfiguration = apiConfig + + renderedExisting := *existing + if err := templateengine.RenderSpec(&renderedExisting, s.secretResolver, log); err != nil { + return nil, err + } + + renderedConfig, ok := renderedExisting.Configuration.(api.WebSubAPI) + if !ok { + return nil, fmt.Errorf("failed to render websub api configuration") + } + + validationErrors := s.validator.Validate(&renderedConfig) + if len(validationErrors) > 0 { + return nil, &ValidationError{Errors: validationErrors} + } + + if err := s.validateArtifactConflicts(existing.UUID, renderedConfig.Spec.DisplayName, renderedConfig.Spec.Version, existing.Handle); err != nil { + return nil, err + } + + now := time.Now() + existing.DisplayName = renderedConfig.Spec.DisplayName + existing.Version = renderedConfig.Spec.Version + existing.DesiredState = desiredState + existing.UpdatedAt = now + + truncatedNow := now.Truncate(time.Millisecond) + existing.DeployedAt = &truncatedNow + + if existing.Origin == models.OriginGatewayAPI { + existing.CPSyncStatus = models.CPSyncStatusPending + } + + if err := s.db.UpdateConfig(existing); err != nil { + log.Error("Failed to update WebSub API config in database", slog.Any("error", err)) + return nil, fmt.Errorf("failed to persist configuration update: %w", err) + } + + s.publishEvent(eventhub.EventTypeAPI, "UPDATE", existing.UUID, params.CorrelationID, log) + + if existing.Origin == models.OriginGatewayAPI && s.controlPlaneClient != nil && s.controlPlaneClient.IsConnected() && s.controlPlaneClient.IsOnPrem() { + go func() { + if err := s.controlPlaneClient.SyncArtifactsToOnPremAPIM(s.controlPlaneClient.GetAPIMConfig()); err != nil { + log.Error("Failed to sync WebSub API to on-prem APIM", slog.Any("error", err)) + } + }() + } + + log.Info("WebSub API configuration updated", + slog.String("id", existing.UUID), + slog.String("handle", params.Handle), + slog.String("desired_state", string(desiredState))) + + return &UpdateResult{Config: existing}, nil +} + +func (s *WebSubAPIService) validateArtifactConflicts(currentID, displayName, version, handle string) error { + existingByNameVersion, err := s.db.GetConfigByKindNameAndVersion(models.KindWebSubApi, displayName, version) + if err == nil { + if existingByNameVersion != nil && existingByNameVersion.UUID != currentID { + return fmt.Errorf("%w: configuration with name '%s' and version '%s' already exists", + storage.ErrConflict, displayName, version) + } + } else if !storage.IsNotFoundError(err) { + return fmt.Errorf("failed to check existing WebSubApi name/version conflict: %w", err) + } + + existingByHandle, err := s.db.GetConfigByKindAndHandle(models.KindWebSubApi, handle) + if err == nil { + if existingByHandle != nil && existingByHandle.UUID != currentID { + return fmt.Errorf("%w: configuration with handle '%s' already exists", + storage.ErrConflict, handle) + } + } else if !storage.IsNotFoundError(err) { + return fmt.Errorf("failed to check existing WebSubApi handle conflict: %w", err) + } + + return nil +} + +func (s *WebSubAPIService) publishEvent(eventType eventhub.EventType, action, entityID, correlationID string, logger *slog.Logger) { + gatewayID := strings.TrimSpace(s.systemConfig.Controller.Server.GatewayID) + event := eventhub.Event{ + GatewayID: gatewayID, + OriginatedTimestamp: time.Now(), + EventType: eventType, + Action: action, + EntityID: entityID, + EventID: correlationID, + EventData: eventhub.EmptyEventData, + } + + if err := s.eventHub.PublishEvent(gatewayID, event); err != nil { + logger.Warn("Failed to publish event to event hub", + slog.String("gateway_id", gatewayID), + slog.String("event_type", string(eventType)), + slog.String("action", action), + slog.String("entity_id", entityID), + slog.Any("error", err)) + return + } + + logger.Debug("Published event to event hub", + slog.String("gateway_id", gatewayID), + slog.String("event_type", string(eventType)), + slog.String("action", action), + slog.String("entity_id", entityID)) +}