Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cfc3efe
Fix websub undeploy, update gateway websub handler to use websub serv…
AnujaKalahara99 May 8, 2026
a74fabc
Fix Websub api delta update on event-gateway
AnujaKalahara99 May 8, 2026
1fbd909
Make the compacted topic's partition/replication settings configurable
AnujaKalahara99 May 8, 2026
7de2034
Merge pull request #1918 from AnujaKalahara99/egw/fix-undeploy
senthuran16 May 8, 2026
123be89
Avoid silently defaulting invalid compact-topic values
AnujaKalahara99 May 8, 2026
a560ac4
intOverride rejects valid numeric values from YAML/JSON deserialization
AnujaKalahara99 May 8, 2026
120b8dc
Add upper-bound validation before narrowing numeric casts
AnujaKalahara99 May 8, 2026
7857450
Update consumer grp hash length
AnujaKalahara99 May 4, 2026
78ec4f9
Update forbidden by policy to return 401
AnujaKalahara99 May 4, 2026
f29de76
Fix missing mandatory WWW-Authenticate header
AnujaKalahara99 May 4, 2026
8ad31b7
Confirm that ApplyBindingDelta must synchronize mutations of e.channe…
AnujaKalahara99 May 8, 2026
f5a4c17
Release r.mu before broker-driver and receiver operations
AnujaKalahara99 May 10, 2026
f33f8f0
Build the replacement policy chains before applying the live delta
AnujaKalahara99 May 10, 2026
d85ad09
Use the configured subscription-sync topic helper in UpdateWebsubBinding
AnujaKalahara99 May 11, 2026
f9c705b
Return immediately after the 404 response
AnujaKalahara99 May 11, 2026
9309125
Preserve the existing handle when metadata.name is omitted
AnujaKalahara99 May 11, 2026
d4ebbf1
Fix possibility of rendered WebSub config beign written back to storage
AnujaKalahara99 May 11, 2026
97fd9a8
Revert websub/handler changes
AnujaKalahara99 May 11, 2026
8a49616
Merge pull request #1924 from AnujaKalahara99/egw/fix-undeploy
senthuran16 May 11, 2026
91b5c1a
Merge pull request #1917 from AnujaKalahara99/egw/kafka-sub-topic-par…
senthuran16 May 11, 2026
88948d7
Fix it build failure
AnujaKalahara99 May 11, 2026
2fcdc92
Merge pull request #1927 from AnujaKalahara99/websub-kafka-improvemen…
senthuran16 May 11, 2026
d32a52d
Add test fix
AnujaKalahara99 May 11, 2026
bddfed4
Merge pull request #1928 from AnujaKalahara99/websub-kafka-improvemen…
senthuran16 May 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion event-gateway/gateway-runtime/configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ metrics_port = 9003
# Default Kafka brokers. Channels can override with broker-driver.config.brokers.
brokers = ["localhost:9092"]
consumer_group_prefix = "event-gateway"
# Partitions and replication factor used for internal compacted topics.
compact_topic_partitions = 1
compact_topic_replication_factor = 1
tls = false
# Optional PEM CA file for self-signed or private Kafka CAs.
# tls_ca_file = "/etc/event-gateway/kafka/ca.crt"
Expand Down Expand Up @@ -70,4 +73,3 @@ allow_payloads = false
enabled = true
xds_address = "localhost:18001"
# node_id = ""

