From cfc3efe73dd1982eaae3deceff1932702a83dd81 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Fri, 8 May 2026 10:32:52 +0530 Subject: [PATCH 01/19] Fix websub undeploy, update gateway websub handler to use websub service layer --- .../pkg/api/handlers/handlers.go | 13 + .../pkg/api/handlers/handlers_test.go | 86 ++++++ .../pkg/api/handlers/websub_api_handler.go | 32 +-- .../pkg/service/websubapi/errors.go | 63 +++++ .../pkg/service/websubapi/service.go | 260 ++++++++++++++++++ 5 files changed, 436 insertions(+), 18 deletions(-) create mode 100644 gateway/gateway-controller/pkg/service/websubapi/errors.go create mode 100644 gateway/gateway-controller/pkg/service/websubapi/service.go diff --git a/gateway/gateway-controller/pkg/api/handlers/handlers.go b/gateway/gateway-controller/pkg/api/handlers/handlers.go index cf71a5a057..e5dcb77113 100644 --- a/gateway/gateway-controller/pkg/api/handlers/handlers.go +++ b/gateway/gateway-controller/pkg/api/handlers/handlers.go @@ -46,6 +46,7 @@ import ( "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/secrets" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/restapi" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/websubapi" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/xds" @@ -56,6 +57,7 @@ type APIServer struct { *RestAPIHandler // embedded — promotes CreateRestAPI, ListRestAPIs, GetRestAPIById, UpdateRestAPI, DeleteRestAPI restAPIService *restapi.RestAPIService + webSubAPIService *websubapi.WebSubAPIService store *storage.ConfigStore db storage.Storage snapshotManager *xds.SnapshotManager @@ -151,6 +153,17 @@ func NewAPIServer( subscriptionResourceService: subscriptionResourceService, } server.restAPIService = restAPIService + server.webSubAPIService = websubapi.NewWebSubAPIService( + db, + deploymentService, + controlPlaneClient, + systemConfig, + parser, + validator, + logger, + eventHub, + secretService, + ) server.RestAPIHandler = NewRestAPIHandler(restAPIService, logger) // Register status update callback diff --git a/gateway/gateway-controller/pkg/api/handlers/handlers_test.go b/gateway/gateway-controller/pkg/api/handlers/handlers_test.go index 7e24480bbe..98a4c81479 100644 --- a/gateway/gateway-controller/pkg/api/handlers/handlers_test.go +++ b/gateway/gateway-controller/pkg/api/handlers/handlers_test.go @@ -49,6 +49,7 @@ import ( policybuilder "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policy" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/restapi" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/websubapi" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" ) @@ -1019,6 +1020,17 @@ func createTestAPIServerWithDB(db storage.Storage) *APIServer { httpClient, parser, validator, logger, hub, nil, ) server.restAPIService = restAPIService + server.webSubAPIService = websubapi.NewWebSubAPIService( + db, + deploymentService, + nil, + systemCfg, + parser, + validator, + logger, + hub, + nil, + ) server.RestAPIHandler = NewRestAPIHandler(restAPIService, logger) return server @@ -1238,6 +1250,17 @@ func attachTestEventHub(server *APIServer, hub eventhub.EventHub, gatewayID stri server.restAPIService = restAPIService server.RestAPIHandler = NewRestAPIHandler(restAPIService, server.logger) } + server.webSubAPIService = websubapi.NewWebSubAPIService( + server.db, + server.deploymentService, + nil, + server.systemConfig, + server.parser, + server.validator, + server.logger, + hub, + nil, + ) } func seedAPIForAPIKeyHandlerTests(t *testing.T, server *APIServer, handle string) *models.StoredConfig { @@ -2999,6 +3022,69 @@ func TestGetLLMProxyByIdWithDeployedAt(t *testing.T) { assert.Equal(t, http.StatusOK, w.Code) } +func TestUpdateWebSubAPIUndeployUsesDedicatedServicePath(t *testing.T) { + server := createTestAPIServer() + mockDB := server.db.(*MockStorage) + mockHub := &mockEventHub{} + attachTestEventHub(server, mockHub, "test-gateway") + + cfg := &models.StoredConfig{ + UUID: "0000-websub-id-0000-000000000000", + Kind: string(api.WebSubAPIKindWebSubApi), + Handle: "repo-watcher-v1-0", + DisplayName: "repo-watcher", + Version: "v1.0", + SourceConfiguration: api.WebSubAPI{ + ApiVersion: api.WebSubAPIApiVersionGatewayApiPlatformWso2Comv1alpha1, + Kind: api.WebSubAPIKindWebSubApi, + Metadata: api.Metadata{ + Name: "repo-watcher-v1-0", + }, + Spec: api.WebhookAPIData{ + DisplayName: "repo-watcher", + Version: "v1.0", + Context: "/repos", + Hub: api.WebSubHub{ + Channels: []api.HubChannel{{Name: "issues"}}, + }, + }, + }, + DesiredState: models.StateDeployed, + Origin: models.OriginGatewayAPI, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + require.NoError(t, mockDB.SaveConfig(cfg)) + + body := []byte(`{ + "apiVersion":"gateway.api-platform.wso2.com/v1alpha1", + "kind":"WebSubApi", + "metadata":{"name":"repo-watcher-v1-0"}, + "spec":{ + "displayName":"repo-watcher", + "version":"v1.0", + "context":"/repos", + "deploymentState":"undeployed", + "hub":{"channels":[{"name":"issues"}]} + } + }`) + + c, w := createTestContext("PUT", "/websub-apis/repo-watcher-v1-0", body) + c.Request.Header.Set("Content-Type", "application/json") + + server.UpdateWebSubAPI(c, "repo-watcher-v1-0") + + assert.Equal(t, http.StatusOK, w.Code) + + updatedCfg, err := mockDB.GetConfig("0000-websub-id-0000-000000000000") + require.NoError(t, err) + assert.Equal(t, models.StateUndeployed, updatedCfg.DesiredState) + require.Len(t, mockHub.publishedEvents, 1) + assert.Equal(t, "UPDATE", mockHub.publishedEvents[0].event.Action) + assert.Equal(t, eventhub.EventTypeAPI, mockHub.publishedEvents[0].event.EventType) + assert.Equal(t, "0000-websub-id-0000-000000000000", mockHub.publishedEvents[0].event.EntityID) +} + // TestHandleStatusUpdateStoreError tests handleStatusUpdate with store error func TestHandleStatusUpdateStoreError(t *testing.T) { server := createTestAPIServer() diff --git a/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go b/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go index 66a54e2aa8..f71c6eb048 100644 --- a/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go +++ b/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go @@ -20,6 +20,7 @@ package handlers import ( "context" + "errors" "fmt" "io" "log/slog" @@ -32,10 +33,13 @@ import ( api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/management" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/middleware" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/service/websubapi" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" ) +//TODO: Refactor to move business logic to service layer and keep handlers thin, focusing on request parsing, response formatting, and error handling. + // CreateWebSubAPI implements ServerInterface.CreateWebSubAPI // (POST /websub-apis) func (s *APIServer) CreateWebSubAPI(c *gin.Context) { @@ -182,31 +186,23 @@ func (s *APIServer) UpdateWebSubAPI(c *gin.Context, id string) { return } - existing, err := s.db.GetConfigByKindAndHandle(models.KindWebSubApi, handle) - if err != nil { - log.Warn("WebSub API configuration not found", - slog.String("handle", handle)) - c.JSON(http.StatusNotFound, api.ErrorResponse{ - Status: "error", - Message: fmt.Sprintf("WebSub API configuration with handle '%s' not found", handle), - }) - return - } - correlationID := middleware.GetCorrelationID(c) - result, err := s.deploymentService.DeployAPIConfiguration(utils.APIDeploymentParams{ - Data: body, + result, err := s.webSubAPIService.Update(websubapi.UpdateParams{ + Handle: handle, + Body: body, ContentType: c.GetHeader("Content-Type"), - Kind: "WebSubApi", - APIID: existing.UUID, - Origin: models.OriginGatewayAPI, CorrelationID: correlationID, Logger: log, }) if err != nil { log.Error("Failed to update WebSub API configuration", slog.Any("error", err)) - if storage.IsConflictError(err) { + if errors.Is(err, websubapi.ErrNotFound) { + c.JSON(http.StatusNotFound, api.ErrorResponse{ + Status: "error", + Message: fmt.Sprintf("WebSub API configuration with handle '%s' not found", handle), + }) + } else if storage.IsConflictError(err) { c.JSON(http.StatusConflict, api.ErrorResponse{ Status: "error", Message: err.Error(), @@ -223,7 +219,7 @@ func (s *APIServer) UpdateWebSubAPI(c *gin.Context, id string) { return } - updated := result.StoredConfig + updated := result.Config log.Info("WebSub API configuration updated", slog.String("id", updated.UUID), diff --git a/gateway/gateway-controller/pkg/service/websubapi/errors.go b/gateway/gateway-controller/pkg/service/websubapi/errors.go new file mode 100644 index 0000000000..3a44cba9d0 --- /dev/null +++ b/gateway/gateway-controller/pkg/service/websubapi/errors.go @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package websubapi + +import ( + "errors" + "fmt" + + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" +) + +var ( + // ErrNotFound is returned when a WebSub API is not found. + ErrNotFound = errors.New("websub api not found") +) + +// ValidationError wraps configuration validation errors. +type ValidationError struct { + Errors []config.ValidationError +} + +func (e *ValidationError) Error() string { + return fmt.Sprintf("configuration validation failed (%d errors)", len(e.Errors)) +} + +// ParseError wraps a configuration parse failure. +type ParseError struct { + Cause error +} + +func (e *ParseError) Error() string { + return fmt.Sprintf("failed to parse configuration: %v", e.Cause) +} + +func (e *ParseError) Unwrap() error { + return e.Cause +} + +// HandleMismatchError is returned when the path handle doesn't match metadata.name. +type HandleMismatchError struct { + PathHandle string + YAMLHandle string +} + +func (e *HandleMismatchError) Error() string { + return fmt.Sprintf("handle mismatch: path has '%s' but YAML metadata.name has '%s'", e.PathHandle, e.YAMLHandle) +} diff --git a/gateway/gateway-controller/pkg/service/websubapi/service.go b/gateway/gateway-controller/pkg/service/websubapi/service.go new file mode 100644 index 0000000000..a1c44292c2 --- /dev/null +++ b/gateway/gateway-controller/pkg/service/websubapi/service.go @@ -0,0 +1,260 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package websubapi + +import ( + "fmt" + "log/slog" + "strings" + "time" + + "github.com/wso2/api-platform/common/eventhub" + api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/management" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/controlplane" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/templateengine" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/templateengine/funcs" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" +) + +// UpdateParams holds parameters for the Update operation. +type UpdateParams struct { + Handle string + Body []byte + ContentType string + CorrelationID string + Logger *slog.Logger +} + +// UpdateResult holds the result of an Update operation. +type UpdateResult struct { + Config *models.StoredConfig +} + +// WebSubAPIService encapsulates WebSub API update behavior. +type WebSubAPIService struct { + db storage.Storage + deploymentService *utils.APIDeploymentService + controlPlaneClient controlplane.ControlPlaneClient + systemConfig *config.Config + parser *config.Parser + validator config.Validator + logger *slog.Logger + eventHub eventhub.EventHub + secretResolver funcs.SecretResolver +} + +// NewWebSubAPIService creates a new WebSubAPIService. +func NewWebSubAPIService( + db storage.Storage, + deploymentService *utils.APIDeploymentService, + controlPlaneClient controlplane.ControlPlaneClient, + systemConfig *config.Config, + parser *config.Parser, + validator config.Validator, + logger *slog.Logger, + eventHub eventhub.EventHub, + secretResolver funcs.SecretResolver, +) *WebSubAPIService { + if db == nil { + panic("WebSubAPIService requires non-nil storage") + } + if deploymentService == nil { + panic("WebSubAPIService requires APIDeploymentService") + } + if eventHub == nil { + panic("WebSubAPIService requires non-nil EventHub") + } + if systemConfig == nil { + panic("WebSubAPIService requires non-nil system config") + } + if strings.TrimSpace(systemConfig.Controller.Server.GatewayID) == "" { + panic("WebSubAPIService requires non-empty gateway ID") + } + + return &WebSubAPIService{ + db: db, + deploymentService: deploymentService, + controlPlaneClient: controlPlaneClient, + systemConfig: systemConfig, + parser: parser, + validator: validator, + logger: logger, + eventHub: eventHub, + secretResolver: secretResolver, + } +} + +// Update modifies an existing WebSub API configuration. +func (s *WebSubAPIService) Update(params UpdateParams) (*UpdateResult, error) { + log := params.Logger + + var apiConfig api.WebSubAPI + if err := s.parser.Parse(params.Body, params.ContentType, &apiConfig); err != nil { + return nil, &ParseError{Cause: err} + } + + if apiConfig.Metadata.Name != "" && apiConfig.Metadata.Name != params.Handle { + return nil, &HandleMismatchError{ + PathHandle: params.Handle, + YAMLHandle: apiConfig.Metadata.Name, + } + } + + existing, err := s.db.GetConfigByKindAndHandle(models.KindWebSubApi, params.Handle) + if err != nil { + if storage.IsNotFoundError(err) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("failed to retrieve existing websub api: %w", err) + } + + desiredState := models.StateDeployed + if apiConfig.Spec.DeploymentState != nil && + *apiConfig.Spec.DeploymentState == api.WebhookAPIDataDeploymentStateUndeployed { + desiredState = models.StateUndeployed + } + + if desiredState != models.StateUndeployed { + result, err := s.deploymentService.DeployAPIConfiguration(utils.APIDeploymentParams{ + Data: params.Body, + ContentType: params.ContentType, + Kind: "WebSubApi", + APIID: existing.UUID, + Origin: models.OriginGatewayAPI, + CorrelationID: params.CorrelationID, + Logger: log, + }) + if err != nil { + return nil, err + } + return &UpdateResult{Config: result.StoredConfig}, nil + } + + existing.Configuration = apiConfig + existing.SourceConfiguration = apiConfig + + if err := templateengine.RenderSpec(existing, s.secretResolver, log); err != nil { + return nil, err + } + + renderedConfig, ok := existing.Configuration.(api.WebSubAPI) + if !ok { + return nil, fmt.Errorf("failed to render websub api configuration") + } + + validationErrors := s.validator.Validate(&renderedConfig) + if len(validationErrors) > 0 { + return nil, &ValidationError{Errors: validationErrors} + } + + if err := s.validateArtifactConflicts(existing.UUID, renderedConfig.Spec.DisplayName, renderedConfig.Spec.Version, existing.Handle); err != nil { + return nil, err + } + + now := time.Now() + existing.DisplayName = renderedConfig.Spec.DisplayName + existing.Version = renderedConfig.Spec.Version + existing.DesiredState = desiredState + existing.UpdatedAt = now + + truncatedNow := now.Truncate(time.Millisecond) + existing.DeployedAt = &truncatedNow + + if existing.Origin == models.OriginGatewayAPI { + existing.CPSyncStatus = models.CPSyncStatusPending + } + + if err := s.db.UpdateConfig(existing); err != nil { + log.Error("Failed to update WebSub API config in database", slog.Any("error", err)) + return nil, fmt.Errorf("failed to persist configuration update: %w", err) + } + + s.publishEvent(eventhub.EventTypeAPI, "UPDATE", existing.UUID, params.CorrelationID, log) + + if existing.Origin == models.OriginGatewayAPI && s.controlPlaneClient != nil && s.controlPlaneClient.IsConnected() && s.controlPlaneClient.IsOnPrem() { + go func() { + if err := s.controlPlaneClient.SyncArtifactsToOnPremAPIM(s.controlPlaneClient.GetAPIMConfig()); err != nil { + log.Error("Failed to sync WebSub API to on-prem APIM", slog.Any("error", err)) + } + }() + } + + log.Info("WebSub API configuration updated", + slog.String("id", existing.UUID), + slog.String("handle", params.Handle), + slog.String("desired_state", string(desiredState))) + + return &UpdateResult{Config: existing}, nil +} + +func (s *WebSubAPIService) validateArtifactConflicts(currentID, displayName, version, handle string) error { + existingByNameVersion, err := s.db.GetConfigByKindNameAndVersion(models.KindWebSubApi, displayName, version) + if err == nil { + if existingByNameVersion != nil && existingByNameVersion.UUID != currentID { + return fmt.Errorf("%w: configuration with name '%s' and version '%s' already exists", + storage.ErrConflict, displayName, version) + } + } else if !storage.IsNotFoundError(err) { + return fmt.Errorf("failed to check existing WebSubApi name/version conflict: %w", err) + } + + existingByHandle, err := s.db.GetConfigByKindAndHandle(models.KindWebSubApi, handle) + if err == nil { + if existingByHandle != nil && existingByHandle.UUID != currentID { + return fmt.Errorf("%w: configuration with handle '%s' already exists", + storage.ErrConflict, handle) + } + } else if !storage.IsNotFoundError(err) { + return fmt.Errorf("failed to check existing WebSubApi handle conflict: %w", err) + } + + return nil +} + +func (s *WebSubAPIService) publishEvent(eventType eventhub.EventType, action, entityID, correlationID string, logger *slog.Logger) { + gatewayID := strings.TrimSpace(s.systemConfig.Controller.Server.GatewayID) + event := eventhub.Event{ + GatewayID: gatewayID, + OriginatedTimestamp: time.Now(), + EventType: eventType, + Action: action, + EntityID: entityID, + EventID: correlationID, + EventData: eventhub.EmptyEventData, + } + + if err := s.eventHub.PublishEvent(gatewayID, event); err != nil { + logger.Warn("Failed to publish event to event hub", + slog.String("gateway_id", gatewayID), + slog.String("event_type", string(eventType)), + slog.String("action", action), + slog.String("entity_id", entityID), + slog.Any("error", err)) + return + } + + logger.Debug("Published event to event hub", + slog.String("gateway_id", gatewayID), + slog.String("event_type", string(eventType)), + slog.String("action", action), + slog.String("entity_id", entityID)) +} From a74fabceace618c42effebfde4007ff8bd5811d4 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Fri, 8 May 2026 12:55:04 +0530 Subject: [PATCH 02/19] Fix Websub api delta update on event-gateway --- .../connectors/receiver/websub/delta.go | 71 ++++++ .../internal/runtime/runtime.go | 222 +++++++++++++++++- .../internal/xdsclient/handler.go | 10 +- .../internal/xdsclient/handler_test.go | 25 +- 4 files changed, 313 insertions(+), 15 deletions(-) create mode 100644 event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go new file mode 100644 index 0000000000..59ba534299 --- /dev/null +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package websub + +import ( + "context" + "fmt" + "log/slog" +) + +// ApplyBindingDelta mutates the live receiver for channel add/remove changes +// without recreating the receiver or the subscription sync topic. +func (e *WebSubReceiver) ApplyBindingDelta(ctx context.Context, removedChannels map[string]string, addedChannels map[string]string) error { + for channelName := range removedChannels { + subscriptions := e.store.GetByTopic(channelName) + for _, sub := range subscriptions { + if e.syncProducer != nil { + if err := e.syncProducer.PublishTombstone(ctx, channelName, sub.CallbackURL); err != nil { + return fmt.Errorf("failed to tombstone subscription for removed channel %q: %w", channelName, err) + } + } + } + } + + for channelName, kafkaTopic := range removedChannels { + e.topics.Deregister(channelName) + + subscriptions := e.store.GetByTopic(channelName) + for _, sub := range subscriptions { + if err := e.consumerMgr.RemoveSubscription(sub.CallbackURL, kafkaTopic); err != nil { + slog.Error("Failed to remove consumer for deleted WebSub channel", + "api", e.channel.Name, + "channel", channelName, + "callback", sub.CallbackURL, + "error", err) + } + if err := e.store.Remove(channelName, sub.CallbackURL); err != nil { + slog.Error("Failed to remove subscription for deleted WebSub channel", + "api", e.channel.Name, + "channel", channelName, + "callback", sub.CallbackURL, + "error", err) + } + } + + delete(e.channel.Channels, channelName) + } + + for channelName, kafkaTopic := range addedChannels { + e.channel.Channels[channelName] = kafkaTopic + e.topics.Register(channelName) + } + + return nil +} diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime.go b/event-gateway/gateway-runtime/internal/runtime/runtime.go index 490d279e1e..029c2e4841 100644 --- a/event-gateway/gateway-runtime/internal/runtime/runtime.go +++ b/event-gateway/gateway-runtime/internal/runtime/runtime.go @@ -25,6 +25,7 @@ import ( "net/http" "os" "path" + "reflect" "sync" "time" @@ -78,6 +79,11 @@ type managedServer struct { keyFile string } +type webSubBindingUpdater interface { + connectors.Receiver + ApplyBindingDelta(ctx context.Context, removedChannels map[string]string, addedChannels map[string]string) error +} + // New creates a new Runtime. After creation: // 1. Call Engine() to register policies // 2. Call LoadChannels() to parse bindings and create per-channel receiver+broker-driver pairs @@ -674,6 +680,119 @@ func (r *Runtime) unregisterBindingChains(b *hub.ChannelBinding) { } } +func webSubChannelTopicMap(wsb binding.WebSubApiBinding) map[string]string { + channels := make(map[string]string, len(wsb.Channels)) + for _, ch := range wsb.Channels { + channels[ch.Name] = binding.WebSubApiTopicName(wsb.Name, wsb.Version, ch.Name) + } + return channels +} + +func webSubTopicList(channels map[string]string, subscriptionTopic string) []string { + topics := make([]string, 0, len(channels)+1) + for _, kafkaTopic := range channels { + topics = append(topics, kafkaTopic) + } + if subscriptionTopic != "" { + topics = append(topics, subscriptionTopic) + } + return topics +} + +func diffChannelTopics(oldChannels, newChannels map[string]string) (map[string]string, map[string]string) { + removed := make(map[string]string) + added := make(map[string]string) + + for channelName, kafkaTopic := range oldChannels { + if _, exists := newChannels[channelName]; !exists { + removed[channelName] = kafkaTopic + } + } + for channelName, kafkaTopic := range newChannels { + if _, exists := oldChannels[channelName]; !exists { + added[channelName] = kafkaTopic + } + } + + return removed, added +} + +func webSubActiveChainKeys(wsb binding.WebSubApiBinding, vhost string) map[string]bool { + active := make(map[string]bool) + basePath := binding.WebSubApiBasePath(wsb.Context, wsb.Version) + hubPath := basePath + "/hub" + + if len(wsb.Policies.Subscribe) > 0 { + active[binding.GenerateRouteKey("SUBSCRIBE", hubPath, vhost)] = true + } + if len(wsb.Policies.Inbound) > 0 { + active[binding.GenerateRouteKey("SUB", basePath+"/webhook-receiver", vhost)] = true + } + if len(wsb.Policies.Outbound) > 0 { + active[binding.GenerateRouteKey("DELIVER", hubPath, vhost)] = true + } + + for _, ch := range wsb.Channels { + chPath := hubPath + "/" + ch.Name + if len(ch.Policies.Subscribe) > 0 { + active[binding.GenerateRouteKey("SUBSCRIBE", chPath, vhost)] = true + } + if len(ch.Policies.Inbound) > 0 { + active[binding.GenerateRouteKey("SUB", chPath, vhost)] = true + } + if len(ch.Policies.Outbound) > 0 { + active[binding.GenerateRouteKey("DELIVER", chPath, vhost)] = true + } + } + + return active +} + +func (r *Runtime) unregisterStaleBindingChains(b *hub.ChannelBinding, activeKeys map[string]bool) { + if b == nil { + return + } + keys := []string{b.SubscribeChainKey, b.InboundChainKey, b.OutboundChainKey} + for _, key := range keys { + if key == "" { + continue + } + if !activeKeys[key] { + r.engine.UnregisterChain(key) + } + } + for _, chKeys := range b.ChannelChainKeys { + channelKeys := []string{chKeys.SubscribeChainKey, chKeys.InboundChainKey, chKeys.OutboundChainKey} + for _, key := range channelKeys { + if key == "" { + continue + } + if !activeKeys[key] { + r.engine.UnregisterChain(key) + } + } + } +} + +func canDeltaUpdateWebSubBinding(oldWSB, newWSB binding.WebSubApiBinding) bool { + if oldWSB.Name != newWSB.Name { + return false + } + if oldWSB.Context != newWSB.Context { + return false + } + if oldWSB.Version != newWSB.Version { + return false + } + if !reflect.DeepEqual(oldWSB.Receiver, newWSB.Receiver) { + return false + } + if !reflect.DeepEqual(oldWSB.BrokerDriver, newWSB.BrokerDriver) { + return false + } + return true +} + // AddWebSubApiBinding dynamically adds a WebSubApi binding at runtime (xDS mode). func (r *Runtime) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { r.mu.Lock() @@ -726,12 +845,7 @@ func (r *Runtime) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { r.bindingPaths[wsb.Name] = []string{basePath + "/hub", basePath + "/webhook-receiver"} // Track all Kafka topics for cleanup on removal. - allTopics := make([]string, 0, len(channels)+1) - for _, kafkaTopic := range channels { - allTopics = append(allTopics, kafkaTopic) - } - allTopics = append(allTopics, internalSubTopic) - r.bindingTopics[wsb.Name] = allTopics + r.bindingTopics[wsb.Name] = webSubTopicList(channels, internalSubTopic) ch := connectors.ChannelInfo{ Name: wsb.Name, @@ -780,6 +894,102 @@ func (r *Runtime) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { return nil } +// UpdateWebSubApiBinding dynamically updates an existing WebSubApi binding at runtime (xDS mode). +func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding) error { + if !canDeltaUpdateWebSubBinding(oldWSB, newWSB) { + if err := r.RemoveWebSubApiBinding(oldWSB.Name); err != nil { + return err + } + return r.AddWebSubApiBinding(newWSB) + } + + r.mu.Lock() + defer r.mu.Unlock() + + receiver, ok := r.activeReceivers[oldWSB.Name] + if !ok { + return fmt.Errorf("active receiver not found for WebSubApi %q", oldWSB.Name) + } + updater, ok := receiver.(webSubBindingUpdater) + if !ok { + return fmt.Errorf("receiver for WebSubApi %q does not support delta updates", oldWSB.Name) + } + + brokerDriver, ok := r.activeBrokerDrivers[oldWSB.Name] + if !ok { + return fmt.Errorf("active broker-driver not found for WebSubApi %q", oldWSB.Name) + } + + oldChannels := webSubChannelTopicMap(oldWSB) + newChannels := webSubChannelTopicMap(newWSB) + removedChannels, addedChannels := diffChannelTopics(oldChannels, newChannels) + oldBinding := r.hub.GetBinding(oldWSB.Name) + + if len(addedChannels) > 0 { + topicsToEnsure := make([]string, 0, len(addedChannels)) + for _, kafkaTopic := range addedChannels { + topicsToEnsure = append(topicsToEnsure, kafkaTopic) + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := brokerDriver.EnsureTopics(ctx, topicsToEnsure); err != nil { + cancel() + return fmt.Errorf("failed to ensure broker-driver topics during update for WebSubApi %q: %w", newWSB.Name, err) + } + cancel() + } + + if err := updater.ApplyBindingDelta(context.Background(), removedChannels, addedChannels); err != nil { + return fmt.Errorf("failed to apply WebSub delta update for %q: %w", newWSB.Name, err) + } + + if len(removedChannels) > 0 { + topicsToDelete := make([]string, 0, len(removedChannels)) + for _, kafkaTopic := range removedChannels { + topicsToDelete = append(topicsToDelete, kafkaTopic) + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := brokerDriver.DeleteTopics(ctx, topicsToDelete); err != nil { + slog.Error("Failed to delete broker-driver topics during delta update", + "name", newWSB.Name, + "error", err) + } + cancel() + } + + vhost := defaultVhost(newWSB.Vhost) + subKey, inKey, outKey, chChainKeys, err := r.buildWebSubApiPolicyChains(newWSB, vhost) + if err != nil { + return fmt.Errorf("failed to build chains for updated WebSubApi %q: %w", newWSB.Name, err) + } + + r.unregisterStaleBindingChains(oldBinding, webSubActiveChainKeys(newWSB, vhost)) + r.hub.RegisterBinding(hub.ChannelBinding{ + APIID: newWSB.APIID, + Name: newWSB.Name, + Mode: "websub", + Context: newWSB.Context, + Version: newWSB.Version, + Vhost: vhost, + SubscribeChainKey: subKey, + InboundChainKey: inKey, + OutboundChainKey: outKey, + Channels: newChannels, + ChannelChainKeys: chChainKeys, + }) + + internalSubTopic := binding.WebSubApiSubscriptionTopic(newWSB.Name, newWSB.Version) + r.bindingTopics[newWSB.Name] = webSubTopicList(newChannels, internalSubTopic) + + slog.Info("Dynamically updated WebSubApi binding", + "name", newWSB.Name, + "added_channels", len(addedChannels), + "removed_channels", len(removedChannels), + "channels", len(newWSB.Channels), + ) + + return nil +} + // RemoveWebSubApiBinding dynamically removes a WebSubApi binding at runtime (xDS mode). func (r *Runtime) RemoveWebSubApiBinding(name string) error { r.mu.Lock() diff --git a/event-gateway/gateway-runtime/internal/xdsclient/handler.go b/event-gateway/gateway-runtime/internal/xdsclient/handler.go index 81e80743f0..eca82ff956 100644 --- a/event-gateway/gateway-runtime/internal/xdsclient/handler.go +++ b/event-gateway/gateway-runtime/internal/xdsclient/handler.go @@ -38,6 +38,7 @@ import ( type BindingManager interface { AddWebSubApiBinding(wsb binding.WebSubApiBinding) error RemoveWebSubApiBinding(name string) error + UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding) error } // KafkaConfig holds local Kafka broker settings used as defaults. @@ -187,15 +188,14 @@ func (h *Handler) HandleResources(ctx context.Context, resources []*discoveryv3. if reflect.DeepEqual(old, ecr) { continue } - // Update: remove then re-add slog.Info("Updating binding via xDS", "name", ecr.Name, "uuid", uuid) - if err := h.manager.RemoveWebSubApiBinding(old.Name); err != nil { - slog.Error("Failed to remove binding for update", "name", old.Name, "error", err) + if err := h.manager.UpdateWebSubApiBinding(h.toWebSubApiBinding(old), h.toWebSubApiBinding(ecr)); err != nil { + return fmt.Errorf("failed to update binding %q: %w", ecr.Name, err) } - } else { - slog.Info("Adding binding via xDS", "name", ecr.Name, "uuid", uuid) + continue } + slog.Info("Adding binding via xDS", "name", ecr.Name, "uuid", uuid) wsb := h.toWebSubApiBinding(ecr) if err := h.manager.AddWebSubApiBinding(wsb); err != nil { return fmt.Errorf("failed to add binding %q: %w", ecr.Name, err) diff --git a/event-gateway/gateway-runtime/internal/xdsclient/handler_test.go b/event-gateway/gateway-runtime/internal/xdsclient/handler_test.go index c497b174ce..f9cc1deac6 100644 --- a/event-gateway/gateway-runtime/internal/xdsclient/handler_test.go +++ b/event-gateway/gateway-runtime/internal/xdsclient/handler_test.go @@ -141,11 +141,17 @@ func TestHandlerHandleResources_UpdatesOnlyChangedBinding(t *testing.T) { t.Fatalf("updated HandleResources returned error: %v", err) } - if got := manager.removedNames(); len(got) != 1 || got[0] != "githubser" { - t.Fatalf("expected only githubser to be removed on change, got %#v", got) + if got := len(manager.updated); got != 1 { + t.Fatalf("expected exactly one update call, got %d", got) } - if got := manager.addedCount("githubser"); got != 2 { - t.Fatalf("expected githubser to be added twice, got %d", got) + if manager.updated[0].old.Name != "githubser" || manager.updated[0].new.Name != "githubser" { + t.Fatalf("unexpected update binding names: %#v", manager.updated[0]) + } + if got := manager.removedNames(); len(got) != 0 { + t.Fatalf("expected no removals on delta update, got %#v", got) + } + if got := manager.addedCount("githubser"); got != 1 { + t.Fatalf("expected githubser to be added once, got %d", got) } if got := manager.addedCount("gitlabser"); got != 1 { t.Fatalf("expected gitlabser to be added once, got %d", got) @@ -155,6 +161,12 @@ func TestHandlerHandleResources_UpdatesOnlyChangedBinding(t *testing.T) { type recordingBindingManager struct { added []string removed []string + updated []updatedBindingCall +} + +type updatedBindingCall struct { + old binding.WebSubApiBinding + new binding.WebSubApiBinding } func (m *recordingBindingManager) AddWebSubApiBinding(wsb binding.WebSubApiBinding) error { @@ -167,6 +179,11 @@ func (m *recordingBindingManager) RemoveWebSubApiBinding(name string) error { return nil } +func (m *recordingBindingManager) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding) error { + m.updated = append(m.updated, updatedBindingCall{old: oldWSB, new: newWSB}) + return nil +} + func (m *recordingBindingManager) addedNames() []string { out := append([]string(nil), m.added...) return out From 1fbd90977a17ba80b96f49b0a6209b8574d128e3 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Fri, 8 May 2026 13:15:27 +0530 Subject: [PATCH 03/19] Make the compacted topic's partition/replication settings configurable --- .../gateway-runtime/configs/config.toml | 4 +- .../gateway-runtime/internal/config/config.go | 36 ++++++--- .../connectors/brokerdriver/kafka/config.go | 73 ++++++++++++++----- .../connectors/brokerdriver/kafka/endpoint.go | 12 ++- 4 files changed, 91 insertions(+), 34 deletions(-) diff --git a/event-gateway/gateway-runtime/configs/config.toml b/event-gateway/gateway-runtime/configs/config.toml index 50ebe9681e..ff8f444615 100644 --- a/event-gateway/gateway-runtime/configs/config.toml +++ b/event-gateway/gateway-runtime/configs/config.toml @@ -26,6 +26,9 @@ metrics_port = 9003 # Default Kafka brokers. Channels can override with broker-driver.config.brokers. brokers = ["localhost:9092"] consumer_group_prefix = "event-gateway" +# Partitions and replication factor used for internal compacted topics. +compact_topic_partitions = 1 +compact_topic_replication_factor = 1 tls = false # Optional PEM CA file for self-signed or private Kafka CAs. # tls_ca_file = "/etc/event-gateway/kafka/ca.crt" @@ -70,4 +73,3 @@ allow_payloads = false enabled = true xds_address = "localhost:18001" # node_id = "" - diff --git a/event-gateway/gateway-runtime/internal/config/config.go b/event-gateway/gateway-runtime/internal/config/config.go index e7ae25cd3c..198decd3f8 100644 --- a/event-gateway/gateway-runtime/internal/config/config.go +++ b/event-gateway/gateway-runtime/internal/config/config.go @@ -57,16 +57,18 @@ type ServerConfig struct { // KafkaConfig holds Kafka connection settings. type KafkaConfig struct { - Brokers []string `koanf:"brokers"` - ConsumerGroupPrefix string `koanf:"consumer_group_prefix"` - TLS bool `koanf:"tls"` - TLSCAFile string `koanf:"tls_ca_file"` - TLSCertFile string `koanf:"tls_cert_file"` - TLSKeyFile string `koanf:"tls_key_file"` - TLSServerName string `koanf:"tls_server_name"` - SASLMechanism string `koanf:"sasl_mechanism"` - SASLUsername string `koanf:"sasl_username"` - SASLPassword string `koanf:"sasl_password"` + Brokers []string `koanf:"brokers"` + ConsumerGroupPrefix string `koanf:"consumer_group_prefix"` + CompactTopicPartitions int `koanf:"compact_topic_partitions"` + CompactTopicReplicationFactor int `koanf:"compact_topic_replication_factor"` + TLS bool `koanf:"tls"` + TLSCAFile string `koanf:"tls_ca_file"` + TLSCertFile string `koanf:"tls_cert_file"` + TLSKeyFile string `koanf:"tls_key_file"` + TLSServerName string `koanf:"tls_server_name"` + SASLMechanism string `koanf:"sasl_mechanism"` + SASLUsername string `koanf:"sasl_username"` + SASLPassword string `koanf:"sasl_password"` } // WebSubConfig holds WebSub-specific settings. @@ -111,8 +113,10 @@ func DefaultConfig() *Config { MetricsPort: 9003, }, Kafka: KafkaConfig{ - Brokers: []string{"localhost:9092"}, - ConsumerGroupPrefix: "event-gateway", + Brokers: []string{"localhost:9092"}, + ConsumerGroupPrefix: "event-gateway", + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, }, WebSub: WebSubConfig{ VerificationTimeoutSeconds: 10, @@ -223,6 +227,8 @@ func mapEnvValue(path, value string) interface{} { "server.websocket_port", "server.admin_port", "server.metrics_port", + "kafka.compact_topic_partitions", + "kafka.compact_topic_replication_factor", "websub.verification_timeout_seconds", "websub.delivery_max_retries", "websub.delivery_initial_delay_ms", @@ -295,6 +301,12 @@ func validateKafkaConfig(kafkaCfg KafkaConfig) error { if len(kafkaCfg.Brokers) == 0 { return fmt.Errorf("kafka.brokers must contain at least one broker") } + if kafkaCfg.CompactTopicPartitions <= 0 { + return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", kafkaCfg.CompactTopicPartitions) + } + if kafkaCfg.CompactTopicReplicationFactor <= 0 { + return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", kafkaCfg.CompactTopicReplicationFactor) + } if kafkaCfg.TLS { if strings.TrimSpace(kafkaCfg.TLSCAFile) != "" { diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go index 914fa6c85c..5a0ce9781c 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go @@ -34,29 +34,39 @@ import ( // ConnectionConfig holds the Kafka connection settings used by the driver. type ConnectionConfig struct { - Brokers []string - TLS bool - TLSCAFile string - TLSCertFile string - TLSKeyFile string - TLSServerName string - SASLMechanism string - SASLUsername string - SASLPassword string + Brokers []string + CompactTopicPartitions int + CompactTopicReplicationFactor int + TLS bool + TLSCAFile string + TLSCertFile string + TLSKeyFile string + TLSServerName string + SASLMechanism string + SASLUsername string + SASLPassword string } // ResolveConnectionConfig merges global runtime config with per-binding overrides. func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]interface{}) (ConnectionConfig, error) { cfg := ConnectionConfig{ - Brokers: append([]string(nil), global.Brokers...), - TLS: global.TLS, - TLSCAFile: global.TLSCAFile, - TLSCertFile: global.TLSCertFile, - TLSKeyFile: global.TLSKeyFile, - TLSServerName: global.TLSServerName, - SASLMechanism: global.SASLMechanism, - SASLUsername: global.SASLUsername, - SASLPassword: global.SASLPassword, + Brokers: append([]string(nil), global.Brokers...), + CompactTopicPartitions: global.CompactTopicPartitions, + CompactTopicReplicationFactor: global.CompactTopicReplicationFactor, + TLS: global.TLS, + TLSCAFile: global.TLSCAFile, + TLSCertFile: global.TLSCertFile, + TLSKeyFile: global.TLSKeyFile, + TLSServerName: global.TLSServerName, + SASLMechanism: global.SASLMechanism, + SASLUsername: global.SASLUsername, + SASLPassword: global.SASLPassword, + } + if cfg.CompactTopicPartitions <= 0 { + cfg.CompactTopicPartitions = 1 + } + if cfg.CompactTopicReplicationFactor <= 0 { + cfg.CompactTopicReplicationFactor = 1 } if overrides != nil { @@ -65,6 +75,16 @@ func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]int } else if ok { cfg.Brokers = brokers } + if v, ok, err := intOverride(overrides["compact_topic_partitions"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.CompactTopicPartitions = v + } + if v, ok, err := intOverride(overrides["compact_topic_replication_factor"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.CompactTopicReplicationFactor = v + } if v, ok, err := boolOverride(overrides["tls"]); err != nil { return ConnectionConfig{}, err } else if ok { @@ -135,6 +155,12 @@ func validateConnectionConfig(cfg ConnectionConfig) error { if len(cfg.Brokers) == 0 { return fmt.Errorf("kafka brokers must not be empty") } + if cfg.CompactTopicPartitions <= 0 { + return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", cfg.CompactTopicPartitions) + } + if cfg.CompactTopicReplicationFactor <= 0 { + return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", cfg.CompactTopicReplicationFactor) + } if !cfg.TLS { if cfg.TLSCAFile != "" || cfg.TLSCertFile != "" || cfg.TLSKeyFile != "" || cfg.TLSServerName != "" { @@ -264,6 +290,17 @@ func boolOverride(value interface{}) (bool, bool, error) { return v, true, nil } +func intOverride(value interface{}) (int, bool, error) { + if value == nil { + return 0, false, nil + } + v, ok := value.(int) + if !ok { + return 0, false, fmt.Errorf("expected int override, got %T", value) + } + return v, true, nil +} + func stringOverride(value interface{}) (string, bool, error) { if value == nil { return "", false, nil diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go index ffa77ffeb4..d8367526e4 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go @@ -123,9 +123,15 @@ func (e *KafkaBrokerDriver) EnsureTopics(ctx context.Context, topics []string) e // EnsureCompactedTopic creates a compacted topic if it does not already exist. func (e *KafkaBrokerDriver) EnsureCompactedTopic(ctx context.Context, topic string) error { - resp, err := e.admin.CreateTopics(ctx, 1, 1, map[string]*string{ - "cleanup.policy": kadm.StringPtr("compact"), - }, topic) + resp, err := e.admin.CreateTopics( + ctx, + int32(e.cfg.CompactTopicPartitions), + int16(e.cfg.CompactTopicReplicationFactor), + map[string]*string{ + "cleanup.policy": kadm.StringPtr("compact"), + }, + topic, + ) if err != nil { return fmt.Errorf("failed to create compacted topic %s: %w", topic, err) } From 123be8921aa64aebe071f8a8b9defab7806a7f69 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Sat, 9 May 2026 00:19:57 +0530 Subject: [PATCH 04/19] Avoid silently defaulting invalid compact-topic values --- .../internal/connectors/brokerdriver/kafka/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go index 5a0ce9781c..7953dfe059 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go @@ -63,10 +63,10 @@ func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]int SASLPassword: global.SASLPassword, } if cfg.CompactTopicPartitions <= 0 { - cfg.CompactTopicPartitions = 1 + return ConnectionConfig{}, fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", cfg.CompactTopicPartitions) } if cfg.CompactTopicReplicationFactor <= 0 { - cfg.CompactTopicReplicationFactor = 1 + return ConnectionConfig{}, fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", cfg.CompactTopicReplicationFactor) } if overrides != nil { From a560ac403de1d0fcf07e4df75f50a10b2483faa6 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Sat, 9 May 2026 00:25:13 +0530 Subject: [PATCH 05/19] intOverride rejects valid numeric values from YAML/JSON deserialization --- .../connectors/brokerdriver/kafka/config.go | 19 +++- .../brokerdriver/kafka/config_test.go | 102 +++++++++++++++--- 2 files changed, 101 insertions(+), 20 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go index 7953dfe059..94b3b225ec 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "math" "os" "strings" @@ -294,11 +295,23 @@ func intOverride(value interface{}) (int, bool, error) { if value == nil { return 0, false, nil } - v, ok := value.(int) - if !ok { + switch v := value.(type) { + case int: + return v, true, nil + case float64: + if math.IsNaN(v) || math.IsInf(v, 0) { + return 0, false, fmt.Errorf("expected integer Kafka config override, got non-finite float64 %v", v) + } + if v != math.Trunc(v) { + return 0, false, fmt.Errorf("expected integer Kafka config override, got non-integer float64 %v", v) + } + if v < math.MinInt32 || v > math.MaxInt32 { + return 0, false, fmt.Errorf("expected integer Kafka config override within [%d, %d], got float64 %v", math.MinInt32, math.MaxInt32, v) + } + return int(v), true, nil + default: return 0, false, fmt.Errorf("expected int override, got %T", value) } - return v, true, nil } func stringOverride(value interface{}) (string, bool, error) { diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go index 3a46a34f71..abe611092d 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go @@ -1,9 +1,11 @@ package kafka import ( + "math" "os" "path/filepath" "reflect" + "strings" "testing" runtimeconfig "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/config" @@ -17,13 +19,15 @@ func TestResolveConnectionConfig_MergesGlobalAndOverrides(t *testing.T) { } global := runtimeconfig.KafkaConfig{ - Brokers: []string{"broker-1:9092"}, - TLS: true, - TLSCAFile: caPath, - TLSServerName: "global-kafka", - SASLMechanism: "plain", - SASLUsername: "global-user", - SASLPassword: "global-pass", + Brokers: []string{"broker-1:9092"}, + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + TLS: true, + TLSCAFile: caPath, + TLSServerName: "global-kafka", + SASLMechanism: "plain", + SASLUsername: "global-user", + SASLPassword: "global-pass", } resolved, err := ResolveConnectionConfig(global, map[string]interface{}{ @@ -57,7 +61,10 @@ func TestResolveConnectionConfig_MergesGlobalAndOverrides(t *testing.T) { } func TestResolveConnectionConfig_PreservesOpaqueCredentials(t *testing.T) { - resolved, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{}, map[string]interface{}{ + resolved, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + }, map[string]interface{}{ "brokers": []interface{}{"broker:9092"}, "sasl_mechanism": "plain", "sasl_username": " user-with-spaces ", @@ -77,8 +84,10 @@ func TestResolveConnectionConfig_PreservesOpaqueCredentials(t *testing.T) { func TestResolveConnectionConfig_RequiresTLSWhenTLSFilesAreConfigured(t *testing.T) { _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ - Brokers: []string{"broker:9092"}, - TLSCAFile: "/tmp/ca.crt", + Brokers: []string{"broker:9092"}, + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + TLSCAFile: "/tmp/ca.crt", }, nil) if err == nil { t.Fatalf("expected error when TLS files are set with TLS disabled") @@ -93,18 +102,22 @@ func TestResolveConnectionConfig_ValidatesReadableTLSFiles(t *testing.T) { } _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ - Brokers: []string{"broker:9092"}, - TLS: true, - TLSCAFile: caPath, + Brokers: []string{"broker:9092"}, + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + TLS: true, + TLSCAFile: caPath, }, nil) if err != nil { t.Fatalf("expected readable CA file to validate, got %v", err) } _, err = ResolveConnectionConfig(runtimeconfig.KafkaConfig{ - Brokers: []string{"broker:9092"}, - TLS: true, - TLSCAFile: filepath.Join(tempDir, "missing.crt"), + Brokers: []string{"broker:9092"}, + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + TLS: true, + TLSCAFile: filepath.Join(tempDir, "missing.crt"), }, nil) if err == nil { t.Fatalf("expected missing CA file to fail validation") @@ -112,7 +125,10 @@ func TestResolveConnectionConfig_ValidatesReadableTLSFiles(t *testing.T) { } func TestResolveConnectionConfig_RequiresSASLCredentials(t *testing.T) { - _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{}, map[string]interface{}{ + _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + }, map[string]interface{}{ "brokers": []interface{}{"broker:9092"}, "sasl_mechanism": "scram-sha-512", "sasl_username": "user", @@ -121,3 +137,55 @@ func TestResolveConnectionConfig_RequiresSASLCredentials(t *testing.T) { t.Fatalf("expected missing SASL password to fail validation") } } + +func TestIntOverride_AcceptsIntegerFloat64(t *testing.T) { + got, ok, err := intOverride(float64(3)) + if err != nil { + t.Fatalf("expected integer float64 override to succeed, got %v", err) + } + if !ok { + t.Fatalf("expected integer float64 override to be accepted") + } + if got != 3 { + t.Fatalf("expected integer float64 override to convert to 3, got %d", got) + } +} + +func TestIntOverride_RejectsInvalidFloat64(t *testing.T) { + tests := []struct { + name string + value float64 + wantErr string + }{ + { + name: "non integer", + value: 3.5, + wantErr: "non-integer", + }, + { + name: "out of bounds", + value: float64(math.MaxInt32) + 1, + wantErr: "within", + }, + { + name: "non finite", + value: math.NaN(), + wantErr: "non-finite", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, ok, err := intOverride(tt.value) + if err == nil { + t.Fatalf("expected float64 override %v to fail", tt.value) + } + if ok { + t.Fatalf("expected invalid float64 override %v to be rejected", tt.value) + } + if !strings.Contains(err.Error(), tt.wantErr) { + t.Fatalf("expected error %q to contain %q", err.Error(), tt.wantErr) + } + }) + } +} From 120b8dce897bf513fdd2bb01cefda2c95d0272aa Mon Sep 17 00:00:00 2001 From: AnujaK Date: Sat, 9 May 2026 00:27:01 +0530 Subject: [PATCH 06/19] Add upper-bound validation before narrowing numeric casts --- .../internal/connectors/brokerdriver/kafka/config.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go index 94b3b225ec..045ce595fc 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go @@ -159,9 +159,15 @@ func validateConnectionConfig(cfg ConnectionConfig) error { if cfg.CompactTopicPartitions <= 0 { return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", cfg.CompactTopicPartitions) } + if cfg.CompactTopicPartitions > math.MaxInt32 { + return fmt.Errorf("kafka.compact_topic_partitions must be <= %d, got %d", math.MaxInt32, cfg.CompactTopicPartitions) + } if cfg.CompactTopicReplicationFactor <= 0 { return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", cfg.CompactTopicReplicationFactor) } + if cfg.CompactTopicReplicationFactor > math.MaxInt16 { + return fmt.Errorf("kafka.compact_topic_replication_factor must be <= %d, got %d", math.MaxInt16, cfg.CompactTopicReplicationFactor) + } if !cfg.TLS { if cfg.TLSCAFile != "" || cfg.TLSCertFile != "" || cfg.TLSKeyFile != "" || cfg.TLSServerName != "" { From 78574504deb87752fa32c1bf3164ca8d4324eb0b Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 4 May 2026 10:20:28 +0530 Subject: [PATCH 07/19] Update consumer grp hash length --- .../internal/connectors/receiver/websub/consumer_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go index f909444aa3..45e1598639 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go @@ -218,8 +218,8 @@ func (cm *ConsumerManager) createConsumer(groupID string, topics []string, callb } // consumerGroupID generates a unique, safe consumer group ID for a callback URL. -// Format: {prefix}-websub-{sha256(callbackURL)[:16]} +// Format: {prefix}-websub-{sha256(callbackURL)[:32]} func (cm *ConsumerManager) consumerGroupID(callbackURL string) string { h := sha256.Sum256([]byte(callbackURL)) - return cm.groupPrefix + "-websub-" + hex.EncodeToString(h[:])[:16] + return cm.groupPrefix + "-websub-" + hex.EncodeToString(h[:])[:32] } From 78ec4f9ab745c8f63ad1cc5fa912bb3e6596758f Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 4 May 2026 10:24:31 +0530 Subject: [PATCH 08/19] Update forbidden by policy to return 401 --- .../internal/connectors/receiver/websub/handler.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go index 02ac6a981e..5fd124d2c9 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go @@ -99,14 +99,14 @@ func (h *HubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { // Enforce subscribe policies before processing. subMsg := httpRequestToMessage(r) - message, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) + _, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) if err != nil { slog.Error("Subscribe policy execution failed", "error", err) http.Error(w, "policy execution failed", http.StatusInternalServerError) return } if shortCircuited { - writePolicyResponse(w, message, http.StatusForbidden, "forbidden by policy") + writePolicyResponse(w, nil, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) return } @@ -203,14 +203,14 @@ func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { func (h *HubHandler) handleUnsubscribe(w http.ResponseWriter, r *http.Request) { // Enforce subscribe policies before processing. subMsg := httpRequestToMessage(r) - message, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) + _, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) if err != nil { slog.Error("Unsubscribe policy execution failed", "error", err) http.Error(w, "policy execution failed", http.StatusInternalServerError) return } if shortCircuited { - writePolicyResponse(w, message, http.StatusForbidden, "forbidden by policy") + writePolicyResponse(w, nil, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) return } @@ -342,7 +342,7 @@ func (h *WebhookReceiverHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques } if shortCircuited { slog.Info("Inbound request rejected by policy", "channel", channelName, "binding", h.bindingName) - writePolicyResponse(w, processed, http.StatusForbidden, "forbidden by policy") + writePolicyResponse(w, processed, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) return } From f29de768577f75c3bcf45f85481c28d86fc2ae22 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 4 May 2026 12:08:41 +0530 Subject: [PATCH 09/19] Fix missing mandatory WWW-Authenticate header --- .../internal/connectors/receiver/websub/handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go index 5fd124d2c9..8ee3691b61 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go @@ -379,6 +379,9 @@ func writePolicyResponse(w http.ResponseWriter, msg *connectors.Message, fallbac w.WriteHeader(statusCode) return } + if fallbackStatus == http.StatusUnauthorized { + w.Header().Set("WWW-Authenticate", `Bearer realm="event-gateway"`) + } http.Error(w, fallbackBody, fallbackStatus) } From 8ad31b777926856c5802085aef54e834d449e1c6 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Sat, 9 May 2026 01:26:35 +0530 Subject: [PATCH 10/19] Confirm that ApplyBindingDelta must synchronize mutations of e.channel.Channels with concurrent request handlers --- .../connectors/receiver/websub/connector.go | 33 +++++++++++-------- .../connectors/receiver/websub/delta.go | 4 +++ .../connectors/receiver/websub/handler.go | 13 ++++++++ 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go index 526a27c34c..bda53d5a0d 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "log/slog" + "sync" "time" "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/binding" @@ -55,6 +56,7 @@ type WebSubReceiver struct { syncProducer *subscription.SyncProducer brokerDriver connectors.BrokerDriver channel connectors.ChannelInfo + channelMu sync.RWMutex opts Options } @@ -98,17 +100,28 @@ func NewReceiver(cfg connectors.ReceiverConfig, opts Options) (connectors.Receiv verificationTimeout := time.Duration(opts.VerificationTimeoutSeconds) * time.Second + receiver := &WebSubReceiver{ + deliverer: deliverer, + topics: topics, + store: store, + consumerMgr: consumerMgr, + syncProducer: syncProducer, + brokerDriver: cfg.BrokerDriver, + channel: cfg.Channel, + opts: opts, + } + // Create HubHandler for subscribe/unsubscribe on {context}/{version}/hub. hubHandler := NewHubHandler( topics, store, verificationTimeout, opts.DefaultLeaseSeconds, cfg.Processor, cfg.BrokerDriver, cfg.Channel.Name, - cfg.Channel.Channels, consumerMgr, syncProducer, + cfg.Channel.Channels, &receiver.channelMu, consumerMgr, syncProducer, ) // Create WebhookReceiverHandler for ingress on {context}/{version}/webhook-receiver. webhookHandler := NewWebhookReceiverHandler( topics, cfg.Processor, cfg.BrokerDriver, - cfg.Channel.Name, cfg.Channel.Channels, + cfg.Channel.Name, cfg.Channel.Channels, &receiver.channelMu, ) // Register handlers on shared mux. @@ -116,18 +129,10 @@ func NewReceiver(cfg connectors.ReceiverConfig, opts Options) (connectors.Receiv cfg.Mux.Handle(basePath+"/hub", hubHandler) cfg.Mux.Handle(basePath+"/webhook-receiver", webhookHandler) - return &WebSubReceiver{ - hubHandler: hubHandler, - webhookHandler: webhookHandler, - deliverer: deliverer, - topics: topics, - store: store, - consumerMgr: consumerMgr, - syncProducer: syncProducer, - brokerDriver: cfg.BrokerDriver, - channel: cfg.Channel, - opts: opts, - }, nil + receiver.hubHandler = hubHandler + receiver.webhookHandler = webhookHandler + + return receiver, nil } // Start ensures Kafka topics exist and sets up the consumer manager context. diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go index 59ba534299..f687465c24 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go @@ -59,11 +59,15 @@ func (e *WebSubReceiver) ApplyBindingDelta(ctx context.Context, removedChannels } } + e.channelMu.Lock() delete(e.channel.Channels, channelName) + e.channelMu.Unlock() } for channelName, kafkaTopic := range addedChannels { + e.channelMu.Lock() e.channel.Channels[channelName] = kafkaTopic + e.channelMu.Unlock() e.topics.Register(channelName) } diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go index 02ac6a981e..b3c0e737e7 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go @@ -25,6 +25,7 @@ import ( "log/slog" "net/http" "strconv" + "sync" "time" "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors" @@ -41,6 +42,7 @@ type HubHandler struct { brokerDriver connectors.BrokerDriver bindingName string channels map[string]string // channel-name → Kafka topic + channelMu *sync.RWMutex consumerMgr *ConsumerManager syncProducer *subscription.SyncProducer defaultLease int @@ -56,6 +58,7 @@ func NewHubHandler( brokerDriver connectors.BrokerDriver, bindingName string, channels map[string]string, + channelMu *sync.RWMutex, consumerMgr *ConsumerManager, syncProducer *subscription.SyncProducer, ) *HubHandler { @@ -67,6 +70,7 @@ func NewHubHandler( brokerDriver: brokerDriver, bindingName: bindingName, channels: channels, + channelMu: channelMu, consumerMgr: consumerMgr, syncProducer: syncProducer, defaultLease: defaultLease, @@ -131,7 +135,9 @@ func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { } // Resolve the Kafka topic for this channel. + h.channelMu.RLock() kafkaTopic, ok := h.channels[topic] + h.channelMu.RUnlock() if !ok { http.Error(w, fmt.Sprintf("no kafka topic for channel: %s", topic), http.StatusNotFound) return @@ -239,7 +245,9 @@ func (h *HubHandler) handleUnsubscribe(w http.ResponseWriter, r *http.Request) { } // Stop/update per-callback consumer. + h.channelMu.RLock() kafkaTopic := h.channels[topic] + h.channelMu.RUnlock() if kafkaTopic != "" { if err := h.consumerMgr.RemoveSubscription(callback, kafkaTopic); err != nil { slog.Error("Failed to update consumer on unsubscribe", "callback", callback, "error", err) @@ -271,6 +279,7 @@ type WebhookReceiverHandler struct { brokerDriver connectors.BrokerDriver bindingName string channels map[string]string // channel-name → Kafka topic + channelMu *sync.RWMutex } // NewWebhookReceiverHandler creates a new webhook receiver handler. @@ -280,6 +289,7 @@ func NewWebhookReceiverHandler( brokerDriver connectors.BrokerDriver, bindingName string, channels map[string]string, + channelMu *sync.RWMutex, ) *WebhookReceiverHandler { return &WebhookReceiverHandler{ topics: topics, @@ -287,6 +297,7 @@ func NewWebhookReceiverHandler( brokerDriver: brokerDriver, bindingName: bindingName, channels: channels, + channelMu: channelMu, } } @@ -309,7 +320,9 @@ func (h *WebhookReceiverHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques return } + h.channelMu.RLock() kafkaTopic, ok := h.channels[channelName] + h.channelMu.RUnlock() if !ok { http.Error(w, fmt.Sprintf("no kafka topic for channel: %s", channelName), http.StatusNotFound) return From f5a4c1753a09be74ed8f7b3bbdbc774cfbedd5cb Mon Sep 17 00:00:00 2001 From: AnujaK Date: Sun, 10 May 2026 23:13:16 +0530 Subject: [PATCH 11/19] Release r.mu before broker-driver and receiver operations --- .../internal/runtime/runtime.go | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime.go b/event-gateway/gateway-runtime/internal/runtime/runtime.go index 029c2e4841..be484d6822 100644 --- a/event-gateway/gateway-runtime/internal/runtime/runtime.go +++ b/event-gateway/gateway-runtime/internal/runtime/runtime.go @@ -904,19 +904,21 @@ func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding } r.mu.Lock() - defer r.mu.Unlock() receiver, ok := r.activeReceivers[oldWSB.Name] if !ok { + r.mu.Unlock() return fmt.Errorf("active receiver not found for WebSubApi %q", oldWSB.Name) } updater, ok := receiver.(webSubBindingUpdater) if !ok { + r.mu.Unlock() return fmt.Errorf("receiver for WebSubApi %q does not support delta updates", oldWSB.Name) } brokerDriver, ok := r.activeBrokerDrivers[oldWSB.Name] if !ok { + r.mu.Unlock() return fmt.Errorf("active broker-driver not found for WebSubApi %q", oldWSB.Name) } @@ -924,6 +926,7 @@ func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding newChannels := webSubChannelTopicMap(newWSB) removedChannels, addedChannels := diffChannelTopics(oldChannels, newChannels) oldBinding := r.hub.GetBinding(oldWSB.Name) + r.mu.Unlock() if len(addedChannels) > 0 { topicsToEnsure := make([]string, 0, len(addedChannels)) @@ -962,6 +965,21 @@ func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding return fmt.Errorf("failed to build chains for updated WebSubApi %q: %w", newWSB.Name, err) } + r.mu.Lock() + defer r.mu.Unlock() + + currentReceiver, ok := r.activeReceivers[oldWSB.Name] + if !ok || currentReceiver != receiver { + return fmt.Errorf("active receiver changed during WebSubApi update for %q", newWSB.Name) + } + currentBrokerDriver, ok := r.activeBrokerDrivers[oldWSB.Name] + if !ok || currentBrokerDriver != brokerDriver { + return fmt.Errorf("active broker-driver changed during WebSubApi update for %q", newWSB.Name) + } + if r.hub.GetBinding(oldWSB.Name) != oldBinding { + return fmt.Errorf("hub binding changed during WebSubApi update for %q", newWSB.Name) + } + r.unregisterStaleBindingChains(oldBinding, webSubActiveChainKeys(newWSB, vhost)) r.hub.RegisterBinding(hub.ChannelBinding{ APIID: newWSB.APIID, From f33f8f0e1bc4ec3be4be6b075b17aa479c7bb268 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Sun, 10 May 2026 23:18:56 +0530 Subject: [PATCH 12/19] Build the replacement policy chains before applying the live delta --- .../gateway-runtime/internal/runtime/runtime.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime.go b/event-gateway/gateway-runtime/internal/runtime/runtime.go index be484d6822..93f8dea8ba 100644 --- a/event-gateway/gateway-runtime/internal/runtime/runtime.go +++ b/event-gateway/gateway-runtime/internal/runtime/runtime.go @@ -928,6 +928,12 @@ func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding oldBinding := r.hub.GetBinding(oldWSB.Name) r.mu.Unlock() + vhost := defaultVhost(newWSB.Vhost) + subKey, inKey, outKey, chChainKeys, err := r.buildWebSubApiPolicyChains(newWSB, vhost) + if err != nil { + return fmt.Errorf("failed to build chains for updated WebSubApi %q: %w", newWSB.Name, err) + } + if len(addedChannels) > 0 { topicsToEnsure := make([]string, 0, len(addedChannels)) for _, kafkaTopic := range addedChannels { @@ -959,12 +965,6 @@ func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding cancel() } - vhost := defaultVhost(newWSB.Vhost) - subKey, inKey, outKey, chChainKeys, err := r.buildWebSubApiPolicyChains(newWSB, vhost) - if err != nil { - return fmt.Errorf("failed to build chains for updated WebSubApi %q: %w", newWSB.Name, err) - } - r.mu.Lock() defer r.mu.Unlock() From d85ad09976e1852590cb18da9c775c04b9a242d4 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 11 May 2026 09:55:15 +0530 Subject: [PATCH 13/19] Use the configured subscription-sync topic helper in UpdateWebsubBinding --- event-gateway/gateway-runtime/internal/runtime/runtime.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/event-gateway/gateway-runtime/internal/runtime/runtime.go b/event-gateway/gateway-runtime/internal/runtime/runtime.go index 93f8dea8ba..a324620dc5 100644 --- a/event-gateway/gateway-runtime/internal/runtime/runtime.go +++ b/event-gateway/gateway-runtime/internal/runtime/runtime.go @@ -995,7 +995,7 @@ func (r *Runtime) UpdateWebSubApiBinding(oldWSB, newWSB binding.WebSubApiBinding ChannelChainKeys: chChainKeys, }) - internalSubTopic := binding.WebSubApiSubscriptionTopic(newWSB.Name, newWSB.Version) + internalSubTopic := r.webSubSubscriptionSyncTopic(newWSB.Name, newWSB.Version) r.bindingTopics[newWSB.Name] = webSubTopicList(newChannels, internalSubTopic) slog.Info("Dynamically updated WebSubApi binding", From f9c705b5e178d89814ed525f032279c2421d1e50 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 11 May 2026 09:57:11 +0530 Subject: [PATCH 14/19] Return immediately after the 404 response --- .../gateway-controller/pkg/api/handlers/websub_api_handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go b/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go index f71c6eb048..15b46a4144 100644 --- a/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go +++ b/gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go @@ -202,6 +202,7 @@ func (s *APIServer) UpdateWebSubAPI(c *gin.Context, id string) { Status: "error", Message: fmt.Sprintf("WebSub API configuration with handle '%s' not found", handle), }) + return } else if storage.IsConflictError(err) { c.JSON(http.StatusConflict, api.ErrorResponse{ Status: "error", From 9309125e48480658369bb015bb4d520c0c799b7c Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 11 May 2026 10:13:59 +0530 Subject: [PATCH 15/19] Preserve the existing handle when metadata.name is omitted --- gateway/gateway-controller/pkg/service/websubapi/service.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gateway/gateway-controller/pkg/service/websubapi/service.go b/gateway/gateway-controller/pkg/service/websubapi/service.go index a1c44292c2..305fcbba41 100644 --- a/gateway/gateway-controller/pkg/service/websubapi/service.go +++ b/gateway/gateway-controller/pkg/service/websubapi/service.go @@ -149,6 +149,10 @@ func (s *WebSubAPIService) Update(params UpdateParams) (*UpdateResult, error) { return &UpdateResult{Config: result.StoredConfig}, nil } + if apiConfig.Metadata.Name == "" { + apiConfig.Metadata.Name = params.Handle + } + existing.Configuration = apiConfig existing.SourceConfiguration = apiConfig From d4ebbf11fac885805d55c370287f5416b2763d5e Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 11 May 2026 10:24:39 +0530 Subject: [PATCH 16/19] Fix possibility of rendered WebSub config beign written back to storage --- gateway/gateway-controller/pkg/service/websubapi/service.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gateway/gateway-controller/pkg/service/websubapi/service.go b/gateway/gateway-controller/pkg/service/websubapi/service.go index 305fcbba41..4a9208aca5 100644 --- a/gateway/gateway-controller/pkg/service/websubapi/service.go +++ b/gateway/gateway-controller/pkg/service/websubapi/service.go @@ -156,11 +156,12 @@ func (s *WebSubAPIService) Update(params UpdateParams) (*UpdateResult, error) { existing.Configuration = apiConfig existing.SourceConfiguration = apiConfig - if err := templateengine.RenderSpec(existing, s.secretResolver, log); err != nil { + renderedExisting := *existing + if err := templateengine.RenderSpec(&renderedExisting, s.secretResolver, log); err != nil { return nil, err } - renderedConfig, ok := existing.Configuration.(api.WebSubAPI) + renderedConfig, ok := renderedExisting.Configuration.(api.WebSubAPI) if !ok { return nil, fmt.Errorf("failed to render websub api configuration") } From 97fd9a8684b276d0d18247220da0fdbbd2e59018 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 11 May 2026 11:37:18 +0530 Subject: [PATCH 17/19] Revert websub/handler changes --- .../internal/connectors/receiver/websub/handler.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go index 8ee3691b61..02ac6a981e 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go @@ -99,14 +99,14 @@ func (h *HubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { // Enforce subscribe policies before processing. subMsg := httpRequestToMessage(r) - _, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) + message, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) if err != nil { slog.Error("Subscribe policy execution failed", "error", err) http.Error(w, "policy execution failed", http.StatusInternalServerError) return } if shortCircuited { - writePolicyResponse(w, nil, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) + writePolicyResponse(w, message, http.StatusForbidden, "forbidden by policy") return } @@ -203,14 +203,14 @@ func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { func (h *HubHandler) handleUnsubscribe(w http.ResponseWriter, r *http.Request) { // Enforce subscribe policies before processing. subMsg := httpRequestToMessage(r) - _, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) + message, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) if err != nil { slog.Error("Unsubscribe policy execution failed", "error", err) http.Error(w, "policy execution failed", http.StatusInternalServerError) return } if shortCircuited { - writePolicyResponse(w, nil, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) + writePolicyResponse(w, message, http.StatusForbidden, "forbidden by policy") return } @@ -342,7 +342,7 @@ func (h *WebhookReceiverHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques } if shortCircuited { slog.Info("Inbound request rejected by policy", "channel", channelName, "binding", h.bindingName) - writePolicyResponse(w, processed, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) + writePolicyResponse(w, processed, http.StatusForbidden, "forbidden by policy") return } @@ -379,9 +379,6 @@ func writePolicyResponse(w http.ResponseWriter, msg *connectors.Message, fallbac w.WriteHeader(statusCode) return } - if fallbackStatus == http.StatusUnauthorized { - w.Header().Set("WWW-Authenticate", `Bearer realm="event-gateway"`) - } http.Error(w, fallbackBody, fallbackStatus) } From 88948d747c220601e956729eaf8376e21225694d Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 11 May 2026 12:50:59 +0530 Subject: [PATCH 18/19] Fix it build failure --- .../pkg/api/handlers/handlers_test.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/gateway/gateway-controller/pkg/api/handlers/handlers_test.go b/gateway/gateway-controller/pkg/api/handlers/handlers_test.go index 98a4c81479..d836e1af45 100644 --- a/gateway/gateway-controller/pkg/api/handlers/handlers_test.go +++ b/gateway/gateway-controller/pkg/api/handlers/handlers_test.go @@ -3034,21 +3034,6 @@ func TestUpdateWebSubAPIUndeployUsesDedicatedServicePath(t *testing.T) { Handle: "repo-watcher-v1-0", DisplayName: "repo-watcher", Version: "v1.0", - SourceConfiguration: api.WebSubAPI{ - ApiVersion: api.WebSubAPIApiVersionGatewayApiPlatformWso2Comv1alpha1, - Kind: api.WebSubAPIKindWebSubApi, - Metadata: api.Metadata{ - Name: "repo-watcher-v1-0", - }, - Spec: api.WebhookAPIData{ - DisplayName: "repo-watcher", - Version: "v1.0", - Context: "/repos", - Hub: api.WebSubHub{ - Channels: []api.HubChannel{{Name: "issues"}}, - }, - }, - }, DesiredState: models.StateDeployed, Origin: models.OriginGatewayAPI, CreatedAt: time.Now(), From d32a52de3a1c8c98718d4af6f7382a551c34c7c9 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 11 May 2026 13:56:40 +0530 Subject: [PATCH 19/19] Add test fix --- .../pkg/api/handlers/handlers_test.go | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/gateway/gateway-controller/pkg/api/handlers/handlers_test.go b/gateway/gateway-controller/pkg/api/handlers/handlers_test.go index d836e1af45..d588f0a243 100644 --- a/gateway/gateway-controller/pkg/api/handlers/handlers_test.go +++ b/gateway/gateway-controller/pkg/api/handlers/handlers_test.go @@ -3034,6 +3034,36 @@ func TestUpdateWebSubAPIUndeployUsesDedicatedServicePath(t *testing.T) { Handle: "repo-watcher-v1-0", DisplayName: "repo-watcher", Version: "v1.0", + Configuration: api.WebSubAPI{ + ApiVersion: api.WebSubAPIApiVersionGatewayApiPlatformWso2Comv1alpha1, + Kind: api.WebSubAPIKindWebSubApi, + Metadata: api.Metadata{ + Name: "repo-watcher-v1-0", + }, + Spec: api.WebhookAPIData{ + DisplayName: "repo-watcher", + Version: "v1.0", + Context: "/repos", + Hub: api.WebSubHub{ + Channels: []api.HubChannel{{Name: "issues"}}, + }, + }, + }, + SourceConfiguration: api.WebSubAPI{ + ApiVersion: api.WebSubAPIApiVersionGatewayApiPlatformWso2Comv1alpha1, + Kind: api.WebSubAPIKindWebSubApi, + Metadata: api.Metadata{ + Name: "repo-watcher-v1-0", + }, + Spec: api.WebhookAPIData{ + DisplayName: "repo-watcher", + Version: "v1.0", + Context: "/repos", + Hub: api.WebSubHub{ + Channels: []api.HubChannel{{Name: "issues"}}, + }, + }, + }, DesiredState: models.StateDeployed, Origin: models.OriginGatewayAPI, CreatedAt: time.Now(),