From 92ae86607b95f61abee28c81f3f64ddf415c1cae Mon Sep 17 00:00:00 2001 From: AnujaK Date: Fri, 8 May 2026 13:15:27 +0530 Subject: [PATCH 1/8] Make the compacted topic's partition/replication settings configurable --- .../gateway-runtime/configs/config.toml | 3 + .../gateway-runtime/internal/config/config.go | 36 ++++++--- .../connectors/brokerdriver/kafka/config.go | 73 ++++++++++++++----- .../connectors/brokerdriver/kafka/endpoint.go | 12 ++- 4 files changed, 91 insertions(+), 33 deletions(-) diff --git a/event-gateway/gateway-runtime/configs/config.toml b/event-gateway/gateway-runtime/configs/config.toml index 687b18b53..ff8f44461 100644 --- a/event-gateway/gateway-runtime/configs/config.toml +++ b/event-gateway/gateway-runtime/configs/config.toml @@ -26,6 +26,9 @@ metrics_port = 9003 # Default Kafka brokers. Channels can override with broker-driver.config.brokers. brokers = ["localhost:9092"] consumer_group_prefix = "event-gateway" +# Partitions and replication factor used for internal compacted topics. +compact_topic_partitions = 1 +compact_topic_replication_factor = 1 tls = false # Optional PEM CA file for self-signed or private Kafka CAs. # tls_ca_file = "/etc/event-gateway/kafka/ca.crt" diff --git a/event-gateway/gateway-runtime/internal/config/config.go b/event-gateway/gateway-runtime/internal/config/config.go index e7ae25cd3..198decd3f 100644 --- a/event-gateway/gateway-runtime/internal/config/config.go +++ b/event-gateway/gateway-runtime/internal/config/config.go @@ -57,16 +57,18 @@ type ServerConfig struct { // KafkaConfig holds Kafka connection settings. type KafkaConfig struct { - Brokers []string `koanf:"brokers"` - ConsumerGroupPrefix string `koanf:"consumer_group_prefix"` - TLS bool `koanf:"tls"` - TLSCAFile string `koanf:"tls_ca_file"` - TLSCertFile string `koanf:"tls_cert_file"` - TLSKeyFile string `koanf:"tls_key_file"` - TLSServerName string `koanf:"tls_server_name"` - SASLMechanism string `koanf:"sasl_mechanism"` - SASLUsername string `koanf:"sasl_username"` - SASLPassword string `koanf:"sasl_password"` + Brokers []string `koanf:"brokers"` + ConsumerGroupPrefix string `koanf:"consumer_group_prefix"` + CompactTopicPartitions int `koanf:"compact_topic_partitions"` + CompactTopicReplicationFactor int `koanf:"compact_topic_replication_factor"` + TLS bool `koanf:"tls"` + TLSCAFile string `koanf:"tls_ca_file"` + TLSCertFile string `koanf:"tls_cert_file"` + TLSKeyFile string `koanf:"tls_key_file"` + TLSServerName string `koanf:"tls_server_name"` + SASLMechanism string `koanf:"sasl_mechanism"` + SASLUsername string `koanf:"sasl_username"` + SASLPassword string `koanf:"sasl_password"` } // WebSubConfig holds WebSub-specific settings. @@ -111,8 +113,10 @@ func DefaultConfig() *Config { MetricsPort: 9003, }, Kafka: KafkaConfig{ - Brokers: []string{"localhost:9092"}, - ConsumerGroupPrefix: "event-gateway", + Brokers: []string{"localhost:9092"}, + ConsumerGroupPrefix: "event-gateway", + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, }, WebSub: WebSubConfig{ VerificationTimeoutSeconds: 10, @@ -223,6 +227,8 @@ func mapEnvValue(path, value string) interface{} { "server.websocket_port", "server.admin_port", "server.metrics_port", + "kafka.compact_topic_partitions", + "kafka.compact_topic_replication_factor", "websub.verification_timeout_seconds", "websub.delivery_max_retries", "websub.delivery_initial_delay_ms", @@ -295,6 +301,12 @@ func validateKafkaConfig(kafkaCfg KafkaConfig) error { if len(kafkaCfg.Brokers) == 0 { return fmt.Errorf("kafka.brokers must contain at least one broker") } + if kafkaCfg.CompactTopicPartitions <= 0 { + return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", kafkaCfg.CompactTopicPartitions) + } + if kafkaCfg.CompactTopicReplicationFactor <= 0 { + return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", kafkaCfg.CompactTopicReplicationFactor) + } if kafkaCfg.TLS { if strings.TrimSpace(kafkaCfg.TLSCAFile) != "" { diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go index 914fa6c85..5a0ce9781 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go @@ -34,29 +34,39 @@ import ( // ConnectionConfig holds the Kafka connection settings used by the driver. type ConnectionConfig struct { - Brokers []string - TLS bool - TLSCAFile string - TLSCertFile string - TLSKeyFile string - TLSServerName string - SASLMechanism string - SASLUsername string - SASLPassword string + Brokers []string + CompactTopicPartitions int + CompactTopicReplicationFactor int + TLS bool + TLSCAFile string + TLSCertFile string + TLSKeyFile string + TLSServerName string + SASLMechanism string + SASLUsername string + SASLPassword string } // ResolveConnectionConfig merges global runtime config with per-binding overrides. func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]interface{}) (ConnectionConfig, error) { cfg := ConnectionConfig{ - Brokers: append([]string(nil), global.Brokers...), - TLS: global.TLS, - TLSCAFile: global.TLSCAFile, - TLSCertFile: global.TLSCertFile, - TLSKeyFile: global.TLSKeyFile, - TLSServerName: global.TLSServerName, - SASLMechanism: global.SASLMechanism, - SASLUsername: global.SASLUsername, - SASLPassword: global.SASLPassword, + Brokers: append([]string(nil), global.Brokers...), + CompactTopicPartitions: global.CompactTopicPartitions, + CompactTopicReplicationFactor: global.CompactTopicReplicationFactor, + TLS: global.TLS, + TLSCAFile: global.TLSCAFile, + TLSCertFile: global.TLSCertFile, + TLSKeyFile: global.TLSKeyFile, + TLSServerName: global.TLSServerName, + SASLMechanism: global.SASLMechanism, + SASLUsername: global.SASLUsername, + SASLPassword: global.SASLPassword, + } + if cfg.CompactTopicPartitions <= 0 { + cfg.CompactTopicPartitions = 1 + } + if cfg.CompactTopicReplicationFactor <= 0 { + cfg.CompactTopicReplicationFactor = 1 } if overrides != nil { @@ -65,6 +75,16 @@ func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]int } else if ok { cfg.Brokers = brokers } + if v, ok, err := intOverride(overrides["compact_topic_partitions"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.CompactTopicPartitions = v + } + if v, ok, err := intOverride(overrides["compact_topic_replication_factor"]); err != nil { + return ConnectionConfig{}, err + } else if ok { + cfg.CompactTopicReplicationFactor = v + } if v, ok, err := boolOverride(overrides["tls"]); err != nil { return ConnectionConfig{}, err } else if ok { @@ -135,6 +155,12 @@ func validateConnectionConfig(cfg ConnectionConfig) error { if len(cfg.Brokers) == 0 { return fmt.Errorf("kafka brokers must not be empty") } + if cfg.CompactTopicPartitions <= 0 { + return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", cfg.CompactTopicPartitions) + } + if cfg.CompactTopicReplicationFactor <= 0 { + return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", cfg.CompactTopicReplicationFactor) + } if !cfg.TLS { if cfg.TLSCAFile != "" || cfg.TLSCertFile != "" || cfg.TLSKeyFile != "" || cfg.TLSServerName != "" { @@ -264,6 +290,17 @@ func boolOverride(value interface{}) (bool, bool, error) { return v, true, nil } +func intOverride(value interface{}) (int, bool, error) { + if value == nil { + return 0, false, nil + } + v, ok := value.(int) + if !ok { + return 0, false, fmt.Errorf("expected int override, got %T", value) + } + return v, true, nil +} + func stringOverride(value interface{}) (string, bool, error) { if value == nil { return "", false, nil diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go index ffa77ffeb..d8367526e 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go @@ -123,9 +123,15 @@ func (e *KafkaBrokerDriver) EnsureTopics(ctx context.Context, topics []string) e // EnsureCompactedTopic creates a compacted topic if it does not already exist. func (e *KafkaBrokerDriver) EnsureCompactedTopic(ctx context.Context, topic string) error { - resp, err := e.admin.CreateTopics(ctx, 1, 1, map[string]*string{ - "cleanup.policy": kadm.StringPtr("compact"), - }, topic) + resp, err := e.admin.CreateTopics( + ctx, + int32(e.cfg.CompactTopicPartitions), + int16(e.cfg.CompactTopicReplicationFactor), + map[string]*string{ + "cleanup.policy": kadm.StringPtr("compact"), + }, + topic, + ) if err != nil { return fmt.Errorf("failed to create compacted topic %s: %w", topic, err) } From 68db625c3b9bb05fbef43acdf1f51ea8ca1f86dc Mon Sep 17 00:00:00 2001 From: AnujaK Date: Sat, 9 May 2026 00:19:57 +0530 Subject: [PATCH 2/8] Avoid silently defaulting invalid compact-topic values --- .../internal/connectors/brokerdriver/kafka/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go index 5a0ce9781..7953dfe05 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go @@ -63,10 +63,10 @@ func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]int SASLPassword: global.SASLPassword, } if cfg.CompactTopicPartitions <= 0 { - cfg.CompactTopicPartitions = 1 + return ConnectionConfig{}, fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", cfg.CompactTopicPartitions) } if cfg.CompactTopicReplicationFactor <= 0 { - cfg.CompactTopicReplicationFactor = 1 + return ConnectionConfig{}, fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", cfg.CompactTopicReplicationFactor) } if overrides != nil { From eb8b297f6f27a055e6bd5687fb671c4965809204 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Sat, 9 May 2026 00:25:13 +0530 Subject: [PATCH 3/8] intOverride rejects valid numeric values from YAML/JSON deserialization --- .../connectors/brokerdriver/kafka/config.go | 19 +++- .../brokerdriver/kafka/config_test.go | 102 +++++++++++++++--- 2 files changed, 101 insertions(+), 20 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go index 7953dfe05..94b3b225e 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "math" "os" "strings" @@ -294,11 +295,23 @@ func intOverride(value interface{}) (int, bool, error) { if value == nil { return 0, false, nil } - v, ok := value.(int) - if !ok { + switch v := value.(type) { + case int: + return v, true, nil + case float64: + if math.IsNaN(v) || math.IsInf(v, 0) { + return 0, false, fmt.Errorf("expected integer Kafka config override, got non-finite float64 %v", v) + } + if v != math.Trunc(v) { + return 0, false, fmt.Errorf("expected integer Kafka config override, got non-integer float64 %v", v) + } + if v < math.MinInt32 || v > math.MaxInt32 { + return 0, false, fmt.Errorf("expected integer Kafka config override within [%d, %d], got float64 %v", math.MinInt32, math.MaxInt32, v) + } + return int(v), true, nil + default: return 0, false, fmt.Errorf("expected int override, got %T", value) } - return v, true, nil } func stringOverride(value interface{}) (string, bool, error) { diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go index 3a46a34f7..abe611092 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go @@ -1,9 +1,11 @@ package kafka import ( + "math" "os" "path/filepath" "reflect" + "strings" "testing" runtimeconfig "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/config" @@ -17,13 +19,15 @@ func TestResolveConnectionConfig_MergesGlobalAndOverrides(t *testing.T) { } global := runtimeconfig.KafkaConfig{ - Brokers: []string{"broker-1:9092"}, - TLS: true, - TLSCAFile: caPath, - TLSServerName: "global-kafka", - SASLMechanism: "plain", - SASLUsername: "global-user", - SASLPassword: "global-pass", + Brokers: []string{"broker-1:9092"}, + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + TLS: true, + TLSCAFile: caPath, + TLSServerName: "global-kafka", + SASLMechanism: "plain", + SASLUsername: "global-user", + SASLPassword: "global-pass", } resolved, err := ResolveConnectionConfig(global, map[string]interface{}{ @@ -57,7 +61,10 @@ func TestResolveConnectionConfig_MergesGlobalAndOverrides(t *testing.T) { } func TestResolveConnectionConfig_PreservesOpaqueCredentials(t *testing.T) { - resolved, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{}, map[string]interface{}{ + resolved, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + }, map[string]interface{}{ "brokers": []interface{}{"broker:9092"}, "sasl_mechanism": "plain", "sasl_username": " user-with-spaces ", @@ -77,8 +84,10 @@ func TestResolveConnectionConfig_PreservesOpaqueCredentials(t *testing.T) { func TestResolveConnectionConfig_RequiresTLSWhenTLSFilesAreConfigured(t *testing.T) { _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ - Brokers: []string{"broker:9092"}, - TLSCAFile: "/tmp/ca.crt", + Brokers: []string{"broker:9092"}, + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + TLSCAFile: "/tmp/ca.crt", }, nil) if err == nil { t.Fatalf("expected error when TLS files are set with TLS disabled") @@ -93,18 +102,22 @@ func TestResolveConnectionConfig_ValidatesReadableTLSFiles(t *testing.T) { } _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ - Brokers: []string{"broker:9092"}, - TLS: true, - TLSCAFile: caPath, + Brokers: []string{"broker:9092"}, + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + TLS: true, + TLSCAFile: caPath, }, nil) if err != nil { t.Fatalf("expected readable CA file to validate, got %v", err) } _, err = ResolveConnectionConfig(runtimeconfig.KafkaConfig{ - Brokers: []string{"broker:9092"}, - TLS: true, - TLSCAFile: filepath.Join(tempDir, "missing.crt"), + Brokers: []string{"broker:9092"}, + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + TLS: true, + TLSCAFile: filepath.Join(tempDir, "missing.crt"), }, nil) if err == nil { t.Fatalf("expected missing CA file to fail validation") @@ -112,7 +125,10 @@ func TestResolveConnectionConfig_ValidatesReadableTLSFiles(t *testing.T) { } func TestResolveConnectionConfig_RequiresSASLCredentials(t *testing.T) { - _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{}, map[string]interface{}{ + _, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{ + CompactTopicPartitions: 1, + CompactTopicReplicationFactor: 1, + }, map[string]interface{}{ "brokers": []interface{}{"broker:9092"}, "sasl_mechanism": "scram-sha-512", "sasl_username": "user", @@ -121,3 +137,55 @@ func TestResolveConnectionConfig_RequiresSASLCredentials(t *testing.T) { t.Fatalf("expected missing SASL password to fail validation") } } + +func TestIntOverride_AcceptsIntegerFloat64(t *testing.T) { + got, ok, err := intOverride(float64(3)) + if err != nil { + t.Fatalf("expected integer float64 override to succeed, got %v", err) + } + if !ok { + t.Fatalf("expected integer float64 override to be accepted") + } + if got != 3 { + t.Fatalf("expected integer float64 override to convert to 3, got %d", got) + } +} + +func TestIntOverride_RejectsInvalidFloat64(t *testing.T) { + tests := []struct { + name string + value float64 + wantErr string + }{ + { + name: "non integer", + value: 3.5, + wantErr: "non-integer", + }, + { + name: "out of bounds", + value: float64(math.MaxInt32) + 1, + wantErr: "within", + }, + { + name: "non finite", + value: math.NaN(), + wantErr: "non-finite", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, ok, err := intOverride(tt.value) + if err == nil { + t.Fatalf("expected float64 override %v to fail", tt.value) + } + if ok { + t.Fatalf("expected invalid float64 override %v to be rejected", tt.value) + } + if !strings.Contains(err.Error(), tt.wantErr) { + t.Fatalf("expected error %q to contain %q", err.Error(), tt.wantErr) + } + }) + } +} From 070698b13c8225d118978704affb50ea03739511 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Sat, 9 May 2026 00:27:01 +0530 Subject: [PATCH 4/8] Add upper-bound validation before narrowing numeric casts --- .../internal/connectors/brokerdriver/kafka/config.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go index 94b3b225e..045ce595f 100644 --- a/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go +++ b/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go @@ -159,9 +159,15 @@ func validateConnectionConfig(cfg ConnectionConfig) error { if cfg.CompactTopicPartitions <= 0 { return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", cfg.CompactTopicPartitions) } + if cfg.CompactTopicPartitions > math.MaxInt32 { + return fmt.Errorf("kafka.compact_topic_partitions must be <= %d, got %d", math.MaxInt32, cfg.CompactTopicPartitions) + } if cfg.CompactTopicReplicationFactor <= 0 { return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", cfg.CompactTopicReplicationFactor) } + if cfg.CompactTopicReplicationFactor > math.MaxInt16 { + return fmt.Errorf("kafka.compact_topic_replication_factor must be <= %d, got %d", math.MaxInt16, cfg.CompactTopicReplicationFactor) + } if !cfg.TLS { if cfg.TLSCAFile != "" || cfg.TLSCertFile != "" || cfg.TLSKeyFile != "" || cfg.TLSServerName != "" { From 479abfaa353ec4d3ea5dd9ef3d62f604e0996c72 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 4 May 2026 10:20:28 +0530 Subject: [PATCH 5/8] Update consumer grp hash length --- .../internal/connectors/receiver/websub/consumer_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go index f909444aa..45e159863 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go @@ -218,8 +218,8 @@ func (cm *ConsumerManager) createConsumer(groupID string, topics []string, callb } // consumerGroupID generates a unique, safe consumer group ID for a callback URL. -// Format: {prefix}-websub-{sha256(callbackURL)[:16]} +// Format: {prefix}-websub-{sha256(callbackURL)[:32]} func (cm *ConsumerManager) consumerGroupID(callbackURL string) string { h := sha256.Sum256([]byte(callbackURL)) - return cm.groupPrefix + "-websub-" + hex.EncodeToString(h[:])[:16] + return cm.groupPrefix + "-websub-" + hex.EncodeToString(h[:])[:32] } From f07a7129473f1c06f39e7183aab8e16758c5bf22 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 4 May 2026 10:24:31 +0530 Subject: [PATCH 6/8] Update forbidden by policy to return 401 --- .../internal/connectors/receiver/websub/handler.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go index 5ac7b5871..ba417ffc1 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go @@ -99,14 +99,14 @@ func (h *HubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { // Enforce subscribe policies before processing. subMsg := httpRequestToMessage(r) - message, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) + _, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) if err != nil { slog.Error("Subscribe policy execution failed", "error", err) http.Error(w, "policy execution failed", http.StatusInternalServerError) return } if shortCircuited { - writePolicyResponse(w, message, http.StatusForbidden, "forbidden by policy") + writePolicyResponse(w, nil, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) return } @@ -203,14 +203,14 @@ func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { func (h *HubHandler) handleUnsubscribe(w http.ResponseWriter, r *http.Request) { // Enforce unsubscribe policies before processing. subMsg := httpRequestToMessage(r) - message, shortCircuited, err := h.processor.ProcessUnsubscribe(r.Context(), h.bindingName, subMsg) + _, shortCircuited, err := h.processor.ProcessUnsubscribe(r.Context(), h.bindingName, subMsg) if err != nil { slog.Error("Unsubscribe policy execution failed", "error", err) http.Error(w, "policy execution failed", http.StatusInternalServerError) return } if shortCircuited { - writePolicyResponse(w, message, http.StatusForbidden, "forbidden by policy") + writePolicyResponse(w, nil, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) return } @@ -342,7 +342,7 @@ func (h *WebhookReceiverHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques } if shortCircuited { slog.Info("Inbound request rejected by policy", "channel", channelName, "binding", h.bindingName) - writePolicyResponse(w, processed, http.StatusForbidden, "forbidden by policy") + writePolicyResponse(w, processed, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) return } From e0ceb4d049cee596bc0405b6528679d91f9c62e5 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 4 May 2026 12:08:41 +0530 Subject: [PATCH 7/8] Fix missing mandatory WWW-Authenticate header --- .../internal/connectors/receiver/websub/handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go index ba417ffc1..e31c43bbb 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go @@ -379,6 +379,9 @@ func writePolicyResponse(w http.ResponseWriter, msg *connectors.Message, fallbac w.WriteHeader(statusCode) return } + if fallbackStatus == http.StatusUnauthorized { + w.Header().Set("WWW-Authenticate", `Bearer realm="event-gateway"`) + } http.Error(w, fallbackBody, fallbackStatus) } From 01f4293ffcc3dc560361393edc15f28508733a26 Mon Sep 17 00:00:00 2001 From: AnujaK Date: Mon, 11 May 2026 11:37:18 +0530 Subject: [PATCH 8/8] Revert websub/handler changes --- .../internal/connectors/receiver/websub/handler.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go index e31c43bbb..5ac7b5871 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go @@ -99,14 +99,14 @@ func (h *HubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { // Enforce subscribe policies before processing. subMsg := httpRequestToMessage(r) - _, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) + message, shortCircuited, err := h.processor.ProcessSubscribe(r.Context(), h.bindingName, subMsg) if err != nil { slog.Error("Subscribe policy execution failed", "error", err) http.Error(w, "policy execution failed", http.StatusInternalServerError) return } if shortCircuited { - writePolicyResponse(w, nil, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) + writePolicyResponse(w, message, http.StatusForbidden, "forbidden by policy") return } @@ -203,14 +203,14 @@ func (h *HubHandler) handleSubscribe(w http.ResponseWriter, r *http.Request) { func (h *HubHandler) handleUnsubscribe(w http.ResponseWriter, r *http.Request) { // Enforce unsubscribe policies before processing. subMsg := httpRequestToMessage(r) - _, shortCircuited, err := h.processor.ProcessUnsubscribe(r.Context(), h.bindingName, subMsg) + message, shortCircuited, err := h.processor.ProcessUnsubscribe(r.Context(), h.bindingName, subMsg) if err != nil { slog.Error("Unsubscribe policy execution failed", "error", err) http.Error(w, "policy execution failed", http.StatusInternalServerError) return } if shortCircuited { - writePolicyResponse(w, nil, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) + writePolicyResponse(w, message, http.StatusForbidden, "forbidden by policy") return } @@ -342,7 +342,7 @@ func (h *WebhookReceiverHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques } if shortCircuited { slog.Info("Inbound request rejected by policy", "channel", channelName, "binding", h.bindingName) - writePolicyResponse(w, processed, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)) + writePolicyResponse(w, processed, http.StatusForbidden, "forbidden by policy") return } @@ -379,9 +379,6 @@ func writePolicyResponse(w http.ResponseWriter, msg *connectors.Message, fallbac w.WriteHeader(statusCode) return } - if fallbackStatus == http.StatusUnauthorized { - w.Header().Set("WWW-Authenticate", `Bearer realm="event-gateway"`) - } http.Error(w, fallbackBody, fallbackStatus) }