Skip to content

Commit ca301d2

Browse files
authored
Merge pull request #2003 from senthuran16/websub-topic-hash
Hash WebSub API topics
2 parents 21e07f4 + 6a51848 commit ca301d2

7 files changed

Lines changed: 54 additions & 26 deletions

File tree

event-gateway/ARCHITECTURE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1220,7 +1220,7 @@ type BrokerDriver interface {
12201220
Publish(ctx context.Context, topic string, msg *Message) error
12211221
Subscribe(groupID string, topics []string, handler MessageHandler) (Receiver, error)
12221222
TopicExists(ctx context.Context, topic string) (bool, error)
1223-
EnsureTopics(ctx context.Context, topics []string) error
1223+
EnsureTopics(ctx context.Context, topics []string, metadata map[string]map[string]string) error
12241224
DeleteTopics(ctx context.Context, topics []string) error
12251225
Close() error
12261226
}

event-gateway/gateway-runtime/internal/binding/types.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package binding
2020

2121
import (
22+
"crypto/sha256"
2223
"fmt"
2324
"path"
2425
"strings"
@@ -140,18 +141,22 @@ type ChannelsConfig struct {
140141
Channels []Binding `yaml:"channels"`
141142
}
142143

143-
// JoinNormalizedTopic derives a Kafka topic name by normalizing each logical
144-
// segment and joining them with underscores.
144+
// JoinNormalizedTopic derives a Kafka topic name by hashing the parts joined with underscores.
145+
// Each part is written as `<length>:<value>|` to ensure uniqueness and prevent collisions.
146+
// eg: "my-api", "v1", "/orders" -> "6:my-api|2:v1|7:/orders|" -> SHA-256 hash of that string.
147+
// Returns the SHA-256 hash of the joined string.
145148
func JoinNormalizedTopic(parts ...string) string {
146149
if len(parts) == 0 {
147150
return ""
148151
}
149-
150-
normalizedParts := make([]string, 0, len(parts))
152+
var joined strings.Builder
151153
for _, part := range parts {
152-
normalizedParts = append(normalizedParts, NormalizeTopicSegment(part))
154+
fmt.Fprintf(&joined, "%d:%s|", len(part), part)
153155
}
154-
return strings.Join(normalizedParts, "_")
156+
// Calculate SHA-256 hash
157+
hash := sha256.Sum256([]byte(joined.String()))
158+
// Return hex-encoded hash
159+
return fmt.Sprintf("%x", hash)
155160
}
156161

157162
// WebSubApiTopicName derives a Kafka topic name for a WebSubApi channel.

event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -100,22 +100,36 @@ func (e *KafkaBrokerDriver) TopicExists(ctx context.Context, topic string) (bool
100100
}
101101

102102
// EnsureTopics creates topics if they don't already exist (idempotent).
103-
func (e *KafkaBrokerDriver) EnsureTopics(ctx context.Context, topics []string) error {
104-
resp, err := e.admin.CreateTopics(ctx, 1, 1, nil, topics...)
105-
if err != nil {
106-
return fmt.Errorf("failed to create topics: %w", err)
107-
}
103+
// metadata is an optional map of topic name -> key-value pairs for logging/tracking purposes.
104+
// For WebSubAPI topics, metadata typically includes: apiName, apiVersion, channelName.
105+
// Note: metadata is logged but not stored in Kafka (Kafka only accepts specific configuration keys).
106+
func (e *KafkaBrokerDriver) EnsureTopics(ctx context.Context, topics []string, metadata map[string]map[string]string) error {
107+
// Create topics one by one to allow logging metadata per topic
108+
for _, topic := range topics {
109+
resp, err := e.admin.CreateTopics(ctx, 1, 1, nil, topic)
110+
if err != nil {
111+
return fmt.Errorf("failed to create topic %s: %w", topic, err)
112+
}
108113

109-
for _, t := range resp.Sorted() {
110-
if t.Err != nil {
111-
// "TOPIC_ALREADY_EXISTS" is not a real failure for idempotent creates.
112-
if isTopicAlreadyExistsErr(t.Err) {
113-
slog.Debug("Topic already exists", "topic", t.Topic)
114-
continue
114+
for _, t := range resp.Sorted() {
115+
if t.Err != nil {
116+
// "TOPIC_ALREADY_EXISTS" is not a real failure for idempotent creates.
117+
if isTopicAlreadyExistsErr(t.Err) {
118+
slog.Debug("Topic already exists", "topic", t.Topic)
119+
continue
120+
}
121+
return fmt.Errorf("failed to create topic %s: %w", t.Topic, t.Err)
122+
}
123+
slog.Info("Created topic", "topic", t.Topic)
124+
// Log metadata for debugging/tracking purposes
125+
if metadata != nil && metadata[t.Topic] != nil {
126+
slog.Info("Topic metadata",
127+
"topic_hash", t.Topic,
128+
"apiName", metadata[t.Topic]["apiName"],
129+
"apiVersion", metadata[t.Topic]["apiVersion"],
130+
"channelName", metadata[t.Topic]["channelName"])
115131
}
116-
return fmt.Errorf("failed to create topic %s: %w", t.Topic, t.Err)
117132
}
118-
slog.Info("Created topic", "topic", t.Topic)
119133
}
120134

121135
return nil

event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (e *WebBrokerApiReceiver) Start(ctx context.Context) error {
102102
slog.Info("Ensuring Kafka topics exist",
103103
"channel", e.channel.Name,
104104
"topics", e.opts.Topics)
105-
if err := e.brokerDriver.EnsureTopics(ctx, e.opts.Topics); err != nil {
105+
if err := e.brokerDriver.EnsureTopics(ctx, e.opts.Topics, nil); err != nil {
106106
return fmt.Errorf("failed to ensure kafka topics: %w", err)
107107
}
108108
slog.Info("Kafka topics verified",

event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,22 @@ func NewReceiver(cfg connectors.ReceiverConfig, opts Options) (connectors.Receiv
133133
// Start ensures Kafka topics exist and sets up the consumer manager context.
134134
// The HTTP server is managed by the runtime.
135135
func (e *WebSubReceiver) Start(ctx context.Context) error {
136-
// Collect all Kafka topics to ensure.
136+
// Collect all Kafka topics to ensure and build metadata for WebSubAPI topics.
137137
var topicsToEnsure []string
138-
for _, kafkaTopic := range e.channel.Channels {
138+
topicMetadata := make(map[string]map[string]string)
139+
140+
for channelName, kafkaTopic := range e.channel.Channels {
139141
topicsToEnsure = append(topicsToEnsure, kafkaTopic)
142+
// Add metadata for WebSubAPI topics
143+
topicMetadata[kafkaTopic] = map[string]string{
144+
"apiName": e.channel.Name,
145+
"apiVersion": e.channel.Version,
146+
"channelName": channelName,
147+
}
140148
}
141149

142150
if len(topicsToEnsure) > 0 {
143-
if err := e.brokerDriver.EnsureTopics(ctx, topicsToEnsure); err != nil {
151+
if err := e.brokerDriver.EnsureTopics(ctx, topicsToEnsure, topicMetadata); err != nil {
144152
return fmt.Errorf("failed to ensure kafka topics: %w", err)
145153
}
146154
}

event-gateway/gateway-runtime/internal/connectors/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ type BrokerDriver interface {
6565
SubscribeManual(groupID string, topics []string, handler MessageHandler) (Receiver, error)
6666
Replay(ctx context.Context, topic string, handler MessageHandler) error
6767
TopicExists(ctx context.Context, topic string) (bool, error)
68-
EnsureTopics(ctx context.Context, topics []string) error
68+
EnsureTopics(ctx context.Context, topics []string, metadata map[string]map[string]string) error
6969
EnsureCompactedTopic(ctx context.Context, topic string) error
7070
DeleteTopics(ctx context.Context, topics []string) error
7171
Close() error

event-gateway/gateway-runtime/internal/runtime/runtime_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ func TestRemoveWebSubApiBinding_ClearsStalePolicyChains(t *testing.T) {
132132

133133
func TestJoinNormalizedTopic_NormalizesUnsupportedCharacters(t *testing.T) {
134134
got := binding.JoinNormalizedTopic("/orders/eu", "v1/test", "order_events")
135-
want := "_2f_orders_2f_eu_v1_2f_test_order__events"
135+
// SHA-256 hash of "10:/orders/eu|7:v1/test|12:order_events|"
136+
want := "2836e285c333e251f0929fbbbe31dec3bb7d61423761103d790bcef2429b194b"
136137
if got != want {
137138
t.Fatalf("JoinNormalizedTopic() = %q, want %q", got, want)
138139
}

0 commit comments

Comments
 (0)