diff --git a/event-gateway/gateway-runtime/go.mod b/event-gateway/gateway-runtime/go.mod index fe2f29b4d..766a19f06 100644 --- a/event-gateway/gateway-runtime/go.mod +++ b/event-gateway/gateway-runtime/go.mod @@ -10,6 +10,7 @@ require ( github.com/knadh/koanf/providers/env v1.1.0 github.com/knadh/koanf/providers/file v1.2.1 github.com/knadh/koanf/v2 v2.3.0 + github.com/stretchr/testify v1.11.1 github.com/twmb/franz-go v1.18.1 github.com/twmb/franz-go/pkg/kadm v1.14.0 github.com/wso2/api-platform/common v0.0.0 @@ -26,6 +27,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/envoyproxy/protoc-gen-validate v1.3.0 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect @@ -39,6 +41,7 @@ require ( github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go index f687465c2..e203534bd 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go @@ -20,6 +20,7 @@ package websub import ( "context" + "errors" "fmt" "log/slog" ) @@ -27,12 +28,20 @@ import ( // ApplyBindingDelta mutates the live receiver for channel add/remove changes // without recreating the receiver or the subscription sync topic. func (e *WebSubReceiver) ApplyBindingDelta(ctx context.Context, removedChannels map[string]string, addedChannels map[string]string) error { + var tombstoneErrs []error + for channelName := range removedChannels { subscriptions := e.store.GetByTopic(channelName) for _, sub := range subscriptions { if e.syncProducer != nil { if err := e.syncProducer.PublishTombstone(ctx, channelName, sub.CallbackURL); err != nil { - return fmt.Errorf("failed to tombstone subscription for removed channel %q: %w", channelName, err) + slog.Error("Failed to tombstone subscription for removed WebSub channel", + "api", e.channel.Name, + "channel", channelName, + "callback", sub.CallbackURL, + "error", err) + tombstoneErrs = append(tombstoneErrs, + fmt.Errorf("failed to tombstone subscription for removed channel %q callback %q: %w", channelName, sub.CallbackURL, err)) } } } @@ -71,5 +80,9 @@ func (e *WebSubReceiver) ApplyBindingDelta(ctx context.Context, removedChannels e.topics.Register(channelName) } + if len(tombstoneErrs) > 0 { + return errors.Join(tombstoneErrs...) + } + return nil } diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta_test.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta_test.go new file mode 100644 index 000000000..0a62b6f40 --- /dev/null +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta_test.go @@ -0,0 +1,162 @@ +package websub + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors" + "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/subscription" +) + +type deltaTestBrokerDriver struct { + publishErrs map[string]error + published []string +} + +func (d *deltaTestBrokerDriver) Publish(_ context.Context, topic string, msg *connectors.Message) error { + key := string(msg.Key) + d.published = append(d.published, key) + if err, ok := d.publishErrs[key]; ok { + return err + } + return nil +} + +func (d *deltaTestBrokerDriver) Subscribe(string, []string, connectors.MessageHandler) (connectors.Receiver, error) { + return nil, nil +} + +func (d *deltaTestBrokerDriver) SubscribeManual(string, []string, connectors.MessageHandler) (connectors.Receiver, error) { + return nil, nil +} + +func (d *deltaTestBrokerDriver) Replay(context.Context, string, connectors.MessageHandler) error { + return nil +} + +func (d *deltaTestBrokerDriver) TopicExists(context.Context, string) (bool, error) { + return false, nil +} + +func (d *deltaTestBrokerDriver) EnsureTopics(context.Context, []string) error { + return nil +} + +func (d *deltaTestBrokerDriver) EnsureCompactedTopic(context.Context, string) error { + return nil +} + +func (d *deltaTestBrokerDriver) DeleteTopics(context.Context, []string) error { + return nil +} + +func (d *deltaTestBrokerDriver) Close() error { + return nil +} + +type deltaTestReceiver struct { + stopCalls int +} + +func (r *deltaTestReceiver) Start(context.Context) error { + return nil +} + +func (r *deltaTestReceiver) Stop(context.Context) error { + r.stopCalls++ + return nil +} + +func TestApplyBindingDeltaContinuesAfterTombstoneFailure(t *testing.T) { + store := subscription.NewInMemoryStore("runtime-1") + require.NoError(t, store.Add(&subscription.Subscription{ + ID: "sub-1", + Topic: "removed-a", + CallbackURL: "https://callback-a.example", + State: subscription.StateActive, + })) + require.NoError(t, store.Add(&subscription.Subscription{ + ID: "sub-2", + Topic: "removed-b", + CallbackURL: "https://callback-b.example", + State: subscription.StateActive, + })) + + topics := NewTopicRegistry() + topics.Register("removed-a") + topics.Register("removed-b") + + consumerA := &deltaTestReceiver{} + consumerB := &deltaTestReceiver{} + consumerMgr := &ConsumerManager{ + consumers: map[string]*managedConsumer{ + "https://callback-a.example": { + consumer: consumerA, + topics: map[string]bool{"kafka-a": true}, + }, + "https://callback-b.example": { + consumer: consumerB, + topics: map[string]bool{"kafka-b": true}, + }, + }, + } + + driver := &deltaTestBrokerDriver{ + publishErrs: map[string]error{ + "removed-a:https://callback-a.example": errors.New("boom"), + }, + } + + receiver := &WebSubReceiver{ + topics: topics, + store: store, + consumerMgr: consumerMgr, + syncProducer: subscription.NewSyncProducer( + driver, + "runtime-1", + "internal-sub-topic", + ), + channel: connectors.ChannelInfo{ + Name: "test-api", + Channels: map[string]string{ + "removed-a": "kafka-a", + "removed-b": "kafka-b", + }, + }, + } + + err := receiver.ApplyBindingDelta(context.Background(), + map[string]string{ + "removed-a": "kafka-a", + "removed-b": "kafka-b", + }, + map[string]string{ + "added-c": "kafka-c", + }, + ) + + require.Error(t, err) + require.ErrorContains(t, err, `failed to tombstone subscription for removed channel "removed-a"`) + + require.ElementsMatch(t, []string{ + "removed-a:https://callback-a.example", + "removed-b:https://callback-b.example", + }, driver.published) + + require.Nil(t, store.GetByTopic("removed-a")) + require.Nil(t, store.GetByTopic("removed-b")) + + require.Equal(t, 1, consumerA.stopCalls) + require.Equal(t, 1, consumerB.stopCalls) + require.Empty(t, consumerMgr.consumers) + + require.NotContains(t, receiver.channel.Channels, "removed-a") + require.NotContains(t, receiver.channel.Channels, "removed-b") + require.Equal(t, "kafka-c", receiver.channel.Channels["added-c"]) + + require.False(t, topics.IsRegistered("removed-a")) + require.False(t, topics.IsRegistered("removed-b")) + require.True(t, topics.IsRegistered("added-c")) +} diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime.go b/event-gateway/gateway-runtime/internal/runtime/runtime.go index 608cb5370..00881b181 100644 --- a/event-gateway/gateway-runtime/internal/runtime/runtime.go +++ b/event-gateway/gateway-runtime/internal/runtime/runtime.go @@ -20,12 +20,14 @@ package runtime import ( "context" + "encoding/json" "fmt" "log/slog" "net/http" "os" "path" "reflect" + "sort" "sync" "time" @@ -64,6 +66,8 @@ type Runtime struct { activeBrokerDrivers map[string]connectors.BrokerDriver bindingPaths map[string][]string // name → registered mux paths bindingTopics map[string][]string // name → Kafka topics (data + internal sub) + bindingMutationMu sync.Mutex + bindingMutationLocks map[string]*sync.Mutex websubMux *DynamicMux websubServer *managedServer webSubServersCreated bool // true if LoadChannels created WebSub servers @@ -111,6 +115,7 @@ func New(cfg *config.Config, rawConfig map[string]interface{}, registry *connect activeBrokerDrivers: make(map[string]connectors.BrokerDriver), bindingPaths: make(map[string][]string), bindingTopics: make(map[string][]string), + bindingMutationLocks: make(map[string]*sync.Mutex), websubMux: NewDynamicMux(), }, nil } @@ -742,6 +747,9 @@ func webSubActiveChainKeys(wsb binding.WebSubApiBinding, vhost string) map[strin if len(wsb.Policies.Subscribe) > 0 { active[binding.GenerateRouteKey("SUBSCRIBE", hubPath, vhost)] = true } + if len(wsb.Policies.Unsubscribe) > 0 { + active[binding.GenerateRouteKey("UNSUBSCRIBE", hubPath, vhost)] = true + } if len(wsb.Policies.Inbound) > 0 { active[binding.GenerateRouteKey("SUB", basePath+"/webhook-receiver", vhost)] = true } @@ -754,6 +762,9 @@ func webSubActiveChainKeys(wsb binding.WebSubApiBinding, vhost string) map[strin if len(ch.Policies.Subscribe) > 0 { active[binding.GenerateRouteKey("SUBSCRIBE", chPath, vhost)] = true } + if len(ch.Policies.Unsubscribe) > 0 { + active[binding.GenerateRouteKey("UNSUBSCRIBE", chPath, vhost)] = true + } if len(ch.Policies.Inbound) > 0 { active[binding.GenerateRouteKey("SUB", chPath, vhost)] = true } @@ -769,7 +780,7 @@ func (r *Runtime) unregisterStaleBindingChains(b *hub.ChannelBinding, activeKeys if b == nil { return } - keys := []string{b.SubscribeChainKey, b.InboundChainKey, b.OutboundChainKey} + keys := []string{b.SubscribeChainKey, b.UnsubscribeChainKey, b.InboundChainKey, b.OutboundChainKey} for _, key := range keys { if key == "" { continue @@ -779,7 +790,7 @@ func (r *Runtime) unregisterStaleBindingChains(b *hub.ChannelBinding, activeKeys } } for _, chKeys := range b.ChannelChainKeys { - channelKeys := []string{chKeys.SubscribeChainKey, chKeys.InboundChainKey, chKeys.OutboundChainKey} + channelKeys := []string{chKeys.SubscribeChainKey, chKeys.UnsubscribeChainKey, chKeys.InboundChainKey, chKeys.OutboundChainKey} for _, key := range channelKeys { if key == "" { continue @@ -804,14 +815,69 @@ func canDeltaUpdateWebSubBinding(oldWSB, newWSB binding.WebSubApiBinding) bool { if !reflect.DeepEqual(oldWSB.Receiver, newWSB.Receiver) { return false } - if !reflect.DeepEqual(oldWSB.BrokerDriver, newWSB.BrokerDriver) { + if !equalBrokerDriverSpec(oldWSB.BrokerDriver, newWSB.BrokerDriver) { return false } return true } +func equalBrokerDriverSpec(oldSpec, newSpec binding.BrokerDriverSpec) bool { + if oldSpec.Type != newSpec.Type || oldSpec.Topic != newSpec.Topic || oldSpec.Ordering != newSpec.Ordering { + return false + } + + oldConfigJSON, oldErr := json.Marshal(oldSpec.Config) + newConfigJSON, newErr := json.Marshal(newSpec.Config) + if oldErr != nil || newErr != nil { + return reflect.DeepEqual(oldSpec.Config, newSpec.Config) + } + + return string(oldConfigJSON) == string(newConfigJSON) +} + +func (r *Runtime) lockBindingMutations(names ...string) func() { + seen := make(map[string]bool, len(names)) + ordered := make([]string, 0, len(names)) + for _, name := range names { + if name == "" || seen[name] { + continue + } + seen[name] = true + ordered = append(ordered, name) + } + sort.Strings(ordered) + + locks := make([]*sync.Mutex, 0, len(ordered)) + r.bindingMutationMu.Lock() + for _, name := range ordered { + lock, ok := r.bindingMutationLocks[name] + if !ok { + lock = &sync.Mutex{} + r.bindingMutationLocks[name] = lock + } + locks = append(locks, lock) + } + r.bindingMutationMu.Unlock() + + for _, lock := range locks { + lock.Lock() + } + + return func() { + for i := len(locks) - 1; i >= 0; i-- { + locks[i].Unlock() + } + } +} + // AddWebSubApiBinding dynamically adds a WebSubApi binding at runtime (xDS mode). func (r *Runtime) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { + unlock := r.lockBindingMutations(wsb.Name) + defer unlock() + return r.addWebSubApiBindingLocked(wsb) +} + +func (r *Runtime) addWebSubApiBindingLocked(wsb binding.WebSubApiBinding) error { r.mu.Lock() vhost := defaultVhost(wsb.Vhost) @@ -914,11 +980,14 @@ func (r *Runtime) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { // UpdateWebSubApiBinding dynamically updates an existing WebSubApi binding at runtime (xDS mode). func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding) error { + unlock := r.lockBindingMutations(oldWSB.Name, newWSB.Name) + defer unlock() + if !canDeltaUpdateWebSubBinding(oldWSB, newWSB) { - if err := r.RemoveWebSubApiBinding(oldWSB.Name); err != nil { + if err := r.removeWebSubApiBindingLocked(oldWSB.Name); err != nil { return err } - return r.AddWebSubApiBinding(newWSB) + return r.addWebSubApiBindingLocked(newWSB) } r.mu.Lock() @@ -940,14 +1009,15 @@ func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding return fmt.Errorf("active broker-driver not found for WebSubApi %q", oldWSB.Name) } - oldChannels := webSubChannelTopicMap(oldWSB) - newChannels := webSubChannelTopicMap(newWSB) - removedChannels, addedChannels := diffChannelTopics(oldChannels, newChannels) - oldBinding := r.hub.GetBinding(oldWSB.Name) - r.mu.Unlock() + oldChannels := webSubChannelTopicMap(oldWSB) + newChannels := webSubChannelTopicMap(newWSB) + removedChannels, addedChannels := diffChannelTopics(oldChannels, newChannels) + oldBinding := r.hub.GetBinding(oldWSB.Name) + updateCtx := r.runCtx + r.mu.Unlock() vhost := defaultVhost(newWSB.Vhost) - subKey, inKey, outKey, chChainKeys, err := r.buildWebSubApiPolicyChains(newWSB, vhost) + subKey, unsubKey, inKey, outKey, chChainKeys, err := r.buildWebSubApiPolicyChains(newWSB, vhost) if err != nil { return fmt.Errorf("failed to build chains for updated WebSubApi %q: %w", newWSB.Name, err) } @@ -965,9 +1035,12 @@ func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding cancel() } - if err := updater.ApplyBindingDelta(context.Background(), removedChannels, addedChannels); err != nil { - return fmt.Errorf("failed to apply WebSub delta update for %q: %w", newWSB.Name, err) - } + if updateCtx == nil { + updateCtx = context.Background() + } + if err := updater.ApplyBindingDelta(updateCtx, removedChannels, addedChannels); err != nil { + return fmt.Errorf("failed to apply WebSub delta update for %q: %w", newWSB.Name, err) + } if len(removedChannels) > 0 { topicsToDelete := make([]string, 0, len(removedChannels)) @@ -1007,6 +1080,7 @@ func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding Version: newWSB.Version, Vhost: vhost, SubscribeChainKey: subKey, + UnsubscribeChainKey: unsubKey, InboundChainKey: inKey, OutboundChainKey: outKey, Channels: newChannels, @@ -1028,6 +1102,12 @@ func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding // RemoveWebSubApiBinding dynamically removes a WebSubApi binding at runtime (xDS mode). func (r *Runtime) RemoveWebSubApiBinding(name string) error { + unlock := r.lockBindingMutations(name) + defer unlock() + return r.removeWebSubApiBindingLocked(name) +} + +func (r *Runtime) removeWebSubApiBindingLocked(name string) error { r.mu.Lock() defer r.mu.Unlock() diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime_test.go b/event-gateway/gateway-runtime/internal/runtime/runtime_test.go index 8b8350c1b..89cfa09cc 100644 --- a/event-gateway/gateway-runtime/internal/runtime/runtime_test.go +++ b/event-gateway/gateway-runtime/internal/runtime/runtime_test.go @@ -164,6 +164,116 @@ func TestWebSubSubscriptionSyncTopic_FallsBackToDerivedTopic(t *testing.T) { } } +func TestWebSubActiveChainKeys_IncludesUnsubscribeKeys(t *testing.T) { + wsb := binding.WebSubApiBinding{ + Context: "/repos", + Version: "v1.0", + Policies: binding.PolicyBindings{ + Subscribe: []binding.PolicyRef{{Name: "sub", Version: "v1.0.0"}}, + Unsubscribe: []binding.PolicyRef{{Name: "unsub", Version: "v1.0.0"}}, + }, + Channels: []binding.ChannelDef{ + { + Name: "issues", + Policies: binding.PolicyBindings{ + Subscribe: []binding.PolicyRef{{Name: "sub", Version: "v1.0.0"}}, + Unsubscribe: []binding.PolicyRef{{Name: "unsub", Version: "v1.0.0"}}, + }, + }, + }, + } + + vhost := defaultVhost("") + basePath := binding.WebSubApiBasePath(wsb.Context, wsb.Version) + hubPath := basePath + "/hub" + channelPath := hubPath + "/issues" + + active := webSubActiveChainKeys(wsb, vhost) + + if !active[binding.GenerateRouteKey("SUBSCRIBE", hubPath, vhost)] { + t.Fatal("expected hub-level subscribe key to be active") + } + if !active[binding.GenerateRouteKey("UNSUBSCRIBE", hubPath, vhost)] { + t.Fatal("expected hub-level unsubscribe key to be active") + } + if !active[binding.GenerateRouteKey("SUBSCRIBE", channelPath, vhost)] { + t.Fatal("expected channel-level subscribe key to be active") + } + if !active[binding.GenerateRouteKey("UNSUBSCRIBE", channelPath, vhost)] { + t.Fatal("expected channel-level unsubscribe key to be active") + } +} + +func TestUnregisterStaleBindingChains_RemovesStaleUnsubscribeKeys(t *testing.T) { + eng, err := enginepkg.New(nil) + if err != nil { + t.Fatalf("failed to create engine: %v", err) + } + if err := eng.RegisterPolicy(&policy.PolicyDefinition{ + Name: "test-noop", + Version: "v1.0.0", + }, newTestNoopPolicy); err != nil { + t.Fatalf("failed to register test policy: %v", err) + } + + registerChain := func(routeKey string) { + chain, err := eng.BuildChain(routeKey, []enginepkg.PolicySpec{{ + Name: "test-noop", + Version: "v1", + Enabled: true, + }}) + if err != nil { + t.Fatalf("failed to build chain %q: %v", routeKey, err) + } + eng.RegisterChain(routeKey, chain) + } + + bindingState := &hub.ChannelBinding{ + SubscribeChainKey: "hub-sub", + UnsubscribeChainKey: "hub-unsub", + InboundChainKey: "hub-in", + OutboundChainKey: "hub-out", + ChannelChainKeys: map[string]hub.ChannelChainKeySet{ + "issues": { + SubscribeChainKey: "ch-sub", + UnsubscribeChainKey: "ch-unsub", + InboundChainKey: "ch-in", + OutboundChainKey: "ch-out", + }, + }, + } + + for _, key := range []string{ + bindingState.SubscribeChainKey, + bindingState.UnsubscribeChainKey, + bindingState.InboundChainKey, + bindingState.OutboundChainKey, + "ch-sub", "ch-unsub", "ch-in", "ch-out", + } { + registerChain(key) + } + + rt := &Runtime{engine: eng} + rt.unregisterStaleBindingChains(bindingState, map[string]bool{ + "hub-sub": true, + "hub-in": true, + "hub-out": true, + "ch-sub": true, + "ch-in": true, + "ch-out": true, + }) + + if eng.GetChain("hub-unsub") != nil { + t.Fatal("expected stale hub unsubscribe chain to be removed") + } + if eng.GetChain("ch-unsub") != nil { + t.Fatal("expected stale channel unsubscribe chain to be removed") + } + if eng.GetChain("hub-sub") == nil || eng.GetChain("ch-sub") == nil { + t.Fatal("expected active subscribe chains to remain registered") + } +} + func TestStartReceiverWithRetry_RetriesUntilSuccess(t *testing.T) { previousInitial := initialReceiverStartBackoff previousMax := maxReceiverStartBackoff