36 changes: 24 additions & 12 deletions event-gateway/gateway-runtime/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,18 @@ type ServerConfig struct {

// KafkaConfig holds Kafka connection settings.
type KafkaConfig struct {
Brokers []string `koanf:"brokers"`
ConsumerGroupPrefix string `koanf:"consumer_group_prefix"`
TLS bool `koanf:"tls"`
TLSCAFile string `koanf:"tls_ca_file"`
TLSCertFile string `koanf:"tls_cert_file"`
TLSKeyFile string `koanf:"tls_key_file"`
TLSServerName string `koanf:"tls_server_name"`
SASLMechanism string `koanf:"sasl_mechanism"`
SASLUsername string `koanf:"sasl_username"`
SASLPassword string `koanf:"sasl_password"`
Brokers []string `koanf:"brokers"`
ConsumerGroupPrefix string `koanf:"consumer_group_prefix"`
CompactTopicPartitions int `koanf:"compact_topic_partitions"`
CompactTopicReplicationFactor int `koanf:"compact_topic_replication_factor"`
TLS bool `koanf:"tls"`
TLSCAFile string `koanf:"tls_ca_file"`
TLSCertFile string `koanf:"tls_cert_file"`
TLSKeyFile string `koanf:"tls_key_file"`
TLSServerName string `koanf:"tls_server_name"`
SASLMechanism string `koanf:"sasl_mechanism"`
SASLUsername string `koanf:"sasl_username"`
SASLPassword string `koanf:"sasl_password"`
}

// WebSubConfig holds WebSub-specific settings.
Expand Down Expand Up @@ -111,8 +113,10 @@ func DefaultConfig() *Config {
MetricsPort: 9003,
},
Kafka: KafkaConfig{
Brokers: []string{"localhost:9092"},
ConsumerGroupPrefix: "event-gateway",
Brokers: []string{"localhost:9092"},
ConsumerGroupPrefix: "event-gateway",
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
},
WebSub: WebSubConfig{
VerificationTimeoutSeconds: 10,
Expand Down Expand Up @@ -223,6 +227,8 @@ func mapEnvValue(path, value string) interface{} {
"server.websocket_port",
"server.admin_port",
"server.metrics_port",
"kafka.compact_topic_partitions",
"kafka.compact_topic_replication_factor",
"websub.verification_timeout_seconds",
"websub.delivery_max_retries",
"websub.delivery_initial_delay_ms",
Expand Down Expand Up @@ -295,6 +301,12 @@ func validateKafkaConfig(kafkaCfg KafkaConfig) error {
if len(kafkaCfg.Brokers) == 0 {
return fmt.Errorf("kafka.brokers must contain at least one broker")
}
if kafkaCfg.CompactTopicPartitions <= 0 {
return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", kafkaCfg.CompactTopicPartitions)
}
if kafkaCfg.CompactTopicReplicationFactor <= 0 {
return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", kafkaCfg.CompactTopicReplicationFactor)
}

if kafkaCfg.TLS {
if strings.TrimSpace(kafkaCfg.TLSCAFile) != "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"math"
"os"
"strings"

Expand All @@ -34,29 +35,39 @@ import (

// ConnectionConfig holds the Kafka connection settings used by the driver.
type ConnectionConfig struct {
Brokers []string
TLS bool
TLSCAFile string
TLSCertFile string
TLSKeyFile string
TLSServerName string
SASLMechanism string
SASLUsername string
SASLPassword string
Brokers []string
CompactTopicPartitions int
CompactTopicReplicationFactor int
TLS bool
TLSCAFile string
TLSCertFile string
TLSKeyFile string
TLSServerName string
SASLMechanism string
SASLUsername string
SASLPassword string
}

// ResolveConnectionConfig merges global runtime config with per-binding overrides.
func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]interface{}) (ConnectionConfig, error) {
cfg := ConnectionConfig{
Brokers: append([]string(nil), global.Brokers...),
TLS: global.TLS,
TLSCAFile: global.TLSCAFile,
TLSCertFile: global.TLSCertFile,
TLSKeyFile: global.TLSKeyFile,
TLSServerName: global.TLSServerName,
SASLMechanism: global.SASLMechanism,
SASLUsername: global.SASLUsername,
SASLPassword: global.SASLPassword,
Brokers: append([]string(nil), global.Brokers...),
CompactTopicPartitions: global.CompactTopicPartitions,
CompactTopicReplicationFactor: global.CompactTopicReplicationFactor,
TLS: global.TLS,
TLSCAFile: global.TLSCAFile,
TLSCertFile: global.TLSCertFile,
TLSKeyFile: global.TLSKeyFile,
TLSServerName: global.TLSServerName,
SASLMechanism: global.SASLMechanism,
SASLUsername: global.SASLUsername,
SASLPassword: global.SASLPassword,
}
if cfg.CompactTopicPartitions <= 0 {
return ConnectionConfig{}, fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", cfg.CompactTopicPartitions)
}
if cfg.CompactTopicReplicationFactor <= 0 {
return ConnectionConfig{}, fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", cfg.CompactTopicReplicationFactor)
}

if overrides != nil {
Expand All @@ -65,6 +76,16 @@ func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]int
} else if ok {
cfg.Brokers = brokers
}
if v, ok, err := intOverride(overrides["compact_topic_partitions"]); err != nil {
return ConnectionConfig{}, err
} else if ok {
cfg.CompactTopicPartitions = v
}
if v, ok, err := intOverride(overrides["compact_topic_replication_factor"]); err != nil {
return ConnectionConfig{}, err
} else if ok {
cfg.CompactTopicReplicationFactor = v
}
if v, ok, err := boolOverride(overrides["tls"]); err != nil {
return ConnectionConfig{}, err
} else if ok {
Expand Down Expand Up @@ -135,6 +156,18 @@ func validateConnectionConfig(cfg ConnectionConfig) error {
if len(cfg.Brokers) == 0 {
return fmt.Errorf("kafka brokers must not be empty")
}
if cfg.CompactTopicPartitions <= 0 {
return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", cfg.CompactTopicPartitions)
}
if cfg.CompactTopicPartitions > math.MaxInt32 {
return fmt.Errorf("kafka.compact_topic_partitions must be <= %d, got %d", math.MaxInt32, cfg.CompactTopicPartitions)
}
if cfg.CompactTopicReplicationFactor <= 0 {
return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", cfg.CompactTopicReplicationFactor)
}
if cfg.CompactTopicReplicationFactor > math.MaxInt16 {
return fmt.Errorf("kafka.compact_topic_replication_factor must be <= %d, got %d", math.MaxInt16, cfg.CompactTopicReplicationFactor)
}

if !cfg.TLS {
if cfg.TLSCAFile != "" || cfg.TLSCertFile != "" || cfg.TLSKeyFile != "" || cfg.TLSServerName != "" {
Expand Down Expand Up @@ -264,6 +297,29 @@ func boolOverride(value interface{}) (bool, bool, error) {
return v, true, nil
}

func intOverride(value interface{}) (int, bool, error) {
if value == nil {
return 0, false, nil
}
switch v := value.(type) {
case int:
return v, true, nil
case float64:
if math.IsNaN(v) || math.IsInf(v, 0) {
return 0, false, fmt.Errorf("expected integer Kafka config override, got non-finite float64 %v", v)
}
if v != math.Trunc(v) {
return 0, false, fmt.Errorf("expected integer Kafka config override, got non-integer float64 %v", v)
}
if v < math.MinInt32 || v > math.MaxInt32 {
return 0, false, fmt.Errorf("expected integer Kafka config override within [%d, %d], got float64 %v", math.MinInt32, math.MaxInt32, v)
}
return int(v), true, nil
default:
return 0, false, fmt.Errorf("expected int override, got %T", value)
}
}

func stringOverride(value interface{}) (string, bool, error) {
if value == nil {
return "", false, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package kafka

import (
"math"
"os"
"path/filepath"
"reflect"
"strings"
"testing"

runtimeconfig "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/config"
Expand All @@ -17,13 +19,15 @@ func TestResolveConnectionConfig_MergesGlobalAndOverrides(t *testing.T) {
}

global := runtimeconfig.KafkaConfig{
Brokers: []string{"broker-1:9092"},
TLS: true,
TLSCAFile: caPath,
TLSServerName: "global-kafka",
SASLMechanism: "plain",
SASLUsername: "global-user",
SASLPassword: "global-pass",
Brokers: []string{"broker-1:9092"},
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
TLS: true,
TLSCAFile: caPath,
TLSServerName: "global-kafka",
SASLMechanism: "plain",
SASLUsername: "global-user",
SASLPassword: "global-pass",
}

resolved, err := ResolveConnectionConfig(global, map[string]interface{}{
Expand Down Expand Up @@ -57,7 +61,10 @@ func TestResolveConnectionConfig_MergesGlobalAndOverrides(t *testing.T) {
}

func TestResolveConnectionConfig_PreservesOpaqueCredentials(t *testing.T) {
resolved, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{}, map[string]interface{}{
resolved, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
}, map[string]interface{}{
"brokers": []interface{}{"broker:9092"},
"sasl_mechanism": "plain",
"sasl_username": " user-with-spaces ",
Expand All @@ -77,8 +84,10 @@ func TestResolveConnectionConfig_PreservesOpaqueCredentials(t *testing.T) {

func TestResolveConnectionConfig_RequiresTLSWhenTLSFilesAreConfigured(t *testing.T) {
_, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{
Brokers: []string{"broker:9092"},
TLSCAFile: "/tmp/ca.crt",
Brokers: []string{"broker:9092"},
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
TLSCAFile: "/tmp/ca.crt",
}, nil)
if err == nil {
t.Fatalf("expected error when TLS files are set with TLS disabled")
Expand All @@ -93,26 +102,33 @@ func TestResolveConnectionConfig_ValidatesReadableTLSFiles(t *testing.T) {
}

_, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{
Brokers: []string{"broker:9092"},
TLS: true,
TLSCAFile: caPath,
Brokers: []string{"broker:9092"},
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
TLS: true,
TLSCAFile: caPath,
}, nil)
if err != nil {
t.Fatalf("expected readable CA file to validate, got %v", err)
}

_, err = ResolveConnectionConfig(runtimeconfig.KafkaConfig{
Brokers: []string{"broker:9092"},
TLS: true,
TLSCAFile: filepath.Join(tempDir, "missing.crt"),
Brokers: []string{"broker:9092"},
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
TLS: true,
TLSCAFile: filepath.Join(tempDir, "missing.crt"),
}, nil)
if err == nil {
t.Fatalf("expected missing CA file to fail validation")
}
}

func TestResolveConnectionConfig_RequiresSASLCredentials(t *testing.T) {
_, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{}, map[string]interface{}{
_, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
}, map[string]interface{}{
"brokers": []interface{}{"broker:9092"},
"sasl_mechanism": "scram-sha-512",
"sasl_username": "user",
Expand All @@ -121,3 +137,55 @@ func TestResolveConnectionConfig_RequiresSASLCredentials(t *testing.T) {
t.Fatalf("expected missing SASL password to fail validation")
}
}

func TestIntOverride_AcceptsIntegerFloat64(t *testing.T) {
got, ok, err := intOverride(float64(3))
if err != nil {
t.Fatalf("expected integer float64 override to succeed, got %v", err)
}
if !ok {
t.Fatalf("expected integer float64 override to be accepted")
}
if got != 3 {
t.Fatalf("expected integer float64 override to convert to 3, got %d", got)
}
}

func TestIntOverride_RejectsInvalidFloat64(t *testing.T) {
tests := []struct {
name string
value float64
wantErr string
}{
{
name: "non integer",
value: 3.5,
wantErr: "non-integer",
},
{
name: "out of bounds",
value: float64(math.MaxInt32) + 1,
wantErr: "within",
},
{
name: "non finite",
value: math.NaN(),
wantErr: "non-finite",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, ok, err := intOverride(tt.value)
if err == nil {
t.Fatalf("expected float64 override %v to fail", tt.value)
}
if ok {
t.Fatalf("expected invalid float64 override %v to be rejected", tt.value)
}
if !strings.Contains(err.Error(), tt.wantErr) {
t.Fatalf("expected error %q to contain %q", err.Error(), tt.wantErr)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,15 @@ func (e *KafkaBrokerDriver) EnsureTopics(ctx context.Context, topics []string) e

// EnsureCompactedTopic creates a compacted topic if it does not already exist.
func (e *KafkaBrokerDriver) EnsureCompactedTopic(ctx context.Context, topic string) error {
resp, err := e.admin.CreateTopics(ctx, 1, 1, map[string]*string{
"cleanup.policy": kadm.StringPtr("compact"),
}, topic)
resp, err := e.admin.CreateTopics(
ctx,
int32(e.cfg.CompactTopicPartitions),
int16(e.cfg.CompactTopicReplicationFactor),
map[string]*string{
"cleanup.policy": kadm.StringPtr("compact"),
},
topic,
)
if err != nil {
return fmt.Errorf("failed to create compacted topic %s: %w", topic, err)
}
Expand Down
Loading
Loading